From 07eb4e01c4462ff81a9a3f0859fbf4ddc7c07f2c Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Mon, 18 May 2026 18:30:57 +0200 Subject: [PATCH] fix: align sender startup with current node info Use the current node_info node_pubkey field in the Docker e2e runner and refresh the Nix Mix dependency hash after the muontrap dependency update. Keep edge session reconciliation alive when it runs before the host repo is ready during startup. --- flake.nix | 4 ++-- lib/sender/edge_session_reconciler.ex | 12 +++++++++- scripts/sender_e2e_runner.exs | 14 ++++++++---- test/sender/management_test.exs | 32 +++++++++++++++++++++++++++ 4 files changed, 55 insertions(+), 7 deletions(-) diff --git a/flake.nix b/flake.nix index 7fb471a..8712116 100644 --- a/flake.nix +++ b/flake.nix @@ -83,7 +83,7 @@ inherit version src; hostRelease = tribes.packages.${system}.tribesRelease; hostSource = tribes; - mixFodDepsHash = "sha256-IlAyZ2ED01ktTA4QP8L/FkaxJm8tmuwW9nV6C+tNKcw="; + mixFodDepsHash = "sha256-XcX20Q8KN6PTIlLt3nLDx5/XrWew9BYep73HFBhIScg="; assets = senderAssets; nativeBuildInputs = [pkgs.gcc pkgs.gnumake]; meta.description = "RTMP ingest and HLS streaming plugin for Tribes"; @@ -94,7 +94,7 @@ inherit version src; hostRelease = tribes.packages.${system}.tribesReleaseE2E; hostSource = tribes; - mixFodDepsHash = "sha256-IlAyZ2ED01ktTA4QP8L/FkaxJm8tmuwW9nV6C+tNKcw="; + mixFodDepsHash = "sha256-XcX20Q8KN6PTIlLt3nLDx5/XrWew9BYep73HFBhIScg="; assets = senderAssets; nativeBuildInputs = [pkgs.gcc pkgs.gnumake]; meta.description = "RTMP ingest and HLS streaming plugin for Tribes"; diff --git a/lib/sender/edge_session_reconciler.ex b/lib/sender/edge_session_reconciler.ex index edab98d..9ccfe8f 100644 --- a/lib/sender/edge_session_reconciler.ex +++ b/lib/sender/edge_session_reconciler.ex @@ -59,7 +59,7 @@ defmodule Sender.EdgeSessionReconciler do @impl true def handle_info(:reconcile, state) do - case reconcile_once(state.opts) do + case safe_reconcile_once(state.opts) do {:ok, %{started: started, errors: errors}} -> if started > 0 do Logger.info("Sender restarted #{started} HLS edge session(s) from persisted topology.") @@ -79,6 +79,16 @@ defmodule Sender.EdgeSessionReconciler do {:noreply, state} end + defp safe_reconcile_once(opts) do + reconcile_once(opts) + rescue + exception -> + {:error, {exception.__struct__, Exception.message(exception)}} + catch + kind, reason -> + {:error, {kind, reason}} + end + defp reconcile_endpoint(endpoint, generations, ash_opts, opts, acc) do case source_endpoint(endpoint, ash_opts) do {:ok, source_endpoint} -> diff --git a/scripts/sender_e2e_runner.exs b/scripts/sender_e2e_runner.exs index 4d800b1..1e1a580 100644 --- a/scripts/sender_e2e_runner.exs +++ b/scripts/sender_e2e_runner.exs @@ -33,7 +33,7 @@ defmodule Sender.E2ERunner do {:ok, %{"endpoint" => origin_endpoint}} = sender(@origin, "media_endpoints.upsert", %{ "type" => "tribes_origin", - "node_pubkey" => origin_info["pubkey"], + "node_pubkey" => node_pubkey(origin_info), "display_name" => "Origin", "hls_base_url" => @origin_hls_base, "rtmp_ingest_url" => "rtmp://sender-origin:1935/live/source" @@ -43,7 +43,7 @@ defmodule Sender.E2ERunner do sender(@edge, "media_endpoints.upsert", %{ "id" => origin_endpoint["id"], "type" => "tribes_origin", - "node_pubkey" => origin_info["pubkey"], + "node_pubkey" => node_pubkey(origin_info), "display_name" => "Origin", "hls_base_url" => @origin_hls_base, "rtmp_ingest_url" => "rtmp://sender-origin:1935/live/source" @@ -52,7 +52,7 @@ defmodule Sender.E2ERunner do {:ok, %{"endpoint" => edge_endpoint}} = sender(@edge, "media_endpoints.upsert", %{ "type" => "tribes_edge", - "node_pubkey" => edge_info["pubkey"], + "node_pubkey" => node_pubkey(edge_info), "source_endpoint_id" => origin_endpoint["id"], "display_name" => "Edge" }) @@ -429,13 +429,19 @@ defmodule Sender.E2ERunner do defp upsert_cluster_node(base_url, node_info) do admin(base_url, "cluster_nodes.upsert", %{ - "pubkey" => node_info["pubkey"], + "pubkey" => node_pubkey(node_info), "transport_address" => node_info["sync_url"], "scope" => "all", "status" => "active" }) end + defp node_pubkey(%{"node_pubkey" => pubkey}) when is_binary(pubkey) and pubkey != "", + do: pubkey + + defp node_pubkey(%{"pubkey" => pubkey}) when is_binary(pubkey) and pubkey != "", + do: pubkey + defp admin(base_url, method, params) do url = base_url <> "/api/admin/management" body = JSON.encode!(%{"method" => method, "params" => params}) diff --git a/test/sender/management_test.exs b/test/sender/management_test.exs index ca0a1b2..91f153b 100644 --- a/test/sender/management_test.exs +++ b/test/sender/management_test.exs @@ -1,6 +1,8 @@ defmodule Sender.ManagementTest do use Sender.DataCase, async: false + import ExUnit.CaptureLog + alias Sender.{EdgeSessionReconciler, MediaSession} alias Sender.Management alias Sender.Management.{EndpointSnapshots, Stream, StreamKey, StreamLifecycle, Topology} @@ -95,6 +97,36 @@ defmodule Sender.ManagementTest do assert metrics["hls_log_parse_errors"].stat == "sum" end + test "edge session reconciler logs startup-time errors instead of crashing" do + previous_trap_exit = Process.flag(:trap_exit, true) + + try do + name = :"sender_edge_reconciler_#{System.unique_integer([:positive])}" + + assert {:ok, pid} = + EdgeSessionReconciler.start_link( + name: name, + initial_delay_ms: 60_000, + interval_ms: 60_000, + node_pubkey_fun: fn -> raise RuntimeError, "repo unavailable" end + ) + + log = + capture_log(fn -> + send(pid, :reconcile) + Process.sleep(50) + end) + + assert Process.alive?(pid) + assert log =~ "Sender edge session reconciliation failed" + assert log =~ "repo unavailable" + + GenServer.stop(pid) + after + Process.flag(:trap_exit, previous_trap_exit) + end + end + test "default stream methods use a singleton product surface" do assert {:ok, %{"stream" => created}} = Stream.ensure_default(@context, %{