You've already forked tribes-plugin-sender
forked from tribes/tribes-plugin-template
fix: align sender startup with current node info
Use the current node_info node_pubkey field in the Docker e2e runner and refresh the Nix Mix dependency hash after the muontrap dependency update. Keep edge session reconciliation alive when it runs before the host repo is ready during startup.
This commit is contained in:
@@ -83,7 +83,7 @@
|
||||
inherit version src;
|
||||
hostRelease = tribes.packages.${system}.tribesRelease;
|
||||
hostSource = tribes;
|
||||
mixFodDepsHash = "sha256-IlAyZ2ED01ktTA4QP8L/FkaxJm8tmuwW9nV6C+tNKcw=";
|
||||
mixFodDepsHash = "sha256-XcX20Q8KN6PTIlLt3nLDx5/XrWew9BYep73HFBhIScg=";
|
||||
assets = senderAssets;
|
||||
nativeBuildInputs = [pkgs.gcc pkgs.gnumake];
|
||||
meta.description = "RTMP ingest and HLS streaming plugin for Tribes";
|
||||
@@ -94,7 +94,7 @@
|
||||
inherit version src;
|
||||
hostRelease = tribes.packages.${system}.tribesReleaseE2E;
|
||||
hostSource = tribes;
|
||||
mixFodDepsHash = "sha256-IlAyZ2ED01ktTA4QP8L/FkaxJm8tmuwW9nV6C+tNKcw=";
|
||||
mixFodDepsHash = "sha256-XcX20Q8KN6PTIlLt3nLDx5/XrWew9BYep73HFBhIScg=";
|
||||
assets = senderAssets;
|
||||
nativeBuildInputs = [pkgs.gcc pkgs.gnumake];
|
||||
meta.description = "RTMP ingest and HLS streaming plugin for Tribes";
|
||||
|
||||
@@ -59,7 +59,7 @@ defmodule Sender.EdgeSessionReconciler do
|
||||
|
||||
@impl true
|
||||
def handle_info(:reconcile, state) do
|
||||
case reconcile_once(state.opts) do
|
||||
case safe_reconcile_once(state.opts) do
|
||||
{:ok, %{started: started, errors: errors}} ->
|
||||
if started > 0 do
|
||||
Logger.info("Sender restarted #{started} HLS edge session(s) from persisted topology.")
|
||||
@@ -79,6 +79,16 @@ defmodule Sender.EdgeSessionReconciler do
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp safe_reconcile_once(opts) do
|
||||
reconcile_once(opts)
|
||||
rescue
|
||||
exception ->
|
||||
{:error, {exception.__struct__, Exception.message(exception)}}
|
||||
catch
|
||||
kind, reason ->
|
||||
{:error, {kind, reason}}
|
||||
end
|
||||
|
||||
defp reconcile_endpoint(endpoint, generations, ash_opts, opts, acc) do
|
||||
case source_endpoint(endpoint, ash_opts) do
|
||||
{:ok, source_endpoint} ->
|
||||
|
||||
@@ -33,7 +33,7 @@ defmodule Sender.E2ERunner do
|
||||
{:ok, %{"endpoint" => origin_endpoint}} =
|
||||
sender(@origin, "media_endpoints.upsert", %{
|
||||
"type" => "tribes_origin",
|
||||
"node_pubkey" => origin_info["pubkey"],
|
||||
"node_pubkey" => node_pubkey(origin_info),
|
||||
"display_name" => "Origin",
|
||||
"hls_base_url" => @origin_hls_base,
|
||||
"rtmp_ingest_url" => "rtmp://sender-origin:1935/live/source"
|
||||
@@ -43,7 +43,7 @@ defmodule Sender.E2ERunner do
|
||||
sender(@edge, "media_endpoints.upsert", %{
|
||||
"id" => origin_endpoint["id"],
|
||||
"type" => "tribes_origin",
|
||||
"node_pubkey" => origin_info["pubkey"],
|
||||
"node_pubkey" => node_pubkey(origin_info),
|
||||
"display_name" => "Origin",
|
||||
"hls_base_url" => @origin_hls_base,
|
||||
"rtmp_ingest_url" => "rtmp://sender-origin:1935/live/source"
|
||||
@@ -52,7 +52,7 @@ defmodule Sender.E2ERunner do
|
||||
{:ok, %{"endpoint" => edge_endpoint}} =
|
||||
sender(@edge, "media_endpoints.upsert", %{
|
||||
"type" => "tribes_edge",
|
||||
"node_pubkey" => edge_info["pubkey"],
|
||||
"node_pubkey" => node_pubkey(edge_info),
|
||||
"source_endpoint_id" => origin_endpoint["id"],
|
||||
"display_name" => "Edge"
|
||||
})
|
||||
@@ -429,13 +429,19 @@ defmodule Sender.E2ERunner do
|
||||
|
||||
defp upsert_cluster_node(base_url, node_info) do
|
||||
admin(base_url, "cluster_nodes.upsert", %{
|
||||
"pubkey" => node_info["pubkey"],
|
||||
"pubkey" => node_pubkey(node_info),
|
||||
"transport_address" => node_info["sync_url"],
|
||||
"scope" => "all",
|
||||
"status" => "active"
|
||||
})
|
||||
end
|
||||
|
||||
defp node_pubkey(%{"node_pubkey" => pubkey}) when is_binary(pubkey) and pubkey != "",
|
||||
do: pubkey
|
||||
|
||||
defp node_pubkey(%{"pubkey" => pubkey}) when is_binary(pubkey) and pubkey != "",
|
||||
do: pubkey
|
||||
|
||||
defp admin(base_url, method, params) do
|
||||
url = base_url <> "/api/admin/management"
|
||||
body = JSON.encode!(%{"method" => method, "params" => params})
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
defmodule Sender.ManagementTest do
|
||||
use Sender.DataCase, async: false
|
||||
|
||||
import ExUnit.CaptureLog
|
||||
|
||||
alias Sender.{EdgeSessionReconciler, MediaSession}
|
||||
alias Sender.Management
|
||||
alias Sender.Management.{EndpointSnapshots, Stream, StreamKey, StreamLifecycle, Topology}
|
||||
@@ -95,6 +97,36 @@ defmodule Sender.ManagementTest do
|
||||
assert metrics["hls_log_parse_errors"].stat == "sum"
|
||||
end
|
||||
|
||||
test "edge session reconciler logs startup-time errors instead of crashing" do
|
||||
previous_trap_exit = Process.flag(:trap_exit, true)
|
||||
|
||||
try do
|
||||
name = :"sender_edge_reconciler_#{System.unique_integer([:positive])}"
|
||||
|
||||
assert {:ok, pid} =
|
||||
EdgeSessionReconciler.start_link(
|
||||
name: name,
|
||||
initial_delay_ms: 60_000,
|
||||
interval_ms: 60_000,
|
||||
node_pubkey_fun: fn -> raise RuntimeError, "repo unavailable" end
|
||||
)
|
||||
|
||||
log =
|
||||
capture_log(fn ->
|
||||
send(pid, :reconcile)
|
||||
Process.sleep(50)
|
||||
end)
|
||||
|
||||
assert Process.alive?(pid)
|
||||
assert log =~ "Sender edge session reconciliation failed"
|
||||
assert log =~ "repo unavailable"
|
||||
|
||||
GenServer.stop(pid)
|
||||
after
|
||||
Process.flag(:trap_exit, previous_trap_exit)
|
||||
end
|
||||
end
|
||||
|
||||
test "default stream methods use a singleton product surface" do
|
||||
assert {:ok, %{"stream" => created}} =
|
||||
Stream.ensure_default(@context, %{
|
||||
|
||||
Reference in New Issue
Block a user