Files
self 3e288a7e95 feat: prefix Sender plugin slug
Rename Sender plugin identity to tribe-one-sender across manifest, runtime API routes, metrics, e2e fixtures, and tests.
2026-06-17 22:33:33 +02:00

641 lines
19 KiB
Elixir

defmodule TribeOne.TribesPlugin.Sender.E2ERunner do
alias Parrhesia.API.Auth
# Keypair used by nip98_authorization/2 to build and sign the authorization
# event attached to every admin request. The private key is stored as raw
# bytes (decoded from lowercase hex at compile time).
@admin_pubkey "985826ac4ec99f6304785ebfaa303ec42973653a682cfa0a06caf9fec662eaa7"
@admin_privkey Base.decode16!(
"7ee088208557dcc06405ccf6547115b62a597c5d107ee6044bb430e71a68655c",
case: :lower
)
# Base URLs of the two nodes under test.
@origin "http://127.0.0.1:46101"
@edge "http://127.0.0.1:46102"
# Base URL polled by wait_victoria_metrics/1 before the scenario starts.
@edge_vm "http://127.0.0.1:48112"
# Value registered as "hls_base_url" for the origin media endpoint.
@origin_hls_base "http://sender-origin:4000/sender/hls"
# Output URL of the ffmpeg command run by push_test_stream/0.
@rtmp_push_url "rtmp://127.0.0.1:49135/live/source"
# Runs the whole sender e2e scenario against the origin and edge nodes and
# prints "sender e2e assertions passed" when every step succeeds.
#
# Failed assertions go through assert_ok, which halts the VM with exit
# status 1; a failed `=` match raises MatchError. The argument list is ignored.
def main(_argv) do
:ok = ensure_http_client_started()
# Block until the metrics backend and both nodes answer over HTTP.
assert_ok(wait_victoria_metrics(@edge_vm), "edge VictoriaMetrics ready")
assert_ok(wait_http(@origin <> "/sign-in"), "origin node ready")
assert_ok(wait_http(@edge <> "/sign-in"), "edge node ready")
# The admin API may come up later than the HTTP listener, hence the retries.
{:ok, origin_info} =
admin_with_retry(@origin, "node_info", %{}, attempts: 90, delay_ms: 1_000)
{:ok, edge_info} = admin_with_retry(@edge, "node_info", %{}, attempts: 90, delay_ms: 1_000)
assert_plugin_loaded!(@origin, "origin")
assert_plugin_loaded!(@edge, "edge")
# Register each node as a cluster node on the other one.
:ok = connect_cluster_pair(@origin, origin_info, @edge, edge_info)
# Create the origin media endpoint on the origin node...
{:ok, %{"endpoint" => origin_endpoint}} =
sender(@origin, "media_endpoints.upsert", %{
"type" => "tribes_origin",
"node_pubkey" => node_pubkey(origin_info),
"display_name" => "Origin",
"hls_base_url" => @origin_hls_base,
"rtmp_ingest_url" => "rtmp://sender-origin:1935/live/source"
})
# ...and upsert the same endpoint (same "id") on the edge node.
{:ok, _mirrored_origin_endpoint} =
sender(@edge, "media_endpoints.upsert", %{
"id" => origin_endpoint["id"],
"type" => "tribes_origin",
"node_pubkey" => node_pubkey(origin_info),
"display_name" => "Origin",
"hls_base_url" => @origin_hls_base,
"rtmp_ingest_url" => "rtmp://sender-origin:1935/live/source"
})
# Edge endpoint whose source is the origin endpoint created above.
{:ok, %{"endpoint" => edge_endpoint}} =
sender(@edge, "media_endpoints.upsert", %{
"type" => "tribes_edge",
"node_pubkey" => node_pubkey(edge_info),
"source_endpoint_id" => origin_endpoint["id"],
"display_name" => "Edge"
})
# Start the stream on the origin; the returned generation and stream ids
# drive every later call.
{:ok, %{"generation" => generation, "stream" => stream}} =
sender(@origin, "stream.start", %{
"endpoint_id" => origin_endpoint["id"],
"input_url" => "rtmp://0.0.0.0:1935/live/source",
"ffmpeg_loglevel" => "info",
"ffmpeg_log_output" => "warning",
"backend_restart_delay_ms" => 250,
"backend_restart_max_delay_ms" => 1_000,
"readiness_poll_interval_ms" => 250
})
# Start the edge session for the same generation, retrying until it is accepted.
{:ok, _edge_start} =
sender_with_retry(
@edge,
"stream.start",
%{
"mode" => "hls_edge",
"endpoint_id" => edge_endpoint["id"],
"generation_id" => generation["id"],
"backend_restart_delay_ms" => 250,
"backend_restart_max_delay_ms" => 1_000,
"edge_poll_interval_ms" => 250,
"source_request_headers" => %{
"x-forwarded-proto" => "https"
}
},
attempts: 90,
delay_ms: 1_000
)
# push_test_stream/0 returns only after its ffmpeg command has exited.
:ok = push_test_stream()
assert_ok(
wait_sender_status(@origin, generation["id"], "origin_ingest", "live") == :ok,
"origin generation becomes live",
&dump_debug/0
)
# Fetch the playlist through the edge and then the first segment it references.
{_playlist, segment_ref} = wait_edge_playlist!(stream["id"], generation["id"])
assert_ok(
fetch_edge_segment(stream["id"], generation["id"], segment_ref),
"edge segment served"
)
{:ok, %{"session" => edge_status}} =
sender(@edge, "stream.status", %{
"mode" => "hls_edge",
"generation_id" => generation["id"]
})
assert_ok(edge_status["status"] == "running", "edge session running", &dump_debug/0)
# Two viewer sessions send heartbeats, so the expected viewer count is 2.
:ok = emit_viewer_heartbeats!(stream["id"], generation["id"])
assert_ok(
wait_for_sender_metric_export(@edge, 2) == :ok,
"sender viewer count exported on /metrics",
&dump_metrics_debug/0
)
# Heartbeats are re-sent on every poll while waiting for the rollup.
assert_ok(
wait_for_sender_metric_rollup(@edge, 2, fn ->
emit_viewer_heartbeats!(stream["id"], generation["id"])
end) == :ok,
"sender viewer count appears in metrics rollup",
&dump_metrics_debug/0
)
# Best-effort teardown: the results of the stop calls are deliberately ignored.
_ = sender(@edge, "stream.stop", %{"mode" => "hls_edge", "generation_id" => generation["id"]})
_ = sender(@origin, "stream.stop", %{"generation_id" => generation["id"]})
IO.puts("sender e2e assertions passed")
end
# Asserts that the node at `base_url` lists the "tribe-one-sender" plugin with
# status "loaded". `label` only names the node in the assertion message.
defp assert_plugin_loaded!(base_url, label) do
  {:ok, listing} = admin_with_retry(base_url, "plugin_list", %{}, attempts: 30, delay_ms: 1_000)

  sender_loaded? =
    Enum.any?(
      listing["plugins"],
      &match?(%{"name" => "tribe-one-sender", "status" => "loaded"}, &1)
    )

  assert_ok(sender_loaded?, "#{label} sender plugin loaded")
end
# Calls `method` (API version "1") of the "tribe-one-sender" plugin through
# the admin "plugin.call" method and returns whatever admin/3 returns.
defp sender(base_url, method, params) do
  plugin_call = %{
    "plugin" => "tribe-one-sender",
    "method" => method,
    "version" => "1",
    "params" => params
  }

  admin(base_url, "plugin.call", plugin_call)
end
# Repeats a sender/3 call until it returns `{:ok, _}` or the attempts run out.
#
# `opts` must contain `:attempts` and `:delay_ms`. Only `{:error, reason}`
# results before the last attempt are retried (after sleeping `delay_ms`);
# any other result is returned immediately.
defp sender_with_retry(base_url, method, params, opts) do
  max_attempts = Keyword.fetch!(opts, :attempts)
  pause_ms = Keyword.fetch!(opts, :delay_ms)

  Enum.reduce_while(1..max_attempts, {:error, :retry_exhausted}, fn n, _previous ->
    result = sender(base_url, method, params)

    case result do
      {:error, reason} when n < max_attempts ->
        maybe_log_retry(method, n, max_attempts, reason)
        Process.sleep(pause_ms)
        {:cont, result}

      _success_or_final ->
        {:halt, result}
    end
  end)
end
# Prints a retry notice on the first attempt and on every tenth attempt, so
# long retry loops do not flood the output.
defp maybe_log_retry(method, attempt, attempts, reason) do
  log? = attempt == 1 or rem(attempt, 10) == 0

  if log? do
    IO.puts("sender #{method} retry #{attempt}/#{attempts}: #{retry_reason(reason)}")
  end
end
# Renders a retry failure for logging: HTTP-style errors carrying a string
# "error" become "<status> <first 160 characters of the error>"; anything
# else is shown with inspect/1.
defp retry_reason(%{status: code, body: %{"error" => message}}) when is_binary(message) do
  summary = String.slice(message, 0, 160)
  "#{code} #{summary}"
end

defp retry_reason(other), do: inspect(other)
# Pushes an 18-second synthetic test pattern with a sine tone to
# @rtmp_push_url using ffmpeg.
#
# The command is retried up to 30 times, one second apart, until it exits
# with status 0. Returns :ok on success; otherwise the failure goes through
# assert_ok/3 with the debug dump.
defp push_test_stream do
  global_flags = ~w(-hide_banner -loglevel warning -re)
  video_input = ~w(-f lavfi -i testsrc=size=320x180:rate=15)
  audio_input = ~w(-f lavfi -i sine=frequency=1000:sample_rate=48000)

  encoding =
    ~w(-t 18 -c:v libx264 -preset ultrafast -tune zerolatency -pix_fmt yuv420p) ++
      ~w(-g 30 -keyint_min 30 -sc_threshold 0 -c:a aac)

  output = ["-f", "flv", @rtmp_push_url]
  args = Enum.concat([global_flags, video_input, audio_input, encoding, output])

  outcome =
    Enum.reduce_while(1..30, {:error, :retry_exhausted}, fn attempt, _previous ->
      {output_text, exit_status} = System.cmd("ffmpeg", args, stderr_to_stdout: true)

      cond do
        exit_status == 0 ->
          {:halt, :ok}

        attempt < 30 ->
          IO.puts("ffmpeg push retry #{attempt}/30: #{String.trim(output_text)}")
          Process.sleep(1_000)
          {:cont, {:error, output_text}}

        true ->
          {:halt, {:error, {:ffmpeg_push_failed, exit_status, output_text}}}
      end
    end)

  case outcome do
    :ok ->
      :ok

    {:error, reason} ->
      assert_ok(false, "ffmpeg push failed: #{inspect(reason)}", &dump_debug/0)
  end
end
# Polls "stream.status" once per second (up to 90 times) until the generation
# reports `expected_generation_status`.
#
# `role` selects the query mode: "hls_edge" queries the edge session, every
# other value queries "origin_ingest". Returns :ok on success, otherwise the
# last non-matching response.
defp wait_sender_status(base_url, generation_id, role, expected_generation_status) do
  mode =
    case role do
      "hls_edge" -> "hls_edge"
      _other -> "origin_ingest"
    end

  query = %{"mode" => mode, "generation_id" => generation_id}

  Enum.reduce_while(1..90, {:error, :timeout}, fn _attempt, _previous ->
    case sender(base_url, "stream.status", query) do
      {:ok, %{"generation" => %{"status" => ^expected_generation_status}}} ->
        {:halt, :ok}

      unexpected ->
        Process.sleep(1_000)
        {:cont, unexpected}
    end
  end)
end
# Polls the edge playlist for viewer session "browser-1" once per second (up
# to 120 times) until it is an HLS playlist ("#EXTM3U" prefix) that references
# at least one segment.
#
# Returns `{playlist, segment_ref}`. When the playlist never becomes ready,
# the last failure is reported through assert_ok/3 with the debug dump.
defp wait_edge_playlist!(stream_id, generation_id) do
  url = @edge <> viewer_playlist_path(stream_id, generation_id, "browser-1")

  outcome =
    Enum.reduce_while(1..120, {:error, :timeout}, fn _attempt, _previous ->
      case http_get(url, viewer_headers()) do
        {:ok, 200, "#EXTM3U" <> _rest = playlist} ->
          case first_segment_ref(playlist) do
            # Success carries its own tag: the failure accumulators below are
            # also 2-tuples, so an untagged {playlist, segment_ref} could not
            # be told apart from them once the loop ends.
            {:ok, segment_ref} -> {:halt, {:ready, playlist, segment_ref}}
            {:error, _reason} -> retry_playlist({:missing_segment, playlist})
          end

        {:ok, 200, body} ->
          retry_playlist({:non_hls_playlist, String.slice(body, 0, 200)})

        other ->
          retry_playlist(other)
      end
    end)

  case outcome do
    {:ready, playlist, segment_ref} ->
      {playlist, segment_ref}

    failure ->
      assert_ok(false, "edge playlist not ready: #{inspect(failure)}", &dump_debug/0)
  end
end
# Sleeps one second, then tells Enum.reduce_while/3 to keep going with
# `reason` as the new accumulator.
defp retry_playlist(reason) do
  :ok = Process.sleep(1_000)
  {:cont, reason}
end
# Returns `{:ok, line}` for the first trimmed playlist line that is neither
# blank nor a "#" tag/comment line, or `{:error, :missing_segment}` when the
# playlist has no such line.
defp first_segment_ref(playlist) do
  candidate =
    playlist
    |> String.split("\n")
    |> Enum.find_value(fn raw_line ->
      line = String.trim(raw_line)
      if line != "" and not String.starts_with?(line, "#"), do: line
    end)

  if is_nil(candidate), do: {:error, :missing_segment}, else: {:ok, candidate}
end
# Fetches one segment from the edge node and returns true only for a 200
# response with a non-empty body; every other outcome yields false.
defp fetch_edge_segment(stream_id, generation_id, segment_ref) do
  url = @edge <> "/sender/hls/streams/#{stream_id}/#{generation_id}/#{segment_ref}"
  response = http_get(url, viewer_headers())

  match?({:ok, 200, body} when byte_size(body) > 0, response)
end
# Builds the media playlist path for a stream generation, carrying the viewer
# session id in the "vsid" query parameter.
defp viewer_playlist_path(stream_id, generation_id, viewer_session_id) do
  playlist = "/sender/hls/streams/#{stream_id}/#{generation_id}/media.m3u8"
  playlist <> "?vsid=#{viewer_session_id}"
end
# Sends one heartbeat for each of the two viewer sessions and asserts the
# viewer count reported after each.
#
# The count must equal the session's 1-based position or 2; the latter covers
# repeated calls where both sessions are already counted. Raises MatchError
# when a heartbeat is not answered with 202 and an "ok" body.
defp emit_viewer_heartbeats!(stream_id, generation_id) do
  for {viewer_session_id, expected_count} <- [{"browser-1", 1}, {"browser-2", 2}] do
    {:ok, 202, %{"ok" => true, "viewer_count" => viewer_count}} =
      post_viewer_heartbeat(stream_id, generation_id, viewer_session_id)

    counted? = viewer_count == expected_count or viewer_count == 2

    assert_ok(
      counted?,
      "viewer heartbeat counted session #{viewer_session_id}, got #{inspect(viewer_count)}"
    )
  end

  :ok
end
# Posts a "heartbeat" player event for one viewer session to the edge node's
# sender plugin API and returns the http_post_json/2 result.
defp post_viewer_heartbeat(stream_id, generation_id, viewer_session_id) do
  url = @edge <> "/plugins-api/tribe-one-sender/player-events"

  event = %{
    "event" => "heartbeat",
    "stream_id" => stream_id,
    "generation_id" => generation_id,
    "viewer_session_id" => viewer_session_id
  }

  http_post_json(url, event)
end
# Polls "metrics_rollups.list" once per second (up to 150 times) until a
# rollup reports the sender viewer count `expected_value`.
#
# `keepalive_fun` is invoked before every poll. Returns :ok on success,
# otherwise the last failure (`{:error, rollups}` or the raw admin error).
defp wait_for_sender_metric_rollup(base_url, expected_value, keepalive_fun) do
  keep_waiting = fn last ->
    Process.sleep(1_000)
    {:cont, last}
  end

  Enum.reduce_while(1..150, {:error, :timeout}, fn _attempt, _previous ->
    keepalive_fun.()

    case admin(base_url, "metrics_rollups.list", %{"limit" => 120}) do
      {:ok, %{"rollups" => rollups}} ->
        cond do
          Enum.any?(rollups, &sender_metric_rollup?(&1, expected_value)) -> {:halt, :ok}
          true -> keep_waiting.({:error, rollups})
        end

      other ->
        keep_waiting.(other)
    end
  end)
end
# Polls `base_url <> "/metrics"` once per second (up to 30 times) until the
# body exposes the sender viewer count `expected_value`.
#
# Returns :ok on success, otherwise the last failure: either the raw
# http_get/1 result or `{:error, first 2000 characters of the body}`.
defp wait_for_sender_metric_export(base_url, expected_value) do
  metrics_url = base_url <> "/metrics"

  Enum.reduce_while(1..30, {:error, :timeout}, fn _attempt, _previous ->
    step =
      case http_get(metrics_url) do
        {:ok, 200, body} ->
          if metrics_body_has_sender_viewer_count?(body, expected_value),
            do: :found,
            else: {:pending, {:error, String.slice(body, 0, 2_000)}}

        other ->
          {:pending, other}
      end

    case step do
      :found ->
        {:halt, :ok}

      {:pending, last} ->
        Process.sleep(1_000)
        {:cont, last}
    end
  end)
end
# Returns true when some line of the metrics text is exactly the unlabelled
# "plugins_tribe_one_sender_viewer_count" sample with value `expected_value`,
# written either as an integer ("2") or with a ".0" suffix ("2.0").
# Trailing whitespace on the line is ignored.
defp metrics_body_has_sender_viewer_count?(body, expected_value) do
  sample = "plugins_tribe_one_sender_viewer_count " <> Integer.to_string(expected_value)
  accepted = [sample, sample <> ".0"]

  body
  |> String.split("\n")
  |> Enum.any?(fn line -> String.trim_trailing(line) in accepted end)
end
# Returns true when a rollup's "tribe-one-sender" viewer count equals
# `expected_value`. The "last" value is used, falling back to "max" when
# "last" is absent. Rollups without a "plugins" map never match.
defp sender_metric_rollup?(%{"plugins" => plugins}, expected_value) when is_map(plugins) do
  viewer_count =
    with nil <- get_in(plugins, ["tribe-one-sender", "viewer_count", "last"]) do
      get_in(plugins, ["tribe-one-sender", "viewer_count", "max"])
    end

  numeric_equal?(viewer_count, expected_value)
end

defp sender_metric_rollup?(_rollup, _expected_value), do: false
# Compares a metric value with `expected` using `==`, so 2 and 2.0 are equal.
# Numbers are compared directly; strings must parse completely as a float;
# every other value is unequal.
defp numeric_equal?(value, expected) when is_number(value), do: value == expected

defp numeric_equal?(value, expected) when is_binary(value) do
  match?({parsed, ""} when parsed == expected, Float.parse(value))
end

defp numeric_equal?(_value, _expected), do: false
# Registers each node as a cluster node on the other one. Returns :ok when
# both upserts succeed; an `{:error, reason}` from either one fails the run
# through assert_ok/2.
defp connect_cluster_pair(left_base_url, left_node_info, right_base_url, right_node_info) do
  with {:ok, _right_on_left} <- upsert_cluster_node(left_base_url, right_node_info),
       {:ok, _left_on_right} <- upsert_cluster_node(right_base_url, left_node_info) do
    :ok
  else
    {:error, reason} ->
      assert_ok(false, "unable to connect cluster pair: " <> inspect(reason))
  end
end
# Upserts the node described by `node_info` as an active, scope "all" cluster
# node on the node at `base_url`, using its "sync_url" as transport address.
defp upsert_cluster_node(base_url, node_info) do
  cluster_node = %{
    "pubkey" => node_pubkey(node_info),
    "transport_address" => node_info["sync_url"],
    "scope" => "all",
    "status" => "active"
  }

  admin(base_url, "cluster_nodes.upsert", cluster_node)
end
# Extracts a node's public key from a node_info map: a non-empty
# "node_pubkey" wins, otherwise a non-empty "pubkey" is used. Raises
# FunctionClauseError when neither is present.
defp node_pubkey(%{"node_pubkey" => key}) when is_binary(key) and byte_size(key) > 0, do: key
defp node_pubkey(%{"pubkey" => key}) when is_binary(key) and byte_size(key) > 0, do: key
# Calls an admin management method on the node at `base_url`.
#
# The request is a JSON POST to "/api/admin/management", authorized with a
# freshly signed event from nip98_authorization/2.
#
# Returns `{:ok, result}` only for a 200 response whose body is a JSON object
# with "ok" set to true. Any other HTTP response yields
# `{:error, %{status: status, body: decoded_body}}`, and transport failures
# yield `{:error, reason}`.
defp admin(base_url, method, params) do
  url = base_url <> "/api/admin/management"
  body = JSON.encode!(%{"method" => method, "params" => params})

  headers = [
    {~c"content-type", ~c"application/json"},
    {~c"authorization", to_charlist(nip98_authorization("POST", url))}
  ]

  request = {to_charlist(url), headers, ~c"application/json", body}

  case :httpc.request(:post, request, [], body_format: :binary) do
    {:ok, {{_version, status, _reason}, _headers, response_body}} ->
      # Match on the decoded shape instead of indexing with decoded["ok"]: a
      # body that decodes to a JSON list, string or number would raise there
      # and crash the caller's retry loop instead of producing an error tuple.
      case {status, decode_json(response_body)} do
        {200, %{"ok" => true} = decoded} -> {:ok, Map.get(decoded, "result")}
        {_status, decoded} -> {:error, %{status: status, body: decoded}}
      end

    {:error, reason} ->
      {:error, reason}
  end
end
# Repeats an admin/3 call until it returns `{:ok, _}` or the attempts run out.
#
# Options: `:attempts` (default 10) and `:delay_ms` (default 500), the pause
# between attempts. Returns the first success or the last failure.
defp admin_with_retry(base_url, method, params, opts) do
  max_attempts = Keyword.get(opts, :attempts, 10)
  pause_ms = Keyword.get(opts, :delay_ms, 500)

  Enum.reduce_while(1..max_attempts, {:error, :retry_exhausted}, fn n, _previous ->
    case admin(base_url, method, params) do
      {:ok, _result} = success ->
        {:halt, success}

      failure ->
        if n < max_attempts do
          Process.sleep(pause_ms)
          {:cont, failure}
        else
          {:halt, failure}
        end
    end
  end)
end
# Performs a GET request and returns `{:ok, status, body}` (body as a binary)
# or `{:error, reason}`. `headers` defaults to base_headers/0.
#
# The request carries explicit timeouts because :httpc waits indefinitely by
# default. A single hung connection would otherwise block the polling loops
# that call this function, regardless of their attempt limits. A timed-out
# request surfaces as `{:error, reason}` like any other transport failure.
defp http_get(url, headers \\ base_headers()) do
  http_options = [timeout: 15_000, connect_timeout: 5_000]

  case :httpc.request(:get, {to_charlist(url), headers}, http_options, body_format: :binary) do
    {:ok, {{_version, status, _reason}, _headers, body}} -> {:ok, status, body}
    {:error, reason} -> {:error, reason}
  end
end
# POSTs `payload` as JSON with the viewer headers and returns
# `{:ok, status, decoded_body}` or `{:error, reason}`. The response body goes
# through decode_json/1, so a non-JSON body comes back as `%{"raw" => body}`.
#
# The request carries explicit timeouts because :httpc waits indefinitely by
# default, which would let one unresponsive server stall the whole run.
defp http_post_json(url, payload) do
  body = JSON.encode!(payload)
  headers = [{~c"content-type", ~c"application/json"} | viewer_headers()]
  request = {to_charlist(url), headers, ~c"application/json", body}
  http_options = [timeout: 15_000, connect_timeout: 5_000]

  case :httpc.request(:post, request, http_options, body_format: :binary) do
    {:ok, {{_version, status, _reason}, _headers, response_body}} ->
      {:ok, status, decode_json(response_body)}

    {:error, reason} ->
      {:error, reason}
  end
end
# Headers sent with every plain HTTP request: the request is presented as
# forwarded over HTTPS. Header names and values are charlists, as :httpc expects.
defp base_headers do
  [{~c"x-forwarded-proto", ~c"https"}]
end

# Headers for viewer-facing requests; currently identical to base_headers/0.
defp viewer_headers do
  base_headers()
end
# Polls `url` once per second (up to 90 times) until it answers with any
# status from 200 to 499, i.e. the server is up even if it rejects the
# request. Returns :ok, or the last non-matching result.
defp wait_http(url) do
  Enum.reduce_while(1..90, {:error, :timeout}, fn _attempt, _previous ->
    response = http_get(url)

    if match?({:ok, status, _body} when status in 200..499, response) do
      {:halt, :ok}
    else
      Process.sleep(1_000)
      {:cont, response}
    end
  end)
end
# Polls the "/api/v1/query" endpoint under `base_url` with the trivial query
# `vector(1)` once per second (up to 90 times) until it answers 200.
# Returns :ok, or the last non-matching result.
defp wait_victoria_metrics(base_url) do
  Enum.reduce_while(1..90, {:error, :timeout}, fn _attempt, _previous ->
    response = http_get(base_url <> "/api/v1/query?query=vector(1)")

    if match?({:ok, 200, _body}, response) do
      {:halt, :ok}
    else
      Process.sleep(1_000)
      {:cont, response}
    end
  end)
end
# Builds the Authorization header value for an admin request: a kind 27235
# event tagged with the HTTP method and URL, signed with the admin key,
# JSON-encoded, Base64-encoded and prefixed with "Nostr ".
#
# The content embeds a unique integer, so every call produces a distinct event.
defp nip98_authorization(method, url) do
  unsigned = %{
    "pubkey" => @admin_pubkey,
    "created_at" => System.system_time(:second),
    "kind" => 27_235,
    "tags" => [["method", method], ["u", url]],
    "content" => "sender-e2e-#{System.unique_integer([:positive, :monotonic])}"
  }

  event_id = Auth.compute_event_id(unsigned)
  digest = Base.decode16!(event_id, case: :lower)
  {:ok, sig} = Tribes.Keyring.sign_event(digest, @admin_privkey)

  signed =
    unsigned
    |> Map.put("id", event_id)
    |> Map.put("sig", sig)

  "Nostr " <> (signed |> JSON.encode!() |> Base.encode64())
end
# Decodes a JSON response body. A body that is not valid JSON is wrapped as
# `%{"raw" => body}` so callers still get the text for error reporting.
defp decode_json(body) when is_binary(body) do
  case JSON.decode(body) do
    {:error, _reason} -> %{"raw" => body}
    {:ok, value} -> value
  end
end
# Starts the :inets application, which provides the :httpc client. An
# "already started" error counts as success; any other start result
# (:ok or `{:error, reason}`) is returned unchanged.
defp ensure_http_client_started do
  with {:error, {:already_started, :inets}} <- :inets.start() do
    :ok
  end
end
# Writes a diagnostic dump to stderr: for the origin and the edge node, the
# results of "node_info", "plugin_list" and the sender "stream.status" call.
# Used as the failure callback of assert_ok/3.
defp dump_debug do
  IO.puts(:stderr, "==== sender e2e debug dump ====")

  for {name, base_url} <- [origin: @origin, edge: @edge] do
    IO.puts(:stderr, "-- #{name} #{base_url} --")

    probes = [
      {"node_info", fn -> admin(base_url, "node_info", %{}) end},
      {"plugin_list", fn -> admin(base_url, "plugin_list", %{}) end},
      {"sender status", fn -> sender(base_url, "stream.status", %{}) end}
    ]

    Enum.each(probes, fn {title, probe} ->
      IO.puts(:stderr, "#{title}: #{inspect(probe.(), limit: :infinity)}")
    end)
  end

  IO.puts(:stderr, "==== end sender e2e debug dump ====")
end
# Writes the ten most recent "metrics_rollups.list" entries of the origin and
# the edge node to stderr. Used as the failure callback of the metrics
# assertions.
defp dump_metrics_debug do
  IO.puts(:stderr, "==== sender e2e metrics debug dump ====")

  for {name, base_url} <- [origin: @origin, edge: @edge] do
    IO.puts(:stderr, "-- #{name} #{base_url} --")

    rollups = admin(base_url, "metrics_rollups.list", %{"limit" => 10})
    IO.puts(:stderr, "metrics_rollups.list: #{inspect(rollups, limit: :infinity)}")
  end

  IO.puts(:stderr, "==== end sender e2e metrics debug dump ====")
end
# assert_ok/2: passes (returns :ok) when the result is `true` or `:ok`.
# Any other result prints "assertion failed: <message>" to stderr and halts
# the VM with exit status 1; non-boolean failures are included in the output.
defp assert_ok(result, _message) when result in [true, :ok], do: :ok

defp assert_ok(false, message) do
  IO.puts(:stderr, "assertion failed: #{message}")
  System.halt(1)
end

defp assert_ok(other, message) do
  IO.puts(:stderr, "assertion failed: #{message} (got: #{inspect(other)})")
  System.halt(1)
end

# assert_ok/3: same contract as assert_ok/2, but runs the zero-arity
# `on_failure` callback (typically a debug dump) before failing.
#
# `:ok` is accepted as success here just as in assert_ok/2, so a passing
# `:ok` result never triggers the failure callback.
defp assert_ok(result, _message, _on_failure) when result in [true, :ok], do: :ok

defp assert_ok(failure, message, on_failure) when is_function(on_failure, 0) do
  on_failure.()
  assert_ok(failure, message)
end
end
# Script entry point: runs the e2e scenario with the command-line arguments
# as soon as this file is evaluated.
TribeOne.TribesPlugin.Sender.E2ERunner.main(System.argv())