GStreamer基础教程08——pipeline的快捷访问

目标

GStreamer建立的pipeline不需要完全关闭
​有多种方法可以让数据在任何时候送到pipeline中或者从pipeline中取出。
​本教程会展示:

如何把外部数据送到pipeline中

如何把数据从pipeline中取出

如何操作这些数据

介绍

有几种方法可以让应用通过pipeline和数据流交互。
​本教程讲述了最简单的一种,因为使用了专门为这个而创建的element。

appsrc,专门让应用可以往pipeline里面传入数据的element
​(https://www.freedesktop.org/software/gstreamer-sdk/data/docs/2012.5/gst-plugins-base-libs-0.10/gst-plugins-base-libs-appsrc.html#appsrc),
​appsink,就正好相反,让应用可以从pipeline中获得数据。

​为了避免混淆,我们可以这么来理解,
​appsrc是一个普通的source element,不过它的数据都是来自外太空,
​appsink是一个普通的sink element,数据从这里出去的就消失不见了。


​appsrc和appsink用得非常多,所以他们都自己提供API,你只要连接了gstreamer-app库,那么就可以访问到。
​在本教程里,我们会使用一种简单地方法通过信号来实现。


​appsrc可以有不同的工作模式:
​ 在pull模式,在需要时向应用请求数据;
​ 在push模式,应用根据自己的节奏把数据推送过来。

​而且,在push模式,如果已经有了足够的数据,应用可以在push时被阻塞,或者可以经由enough-data和need-data信号来控制。
​本教程中得例子就采用了这种信号控制的方式,其他没有提及的方法可以在appsrc的文档中查阅。


Buffers

通过pipeline传递的大块数据被称为buffers。
​因为本例子会制造数据同时也消耗数据,所以我们需要了解GstBuffer。

Source Pads负责制造buffer,这些buffer被sink pad消耗掉。GStreamer在一个个element之间传递这些buffer。

一个buffer只能简单地描述一小片数据,不要认为我们所有的buffer都是一样大小的。
而且,buffer有一个时间戳和有效期,这个就描述了什么时候buffer里的数据需要渲染出来。

​时间戳是个非常复杂和精深的话题,但目前这个简单地解释也足够了。

作为一个例子,一个filesrc会提供“ANY”属性的buffers并且没有时间戳信息。
​在demux(《GStreamer基础教程03——动态pipeline》)之后,buffers会有一些特定的cap了,比如"video/x-h264",
​在解码后,每一个buffer都会包含一帧有原始caps的视频帧(比如:video/x-raw-yuv),
​并且有非常明确地时间戳用来指示这一帧在什么时候显示

教程

本教程是上一篇教程(《GStreamer基础教程07——多线程和Pad的有效性》)在两个方面的扩展:
​第一是用appsrc来取代audiotestsrc来生成音频数据;
​第二是在tee里新加了一个分支,这样流入audio sink和波形显示的数据同样复制了一份传给appsink。
​这个appsink就把信息回传给应用,应用就可以通知用户收到了数据或者做其他更复杂的工作。

一个粗糙的波形发生器


[objc] view plain copy
  1. #include<gst/gst.h>
  2. #include<string.h>
  3. #defineCHUNK_SIZE1024/*Amountofbyteswearesendingineachbuffer*/
  4. #definesAMPLE_RATE44100/*Samplespersecondwearesending*/
  5. #defineAUdio_CAPS"audio/x-raw-int,channels=1,rate=%d,signed=(boolean)true,width=16,depth=16,endianness=BYTE_ORDER"
  6. /*Structuretocontainallourinformation,sowecanpassittocallbacks*/
  7. typedefstruct_CustomData{
  8. GstElement*pipeline,*app_source,*tee,*audio_queue,*audio_convert1,*audio_resample,*audio_sink;
  9. GstElement*video_queue,*audio_convert2,*visual,*video_convert,*video_sink;
  10. GstElement*app_queue,*app_sink;
  11. guint64num_samples;/*Numberofsamplesgeneratedsofar(fortimestampgeneration)*/
  12. gfloata,b,c,d;/*Forwaveformgeneration*/
  13. guintsourceid;/*TocontroltheGSource*/
  14. GMainLoop*main_loop;/*GLib'sMainLooP*/
  15. }CustomData;
  16. /*ThismethodiscalledbytheidleGSourceinthemainloop,toFeedCHUNK_SIZEbytesintoappsrc.
  17. *Theidlehandlerisaddedtothemainloopwhenappsrcrequestsustostartsendingdata(need-datasignal)
  18. *andisremovedwhenappsrchasenoughdata(enough-datasignal).
  19. */
  20. staticgbooleanpush_data(CustomData*data){
  21. GstBuffer*buffer;
  22. GstFlowReturnret;
  23. inti;
  24. gint16*raw;
  25. gintnum_samples=CHUNK_SIZE/2;/*Becauseeachsampleis16bits*/
  26. gfloatfreq;
  27. /*Createanewemptybuffer*/
  28. buffer=gst_buffer_new_and_alloc(CHUNK_SIZE);
  29. /*Setitstimestampandduration*/
  30. GST_BUFFER_TIMESTAMP(buffer)=gst_util_uint64_scale(data->num_samples,GST_SECOND,SAMPLE_RATE);
  31. GST_BUFFER_DURATION(buffer)=gst_util_uint64_scale(CHUNK_SIZE,SAMPLE_RATE);
  32. /*Generatesomepsychodelicwaveforms*/
  33. raw=(gint16*)GST_BUFFER_DATA(buffer);
  34. data->c+=data->d;
  35. data->d-=data->c/1000;
  36. freq=1100+11000*data->d;
  37. for(i=0;i<num_samples;i++){
  38. data->a+=data->b;
  39. data->b-=data->a/freq;
  40. raw[i]=(gint16)(5500*data->a);
  41. }
  42. data->num_samples+=num_samples;
  43. /*Pushthebufferintotheappsrc*/
  44. g_signal_emit_by_name(data->app_source,"push-buffer",buffer,&ret);
  45. /*FreethebufferNowthatwearedonewithit*/
  46. gst_buffer_unref(buffer);
  47. if(ret!=GST_FLOW_OK){
  48. /*Wegotsomeerror,stopsendingdata*/
  49. returnFALSE;
  50. returnTRUE;
  51. }
  52. /*Thissignalcallbacktriggerswhenappsrcneedsdata.Here,weaddanidlehandler
  53. *tothemainlooptostartpushingdataintotheappsrc*/
  54. staticvoidstart_Feed(GstElement*source,guintsize,CustomData*data){
  55. if(data->sourceid==0){
  56. g_print("StartFeeding\n");
  57. data->sourceid=g_idle_add((GSourceFunc)push_data,data);
  58. /*Thiscallbacktriggerswhenappsrchasenoughdataandwecanstopsending.
  59. *WeremovetheidlehandlerfromthemainlooP*/
  60. voidstop_Feed(if(data->sourceid!=0){
  61. g_print("StopFeeding\n");
  62. g_source_remove(data->sourceid);
  63. data->sourceid=0;
  64. /*Theappsinkhasreceivedabuffer*/
  65. voidnew_buffer(GstElement*sink,153); font-weight:bold; background-color:inherit">GstBuffer*buffer;
  66. /*Retrievethebuffer*/
  67. g_signal_emit_by_name(sink,"pull-buffer",&buffer);
  68. if(buffer){
  69. /*Theonlythingwedointhisexampleisprinta*toindicateareceivedbuffer*/
  70. g_print("*");
  71. /*Thisfunctioniscalledwhenanerrormessageispostedonthebus*/
  72. voiderror_cb(GstBus*bus,153); font-weight:bold; background-color:inherit">GstMessage*msg,153); font-weight:bold; background-color:inherit">GError*err;
  73. gchar*debug_info;
  74. /*Printerrordetailsonthescreen*/
  75. gst_message_parse_error(msg,&err,&debug_info);
  76. g_printerr("ErrorreceivedfromElement%s:%s\n",GST_OBJECT_NAME(msg->src),err->message);
  77. g_printerr("Debugginginformation:%s\n",debug_info?debug_info:"none");
  78. g_clear_error(&err);
  79. g_free(debug_info);
  80. g_main_loop_quit(data->main_loop);
  81. intmain(intargc,153); font-weight:bold; background-color:inherit">charchar*argv[]){
  82. CustomDatadata;
  83. GstPadTemplate*tee_src_pad_template;
  84. GstPad*tee_audio_pad,*tee_video_pad,*tee_app_pad;
  85. GstPad*queue_audio_pad,*queue_video_pad,*queue_app_pad;
  86. gchar*audio_caps_text;
  87. GstCaps*audio_caps;
  88. GstBus*bus;
  89. /*Initializecumstomdatastructure*/
  90. memset(&data,0,153); font-weight:bold; background-color:inherit">sizeof(data));
  91. data.b=1; data.d=1;
  92. /*InitializeGStreamer*/
  93. gst_init(&argc,&argv);
  94. /*Createtheelements*/
  95. data.app_source=gst_element_factory_make("appsrc","audio_source");
  96. data.tee=gst_element_factory_make("tee","tee");
  97. data.audio_queue=gst_element_factory_make("queue","audio_queue");
  98. data.audio_convert1=gst_element_factory_make("audioconvert","audio_convert1");
  99. data.audio_resample=gst_element_factory_make("audioresample","audio_resample");
  100. data.audio_sink=gst_element_factory_make("autoaudiosink","audio_sink");
  101. data.video_queue=gst_element_factory_make("queue","video_queue");
  102. data.audio_convert2=gst_element_factory_make("audioconvert","audio_convert2");
  103. data.visual=gst_element_factory_make("wavescope","visual");
  104. data.video_convert=gst_element_factory_make("ffmpegcolorspace","csp");
  105. data.video_sink=gst_element_factory_make("autovideosink","video_sink");
  106. data.app_queue=gst_element_factory_make("queue","app_queue");
  107. data.app_sink=gst_element_factory_make("appsink","app_sink");
  108. /*Createtheemptypipeline*/
  109. data.pipeline=gst_pipeline_new("test-pipeline");
  110. if(!data.pipeline||!data.app_source||!data.tee||!data.audio_queue||!data.audio_convert1||
  111. !data.audio_resample||!data.audio_sink||!data.video_queue||!data.audio_convert2||!data.visual||
  112. !data.video_convert||!data.video_sink||!data.app_queue||!data.app_sink){
  113. g_printerr("NotallelementsCouldbecreated.\n");
  114. return-1;
  115. /*Configurewavescope*/
  116. g_object_set(data.visual,"shader",0,"style",153); font-weight:bold; background-color:inherit">NULL);
  117. /*Configureappsrc*/
  118. audio_caps_text=g_strdup_printf(AUdio_CAPS,SAMPLE_RATE);
  119. audio_caps=gst_caps_from_string(audio_caps_text);
  120. g_object_set(data.app_source,"caps",audio_caps,153); font-weight:bold; background-color:inherit">NULL);
  121. g_signal_connect(data.app_source,"need-data",G_CALLBACK(start_Feed),&data);
  122. g_signal_connect(data.app_source,"enough-data",G_CALLBACK(stop_Feed),&data);
  123. /*Configureappsink*/
  124. g_object_set(data.app_sink,"emit-signals",TRUE,248)"> g_signal_connect(data.app_sink,"new-buffer",G_CALLBACK(new_buffer),108); list-style:decimal-leading-zero outside; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important"> gst_caps_unref(audio_caps);
  125. g_free(audio_caps_text);
  126. /*Linkallelementsthatcanbeautomaticallylinkedbecausetheyhave"Always"pads*/
  127. gst_bin_add_many(GST_BIN(data.pipeline),data.app_source,data.tee,data.audio_queue,data.audio_convert1,data.audio_resample,
  128. data.audio_sink,data.video_queue,data.audio_convert2,data.visual,data.video_convert,data.video_sink,data.app_queue,
  129. data.app_sink,153); font-weight:bold; background-color:inherit">if(gst_element_link_many(data.app_source,153); font-weight:bold; background-color:inherit">NULL)!=TRUE||
  130. gst_element_link_many(data.audio_queue,data.audio_sink,153); font-weight:bold; background-color:inherit">NULL)!=TRUE||
  131. gst_element_link_many(data.video_queue,108); list-style:decimal-leading-zero outside; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important"> gst_element_link_many(data.app_queue,data.app_sink,153); font-weight:bold; background-color:inherit">NULL)!=TRUE){
  132. g_printerr("ElementsCouldnotbelinked.\n");
  133. gst_object_unref(data.pipeline);
  134. return-1;
  135. /*ManuallylinktheTee,whichhas"Request"pads*/
  136. tee_src_pad_template=gst_element_class_get_pad_template(GST_ELEMENT_GET_CLASS(data.tee),"src%d");
  137. tee_audio_pad=gst_element_request_pad(data.tee,tee_src_pad_template,153); font-weight:bold; background-color:inherit">NULL,248)"> g_print("Obtainedrequestpad%sforaudiobranch.\n",gst_pad_get_name(tee_audio_pad));
  138. queue_audio_pad=gst_element_get_static_pad(data.audio_queue,"sink");
  139. tee_video_pad=gst_element_request_pad(data.tee,108); list-style:decimal-leading-zero outside; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important"> g_print("Obtainedrequestpad%sforvideobranch.\n",gst_pad_get_name(tee_video_pad));
  140. queue_video_pad=gst_element_get_static_pad(data.video_queue,"sink");
  141. tee_app_pad=gst_element_request_pad(data.tee,248)"> g_print("Obtainedrequestpad%sforappbranch.\n",gst_pad_get_name(tee_app_pad));
  142. queue_app_pad=gst_element_get_static_pad(data.app_queue,153); font-weight:bold; background-color:inherit">if(gst_pad_link(tee_audio_pad,queue_audio_pad)!=GST_PAD_LINK_OK||
  143. gst_pad_link(tee_video_pad,queue_video_pad)!=GST_PAD_LINK_OK||
  144. gst_pad_link(tee_app_pad,queue_app_pad)!=GST_PAD_LINK_OK){
  145. g_printerr("TeeCouldnotbelinked\n");
  146. gst_object_unref(data.pipeline);
  147. gst_object_unref(queue_audio_pad);
  148. gst_object_unref(queue_video_pad);
  149. gst_object_unref(queue_app_pad);
  150. /*Instructthebustoemitsignalsforeachreceivedmessage,andconnecttotheinterestingsignals*/
  151. bus=gst_element_get_bus(data.pipeline);
  152. gst_bus_add_signal_watch(bus);
  153. g_signal_connect(G_OBJECT(bus),"message::error",(GCallback)error_cb,108); list-style:decimal-leading-zero outside; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important"> gst_object_unref(bus);
  154. /*Startplayingthepipeline*/
  155. gst_element_set_state(data.pipeline,GST_STATE_PLAYING);
  156. /*CreateaGlibmainLoopandsetittorun*/
  157. data.main_loop=g_main_loop_new( g_main_loop_run(data.main_loop);
  158. /*ReleasetherequestpadsfromtheTee,andunrefthem*/
  159. gst_element_release_request_pad(data.tee,tee_audio_pad);
  160. gst_element_release_request_pad(data.tee,tee_video_pad);
  161. gst_object_unref(tee_audio_pad);
  162. gst_object_unref(tee_video_pad);
  163. gst_object_unref(tee_app_pad);
  164. /*Freeresources*/
  165. gst_element_set_state(data.pipeline,GST_STATE_NULL);
  166. return0;
  167. }

工作流程

创建pipeline段的代码就是上一篇的教程中得例子的扩大版。
包括初始或所有的element,连接有Always Pad的element然后手动连接tee element的Request Pad。

下面我们关注一下appsrc和appsink这两个element的配置:

关于appsink的配置,我们连接了new-buffer的信号,这个信号在每次收到buffer的时候发出。
​当然,这个信号的发出需要emit-signals这个信号属性被开启(默认是关闭的)。
启动pipeline,等到消息和最后的清理资源都和以前的没什么区别。让我们关注我们刚刚注册的回调吧。

这个函数在appsrc内部队列将要空的时候调用,在这里我们做的事情仅仅是用g_idle_add()方法注册一个GLib的idle函数,
​这个函数会给appsrc输入数据知道内部队列满为止。
​一个GLib的idle函数是一个GLib在主循环在“idle”时会调用的方法,也就是说,当时没有更高优先级的任务运行。
这只是appsrc多种发出数据方法中的一个。
​特别需要指出的是,buffer不是必须要在主线程中用GLib方法来传递给appsrc的,
​你也不是一定要用need-data和enough-data信号来同步appsrc的(据说这样最方便)。

我们记录下g_idle_add()的返回的sourceid,这样后面可以关掉它。

copy

/*Thiscallbacktriggerswhenappsrchasenoughdataandwecanstopsending.
  • *Weremovetheidlehandlerfromthemainloop*/
  • if(data->sourceid!=0){
  • g_print("Stopfeeding\n");
  • g_source_remove(data->sourceid);
  • data->sourceid=0;
  • }

  • ​这个函数当appsrc内部的队列满的时候调用,所以我们需要停止发送数据。这里我们简单地用g_source_remove()来把idle函数移走。
    这个函数给appsrc发送数据。它被GLib调用的次数和频率我们不加以控制,但我们会在它任务完成时关闭它(appsrc内部队列满)。 这里第一步是用gst_buffer_new_and_alloc()方法和给定的大小创建一个新buffer(例子中是1024字节)。

    我们计算我们生成的采样数据的数据量,把数据存在CustomData.num_samples里面,
    ​这样我们可以用GstBuffer提供的GST_BUFFER_TIMESTAMP宏来生成buffer的时间戳。

    gst_util_uint64_scale是一个工具函数,用来缩放数据,确保不会溢出。

    这些给buffer的数据可以用GstBuffer提供的GST_BUFFER_DATA宏来访问。

    我们会跳过波形的生成部分,因为这不是本教程要讲述的内容。

    copy

    gst_buffer_unref(buffer);
    一旦我们的buffer已经准备好,我们把带着这个buffer的push-buffer信号传给appsrc,
    ​然后就调用gst_buffer_unref()方法,因为我们不会再用到它了。
    copy
    /*Theappsinkhasreceivedabuffer*/
  • /*Retrievethebuffer*/
  • g_signal_emit_by_name(sink,&buffer);
  • if(buffer){
  • /*Theonlythingwedointhisexampleisprinta*toindicateareceivedbuffer*/
  • g_print("*");
  • gst_buffer_unref(buffer);
  • 最后,这个函数在appsink收到buffer时被调用
    ​我们使用了pull-buffer的信号来重新获得buffer,因为是例子,所以仅仅在屏幕上打印一些内容
    ​我们可以用GstBuffer的GST_BUFFER_DATA宏来获得数据指针和用GST_BUFFER_SIZE宏来获得数据大小。
    ​请记住,这里的buffer不是一定要和我们在push_data函数里面创建的buffer完全一致的,
    ​在传输路径上得任何一个element都可能对buffer进行一些改变。
    ​(这个例子中仅仅是在appsrc和appsink中间通过一个tee element,所以buffer没有变化)。 请不要忘记调用gst_buffer_unref()来释放buffer,就讲这么多吧。

  • 相关文章

    迭代器模式(Iterator)迭代器模式(Iterator)[Cursor]意图...
    高性能IO模型浅析服务器端编程经常需要构造高性能的IO模型,...
    策略模式(Strategy)策略模式(Strategy)[Policy]意图:定...
    访问者模式(Visitor)访问者模式(Visitor)意图:表示一个...
    命令模式(Command)命令模式(Command)[Action/Transactio...
    生成器模式(Builder)生成器模式(Builder)意图:将一个对...