Skip to content

Commit bd7808e

Browse files
committed
Handle termination requests correctly
1 parent 36cb5c5 commit bd7808e

File tree

5 files changed

+88
-50
lines changed

5 files changed

+88
-50
lines changed

lib/boombox/server.ex

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,9 @@ defmodule Boombox.Server do
111111
membrane_source_demand: non_neg_integer(),
112112
pipeline_supervisor: pid() | nil,
113113
pipeline: pid() | nil,
114-
ghosted_client: GenServer.from() | pid() | nil,
115-
pipeline_exit_reason: term()
114+
ghosted_client: GenServer.from() | Process.dest() | nil,
115+
pipeline_termination_reason: term(),
116+
termination_requested: boolean()
116117
}
117118

118119
@enforce_keys [
@@ -131,7 +132,8 @@ defmodule Boombox.Server do
131132
pipeline_supervisor: nil,
132133
pipeline: nil,
133134
ghosted_client: nil,
134-
pipeline_exit_reason: nil
135+
pipeline_termination_reason: nil,
136+
termination_requested: false
135137
]
136138
end
137139

@@ -349,11 +351,16 @@ defmodule Boombox.Server do
349351

350352
case state.communication_medium do
351353
:calls ->
352-
if state.ghosted_client != nil do
353-
reply(state.ghosted_client, :finished)
354-
{:stop, reason, state}
355-
else
356-
{:noreply, %State{state | pipeline_exit_reason: reason}}
354+
cond do
355+
state.ghosted_client != nil ->
356+
reply(state.ghosted_client, :finished)
357+
{:stop, reason, state}
358+
359+
state.termination_requested ->
360+
{:stop, reason, state}
361+
362+
true ->
363+
{:noreply, %State{state | pipeline_termination_reason: reason}}
357364
end
358365

359366
:messages ->
@@ -386,7 +393,7 @@ defmodule Boombox.Server do
386393
end
387394
end
388395

389-
@spec handle_request({:run, boombox_opts()}, GenServer.from() | pid(), State.t()) ::
396+
@spec handle_request({:run, boombox_opts()}, GenServer.from() | Process.dest(), State.t()) ::
390397
{:reply, boombox_mode(), State.t()}
391398
defp handle_request({:run, boombox_opts}, from, state) do
392399
boombox_mode = get_boombox_mode(boombox_opts)
@@ -406,7 +413,7 @@ defmodule Boombox.Server do
406413
}}
407414
end
408415

409-
@spec handle_request(:get_pid, GenServer.from() | pid(), State.t()) ::
416+
@spec handle_request(:get_pid, GenServer.from() | Process.dest(), State.t()) ::
410417
{:reply, pid(), State.t()}
411418
defp handle_request(:get_pid, _from, state) do
412419
{:reply, self(), state}
@@ -418,7 +425,7 @@ defmodule Boombox.Server do
418425

419426
@spec handle_request(
420427
{:consume_packet, serialized_boombox_packet() | Boombox.Packet.t()},
421-
GenServer.from() | pid(),
428+
GenServer.from() | Process.dest(),
422429
State.t()
423430
) ::
424431
{:reply, :ok | {:error, :incompatible_mode | :boombox_not_running}, State.t()}
@@ -438,8 +445,8 @@ defmodule Boombox.Server do
438445
state = %State{state | membrane_source_demand: state.membrane_source_demand - 1}
439446

440447
cond do
441-
state.pipeline_exit_reason != nil ->
442-
{:stop, state.pipeline_exit_reason, :finished, state}
448+
state.pipeline_termination_reason != nil ->
449+
{:stop, state.pipeline_termination_reason, :finished, state}
443450

444451
state.membrane_source_demand == 0 ->
445452
{:noreply, %State{state | ghosted_client: from}}
@@ -457,29 +464,29 @@ defmodule Boombox.Server do
457464
{:reply, {:error, :incompatible_mode}, state}
458465
end
459466

460-
@spec handle_request(:finish_consuming, GenServer.from() | pid(), State.t()) ::
461-
{:reply, :ok | :finished | {:error, :incompatible_mode}, State.t()}
462-
| {:stop, term(), :finished, State.t()}
467+
@spec handle_request(:finish_consuming, GenServer.from() | Process.dest(), State.t()) ::
468+
{:reply, :ok | {:error, :incompatible_mode}, State.t()}
469+
| {:stop, term(), :ok, State.t()}
463470
defp handle_request(:finish_consuming, _from, %State{boombox_mode: :consuming} = state) do
464-
if state.pipeline_exit_reason != nil do
465-
{:stop, state.pipeline_exit_reason, :finished, state}
471+
if state.pipeline_termination_reason != nil do
472+
{:stop, state.pipeline_termination_reason, :ok, state}
466473
else
467474
send(state.membrane_source, {:boombox_eos, self()})
468-
{:reply, :ok, state}
475+
{:reply, :ok, %State{state | termination_requested: true}}
469476
end
470477
end
471478

472479
defp handle_request(:finish_consuming, _from, %State{boombox_mode: _other_mode} = state) do
473480
{:reply, {:error, :incompatible_mode}, state}
474481
end
475482

476-
@spec handle_request(:produce_packet, GenServer.from() | pid(), State.t()) ::
483+
@spec handle_request(:produce_packet, GenServer.from() | Process.dest(), State.t()) ::
477484
{:reply, {:error, :incompatible_mode | :boombox_not_running}, State.t()}
478485
| {:noreply, State.t()}
479486
| {:stop, term(), :finished, State.t()}
480487
defp handle_request(:produce_packet, from, %State{boombox_mode: :producing} = state) do
481-
if state.pipeline_exit_reason != nil do
482-
{:stop, state.pipeline_exit_reason, :finished, state}
488+
if state.pipeline_termination_reason != nil do
489+
{:stop, state.pipeline_termination_reason, :finished, state}
483490
else
484491
send(state.membrane_sink, {:boombox_demand, self()})
485492
{:noreply, %State{state | ghosted_client: from}}
@@ -490,30 +497,34 @@ defmodule Boombox.Server do
490497
{:reply, {:error, :incompatible_mode}, state}
491498
end
492499

493-
@spec handle_request(:finish_producing, GenServer.from() | pid(), State.t()) ::
500+
@spec handle_request(:finish_producing, GenServer.from() | Process.dest(), State.t()) ::
494501
{:reply, :ok | {:error, :incompatible_mode}, State.t()}
495502
defp handle_request(:finish_producing, _from, %State{boombox_mode: :producing} = state) do
496503
Membrane.Pipeline.terminate(state.pipeline, asynchronous?: true)
497-
{:reply, :ok, state}
504+
{:reply, :ok, %State{state | termination_requested: true}}
498505
end
499506

500507
defp handle_request(:finish_producing, _from, %State{boombox_mode: _other_mode} = state) do
501508
{:reply, {:error, :incompatible_mode}, state}
502509
end
503510

504-
@spec handle_request(term(), GenServer.from() | pid(), State.t()) ::
511+
@spec handle_request(term(), GenServer.from() | Process.dest(), State.t()) ::
505512
{:reply, {:error, :invalid_request}, State.t()}
506513
defp handle_request(_invalid_request, _from, state) do
507514
{:reply, {:error, :invalid_request}, state}
508515
end
509516

510-
@spec reply(GenServer.from() | pid(), term()) :: :ok
511-
defp reply(pid, reply_content) when is_pid(pid) do
512-
send(pid, {:response, reply_content})
517+
@spec reply(GenServer.from() | Process.dest(), term()) :: :ok
518+
defp reply(dest, reply_content) when is_pid(dest) or is_port(dest) or is_atom(dest) do
519+
send(dest, {:response, reply_content})
520+
end
521+
522+
defp reply({name, node} = dest, reply_content) when is_atom(name) and is_atom(node) do
523+
send(dest, {:response, reply_content})
513524
end
514525

515-
defp reply({pid, _tag} = client, reply_content) when is_pid(pid) do
516-
GenServer.reply(client, reply_content)
526+
defp reply({pid, _tag} = genserver_from, reply_content) when is_pid(pid) do
527+
GenServer.reply(genserver_from, reply_content)
517528
end
518529

519530
@spec get_boombox_mode(boombox_opts()) :: boombox_mode()

python/examples/anonymization_demo.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import queue
2727
import threading
2828
import time
29+
import logging
2930
from typing import NoReturn
3031

3132

@@ -66,6 +67,7 @@ def read_packets(boombox: Boombox, packet_queue: queue.Queue) -> None:
6667
packet_queue.put(packet)
6768
if packet_queue.qsize() > MAX_QUEUE_SIZE:
6869
packet_queue.get()
70+
print("dupsko")
6971

7072

7173
def resize_frame(frame: np.ndarray, scale_factor: float) -> np.ndarray:
@@ -267,6 +269,9 @@ def main():
267269
SERVER_ADDRESS = "localhost"
268270
SERVER_PORT = 8000
269271

272+
logging.basicConfig()
273+
Boombox.logger.setLevel(logging.INFO)
274+
270275
threading.Thread(
271276
target=run_server, args=(SERVER_ADDRESS, SERVER_PORT), daemon=True
272277
).start()
@@ -322,6 +327,7 @@ def main():
322327
args=(input_boombox, packet_queue),
323328
daemon=True,
324329
)
330+
print("c")
325331
reading_thread.start()
326332
print("Input boombox initialized.")
327333

@@ -363,7 +369,7 @@ def main():
363369
if should_anonymize:
364370
packet.payload = distort_audio(packet.payload, packet.sample_rate)
365371

366-
output_boombox.write(packet)
372+
print(output_boombox.write(packet))
367373

368374
if isinstance(packet, VideoPacket):
369375
video_read_end_time = time.time() * 1000
@@ -398,7 +404,7 @@ def main():
398404
render_transcription(transcription_lines, frame)
399405

400406
packet.payload = frame
401-
output_boombox.write(packet)
407+
print(output_boombox.write(packet))
402408
video_read_start_time = time.time() * 1000
403409

404410
output_boombox.close()

python/src/boombox/_vendor/pyrlang/node.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ def _send_local_registered(self, receiver, message) -> None:
288288
receiver, receiver_obj, message)
289289
receiver_obj.deliver_message(msg=message)
290290
else:
291-
LOG.warning("Send to unknown %s ignored", receiver)
291+
LOG.info("Send to unknown %s ignored", receiver)
292292

293293
def _send_local(self, receiver, message) -> None:
294294
""" Try find a process by pid and drop a message into its ``inbox_``.

python/src/boombox/_vendor/pyrlang/rex.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def handle_cast(self, msg):
4242

4343
@info(1, lambda msg: True)
4444
def handle_info(self, msg):
45-
LOG.error("rex unhandled info msg: %s", msg)
45+
LOG.info("rex unhandled info msg: %s", msg)
4646

4747

4848
def act_on_msg(msg):

python/src/boombox/boombox.py

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030

3131
RELEASES_URL = "https://github.com/membraneframework/boombox/releases"
32-
PACKAGE_NAME = "boomboxlib"
32+
PACKAGE_NAME = "boomboxlibb"
3333

3434

3535
class Boombox(process.Process):
@@ -85,8 +85,15 @@ class Boombox(process.Process):
8585
Definition of an input or output of Boombox. Can be provided explicitly
8686
by an appropriate :py:class:`.BoomboxEndpoint` or a string of a path to
8787
a file or an URL, that Boombox will attempt to interpret as an endpoint.
88+
89+
Attributes
90+
----------
91+
logger : ClassVar[logging.Logger]
92+
Logger used in this class
8893
"""
8994

95+
logger: ClassVar[logging.Logger]
96+
9097
_python_node_name: ClassVar[str]
9198
_cookie: ClassVar[str]
9299

@@ -105,6 +112,7 @@ class Boombox(process.Process):
105112
_python_node_name = f"{uuid.uuid4()}@127.0.0.1"
106113
_cookie = str(uuid.uuid4())
107114
_node = node.Node(node_name=_python_node_name, cookie=_cookie)
115+
logger = logging.getLogger(__name__)
108116
threading.Thread(target=_node.run, daemon=True).start()
109117

110118
def __init__(
@@ -131,7 +139,9 @@ def __init__(
131139
self._terminated = self.get_node().get_loop().create_future()
132140
self._finished = False
133141
self._receiver = (self._erlang_node_name, Atom("boombox_server"))
142+
print("a")
134143
self._receiver = self._call(Atom("get_pid"))
144+
print("b")
135145
self.get_node().monitor_process(self.pid_, self._receiver)
136146

137147
boombox_arg = [
@@ -158,16 +168,22 @@ def read(self) -> Generator[AudioPacket | VideoPacket, None, None]:
158168
RuntimeError
159169
If Boombox's output was not defined by an :py:class:`.RawData` endpoint.
160170
"""
161-
while True:
162-
match self._call(Atom("produce_packet")):
163-
case (Atom("ok"), packet):
164-
yield self._deserialize_packet(packet)
165-
case Atom("finished"):
166-
return
167-
case (Atom("error"), Atom("incompatible_mode")):
168-
raise RuntimeError("Output not defined with an RawData endpoint.")
169-
case other:
170-
raise RuntimeError(f"Unknown response: {other}")
171+
try:
172+
while True:
173+
match self._call(Atom("produce_packet")):
174+
case (Atom("ok"), packet):
175+
yield self._deserialize_packet(packet)
176+
case Atom("finished"):
177+
return
178+
case (Atom("error"), Atom("incompatible_mode")):
179+
raise RuntimeError(
180+
"Output not defined with an RawData endpoint."
181+
)
182+
case other:
183+
raise RuntimeError(f"Unknown response: {other}")
184+
finally:
185+
# pass
186+
self.close()
171187

172188
def write(self, packet: AudioPacket | VideoPacket) -> bool:
173189
"""Write packets to Boombox.
@@ -187,7 +203,7 @@ def write(self, packet: AudioPacket | VideoPacket) -> bool:
187203
Returns
188204
-------
189205
finished : bool
190-
Informs if Boombox has finished accepting packets and closed its
206+
If true then Boombox has finished accepting packets and closed its
191207
input for any further ones. Once it finishes processing the
192208
previously provided packet, it will terminate.
193209
@@ -251,6 +267,7 @@ def close(self, wait: bool = True, kill: bool = False) -> None:
251267

252268
match self._call(request):
253269
case Atom("ok"):
270+
print("uuuu")
254271
if kill:
255272
self.kill()
256273
elif wait:
@@ -284,8 +301,12 @@ def handle_one_inbox_message(self, msg: Any) -> None:
284301
if not self._response.done():
285302
self._response.set_result(response)
286303
case (Atom("DOWN"), _, Atom("process"), _, Atom("normal")):
304+
print(self._boombox_mode)
305+
print("1")
287306
self._terminated.set_result(Atom("normal"))
288307
case (Atom("DOWN"), _, Atom("process"), _, reason):
308+
print(self._boombox_mode)
309+
print("2")
289310
self._terminated.set_result(reason)
290311
if not self._response.done():
291312
self._response.set_exception(
@@ -342,9 +363,9 @@ def update_to(self, b=1, bsize=1, tsize=None):
342363
self._server_release_path = os.path.join(self._data_dir, "bin", "server")
343364

344365
if os.path.exists(self._server_release_path):
345-
logging.info("Elixir boombox release already present.")
366+
self.logger.info("Elixir boombox release already present.")
346367
return
347-
logging.info("Elixir boombox release not found, downloading...")
368+
self.logger.info("Elixir boombox release not found, downloading...")
348369

349370
if self._version == "dev":
350371
release_url = os.path.join(RELEASES_URL, "latest/download")
@@ -369,13 +390,13 @@ def update_to(self, b=1, bsize=1, tsize=None):
369390
unit_scale=True,
370391
unit_divisor=1024,
371392
miniters=1,
372-
desc=f"Downloading {release_tarball}",
393+
desc=f"Downloading {release_tarball} from {release_url}",
373394
) as t:
374395
urllib.request.urlretrieve(
375396
download_url, filename=tarball_path, reporthook=t.update_to
376397
)
377398

378-
logging.info("Download complete. Extracting...")
399+
self.logger.info("Download complete. Extracting...")
379400
with tarfile.open(tarball_path) as tar:
380401
tar.extractall(self._data_dir)
381402
os.remove(tarball_path)

0 commit comments

Comments
 (0)