diff --git a/flake.nix b/flake.nix index 7fb471a..8712116 100644 --- a/flake.nix +++ b/flake.nix @@ -83,7 +83,7 @@ inherit version src; hostRelease = tribes.packages.${system}.tribesRelease; hostSource = tribes; - mixFodDepsHash = "sha256-IlAyZ2ED01ktTA4QP8L/FkaxJm8tmuwW9nV6C+tNKcw="; + mixFodDepsHash = "sha256-XcX20Q8KN6PTIlLt3nLDx5/XrWew9BYep73HFBhIScg="; assets = senderAssets; nativeBuildInputs = [pkgs.gcc pkgs.gnumake]; meta.description = "RTMP ingest and HLS streaming plugin for Tribes"; @@ -94,7 +94,7 @@ inherit version src; hostRelease = tribes.packages.${system}.tribesReleaseE2E; hostSource = tribes; - mixFodDepsHash = "sha256-IlAyZ2ED01ktTA4QP8L/FkaxJm8tmuwW9nV6C+tNKcw="; + mixFodDepsHash = "sha256-XcX20Q8KN6PTIlLt3nLDx5/XrWew9BYep73HFBhIScg="; assets = senderAssets; nativeBuildInputs = [pkgs.gcc pkgs.gnumake]; meta.description = "RTMP ingest and HLS streaming plugin for Tribes"; diff --git a/lib/sender/edge_session_reconciler.ex b/lib/sender/edge_session_reconciler.ex index edab98d..9ccfe8f 100644 --- a/lib/sender/edge_session_reconciler.ex +++ b/lib/sender/edge_session_reconciler.ex @@ -59,7 +59,7 @@ defmodule Sender.EdgeSessionReconciler do @impl true def handle_info(:reconcile, state) do - case reconcile_once(state.opts) do + case safe_reconcile_once(state.opts) do {:ok, %{started: started, errors: errors}} -> if started > 0 do Logger.info("Sender restarted #{started} HLS edge session(s) from persisted topology.") @@ -79,6 +79,16 @@ defmodule Sender.EdgeSessionReconciler do {:noreply, state} end + defp safe_reconcile_once(opts) do + reconcile_once(opts) + rescue + exception -> + {:error, {exception.__struct__, Exception.message(exception)}} + catch + kind, reason -> + {:error, {kind, reason}} + end + defp reconcile_endpoint(endpoint, generations, ash_opts, opts, acc) do case source_endpoint(endpoint, ash_opts) do {:ok, source_endpoint} -> diff --git a/scripts/sender_e2e_runner.exs b/scripts/sender_e2e_runner.exs index 4d800b1..1e1a580 100644 --- a/scripts/sender_e2e_runner.exs +++ b/scripts/sender_e2e_runner.exs @@ -33,7 +33,7 @@ defmodule Sender.E2ERunner do {:ok, %{"endpoint" => origin_endpoint}} = sender(@origin, "media_endpoints.upsert", %{ "type" => "tribes_origin", - "node_pubkey" => origin_info["pubkey"], + "node_pubkey" => node_pubkey(origin_info), "display_name" => "Origin", "hls_base_url" => @origin_hls_base, "rtmp_ingest_url" => "rtmp://sender-origin:1935/live/source" @@ -43,7 +43,7 @@ defmodule Sender.E2ERunner do sender(@edge, "media_endpoints.upsert", %{ "id" => origin_endpoint["id"], "type" => "tribes_origin", - "node_pubkey" => origin_info["pubkey"], + "node_pubkey" => node_pubkey(origin_info), "display_name" => "Origin", "hls_base_url" => @origin_hls_base, "rtmp_ingest_url" => "rtmp://sender-origin:1935/live/source" @@ -52,7 +52,7 @@ defmodule Sender.E2ERunner do {:ok, %{"endpoint" => edge_endpoint}} = sender(@edge, "media_endpoints.upsert", %{ "type" => "tribes_edge", - "node_pubkey" => edge_info["pubkey"], + "node_pubkey" => node_pubkey(edge_info), "source_endpoint_id" => origin_endpoint["id"], "display_name" => "Edge" }) @@ -429,13 +429,19 @@ defmodule Sender.E2ERunner do defp upsert_cluster_node(base_url, node_info) do admin(base_url, "cluster_nodes.upsert", %{ - "pubkey" => node_info["pubkey"], + "pubkey" => node_pubkey(node_info), "transport_address" => node_info["sync_url"], "scope" => "all", "status" => "active" }) end + defp node_pubkey(%{"node_pubkey" => pubkey}) when is_binary(pubkey) and pubkey != "", + do: pubkey + + defp node_pubkey(%{"pubkey" => pubkey}) when is_binary(pubkey) and pubkey != "", + do: pubkey + defp admin(base_url, method, params) do url = base_url <> "/api/admin/management" body = JSON.encode!(%{"method" => method, "params" => params}) diff --git a/test/sender/management_test.exs b/test/sender/management_test.exs index ca0a1b2..91f153b 100644 --- a/test/sender/management_test.exs +++ b/test/sender/management_test.exs @@ -1,6 +1,8 @@ defmodule Sender.ManagementTest do use Sender.DataCase, async: false + import ExUnit.CaptureLog + alias Sender.{EdgeSessionReconciler, MediaSession} alias Sender.Management alias Sender.Management.{EndpointSnapshots, Stream, StreamKey, StreamLifecycle, Topology} @@ -95,6 +97,36 @@ defmodule Sender.ManagementTest do assert metrics["hls_log_parse_errors"].stat == "sum" end + test "edge session reconciler logs startup-time errors instead of crashing" do + previous_trap_exit = Process.flag(:trap_exit, true) + + try do + name = :"sender_edge_reconciler_#{System.unique_integer([:positive])}" + + assert {:ok, pid} = + EdgeSessionReconciler.start_link( + name: name, + initial_delay_ms: 60_000, + interval_ms: 60_000, + node_pubkey_fun: fn -> raise RuntimeError, "repo unavailable" end + ) + + log = + capture_log(fn -> + send(pid, :reconcile) + Process.sleep(50) + end) + + assert Process.alive?(pid) + assert log =~ "Sender edge session reconciliation failed" + assert log =~ "repo unavailable" + + GenServer.stop(pid) + after + Process.flag(:trap_exit, previous_trap_exit) + end + end + test "default stream methods use a singleton product surface" do assert {:ok, %{"stream" => created}} = Stream.ensure_default(@context, %{