[FFmpeg-devel] [PATCH 1/1] libavformat/rtmp: Implements RTMP reconnect feature
Jordi Cenzano
jordi.cenzano at gmail.com
Mon Dec 6 03:30:30 EET 2021
>
> > Nowadays when you are streaming to a live platform if the RTMP(s)
> > server needs to be restarted for any reason (e.g. deploying a new version)
> > the RTMP connection is interrupted (probably after some draining time).
> > Facebook will publish a proposal to avoid that by sending a
> > GoAway message in the RTMP protocol.
> > This code is the reference client implementation of that proposal.
> > AFAIK other big live platforms showed their interest in implementing
> > this mechanism.
> > This can be already tested against Facebook live production using
> > the querystring parameter ?ccr_sec=120 (that indicates the backend
> > to send a disconnect signal after those seconds)
>
> It seems like this approach is operating from the assumption that the
> time to setup a new connection and process all RPCs necessary to send
> media is on the order of normal jitter. Or am I misunderstanding?
>
You are right!
>
> For many services I don't really think that's the case.
>
Fair enough
>
> And even with a very fast publish response we are looking at like 1.5
> RTTs for TCP, another 1.5 for TLS, another 1.5 for RTMP handshake,
> another 1 RTT for RTMP connect, an RTT on createStream, and an RTT on
> publish. That's like 7.5 RTTs (or 300 ms at 40ms RTT) where we are
> leaving the media flow on pause while we are re-building the connection.
>
Yes, and we are aware that this is NOT ideal. But we thought it would be
better to add ~400ms of latency (in most cases the player buffer will hide
the disruption) than to drop frames until the next IDR, which would usually
produce a much longer disruption, since GOPs are usually 2s+ long.
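
A rough sketch of this trade-off, assuming the per-step round-trip counts
from the estimate quoted above. The helper names are illustrative only and
are not part of the patch:

```c
#include <assert.h>
#include <stddef.h>

/* Round trips per setup step, taken from the estimate quoted above:
 * TCP, TLS, RTMP handshake, connect, createStream, publish. */
static const double setup_rtts[] = { 1.5, 1.5, 1.5, 1.0, 1.0, 1.0 };

/* Hypothetical helper: time the media flow is paused while reconnecting. */
static double reconnect_pause_ms(double rtt_ms)
{
    double total_rtts = 0.0;
    size_t i;

    assert(rtt_ms >= 0.0);
    for (i = 0; i < sizeof(setup_rtts) / sizeof(setup_rtts[0]); i++)
        total_rtts += setup_rtts[i];
    return total_rtts * rtt_ms;
}

/* Hypothetical helper: 1 if pausing for the reconnect is shorter than
 * waiting a full GOP for the next IDR after dropping frames. */
static int reconnect_beats_idr_wait(double rtt_ms, double gop_ms)
{
    return reconnect_pause_ms(rtt_ms) < gop_ms;
}
```

With a 40 ms RTT this gives a 300 ms pause, against a worst case of roughly
one full 2 s GOP when frames are dropped until the next IDR. On a very slow
link (for example a 400 ms RTT) the comparison flips.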
>
> This also seems to conflate rebootstrapping a media decode session vs
> re-bootstrapping an RTMP session. The cost of doing this seems to be
> sending your biggest frame after a pause to resolve a bunch of
> synchronous RPCs on a relatively fresh TCP connection.
>
YES, and as I mentioned, we know that is NOT ideal at all.
BTW here we are talking about LIVE streams.
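
For context on why the first media packet on the new connection is a
keyframe: the patch defers the reconnect until it sees an AVC keyframe and
has the needed sequence headers saved. Below is a condensed sketch of that
check, with hypothetical names; it leaves out the STATE_PUBLISHING test that
the patch also performs:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* FLV video tag body, first byte: high nibble = frame type (1 = keyframe),
 * low nibble = codec id (7 = AVC). */
static int is_avc_keyframe(const uint8_t *data, size_t size)
{
    if (!data || size < 1)
        return 0;
    return (data[0] & 0x0F) == 7 && (data[0] >> 4) == 1;
}

/* Hypothetical condensed form of the "right disconnect point" check.
 * All arguments are 0/1 flags. */
static int can_reconnect_now(int has_video, int has_audio, int is_idr,
                             int have_avc_header, int have_aac_header)
{
    /* An IDR can only have been seen on a stream that carries video. */
    assert(!is_idr || has_video);

    if (!has_video && !has_audio)
        return 0;
    /* With video: wait for a keyframe and a saved AVC sequence header. */
    if (has_video && !(is_idr && have_avc_header))
        return 0;
    /* With audio: a saved AAC sequence header is required as well. */
    if (has_audio && !have_aac_header)
        return 0;
    return 1;
}
```

Audio-only streams can reconnect on any packet once the AAC header is saved,
while streams with video always wait for a keyframe.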
>
> > ---
> > libavformat/rtmppkt.c | 19 +++
> > libavformat/rtmppkt.h | 10 ++
> > libavformat/rtmpproto.c | 356 +++++++++++++++++++++++++++++++++++++---
> > 3 files changed, 359 insertions(+), 26 deletions(-)
> >
> >[...]
> > diff --git a/libavformat/rtmppkt.h b/libavformat/rtmppkt.h
> > index a15d2a5773..cdb901df89 100644
> > --- a/libavformat/rtmppkt.h
> > +++ b/libavformat/rtmppkt.h
> > @@ -59,6 +59,7 @@ typedef enum RTMPPacketType {
> > RTMP_PT_SHARED_OBJ, ///< shared object
> > RTMP_PT_INVOKE, ///< invoke some stream action
> > RTMP_PT_METADATA = 22, ///< FLV metadata
> > + RTMP_PT_GO_AWAY = 32, ///< Indicates please reconnect ASAP, server is about to go down
> > } RTMPPacketType;
> >
>
> I'm curious as to why this is a new top level message rather than just
> another type 20 command message. Message types have a small address space
> while commands have a large address space, and a well chosen command name
> is unlikely to conflict with (and therefore can be used in concert with)
> any other protocol extensions.
>
We thought about it, and it seemed more logical to add it as a packet type
since it affects the RTMP connection itself. If you read the definition of
a Command message, it seems more oriented to sending data / actions over
the underlying connection.
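
A minimal sketch of what handling this at the message-type level looks like
on the receiving side. The struct and function names are hypothetical; only
the value 32 comes from the patch, and 20 is the AMF0 command message type:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum {
    MSG_TYPE_AMF0_COMMAND = 20, /* regular command message (connect, publish, ...) */
    MSG_TYPE_GO_AWAY      = 32  /* value proposed in this patch */
};

/* Hypothetical per-connection state. */
struct conn_state {
    int go_away_received;
};

/* Returns 1 if the message was consumed as a connection-level signal,
 * 0 if it should go through the normal per-type handling. */
static int handle_connection_signal(struct conn_state *st, uint8_t type_id)
{
    assert(st != NULL);

    if (type_id == MSG_TYPE_GO_AWAY) {
        /* Only record the request; the writer picks the reconnect point. */
        st->go_away_received = 1;
        return 1;
    }
    return 0;
}
```

The signal is recognised from the message type alone, so no AMF payload has
to be parsed before the flag is set.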
Thanks a lot for your comments Alex, happy to hear ideas to improve this
proposal. And sorry for my late reply
>
> > [snip]
>
>
> On Sun, Sep 26, 2021 at 1:52 PM Jordi Cenzano <jordi.cenzano at gmail.com>
> wrote:
> >
> > Nowadays when you are streaming to a live platform if the RTMP(s)
> > server needs to be restarted for any reason (e.g. deploying a new version)
> > the RTMP connection is interrupted (probably after some draining time).
> > Facebook will publish a proposal to avoid that by sending a
> > GoAway message in the RTMP protocol.
> > This code is the reference client implementation of that proposal.
> > AFAIK other big live platforms showed their interest in implementing
> > this mechanism.
> > This can be already tested against Facebook live production using
> > the querystring parameter ?ccr_sec=120 (that indicates the backend
> > to send a disconnect signal after those seconds)
> > ---
> > libavformat/rtmppkt.c | 19 +++
> > libavformat/rtmppkt.h | 10 ++
> > libavformat/rtmpproto.c | 356 +++++++++++++++++++++++++++++++++++++---
> > 3 files changed, 359 insertions(+), 26 deletions(-)
> >
> > diff --git a/libavformat/rtmppkt.c b/libavformat/rtmppkt.c
> > index 4b97c0833f..84ec72740d 100644
> > --- a/libavformat/rtmppkt.c
> > +++ b/libavformat/rtmppkt.c
> > @@ -405,6 +405,25 @@ int ff_rtmp_packet_write(URLContext *h, RTMPPacket *pkt,
> > return written;
> > }
> >
> > +int ff_rtmp_packet_clone(RTMPPacket *pkt_dst, const RTMPPacket *pkt_src)
> > +{
> > + pkt_dst->data = NULL;
> > + if (pkt_src->size) {
> > + pkt_dst->data = av_malloc(pkt_src->size);
> > + if (!pkt_dst->data)
> > + return AVERROR(ENOMEM);
> > + memcpy(pkt_dst->data, pkt_src->data, pkt_src->size);
> > + }
> > + pkt_dst->size = pkt_src->size;
> > + pkt_dst->channel_id = pkt_src->channel_id;
> > + pkt_dst->type = pkt_src->type;
> > + pkt_dst->timestamp = pkt_src->timestamp;
> > + pkt_dst->extra = pkt_src->extra;
> > + pkt_dst->ts_field = pkt_src->ts_field;
> > +
> > + return 0;
> > +}
> > +
> > int ff_rtmp_packet_create(RTMPPacket *pkt, int channel_id, RTMPPacketType type,
> > int timestamp, int size)
> > {
> > diff --git a/libavformat/rtmppkt.h b/libavformat/rtmppkt.h
> > index a15d2a5773..cdb901df89 100644
> > --- a/libavformat/rtmppkt.h
> > +++ b/libavformat/rtmppkt.h
> > @@ -59,6 +59,7 @@ typedef enum RTMPPacketType {
> > RTMP_PT_SHARED_OBJ, ///< shared object
> > RTMP_PT_INVOKE, ///< invoke some stream action
> > RTMP_PT_METADATA = 22, ///< FLV metadata
> > + RTMP_PT_GO_AWAY = 32, ///< Indicates please reconnect ASAP, server is about to go down
> > } RTMPPacketType;
> >
> > /**
> > @@ -99,6 +100,15 @@ typedef struct RTMPPacket {
> > int ff_rtmp_packet_create(RTMPPacket *pkt, int channel_id, RTMPPacketType type,
> > int timestamp, int size);
> >
> > +/**
> > + * Clone RTMP packet
> > + *
> > + * @param pkt_dst packet destination
> > + * @param pkt_src packet source
> > + * @return zero on success, negative value otherwise
> > + */
> > +int ff_rtmp_packet_clone(RTMPPacket *pkt_dst, const RTMPPacket *pkt_src);
> > +
> > /**
> > * Free RTMP packet.
> > *
> > diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
> > index b14d23b919..ea37b9880a 100644
> > --- a/libavformat/rtmpproto.c
> > +++ b/libavformat/rtmpproto.c
> > @@ -124,11 +124,21 @@ typedef struct RTMPContext {
> > int nb_streamid; ///< The next stream id to return on createStream calls
> > double duration; ///< Duration of the stream in seconds as returned by the server (only valid if non-zero)
> > int tcp_nodelay; ///< Use TCP_NODELAY to disable Nagle's algorithm if set to 1
> > + int reconnect_interval; ///< Forces a reconnection every X seconds (in media time)
> > char username[50];
> > char password[50];
> > char auth_params[500];
> > int do_reconnect;
> > + uint32_t last_reconnect_timestamp;
> > int auth_tried;
> > + int force_reconnection_now;
> > + int go_away_received;
> > + AVDictionary* original_opts;
> > + char original_uri[TCURL_MAX_LENGTH];
> > + int original_flags;
> > + RTMPPacket last_avc_seq_header_pkt; ///< rtmp packet, used to save last AVC video header, used on reconnection
> > + RTMPPacket last_aac_seq_header_pkt; ///< rtmp packet, used to save last AAC audio header, used on reconnection
> > + RTMPPacket last_metadata_pkt; ///< rtmp packet, used to save last onMetadata info, used on reconnection
> > } RTMPContext;
> >
> > #define PLAYER_KEY_OPEN_PART_LEN 30 ///< length of partial key used for first client digest signing
> > @@ -224,7 +234,7 @@ static void free_tracked_methods(RTMPContext *rt)
> > rt->nb_tracked_methods = 0;
> > }
> >
> > -static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
> > +static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track, int destroy)
> > {
> > int ret;
> >
> > @@ -248,7 +258,9 @@ static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
> > ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size,
> > &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
> > fail:
> > - ff_rtmp_packet_destroy(pkt);
> > + if (destroy)
> > + ff_rtmp_packet_destroy(pkt);
> > +
> > return ret;
> > }
> >
> > @@ -336,6 +348,9 @@ static int gen_connect(URLContext *s, RTMPContext *rt)
> > if (!rt->is_input) {
> > ff_amf_write_field_name(&p, "type");
> > ff_amf_write_string(&p, "nonprivate");
> > + // Indicates accepts goaway
> > + ff_amf_write_field_name(&p, "supportsGoAway");
> > + ff_amf_write_bool(&p, 1);
> > }
> > ff_amf_write_field_name(&p, "flashVer");
> > ff_amf_write_string(&p, rt->flashver);
> > @@ -400,7 +415,7 @@ static int gen_connect(URLContext *s, RTMPContext *rt)
> >
> > pkt.size = p - pkt.data;
> >
> > - return rtmp_send_packet(rt, &pkt, 1);
> > + return rtmp_send_packet(rt, &pkt, 1, 1);
> > }
> >
> >
> > @@ -611,7 +626,7 @@ static int gen_release_stream(URLContext *s, RTMPContext *rt)
> > ff_amf_write_null(&p);
> > ff_amf_write_string(&p, rt->playpath);
> >
> > - return rtmp_send_packet(rt, &pkt, 1);
> > + return rtmp_send_packet(rt, &pkt, 1, 1);
> > }
> >
> > /**
> > @@ -635,7 +650,7 @@ static int gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
> > ff_amf_write_null(&p);
> > ff_amf_write_string(&p, rt->playpath);
> >
> > - return rtmp_send_packet(rt, &pkt, 1);
> > + return rtmp_send_packet(rt, &pkt, 1, 1);
> > }
> >
> > /**
> > @@ -659,7 +674,7 @@ static int gen_fcunpublish_stream(URLContext *s, RTMPContext *rt)
> > ff_amf_write_null(&p);
> > ff_amf_write_string(&p, rt->playpath);
> >
> > - return rtmp_send_packet(rt, &pkt, 0);
> > + return rtmp_send_packet(rt, &pkt, 0, 1);
> > }
> >
> > /**
> > @@ -683,7 +698,7 @@ static int gen_create_stream(URLContext *s, RTMPContext *rt)
> > ff_amf_write_number(&p, ++rt->nb_invokes);
> > ff_amf_write_null(&p);
> >
> > - return rtmp_send_packet(rt, &pkt, 1);
> > + return rtmp_send_packet(rt, &pkt, 1, 1);
> > }
> >
> >
> > @@ -709,7 +724,7 @@ static int gen_delete_stream(URLContext *s, RTMPContext *rt)
> > ff_amf_write_null(&p);
> > ff_amf_write_number(&p, rt->stream_id);
> >
> > - return rtmp_send_packet(rt, &pkt, 0);
> > + return rtmp_send_packet(rt, &pkt, 0, 1);
> > }
> >
> > /**
> > @@ -733,7 +748,7 @@ static int gen_get_stream_length(URLContext *s, RTMPContext *rt)
> > ff_amf_write_null(&p);
> > ff_amf_write_string(&p, rt->playpath);
> >
> > - return rtmp_send_packet(rt, &pkt, 1);
> > + return rtmp_send_packet(rt, &pkt, 1, 1);
> > }
> >
> > /**
> > @@ -754,7 +769,7 @@ static int gen_buffer_time(URLContext *s, RTMPContext *rt)
> > bytestream_put_be32(&p, rt->stream_id);
> > bytestream_put_be32(&p, rt->client_buffer_time);
> >
> > - return rtmp_send_packet(rt, &pkt, 0);
> > + return rtmp_send_packet(rt, &pkt, 0, 1);
> > }
> >
> > /**
> > @@ -782,7 +797,7 @@ static int gen_play(URLContext *s, RTMPContext *rt)
> > ff_amf_write_string(&p, rt->playpath);
> > ff_amf_write_number(&p, rt->live * 1000);
> >
> > - return rtmp_send_packet(rt, &pkt, 1);
> > + return rtmp_send_packet(rt, &pkt, 1, 1);
> > }
> >
> > static int gen_seek(URLContext *s, RTMPContext *rt, int64_t timestamp)
> > @@ -805,7 +820,7 @@ static int gen_seek(URLContext *s, RTMPContext *rt, int64_t timestamp)
> > ff_amf_write_null(&p); //as usual, the first null param
> > ff_amf_write_number(&p, timestamp); //where we want to jump
> >
> > - return rtmp_send_packet(rt, &pkt, 1);
> > + return rtmp_send_packet(rt, &pkt, 1, 1);
> > }
> >
> > /**
> > @@ -832,7 +847,7 @@ static int gen_pause(URLContext *s, RTMPContext *rt, int pause, uint32_t timesta
> > ff_amf_write_bool(&p, pause); // pause or unpause
> > ff_amf_write_number(&p, timestamp); //where we pause the stream
> >
> > - return rtmp_send_packet(rt, &pkt, 1);
> > + return rtmp_send_packet(rt, &pkt, 1, 1);
> > }
> >
> > /**
> > @@ -859,7 +874,7 @@ static int gen_publish(URLContext *s, RTMPContext *rt)
> > ff_amf_write_string(&p, rt->playpath);
> > ff_amf_write_string(&p, "live");
> >
> > - return rtmp_send_packet(rt, &pkt, 1);
> > + return rtmp_send_packet(rt, &pkt, 1, 1);
> > }
> >
> > /**
> > @@ -885,7 +900,7 @@ static int gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt)
> > bytestream_put_be16(&p, 7); // PingResponse
> > bytestream_put_be32(&p, AV_RB32(ppkt->data+2));
> >
> > - return rtmp_send_packet(rt, &pkt, 0);
> > + return rtmp_send_packet(rt, &pkt, 0, 1);
> > }
> >
> > /**
> > @@ -906,7 +921,7 @@ static int gen_swf_verification(URLContext *s, RTMPContext *rt)
> > bytestream_put_be16(&p, 27);
> > memcpy(p, rt->swfverification, 42);
> >
> > - return rtmp_send_packet(rt, &pkt, 0);
> > + return rtmp_send_packet(rt, &pkt, 0, 1);
> > }
> >
> > /**
> > @@ -925,7 +940,7 @@ static int gen_window_ack_size(URLContext *s, RTMPContext *rt)
> > p = pkt.data;
> > bytestream_put_be32(&p, rt->max_sent_unacked);
> >
> > - return rtmp_send_packet(rt, &pkt, 0);
> > + return rtmp_send_packet(rt, &pkt, 0, 1);
> > }
> >
> > /**
> > @@ -946,7 +961,7 @@ static int gen_check_bw(URLContext *s, RTMPContext *rt)
> > ff_amf_write_number(&p, ++rt->nb_invokes);
> > ff_amf_write_null(&p);
> >
> > - return rtmp_send_packet(rt, &pkt, 1);
> > + return rtmp_send_packet(rt, &pkt, 1, 1);
> > }
> >
> > /**
> > @@ -965,7 +980,7 @@ static int gen_bytes_read(URLContext *s, RTMPContext *rt, uint32_t ts)
> > p = pkt.data;
> > bytestream_put_be32(&p, rt->bytes_read);
> >
> > - return rtmp_send_packet(rt, &pkt, 0);
> > + return rtmp_send_packet(rt, &pkt, 0, 1);
> > }
> >
> > static int gen_fcsubscribe_stream(URLContext *s, RTMPContext *rt,
> > @@ -985,7 +1000,7 @@ static int gen_fcsubscribe_stream(URLContext *s, RTMPContext *rt,
> > ff_amf_write_null(&p);
> > ff_amf_write_string(&p, subscribe);
> >
> > - return rtmp_send_packet(rt, &pkt, 1);
> > + return rtmp_send_packet(rt, &pkt, 1, 1);
> > }
> >
> > /**
> > @@ -2153,6 +2168,16 @@ static int handle_invoke_status(URLContext *s, RTMPPacket *pkt)
> > return 0;
> > }
> >
> > +static int handle_go_away(URLContext *s, RTMPPacket *pkt) {
> > + RTMPContext *rt = s->priv_data;
> > +
> > + av_log(s, AV_LOG_TRACE, "go away signal received\n");
> > +
> > + rt->go_away_received = 1;
> > +
> > + return 0;
> > +}
> > +
> > static int handle_invoke(URLContext *s, RTMPPacket *pkt)
> > {
> > RTMPContext *rt = s->priv_data;
> > @@ -2331,6 +2356,10 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
> > if ((ret = handle_invoke(s, pkt)) < 0)
> > return ret;
> > break;
> > + case RTMP_PT_GO_AWAY:
> > + if ((ret = handle_go_away(s, pkt)) < 0)
> > + return ret;
> > + break;
> > case RTMP_PT_VIDEO:
> > case RTMP_PT_AUDIO:
> > case RTMP_PT_METADATA:
> > @@ -2513,6 +2542,15 @@ static int rtmp_close(URLContext *h)
> > free_tracked_methods(rt);
> > av_freep(&rt->flv_data);
> > ffurl_closep(&rt->stream);
> > + if (rt->last_avc_seq_header_pkt.size)
> > + ff_rtmp_packet_destroy(&rt->last_avc_seq_header_pkt);
> > +
> > + if (rt->last_aac_seq_header_pkt.size)
> > + ff_rtmp_packet_destroy(&rt->last_aac_seq_header_pkt);
> > +
> > + if (rt->last_metadata_pkt.size)
> > + ff_rtmp_packet_destroy(&rt->last_metadata_pkt);
> > +
> > return ret;
> > }
> >
> > @@ -2871,14 +2909,23 @@ reconnect:
> > goto fail;
> > }
> > } else {
> > - rt->flv_size = 0;
> > - rt->flv_data = NULL;
> > - rt->flv_off = 0;
> > - rt->skip_bytes = 13;
> > + // Do not clean buffers if it is a forced reconnection
> > + if (rt->force_reconnection_now <= 0) {
> > + rt->flv_size = 0;
> > + rt->flv_data = NULL;
> > + rt->flv_off = 0;
> > + rt->skip_bytes = 13;
> > + }
> > }
> >
> > s->max_packet_size = rt->stream->max_packet_size;
> > s->is_streamed = 1;
> > +
> > + // Copy original params
> > + av_dict_copy(&rt->original_opts, *opts, 0);
> > + rt->original_flags = flags;
> > + av_strlcpy(rt->original_uri, uri, TCURL_MAX_LENGTH);
> > +
> > return 0;
> >
> > fail:
> > @@ -2951,6 +2998,107 @@ static int rtmp_pause(URLContext *s, int pause)
> > return 0;
> > }
> >
> > +/**
> > + * Reconnect RTMP connection.
> > +*/
> > +static int rtmp_reconnect(URLContext *s) {
> > + RTMPContext *rt = s->priv_data;
> > + int i;
> > +
> > + // Close current RTMP connection
> > + av_log(s, AV_LOG_INFO, "reconnecting!\n");
> > +
> > + ffurl_closep(&rt->stream);
> > + rt->do_reconnect = 0;
> > + rt->nb_invokes = 0;
> > + for (i = 0; i < 2; i++)
> > + memset(rt->prev_pkt[i], 0, sizeof(**rt->prev_pkt) * rt->nb_prev_pkt[i]);
> > +
> > + free_tracked_methods(rt);
> > +
> > + // Connect RTMP again using original values
> > + return rtmp_open(s, rt->original_uri, rt->original_flags, &rt->original_opts);
> > +}
> > +
> > +/**
> > + * Checks RTMP packet and return 1 when it contains an AAC header
> > +*/
> > +static int rtmp_packet_is_aac_audio_header(RTMPPacket *pkt) {
> > + uint8_t sound_format;
> > + uint8_t aac_packet_type;
> > +
> > + if ((!pkt) || (pkt->size < 2) || pkt->type != RTMP_PT_AUDIO)
> > + return 0;
> > +
> > + sound_format = (pkt->data[0] & 0xF0) >> 4;
> > + aac_packet_type = pkt->data[1];
> > + // Check codec == AAC and packet contains the AAC sequence header
> > + if (sound_format == 10 && aac_packet_type == 0)
> > + return 1;
> > +
> > + return 0;
> > +}
> > +
> > +/**
> > + * Checks RTMP packet and return 1 when it contains an AVC header
> > +*/
> > +static int rtmp_packet_is_avc_video_header(RTMPPacket *pkt) {
> > + uint8_t codec_id;
> > + uint8_t avc_packet_type;
> > +
> > + if ((!pkt) || (pkt->size < 2) || pkt->type != RTMP_PT_VIDEO)
> > + return 0;
> > +
> > + codec_id = pkt->data[0] & 0xF;
> > + avc_packet_type = pkt->data[1];
> > + // Check codec == AVC and avc contains seq header
> > + if (codec_id == 7 && avc_packet_type == 0)
> > + return 1;
> > +
> > + return 0;
> > +}
> > +
> > +/**
> > + * Checks RTMP packet and return 1 when it contains video IDR point
> > +*/
> > +static int rtmp_packet_is_video_avc_IDR(RTMPPacket *pkt) {
> > + uint8_t frame_type;
> > + uint8_t codec_id;
> > +
> > + if ((!pkt) || (pkt->size < 1) || pkt->type != RTMP_PT_VIDEO)
> > + return 0;
> > +
> > + frame_type = (pkt->data[0] & 0xF0) >> 4;
> > + codec_id = pkt->data[0] & 0xF;
> > + // Check codec == AVC and videoFrame == Keyframe / seekable (assuming that means IDR)
> > + if (codec_id == 7 && frame_type == 1)
> > + return 1;
> > +
> > + return 0;
> > +}
> > +
> > +/**
> > + * Checks RTMP packet and return 1 when it contains onMetadata info
> > +*/
> > +static int rtmp_packet_is_onMetadata_packet(RTMPPacket *pkt) {
> > + uint8_t commandbuffer[64];
> > + int stringlen;
> > + GetByteContext gbc;
> > +
> > + if ((!pkt) || (pkt->size < 10) || pkt->type != RTMP_PT_NOTIFY)
> > + return 0;
> > +
> > + bytestream2_init(&gbc, pkt->data, pkt->size);
> > + if (ff_amf_read_string(&gbc, commandbuffer, sizeof(commandbuffer), &stringlen))
> > + return 0;
> > +
> > + // onMetadata is prepended by "@setDataFrame"
> > + if (!strcmp(commandbuffer, "@setDataFrame"))
> > + return 1;
> > +
> > + return 0;
> > +}
> > +
> > static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
> > {
> > RTMPContext *rt = s->priv_data;
> > @@ -2960,6 +3108,8 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
> > const uint8_t *buf_temp = buf;
> > uint8_t c;
> > int ret;
> > + int execute_reconnection = 0;
> > + int is_idr = 0;
> >
> > do {
> > if (rt->skip_bytes) {
> > @@ -2988,8 +3138,13 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
> > bytestream_get_be24(&header);
> > rt->flv_size = pktsize;
> >
> > - if (pkttype == RTMP_PT_VIDEO)
> > + if (pkttype == RTMP_PT_VIDEO) {
> > channel = RTMP_VIDEO_CHANNEL;
> > + rt->has_video = 1;
> > + }
> > + if (pkttype == RTMP_PT_AUDIO) {
> > + rt->has_audio = 1;
> > + }
> >
> > if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO) && ts == 0) ||
> > pkttype == RTMP_PT_NOTIFY) {
> > @@ -3047,7 +3202,155 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
> > }
> > }
> >
> > - if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0)) < 0)
> > + // Check if a reconnection is required
> > + // Per interval
> > + if ((rt->reconnect_interval > 0) &&
> > + (rt->out_pkt.timestamp >= (rt->last_reconnect_timestamp + rt->reconnect_interval * 1000))) {
> > + rt->last_reconnect_timestamp = rt->out_pkt.timestamp;
> > + rt->force_reconnection_now = 1;
> > + av_log(s, AV_LOG_TRACE,
> > + "triggered internal interval reconnection\n");
> > + }
> > + // Per go away signal
> > + if (rt->go_away_received > 0) {
> > + rt->go_away_received = 0;
> > + rt->force_reconnection_now = 1;
> > + av_log(s, AV_LOG_TRACE,
> > + "detected go away signal from the peer\n");
> > + }
> > +
> > + if (rtmp_packet_is_avc_video_header(&rt->out_pkt)) {
> > + // Save last video header
> > + if (rt->last_avc_seq_header_pkt.size) {
> > + av_log(s, AV_LOG_DEBUG,
> > + "freeing last video header packet saved\n");
> > + ff_rtmp_packet_destroy(&rt->last_avc_seq_header_pkt);
> > + }
> > + // Save AVC seq header packet
> > + if ((ret = ff_rtmp_packet_clone(&rt->last_avc_seq_header_pkt, &rt->out_pkt)) < 0) {
> > + return ret;
> > + }
> > + av_log(s, AV_LOG_DEBUG, "saved video header packet\n");
> > + } else if (rtmp_packet_is_aac_audio_header(&rt->out_pkt)) {
> > + // Save last audio header
> > + if (rt->last_aac_seq_header_pkt.size) {
> > + av_log(s, AV_LOG_DEBUG,
> > + "freeing last audio header packet saved\n");
> > + ff_rtmp_packet_destroy(&rt->last_aac_seq_header_pkt);
> > + }
> > + // Save AAC seq header packet
> > + if ((ret = ff_rtmp_packet_clone(&rt->last_aac_seq_header_pkt, &rt->out_pkt)) < 0) {
> > + return ret;
> > + }
> > + av_log(s, AV_LOG_DEBUG, "saved audio header packet\n");
> > + } else if (rtmp_packet_is_onMetadata_packet(&rt->out_pkt)) {
> > + // Save last onMetadata packet
> > + if (rt->last_metadata_pkt.size) {
> > + av_log(s, AV_LOG_DEBUG,
> > + "freeing last onMetadata packet saved\n");
> > + ff_rtmp_packet_destroy(&rt->last_metadata_pkt);
> > + }
> > + // Save onMetadata packet
> > + if ((ret = ff_rtmp_packet_clone(&rt->last_metadata_pkt, &rt->out_pkt)) < 0) {
> > + return ret;
> > + }
> > + av_log(s, AV_LOG_DEBUG, "saved onMetadata packet\n");
> > + }
> > +
> > + // Reconnection has been requested
> > + if (rt->force_reconnection_now >= 1) {
> > + // Check if packet is video IDR
> > + is_idr = rtmp_packet_is_video_avc_IDR(&rt->out_pkt);
> > + av_log(s, AV_LOG_DEBUG,
> > + "looking for the right disconnect point. Is IDR: %d, "
> > + "has_video: %d, has_audio: %d, state: %d, "
> > + "last_avc_seq_header_pkt.size: %d, "
> > + "last_aac_seq_header_pkt.size: %d\n",
> > + is_idr, rt->has_video, rt->has_audio, rt->state,
> > + rt->last_avc_seq_header_pkt.size,
> > + rt->last_aac_seq_header_pkt.size);
> > +
> > + if (rt->has_video && rt->has_audio &&
> > + (rt->state == STATE_PUBLISHING)) {
> > + // If we have both video and audio let's do the reconnection in an
> > + // IDR frame when we have both headers saved
> > + if (is_idr && rt->last_avc_seq_header_pkt.size &&
> > + rt->last_aac_seq_header_pkt.size)
> > + execute_reconnection = 1;
> > + } else if (rt->has_video && !rt->has_audio &&
> > + (rt->state == STATE_PUBLISHING)) {
> > + // If we have video and NO audio let's do the reconnection
> > + // in an IDR frame when we have video header saved
> > + if (is_idr && rt->last_avc_seq_header_pkt.size)
> > + execute_reconnection = 1;
> > + } else if (!rt->has_video &&
> > + rt->has_audio && (rt->state == STATE_PUBLISHING)) {
> > + // If we have only audio let's do the reconnection when we
> > + // have the audio header saved
> > + if (rt->last_aac_seq_header_pkt.size)
> > + execute_reconnection = 1;
> > + } else {
> > + av_log(s, AV_LOG_DEBUG,
> > + "reconnection is requested but can NOT be executed "
> > + "now, waiting! rt->state: %d, has_video: %d, "
> > + "has_audio: %d, is_idr: %d\n",
> > + rt->state, rt->has_video, rt->has_audio, is_idr);
> > + }
> > + }
> > +
> > + if (execute_reconnection) {
> > + execute_reconnection = 0;
> > +
> > + av_log(s, AV_LOG_DEBUG,
> > + "executing reconnection. rt->flv_off: %d, rt->flv_size: "
> > + "%d\n",
> > + rt->flv_off, rt->flv_size);
> > +
> > + if ((ret = rtmp_reconnect(s)) < 0)
> > + return ret;
> > +
> > + // Reconnect executed, clear the flag
> > + rt->force_reconnection_now = 0;
> > +
> > + av_log(s, AV_LOG_DEBUG,
> > + "reconnected. rt->flv_off: %d, rt->flv_size: %d\n",
> > + rt->flv_off, rt->flv_size);
> > +
> > + // Send last video header if it is saved
> > + if (rt->last_avc_seq_header_pkt.size) {
> > + av_log(s, AV_LOG_DEBUG,
> > + "sending last saved video header\n");
> > + rt->last_avc_seq_header_pkt.timestamp =
> > + rt->out_pkt.timestamp;
> > + if ((ret = rtmp_send_packet(
> > + rt, &rt->last_avc_seq_header_pkt, 0, 0)) < 0)
> > + return ret;
> > + }
> > +
> > + // Send last audio header if it is saved
> > + if (rt->last_aac_seq_header_pkt.size) {
> > + av_log(s, AV_LOG_DEBUG,
> > + "sending last saved audio header\n");
> > + rt->last_aac_seq_header_pkt.timestamp =
> > + rt->out_pkt.timestamp;
> > + if ((ret = rtmp_send_packet(
> > + rt, &rt->last_aac_seq_header_pkt, 0, 0)) < 0)
> > + return ret;
> > + }
> > +
> > + // Send last onMetadata packet, optional
> > + if (rt->last_metadata_pkt.size) {
> > + av_log(s, AV_LOG_DEBUG,
> > + "sending last saved onMetadata header\n");
> > + rt->last_metadata_pkt.timestamp = rt->out_pkt.timestamp;
> > + if ((ret = rtmp_send_packet(rt, &rt->last_metadata_pkt, 0,
> > + 0)) < 0)
> > + return ret;
> > + }
> > + }
> > +
> > + // Send actual packet
> > + if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0, 1)) < 0)
> > return ret;
> > rt->flv_size = 0;
> > rt->flv_off = 0;
> > @@ -3118,6 +3421,7 @@ static const AVOption rtmp_options[] = {
> > {"listen", "Listen for incoming rtmp connections", OFFSET(listen), AV_OPT_TYPE_INT, {.i64 = 0}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
> > {"tcp_nodelay", "Use TCP_NODELAY to disable Nagle's algorithm", OFFSET(tcp_nodelay), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, DEC|ENC},
> > {"timeout", "Maximum timeout (in seconds) to wait for incoming connections. -1 is infinite. Implies -rtmp_listen 1", OFFSET(listen_timeout), AV_OPT_TYPE_INT, {.i64 = -1}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
> > + {"rtmp_reconnect_time", "Interval (in seconds) of media time after which a client reconnection is forced. Default is 0 (no forced reconnection)", OFFSET(reconnect_interval), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, ENC },
> > { NULL },
> > };
> >
> > --
> > 2.32.0
> >
> > _______________________________________________
> > ffmpeg-devel mailing list
> > ffmpeg-devel at ffmpeg.org
> > https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
> >
> > To unsubscribe, visit link above, or email
> > ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".