Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions examples.livemd
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ unless File.exists?("#{input_dir}/ffmpeg-testsrc.h264") do
File.write!("#{input_dir}/ffmpeg-testsrc.h264", testsrc_h264)
end

unless File.exists?("#{input_dir}/ffmpeg-testsrc.mp4") do
%{status: 200, body: testsrc_mp4} = Req.get!("#{samples_url}/ffmpeg-testsrc-480x270.mp4")
File.write!("#{input_dir}/ffmpeg-testsrc.mp4", testsrc_mp4)
end

unless File.exists?("#{input_dir}/test-audio.aac") do
%{status: 200, body: test_audio} = Req.get!("#{samples_url}/test-audio.aac")
File.write!("#{input_dir}/test-audio.aac", test_audio)
Expand Down Expand Up @@ -548,6 +553,137 @@ end)

<!-- livebook:{"branch_parent_index":0} -->

## Compose two streams side by side, broadcast via HLS

To receive the stream, visit http://localhost:1234/hls.html after running the cells below

The first cell uses `:reader` and `:writer` endpoints to communicate with boombox. In this
configuration the process calling `Boombox.read/1` controls when packets are being provided.

```elixir
input1 = "#{input_dir}/bun.mp4"
input2 = "#{input_dir}/ffmpeg-testsrc.mp4"
output = "#{out_dir}/index.m3u8"

reader1 =
Boombox.run(input: input1, output: {:reader, video: :image, audio: false})

reader2 =
Boombox.run(input: input2, output: {:reader, video: :image, audio: false})

writer = Boombox.run(input: {:writer, video: :image, audio: false}, output: output)

Stream.unfold(%{}, fn _state ->
{result1, packet1} = Boombox.read(reader1)
{result2, packet2} = Boombox.read(reader2)

joined_image =
Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL)

packet = %Boombox.Packet{
pts: max(packet1.pts, packet2.pts),
payload: joined_image,
kind: :video
}

Boombox.write(writer, packet)

if :finished in [result1, result2] do
if result1 == :ok, do:
Boombox.close(reader1)
if result2 == :ok, do:
Boombox.close(reader2)
nil
else
{nil, %{}}
end
end)
|> Stream.run()

Boombox.close(writer)
```

The second cell uses `:message` endpoints, meaning that the server communicates with boomboxes by
exchanging messages. A consequence of this is that the inputting boomboxes will control the
pace of providing the packets to the server, what can be useful in some circumstances:

```elixir
defmodule MyServer do
use GenServer

def start(args) do
GenServer.start(__MODULE__, args)
end

@impl true
def init(args) do
bb1 = Boombox.run(input: args.input1, output: {:message, video: :image, audio: false})
bb2 = Boombox.run(input: args.input2, output: {:message, video: :image, audio: false})
output_writer =
Boombox.run(input: {:writer, video: :image, audio: false}, output: args.output)

{:ok,
%{
input_boomboxes_states: %{
bb1: %{last_packet: nil, eos: false},
bb2: %{last_packet: nil, eos: false}
},
input_boomboxes: %{bb1 => :bb1, bb2 => :bb2},
output_writer: output_writer
}}
end

@impl true
def handle_info({:boombox_packet, bb, packet}, state) do
boombox_id = state.input_boomboxes[bb]
state = put_in(state.input_boomboxes_states[boombox_id].last_packet, packet)

if Enum.all?(Map.values(state.input_boomboxes_states), &(&1.last_packet != nil)) do
joined_image =
Vix.Vips.Operation.join!(
state.input_boomboxes_states.bb1.last_packet.payload,
state.input_boomboxes_states.bb2.last_packet.payload,
:VIPS_DIRECTION_HORIZONTAL
)

packet = %Boombox.Packet{packet | payload: joined_image}

Boombox.write(state.output_writer, packet)
end

{:noreply, state}
end

@impl true
def handle_info({:boombox_finished, bb}, state) do
boombox_id = state.input_boomboxes[bb]
state = put_in(state.input_boomboxes_states[boombox_id].eos, true)

if Enum.all?(Map.values(state.input_boomboxes_states), & &1.eos) do
Boombox.close(state.output_writer)
{:stop, :normal, state}
else
{:noreply, state}
end
end
end

input1 = "#{input_dir}/bun.mp4"
input2 = "#{input_dir}/ffmpeg-testsrc.mp4"
output = "#{out_dir}/index.m3u8"

{:ok, server} = MyServer.start(%{input1: input, input2: input, output: output})
monitor = Process.monitor(server)

receive do
{:DOWN, ^monitor, :process, ^server, reason} ->
IO.inspect(reason)
:ok
end
```

<!-- livebook:{"branch_parent_index":0} -->

## Forward RTMP via WebRTC

To receive the stream, visit http://localhost:1234/webrtc_to_browser.html
Expand Down
110 changes: 77 additions & 33 deletions lib/boombox.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ defmodule Boombox do
@moduledoc """
Defines a struct to be used when interacting with boombox when using `:writer` endpoint.
"""
@opaque t :: %__MODULE__{
server_reference: GenServer.server()
}
@type t :: %__MODULE__{
server_reference: GenServer.server()
}

@enforce_keys [:server_reference]
defstruct @enforce_keys
Expand All @@ -28,9 +28,9 @@ defmodule Boombox do
@moduledoc """
Defines a struct to be used when interacting with boombox when using `:reader` endpoint.
"""
@opaque t :: %__MODULE__{
server_reference: GenServer.server()
}
@type t :: %__MODULE__{
server_reference: GenServer.server()
}

@enforce_keys [:server_reference]
defstruct @enforce_keys
Expand Down Expand Up @@ -137,7 +137,7 @@ defmodule Boombox do
| {:srt, url :: String.t(), srt_auth_opts()}
| {:srt, server_awaiting_accept :: ExLibSRT.Server.t()}

@type elixir_input :: {:stream | :writer, in_raw_data_opts()}
@type elixir_input :: {:stream | :writer | :message, in_raw_data_opts()}

@type output ::
(path_or_uri :: String.t())
Expand All @@ -156,7 +156,7 @@ defmodule Boombox do
| {:srt, url :: String.t(), srt_auth_opts()}
| :player

@type elixir_output :: {:stream | :reader, out_raw_data_opts()}
@type elixir_output :: {:stream | :reader | :message, out_raw_data_opts()}

@typep procs :: %{pipeline: pid(), supervisor: pid()}
@typep opts_map :: %{
Expand All @@ -177,14 +177,29 @@ defmodule Boombox do
See `t:input/0` and `t:output/0` for available inputs and outputs and
[examples.livemd](examples.livemd) for examples.

If the input is a `:stream` endpoint, a `Stream` or other `Enumerable` is expected
as the first argument.

If the input is a `:writer` endpoint this function will return a `Boombox.Writer` struct,
which is used to write media packets to boombox with `write/2` and to finish writing with `close/1`.

If the output is a `:reader` endpoint this function will return a `Boombox.Reader` struct,
which is used to read media packets from boombox with `read/1`.
Input endpoints with special behaviours:
* `:stream` - a `Stream` or other `Enumerable` containing `Boombox.Packet`s is expected as the first argument.
* `:writer` - this function will return a `Boombox.Writer` struct, which is used to
write media packets to boombox with `write/2` and to finish writing with `close/1`.
* `:message` - this function returns a PID of a process to communicate with. The process accepts
the following types of messages:
- `{:boombox_packet, sender_pid :: pid(), packet :: Boombox.Packet.t()}` - provides boombox
with a media packet. The process will a `{:boombox_finished, boombox_pid :: pid()}` message to
`sender_pid` if it has finished processing packets and should not be provided any more.
- `{:boombox_close, sender_pid :: pid()}` - tells boombox that no more packets will be
provided and that it should terminate. The process will reply by sending
`{:boombox_finished, boombox_pid :: pid()}` to `sender_pid`

Output endpoints with special behaviours:
* `:stream` - this function will return a `Stream` that contains `Boombox.Packet`s
* `:reader` - this function will return a `Boombox.Reader` struct, which is used to read media packets from
boombox with `read/1` and to stop reading with `close/1`.
* `:message` - this function returns a PID of a process to communicate with. The process will
send the following types of messages to the process that called this function:
- `{:boombox_packet, boombox_pid :: pid(), packet :: Boombox.Packet.t()}` - contains a packet
produced by boombox.
- `{:boombox_finished, boombox_pid :: pid()}` - informs that boombox has finished producing
packets and will begin terminating. No more messages will be sent.

```
Boombox.run(
Expand Down Expand Up @@ -212,13 +227,19 @@ defmodule Boombox do
produce_stream(sink, procs)

%{input: {:writer, _writer_opts}} ->
pid = start_server(opts)
pid = start_server(opts, :calls)
%Writer{server_reference: pid}

%{input: {:message, _message_opts}} ->
start_server(opts, :messages)

%{output: {:reader, _reader_opts}} ->
pid = start_server(opts)
pid = start_server(opts, :calls)
%Reader{server_reference: pid}

%{output: {:message, _message_opts}} ->
start_server(opts, :messages)

opts ->
opts
|> start_pipeline()
Expand Down Expand Up @@ -249,8 +270,8 @@ defmodule Boombox do

It returns a `Task.t()` that can be awaited later.

If the output is a `:stream` or `:reader` endpoint, or the input is a `:writer` endpoint,
the behaviour is identical to `run/2`.
If the output is a `:stream`, `:reader` or `:message` endpoint, or the input
is a `:writer` or `:message` endpoint, the behaviour is identical to `run/2`.
"""
@spec async(Enumerable.t() | nil,
input: input(),
Expand All @@ -275,13 +296,19 @@ defmodule Boombox do
produce_stream(sink, procs)

%{input: {:writer, _writer_opts}} ->
pid = start_server(opts)
pid = start_server(opts, :calls)
%Writer{server_reference: pid}

%{input: {:message, _message_opts}} ->
start_server(opts, :messages)

%{output: {:reader, _reader_opts}} ->
pid = start_server(opts)
pid = start_server(opts, :calls)
%Reader{server_reference: pid}

%{output: {:message, _message_opts}} ->
start_server(opts, :messages)

# In case of rtmp, rtmps, rtp, rtsp, we need to wait for the tcp/udp server to be ready
# before returning from async/2.
%{input: {protocol, _opts}} when protocol in [:rtmp, :rtmps, :rtp, :rtsp, :srt] ->
Expand Down Expand Up @@ -359,16 +386,27 @@ defmodule Boombox do
end

@doc """
Informs Boombox that it will not be provided any more packets with `write/2` and should terminate
accordingly.
Gracefully terminates Boombox when using `:reader` or `:writer` endpoints before a response
of type `:finished` has been received.

When using `:reader` endpoint on output informs Boombox that no more packets will be read
from it with `read/1` and that it should terminate accordingly. This function will then
return one last packet.

When using `:writer` endpoint on input informs Boombox that it will not be provided
any more packets with `write/2` and should terminate accordingly.

Can be called only when using `:writer` endpoint on input.
"""
@spec close(Writer.t()) :: :finished | {:error, :incompatible_mode}
def close(writer) do
@spec close(Reader.t()) :: {:finished, Boombox.Packet.t()} | {:error, :incompatible_mode}
def close(%Writer{} = writer) do
Boombox.Server.finish_consuming(writer.server_reference)
end

def close(%Reader{} = reader) do
Boombox.Server.finish_producing(reader.server_reference)
end

@endpoint_opts [:input, :output]
defp validate_opts!(stream, opts) do
opts = opts |> Keyword.validate!(@endpoint_opts) |> Map.new()
Expand All @@ -384,21 +422,27 @@ defmodule Boombox do

elixir_endpoint?(opts.input) and elixir_endpoint?(opts.output) ->
raise ArgumentError,
":stream, :writer or :reader on both input and output is not supported"
"Using an elixir endpoint (:reader, :writer, :message, :stream) on both input and output is not supported"

true ->
opts
end
end

defp elixir_endpoint?({:reader, _opts}), do: true
defp elixir_endpoint?({:writer, _opts}), do: true
defp elixir_endpoint?({:stream, _opts}), do: true
defp elixir_endpoint?({type, _opts}) when type in [:reader, :writer, :stream, :message],
do: true

defp elixir_endpoint?(_io), do: false

@spec start_server(opts_map()) :: boombox_server()
defp start_server(opts) do
{:ok, pid} = Boombox.Server.start(packet_serialization: false, stop_application: false)
@spec start_server(opts_map(), :messages | :calls) :: boombox_server()
defp start_server(opts, server_communication_medium) do
{:ok, pid} =
Boombox.Server.start(
packet_serialization: false,
stop_application: false,
communication_medium: server_communication_medium
)

Boombox.Server.run(pid, Map.to_list(opts))
pid
end
Expand Down
Loading