@@ -24,7 +24,8 @@ Mix.install([
2424 :exla ,
2525 :bumblebee ,
2626 :websockex ,
27- :membrane_simple_rtsp_server
27+ :membrane_simple_rtsp_server ,
28+ {:coerce , github: " ypconstante/elixir-coerce" , override: true }
2829])
2930
3031Nx .global_default_backend (EXLA .Backend )
@@ -60,6 +61,11 @@ unless File.exists?("#{input_dir}/ffmpeg-testsrc.h264") do
6061 File .write! (" #{ input_dir } /ffmpeg-testsrc.h264" , testsrc_h264)
6162end
6263
64+ unless File .exists? (" #{ input_dir } /test-audio.aac" ) do
65+ %{status: 200 , body: testsrc_mp4} = Req .get! (" #{ samples_url } /ffmpeg-testsrc-480x270.mp4" )
66+ File .write! (" #{ input_dir } /ffmpeg-testsrc.mp4" , testsrc_mp4)
67+ end
68+
6369unless File .exists? (" #{ input_dir } /test-audio.aac" ) do
6470 %{status: 200 , body: test_audio} = Req .get! (" #{ samples_url } /test-audio.aac" )
6571 File .write! (" #{ input_dir } /test-audio.aac" , test_audio)
@@ -548,6 +554,133 @@ end)
548554
549555<!-- livebook:{"branch_parent_index":0} -->
550556
557+ ## Compose two streams side by side, broadcast via HLS
558+
559+ To receive the stream, visit http://localhost:1234/hls.html after running the cells below
560+
561+ The first cell uses ` :reader ` and ` :writer ` endpoints to communicate with boombox. In this
562+ configuration the process calling ` Boombox.read/1 ` controls when packets are being provided.
563+
564+ ``` elixir
565+ input1 = " #{ input_dir } /bun.mp4"
566+ input2 = " #{ input_dir } /ffmpeg-testsrc.mp4"
567+ output = " #{ out_dir } /index.m3u8"
568+
569+ reader1 =
570+ Boombox .run (input: input1, output: {:reader , video: :image , audio: false })
571+
572+ reader2 =
573+ Boombox .run (input: input2, output: {:reader , video: :image , audio: false })
574+
575+ writer = Boombox .run (input: {:writer , video: :image , audio: false }, output: output)
576+
577+ Stream .unfold (%{}, fn _state ->
578+ {result1, packet1} = Boombox .read (reader1)
579+ {result2, packet2} = Boombox .read (reader2)
580+
581+ joined_image =
582+ Vix .Vips .Operation .join! (packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL )
583+
584+ packet = %Boombox .Packet {
585+ pts: max (packet1.pts, packet2.pts),
586+ payload: joined_image,
587+ kind: :video
588+ }
589+
590+ Boombox .write (writer, packet)
591+
592+ if :finished in [result1, result2] do
593+ nil
594+ else
595+ {nil , %{}}
596+ end
597+ end )
598+ |> Stream .run ()
599+
600+ Boombox .close (writer)
601+ ```
602+
603+ The second cell uses ` :message ` endpoints, meaning that the server communicates with boomboxes by
604+ exchanging messages. A consequence of this is that the inputting boomboxes will control the
605+ pace of providing the packets to the server, what can be useful in some circumstances:
606+
607+ ``` elixir
608+ defmodule MyServer do
609+ use GenServer
610+
611+ def start (args) do
612+ GenServer .start (__MODULE__ , args)
613+ end
614+
615+ @impl true
616+ def init (args) do
617+ bb1 = Boombox .run (input: args.input1, output: {:message , video: :image , audio: false })
618+ bb2 = Boombox .run (input: args.input2, output: {:message , video: :image , audio: false })
619+ output_writer =
620+ Boombox .run (input: {:writer , video: :image , audio: false }, output: args.output)
621+
622+ {:ok ,
623+ %{
624+ input_boomboxes_states: %{
625+ bb1: %{last_packet: nil , eos: false },
626+ bb2: %{last_packet: nil , eos: false }
627+ },
628+ input_boomboxes: %{bb1 => :bb1 , bb2 => :bb2 },
629+ output_writer: output_writer
630+ }}
631+ end
632+
633+ @impl true
634+ def handle_info ({:boombox_packet , bb, packet}, state) do
635+ boombox_id = state.input_boomboxes[bb]
636+ state = put_in (state.input_boomboxes_states[boombox_id].last_packet, packet)
637+
638+ if Enum .all? (Map .values (state.input_boomboxes_states), & (&1 .last_packet != nil )) do
639+ joined_image =
640+ Vix .Vips .Operation .join! (
641+ state.input_boomboxes_states.bb1.last_packet.payload,
642+ state.input_boomboxes_states.bb2.last_packet.payload,
643+ :VIPS_DIRECTION_HORIZONTAL
644+ )
645+
646+ packet = %Boombox .Packet {packet | payload: joined_image}
647+
648+ Boombox .write (state.output_writer, packet)
649+ end
650+
651+ {:noreply , state}
652+ end
653+
654+ @impl true
655+ def handle_info ({:boombox_finished , bb}, state) do
656+ boombox_id = state.input_boomboxes[bb]
657+ state = put_in (state.input_boomboxes_states[boombox_id].eos, true )
658+
659+ if Enum .all? (Map .values (state.input_boomboxes_states), & &1 .eos) do
660+ Boombox .close (state.output_writer)
661+ {:stop , :normal , state}
662+ else
663+ {:noreply , state}
664+ end
665+ end
666+ end
667+
668+ input1 = " #{ input_dir } /bun.mp4"
669+ input2 = " #{ input_dir } /ffmpeg-testsrc.mp4"
670+ output = " #{ out_dir } /index.m3u8"
671+
672+ {:ok , server} = MyServer .start (%{input1: input, input2: input, output: output})
673+ monitor = Process .monitor (server)
674+
675+ receive do
676+ {:DOWN , ^monitor , :process , ^server , reason} ->
677+ IO .inspect (reason)
678+ :ok
679+ end
680+ ```
681+
682+ <!-- livebook:{"branch_parent_index":0} -->
683+
551684## Forward RTMP via WebRTC
552685
553686To receive the stream, visit http://localhost:1234/webrtc_to_browser.html
0 commit comments