You've already forked tribes-plugin-sender
forked from tribes/tribes-plugin-template
fix: make HLS edge start idempotent
Treat an already-started HLS edge media session as a successful stream.start response so the edge reconciler can race safely with explicit cluster starts.
This commit is contained in:
@@ -299,7 +299,10 @@ defmodule TribeOne.TribesPlugin.Sender.Management.StreamLifecycle do
|
||||
Map.get(params, "backend_restart_max_delay_ms")
|
||||
)
|
||||
|
||||
MediaSession.start_hls_edge(generation, opts)
|
||||
case MediaSession.start_hls_edge(generation, opts) do
|
||||
{:error, {:already_started, pid}} when is_pid(pid) -> {:ok, pid}
|
||||
result -> result
|
||||
end
|
||||
end
|
||||
|
||||
defp generation(%{"generation_id" => generation_id}, ash_opts) when is_binary(generation_id) do
|
||||
|
||||
@@ -450,6 +450,69 @@ defmodule TribeOne.TribesPlugin.Sender.ManagementTest do
|
||||
assert edge["source_endpoint_id"] == source["id"]
|
||||
end
|
||||
|
||||
test "stream lifecycle treats an already reconciled HLS edge session as running" do
|
||||
node_pubkey = String.duplicate("d", 64)
|
||||
assert {:ok, _node} = cluster_node(node_pubkey)
|
||||
|
||||
assert {:ok, %{"endpoint" => source}} =
|
||||
Topology.upsert_endpoint(@context, %{
|
||||
"type" => "external_origin",
|
||||
"display_name" => "Mux",
|
||||
"hls_base_url" => "https://origin.example/sender/hls"
|
||||
})
|
||||
|
||||
assert {:ok, %{"endpoint" => edge}} =
|
||||
Topology.upsert_endpoint(@context, %{
|
||||
"type" => "tribes_edge",
|
||||
"node_pubkey" => node_pubkey,
|
||||
"source_endpoint_id" => source["id"],
|
||||
"display_name" => "Local edge",
|
||||
"metadata" => %{"source_tls_verify" => false}
|
||||
})
|
||||
|
||||
assert {:ok, stream} =
|
||||
Streaming.create_stream(
|
||||
%{slug: "edge-idempotent-test", title: "Edge Idempotent Test"},
|
||||
Management.ash_opts(@context)
|
||||
)
|
||||
|
||||
assert {:ok, generation} =
|
||||
Streaming.create_stream_generation(
|
||||
%{
|
||||
stream_id: stream.id,
|
||||
status: :live,
|
||||
started_at: DateTime.utc_now() |> DateTime.truncate(:second),
|
||||
primary_origin_endpoint_id: source["id"]
|
||||
},
|
||||
Management.ash_opts(@context)
|
||||
)
|
||||
|
||||
assert {:ok, %{started: 1, skipped: 0, errors: []}} =
|
||||
EdgeSessionReconciler.reconcile_once(
|
||||
ash_opts: Management.ash_opts(@context),
|
||||
node_pubkey_fun: fn -> {:ok, node_pubkey} end,
|
||||
backend: EdgeBackend
|
||||
)
|
||||
|
||||
reconciled_pid = MediaSession.whereis(:hls_edge, generation.id)
|
||||
assert is_pid(reconciled_pid)
|
||||
|
||||
assert {:ok, %{"session" => started}} =
|
||||
StreamLifecycle.start(@context, %{
|
||||
"mode" => "hls_edge",
|
||||
"endpoint_id" => edge["id"],
|
||||
"generation_id" => generation.id,
|
||||
"source_tls_verify" => false,
|
||||
backend: EdgeBackend
|
||||
})
|
||||
|
||||
assert started["role"] == "hls_edge"
|
||||
assert started["status"] == "running"
|
||||
assert started["pid"] == inspect(reconciled_pid)
|
||||
|
||||
assert :ok = MediaSession.stop_generation(generation.id)
|
||||
end
|
||||
|
||||
test "stream lifecycle marks failed media startup attempts inactive" do
|
||||
previous_ffmpeg = System.get_env("SENDER_FFMPEG_EXECUTABLE")
|
||||
System.put_env("SENDER_FFMPEG_EXECUTABLE", "sender-missing-ffmpeg-test")
|
||||
|
||||
Reference in New Issue
Block a user