diff --git a/examples.livemd b/examples.livemd index 5c6e68eb..e85fa87a 100644 --- a/examples.livemd +++ b/examples.livemd @@ -24,7 +24,8 @@ Mix.install([ :exla, :bumblebee, :websockex, - :membrane_simple_rtsp_server + :membrane_simple_rtsp_server, + {:coerce, ">= 1.0.2"} ]) Nx.global_default_backend(EXLA.Backend) @@ -573,34 +574,29 @@ reader2 = writer = Boombox.run(input: {:writer, video: :image, audio: false}, output: output) -Stream.unfold(%{}, fn _state -> - {result1, packet1} = Boombox.read(reader1) - {result2, packet2} = Boombox.read(reader2) - - joined_image = - Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL) +Stream.repeatedly(fn -> + case {Boombox.read(reader1), Boombox.read(reader2)} do + {{:ok, packet1}, {:ok, packet2}} -> + joined_image = + Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL) - packet = %Boombox.Packet{ - pts: max(packet1.pts, packet2.pts), - payload: joined_image, - kind: :video - } + packet = %Boombox.Packet{ + pts: max(packet1.pts, packet2.pts), + payload: joined_image, + kind: :video + } - Boombox.write(writer, packet) + Boombox.write(writer, packet) - if :finished in [result1, result2] do - if result1 == :ok, do: - Boombox.close(reader1) - if result2 == :ok, do: - Boombox.close(reader2) - nil - else - {nil, %{}} + _finished -> + :eos end end) -|> Stream.run() +|> Enum.find(& &1 == :eos) Boombox.close(writer) +Boombox.close(reader1) +Boombox.close(reader2) ``` The second cell uses `:message` endpoints, meaning that the server communicates with boomboxes by @@ -617,36 +613,42 @@ defmodule MyServer do @impl true def init(args) do - bb1 = Boombox.run(input: args.input1, output: {:message, video: :image, audio: false}) - bb2 = Boombox.run(input: args.input2, output: {:message, video: :image, audio: false}) + boombox1 = Boombox.run(input: args.input1, output: {:message, video: :image, audio: false}) + boombox2 = Boombox.run(input: args.input2, output: {:message, video: :image, audio: false}) output_writer = Boombox.run(input: {:writer, video: :image, audio: false}, output: args.output) {:ok, %{ - input_boomboxes_states: %{ - bb1: %{last_packet: nil, eos: false}, - bb2: %{last_packet: nil, eos: false} + boombox_states: %{ + boombox1: %{last_packet: nil, eos: false}, + boombox2: %{last_packet: nil, eos: false} }, - input_boomboxes: %{bb1 => :bb1, bb2 => :bb2}, + boomboxes: %{boombox1 => :boombox1, boombox2 => :boombox2}, output_writer: output_writer }} end @impl true - def handle_info({:boombox_packet, bb, packet}, state) do - boombox_id = state.input_boomboxes[bb] - state = put_in(state.input_boomboxes_states[boombox_id].last_packet, packet) + def handle_info({:boombox_packet, bb, %Boombox.Packet{} = packet}, state) do + boombox_id = state.boomboxes[bb] + state = put_in(state.boombox_states[boombox_id].last_packet, packet) - if Enum.all?(Map.values(state.input_boomboxes_states), &(&1.last_packet != nil)) do + if Enum.all?(Map.values(state.boombox_states), &(&1.last_packet != nil)) do joined_image = Vix.Vips.Operation.join!( - state.input_boomboxes_states.bb1.last_packet.payload, - state.input_boomboxes_states.bb2.last_packet.payload, + state.boombox_states.boombox1.last_packet.payload, + state.boombox_states.boombox2.last_packet.payload, :VIPS_DIRECTION_HORIZONTAL ) - packet = %Boombox.Packet{packet | payload: joined_image} + pts = + max( + state.boombox_states.boombox1.last_packet.pts, + state.boombox_states.boombox2.last_packet.pts + ) + + packet = %Boombox.Packet{packet | payload: joined_image, pts: pts} Boombox.write(state.output_writer, packet) end @@ -656,10 +658,10 @@ defmodule MyServer do @impl true def handle_info({:boombox_finished, bb}, state) do - boombox_id = state.input_boomboxes[bb] - state = put_in(state.input_boomboxes_states[boombox_id].eos, true) + boombox_id = state.boomboxes[bb] + state = put_in(state.boombox_states[boombox_id].eos, true) - if Enum.all?(Map.values(state.input_boomboxes_states), & &1.eos) do + if Enum.all?(Map.values(state.boombox_states), & &1.eos) do Boombox.close(state.output_writer) {:stop, :normal, state} else @@ -672,12 +674,11 @@ input1 = "#{input_dir}/bun.mp4" input2 = "#{input_dir}/ffmpeg-testsrc.mp4" output = "#{out_dir}/index.m3u8" -{:ok, server} = MyServer.start(%{input1: input, input2: input, output: output}) +{:ok, server} = MyServer.start(%{input1: input1, input2: input2, output: output}) monitor = Process.monitor(server) receive do {:DOWN, ^monitor, :process, ^server, reason} -> - IO.inspect(reason) :ok end ``` diff --git a/lib/boombox.ex b/lib/boombox.ex index 292b070c..4d590d7d 100644 --- a/lib/boombox.ex +++ b/lib/boombox.ex @@ -9,9 +9,12 @@ defmodule Boombox do require Membrane.Time require Membrane.Transcoder.{Audio, Video} + alias Boombox.Pipeline alias Membrane.HTTPAdaptiveStream alias Membrane.RTP + @elixir_endpoints [:stream, :message, :writer, :reader] + defmodule Writer do @moduledoc """ Defines a struct to be used when interacting with boombox when using `:writer` endpoint. @@ -158,13 +161,6 @@ defmodule Boombox do @type elixir_output :: {:stream | :reader | :message, out_raw_data_opts()} - @typep procs :: %{pipeline: pid(), supervisor: pid()} - @typep opts_map :: %{ - input: input() | elixir_input(), - output: output() | elixir_output(), - parent: pid() - } - @doc """ Runs boombox with given input and output. @@ -183,12 +179,10 @@ defmodule Boombox do write media packets to boombox with `write/2` and to finish writing with `close/1`. * `:message` - this function returns a PID of a process to communicate with. The process accepts the following types of messages: - - `{:boombox_packet, sender_pid :: pid(), packet :: Boombox.Packet.t()}` - provides boombox + - `{:boombox_packet, packet :: Boombox.Packet.t()}` - provides boombox with a media packet. The process will a `{:boombox_finished, boombox_pid :: pid()}` message to `sender_pid` if it has finished processing packets and should not be provided any more. - - `{:boombox_close, sender_pid :: pid()}` - tells boombox that no more packets will be - provided and that it should terminate. The process will reply by sending - `{:boombox_finished, boombox_pid :: pid()}` to `sender_pid` + - `:boombox_close` - tells boombox that no more packets will be provided and that it should terminate. Output endpoints with special behaviours: * `:stream` - this function will return a `Stream` that contains `Boombox.Packet`s @@ -211,18 +205,18 @@ defmodule Boombox do @spec run(Enumerable.t() | nil, input: input() | elixir_input(), output: output() | elixir_output() - ) :: :ok | Enumerable.t() | Writer.t() | Reader.t() + ) :: :ok | Enumerable.t() | Writer.t() | Reader.t() | pid() def run(stream \\ nil, opts) do opts = validate_opts!(stream, opts) case opts do %{input: {:stream, _stream_opts}} -> - procs = start_pipeline(opts) + procs = Pipeline.start(opts) source = await_source_ready() consume_stream(stream, source, procs) %{output: {:stream, _stream_opts}} -> - procs = start_pipeline(opts) + procs = Pipeline.start(opts) sink = await_sink_ready() produce_stream(sink, procs) @@ -242,7 +236,7 @@ defmodule Boombox do opts -> opts - |> start_pipeline() + |> Pipeline.start() |> await_pipeline() end end @@ -282,7 +276,7 @@ defmodule Boombox do case opts do %{input: {:stream, _stream_opts}} -> - procs = start_pipeline(opts) + procs = Pipeline.start(opts) source = await_source_ready() Task.async(fn -> @@ -291,7 +285,7 @@ defmodule Boombox do end) %{output: {:stream, _stream_opts}} -> - procs = start_pipeline(opts) + procs = Pipeline.start(opts) sink = await_sink_ready() produce_stream(sink, procs) @@ -312,7 +306,7 @@ defmodule Boombox do # In case of rtmp, rtmps, rtp, rtsp, we need to wait for the tcp/udp server to be ready # before returning from async/2. %{input: {protocol, _opts}} when protocol in [:rtmp, :rtmps, :rtp, :rtsp, :srt] -> - procs = start_pipeline(opts) + procs = Pipeline.start(opts) task = Task.async(fn -> @@ -324,7 +318,7 @@ defmodule Boombox do task opts -> - procs = start_pipeline(opts) + procs = Pipeline.start(opts) Task.async(fn -> Process.monitor(procs.supervisor) @@ -358,14 +352,14 @@ defmodule Boombox do @doc """ Reads a packet from Boombox. - If returned with `:ok`, then this function can be called - again to request the next packet, and if returned with `:finished`, then Boombox finished it's + If returned with `:ok`, then this function can be called again to request the + next packet, and if returned with `:finished`, then Boombox finished it's operation and will not produce any more packets. Can be called only when using `:reader` endpoint on output. """ @spec read(Reader.t()) :: - {:ok | :finished, Boombox.Packet.t()} | {:error, :incompatible_mode} + {:ok, Boombox.Packet.t()} | :finished | {:error, :incompatible_mode} def read(reader) do Boombox.Server.produce_packet(reader.server_reference) end @@ -375,7 +369,7 @@ defmodule Boombox do Returns `:ok` if more packets can be provided, and `:finished` when Boombox finished consuming and will not accept any more packets. Returns - synchronously once the packet has been processed by Boombox. + synchronously once the packet has been ingested and Boombox is ready for more packets. Can be called only when using `:writer` endpoint on input. """ @@ -390,15 +384,13 @@ defmodule Boombox do of type `:finished` has been received. When using `:reader` endpoint on output informs Boombox that no more packets will be read - from it with `read/1` and that it should terminate accordingly. This function will then - return one last packet. + from it with `read/1` and that it should terminate accordingly. When using `:writer` endpoint on input informs Boombox that it will not be provided any more packets with `write/2` and should terminate accordingly. """ - @spec close(Writer.t()) :: :finished | {:error, :incompatible_mode} - @spec close(Reader.t()) :: {:finished, Boombox.Packet.t()} | {:error, :incompatible_mode} + @spec close(Writer.t() | Reader.t()) :: :ok | {:error, :incompatible_mode | :already_finished} def close(%Writer{} = writer) do Boombox.Server.finish_consuming(writer.server_reference) end @@ -429,44 +421,111 @@ defmodule Boombox do end end - defp elixir_endpoint?({type, _opts}) when type in [:reader, :writer, :stream, :message], + defp elixir_endpoint?({type, _opts}) when type in @elixir_endpoints, do: true defp elixir_endpoint?(_io), do: false - @spec start_server(opts_map(), :messages | :calls) :: boombox_server() + @spec start_server(Pipeline.opts_map(), :messages | :calls) :: boombox_server() defp start_server(opts, server_communication_medium) do {:ok, pid} = Boombox.Server.start( packet_serialization: false, stop_application: false, - communication_medium: server_communication_medium + communication_medium: server_communication_medium, + parent_pid: self() ) Boombox.Server.run(pid, Map.to_list(opts)) pid end - @spec consume_stream(Enumerable.t(), pid(), procs()) :: term() + # funn = + # fn + # %Boombox.Packet{kind: :video} = packet, %{video_demand: 0} = state -> + # receive do + # {:boombox_demand, ^source, :video, demand} -> + # send(source, {:boombox_packet, self(), packet}) + # {:cont, %{state | video_demand: demand - 1}} + + # {:DOWN, _monitor, :process, supervisor, _reason} + # when supervisor == procs.supervisor -> + # {:halt, :terminated} + # end + + # %Boombox.Packet{kind: :audio} = packet, %{audio_demand: 0} = state -> + # receive do + # {:boombox_demand, ^source, :audio, demand} -> + # send(source, {:boombox_packet, self(), packet}) + # {:cont, %{state | audio_demand: demand - 1}} + + # {:DOWN, _monitor, :process, supervisor, _reason} + # when supervisor == procs.supervisor -> + # {:halt, :terminated} + # end + + # %Boombox.Packet{} = packet, state -> + # audio_demand = + # receive do + # {:boombox_demand, ^source, :audio, value} -> value + # after + # 0 -> state.audio_demand + # end + + # video_demand = + # receive do + # {:boombox_demand, ^source, :video, value} -> value + # after + # 0 -> state.video_demand + # end + + # send(source, {:boombox_packet, self(), packet}) + + # state = + # case packet.kind do + # :video -> + # %{state | video_demand: video_demand - 1} + + # :audio -> + # %{state | audio_demand: audio_demand - 1} + # end + + # {:cont, state} + + # value, _state -> + # raise ArgumentError, "Expected Boombox.Packet.t(), got: #{inspect(value)}" + # end + + @spec consume_stream(Enumerable.t(), pid(), Pipeline.procs()) :: term() defp consume_stream(stream, source, procs) do Enum.reduce_while( stream, - %{demand: 0}, + %{demands: %{audio: 0, video: 0}}, fn - %Boombox.Packet{} = packet, %{demand: 0} = state -> + %Boombox.Packet{kind: kind} = packet, state -> + demand_timeout = + if state.demands[kind] == 0, + do: :infinity, + else: 0 + receive do - {:boombox_demand, demand} -> - send(source, packet) - {:cont, %{state | demand: demand - 1}} + {:boombox_demand, ^source, ^kind, value} -> + value - 1 {:DOWN, _monitor, :process, supervisor, _reason} when supervisor == procs.supervisor -> - {:halt, :terminated} + nil + after + demand_timeout -> state.demands[kind] - 1 end + |> case do + nil -> + {:halt, :terminated} - %Boombox.Packet{} = packet, %{demand: demand} = state -> - send(source, packet) - {:cont, %{state | demand: demand - 1}} + new_demand -> + send(source, {:boombox_packet, self(), packet}) + {:cont, put_in(state.demands[kind], new_demand)} + end value, _state -> raise ArgumentError, "Expected Boombox.Packet.t(), got: #{inspect(value)}" @@ -477,22 +536,22 @@ defmodule Boombox do :ok _state -> - send(source, :boombox_eos) + send(source, {:boombox_eos, self()}) await_pipeline(procs) end end - @spec produce_stream(pid(), procs()) :: Enumerable.t() + @spec produce_stream(pid(), Pipeline.procs()) :: Enumerable.t() defp produce_stream(sink, procs) do Stream.resource( fn -> %{sink: sink, procs: procs} end, fn %{sink: sink, procs: procs} = state -> - send(sink, :boombox_demand) + send(sink, {:boombox_demand, self()}) receive do - %Boombox.Packet{} = packet -> + {:boombox_packet, ^sink, %Boombox.Packet{} = packet} -> verify_packet!(packet) {[packet], state} @@ -508,28 +567,13 @@ defmodule Boombox do ) end - @spec start_pipeline(opts_map()) :: procs() - defp start_pipeline(opts) do - opts = - opts - |> Map.update!(:input, &resolve_stream_endpoint(&1, self())) - |> Map.update!(:output, &resolve_stream_endpoint(&1, self())) - |> Map.put(:parent, self()) - - {:ok, supervisor, pipeline} = - Membrane.Pipeline.start_link(Boombox.Pipeline, opts) - - Process.monitor(supervisor) - %{supervisor: supervisor, pipeline: pipeline} - end - - @spec terminate_pipeline(procs) :: :ok + @spec terminate_pipeline(Pipeline.procs()) :: :ok defp terminate_pipeline(procs) do Membrane.Pipeline.terminate(procs.pipeline) await_pipeline(procs) end - @spec await_pipeline(procs) :: :ok + @spec await_pipeline(Pipeline.procs()) :: :ok defp await_pipeline(%{supervisor: supervisor}) do receive do {:DOWN, _monitor, :process, ^supervisor, _reason} -> :ok @@ -539,14 +583,14 @@ defmodule Boombox do @spec await_source_ready() :: pid() defp await_source_ready() do receive do - {:boombox_ex_stream_source, source} -> source + {:boombox_elixir_source, source} -> source end end @spec await_sink_ready() :: pid() defp await_sink_ready() do receive do - {:boombox_ex_stream_sink, sink} -> sink + {:boombox_elixir_sink, sink} -> sink end end @@ -579,9 +623,4 @@ defmodule Boombox do :ok end - - defp resolve_stream_endpoint({:stream, stream_options}, parent), - do: {:stream, parent, stream_options} - - defp resolve_stream_endpoint(endpoint, _parent), do: endpoint end diff --git a/lib/boombox/bin.ex b/lib/boombox/bin.ex index dc3b74b6..8ae73ecc 100644 --- a/lib/boombox/bin.ex +++ b/lib/boombox/bin.ex @@ -163,7 +163,8 @@ defmodule Boombox.Bin do spec = child(:boombox, %Boombox.InternalBin{ input: opts.input || :membrane_pad, - output: opts.output || :membrane_pad + output: opts.output || :membrane_pad, + parent: self() }) {[spec: spec], Map.from_struct(opts)} diff --git a/lib/boombox/internal_bin.ex b/lib/boombox/internal_bin.ex index 21652884..4cd891e6 100644 --- a/lib/boombox/internal_bin.ex +++ b/lib/boombox/internal_bin.ex @@ -9,6 +9,8 @@ defmodule Boombox.InternalBin do alias Membrane.Transcoder + @elixir_endpoint_types [:stream, :message, :reader, :writer] + @type input :: Boombox.input() | {:stream, pid(), Boombox.in_raw_data_opts()} @@ -133,7 +135,8 @@ defmodule Boombox.InternalBin do ] def_options input: [spec: input()], - output: [spec: output()] + output: [spec: output()], + parent: [spec: pid()] @impl true def handle_init(ctx, opts) do @@ -145,8 +148,8 @@ defmodule Boombox.InternalBin do state = %State{ - input: parse_endpoint_opt!(:input, opts.input), - output: parse_endpoint_opt!(:output, opts.output), + input: parse_endpoint_opt!(:input, opts.input, opts.parent), + output: parse_endpoint_opt!(:output, opts.output, opts.parent), status: :init } @@ -334,7 +337,7 @@ defmodule Boombox.InternalBin do end @impl true - def handle_element_end_of_stream(:elixir_stream_sink, Pad.ref(:input, id), _ctx, state) do + def handle_element_end_of_stream(:elixir_sink, Pad.ref(:input, id), _ctx, state) do eos_info = List.delete(state.eos_info, id) state = %{state | eos_info: eos_info} @@ -427,12 +430,15 @@ defmodule Boombox.InternalBin do case result do %Ready{actions: actions} = result when ready_status != nil -> - proceed(ctx, %{ - state - | status: ready_status, - last_result: result, - actions_acc: actions_acc ++ actions - }) + proceed( + ctx, + %{ + state + | status: ready_status, + last_result: result, + actions_acc: actions_acc ++ actions + } + ) %Wait{actions: actions} when wait_status != nil -> {actions_acc ++ actions, %{state | actions_acc: [], status: wait_status}} @@ -452,8 +458,12 @@ defmodule Boombox.InternalBin do Boombox.InternalBin.RTSP.create_input(uri) end - defp create_input({:stream, stream_process, params}, _ctx, _state) do - Boombox.InternalBin.ElixirStream.create_input(stream_process, params) + defp create_input({type, process, params}, _ctx, _state) when type in @elixir_endpoint_types do + Boombox.InternalBin.ElixirEndpoints.create_input( + process, + params, + elixir_endpoint_flow_control(type) + ) end defp create_input({:h264, location, opts}, _ctx, _state) do @@ -683,13 +693,15 @@ defmodule Boombox.InternalBin do {result, state} end - defp link_output({:stream, stream_process, params}, track_builders, spec_builder, _ctx, state) do + defp link_output({type, process, params}, track_builders, spec_builder, _ctx, state) + when type in @elixir_endpoint_types do is_input_realtime = input_realtime?(state.input) result = - Boombox.InternalBin.ElixirStream.link_output( - stream_process, + Boombox.InternalBin.ElixirEndpoints.link_output( + process, params, + elixir_endpoint_flow_control(type), track_builders, spec_builder, is_input_realtime @@ -719,14 +731,14 @@ defmodule Boombox.InternalBin do Process.sleep(500) end - @spec parse_endpoint_opt!(:input, input()) :: input() - @spec parse_endpoint_opt!(:output, output()) :: output() - defp parse_endpoint_opt!(direction, value) when is_binary(value) do - parse_endpoint_opt!(direction, {value, []}) + @spec parse_endpoint_opt!(:input, input(), pid()) :: input() + @spec parse_endpoint_opt!(:output, output(), pid()) :: output() + defp parse_endpoint_opt!(direction, value, parent) when is_binary(value) do + parse_endpoint_opt!(direction, {value, []}, parent) end # credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity - defp parse_endpoint_opt!(direction, {value, opts}) when is_binary(value) do + defp parse_endpoint_opt!(direction, {value, opts}, parent) when is_binary(value) do uri = URI.parse(value) scheme = uri.scheme extension = if uri.path, do: Path.extname(uri.path) @@ -758,16 +770,16 @@ defmodule Boombox.InternalBin do _other -> raise ArgumentError, "Unsupported URI: #{value} for direction: #{direction}" end - |> then(&parse_endpoint_opt!(direction, &1)) + |> then(&parse_endpoint_opt!(direction, &1, parent)) end # credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity - defp parse_endpoint_opt!(direction, value) when is_tuple(value) or is_atom(value) do + defp parse_endpoint_opt!(direction, value, parent) when is_tuple(value) or is_atom(value) do case value do {endpoint_type, location} when is_binary(location) and direction == :input and StorageEndpoints.is_storage_endpoint_type(endpoint_type) -> - parse_endpoint_opt!(:input, {endpoint_type, location, []}) + parse_endpoint_opt!(:input, {endpoint_type, location, []}, parent) {endpoint_type, location, opts} when endpoint_type in [:h264, :h265] and is_binary(location) and direction == :input -> @@ -808,7 +820,7 @@ defmodule Boombox.InternalBin do value {:whip, uri} when is_binary(uri) -> - parse_endpoint_opt!(direction, {:whip, uri, []}) + parse_endpoint_opt!(direction, {:whip, uri, []}, parent) {:whip, uri, opts} when is_binary(uri) and is_list(opts) and direction == :input -> if Keyword.keyword?(opts), do: {:webrtc, value} @@ -839,8 +851,9 @@ defmodule Boombox.InternalBin do {:rtp, opts} -> if Keyword.keyword?(opts), do: value - {:stream, stream_process, opts} when is_pid(stream_process) -> - if Keyword.keyword?(opts), do: value + {elixir_endpoint, opts} + when elixir_endpoint in @elixir_endpoint_types -> + if Keyword.keyword?(opts), do: {elixir_endpoint, parent, opts} {:srt, server_awaiting_accept} when direction == :input and is_pid(server_awaiting_accept) -> @@ -947,6 +960,14 @@ defmodule Boombox.InternalBin do defp stream?({:stream, _pid, _opts}), do: true defp stream?(_endpoint), do: false + @spec elixir_endpoint_flow_control(:stream | :message | :reader | :writer) :: :pull | :push + defp elixir_endpoint_flow_control(type) do + cond do + type in [:stream, :writer, :reader] -> :pull + type == :message -> :push + end + end + @spec handles_keyframe_requests?(input()) :: boolean() defp handles_keyframe_requests?(input) do stream?(input) or webrtc?(input) diff --git a/lib/boombox/internal_bin/elixir_stream.ex b/lib/boombox/internal_bin/elixir_endpoints.ex similarity index 72% rename from lib/boombox/internal_bin/elixir_stream.ex rename to lib/boombox/internal_bin/elixir_endpoints.ex index 9770b7a8..ad06d279 100644 --- a/lib/boombox/internal_bin/elixir_stream.ex +++ b/lib/boombox/internal_bin/elixir_endpoints.ex @@ -1,10 +1,10 @@ -defmodule Boombox.InternalBin.ElixirStream do +defmodule Boombox.InternalBin.ElixirEndpoints do @moduledoc false import Membrane.ChildrenSpec require Membrane.Pad, as: Pad - alias __MODULE__.{Sink, Source} + alias __MODULE__.{PullSink, PushSink, PullSource, PushSource} alias Boombox.InternalBin.Ready alias Membrane.FFmpeg.SWScale @@ -14,8 +14,10 @@ defmodule Boombox.InternalBin.ElixirStream do # the burst of packets from one segment of Live HLS stream @realtimer_toilet_capacity 10_000 - @spec create_input(producer :: pid, options :: Boombox.in_raw_data_opts()) :: Ready.t() - def create_input(producer, options) do + @type flow_control_mode :: :push | :pull + + @spec create_input(pid(), Boombox.in_raw_data_opts(), flow_control_mode()) :: Ready.t() + def create_input(producer, options, flow_control_mode) do options = parse_options(options, :input) builders = @@ -24,64 +26,84 @@ defmodule Boombox.InternalBin.ElixirStream do |> Map.new(fn :video -> {:video, - get_child(:elixir_stream_source) + get_child(:elixir_source) |> via_out(Pad.ref(:output, :video)) |> child(%SWScale.Converter{format: :I420}) |> child(%Membrane.H264.FFmpeg.Encoder{profile: :baseline, preset: :ultrafast})} :audio -> {:audio, - get_child(:elixir_stream_source) + get_child(:elixir_source) |> via_out(Pad.ref(:output, :audio))} end) - spec_builder = child(:elixir_stream_source, %Source{producer: producer}) + source_definition = + case flow_control_mode do + :push -> %PushSource{producer: producer} + :pull -> %PullSource{producer: producer} + end + + spec_builder = child(:elixir_source, source_definition) %Ready{track_builders: builders, spec_builder: spec_builder} end @spec link_output( - consumer :: pid, - options :: Boombox.out_raw_data_opts(), + pid(), + Boombox.out_raw_data_opts(), + flow_control_mode(), Boombox.InternalBin.track_builders(), Membrane.ChildrenSpec.t(), boolean() ) :: Ready.t() - def link_output(consumer, options, track_builders, spec_builder, is_input_realtime) do + def link_output( + consumer, + options, + flow_control_mode, + track_builders, + spec_builder, + is_input_realtime + ) do options = parse_options(options, :output) pace_control = Map.get(options, :pace_control, true) {track_builders, to_ignore} = Map.split_with(track_builders, fn {kind, _builder} -> options[kind] != false end) + sink_definition = + case flow_control_mode do + :push -> %PushSink{consumer: consumer} + :pull -> %PullSink{consumer: consumer} + end + spec = [ spec_builder, - child(:elixir_stream_sink, %Sink{consumer: consumer}), + child(:elixir_sink, sink_definition), Enum.map(track_builders, fn {:audio, builder} -> builder - |> child(:elixir_stream_audio_transcoder, %Membrane.Transcoder{ + |> child(:elixir_audio_transcoder, %Membrane.Transcoder{ output_stream_format: Membrane.RawAudio }) |> maybe_plug_resampler(options) |> maybe_plug_realtimer(:audio, pace_control, is_input_realtime) |> via_in(Pad.ref(:input, :audio)) - |> get_child(:elixir_stream_sink) + |> get_child(:elixir_sink) {:video, builder} -> builder - |> child(:elixir_stream_video_transcoder, %Membrane.Transcoder{ + |> child(:elixir_video_transcoder, %Membrane.Transcoder{ output_stream_format: Membrane.RawVideo }) - |> child(:elixir_stream_rgb_converter, %SWScale.Converter{ + |> child(:elixir_rgb_converter, %SWScale.Converter{ format: :RGB, output_width: options[:video_width], output_height: options[:video_height] }) |> maybe_plug_realtimer(:video, pace_control, is_input_realtime) |> via_in(Pad.ref(:input, :video)) - |> get_child(:elixir_stream_sink) + |> get_child(:elixir_sink) end), Enum.map(to_ignore, fn {_track, builder} -> builder |> child(Membrane.Debug.Sink) end) ] @@ -94,7 +116,7 @@ defmodule Boombox.InternalBin.ElixirStream do defp maybe_plug_realtimer(builder, kind, true, false) do builder |> via_in(:input, toilet_capacity: @realtimer_toilet_capacity) - |> child({:elixir_stream, kind, :realtimer}, Membrane.Realtimer) + |> child({:elixir, kind, :realtimer}, Membrane.Realtimer) end defp maybe_plug_realtimer(builder, _kind, _pace_control, _is_input_realtime), do: builder diff --git a/lib/boombox/internal_bin/elixir_endpoints/sink.ex b/lib/boombox/internal_bin/elixir_endpoints/sink.ex new file mode 100644 index 00000000..0f5b270c --- /dev/null +++ b/lib/boombox/internal_bin/elixir_endpoints/sink.ex @@ -0,0 +1,150 @@ +# This generates two variants of the Sink: +# * PullSink - The element has `:manual` flow control on input pads and demands the +# packets from the previous element when receiving `{:boombox_demand, demand}` +# messages from the consumer process. +# * PushSink - The element works has `:push` flow control on input pads and doesn't expect +# demands from the consumer process. + +[{PullSink, :manual}, {PushSink, :push}] +|> Enum.map(fn {module_name, flow_control} -> + defmodule Module.concat(Boombox.InternalBin.ElixirEndpoints, module_name) do + @moduledoc false + use Membrane.Sink + + case flow_control do + :manual -> + def_input_pad :input, + accepted_format: any_of(Membrane.RawAudio, Membrane.RawVideo), + availability: :on_request, + max_instances: 2, + flow_control: :manual, + demand_unit: :buffers + + :push -> + def_input_pad :input, + accepted_format: any_of(Membrane.RawAudio, Membrane.RawVideo), + availability: :on_request, + max_instances: 2, + flow_control: :push + end + + def_options consumer: [ + spec: pid(), + description: """ + PID of a process to which send packets. + """ + ] + + defmodule State do + @moduledoc false + @type t :: %__MODULE__{ + consumer: pid(), + last_pts: %{ + optional(:video) => Membrane.Time.t(), + optional(:audio) => Membrane.Time.t() + }, + audio_format: + %{ + audio_format: Membrane.RawAudio.SampleFormat.t(), + audio_rate: pos_integer(), + audio_channels: pos_integer() + } + | nil + } + + @enforce_keys [:consumer] + + defstruct @enforce_keys ++ + [ + last_pts: %{}, + audio_format: nil + ] + end + + @impl true + def handle_init(_ctx, opts) do + {[], %State{consumer: opts.consumer}} + end + + @impl true + def handle_pad_added(Pad.ref(:input, kind), _ctx, state) do + {[], %{state | last_pts: Map.put(state.last_pts, kind, 0)}} + end + + @impl true + def handle_playing(_ctx, state) do + send(state.consumer, {:boombox_elixir_sink, self()}) + {[], state} + end + + if flow_control == :manual do + @impl true + def handle_info({:boombox_demand, consumer}, _ctx, %{consumer: consumer} = state) do + if state.last_pts == %{} do + {[], state} + else + {kind, _pts} = + Enum.min_by(state.last_pts, fn {_kind, pts} -> pts end) + + {[demand: Pad.ref(:input, kind)], state} + end + end + end + + @impl true + def handle_stream_format(Pad.ref(:input, :audio), stream_format, _ctx, state) do + audio_format = %{ + audio_format: stream_format.sample_format, + audio_rate: stream_format.sample_rate, + audio_channels: stream_format.channels + } + + {[], %{state | audio_format: audio_format}} + end + + @impl true + def handle_stream_format(_pad, _stream_format, _ctx, state) do + {[], state} + end + + @impl true + def handle_buffer(Pad.ref(:input, :video), buffer, ctx, state) do + state = %{state | last_pts: %{state.last_pts | video: buffer.pts}} + %{width: width, height: height} = ctx.pads[Pad.ref(:input, :video)].stream_format + + {:ok, image} = + Vix.Vips.Image.new_from_binary(buffer.payload, width, height, 3, :VIPS_FORMAT_UCHAR) + + packet = %Boombox.Packet{ + payload: image, + pts: buffer.pts, + kind: :video + } + + send(state.consumer, {:boombox_packet, self(), packet}) + + {[], state} + end + + @impl true + def handle_buffer(Pad.ref(:input, :audio), buffer, _ctx, state) do + state = %{state | last_pts: %{state.last_pts | audio: buffer.pts}} + + packet = %Boombox.Packet{ + payload: buffer.payload, + pts: buffer.pts, + kind: :audio, + format: state.audio_format + } + + send(state.consumer, {:boombox_packet, self(), packet}) + + {[], state} + end + + @impl true + def handle_end_of_stream(Pad.ref(:input, kind), _ctx, state) do + {[], %{state | last_pts: Map.delete(state.last_pts, kind)}} + end + end +end) diff --git a/lib/boombox/internal_bin/elixir_endpoints/source.ex b/lib/boombox/internal_bin/elixir_endpoints/source.ex new file mode 100644 index 00000000..7a4b7b62 --- /dev/null +++ b/lib/boombox/internal_bin/elixir_endpoints/source.ex @@ -0,0 +1,134 @@ +# This generates two variants of the Source: +# * PullSource - The element has `:manual` flow control on output pads and +# handles demands from subsequent element by demanding packets from the producer +# process with `{:boombox_demand, self(), demand_amount}` messages. +# * PushSource - The element has `:push` flow control on output pads and expects +# the producer process to provide it with packets without demanding them. + +[{PullSource, :manual}, {PushSource, :push}] +|> Enum.map(fn {module_name, flow_control} -> + defmodule Module.concat(Boombox.InternalBin.ElixirEndpoints, module_name) do + @moduledoc false + use Membrane.Source + + def_output_pad :output, + accepted_format: any_of(Membrane.RawVideo, Membrane.RawAudio), + availability: :on_request, + max_instances: 2, + flow_control: flow_control + + def_options producer: [ + spec: pid(), + description: """ + PID of a process from which to demand and receive packets. + """ + ] + + defmodule State do + @moduledoc false + @type t :: %__MODULE__{ + producer: pid(), + audio_format: + %{ + audio_format: Membrane.RawAudio.SampleFormat.t(), + audio_rate: pos_integer(), + audio_channels: pos_integer() + } + | nil, + video_dims: %{width: pos_integer(), height: pos_integer()} | nil + } + + @enforce_keys [:producer] + + defstruct @enforce_keys ++ + [ + audio_format: nil, + video_dims: nil + ] + end + + @impl true + def handle_init(_ctx, opts) do + {[], %State{producer: opts.producer}} + end + + @impl true + def handle_playing(_ctx, state) do + send(state.producer, {:boombox_elixir_source, self()}) + {[], state} + end + + if flow_control == :manual do + @impl true + def handle_demand(Pad.ref(:output, id), size, _unit, _ctx, state) do + send(state.producer, {:boombox_demand, self(), id, size}) + {[], state} + end + end + + @impl true + def handle_info( + {:boombox_packet, producer, %Boombox.Packet{kind: :video} = packet}, + _ctx, + %{producer: producer} = state + ) do + image = packet.payload |> Image.flatten!() |> Image.to_colorspace!(:srgb) + video_dims = %{width: Image.width(image), height: Image.height(image)} + {:ok, payload} = Vix.Vips.Image.write_to_binary(image) + buffer = %Membrane.Buffer{payload: payload, pts: packet.pts} + + if video_dims == state.video_dims do + {[buffer: {Pad.ref(:output, :video), buffer}], state} + else + stream_format = %Membrane.RawVideo{ + width: video_dims.width, + height: video_dims.height, + pixel_format: :RGB, + aligned: true, + framerate: nil + } + + {[ + stream_format: {Pad.ref(:output, :video), stream_format}, + buffer: {Pad.ref(:output, :video), buffer} + ], %{state | video_dims: video_dims}} + end + end + + @impl true + def handle_info( + {:boombox_packet, producer, %Boombox.Packet{kind: :audio} = packet}, + _ctx, + %{producer: producer} = state + ) do + %Boombox.Packet{payload: payload, format: format} = packet + buffer = %Membrane.Buffer{payload: payload, pts: packet.pts} + + cond do + format == %{} and state.audio_format == nil -> + raise "No audio stream format provided" + + format == %{} or format == state.audio_format -> + {[buffer: {Pad.ref(:output, :audio), buffer}], state} + + true -> + stream_format = %Membrane.RawAudio{ + sample_format: format.audio_format, + sample_rate: format.audio_rate, + channels: format.audio_channels + } + + {[ + stream_format: {Pad.ref(:output, :audio), stream_format}, + buffer: {Pad.ref(:output, :audio), buffer} + ], %{state | audio_format: format}} + end + end + + @impl true + def handle_info({:boombox_eos, producer}, ctx, %{producer: producer} = state) do + actions = Enum.map(ctx.pads, fn {ref, _data} -> {:end_of_stream, ref} end) + {actions, state} + end + end +end) diff --git a/lib/boombox/internal_bin/elixir_stream/sink.ex b/lib/boombox/internal_bin/elixir_stream/sink.ex deleted file mode 100644 index 7595e5c2..00000000 --- a/lib/boombox/internal_bin/elixir_stream/sink.ex +++ /dev/null @@ -1,92 +0,0 @@ -defmodule Boombox.InternalBin.ElixirStream.Sink do - @moduledoc false - use Membrane.Sink - - def_input_pad :input, - accepted_format: any_of(Membrane.RawAudio, Membrane.RawVideo), - availability: :on_request, - flow_control: :manual, - demand_unit: :buffers - - def_options consumer: [spec: pid()] - - @impl true - def handle_init(_ctx, opts) do - {[], Map.merge(Map.from_struct(opts), %{last_pts: %{}, audio_format: nil})} - end - - @impl true - def handle_pad_added(Pad.ref(:input, kind), _ctx, state) do - {[], %{state | last_pts: Map.put(state.last_pts, kind, 0)}} - end - - @impl true - def handle_playing(_ctx, state) do - send(state.consumer, {:boombox_ex_stream_sink, self()}) - {[], state} - end - - @impl true - def handle_info(:boombox_demand, _ctx, state) do - if state.last_pts == %{} do - {[], state} - else - {kind, _pts} = - Enum.min_by(state.last_pts, fn {_kind, pts} -> pts end) - - {[demand: Pad.ref(:input, kind)], state} - end - end - - @impl true - def handle_stream_format(Pad.ref(:input, :audio), stream_format, _ctx, state) do - audio_format = %{ - audio_format: stream_format.sample_format, - audio_rate: stream_format.sample_rate, - audio_channels: stream_format.channels - } - - {[], %{state | audio_format: audio_format}} - end - - @impl true - def handle_stream_format(_pad, _stream_format, _ctx, state) do - {[], state} - end - - @impl true - def handle_buffer(Pad.ref(:input, :video), buffer, ctx, state) do - state = %{state | last_pts: %{state.last_pts | video: buffer.pts}} - %{width: width, height: height} = ctx.pads[Pad.ref(:input, :video)].stream_format - - {:ok, image} = - Vix.Vips.Image.new_from_binary(buffer.payload, width, height, 3, :VIPS_FORMAT_UCHAR) - - send(state.consumer, %Boombox.Packet{ - payload: image, - pts: buffer.pts, - kind: :video - }) - - {[], state} - end - - @impl true - def handle_buffer(Pad.ref(:input, :audio), buffer, _ctx, state) do - state = %{state | last_pts: %{state.last_pts | audio: buffer.pts}} - - send(state.consumer, %Boombox.Packet{ - payload: buffer.payload, - pts: buffer.pts, - kind: :audio, - format: state.audio_format - }) - - {[], state} - end - - @impl true - def handle_end_of_stream(Pad.ref(:input, kind), _ctx, state) do - {[], %{state | last_pts: Map.delete(state.last_pts, kind)}} - end -end diff --git a/lib/boombox/internal_bin/elixir_stream/source.ex b/lib/boombox/internal_bin/elixir_stream/source.ex deleted file mode 100644 index 877c32cd..00000000 --- a/lib/boombox/internal_bin/elixir_stream/source.ex +++ /dev/null @@ -1,102 +0,0 @@ -defmodule Boombox.InternalBin.ElixirStream.Source do - @moduledoc false - use Membrane.Source - - def_output_pad :output, - accepted_format: any_of(Membrane.RawVideo, Membrane.RawAudio), - availability: :on_request, - flow_control: :manual, - demand_unit: :buffers - - def_options producer: [ - spec: pid() - ] - - @impl true - def handle_init(_ctx, opts) do - state = %{ - producer: opts.producer, - audio_format: nil, - video_dims: nil - } - - {[], state} - end - - @impl true - def handle_playing(_ctx, state) do - send(state.producer, {:boombox_ex_stream_source, self()}) - {[], state} - end - - @impl true - def handle_demand(Pad.ref(:output, _id), _size, _unit, ctx, state) do - demands = Enum.map(ctx.pads, fn {_pad, %{demand: demand}} -> demand end) - - if Enum.all?(demands, &(&1 > 0)) do - send(state.producer, {:boombox_demand, Enum.sum(demands)}) - end - - {[], state} - end - - @impl true - def handle_info(%Boombox.Packet{kind: :video} = packet, _ctx, state) do - image = packet.payload |> Image.flatten!() |> Image.to_colorspace!(:srgb) - video_dims = %{width: Image.width(image), height: Image.height(image)} - {:ok, payload} = Vix.Vips.Image.write_to_binary(image) - buffer = %Membrane.Buffer{payload: payload, pts: packet.pts} - - if video_dims == state.video_dims do - {[buffer: {Pad.ref(:output, :video), buffer}], state} - else - stream_format = %Membrane.RawVideo{ - width: video_dims.width, - height: video_dims.height, - pixel_format: :RGB, - aligned: true, - framerate: nil - } - - {[ - stream_format: {Pad.ref(:output, :video), stream_format}, - buffer: {Pad.ref(:output, :video), buffer} - ], %{state | video_dims: video_dims}} - end - end - - @impl true - def handle_info(%Boombox.Packet{kind: :audio} = packet, _ctx, state) do - %Boombox.Packet{payload: payload, format: format} = packet - buffer = %Membrane.Buffer{payload: payload, pts: packet.pts} - - case format do - empty_format when empty_format == %{} and state.audio_format == nil -> - raise "No audio stream format provided" - - empty_format when empty_format == %{} -> - {[buffer: {Pad.ref(:output, :audio), buffer}], state} - - unchanged_format when unchanged_format == state.audio_format -> - {[buffer: {Pad.ref(:output, :audio), buffer}], state} - - new_format -> - stream_format = %Membrane.RawAudio{ - sample_format: new_format.audio_format, - sample_rate: new_format.audio_rate, - channels: new_format.audio_channels - } - - {[ - stream_format: {Pad.ref(:output, :audio), stream_format}, - buffer: {Pad.ref(:output, :audio), buffer} - ], %{state | audio_format: format}} - end - end - - @impl true - def handle_info(:boombox_eos, ctx, state) do - actions = Enum.map(ctx.pads, fn {ref, _data} -> {:end_of_stream, ref} end) - {actions, state} - end -end diff --git a/lib/boombox/pipeline.ex b/lib/boombox/pipeline.ex index e37103bd..ceca4dd8 100644 --- a/lib/boombox/pipeline.ex +++ b/lib/boombox/pipeline.ex @@ -2,12 +2,30 @@ defmodule Boombox.Pipeline do @moduledoc false use Membrane.Pipeline + @type opts_map :: %{ + input: Boombox.input() | Boombox.elixir_input(), + output: Boombox.output() | Boombox.elixir_output() + } + @type procs :: %{pipeline: pid(), supervisor: pid()} + + @spec start(opts_map()) :: procs() + def start(opts) do + opts = Map.put(opts, :parent, self()) + + {:ok, supervisor, pipeline} = + Membrane.Pipeline.start_link(Boombox.Pipeline, opts) + + Process.monitor(supervisor) + %{supervisor: supervisor, pipeline: pipeline} + end + @impl true def handle_init(_ctx, opts) do spec = child(:boombox, %Boombox.InternalBin{ input: opts.input, - output: opts.output + output: opts.output, + parent: opts.parent }) {[spec: spec], %{parent: opts.parent}} diff --git a/lib/boombox/server.ex b/lib/boombox/server.ex index d6b4c278..fa1dc166 100644 --- a/lib/boombox/server.ex +++ b/lib/boombox/server.ex @@ -16,6 +16,14 @@ defmodule Boombox.Server do # tuple to the `sender` when finished. # The packets that Boombox is consuming and producing are in the form of # `t:serialized_boombox_packet/0` or `t:Boombox.Packet.t/0`, depending on set options. + # + # Avaliable actions: + # write buffer to boombox synchronously + # write buffer to boombox asynchronously + # read buffer from boombox synchronously + # receive buffer from boombox asynchronously (message) + # close boombox for wrtiting + # close boombox for reading use GenServer @@ -26,17 +34,19 @@ defmodule Boombox.Server do @type t :: GenServer.server() @type communication_medium :: :calls | :messages + @type flow_control :: :push | :pull @type opts :: [ name: GenServer.name(), packet_serialization: boolean(), stop_application: boolean(), - communication_medium: communication_medium() + communication_medium: communication_medium(), + parent_pid: pid() ] @type boombox_opts :: [ - input: Boombox.input() | {:writer, Boombox.in_raw_data_opts()}, - output: Boombox.output() | {:reader, Boombox.out_raw_data_opts()} + input: Boombox.input() | {:writer | :message, Boombox.in_raw_data_opts()}, + output: Boombox.output() | {:reader | :message, Boombox.out_raw_data_opts()} ] @typedoc """ @@ -94,15 +104,38 @@ defmodule Boombox.Server do @type t :: %__MODULE__{ packet_serialization: boolean(), stop_application: boolean(), - boombox_pid: pid() | nil, boombox_mode: Boombox.Server.boombox_mode() | nil, communication_medium: Boombox.Server.communication_medium(), - parent_pid: pid() + parent_pid: pid(), + membrane_sink: pid() | nil, + membrane_source: pid() | nil, + membrane_source_demands: %{audio: non_neg_integer(), video: non_neg_integer()}, + pipeline_supervisor: pid() | nil, + pipeline: pid() | nil, + current_client: GenServer.from() | Process.dest() | nil, + pipeline_termination_reason: term(), + termination_requested: boolean() } - @enforce_keys [:packet_serialization, :stop_application, :communication_medium, :parent_pid] - - defstruct @enforce_keys ++ [boombox_pid: nil, boombox_mode: nil] + @enforce_keys [ + :packet_serialization, + :stop_application, + :communication_medium, + :parent_pid + ] + + defstruct @enforce_keys ++ + [ + boombox_mode: nil, + membrane_sink: nil, + membrane_source: nil, + membrane_source_demands: %{audio: 0, video: 0}, + pipeline_supervisor: nil, + pipeline: nil, + current_client: nil, + pipeline_termination_reason: nil, + termination_requested: false + ] end @doc """ @@ -157,9 +190,13 @@ defmodule Boombox.Server do accordingly. Can be called only when Boombox is in `:consuming` mode. """ - @spec finish_consuming(t()) :: :finished | {:error, :incompatible_mode} + @spec finish_consuming(t()) :: :ok | {:error, :incompatible_mode | :already_finished} def finish_consuming(server) do - GenServer.call(server, :finish_consuming) + if Process.alive?(server) do + GenServer.call(server, :finish_consuming) + else + {:error, :already_finished} + end end @doc """ @@ -169,7 +206,8 @@ defmodule Boombox.Server do Can be called only when Boombox is in `:producing` mode. """ @spec produce_packet(t()) :: - {:ok | :finished, serialized_boombox_packet() | Boombox.Packet.t()} + {:ok, serialized_boombox_packet() | Boombox.Packet.t()} + | :finished | {:error, :incompatible_mode} def produce_packet(server) do GenServer.call(server, :produce_packet) @@ -179,11 +217,13 @@ defmodule Boombox.Server do Informs Boombox that no more packets will be read and shouldn't be produced. Can be called only when Boombox is in `:producing` mode. """ - @spec finish_producing(t()) :: - {:finished, serialized_boombox_packet() | Boombox.Packet.t()} - | {:error, :incompatible_mode} + @spec finish_producing(t()) :: :ok | {:error, :incompatible_mode | :already_finished} def finish_producing(server) do - GenServer.call(server, :finish_producing) + if Process.alive?(server) do + GenServer.call(server, :finish_producing) + else + {:error, :already_finished} + end end @impl true @@ -198,63 +238,148 @@ defmodule Boombox.Server do end @impl true - def handle_call(request, _from, state) do - {response, state} = handle_request(request, state) - {:reply, response, state} + def handle_call(request, from, state) do + handle_request(request, from, state) end + # Imitating calls with messages @impl true def handle_info({:call, sender, request}, state) do - {response, state} = handle_request(request, state) - send(sender, {:response, response}) + case handle_request(request, sender, state) do + {:reply, reply, state} -> + reply(sender, reply) + {:noreply, state} + + {:stop, reason, reply, state} -> + reply(sender, reply) + {:stop, reason, state} + + other -> + other + end + end + + # Message API - writing packets + @impl true + def handle_info( + {:boombox_packet, packet}, + %State{communication_medium: :messages, boombox_mode: :consuming} = state + ) do + packet = + if state.packet_serialization, + do: deserialize_packet(packet), + else: packet + + send(state.membrane_source, {:boombox_packet, self(), packet}) + {:noreply, state} end + # Message API - closing for writing @impl true def handle_info( - {:boombox_packet, sender_pid, %Boombox.Packet{} = packet}, - %State{communication_medium: :messages} = state + :boombox_close, + %State{communication_medium: :messages, boombox_mode: :consuming} = state ) do - {response, state} = handle_request({:consume_packet, packet}, state) - if response == :finished, do: send(sender_pid, :boombox_finished) + send(state.membrane_source, {:boombox_eos, self()}) {:noreply, state} end @impl true - def handle_info({:boombox_close, sender_pid}, %State{communication_medium: :messages} = state) do - handle_request(:finish_consuming, state) - send(sender_pid, {:boombox_finished, self()}) + def handle_info({boombox_elixir_element, pid}, %State{} = state) + when boombox_elixir_element in [:boombox_elixir_source, :boombox_elixir_sink] do + reply(state.current_client, state.boombox_mode) + + state = %State{state | current_client: nil} + + state = + case boombox_elixir_element do + :boombox_elixir_source -> %State{state | membrane_source: pid} + :boombox_elixir_sink -> %State{state | membrane_sink: pid} + end + {:noreply, state} end @impl true def handle_info( - {:packet_produced, packet, boombox_pid}, - %State{communication_medium: :messages, boombox_pid: boombox_pid} = state + {:boombox_demand, source, kind, value}, + %State{membrane_source: source} = state ) do - send(state.parent_pid, {:boombox_packet, self(), packet}) - {:noreply, state} + %State{} = state = put_in(state.membrane_source_demands[kind], value) + + if state.current_client != nil and + Enum.all?(state.membrane_source_demands, fn {_kind, value} -> value > 0 end) do + reply(state.current_client, :ok) + + {:noreply, %State{state | current_client: nil}} + else + {:noreply, state} + end + end + + @impl true + def handle_info( + {:boombox_packet, sink, packet}, + %State{membrane_sink: sink, communication_medium: :calls} = state + ) do + if state.current_client != nil do + packet = + if state.packet_serialization, + do: serialize_packet(packet), + else: packet + + reply(state.current_client, {:ok, packet}) + {:noreply, %State{state | current_client: nil}} + else + {:noreply, state} + end end @impl true def handle_info( - {:finished, packet, boombox_pid}, - %State{communication_medium: :messages, boombox_pid: boombox_pid} = state + {:boombox_packet, sink, packet}, + %State{membrane_sink: sink, communication_medium: :messages} = state ) do + packet = + if state.packet_serialization, + do: serialize_packet(packet), + else: packet + send(state.parent_pid, {:boombox_packet, self(), packet}) - send(state.parent_pid, {:boombox_finished, self()}) + {:noreply, state} end @impl true - def handle_info({:DOWN, _ref, :process, pid, reason}, %State{boombox_pid: pid} = state) do + def handle_info( + {:DOWN, _ref, :process, pid, reason}, + %State{pipeline_supervisor: pid} = state + ) do reason = case reason do :normal -> :normal reason -> {:boombox_crash, reason} end - {:stop, reason, state} + case state.communication_medium do + :calls -> + cond do + state.current_client != nil -> + reply(state.current_client, :finished) + {:stop, reason, state} + + state.termination_requested -> + {:stop, reason, state} + + true -> + {:noreply, %State{state | pipeline_termination_reason: reason}} + end + + :messages -> + send(state.parent_pid, {:boombox_finished, self()}) + {:stop, reason, state} + end end @impl true @@ -266,8 +391,8 @@ defmodule Boombox.Server do @impl true def terminate(reason, state) do if state.stop_application do - # Stop the application after the process terminates, allowing it to exit with the original - # reason, not :shutdown coming from the top. + # Stop the application after the process terminates, allowing for the process to exit with the original + # reason, not :shutdown coming from Application. pid = self() spawn(fn -> @@ -281,218 +406,165 @@ defmodule Boombox.Server do end end - @spec handle_request({:run, boombox_opts()}, State.t()) :: {boombox_mode(), State.t()} - defp handle_request({:run, boombox_opts}, %State{} = state) do - boombox_opts = - boombox_opts - |> Enum.map(fn - {direction, {:message, opts}} -> {direction, {:stream, opts}} - {direction, {:writer, opts}} -> {direction, {:stream, opts}} - {direction, {:reader, opts}} -> {direction, {:stream, opts}} - other -> other - end) - + @spec handle_request({:run, boombox_opts()}, GenServer.from() | Process.dest(), State.t()) :: + {:noreply, State.t()} + defp handle_request({:run, boombox_opts}, from, %State{} = state) do boombox_mode = get_boombox_mode(boombox_opts) - server_pid = self() - - boombox_process_fun = - case boombox_mode do - :consuming -> - fn -> consuming_boombox_run(boombox_opts, server_pid) end - - :producing -> - fn -> producing_boombox_run(boombox_opts, server_pid, state.communication_medium) end - - :standalone -> - fn -> standalone_boombox_run(boombox_opts) end - end - - boombox_pid = spawn(boombox_process_fun) - Process.monitor(boombox_pid) + %{supervisor: pipeline_supervisor, pipeline: pipeline} = + boombox_opts + |> Map.new() + |> Boombox.Pipeline.start() - {boombox_mode, %State{state | boombox_pid: boombox_pid, boombox_mode: boombox_mode}} + {:noreply, + %State{ + state + | boombox_mode: boombox_mode, + pipeline_supervisor: pipeline_supervisor, + pipeline: pipeline, + current_client: from + }} end - @spec handle_request(:get_pid, State.t()) :: {pid(), State.t()} - defp handle_request(:get_pid, state) do - {self(), state} + @spec handle_request(:get_pid, GenServer.from() | Process.dest(), State.t()) :: + {:reply, pid(), State.t()} + defp handle_request(:get_pid, _from, state) do + {:reply, self(), state} end - defp handle_request(_request, %State{boombox_pid: nil} = state) do - {{:error, :boombox_not_running}, state} + defp handle_request(_request, _from, %State{pipeline: nil} = state) do + {:reply, {:error, :boombox_not_running}, state} end @spec handle_request( {:consume_packet, serialized_boombox_packet() | Boombox.Packet.t()}, + GenServer.from() | Process.dest(), State.t() ) :: - {:ok | :finished | {:error, :incompatible_mode | :boombox_not_running}, State.t()} + {:reply, :ok | {:error, :incompatible_mode | :boombox_not_running}, State.t()} + | {:noreply, State.t()} + | {:stop, term(), :finished, State.t()} defp handle_request( {:consume_packet, packet}, - %State{boombox_mode: :consuming, boombox_pid: boombox_pid} = state + from, + %State{boombox_mode: :consuming} = state ) do packet = - if state.packet_serialization do - deserialize_packet(packet) - else - packet - end + if state.packet_serialization, + do: deserialize_packet(packet), + else: packet - send(boombox_pid, {:consume_packet, packet}) + send(state.membrane_source, {:boombox_packet, self(), packet}) + %State{} = state = update_in(state.membrane_source_demands[packet.kind], &(&1 - 1)) - receive do - {:packet_consumed, ^boombox_pid} -> - {:ok, state} + cond do + state.pipeline_termination_reason != nil -> + {:stop, state.pipeline_termination_reason, :finished, state} - {:finished, ^boombox_pid} -> - {:finished, state} - end - end + state.membrane_source_demands[packet.kind] == 0 -> + {:noreply, %State{state | current_client: from}} - defp handle_request({:consume_packet, _packet}, %State{boombox_mode: _other_mode} = state) do - {{:error, :incompatible_mode}, state} + true -> + {:reply, :ok, state} + end end - @spec handle_request(:finish_consuming, State.t()) :: - {:finished | {:error, :incompatible_mode | :boombox_not_running}, State.t()} defp handle_request( - :finish_consuming, - %State{boombox_mode: :consuming, boombox_pid: boombox_pid} = state + {:consume_packet, _packet}, + _from, + %State{boombox_mode: _other_mode} = state ) do - send(boombox_pid, :finish_consuming) - - receive do - {:finished, ^boombox_pid} -> - {:finished, state} + {:reply, {:error, :incompatible_mode}, state} + end + + @spec handle_request(:finish_consuming, GenServer.from() | Process.dest(), State.t()) :: + {:reply, :ok | {:error, :incompatible_mode}, State.t()} + | {:stop, term(), :ok, State.t()} + defp handle_request(:finish_consuming, _from, %State{boombox_mode: :consuming} = state) do + if state.pipeline_termination_reason != nil do + {:stop, state.pipeline_termination_reason, :ok, state} + else + send(state.membrane_source, {:boombox_eos, self()}) + {:reply, :ok, %State{state | termination_requested: true}} end end - defp handle_request(:finish_consuming, %State{boombox_mode: _other_mode} = state) do - {{:error, :incompatible_mode}, state} + defp handle_request(:finish_consuming, _from, %State{boombox_mode: _other_mode} = state) do + {:reply, {:error, :incompatible_mode}, state} end - @spec handle_request(:produce_packet, State.t()) :: - {{:ok | :finished, serialized_boombox_packet() | Boombox.Packet.t()} - | {:error, :incompatible_mode | :boombox_not_running}, State.t()} - defp handle_request( - :produce_packet, - %State{boombox_mode: :producing, boombox_pid: boombox_pid} = state - ) do - send(boombox_pid, :produce_packet) - - {response_type, packet} = - receive do - {:packet_produced, packet, ^boombox_pid} -> {:ok, packet} - {:finished, packet, ^boombox_pid} -> {:finished, packet} - end + @spec handle_request(:produce_packet, GenServer.from() | Process.dest(), State.t()) :: + {:reply, {:error, :incompatible_mode | :boombox_not_running}, State.t()} + | {:noreply, State.t()} + | {:stop, term(), :finished, State.t()} + defp handle_request(:produce_packet, from, %State{boombox_mode: :producing} = state) do + if state.pipeline_termination_reason != nil do + {:stop, state.pipeline_termination_reason, :finished, state} + else + send(state.membrane_sink, {:boombox_demand, self()}) + {:noreply, %State{state | current_client: from}} + end + end - packet = - if state.packet_serialization do - serialize_packet(packet) - else - packet - end + defp handle_request(:produce_packet, _from, %State{boombox_mode: _other_mode} = state) do + {:reply, {:error, :incompatible_mode}, state} + end - {{response_type, packet}, state} + @spec handle_request(:finish_producing, GenServer.from() | Process.dest(), State.t()) :: + {:reply, :ok | {:error, :incompatible_mode}, State.t()} + defp handle_request(:finish_producing, _from, %State{boombox_mode: :producing} = state) do + if state.pipeline_termination_reason != nil do + {:stop, state.pipeline_termination_reason, :ok, state} + else + Membrane.Pipeline.terminate(state.pipeline, asynchronous?: true) + {:reply, :ok, %State{state | termination_requested: true}} + end end - defp handle_request(:produce_packet, %State{boombox_mode: _other_mode} = state) do - {{:error, :incompatible_mode}, state} + defp handle_request(:finish_producing, _from, %State{boombox_mode: _other_mode} = state) do + {:reply, {:error, :incompatible_mode}, state} end - @spec handle_request(:finish_producing, State.t()) :: - {{:finished, serialized_boombox_packet() | Boombox.Packet.t()} - | {:error, :incompatible_mode}, State.t()} - defp handle_request( - :finish_producing, - %State{boombox_mode: :producing, boombox_pid: boombox_pid} = state - ) do - send(boombox_pid, :finish_producing) + @spec handle_request(term(), GenServer.from() | Process.dest(), State.t()) :: + {:reply, {:error, :invalid_request}, State.t()} + defp handle_request(_invalid_request, _from, state) do + {:reply, {:error, :invalid_request}, state} + end - receive do - {:finished, packet, ^boombox_pid} -> - {{:finished, packet}, state} - end + @spec reply(GenServer.from() | Process.dest(), term()) :: :ok + defp reply(dest, reply_content) when is_pid(dest) or is_port(dest) or is_atom(dest) do + send(dest, {:response, reply_content}) end - defp handle_request(:finish_producing, %State{boombox_mode: _other_mode} = state) do - {{:error, :incompatible_mode}, state} + defp reply({name, node} = dest, reply_content) when is_atom(name) and is_atom(node) do + send(dest, {:response, reply_content}) end - @spec handle_request(term(), State.t()) :: {{:error, :invalid_request}, State.t()} - defp handle_request(_invalid_request, state) do - {{:error, :invalid_request}, state} + defp reply({pid, _tag} = genserver_from, reply_content) when is_pid(pid) do + GenServer.reply(genserver_from, reply_content) end @spec get_boombox_mode(boombox_opts()) :: boombox_mode() defp get_boombox_mode(boombox_opts) do - case Map.new(boombox_opts) do - %{input: {:stream, _input_opts}, output: {:stream, _output_opts}} -> - raise ArgumentError, "Elixir endpoint on both input and output is not supported" + cond do + elixir_endpoint?(boombox_opts[:input]) and elixir_endpoint?(boombox_opts[:output]) -> + raise ArgumentError, "Using an elixir endpoint on both input and output is not supported" - %{input: {:stream, _input_opts}} -> + elixir_endpoint?(boombox_opts[:input]) -> :consuming - %{output: {:stream, _output_opts}} -> + elixir_endpoint?(boombox_opts[:output]) -> :producing - _other -> + true -> :standalone end end - @spec consuming_boombox_run(boombox_opts(), pid()) :: :ok - defp consuming_boombox_run(boombox_opts, server_pid) do - Stream.resource( - fn -> true end, - fn is_first_iteration -> - if not is_first_iteration do - send(server_pid, {:packet_consumed, self()}) - end - - receive do - {:consume_packet, packet} -> - {[packet], false} - - :finish_consuming -> - {:halt, false} - end - end, - fn _is_first_iteration -> send(server_pid, {:finished, self()}) end - ) - |> Boombox.run(boombox_opts) - end - - @spec producing_boombox_run(boombox_opts(), pid(), communication_medium()) :: :ok - defp producing_boombox_run(boombox_opts, server_pid, communication_medium) do - last_packet = - Boombox.run(boombox_opts) - |> Enum.reduce_while(nil, fn new_packet, last_produced_packet -> - if last_produced_packet != nil do - send(server_pid, {:packet_produced, last_produced_packet, self()}) - end - - action = - if communication_medium == :calls do - receive do - :produce_packet -> :cont - :finish_producing -> :halt - end - else - :cont - end - - {action, new_packet} - end) + defp elixir_endpoint?({type, _opts}) when type in [:stream, :reader, :writer, :message], + do: true - send(server_pid, {:finished, last_packet, self()}) - end - - @spec standalone_boombox_run(boombox_opts()) :: :ok - defp standalone_boombox_run(boombox_opts) do - Boombox.run(boombox_opts) - end + defp elixir_endpoint?(_io), do: false @spec deserialize_packet(serialized_boombox_packet()) :: Packet.t() defp deserialize_packet(%{payload: {:audio, payload}} = serialized_packet) do diff --git a/python/examples/anonymization_demo.py b/python/examples/anonymization_demo.py index 28303bc6..621d0fbf 100644 --- a/python/examples/anonymization_demo.py +++ b/python/examples/anonymization_demo.py @@ -26,6 +26,7 @@ import queue import threading import time +import logging from typing import NoReturn @@ -66,6 +67,7 @@ def read_packets(boombox: Boombox, packet_queue: queue.Queue) -> None: packet_queue.put(packet) if packet_queue.qsize() > MAX_QUEUE_SIZE: packet_queue.get() + print("dupsko") def resize_frame(frame: np.ndarray, scale_factor: float) -> np.ndarray: @@ -267,6 +269,9 @@ def main(): SERVER_ADDRESS = "localhost" SERVER_PORT = 8000 + logging.basicConfig() + Boombox.logger.setLevel(logging.INFO) + threading.Thread( target=run_server, args=(SERVER_ADDRESS, SERVER_PORT), daemon=True ).start() @@ -322,6 +327,7 @@ def main(): args=(input_boombox, packet_queue), daemon=True, ) + print("c") reading_thread.start() print("Input boombox initialized.") @@ -363,7 +369,7 @@ def main(): if should_anonymize: packet.payload = distort_audio(packet.payload, packet.sample_rate) - output_boombox.write(packet) + print(output_boombox.write(packet)) if isinstance(packet, VideoPacket): video_read_end_time = time.time() * 1000 @@ -398,7 +404,7 @@ def main(): render_transcription(transcription_lines, frame) packet.payload = frame - output_boombox.write(packet) + print(output_boombox.write(packet)) video_read_start_time = time.time() * 1000 output_boombox.close() diff --git a/python/src/boombox/_vendor/pyrlang/node.py b/python/src/boombox/_vendor/pyrlang/node.py index c3375f3d..038d4952 100644 --- a/python/src/boombox/_vendor/pyrlang/node.py +++ b/python/src/boombox/_vendor/pyrlang/node.py @@ -288,7 +288,7 @@ def _send_local_registered(self, receiver, message) -> None: receiver, receiver_obj, message) receiver_obj.deliver_message(msg=message) else: - LOG.warning("Send to unknown %s ignored", receiver) + LOG.info("Send to unknown %s ignored", receiver) def _send_local(self, receiver, message) -> None: """ Try find a process by pid and drop a message into its ``inbox_``. diff --git a/python/src/boombox/_vendor/pyrlang/rex.py b/python/src/boombox/_vendor/pyrlang/rex.py index d8582d51..1fb81c03 100644 --- a/python/src/boombox/_vendor/pyrlang/rex.py +++ b/python/src/boombox/_vendor/pyrlang/rex.py @@ -42,7 +42,7 @@ def handle_cast(self, msg): @info(1, lambda msg: True) def handle_info(self, msg): - LOG.error("rex unhandled info msg: %s", msg) + LOG.info("rex unhandled info msg: %s", msg) def act_on_msg(msg): diff --git a/python/src/boombox/boombox.py b/python/src/boombox/boombox.py index a512e772..4bbe65b7 100644 --- a/python/src/boombox/boombox.py +++ b/python/src/boombox/boombox.py @@ -24,7 +24,7 @@ from ._vendor.term import Atom, Pid from .endpoints import BoomboxEndpoint, AudioSampleFormat -from typing import Generator, ClassVar, Optional, Any, get_args +from typing import Generator, ClassVar, Literal, Optional, Any, get_args from typing_extensions import override @@ -85,8 +85,15 @@ class Boombox(process.Process): Definition of an input or output of Boombox. Can be provided explicitly by an appropriate :py:class:`.BoomboxEndpoint` or a string of a path to a file or an URL, that Boombox will attempt to interpret as an endpoint. + + Attributes + ---------- + logger : ClassVar[logging.Logger] + Logger used in this class """ + logger: ClassVar[logging.Logger] + _python_node_name: ClassVar[str] _cookie: ClassVar[str] @@ -100,10 +107,12 @@ class Boombox(process.Process): _terminated: asyncio.Future _finished: bool _erlang_process: subprocess.Popen + _boombox_mode: Atom _python_node_name = f"{uuid.uuid4()}@127.0.0.1" _cookie = str(uuid.uuid4()) _node = node.Node(node_name=_python_node_name, cookie=_cookie) + logger = logging.getLogger(__name__) threading.Thread(target=_node.run, daemon=True).start() def __init__( @@ -120,7 +129,9 @@ def __init__( self._download_elixir_boombox_release() - self._erlang_process = subprocess.Popen([self._server_release_path, "start"], env=env) + self._erlang_process = subprocess.Popen( + [self._server_release_path, "start"], env=env + ) atexit.register(lambda: self._erlang_process.kill()) super().__init__(True) @@ -132,10 +143,10 @@ def __init__( self.get_node().monitor_process(self.pid_, self._receiver) boombox_arg = [ - (Atom("input"), self._serialize_endpoint(input)), - (Atom("output"), self._serialize_endpoint(output)), + (Atom("input"), self._serialize_endpoint(input, "input")), + (Atom("output"), self._serialize_endpoint(output, "output")), ] - self._call((Atom("run"), boombox_arg)) + self._boombox_mode = self._call((Atom("run"), boombox_arg)) def read(self) -> Generator[AudioPacket | VideoPacket, None, None]: """Read media packets produced by Boombox. @@ -159,8 +170,7 @@ def read(self) -> Generator[AudioPacket | VideoPacket, None, None]: match self._call(Atom("produce_packet")): case (Atom("ok"), packet): yield self._deserialize_packet(packet) - case (Atom("finished"), packet): - yield self._deserialize_packet(packet) + case Atom("finished"): return case (Atom("error"), Atom("incompatible_mode")): raise RuntimeError("Output not defined with an RawData endpoint.") @@ -185,7 +195,7 @@ def write(self, packet: AudioPacket | VideoPacket) -> bool: Returns ------- finished : bool - Informs if Boombox has finished accepting packets and closed its + If true then Boombox has finished accepting packets and closed its input for any further ones. Once it finishes processing the previously provided packet, it will terminate. @@ -212,12 +222,12 @@ def write(self, packet: AudioPacket | VideoPacket) -> bool: raise RuntimeError(f"Unknown response: {other}") def close(self, wait: bool = True, kill: bool = False) -> None: - """Closes Boombox for writing. + """Closes Boombox for writing or reading. - Enabled only if Boombox has been initialized with input defined with an - :py:class:`.RawData` endpoint. + Enabled only if Boombox has been initialized with input or output defined + with a :py:class:`.RawData` endpoint. - This method informs Boombox that it shouldn't expect any more packets. + This method informs Boombox that it shouldn't expect or produce any more packets. Parameters ---------- @@ -234,17 +244,25 @@ def close(self, wait: bool = True, kill: bool = False) -> None: Raises ------ RuntimeError - If Boombox's input was not defined with an :py:class:`.RawData` - endpoint. + If neither of Boombox's input or output was defined with a + :py:class:`.RawData` endpoint. """ - match self._call(Atom("finish_consuming")): - case Atom("finished"): + match self._boombox_mode: + case Atom("consuming"): + request = Atom("finish_consuming") + case Atom("producing"): + request = Atom("finish_producing") + case other: + raise RuntimeError( + "Can't close boombox if not using a RawData endpoint" + ) + + match self._call(request): + case Atom("ok"): if kill: self.kill() elif wait: self.wait() - case (Atom("error"), Atom("incompatible_mode")): - raise RuntimeError("Input should be defined with an RawData endpoint.") case other: raise RuntimeError(f"Unknown response: {other}") @@ -332,9 +350,9 @@ def update_to(self, b=1, bsize=1, tsize=None): self._server_release_path = os.path.join(self._data_dir, "bin", "server") if os.path.exists(self._server_release_path): - logging.info("Elixir boombox release already present.") + self.logger.info("Elixir boombox release already present.") return - logging.info("Elixir boombox release not found, downloading...") + self.logger.info("Elixir boombox release not found, downloading...") if self._version == "dev": release_url = os.path.join(RELEASES_URL, "latest/download") @@ -359,13 +377,13 @@ def update_to(self, b=1, bsize=1, tsize=None): unit_scale=True, unit_divisor=1024, miniters=1, - desc=f"Downloading {release_tarball}", + desc=f"Downloading {release_tarball} from {release_url}", ) as t: urllib.request.urlretrieve( download_url, filename=tarball_path, reporthook=t.update_to ) - logging.info("Download complete. Extracting...") + self.logger.info("Download complete. Extracting...") with tarfile.open(tarball_path) as tar: tar.extractall(self._data_dir) os.remove(tarball_path) @@ -507,11 +525,13 @@ def _serialize_packet(packet: AudioPacket | VideoPacket) -> dict[Atom, Any]: } @staticmethod - def _serialize_endpoint(endpoint: BoomboxEndpoint | str) -> Any: + def _serialize_endpoint( + endpoint: BoomboxEndpoint | str, direction: Literal["input", "output"] + ) -> Any: if isinstance(endpoint, str): return endpoint.encode() else: - return endpoint.serialize() + return endpoint.serialize(direction) @dataclasses.dataclass diff --git a/python/src/boombox/endpoints.py b/python/src/boombox/endpoints.py index a37016ac..c04e1794 100644 --- a/python/src/boombox/endpoints.py +++ b/python/src/boombox/endpoints.py @@ -81,7 +81,7 @@ def get_atom_fields(self) -> set[str]: # def validate_direction(self, direction: Literal['input', 'output']) -> # bool: ... - def serialize(self) -> tuple: + def serialize(self, direction: Literal['input', 'output']) -> tuple: """Serializes itself to an Elixir-compatible term. To allow Pyrlang to send the endpoint definition to Elixir it first @@ -111,11 +111,11 @@ def serialize(self) -> tuple: if f.kw_only and self.__dict__[f.name] is not None ] if keyword_fields: - return (self.get_endpoint_name(), *required_field_values, keyword_fields) + return (self.get_endpoint_name(direction), *required_field_values, keyword_fields) else: - return (self.get_endpoint_name(), *required_field_values) + return (self.get_endpoint_name(direction), *required_field_values) - def get_endpoint_name(self) -> Atom: + def get_endpoint_name(self, direction: Literal["input", "output"]) -> Atom: """:meta private:""" return Atom(self.__class__.__name__.lower()) @@ -179,8 +179,10 @@ class RawData(BoomboxEndpoint): is_live: bool | None = None @override - def get_endpoint_name(self) -> Atom: - return Atom("stream") + def get_endpoint_name(self, direction) -> Atom: + match direction: + case "input": return Atom("writer") + case "output": return Atom("reader") @override def get_atom_fields(self) -> set[str]: diff --git a/test/boombox_test.exs b/test/boombox_test.exs index 15f04c80..7da7060b 100644 --- a/test/boombox_test.exs +++ b/test/boombox_test.exs @@ -456,24 +456,41 @@ defmodule BoomboxTest do Compare.compare(output, "test/fixtures/ref_bun10s_opus_aac.mp4") end - @tag :mp4_elixir_rotate_mp4 - async_test "mp4 -> elixir rotate -> mp4", %{tmp_dir: tmp} do - Boombox.run(input: @bbb_mp4, output: {:stream, video: :image, audio: :binary}) - |> Stream.map(fn - %Boombox.Packet{kind: :video, payload: image} = packet -> - image = Image.rotate!(image, 180) - %Boombox.Packet{packet | payload: image} - - %Boombox.Packet{kind: :audio} = packet -> - packet - end) - |> Boombox.run(input: {:stream, video: :image, audio: :binary}, output: "#{tmp}/output.mp4") + for( + output <- [:stream, :reader, :message], + input <- [:stream, :writer, :message], + do: {output, input} + ) + |> Enum.each(fn {output, input} -> + @tag :mp4_elixir_rotate_mp4 + @tag String.to_atom("mp4_#{output}_rotate_#{input}_mp4") + async_test "mp4 -> #{output} -> rotate -> #{input} -> mp4", %{tmp_dir: tmp} do + produce_packet_stream( + input: @bbb_mp4, + output: {unquote(output), video: :image, audio: :binary} + ) + |> Stream.map(fn + %Boombox.Packet{kind: :video, payload: image} = packet -> + image = Image.rotate!(image, 180) + %Boombox.Packet{packet | payload: image} - Compare.compare("#{tmp}/output.mp4", "test/fixtures/ref_bun_rotated.mp4") - end + %Boombox.Packet{kind: :audio} = packet -> + packet + end) + |> consume_packet_stream( + input: {unquote(input), video: :image, audio: :binary}, + output: "#{tmp}/output.mp4" + ) + + Process.sleep(100) + + Compare.compare("#{tmp}/output.mp4", "test/fixtures/ref_bun_rotated.mp4") + end + end) [:stream, :writer, :message] |> Enum.each(fn elixir_endpoint -> + @tag :bouncing_bubble_elixir_webrtc_mp4 @tag String.to_atom("bouncing_bubble_#{elixir_endpoint}_webrtc_mp4") async_test "bouncing bubble -> #{elixir_endpoint} -> webrtc -> mp4", %{tmp_dir: tmp} do signaling = Membrane.WebRTC.Signaling.new() @@ -487,44 +504,6 @@ defmodule BoomboxTest do max_y = Image.height(bg) - Image.height(overlay) fps = 60 - image_sink = - case unquote(elixir_endpoint) do - :stream -> - &Boombox.run(&1, - input: {:stream, video: :image, audio: false}, - output: {:webrtc, signaling} - ) - - :writer -> - writer = - Boombox.run( - input: {:writer, video: :image, audio: false}, - output: {:webrtc, signaling} - ) - - fn stream -> - Enum.each(stream, &Boombox.write(writer, &1)) - Boombox.close(writer) - end - - :message -> - server = - Boombox.run( - input: {:message, video: :image, audio: false}, - output: {:webrtc, signaling} - ) - - fn stream -> - Enum.each(stream, &send(server, {:boombox_packet, self(), &1})) - send(server, {:boombox_close, self()}) - - receive do - {:boombox_finished, ^server} -> - :ok - end - end - end - Task.async(fn -> Stream.iterate({_x = 300, _y = 0, _dx = 1, _dy = 2, _pts = 0}, fn {x, y, dx, dy, pts} -> dx = if (x + dx) in 0..max_x, do: dx, else: -dx @@ -537,21 +516,26 @@ defmodule BoomboxTest do %Boombox.Packet{kind: :video, payload: img, pts: pts} end) |> Stream.take(5 * fps) - |> then(image_sink) + |> consume_packet_stream( + input: {unquote(elixir_endpoint), video: :image, audio: false}, + output: {:webrtc, signaling} + ) end) output = Path.join(tmp, "output.mp4") Boombox.run(input: {:webrtc, signaling}, output: output) + Compare.compare(output, "test/fixtures/ref_bouncing_bubble.mp4", kinds: [:video]) end end) [:stream, :reader, :message] |> Enum.each(fn elixir_endpoint -> + @tag :mp4_elixir_resampled_pcm @tag String.to_atom("mp4_#{elixir_endpoint}_resampled_pcm") async_test "mp4 -> #{elixir_endpoint} -> resampled PCM" do - boombox = - Boombox.run( + output_pcm = + produce_packet_stream( input: @bbb_mp4, output: {unquote(elixir_endpoint), @@ -561,37 +545,10 @@ defmodule BoomboxTest do audio_channels: 1, audio_format: :s16le} ) - - pcm = - case unquote(elixir_endpoint) do - :stream -> - boombox - - :reader -> - Stream.unfold(:ok, fn - :ok -> - {result, packet} = Boombox.read(boombox) - {packet, result} - - :finished -> - nil - end) - - :message -> - Stream.unfold(:ok, fn :ok -> - receive do - {:boombox_packet, ^boombox, packet} -> - {packet, :ok} - - {:boombox_finished, ^boombox} -> - nil - end - end) - end |> Enum.map_join(& &1.payload) ref = File.read!("test/fixtures/ref_bun.pcm") - assert Compare.samples_min_squared_error(ref, pcm, 16) < 500 + assert Compare.samples_min_squared_error(ref, output_pcm, 16) < 500 end end) @@ -651,6 +608,61 @@ defmodule BoomboxTest do end end) + @spec produce_packet_stream(input: Boombox.input(), output: Boombox.elixir_output()) :: + Enumerable.t() + defp produce_packet_stream([input: _input, output: {:stream, _opts}] = opts) do + Boombox.run(opts) + end + + defp produce_packet_stream([input: _input, output: {:reader, _opts}] = opts) do + boombox = Boombox.run(opts) + + Stream.repeatedly(fn -> + case Boombox.read(boombox) do + {:ok, packet} -> packet + :finished -> :eos + end + end) + |> Stream.take_while(&(&1 != :eos)) + end + + defp produce_packet_stream([input: _input, output: {:message, _opts}] = opts) do + boombox = Boombox.run(opts) + + Stream.repeatedly(fn -> + receive do + {:boombox_packet, ^boombox, packet} -> + packet + + {:boombox_finished, ^boombox} -> + :eos + end + end) + |> Stream.take_while(&(&1 != :eos)) + end + + @spec consume_packet_stream(Enumerable.t(), + input: Boombox.elixir_input(), + output: Boombox.output() + ) :: term() + defp consume_packet_stream(stream, [input: {:stream, _opts}, output: _output] = opts) do + stream |> Boombox.run(opts) + end + + defp consume_packet_stream(stream, [input: {:writer, _opts}, output: _output] = opts) do + writer = Boombox.run(opts) + + Enum.each(stream, &Boombox.write(writer, &1)) + Boombox.close(writer) + end + + defp consume_packet_stream(stream, [input: {:message, _opts}, output: _output] = opts) do + server = Boombox.run(opts) + + Enum.each(stream, &send(server, {:boombox_packet, &1})) + send(server, :boombox_close) + end + defp send_rtmp(url) do p = Testing.Pipeline.start_link_supervised!(