improve: Keep media sessions alive for reconnects

This commit is contained in:
2026-05-09 00:19:38 +02:00
parent b4a6ba55d1
commit 9e6024d196
5 changed files with 177 additions and 21 deletions
+16 -4
View File
@@ -332,12 +332,15 @@ Minimal control flow:
1. Legion creates or updates the stream through Sender management methods.
2. Legion creates a one-time stream key for OBS.
3. Legion upserts the local Tribes node as a media endpoint.
4. Legion calls `streams.start` with mode `origin_ingest`.
4. Legion calls `stream.start` with mode `origin_ingest`.
5. Sender creates the generation/rendition state and starts a local
`MediaSession` through the configured media backend.
6. Viewers load Sender's web player and request playback metadata from
6. The creator connects OBS. If OBS disconnects, Sender keeps the generation
active and restarts the local media backend so OBS can reconnect to the same
stream.
7. Viewers load Sender's web player and request playback metadata from
`/plugins-api/sender/...`.
7. Legion calls `streams.stop` when the event ends.
8. Legion calls `stream.stop` when the event ends.
Standalone Sender web controls can later call the same service layer directly,
but they are not part of the MVP dependency path.
@@ -431,7 +434,16 @@ Media sessions are role-aware. The first local role is `origin_ingest`, where a
Tribes node accepts the creator's RTMP input and produces HLS output. Future
roles can cover pull-through edges, external-origin tracking, or fanout without
renaming the admin API, which should use stream lifecycle methods such as
`streams.start`, `streams.stop`, and `streams.status`.
`stream.start`, `stream.stop`, and `stream.status`.
The generation lifecycle is admin-owned: `stream.start` opens the stream window
and `stream.stop` closes it. A creator source disconnect or an ffmpeg process
exit should not end the generation by itself. The local `MediaSession` keeps the
generation active, reports `restarting` while the backend is down, and restarts
the backend with backoff so OBS can reconnect to the same stream whenever
possible. Initial backend startup failure is different: if no backend can be
started at all, `stream.start` fails and the generation is marked failed so it
does not block the next start attempt.
## Management API
+3 -1
View File
@@ -50,8 +50,10 @@ This file tracks the planned implementation from plugin template to clustered st
- [x] Start a `StreamGeneration` when Legion starts a stream.
- [x] Write HLS output to `spool_root/streams/:stream_id/:generation_id`.
- [x] Create initial `Rendition` rows from configured output variants.
- [x] Keep a started generation active across source disconnects/backend exits until admin stop.
- [ ] Mark generation `live` once the first playable manifest exists.
- [ ] Mark generation `ended` or `failed` on ffmpeg exit.
- [x] Mark generation `ended` on explicit admin stop.
- [x] Mark generation `failed` when startup cannot create an initial backend session.
- [ ] Clean old live spool directories according to retention config.
- [x] Add tests around backend command construction and lifecycle state changes.
@@ -221,6 +221,8 @@ defmodule Sender.Management.StreamLifecycle do
"role" => "origin_ingest",
"status" => to_string(Map.get(health, :status, :unknown)),
"pid" => inspect(pid),
"restart_count" => Map.get(health, :backend_restart_count, 0),
"last_backend_exit" => normalize_value(Map.get(health, :last_backend_exit)),
"backend_health" => stringify_keys(Map.get(health, :backend_health, %{}))
}
end
@@ -244,6 +246,10 @@ defmodule Sender.Management.StreamLifecycle do
end
defp normalize_value(%DateTime{} = datetime), do: DateTime.to_iso8601(datetime)
defp normalize_value(nil), do: nil
defp normalize_value(value) when is_atom(value), do: Atom.to_string(value)
defp normalize_value(value) when is_tuple(value), do: inspect(value)
defp normalize_value(value) when is_list(value), do: Enum.map(value, &normalize_value/1)
defp normalize_value(value) when is_map(value), do: stringify_keys(value)
defp normalize_value(value), do: value
end
+80 -4
View File
@@ -18,11 +18,16 @@ defmodule Sender.MediaSession do
:backend,
:backend_supervisor,
:generation,
:backend_restart_delay_ms,
:backend_restart_max_delay_ms,
:role,
:session_supervisor,
:supervisor
]
@default_backend_restart_delay_ms 1_000
@default_backend_restart_max_delay_ms 30_000
@type role :: :origin_ingest
@spec child_spec(keyword()) :: Supervisor.child_spec()
@@ -103,6 +108,14 @@ defmodule Sender.MediaSession do
backend_opts: backend_opts(opts),
backend_pid: nil,
backend_ref: nil,
backend_restart_count: 0,
backend_restart_delay_ms:
Keyword.get(opts, :backend_restart_delay_ms, @default_backend_restart_delay_ms),
backend_restart_max_delay_ms:
Keyword.get(opts, :backend_restart_max_delay_ms, @default_backend_restart_max_delay_ms),
backend_restart_timer: nil,
backend_status: :starting,
last_backend_exit: nil,
started_at: DateTime.utc_now()
}
@@ -124,6 +137,8 @@ defmodule Sender.MediaSession do
generation_id: state.generation.id,
backend: state.backend,
backend_health: backend_health(state),
backend_restart_count: state.backend_restart_count,
last_backend_exit: state.last_backend_exit,
started_at: state.started_at
}, state}
end
@@ -133,16 +148,29 @@ defmodule Sender.MediaSession do
{:DOWN, ref, :process, pid, :normal},
%{backend_ref: ref, backend_pid: pid} = state
) do
{:stop, :normal, %{state | backend_pid: nil, backend_ref: nil}}
{:noreply, schedule_backend_restart(:normal, state)}
end
def handle_info(
{:DOWN, ref, :process, pid, reason},
%{backend_ref: ref, backend_pid: pid} = state
) do
{:stop, {:backend_down, reason}, %{state | backend_pid: nil, backend_ref: nil}}
{:noreply, schedule_backend_restart(reason, state)}
end
def handle_info(:restart_backend, %{backend_restart_timer: timer} = state)
when is_reference(timer) do
case start_backend(%{state | backend_restart_timer: nil, backend_status: :starting}) do
{:ok, state} ->
{:noreply, state}
{:error, reason} ->
{:noreply, schedule_backend_restart(reason, state)}
end
end
def handle_info(:restart_backend, state), do: {:noreply, state}
@impl true
def terminate(reason, state) do
stop_backend(state)
@@ -176,7 +204,13 @@ defmodule Sender.MediaSession do
ref = Process.monitor(backend_pid)
{:ok,
%{state | backend_pid: backend_pid, backend_ref: ref}
%{
state
| backend_pid: backend_pid,
backend_ref: ref,
backend_status: :running,
last_backend_exit: nil
}
|> emit_backend_started()}
{:error, reason} ->
@@ -184,6 +218,47 @@ defmodule Sender.MediaSession do
end
end
defp schedule_backend_restart(reason, state) do
cancel_restart_timer(state.backend_restart_timer)
restart_count = state.backend_restart_count + 1
delay_ms = restart_delay_ms(state, restart_count)
timer = Process.send_after(self(), :restart_backend, delay_ms)
:telemetry.execute(
[:sender, :media_session, :backend_restart],
%{count: 1, delay_ms: delay_ms},
%{
role: state.role,
generation_id: state.generation.id,
reason: reason,
restart_count: restart_count
}
)
%{
state
| backend_pid: nil,
backend_ref: nil,
backend_restart_count: restart_count,
backend_restart_timer: timer,
backend_status: :restarting,
last_backend_exit: reason
}
end
defp cancel_restart_timer(timer) when is_reference(timer), do: Process.cancel_timer(timer)
defp cancel_restart_timer(_timer), do: false
defp restart_delay_ms(state, restart_count) do
multiplier = :math.pow(2, max(restart_count - 1, 0))
state.backend_restart_delay_ms
|> Kernel.*(multiplier)
|> round()
|> min(state.backend_restart_max_delay_ms)
end
defp emit_backend_started(state) do
:telemetry.execute(
[:sender, :media_session, :backend_start],
@@ -203,6 +278,7 @@ defmodule Sender.MediaSession do
if Process.alive?(backend_pid), do: :running, else: :stopped
end
defp status(%{backend_status: status}) when status in [:starting, :restarting], do: status
defp status(_state), do: :stopped
defp backend_health(%{backend_pid: backend_pid, backend: backend}) when is_pid(backend_pid) do
@@ -215,7 +291,7 @@ defmodule Sender.MediaSession do
defp stop_backend(%{backend_pid: backend_pid, backend: backend}) when is_pid(backend_pid) do
if Process.alive?(backend_pid) do
backend.stop_ingest(backend_pid)
backend.stop_ingest(backend_pid, [])
end
:ok
+72 -12
View File
@@ -1,25 +1,85 @@
defmodule Sender.MediaSessionTest.DisconnectingBackend do
@behaviour Sender.MediaBackend
@impl true
def start_ingest(generation, opts) do
owner = Keyword.fetch!(opts, :owner)
exit_after_ms = Keyword.get(opts, :exit_after_ms, 10)
send(owner, {:backend_started, generation.id})
pid =
spawn(fn ->
receive do
:stop -> :ok
after
exit_after_ms -> :ok
end
end)
{:ok, pid}
end
@impl true
def stop_ingest(pid, _opts) when is_pid(pid) do
send(pid, :stop)
:ok
end
@impl true
def health(pid, _opts) when is_pid(pid) do
%{status: if(Process.alive?(pid), do: :running, else: :stopped)}
end
end
defmodule Sender.MediaSessionTest do
use ExUnit.Case, async: true
alias Sender.MediaSession
alias Sender.Streaming.StreamGeneration
test "origin_ingest starts the configured backend for a generation" do
spool_root =
Path.join(
System.tmp_dir!(),
"sender-media-session-test-#{System.unique_integer([:positive])}"
)
test "origin_ingest keeps the session alive across backend disconnects" do
generation = generation()
generation_id = generation.id
assert {:ok, pid} =
MediaSession.start_origin_ingest(generation(),
executable: System.find_executable("true"),
spool_root: spool_root,
input_url: "rtmp://0.0.0.0:1935/live/test-key"
MediaSession.start_origin_ingest(generation,
backend: Sender.MediaSessionTest.DisconnectingBackend,
owner: self(),
exit_after_ms: 10,
backend_restart_delay_ms: 10,
backend_restart_max_delay_ms: 10
)
ref = Process.monitor(pid)
assert_receive {:DOWN, ^ref, :process, ^pid, :normal}, 1_000
assert_receive {:backend_started, ^generation_id}, 100
assert_receive {:backend_started, ^generation_id}, 200
assert Process.alive?(pid)
assert :ok = MediaSession.stop(pid)
end
test "origin_ingest reports reconnecting state while waiting for backend restart" do
generation = generation()
generation_id = generation.id
assert {:ok, pid} =
MediaSession.start_origin_ingest(generation,
backend: Sender.MediaSessionTest.DisconnectingBackend,
owner: self(),
exit_after_ms: 10,
backend_restart_delay_ms: 1_000
)
assert_receive {:backend_started, ^generation_id}, 100
Process.sleep(50)
health = MediaSession.health(pid)
assert health.status == :restarting
assert health.backend_restart_count == 1
assert health.last_backend_exit == :normal
assert :ok = MediaSession.stop(pid)
end
defp generation do