diff --git a/docs/arch.md b/docs/arch.md index a31ee32..b448823 100644 --- a/docs/arch.md +++ b/docs/arch.md @@ -332,12 +332,15 @@ Minimal control flow: 1. Legion creates or updates the stream through Sender management methods. 2. Legion creates a one-time stream key for OBS. 3. Legion upserts the local Tribes node as a media endpoint. -4. Legion calls `streams.start` with mode `origin_ingest`. +4. Legion calls `stream.start` with mode `origin_ingest`. 5. Sender creates the generation/rendition state and starts a local `MediaSession` through the configured media backend. -6. Viewers load Sender's web player and request playback metadata from +6. The creator connects OBS. If OBS disconnects, Sender keeps the generation + active and restarts the local media backend so OBS can reconnect to the same + stream. +7. Viewers load Sender's web player and request playback metadata from `/plugins-api/sender/...`. -7. Legion calls `streams.stop` when the event ends. +8. Legion calls `stream.stop` when the event ends. Standalone Sender web controls can later call the same service layer directly, but they are not part of the MVP dependency path. @@ -431,7 +434,16 @@ Media sessions are role-aware. The first local role is `origin_ingest`, where a Tribes node accepts the creator's RTMP input and produces HLS output. Future roles can cover pull-through edges, external-origin tracking, or fanout without renaming the admin API, which should use stream lifecycle methods such as -`streams.start`, `streams.stop`, and `streams.status`. +`stream.start`, `stream.stop`, and `stream.status`. + +The generation lifecycle is admin-owned: `stream.start` opens the stream window +and `stream.stop` closes it. A creator source disconnect or an ffmpeg process +exit should not end the generation by itself. The local `MediaSession` keeps the +generation active, reports `restarting` while the backend is down, and restarts +the backend with backoff so OBS can reconnect to the same stream whenever +possible. Initial backend startup failure is different: if no backend can be +started at all, `stream.start` fails and the generation is marked failed so it +does not block the next start attempt. ## Management API diff --git a/docs/progress.md b/docs/progress.md index b58501a..d715835 100644 --- a/docs/progress.md +++ b/docs/progress.md @@ -50,8 +50,10 @@ This file tracks the planned implementation from plugin template to clustered st - [x] Start a `StreamGeneration` when Legion starts a stream. - [x] Write HLS output to `spool_root/streams/:stream_id/:generation_id`. - [x] Create initial `Rendition` rows from configured output variants. +- [x] Keep a started generation active across source disconnects/backend exits until admin stop. - [ ] Mark generation `live` once the first playable manifest exists. -- [ ] Mark generation `ended` or `failed` on ffmpeg exit. +- [x] Mark generation `ended` on explicit admin stop. +- [x] Mark generation `failed` when startup cannot create an initial backend session. - [ ] Clean old live spool directories according to retention config. - [x] Add tests around backend command construction and lifecycle state changes. diff --git a/lib/sender/management/stream_lifecycle.ex b/lib/sender/management/stream_lifecycle.ex index 9441aa2..dbae18b 100644 --- a/lib/sender/management/stream_lifecycle.ex +++ b/lib/sender/management/stream_lifecycle.ex @@ -221,6 +221,8 @@ defmodule Sender.Management.StreamLifecycle do "role" => "origin_ingest", "status" => to_string(Map.get(health, :status, :unknown)), "pid" => inspect(pid), + "restart_count" => Map.get(health, :backend_restart_count, 0), + "last_backend_exit" => normalize_value(Map.get(health, :last_backend_exit)), "backend_health" => stringify_keys(Map.get(health, :backend_health, %{})) } end @@ -244,6 +246,10 @@ defmodule Sender.Management.StreamLifecycle do end defp normalize_value(%DateTime{} = datetime), do: DateTime.to_iso8601(datetime) + defp normalize_value(nil), do: nil defp normalize_value(value) when is_atom(value), do: Atom.to_string(value) + defp normalize_value(value) when is_tuple(value), do: inspect(value) + defp normalize_value(value) when is_list(value), do: Enum.map(value, &normalize_value/1) + defp normalize_value(value) when is_map(value), do: stringify_keys(value) defp normalize_value(value), do: value end diff --git a/lib/sender/media_session.ex b/lib/sender/media_session.ex index eec8f67..b772cd5 100644 --- a/lib/sender/media_session.ex +++ b/lib/sender/media_session.ex @@ -18,11 +18,16 @@ defmodule Sender.MediaSession do :backend, :backend_supervisor, :generation, + :backend_restart_delay_ms, + :backend_restart_max_delay_ms, :role, :session_supervisor, :supervisor ] + @default_backend_restart_delay_ms 1_000 + @default_backend_restart_max_delay_ms 30_000 + @type role :: :origin_ingest @spec child_spec(keyword()) :: Supervisor.child_spec() @@ -103,6 +108,14 @@ defmodule Sender.MediaSession do backend_opts: backend_opts(opts), backend_pid: nil, backend_ref: nil, + backend_restart_count: 0, + backend_restart_delay_ms: + Keyword.get(opts, :backend_restart_delay_ms, @default_backend_restart_delay_ms), + backend_restart_max_delay_ms: + Keyword.get(opts, :backend_restart_max_delay_ms, @default_backend_restart_max_delay_ms), + backend_restart_timer: nil, + backend_status: :starting, + last_backend_exit: nil, started_at: DateTime.utc_now() } @@ -124,6 +137,8 @@ defmodule Sender.MediaSession do generation_id: state.generation.id, backend: state.backend, backend_health: backend_health(state), + backend_restart_count: state.backend_restart_count, + last_backend_exit: state.last_backend_exit, started_at: state.started_at }, state} end @@ -133,16 +148,29 @@ defmodule Sender.MediaSession do {:DOWN, ref, :process, pid, :normal}, %{backend_ref: ref, backend_pid: pid} = state ) do - {:stop, :normal, %{state | backend_pid: nil, backend_ref: nil}} + {:noreply, schedule_backend_restart(:normal, state)} end def handle_info( {:DOWN, ref, :process, pid, reason}, %{backend_ref: ref, backend_pid: pid} = state ) do - {:stop, {:backend_down, reason}, %{state | backend_pid: nil, backend_ref: nil}} + {:noreply, schedule_backend_restart(reason, state)} end + def handle_info(:restart_backend, %{backend_restart_timer: timer} = state) + when is_reference(timer) do + case start_backend(%{state | backend_restart_timer: nil, backend_status: :starting}) do + {:ok, state} -> + {:noreply, state} + + {:error, reason} -> + {:noreply, schedule_backend_restart(reason, state)} + end + end + + def handle_info(:restart_backend, state), do: {:noreply, state} + @impl true def terminate(reason, state) do stop_backend(state) @@ -176,7 +204,13 @@ defmodule Sender.MediaSession do ref = Process.monitor(backend_pid) {:ok, - %{state | backend_pid: backend_pid, backend_ref: ref} + %{ + state + | backend_pid: backend_pid, + backend_ref: ref, + backend_status: :running, + last_backend_exit: nil + } |> emit_backend_started()} {:error, reason} -> @@ -184,6 +218,47 @@ defmodule Sender.MediaSession do end end + defp schedule_backend_restart(reason, state) do + cancel_restart_timer(state.backend_restart_timer) + + restart_count = state.backend_restart_count + 1 + delay_ms = restart_delay_ms(state, restart_count) + timer = Process.send_after(self(), :restart_backend, delay_ms) + + :telemetry.execute( + [:sender, :media_session, :backend_restart], + %{count: 1, delay_ms: delay_ms}, + %{ + role: state.role, + generation_id: state.generation.id, + reason: reason, + restart_count: restart_count + } + ) + + %{ + state + | backend_pid: nil, + backend_ref: nil, + backend_restart_count: restart_count, + backend_restart_timer: timer, + backend_status: :restarting, + last_backend_exit: reason + } + end + + defp cancel_restart_timer(timer) when is_reference(timer), do: Process.cancel_timer(timer) + defp cancel_restart_timer(_timer), do: false + + defp restart_delay_ms(state, restart_count) do + multiplier = :math.pow(2, max(restart_count - 1, 0)) + + state.backend_restart_delay_ms + |> Kernel.*(multiplier) + |> round() + |> min(state.backend_restart_max_delay_ms) + end + defp emit_backend_started(state) do :telemetry.execute( [:sender, :media_session, :backend_start], @@ -203,6 +278,7 @@ defmodule Sender.MediaSession do if Process.alive?(backend_pid), do: :running, else: :stopped end + defp status(%{backend_status: status}) when status in [:starting, :restarting], do: status defp status(_state), do: :stopped defp backend_health(%{backend_pid: backend_pid, backend: backend}) when is_pid(backend_pid) do @@ -215,7 +291,7 @@ defmodule Sender.MediaSession do defp stop_backend(%{backend_pid: backend_pid, backend: backend}) when is_pid(backend_pid) do if Process.alive?(backend_pid) do - backend.stop_ingest(backend_pid) + backend.stop_ingest(backend_pid, []) end :ok diff --git a/test/sender/media_session_test.exs b/test/sender/media_session_test.exs index 85f778f..03ac837 100644 --- a/test/sender/media_session_test.exs +++ b/test/sender/media_session_test.exs @@ -1,25 +1,85 @@ +defmodule Sender.MediaSessionTest.DisconnectingBackend do + @behaviour Sender.MediaBackend + + @impl true + def start_ingest(generation, opts) do + owner = Keyword.fetch!(opts, :owner) + exit_after_ms = Keyword.get(opts, :exit_after_ms, 10) + + send(owner, {:backend_started, generation.id}) + + pid = + spawn(fn -> + receive do + :stop -> :ok + after + exit_after_ms -> :ok + end + end) + + {:ok, pid} + end + + @impl true + def stop_ingest(pid, _opts) when is_pid(pid) do + send(pid, :stop) + :ok + end + + @impl true + def health(pid, _opts) when is_pid(pid) do + %{status: if(Process.alive?(pid), do: :running, else: :stopped)} + end +end + defmodule Sender.MediaSessionTest do use ExUnit.Case, async: true alias Sender.MediaSession alias Sender.Streaming.StreamGeneration - test "origin_ingest starts the configured backend for a generation" do - spool_root = - Path.join( - System.tmp_dir!(), - "sender-media-session-test-#{System.unique_integer([:positive])}" - ) + test "origin_ingest keeps the session alive across backend disconnects" do + generation = generation() + generation_id = generation.id assert {:ok, pid} = - MediaSession.start_origin_ingest(generation(), - executable: System.find_executable("true"), - spool_root: spool_root, - input_url: "rtmp://0.0.0.0:1935/live/test-key" + MediaSession.start_origin_ingest(generation, + backend: Sender.MediaSessionTest.DisconnectingBackend, + owner: self(), + exit_after_ms: 10, + backend_restart_delay_ms: 10, + backend_restart_max_delay_ms: 10 ) - ref = Process.monitor(pid) - assert_receive {:DOWN, ^ref, :process, ^pid, :normal}, 1_000 + assert_receive {:backend_started, ^generation_id}, 100 + assert_receive {:backend_started, ^generation_id}, 200 + + assert Process.alive?(pid) + assert :ok = MediaSession.stop(pid) + end + + test "origin_ingest reports reconnecting state while waiting for backend restart" do + generation = generation() + generation_id = generation.id + + assert {:ok, pid} = + MediaSession.start_origin_ingest(generation, + backend: Sender.MediaSessionTest.DisconnectingBackend, + owner: self(), + exit_after_ms: 10, + backend_restart_delay_ms: 1_000 + ) + + assert_receive {:backend_started, ^generation_id}, 100 + Process.sleep(50) + + health = MediaSession.health(pid) + + assert health.status == :restarting + assert health.backend_restart_count == 1 + assert health.last_backend_exit == :normal + + assert :ok = MediaSession.stop(pid) end defp generation do