Skip to content

Commit

Permalink
Merge pull request #418 from membraneframework/error-cleanup
Browse files Browse the repository at this point in the history
Error cleanup
  • Loading branch information
mat-hek authored May 13, 2022
2 parents 506898e + cc00f1f commit 2477fcb
Show file tree
Hide file tree
Showing 44 changed files with 1,187 additions and 1,406 deletions.
8 changes: 0 additions & 8 deletions lib/membrane/bin/action.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@ defmodule Membrane.Bin.Action do
@type remove_child_t ::
{:remove_child, Child.name_t() | [Child.name_t()]}

@typedoc """
Action that sets `Logger` metadata for the bin and all its descendants.
Uses `Logger.metadata/1` underneath.
"""
@type log_metadata_t :: {:log_metadata, Keyword.t()}

@typedoc """
Starts a timer that will invoke `c:Membrane.Bin.handle_tick/3` callback
every `interval` according to the given `clock`.
Expand Down Expand Up @@ -93,7 +86,6 @@ defmodule Membrane.Bin.Action do
forward_t
| spec_t
| remove_child_t
| log_metadata_t
| start_timer_t
| timer_interval_t
| stop_timer_t
Expand Down
127 changes: 100 additions & 27 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ defmodule Membrane.Core.Bin do
use Bunch
use GenServer

import Membrane.Core.Helper.GenServer

alias __MODULE__.State
alias Membrane.{CallbackError, ComponentPath, Core, Sync}
alias Membrane.{ComponentPath, Sync}
alias Membrane.Core.Bin.PadController
alias Membrane.Core.{CallbackHandler, Message, Telemetry}
alias Membrane.Core.Child.PadSpecHandler
alias Membrane.Core.{CallbackHandler, Child, Message, Parent, Telemetry, TimerController}

require Membrane.Core.Message
require Membrane.Core.Telemetry
Expand Down Expand Up @@ -113,30 +110,24 @@ defmodule Membrane.Core.Bin do
},
children_log_metadata: log_metadata
}
|> PadSpecHandler.init_pads()

with {:ok, state} <-
CallbackHandler.exec_and_handle_callback(
:handle_init,
Membrane.Core.Bin.ActionHandler,
%{},
[options.user_options],
state
) do
{:ok, state}
else
{{:error, reason}, _state} ->
raise CallbackError, kind: :error, callback: {module, :handle_init}, reason: reason
|> Child.PadSpecHandler.init_pads()

{other, _state} ->
raise CallbackError, kind: :bad_return, callback: {module, :handle_init}, value: other
end
state =
CallbackHandler.exec_and_handle_callback(
:handle_init,
Membrane.Core.Bin.ActionHandler,
%{},
[options.user_options],
state
)

{:ok, state}
end

@impl GenServer
def handle_info(Message.new(:handle_unlink, pad_ref), state) do
PadController.handle_unlink(pad_ref, state)
|> noreply()
state = PadController.handle_unlink(pad_ref, state)
{:noreply, state}
end

@impl GenServer
Expand All @@ -147,9 +138,84 @@ defmodule Membrane.Core.Bin do
{:noreply, state}
end

@impl GenServer
def handle_info(
Message.new(:playback_state_changed, [pid, new_playback_state]),
state
) do
state = Parent.ChildLifeController.child_playback_changed(pid, new_playback_state, state)
{:noreply, state}
end

@impl GenServer
def handle_info(Message.new(:change_playback_state, new_state), state) do
state = Parent.LifecycleController.change_playback_state(new_state, state)
{:noreply, state}
end

@impl GenServer
def handle_info(Message.new(:stream_management_event, [element_name, pad_ref, event]), state) do
state =
Parent.LifecycleController.handle_stream_management_event(
event,
element_name,
pad_ref,
state
)

{:noreply, state}
end

@impl GenServer
def handle_info(Message.new(:notification, [from, notification]), state) do
state = Parent.LifecycleController.handle_notification(from, notification, state)
{:noreply, state}
end

@impl GenServer
def handle_info(Message.new(:timer_tick, timer_id), state) do
state = TimerController.handle_tick(timer_id, state)
{:noreply, state}
end

@impl GenServer
def handle_info(Message.new(:link_response, link_id), state) do
state = Parent.ChildLifeController.LinkHandler.handle_link_response(link_id, state)
{:noreply, state}
end

@impl GenServer
def handle_info(Message.new(:spec_linking_timeout, spec_ref), state) do
state = Parent.ChildLifeController.LinkHandler.handle_spec_timeout(spec_ref, state)
{:noreply, state}
end

@impl GenServer
def handle_info({:membrane_clock_ratio, clock, ratio}, state) do
state = TimerController.handle_clock_update(clock, ratio, state)
{:noreply, state}
end

@impl GenServer
def handle_info({:DOWN, _ref, :process, pid, reason} = message, state) do
cond do
is_child_pid?(pid, state) ->
state = Parent.ChildLifeController.handle_child_death(pid, reason, state)
{:noreply, state}

is_parent_pid?(pid, state) ->
{:stop, {:shutdown, :parent_crash}, state}

true ->
state = Parent.LifecycleController.handle_other(message, state)
{:noreply, state}
end
end

@impl GenServer
def handle_info(message, state) do
Core.Parent.MessageDispatcher.handle_message(message, state)
state = Parent.LifecycleController.handle_other(message, state)
{:noreply, state}
end

@impl GenServer
Expand All @@ -164,13 +230,20 @@ defmodule Membrane.Core.Bin do

@impl GenServer
def handle_call(Message.new(:get_clock), _from, state) do
reply({{:ok, state.synchronization.clock}, state})
{:reply, state.synchronization.clock, state}
end

@impl GenServer
def terminate(reason, state) do
Telemetry.report_terminate(:bin)

:ok = state.module.handle_shutdown(reason, state.internal_state)
end

defp is_parent_pid?(pid, state) do
state.parent_pid == pid
end

defp is_child_pid?(pid, state) do
Enum.any?(state.children, fn {_name, entry} -> entry.pid == pid end)
end
end
34 changes: 13 additions & 21 deletions lib/membrane/core/bin/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Membrane.Core.Bin.ActionHandler do
@moduledoc false
use Membrane.Core.CallbackHandler

alias Membrane.{CallbackError, Notification, ParentSpec}
alias Membrane.{ActionError, ParentSpec}
alias Membrane.Core.Bin.State
alias Membrane.Core.{Message, Parent, TimerController}

Expand All @@ -11,13 +11,13 @@ defmodule Membrane.Core.Bin.ActionHandler do

@impl CallbackHandler
def handle_action({:forward, children_messages}, _cb, _params, state) do
Parent.ChildLifeController.handle_forward(Bunch.listify(children_messages), state)
:ok = children_messages |> Bunch.listify() |> Parent.ChildLifeController.handle_forward(state)
state
end

@impl CallbackHandler
def handle_action({:spec, spec = %ParentSpec{}}, _cb, _params, state) do
with {{:ok, _children}, state} <- Parent.ChildLifeController.handle_spec(spec, state),
do: {:ok, state}
Parent.ChildLifeController.handle_spec(spec, state)
end

@impl CallbackHandler
Expand All @@ -27,12 +27,14 @@ defmodule Membrane.Core.Bin.ActionHandler do

@impl CallbackHandler
def handle_action({:notify, notification}, _cb, _params, state) do
send_notification(notification, state)
end
%State{parent_pid: parent_pid, name: name} = state

@impl CallbackHandler
def handle_action({:log_metadata, metadata}, _cb, _params, state) do
Parent.LifecycleController.handle_log_metadata(metadata, state)
Membrane.Logger.debug_verbose(
"Sending notification #{inspect(notification)} (parent PID: #{inspect(parent_pid)})"
)

Message.send(parent_pid, :notification, [name, notification])
state
end

@impl CallbackHandler
Expand All @@ -58,17 +60,7 @@ defmodule Membrane.Core.Bin.ActionHandler do
end

@impl CallbackHandler
def handle_action(action, callback, _params, state) do
raise CallbackError, kind: :invalid_action, action: action, callback: {state.module, callback}
end

@spec send_notification(Notification.t(), State.t()) :: {:ok, State.t()}
defp send_notification(notification, %State{parent_pid: parent_pid, name: name} = state) do
Membrane.Logger.debug_verbose(
"Sending notification #{inspect(notification)} (parent PID: #{inspect(parent_pid)})"
)

Message.send(parent_pid, :notification, [name, notification])
{:ok, state}
def handle_action(action, _callback, _params, _state) do
raise ActionError, action: action, reason: {:unknown_action, Membrane.Bin.Action}
end
end
30 changes: 13 additions & 17 deletions lib/membrane/core/bin/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Membrane.Core.Bin.PadController do
# Module handling linking and unlinking pads.

use Bunch
alias Bunch.Type

alias Membrane.Bin.CallbackContext
alias Membrane.{Core, LinkError, Pad}
alias Membrane.Core.Bin.{ActionHandler, State}
Expand Down Expand Up @@ -47,7 +47,7 @@ defmodule Membrane.Core.Bin.PadController do

state =
case PadModel.get_data(state, pad_ref) do
{:error, {:unknown_pad, pad_ref}} ->
{:error, :unknown_pad} ->
init_pad_data(pad_ref, info, state)

# This case is for pads that were instantiated before the external link request,
Expand All @@ -64,8 +64,7 @@ defmodule Membrane.Core.Bin.PadController do
end

state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options})
{:ok, state} = maybe_handle_pad_added(pad_ref, state)
state
maybe_handle_pad_added(pad_ref, state)
end

@doc """
Expand Down Expand Up @@ -146,7 +145,7 @@ defmodule Membrane.Core.Bin.PadController do
%{initiator: :parent}
| %{initiator: :sibling, other_info: PadModel.pad_info_t() | nil, link_metadata: map},
Core.Bin.State.t()
) :: Type.stateful_try_t(PadModel.pad_info_t(), Core.Bin.State.t())
) :: {Core.Element.PadController.link_call_reply_t(), Core.Bin.State.t()}
def handle_link(direction, endpoint, other_endpoint, params, state) do
pad_data = PadModel.get_data!(state, endpoint.pad_ref)
%{spec_ref: spec_ref, endpoint: child_endpoint} = pad_data
Expand Down Expand Up @@ -194,17 +193,15 @@ defmodule Membrane.Core.Bin.PadController do
@doc """
Handles situation where the pad has been unlinked (e.g. when connected element has been removed from the pipeline)
"""
@spec handle_unlink(Pad.ref_t(), Core.Bin.State.t()) :: Type.stateful_try_t(Core.Bin.State.t())
@spec handle_unlink(Pad.ref_t(), Core.Bin.State.t()) :: Core.Bin.State.t()
def handle_unlink(pad_ref, state) do
with {:ok, state} <- maybe_handle_pad_removed(pad_ref, state),
{:ok, endpoint} <- PadModel.get_data(state, pad_ref, :endpoint) do
Message.send(endpoint.pid, :handle_unlink, endpoint.pad_ref)
PadModel.delete_data(state, pad_ref)
end
state = maybe_handle_pad_removed(pad_ref, state)
endpoint = PadModel.get_data!(state, pad_ref, :endpoint)
Message.send(endpoint.pid, :handle_unlink, endpoint.pad_ref)
PadModel.delete_data!(state, pad_ref)
end

@spec maybe_handle_pad_added(Pad.ref_t(), Core.Bin.State.t()) ::
Type.stateful_try_t(Core.Bin.State.t())
@spec maybe_handle_pad_added(Pad.ref_t(), Core.Bin.State.t()) :: Core.Bin.State.t()
defp maybe_handle_pad_added(ref, state) do
%{options: pad_opts, direction: direction, availability: availability} =
PadModel.get_data!(state, ref)
Expand All @@ -220,12 +217,11 @@ defmodule Membrane.Core.Bin.PadController do
state
)
else
{:ok, state}
state
end
end

@spec maybe_handle_pad_removed(Pad.ref_t(), Core.Bin.State.t()) ::
Type.stateful_try_t(Core.Bin.State.t())
@spec maybe_handle_pad_removed(Pad.ref_t(), Core.Bin.State.t()) :: Core.Bin.State.t()
defp maybe_handle_pad_removed(ref, state) do
%{direction: direction, availability: availability} = PadModel.get_data!(state, ref)

Expand All @@ -240,7 +236,7 @@ defmodule Membrane.Core.Bin.PadController do
state
)
else
{:ok, state}
state
end
end

Expand Down
Loading

0 comments on commit 2477fcb

Please sign in to comment.