Skip to content

Commit

Permalink
Merge branch 'master' into prepare-release-0.9.0
Browse files Browse the repository at this point in the history
  • Loading branch information
bblaszkow06 authored Feb 15, 2022
2 parents 5015151 + 786d2fc commit 6b8e467
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
14 changes: 11 additions & 3 deletions lib/membrane/core/element/demand_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,20 @@ defmodule Membrane.Core.Element.DemandHandler do
PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
end

def handle_outgoing_buffers(_pad_ref, %{mode: :push, toilet: toilet} = data, buffers, state)
def handle_outgoing_buffers(pad_ref, %{mode: :push, toilet: toilet} = data, buffers, state)
when toilet != nil do
%{other_demand_unit: other_demand_unit} = data
buf_size = Buffer.Metric.from_unit(other_demand_unit).buffers_size(buffers)
Toilet.fill(toilet, buf_size)
state

case Toilet.fill(toilet, buf_size) do
:ok ->
state

:overflow ->
# if the toilet has overflowed, we remove it so it didn't overflow again
# and let the parent handle that situation by unlinking this output pad or crashing
PadModel.set_data!(state, pad_ref, :toilet, nil)
end
end

def handle_outgoing_buffers(_pad_ref, _pad_data, _buffers, state) do
Expand Down
11 changes: 8 additions & 3 deletions lib/membrane/core/element/toilet.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ defmodule Membrane.Core.Element.Toilet do
{__MODULE__, :atomics.new(1, []), capacity, responsible_process}
end

@spec fill(t, non_neg_integer) :: :ok
@spec fill(t, non_neg_integer) :: :ok | :overflow
def fill({__MODULE__, atomic, capacity, responsible_process}, amount) do
size = :atomics.add_get(atomic, 1, amount)
if size > capacity, do: overflow(size, capacity, responsible_process)
:ok

if size > capacity do
overflow(size, capacity, responsible_process)
:overflow
else
:ok
end
end

@spec drain(t, non_neg_integer) :: :ok
Expand Down

0 comments on commit 6b8e467

Please sign in to comment.