Skip to content

Commit ceea1ea

Browse files
committed
Add public function for closing boombox reader
1 parent 6f20232 commit ceea1ea

File tree

2 files changed

+43
-5
lines changed

2 files changed

+43
-5
lines changed

lib/boombox.ex

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -371,16 +371,24 @@ defmodule Boombox do
371371
end
372372

373373
@doc """
374-
Informs Boombox that it will not be provided any more packets with `write/2` and should terminate
375-
accordingly.
374+
Gracefully terminates Boombox when using `:reader` or `:writer` endpoints.
375+
376+
When using `:reader` endpoint on output informs Boombox that no more packets will be read
377+
from it with `read/1` and that it should terminate accordingly.
378+
379+
When using `:writer` endpoint on input informs Boombox that it will not be provided
380+
any more packets with `write/2` and should terminate accordingly.
376381
377-
Can be called only when using `:writer` endpoint on input.
378382
"""
379-
@spec close(Writer.t()) :: :finished | {:error, :incompatible_mode}
380-
def close(writer) do
383+
@spec close(Writer.t() | Reader.t()) :: :finished | {:error, :incompatible_mode}
384+
def close(%Writer{} = writer) do
381385
Boombox.Server.finish_consuming(writer.server_reference)
382386
end
383387

388+
def close(%Reader{} = reader) do
389+
Boombox.Server.finish_producing(reader.server_reference)
390+
end
391+
384392
@endpoint_opts [:input, :output]
385393
defp validate_opts!(stream, opts) do
386394
opts = opts |> Keyword.validate!(@endpoint_opts) |> Map.new()

lib/boombox/server.ex

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,17 @@ defmodule Boombox.Server do
175175
GenServer.call(server, :produce_packet)
176176
end
177177

178+
@doc """
179+
Informs Boombox that no more packets will be read and shouldn't be produced.
180+
Can be called only when Boombox is in `:producing` mode.
181+
"""
182+
@spec finish_producing(t()) ::
183+
{:finished, serialized_boombox_packet() | Boombox.Packet.t()}
184+
| {:error, :incompatible_mode}
185+
def finish_producing(server) do
186+
GenServer.call(server, :finish_producing)
187+
end
188+
178189
@impl true
179190
def init(opts) do
180191
{:ok,
@@ -390,6 +401,25 @@ defmodule Boombox.Server do
390401
{{:error, :incompatible_mode}, state}
391402
end
392403

404+
@spec handle_request(:finish_producing, State.t()) ::
405+
{{:finished, serialized_boombox_packet() | Boombox.Packet.t()}
406+
| {:error, :incompatible_mode}, State.t()}
407+
defp handle_request(
408+
:finish_producing,
409+
%State{boombox_mode: :producing, boombox_pid: boombox_pid} = state
410+
) do
411+
send(boombox_pid, :finish_producing)
412+
413+
receive do
414+
{:finished, packet, ^boombox_pid} ->
415+
{{:finished, packet}, state}
416+
end
417+
end
418+
419+
defp handle_request(:finish_producing, %State{boombox_mode: _other_mode} = state) do
420+
{{:error, :incompatible_mode}, state}
421+
end
422+
393423
@spec handle_request(term(), State.t()) :: {{:error, :invalid_request}, State.t()}
394424
defp handle_request(_invalid_request, state) do
395425
{{:error, :invalid_request}, state}

0 commit comments

Comments
 (0)