Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 139 additions & 2 deletions examples.livemd
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ System.put_env("PATH", "/opt/homebrew/bin:#{System.get_env("PATH")}")
# Examples that don't mention them should still work.

# MIX_INSTALL_CONFIG_BEGIN
boombox = {:boombox, github: "membraneframework/boombox"}
boombox = {:boombox, github: "membraneframework/boombox", branch: "refactor-elixir-endpoints"}

# This livebook uses boombox from the master branch. If any examples happen to not work, the latest stable version of this livebook
# can be found on https://hexdocs.pm/boombox/examples.html or in the latest github release.
Expand All @@ -24,7 +24,8 @@ Mix.install([
:exla,
:bumblebee,
:websockex,
:membrane_simple_rtsp_server
:membrane_simple_rtsp_server,
{:coerce, ">= 1.0.2"}
])

Nx.global_default_backend(EXLA.Backend)
Expand Down Expand Up @@ -60,6 +61,11 @@ unless File.exists?("#{input_dir}/ffmpeg-testsrc.h264") do
File.write!("#{input_dir}/ffmpeg-testsrc.h264", testsrc_h264)
end

unless File.exists?("#{input_dir}/ffmpeg-testsrc.mp4") do
%{status: 200, body: testsrc_mp4} = Req.get!("#{samples_url}/ffmpeg-testsrc-480x270.mp4")
File.write!("#{input_dir}/ffmpeg-testsrc.mp4", testsrc_mp4)
end

unless File.exists?("#{input_dir}/test-audio.aac") do
%{status: 200, body: test_audio} = Req.get!("#{samples_url}/test-audio.aac")
File.write!("#{input_dir}/test-audio.aac", test_audio)
Expand Down Expand Up @@ -548,6 +554,137 @@ end)

<!-- livebook:{"branch_parent_index":0} -->

## Compose two streams side by side, broadcast via HLS

To receive the stream, visit http://localhost:1234/hls.html after running the cells below

The first cell uses `:reader` and `:writer` endpoints to communicate with boombox. In this
configuration the process calling `Boombox.read/1` controls when packets are being provided.

```elixir
input1 = "#{input_dir}/bun.mp4"
input2 = "#{input_dir}/ffmpeg-testsrc.mp4"
output = "#{out_dir}/index.m3u8"

reader1 =
Boombox.run(input: input1, output: {:reader, video: :image, audio: false})

reader2 =
Boombox.run(input: input2, output: {:reader, video: :image, audio: false})

writer = Boombox.run(input: {:writer, video: :image, audio: false}, output: output)

Stream.repeatedly(fn ->
case {Boombox.read(reader1), Boombox.read(reader2)} do
{{:ok, packet1}, {:ok, packet2}} ->
joined_image =
Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL)

packet = %Boombox.Packet{
pts: max(packet1.pts, packet2.pts),
payload: joined_image,
kind: :video
}

Boombox.write(writer, packet)

_finished ->
:eos
end
end)
|> Enum.find(& &1 == :eos)

Boombox.close(writer)
Boombox.close(reader1)
Boombox.close(reader2)
```

The second cell uses `:message` endpoints, meaning that the server communicates with boomboxes by
exchanging messages. A consequence of this is that the inputting boomboxes will control the
pace of providing the packets to the server, what can be useful in some circumstances:

```elixir
defmodule MyServer do
use GenServer

def start(args) do
GenServer.start(__MODULE__, args)
end

@impl true
def init(args) do
boombox1 = Boombox.run(input: args.input1, output: {:message, video: :image, audio: false})
boombox2 = Boombox.run(input: args.input2, output: {:message, video: :image, audio: false})
output_writer =
Boombox.run(input: {:writer, video: :image, audio: false}, output: args.output)

{:ok,
%{
boombox_states: %{
boombox1: %{last_packet: nil, eos: false},
boombox2: %{last_packet: nil, eos: false}
},
boomboxes: %{boombox1 => :boombox1, boombox2 => :boombox2},
output_writer: output_writer
}}
end

@impl true
def handle_info({:boombox_packet, bb, %Boombox.Packet{} = packet}, state) do
boombox_id = state.boomboxes[bb]
state = put_in(state.boombox_states[boombox_id].last_packet, packet)

if Enum.all?(Map.values(state.boombox_states), &(&1.last_packet != nil)) do
joined_image =
Vix.Vips.Operation.join!(
state.boombox_states.boombox1.last_packet.payload,
state.boombox_states.boombox2.last_packet.payload,
:VIPS_DIRECTION_HORIZONTAL
)

pts =
max(
state.boombox_states.boombox1.last_packet.pts,
state.boombox_states.boombox2.last_packet.pts
)

packet = %Boombox.Packet{packet | payload: joined_image, pts: pts}

Boombox.write(state.output_writer, packet)
end

{:noreply, state}
end

@impl true
def handle_info({:boombox_finished, bb}, state) do
boombox_id = state.boomboxes[bb]
state = put_in(state.boombox_states[boombox_id].eos, true)

if Enum.all?(Map.values(state.boombox_states), & &1.eos) do
Boombox.close(state.output_writer)
{:stop, :normal, state}
else
{:noreply, state}
end
end
end

input1 = "#{input_dir}/bun.mp4"
input2 = "#{input_dir}/ffmpeg-testsrc.mp4"
output = "#{out_dir}/index.m3u8"

{:ok, server} = MyServer.start(%{input1: input1, input2: input2, output: output})
monitor = Process.monitor(server)

receive do
{:DOWN, ^monitor, :process, ^server, reason} ->
:ok
end
```

<!-- livebook:{"branch_parent_index":0} -->

## Forward RTMP via WebRTC

To receive the stream, visit http://localhost:1234/webrtc_to_browser.html
Expand Down
Loading