1+ /*
2+ * File: 01-remuxing.hpp
3+ *
4+ * Author: Rim Zaydullin
5+ * Repo: https://github.com/tinybit/ffmpeg_code_examples
6+ *
7+ * receive SRT video stream, remux from mpeg-ts to FLV and write to a file
8+ *
9+ */
10+
11+ #include < stdio.h>
12+ #include < stdlib.h>
13+ #include < iostream>
14+ #include < fstream>
15+ #include < sstream>
16+ #include < memory>
17+ #include < vector>
18+ #include < thread>
19+ #include < chrono>
20+ #include < mutex>
21+ #include < atomic>
22+ #include < condition_variable>
23+
24+ extern " C" {
25+ #include < libavcodec/avcodec.h>
26+ #include < libavcodec/codec.h>
27+ #include < libavutil/avutil.h>
28+ #include < libavutil/mem.h>
29+ #include < libavutil/mathematics.h>
30+ #include < libavformat/avio.h>
31+ #include < libavutil/timestamp.h>
32+ #include < libavformat/avformat.h>
33+ }
34+
35+ #include < srt/srt.h>
36+
37+ #include " ring_buffer.hpp"
38+
39+ // predeclarations
40+ int start (int argc, char ** argv);
41+
42+ #ifdef av_err2str
43+ #undef av_err2str
44+ #include < string>
45+ av_always_inline std::string av_err2string (int errnum) {
46+ char str[AV_ERROR_MAX_STRING_SIZE];
47+ return av_make_error_string (str, AV_ERROR_MAX_STRING_SIZE, errnum);
48+ }
49+ #define av_err2str (err ) av_err2string(err).c_str()
50+ #endif // av_err2str
51+
52+ std::mutex buf_mutex;
53+ std::condition_variable cond;
54+ std::atomic<bool > done{false };
55+ std::atomic<bool > done_converting_to_flv{false };
56+ int count_read = 0 ;
57+
58+ std::ofstream wf (" test.flv" , std::ios::out | std::ios::binary);
59+
60+ static int readFunction (void * opaque, uint8_t * buf, int buf_size) {
61+ auto & buff = *reinterpret_cast <RingBuffer*>(opaque);
62+
63+ std::unique_lock<std::mutex> lk (buf_mutex);
64+
65+ // wait for more data arrival
66+ while (buff.size () == 0 ) {
67+ if (done.load ()) {
68+ return 0 ;
69+ }
70+
71+ cond.wait (lk);
72+ }
73+
74+ size_t size = buff.read ((char *)buf, buf_size);
75+ cond.notify_one ();
76+
77+ count_read += size;
78+ return size;
79+ }
80+
81+ static int writeFunction (void * opaque, uint8_t * buf, int buf_size) {
82+ // std::cout << "write flv chunk with size: " << buf_size << std::endl << std::flush;
83+ wf.write ((char *)buf, buf_size);
84+ }
85+
86+ void convert_to_flv (AVIOContext* avio_input_context, AVIOContext* avio_output_context) {
87+ std::cout << " convert_to_flv" << std::endl << std::flush;
88+
89+ AVFormatContext *output_format_context = NULL ;
90+ AVPacket packet;
91+
92+ int ret, i;
93+ int stream_index = 0 ;
94+ int * streams_list = NULL ;
95+ int number_of_streams = 0 ;
96+
97+ AVFormatContext* input_format_context = avformat_alloc_context ();
98+ input_format_context->pb = avio_input_context;
99+ avformat_open_input (&input_format_context, " dummyFilename" , nullptr , nullptr );
100+
101+ if ((ret = avformat_find_stream_info (input_format_context, NULL )) < 0 ) {
102+ fprintf (stderr, " Failed to retrieve input stream information" );
103+ done_converting_to_flv.store (true );
104+ return ;
105+ }
106+
107+ // https://ffmpeg.org/doxygen/trunk/group__lavf__misc.html#gae2645941f2dc779c307eb6314fd39f10
108+ std::cout << " ---------------------------------- INPUT FORMAT ----------------------------" << std::endl << std::flush;
109+ av_dump_format (input_format_context, 0 , NULL , 0 );
110+
111+ avformat_alloc_output_context2 (&output_format_context, NULL , " flv" , " dummyFilename" );
112+ if (!output_format_context) {
113+ fprintf (stderr, " Could not create output context\n " );
114+ ret = AVERROR_UNKNOWN;
115+ done_converting_to_flv.store (true );
116+ return ;
117+ }
118+
119+ output_format_context->pb = avio_output_context;
120+ output_format_context->flags |= AVFMT_FLAG_CUSTOM_IO | AVFMT_NOFILE;
121+
122+ number_of_streams = input_format_context->nb_streams ;
123+ streams_list = (int *)av_mallocz_array (number_of_streams, sizeof (*streams_list));
124+
125+ if (!streams_list) {
126+ ret = AVERROR (ENOMEM);
127+ done_converting_to_flv.store (true );
128+ return ;
129+ }
130+
131+ for (i = 0 ; i < input_format_context->nb_streams ; i++) {
132+ AVStream *out_stream;
133+ AVStream *in_stream = input_format_context->streams [i];
134+ AVCodecParameters *in_codecpar = in_stream->codecpar ;
135+
136+ if (in_codecpar->codec_type != AVMEDIA_TYPE_AUDIO &&
137+ in_codecpar->codec_type != AVMEDIA_TYPE_VIDEO &&
138+ in_codecpar->codec_type != AVMEDIA_TYPE_SUBTITLE) {
139+ streams_list[i] = -1 ;
140+ continue ;
141+ }
142+
143+ streams_list[i] = stream_index++;
144+
145+ out_stream = avformat_new_stream (output_format_context, NULL );
146+ if (!out_stream) {
147+ fprintf (stderr, " Failed allocating output stream\n " );
148+ ret = AVERROR_UNKNOWN;
149+ done_converting_to_flv.store (true );
150+ return ;
151+ }
152+
153+ ret = avcodec_parameters_copy (out_stream->codecpar , in_codecpar);
154+ if (ret < 0 ) {
155+ fprintf (stderr, " Failed to copy codec parameters\n " );
156+ done_converting_to_flv.store (true );
157+ return ;
158+ }
159+
160+ out_stream->codecpar ->codec_tag = 0 ;
161+ }
162+
163+ std::cout << " ---------------------------------- OUTPUT FORMAT ----------------------------" << std::endl << std::flush;
164+ av_dump_format (output_format_context, 0 , NULL , 1 );
165+
166+ // // unless it's a no file (we'll talk later about that) write to the disk (FLAG_WRITE)
167+ // // but basically it's a way to save the file to a buffer so you can store it
168+ // // wherever you want.
169+ // if (!(output_format_context->oformat->flags & AVFMT_NOFILE)) {
170+ // ret = avio_open(&output_format_context->pb, "test.flv", AVIO_FLAG_WRITE);
171+ // if (ret < 0) {
172+ // fprintf(stderr, "Could not open output file '%s'", "test.flv");
173+ // done_converting_to_flv.store(true);
174+ // return;
175+ // }
176+ // }
177+
178+ // https://ffmpeg.org/doxygen/trunk/group__lavf__encoding.html#ga18b7b10bb5b94c4842de18166bc677cb
179+ ret = avformat_write_header (output_format_context, NULL );
180+ if (ret < 0 ) {
181+ fprintf (stderr, " Error occurred when opening output file\n " );
182+ goto end;
183+ }
184+
185+ while (!done.load ()) {
186+ AVStream *in_stream, *out_stream;
187+ ret = av_read_frame (input_format_context, &packet);
188+ if (ret < 0 ) {
189+ break ;
190+ }
191+
192+ in_stream = input_format_context->streams [packet.stream_index ];
193+ if (packet.stream_index >= number_of_streams || streams_list[packet.stream_index ] < 0 ) {
194+ av_packet_unref (&packet);
195+ continue ;
196+ }
197+
198+ packet.stream_index = streams_list[packet.stream_index ];
199+ out_stream = output_format_context->streams [packet.stream_index ];
200+
201+ /* copy packet */
202+ packet.pts = av_rescale_q_rnd (packet.pts , in_stream->time_base , out_stream->time_base , AVRounding (AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX));
203+ packet.dts = av_rescale_q_rnd (packet.dts , in_stream->time_base , out_stream->time_base , AVRounding (AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX));
204+ packet.duration = av_rescale_q (packet.duration , in_stream->time_base , out_stream->time_base );
205+
206+ // https://ffmpeg.org/doxygen/trunk/structAVPacket.html#ab5793d8195cf4789dfb3913b7a693903
207+ packet.pos = -1 ;
208+
209+ // https://ffmpeg.org/doxygen/trunk/group__lavf__encoding.html#ga37352ed2c63493c38219d935e71db6c1
210+ ret = av_interleaved_write_frame (output_format_context, &packet);
211+ if (ret < 0 ) {
212+ fprintf (stderr, " Error muxing packet\n " );
213+ break ;
214+ }
215+
216+ av_packet_unref (&packet);
217+ }
218+
219+ // https://ffmpeg.org/doxygen/trunk/group__lavf__encoding.html#ga7f14007e7dc8f481f054b21614dfec13
220+ av_write_trailer (output_format_context);
221+
222+ end:
223+ avformat_close_input (&input_format_context);
224+
225+ /* close output */
226+ if (output_format_context && !(output_format_context->oformat ->flags & AVFMT_NOFILE)) {
227+ avio_closep (&output_format_context->pb );
228+ }
229+
230+ avformat_free_context (output_format_context);
231+ av_freep (&streams_list);
232+
233+ if (ret < 0 && ret != AVERROR_EOF) {
234+ fprintf (stderr, " Error occurred: %s\n " , av_err2str (ret));
235+ }
236+
237+ done_converting_to_flv.store (true );
238+ }
239+
240+ int ss, st;
241+
242+ int main (int argc, char **argv) {
243+ RingBuffer buff (40960 );
244+
245+ unsigned char * in_buffer = (unsigned char *)(av_malloc (8192 ));
246+ AVIOContext* avio_input_context = avio_alloc_context (
247+ in_buffer,
248+ 8192 ,
249+ 0 ,
250+ reinterpret_cast <void *>(static_cast <RingBuffer*>(&buff)),
251+ &readFunction, nullptr , nullptr );
252+
253+ unsigned char * out_buffer = (unsigned char *)(av_malloc (8192 ));
254+ AVIOContext* avio_output_context = avio_alloc_context (
255+ out_buffer,
256+ 8192 ,
257+ 1 ,
258+ reinterpret_cast <void *>(static_cast <RingBuffer*>(&buff)),
259+ nullptr , &writeFunction, nullptr );
260+
261+
262+ int their_fd = start (argc, argv);
263+ int count = 0 ;
264+
265+ std::thread flv_thread (convert_to_flv, avio_input_context, avio_output_context);
266+
267+ int i;
268+ for (i = 0 ; i < 20000 ; i++) {
269+ char msg[2048 ];
270+ st = srt_recvmsg (their_fd, msg, sizeof msg);
271+
272+ if (st == SRT_ERROR) {
273+ break ;
274+ }
275+
276+ count += st;
277+
278+ if (!done_converting_to_flv.load ()) {
279+ std::unique_lock<std::mutex> lk (buf_mutex);
280+
281+ // wait for available free space in ring buffer
282+ while (buff.avail () < st) {
283+ if (done.load ()) {
284+ break ;
285+ }
286+
287+ cond.wait (lk);
288+ }
289+
290+ buff.write (msg, st);
291+ cond.notify_one ();
292+ }
293+ }
294+
295+ std::cout << " DONE\n " ;
296+ wf.close ();
297+
298+ done.store (true );
299+ std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
300+
301+ cond.notify_all ();
302+ flv_thread.join ();
303+
304+ std::cout << " RECEIVED " << count << " bytes.\n " << std::flush;
305+ std::cout << " READ " << count_read << " bytes for avformat.\n " << std::flush;
306+
307+ av_free (avio_input_context);
308+ // av_free(avio_output_context);
309+
310+ return 0 ;
311+ }
312+
313+ int start (int argc, char ** argv) {
314+ struct sockaddr_in sa;
315+ int yes = 1 ;
316+ struct sockaddr_storage their_addr;
317+
318+ if (argc != 3 ) {
319+ fprintf (stderr, " Usage: %s <host> <port>\n " , argv[0 ]);
320+ return 1 ;
321+ }
322+
323+ printf (" srt startup\n " );
324+ srt_startup ();
325+
326+ printf (" srt socket\n " );
327+ ss = srt_create_socket ();
328+ if (ss == SRT_ERROR) {
329+ fprintf (stderr, " srt_socket: %s\n " , srt_getlasterror_str ());
330+ return 1 ;
331+ }
332+
333+ printf (" srt bind address\n " );
334+ sa.sin_family = AF_INET;
335+ sa.sin_port = htons (atoi (argv[2 ]));
336+ if (inet_pton (AF_INET, argv[1 ], &sa.sin_addr ) != 1 ) {
337+ return 1 ;
338+ }
339+
340+ printf (" srt setsockflag\n " );
341+ srt_setsockflag (ss, SRTO_RCVSYN, &yes, sizeof yes);
342+
343+ printf (" srt bind\n " );
344+ st = srt_bind (ss, (struct sockaddr *)&sa, sizeof sa);
345+ if (st == SRT_ERROR) {
346+ fprintf (stderr, " srt_bind: %s\n " , srt_getlasterror_str ());
347+ return 1 ;
348+ }
349+
350+ printf (" srt listen\n " );
351+ st = srt_listen (ss, 2 );
352+ if (st == SRT_ERROR) {
353+ fprintf (stderr, " srt_listen: %s\n " , srt_getlasterror_str ());
354+ return 1 ;
355+ }
356+
357+ printf (" srt accept\n " );
358+ int addr_size = sizeof their_addr;
359+ return srt_accept (ss, (struct sockaddr *)&their_addr, &addr_size);
360+ }
361+
362+ int stop () {
363+ // outdata.close();
364+ printf (" srt close\n " );
365+
366+ st = srt_close (ss);
367+ if (st == SRT_ERROR)
368+ {
369+ fprintf (stderr, " srt_close: %s\n " , srt_getlasterror_str ());
370+ return 1 ;
371+ }
372+
373+ printf (" srt cleanup\n " );
374+ srt_cleanup ();
375+ }
0 commit comments