You've already forked tribes-plugin-sender
forked from tribes/tribes-plugin-template
f8e2bfaada
Move Sender modules under TribeOne.TribesPlugin.Sender and replace the Aether-specific chat integration with the public chat@1 surface contract.
325 lines
8.9 KiB
Elixir
325 lines
8.9 KiB
Elixir
defmodule TribeOne.TribesPlugin.Sender.MediaSession do
|
|
@moduledoc """
|
|
Owns one local node's runtime participation in a stream generation.
|
|
|
|
A media session is role-aware. The first supported role is `:origin_ingest`,
|
|
where this node accepts the creator's RTMP input and produces local HLS output.
|
|
Later roles can cover pull-through edge and external-origin tracking without
|
|
changing the backend boundary.
|
|
"""
|
|
|
|
use GenServer
|
|
|
|
alias TribeOne.TribesPlugin.Sender.MediaBackend.{FFmpeg, HLSPullThrough}
|
|
alias TribeOne.TribesPlugin.Sender.Streaming.StreamGeneration
|
|
|
|
@roles [:origin_ingest, :hls_edge]
|
|
@session_keys [
|
|
:backend,
|
|
:backend_supervisor,
|
|
:generation,
|
|
:backend_restart_delay_ms,
|
|
:backend_restart_max_delay_ms,
|
|
:role,
|
|
:session_supervisor,
|
|
:supervisor
|
|
]
|
|
|
|
@default_backend_restart_delay_ms 1_000
|
|
@default_backend_restart_max_delay_ms 30_000
|
|
|
|
@type role :: :origin_ingest | :hls_edge
|
|
|
|
@spec child_spec(keyword()) :: Supervisor.child_spec()
|
|
def child_spec(opts) do
|
|
generation = Keyword.fetch!(opts, :generation)
|
|
role = Keyword.fetch!(opts, :role)
|
|
|
|
%{
|
|
id: {__MODULE__, role, generation.id},
|
|
start: {__MODULE__, :start_link, [opts]},
|
|
restart: :transient,
|
|
type: :worker
|
|
}
|
|
end
|
|
|
|
@spec start_origin_ingest(StreamGeneration.t(), keyword()) ::
|
|
DynamicSupervisor.on_start_child()
|
|
def start_origin_ingest(%StreamGeneration{} = generation, opts \\ []) do
|
|
start_role(:origin_ingest, generation, opts)
|
|
end
|
|
|
|
@spec start_hls_edge(StreamGeneration.t(), keyword()) :: DynamicSupervisor.on_start_child()
|
|
def start_hls_edge(%StreamGeneration{} = generation, opts \\ []) do
|
|
start_role(:hls_edge, generation, opts)
|
|
end
|
|
|
|
@spec start_role(role(), StreamGeneration.t(), keyword()) ::
|
|
DynamicSupervisor.on_start_child()
|
|
def start_role(role, %StreamGeneration{} = generation, opts)
|
|
when role in @roles and is_list(opts) do
|
|
supervisor =
|
|
Keyword.get(opts, :session_supervisor, TribeOne.TribesPlugin.Sender.MediaSessionSupervisor)
|
|
|
|
DynamicSupervisor.start_child(
|
|
supervisor,
|
|
{__MODULE__, Keyword.merge(opts, role: role, generation: generation)}
|
|
)
|
|
end
|
|
|
|
@spec stop(pid()) :: :ok
|
|
def stop(pid) when is_pid(pid), do: GenServer.stop(pid, :normal)
|
|
|
|
@spec stop_generation(String.t()) :: :ok | {:error, :not_found}
|
|
def stop_generation(generation_id) when is_binary(generation_id) do
|
|
stopped =
|
|
@roles
|
|
|> Enum.map(&whereis(&1, generation_id))
|
|
|> Enum.reject(&is_nil/1)
|
|
|
|
case stopped do
|
|
[] ->
|
|
{:error, :not_found}
|
|
|
|
pids ->
|
|
Enum.each(pids, &stop/1)
|
|
:ok
|
|
end
|
|
end
|
|
|
|
@spec whereis(role(), String.t()) :: pid() | nil
|
|
def whereis(role, generation_id) when role in @roles and is_binary(generation_id) do
|
|
case Registry.lookup(TribeOne.TribesPlugin.Sender.MediaSessionRegistry, {role, generation_id}) do
|
|
[{pid, _value}] -> pid
|
|
[] -> nil
|
|
end
|
|
end
|
|
|
|
@spec health(pid()) :: map()
|
|
def health(pid) when is_pid(pid) do
|
|
GenServer.call(pid, :health)
|
|
catch
|
|
:exit, _reason -> %{status: :stopped}
|
|
end
|
|
|
|
def start_link(opts) do
|
|
generation = Keyword.fetch!(opts, :generation)
|
|
role = Keyword.fetch!(opts, :role)
|
|
|
|
GenServer.start_link(__MODULE__, opts,
|
|
name:
|
|
{:via, Registry,
|
|
{TribeOne.TribesPlugin.Sender.MediaSessionRegistry, {role, generation.id}}}
|
|
)
|
|
end
|
|
|
|
@impl true
|
|
def init(opts) do
|
|
generation = Keyword.fetch!(opts, :generation)
|
|
role = Keyword.fetch!(opts, :role)
|
|
|
|
state = %{
|
|
role: role,
|
|
generation: generation,
|
|
backend: Keyword.get(opts, :backend, default_backend(role)),
|
|
backend_opts: backend_opts(opts),
|
|
backend_pid: nil,
|
|
backend_ref: nil,
|
|
backend_restart_count: 0,
|
|
backend_restart_delay_ms:
|
|
Keyword.get(opts, :backend_restart_delay_ms, @default_backend_restart_delay_ms),
|
|
backend_restart_max_delay_ms:
|
|
Keyword.get(opts, :backend_restart_max_delay_ms, @default_backend_restart_max_delay_ms),
|
|
backend_restart_timer: nil,
|
|
backend_status: :starting,
|
|
last_backend_exit: nil,
|
|
started_at: DateTime.utc_now()
|
|
}
|
|
|
|
case start_backend(state) do
|
|
{:ok, state} ->
|
|
{:ok, state}
|
|
|
|
{:error, reason} ->
|
|
{:stop, reason}
|
|
end
|
|
end
|
|
|
|
@impl true
|
|
def handle_call(:health, _from, state) do
|
|
{:reply,
|
|
%{
|
|
status: status(state),
|
|
role: state.role,
|
|
generation_id: state.generation.id,
|
|
backend: state.backend,
|
|
backend_health: backend_health(state),
|
|
backend_restart_count: state.backend_restart_count,
|
|
last_backend_exit: state.last_backend_exit,
|
|
started_at: state.started_at
|
|
}, state}
|
|
end
|
|
|
|
@impl true
|
|
def handle_info(
|
|
{:DOWN, ref, :process, pid, :normal},
|
|
%{backend_ref: ref, backend_pid: pid} = state
|
|
) do
|
|
{:noreply, schedule_backend_restart(:normal, state)}
|
|
end
|
|
|
|
def handle_info(
|
|
{:DOWN, ref, :process, pid, reason},
|
|
%{backend_ref: ref, backend_pid: pid} = state
|
|
) do
|
|
{:noreply, schedule_backend_restart(reason, state)}
|
|
end
|
|
|
|
def handle_info(:restart_backend, %{backend_restart_timer: timer} = state)
|
|
when is_reference(timer) do
|
|
case start_backend(%{state | backend_restart_timer: nil, backend_status: :starting}) do
|
|
{:ok, state} ->
|
|
{:noreply, state}
|
|
|
|
{:error, reason} ->
|
|
{:noreply, schedule_backend_restart(reason, state)}
|
|
end
|
|
end
|
|
|
|
def handle_info(:restart_backend, state), do: {:noreply, state}
|
|
|
|
@impl true
|
|
def terminate(reason, state) do
|
|
stop_backend(state)
|
|
|
|
:telemetry.execute(
|
|
[:sender, :media_session, :stop],
|
|
%{count: 1},
|
|
%{role: state.role, generation_id: state.generation.id, reason: reason}
|
|
)
|
|
|
|
:ok
|
|
end
|
|
|
|
defp backend_opts(opts) do
|
|
backend_supervisor =
|
|
Keyword.get(opts, :backend_supervisor, TribeOne.TribesPlugin.Sender.MediaBackendSupervisor)
|
|
|
|
opts
|
|
|> Keyword.drop(@session_keys)
|
|
|> Keyword.put_new(:supervisor, backend_supervisor)
|
|
end
|
|
|
|
defp default_backend(:origin_ingest), do: FFmpeg
|
|
defp default_backend(:hls_edge), do: HLSPullThrough
|
|
|
|
defp start_backend(state) do
|
|
:telemetry.execute(
|
|
[:sender, :media_session, :start],
|
|
%{count: 1},
|
|
%{role: state.role, generation_id: state.generation.id}
|
|
)
|
|
|
|
case state.backend.start_ingest(state.generation, state.backend_opts) do
|
|
{:ok, backend_pid} ->
|
|
ref = Process.monitor(backend_pid)
|
|
|
|
{:ok,
|
|
%{
|
|
state
|
|
| backend_pid: backend_pid,
|
|
backend_ref: ref,
|
|
backend_status: :running,
|
|
last_backend_exit: nil
|
|
}
|
|
|> emit_backend_started()}
|
|
|
|
{:error, reason} ->
|
|
{:error, reason}
|
|
end
|
|
end
|
|
|
|
defp schedule_backend_restart(reason, state) do
|
|
cancel_restart_timer(state.backend_restart_timer)
|
|
|
|
restart_count = state.backend_restart_count + 1
|
|
delay_ms = restart_delay_ms(state, restart_count)
|
|
timer = Process.send_after(self(), :restart_backend, delay_ms)
|
|
|
|
:telemetry.execute(
|
|
[:sender, :media_session, :backend_restart],
|
|
%{count: 1, delay_ms: delay_ms},
|
|
%{
|
|
role: state.role,
|
|
generation_id: state.generation.id,
|
|
reason: reason,
|
|
restart_count: restart_count
|
|
}
|
|
)
|
|
|
|
%{
|
|
state
|
|
| backend_pid: nil,
|
|
backend_ref: nil,
|
|
backend_restart_count: restart_count,
|
|
backend_restart_timer: timer,
|
|
backend_status: :restarting,
|
|
last_backend_exit: reason
|
|
}
|
|
end
|
|
|
|
defp cancel_restart_timer(timer) when is_reference(timer), do: Process.cancel_timer(timer)
|
|
defp cancel_restart_timer(_timer), do: false
|
|
|
|
defp restart_delay_ms(state, restart_count) do
|
|
multiplier = :math.pow(2, max(restart_count - 1, 0))
|
|
|
|
state.backend_restart_delay_ms
|
|
|> Kernel.*(multiplier)
|
|
|> round()
|
|
|> min(state.backend_restart_max_delay_ms)
|
|
end
|
|
|
|
defp emit_backend_started(state) do
|
|
:telemetry.execute(
|
|
[:sender, :media_session, :backend_start],
|
|
%{count: 1},
|
|
%{
|
|
role: state.role,
|
|
generation_id: state.generation.id,
|
|
backend: state.backend,
|
|
backend_pid: state.backend_pid
|
|
}
|
|
)
|
|
|
|
state
|
|
end
|
|
|
|
defp status(%{backend_pid: backend_pid}) when is_pid(backend_pid) do
|
|
if Process.alive?(backend_pid), do: :running, else: :stopped
|
|
end
|
|
|
|
defp status(%{backend_status: status}) when status in [:starting, :restarting], do: status
|
|
defp status(_state), do: :stopped
|
|
|
|
defp backend_health(%{backend_pid: backend_pid, backend: backend}) when is_pid(backend_pid) do
|
|
backend.health(backend_pid)
|
|
catch
|
|
:exit, _reason -> %{status: :stopped}
|
|
end
|
|
|
|
defp backend_health(_state), do: %{status: :stopped}
|
|
|
|
defp stop_backend(%{backend_pid: backend_pid, backend: backend}) when is_pid(backend_pid) do
|
|
if Process.alive?(backend_pid) do
|
|
backend.stop_ingest(backend_pid, [])
|
|
end
|
|
|
|
:ok
|
|
catch
|
|
:exit, _reason -> :ok
|
|
end
|
|
|
|
defp stop_backend(_state), do: :ok
|
|
end
|