Skip to content

Commit a053f6c

Browse files
committed
Fix naming
1 parent b4ac36b commit a053f6c

File tree

7 files changed

+81
-87
lines changed

7 files changed

+81
-87
lines changed

examples.livemd

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ System.put_env("PATH", "/opt/homebrew/bin:#{System.get_env("PATH")}")
1111
# Examples that don't mention them should still work.
1212

1313
# MIX_INSTALL_CONFIG_BEGIN
14-
boombox = {:boombox, github: "membraneframework/boombox", branch: "refactor-elixir-endpoints"}
14+
boombox = {:boombox, github: "membraneframework/boombox"}
1515

1616
# This livebook uses boombox from the master branch. If any examples happen to not work, the latest stable version of this livebook
1717
# can be found on https://hexdocs.pm/boombox/examples.html or in the latest github release.
@@ -575,27 +575,27 @@ writer = Boombox.run(input: {:writer, video: :image, audio: false}, output: outp
575575

576576
Stream.unfold(%{}, fn _state ->
577577
case {Boombox.read(reader1), Boombox.read(reader2)} do
578-
{:finished, :finished} ->
578+
{:finished, :finished} ->
579579
nil
580580

581-
{{:ok, _packet}, :finished} ->
581+
{{:ok, _packet}, :finished} ->
582582
Boombox.close(reader1)
583583
nil
584-
585-
{:finished, {:ok, _packet}} ->
584+
585+
{:finished, {:ok, _packet}} ->
586586
Boombox.close(reader2)
587587
nil
588-
589-
{{:ok, packet1}, {:ok, packet2}} ->
588+
589+
{{:ok, packet1}, {:ok, packet2}} ->
590590
joined_image =
591591
Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL)
592-
592+
593593
packet = %Boombox.Packet{
594594
pts: max(packet1.pts, packet2.pts),
595595
payload: joined_image,
596596
kind: :video
597597
}
598-
598+
599599
Boombox.write(writer, packet)
600600

601601
{nil, %{}}
@@ -649,7 +649,7 @@ defmodule MyServer do
649649
:VIPS_DIRECTION_HORIZONTAL
650650
)
651651

652-
pts =
652+
pts =
653653
max(
654654
state.bb_states.bb1.last_packet.pts,
655655
state.bb_states.bb2.last_packet.pts

lib/boombox.ex

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -211,13 +211,13 @@ defmodule Boombox do
211211

212212
case opts do
213213
%{input: {:stream, _stream_opts}} ->
214-
procs = Pipeline.start_pipeline(opts)
215-
source = Pipeline.await_source_ready()
214+
procs = Pipeline.start(opts)
215+
source = await_source_ready()
216216
consume_stream(stream, source, procs)
217217

218218
%{output: {:stream, _stream_opts}} ->
219-
procs = Pipeline.start_pipeline(opts)
220-
sink = Pipeline.await_sink_ready()
219+
procs = Pipeline.start(opts)
220+
sink = await_sink_ready()
221221
produce_stream(sink, procs)
222222

223223
%{input: {:writer, _writer_opts}} ->
@@ -236,8 +236,8 @@ defmodule Boombox do
236236

237237
opts ->
238238
opts
239-
|> Pipeline.start_pipeline()
240-
|> Pipeline.await_pipeline()
239+
|> Pipeline.start()
240+
|> await_pipeline()
241241
end
242242
end
243243

@@ -276,17 +276,17 @@ defmodule Boombox do
276276

277277
case opts do
278278
%{input: {:stream, _stream_opts}} ->
279-
procs = Pipeline.start_pipeline(opts)
280-
source = Pipeline.await_source_ready()
279+
procs = Pipeline.start(opts)
280+
source = await_source_ready()
281281

282282
Task.async(fn ->
283283
Process.monitor(procs.supervisor)
284284
consume_stream(stream, source, procs)
285285
end)
286286

287287
%{output: {:stream, _stream_opts}} ->
288-
procs = Pipeline.start_pipeline(opts)
289-
sink = Pipeline.await_sink_ready()
288+
procs = Pipeline.start(opts)
289+
sink = await_sink_ready()
290290
produce_stream(sink, procs)
291291

292292
%{input: {:writer, _writer_opts}} ->
@@ -306,23 +306,23 @@ defmodule Boombox do
306306
# In case of rtmp, rtmps, rtp, rtsp, we need to wait for the tcp/udp server to be ready
307307
# before returning from async/2.
308308
%{input: {protocol, _opts}} when protocol in [:rtmp, :rtmps, :rtp, :rtsp, :srt] ->
309-
procs = Pipeline.start_pipeline(opts)
309+
procs = Pipeline.start(opts)
310310

311311
task =
312312
Task.async(fn ->
313313
Process.monitor(procs.supervisor)
314-
Pipeline.await_pipeline(procs)
314+
await_pipeline(procs)
315315
end)
316316

317317
await_external_resource_ready()
318318
task
319319

320320
opts ->
321-
procs = Pipeline.start_pipeline(opts)
321+
procs = Pipeline.start(opts)
322322

323323
Task.async(fn ->
324324
Process.monitor(procs.supervisor)
325-
Pipeline.await_pipeline(procs)
325+
await_pipeline(procs)
326326
end)
327327
end
328328
end
@@ -471,7 +471,7 @@ defmodule Boombox do
471471

472472
_state ->
473473
send(source, {:boombox_eos, self()})
474-
Pipeline.await_pipeline(procs)
474+
await_pipeline(procs)
475475
end
476476
end
477477

@@ -495,12 +495,39 @@ defmodule Boombox do
495495
end
496496
end,
497497
fn
498-
%{procs: procs} -> Pipeline.terminate_pipeline(procs)
498+
%{procs: procs} -> terminate_pipeline(procs)
499499
:eos -> :ok
500500
end
501501
)
502502
end
503503

504+
@spec terminate_pipeline(Pipeline.procs()) :: :ok
505+
defp terminate_pipeline(procs) do
506+
Membrane.Pipeline.terminate(procs.pipeline)
507+
await_pipeline(procs)
508+
end
509+
510+
@spec await_pipeline(Pipeline.procs()) :: :ok
511+
defp await_pipeline(%{supervisor: supervisor}) do
512+
receive do
513+
{:DOWN, _monitor, :process, ^supervisor, _reason} -> :ok
514+
end
515+
end
516+
517+
@spec await_source_ready() :: pid()
518+
defp await_source_ready() do
519+
receive do
520+
{:boombox_elixir_source, source} -> source
521+
end
522+
end
523+
524+
@spec await_sink_ready() :: pid()
525+
defp await_sink_ready() do
526+
receive do
527+
{:boombox_elixir_sink, sink} -> sink
528+
end
529+
end
530+
504531
# Waits for the external resource to be ready.
505532
# This is used to wait for the tcp/udp server to be ready before returning from async/2.
506533
# It is used for rtmp, rtmps, rtp, rtsp.

lib/boombox/internal_bin.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ defmodule Boombox.InternalBin do
336336
end
337337

338338
@impl true
339-
def handle_element_end_of_stream(:elixir_stream_sink, Pad.ref(:input, id), _ctx, state) do
339+
def handle_element_end_of_stream(:elixir_sink, Pad.ref(:input, id), _ctx, state) do
340340
eos_info = List.delete(state.eos_info, id)
341341
state = %{state | eos_info: eos_info}
342342

lib/boombox/internal_bin/elixir_endpoints.ex

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ defmodule Boombox.InternalBin.ElixirEndpoints do
2626
|> Map.new(fn
2727
:video ->
2828
{:video,
29-
get_child(:elixir_stream_source)
29+
get_child(:elixir_source)
3030
|> via_out(Pad.ref(:output, :video))
3131
|> child(%SWScale.Converter{format: :I420})
3232
|> child(%Membrane.H264.FFmpeg.Encoder{profile: :baseline, preset: :ultrafast})}
3333

3434
:audio ->
3535
{:audio,
36-
get_child(:elixir_stream_source)
36+
get_child(:elixir_source)
3737
|> via_out(Pad.ref(:output, :audio))}
3838
end)
3939

@@ -43,7 +43,7 @@ defmodule Boombox.InternalBin.ElixirEndpoints do
4343
:pull -> %PullSource{producer: producer}
4444
end
4545

46-
spec_builder = child(:elixir_stream_source, source_definition)
46+
spec_builder = child(:elixir_source, source_definition)
4747

4848
%Ready{track_builders: builders, spec_builder: spec_builder}
4949
end
@@ -79,31 +79,31 @@ defmodule Boombox.InternalBin.ElixirEndpoints do
7979
spec =
8080
[
8181
spec_builder,
82-
child(:elixir_stream_sink, sink_definition),
82+
child(:elixir_sink, sink_definition),
8383
Enum.map(track_builders, fn
8484
{:audio, builder} ->
8585
builder
86-
|> child(:elixir_stream_audio_transcoder, %Membrane.Transcoder{
86+
|> child(:elixir_audio_transcoder, %Membrane.Transcoder{
8787
output_stream_format: Membrane.RawAudio
8888
})
8989
|> maybe_plug_resampler(options)
9090
|> maybe_plug_realtimer(:audio, pace_control, is_input_realtime)
9191
|> via_in(Pad.ref(:input, :audio))
92-
|> get_child(:elixir_stream_sink)
92+
|> get_child(:elixir_sink)
9393

9494
{:video, builder} ->
9595
builder
96-
|> child(:elixir_stream_video_transcoder, %Membrane.Transcoder{
96+
|> child(:elixir_video_transcoder, %Membrane.Transcoder{
9797
output_stream_format: Membrane.RawVideo
9898
})
99-
|> child(:elixir_stream_rgb_converter, %SWScale.Converter{
99+
|> child(:elixir_rgb_converter, %SWScale.Converter{
100100
format: :RGB,
101101
output_width: options[:video_width],
102102
output_height: options[:video_height]
103103
})
104104
|> maybe_plug_realtimer(:video, pace_control, is_input_realtime)
105105
|> via_in(Pad.ref(:input, :video))
106-
|> get_child(:elixir_stream_sink)
106+
|> get_child(:elixir_sink)
107107
end),
108108
Enum.map(to_ignore, fn {_track, builder} -> builder |> child(Membrane.Debug.Sink) end)
109109
]
@@ -116,7 +116,7 @@ defmodule Boombox.InternalBin.ElixirEndpoints do
116116
defp maybe_plug_realtimer(builder, kind, true, false) do
117117
builder
118118
|> via_in(:input, toilet_capacity: @realtimer_toilet_capacity)
119-
|> child({:elixir_stream, kind, :realtimer}, Membrane.Realtimer)
119+
|> child({:elixir, kind, :realtimer}, Membrane.Realtimer)
120120
end
121121

122122
defp maybe_plug_realtimer(builder, _kind, _pace_control, _is_input_realtime), do: builder

lib/boombox/pipeline.ex

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ defmodule Boombox.Pipeline do
99
}
1010
@type procs :: %{pipeline: pid(), supervisor: pid()}
1111

12-
@spec start_pipeline(opts_map()) :: procs()
13-
def start_pipeline(opts) do
12+
@spec start(opts_map()) :: procs()
13+
def start(opts) do
1414
opts =
1515
opts
16-
|> Map.update!(:input, &resolve_stream_endpoint(&1, self()))
17-
|> Map.update!(:output, &resolve_stream_endpoint(&1, self()))
16+
|> Map.update!(:input, &resolve_elixir_endpoint(&1, self()))
17+
|> Map.update!(:output, &resolve_elixir_endpoint(&1, self()))
1818
|> Map.put(:parent, self())
1919

2020
{:ok, supervisor, pipeline} =
@@ -24,33 +24,6 @@ defmodule Boombox.Pipeline do
2424
%{supervisor: supervisor, pipeline: pipeline}
2525
end
2626

27-
@spec terminate_pipeline(procs()) :: :ok
28-
def terminate_pipeline(procs) do
29-
Membrane.Pipeline.terminate(procs.pipeline)
30-
await_pipeline(procs)
31-
end
32-
33-
@spec await_pipeline(procs()) :: :ok
34-
def await_pipeline(%{supervisor: supervisor}) do
35-
receive do
36-
{:DOWN, _monitor, :process, ^supervisor, _reason} -> :ok
37-
end
38-
end
39-
40-
@spec await_source_ready() :: pid()
41-
def await_source_ready() do
42-
receive do
43-
{:boombox_elixir_source, source} -> source
44-
end
45-
end
46-
47-
@spec await_sink_ready() :: pid()
48-
def await_sink_ready() do
49-
receive do
50-
{:boombox_elixir_sink, sink} -> sink
51-
end
52-
end
53-
5427
@impl true
5528
def handle_init(_ctx, opts) do
5629
spec =
@@ -73,9 +46,9 @@ defmodule Boombox.Pipeline do
7346
{[terminate: :normal], state}
7447
end
7548

76-
defp resolve_stream_endpoint({endpoint_type, opts}, parent)
49+
defp resolve_elixir_endpoint({endpoint_type, opts}, parent)
7750
when endpoint_type in @elixir_endpoints,
7851
do: {endpoint_type, parent, opts}
7952

80-
defp resolve_stream_endpoint(endpoint, _parent), do: endpoint
53+
defp resolve_elixir_endpoint(endpoint, _parent), do: endpoint
8154
end

lib/boombox/server.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ defmodule Boombox.Server do
401401
%{supervisor: pipeline_supervisor, pipeline: pipeline} =
402402
boombox_opts
403403
|> Map.new()
404-
|> Boombox.Pipeline.start_pipeline()
404+
|> Boombox.Pipeline.start()
405405

406406
{:noreply,
407407
%State{

python/src/boombox/boombox.py

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -168,22 +168,16 @@ def read(self) -> Generator[AudioPacket | VideoPacket, None, None]:
168168
RuntimeError
169169
If Boombox's output was not defined by an :py:class:`.RawData` endpoint.
170170
"""
171-
try:
172-
while True:
173-
match self._call(Atom("produce_packet")):
174-
case (Atom("ok"), packet):
175-
yield self._deserialize_packet(packet)
176-
case Atom("finished"):
177-
return
178-
case (Atom("error"), Atom("incompatible_mode")):
179-
raise RuntimeError(
180-
"Output not defined with an RawData endpoint."
181-
)
182-
case other:
183-
raise RuntimeError(f"Unknown response: {other}")
184-
finally:
185-
# pass
186-
self.close()
171+
while True:
172+
match self._call(Atom("produce_packet")):
173+
case (Atom("ok"), packet):
174+
yield self._deserialize_packet(packet)
175+
case Atom("finished"):
176+
return
177+
case (Atom("error"), Atom("incompatible_mode")):
178+
raise RuntimeError("Output not defined with an RawData endpoint.")
179+
case other:
180+
raise RuntimeError(f"Unknown response: {other}")
187181

188182
def write(self, packet: AudioPacket | VideoPacket) -> bool:
189183
"""Write packets to Boombox.

0 commit comments

Comments
 (0)