Files
self f8e2bfaada refactor: use chat capability surfaces
Move Sender modules under TribeOne.TribesPlugin.Sender and replace the Aether-specific chat integration with the public chat@1 surface contract.
2026-05-26 01:13:38 +02:00

325 lines
8.9 KiB
Elixir

defmodule TribeOne.TribesPlugin.Sender.MediaSession do
  @moduledoc """
  Owns one local node's runtime participation in a stream generation.

  A media session is role-aware. Each role picks a default backend module
  (overridable with the `:backend` option):

    * `:origin_ingest` - this node accepts the creator's RTMP input and produces
      local HLS output. Default backend: `MediaBackend.FFmpeg`.
    * `:hls_edge` - default backend: `MediaBackend.HLSPullThrough`.

  Further roles (for example external-origin tracking) can be added without
  changing the backend boundary.

  A session is registered in `TribeOne.TribesPlugin.Sender.MediaSessionRegistry`
  under `{role, generation_id}`. It starts its backend with
  `backend.start_ingest(generation, opts)` (expected to return `{:ok, pid}` or
  `{:error, reason}`) and monitors the returned pid. Whenever the backend exits,
  for any reason including `:normal`, the session restarts it after an
  exponential backoff delay. When the session terminates, it stops a still-alive
  backend with `backend.stop_ingest(pid, [])`. The session traps exits, so this
  cleanup also happens when its supervisor shuts it down.

  ## Options

    * `:generation` (required) - the `StreamGeneration` this session serves.
    * `:role` (required) - one of the roles above.
    * `:backend` - backend module; defaults per role as listed above.
    * `:backend_supervisor` - handed to the backend as its `:supervisor` option;
      defaults to `TribeOne.TribesPlugin.Sender.MediaBackendSupervisor`.
    * `:backend_restart_delay_ms` - base backoff delay, default `1000`.
    * `:backend_restart_max_delay_ms` - backoff cap, default `30000`.
    * `:session_supervisor` - `DynamicSupervisor` used by `start_role/3`;
      defaults to `TribeOne.TribesPlugin.Sender.MediaSessionSupervisor`.

  Every other option is forwarded unchanged to the backend's `start_ingest/2`.
  A caller-supplied `:supervisor` option is discarded in favour of
  `:backend_supervisor`.
  """
  use GenServer

  alias TribeOne.TribesPlugin.Sender.MediaBackend.{FFmpeg, HLSPullThrough}
  alias TribeOne.TribesPlugin.Sender.Streaming.StreamGeneration

  @registry TribeOne.TribesPlugin.Sender.MediaSessionRegistry

  @roles [:origin_ingest, :hls_edge]

  # Options consumed by the session itself; they are stripped before the
  # remaining options are forwarded to the backend (see backend_opts/1).
  @session_keys [
    :backend,
    :backend_supervisor,
    :generation,
    :backend_restart_delay_ms,
    :backend_restart_max_delay_ms,
    :role,
    :session_supervisor,
    :supervisor
  ]

  @default_backend_restart_delay_ms 1_000
  @default_backend_restart_max_delay_ms 30_000

  @type role :: :origin_ingest | :hls_edge

  @doc """
  Returns a child spec with id `{__MODULE__, role, generation.id}`.

  The child is `:transient`, so a supervisor restarts it only after an abnormal exit.
  """
  @spec child_spec(keyword()) :: Supervisor.child_spec()
  def child_spec(opts) do
    generation = Keyword.fetch!(opts, :generation)
    role = Keyword.fetch!(opts, :role)

    %{
      id: {__MODULE__, role, generation.id},
      start: {__MODULE__, :start_link, [opts]},
      restart: :transient,
      type: :worker
    }
  end

  @doc "Starts an `:origin_ingest` session for `generation`. See `start_role/3`."
  @spec start_origin_ingest(StreamGeneration.t(), keyword()) ::
          DynamicSupervisor.on_start_child()
  def start_origin_ingest(%StreamGeneration{} = generation, opts \\ []) do
    start_role(:origin_ingest, generation, opts)
  end

  @doc "Starts an `:hls_edge` session for `generation`. See `start_role/3`."
  @spec start_hls_edge(StreamGeneration.t(), keyword()) :: DynamicSupervisor.on_start_child()
  def start_hls_edge(%StreamGeneration{} = generation, opts \\ []) do
    start_role(:hls_edge, generation, opts)
  end

  @doc """
  Starts a session for `role` and `generation` under the session supervisor.

  The supervisor is taken from `opts[:session_supervisor]` and defaults to
  `TribeOne.TribesPlugin.Sender.MediaSessionSupervisor`. `:role` and
  `:generation` override any values already present in `opts`.
  """
  @spec start_role(role(), StreamGeneration.t(), keyword()) ::
          DynamicSupervisor.on_start_child()
  def start_role(role, %StreamGeneration{} = generation, opts)
      when role in @roles and is_list(opts) do
    supervisor =
      Keyword.get(opts, :session_supervisor, TribeOne.TribesPlugin.Sender.MediaSessionSupervisor)

    DynamicSupervisor.start_child(
      supervisor,
      {__MODULE__, Keyword.merge(opts, role: role, generation: generation)}
    )
  end

  @doc """
  Stops the session `pid` with reason `:normal`.

  Exits if `pid` is not alive, as `GenServer.stop/2` does.
  """
  @spec stop(pid()) :: :ok
  def stop(pid) when is_pid(pid), do: GenServer.stop(pid, :normal)

  @doc """
  Stops every session (in any role) registered for `generation_id`.

  Returns `{:error, :not_found}` when no session is registered for it.
  """
  @spec stop_generation(String.t()) :: :ok | {:error, :not_found}
  def stop_generation(generation_id) when is_binary(generation_id) do
    pids =
      @roles
      |> Enum.map(&whereis(&1, generation_id))
      |> Enum.reject(&is_nil/1)

    case pids do
      [] ->
        {:error, :not_found}

      pids ->
        Enum.each(pids, &stop_if_alive/1)
        :ok
    end
  end

  @doc "Returns the pid of the session for `{role, generation_id}`, or `nil`."
  @spec whereis(role(), String.t()) :: pid() | nil
  def whereis(role, generation_id) when role in @roles and is_binary(generation_id) do
    case Registry.lookup(@registry, {role, generation_id}) do
      [{pid, _value}] -> pid
      [] -> nil
    end
  end

  @doc """
  Returns a health map for the session.

  Returns `%{status: :stopped}` if the session cannot be reached (it exited
  or the call failed).
  """
  @spec health(pid()) :: map()
  def health(pid) when is_pid(pid) do
    GenServer.call(pid, :health)
  catch
    :exit, _reason -> %{status: :stopped}
  end

  @doc "Starts the session process, registered as `{role, generation.id}` in the registry."
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    generation = Keyword.fetch!(opts, :generation)
    role = Keyword.fetch!(opts, :role)

    GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {@registry, {role, generation.id}}})
  end

  @impl true
  def init(opts) do
    # Without trapping exits, a supervisor shutdown (exit signal :shutdown)
    # kills this process without calling terminate/2, leaking the backend.
    Process.flag(:trap_exit, true)

    generation = Keyword.fetch!(opts, :generation)
    role = Keyword.fetch!(opts, :role)

    state = %{
      role: role,
      generation: generation,
      backend: Keyword.get(opts, :backend, default_backend(role)),
      backend_opts: backend_opts(opts),
      backend_pid: nil,
      backend_ref: nil,
      # Cumulative for the session's lifetime; never reset after a successful restart.
      backend_restart_count: 0,
      backend_restart_delay_ms:
        Keyword.get(opts, :backend_restart_delay_ms, @default_backend_restart_delay_ms),
      backend_restart_max_delay_ms:
        Keyword.get(opts, :backend_restart_max_delay_ms, @default_backend_restart_max_delay_ms),
      backend_restart_timer: nil,
      backend_status: :starting,
      last_backend_exit: nil,
      started_at: DateTime.utc_now()
    }

    # A failed initial start fails the whole session start.
    case start_backend(state) do
      {:ok, state} -> {:ok, state}
      {:error, reason} -> {:stop, reason}
    end
  end

  @impl true
  def handle_call(:health, _from, state) do
    reply = %{
      status: status(state),
      role: state.role,
      generation_id: state.generation.id,
      backend: state.backend,
      backend_health: backend_health(state),
      backend_restart_count: state.backend_restart_count,
      last_backend_exit: state.last_backend_exit,
      started_at: state.started_at
    }

    {:reply, reply, state}
  end

  @impl true
  # The monitored backend exited (normally or not): schedule a restart.
  def handle_info(
        {:DOWN, ref, :process, pid, reason},
        %{backend_ref: ref, backend_pid: pid} = state
      ) do
    {:noreply, schedule_backend_restart(reason, state)}
  end

  # Backoff timer fired: try to start the backend again. On failure,
  # back off once more.
  def handle_info(:restart_backend, %{backend_restart_timer: timer} = state)
      when is_reference(timer) do
    case start_backend(%{state | backend_restart_timer: nil, backend_status: :starting}) do
      {:ok, state} -> {:noreply, state}
      {:error, reason} -> {:noreply, schedule_backend_restart(reason, state)}
    end
  end

  # Everything else is ignored instead of crashing the session with a
  # FunctionClauseError. This covers a :restart_backend with no pending timer,
  # stale monitor messages, and {:EXIT, ...} from linked processes (exits are trapped).
  def handle_info(_message, state), do: {:noreply, state}

  @impl true
  def terminate(reason, state) do
    stop_backend(state)

    :telemetry.execute(
      [:sender, :media_session, :stop],
      %{count: 1},
      %{role: state.role, generation_id: state.generation.id, reason: reason}
    )

    :ok
  end

  # A session may exit on its own between the registry lookup and the stop
  # call. Treat that as already stopped instead of exiting the caller.
  defp stop_if_alive(pid) do
    stop(pid)
  catch
    :exit, {:noproc, _} -> :ok
  end

  # Strips session-only options and always points the backend at the backend
  # supervisor (any caller-supplied :supervisor was dropped with @session_keys).
  defp backend_opts(opts) do
    backend_supervisor =
      Keyword.get(opts, :backend_supervisor, TribeOne.TribesPlugin.Sender.MediaBackendSupervisor)

    opts
    |> Keyword.drop(@session_keys)
    |> Keyword.put_new(:supervisor, backend_supervisor)
  end

  defp default_backend(:origin_ingest), do: FFmpeg
  defp default_backend(:hls_edge), do: HLSPullThrough

  # Starts and monitors the backend. The [:sender, :media_session, :start] event
  # is emitted on every attempt, including restarts.
  defp start_backend(state) do
    :telemetry.execute(
      [:sender, :media_session, :start],
      %{count: 1},
      %{role: state.role, generation_id: state.generation.id}
    )

    case state.backend.start_ingest(state.generation, state.backend_opts) do
      {:ok, backend_pid} ->
        ref = Process.monitor(backend_pid)

        state = %{
          state
          | backend_pid: backend_pid,
            backend_ref: ref,
            backend_status: :running,
            last_backend_exit: nil
        }

        {:ok, emit_backend_started(state)}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Arms the backoff timer and records why the backend is down.
  defp schedule_backend_restart(reason, state) do
    cancel_restart_timer(state.backend_restart_timer)
    restart_count = state.backend_restart_count + 1
    delay_ms = restart_delay_ms(state, restart_count)
    timer = Process.send_after(self(), :restart_backend, delay_ms)

    :telemetry.execute(
      [:sender, :media_session, :backend_restart],
      %{count: 1, delay_ms: delay_ms},
      %{
        role: state.role,
        generation_id: state.generation.id,
        reason: reason,
        restart_count: restart_count
      }
    )

    %{
      state
      | backend_pid: nil,
        backend_ref: nil,
        backend_restart_count: restart_count,
        backend_restart_timer: timer,
        backend_status: :restarting,
        last_backend_exit: reason
    }
  end

  defp cancel_restart_timer(timer) when is_reference(timer), do: Process.cancel_timer(timer)
  defp cancel_restart_timer(_timer), do: false

  # base * 2^(restart_count - 1), capped at the configured maximum.
  # Doubling stops as soon as the cap is reached, so the result stays bounded
  # however large the (never reset) restart count grows. The previous
  # :math.pow/2 form raised ArithmeticError once the exponent hit 1024.
  defp restart_delay_ms(state, restart_count) do
    state.backend_restart_delay_ms
    |> double_until_cap(max(restart_count - 1, 0), state.backend_restart_max_delay_ms)
    |> round()
  end

  # Doubles `delay` up to `doublings` times, returning `cap` once it is reached.
  defp double_until_cap(delay, _doublings, cap) when delay >= cap, do: cap
  defp double_until_cap(delay, 0, _cap), do: delay

  defp double_until_cap(delay, doublings, cap),
    do: double_until_cap(delay * 2, doublings - 1, cap)

  defp emit_backend_started(state) do
    :telemetry.execute(
      [:sender, :media_session, :backend_start],
      %{count: 1},
      %{
        role: state.role,
        generation_id: state.generation.id,
        backend: state.backend,
        backend_pid: state.backend_pid
      }
    )

    state
  end

  # Session status as reported by health/1.
  defp status(%{backend_pid: backend_pid}) when is_pid(backend_pid) do
    if Process.alive?(backend_pid), do: :running, else: :stopped
  end

  defp status(%{backend_status: status}) when status in [:starting, :restarting], do: status
  defp status(_state), do: :stopped

  # Backend health, or %{status: :stopped} if there is no backend or it cannot be reached.
  defp backend_health(%{backend_pid: backend_pid, backend: backend}) when is_pid(backend_pid) do
    backend.health(backend_pid)
  catch
    :exit, _reason -> %{status: :stopped}
  end

  defp backend_health(_state), do: %{status: :stopped}

  # Best-effort backend shutdown; exits raised while stopping are swallowed.
  defp stop_backend(%{backend_pid: backend_pid, backend: backend}) when is_pid(backend_pid) do
    if Process.alive?(backend_pid) do
      backend.stop_ingest(backend_pid, [])
    end

    :ok
  catch
    :exit, _reason -> :ok
  end

  defp stop_backend(_state), do: :ok
end