defmodule TribeOne.TribesPlugin.Sender.E2ERunner do alias Parrhesia.API.Auth @admin_pubkey "985826ac4ec99f6304785ebfaa303ec42973653a682cfa0a06caf9fec662eaa7" @admin_privkey Base.decode16!( "7ee088208557dcc06405ccf6547115b62a597c5d107ee6044bb430e71a68655c", case: :lower ) @origin "http://127.0.0.1:46101" @edge "http://127.0.0.1:46102" @edge_vm "http://127.0.0.1:48112" @origin_hls_base "http://sender-origin:4000/sender/hls" @rtmp_push_url "rtmp://127.0.0.1:49135/live/source" def main(_argv) do :ok = ensure_http_client_started() assert_ok(wait_victoria_metrics(@edge_vm), "edge VictoriaMetrics ready") assert_ok(wait_http(@origin <> "/sign-in"), "origin node ready") assert_ok(wait_http(@edge <> "/sign-in"), "edge node ready") {:ok, origin_info} = admin_with_retry(@origin, "node_info", %{}, attempts: 90, delay_ms: 1_000) {:ok, edge_info} = admin_with_retry(@edge, "node_info", %{}, attempts: 90, delay_ms: 1_000) assert_plugin_loaded!(@origin, "origin") assert_plugin_loaded!(@edge, "edge") :ok = connect_cluster_pair(@origin, origin_info, @edge, edge_info) {:ok, %{"endpoint" => origin_endpoint}} = sender(@origin, "media_endpoints.upsert", %{ "type" => "tribes_origin", "node_pubkey" => node_pubkey(origin_info), "display_name" => "Origin", "hls_base_url" => @origin_hls_base, "rtmp_ingest_url" => "rtmp://sender-origin:1935/live/source" }) {:ok, _mirrored_origin_endpoint} = sender(@edge, "media_endpoints.upsert", %{ "id" => origin_endpoint["id"], "type" => "tribes_origin", "node_pubkey" => node_pubkey(origin_info), "display_name" => "Origin", "hls_base_url" => @origin_hls_base, "rtmp_ingest_url" => "rtmp://sender-origin:1935/live/source" }) {:ok, %{"endpoint" => edge_endpoint}} = sender(@edge, "media_endpoints.upsert", %{ "type" => "tribes_edge", "node_pubkey" => node_pubkey(edge_info), "source_endpoint_id" => origin_endpoint["id"], "display_name" => "Edge" }) {:ok, %{"generation" => generation, "stream" => stream}} = sender(@origin, "stream.start", %{ "endpoint_id" => origin_endpoint["id"], "input_url" => "rtmp://0.0.0.0:1935/live/source", "ffmpeg_loglevel" => "info", "ffmpeg_log_output" => "warning", "backend_restart_delay_ms" => 250, "backend_restart_max_delay_ms" => 1_000, "readiness_poll_interval_ms" => 250 }) {:ok, _edge_start} = sender_with_retry( @edge, "stream.start", %{ "mode" => "hls_edge", "endpoint_id" => edge_endpoint["id"], "generation_id" => generation["id"], "backend_restart_delay_ms" => 250, "backend_restart_max_delay_ms" => 1_000, "edge_poll_interval_ms" => 250, "source_request_headers" => %{ "x-forwarded-proto" => "https" } }, attempts: 90, delay_ms: 1_000 ) :ok = push_test_stream() assert_ok( wait_sender_status(@origin, generation["id"], "origin_ingest", "live") == :ok, "origin generation becomes live", &dump_debug/0 ) {_playlist, segment_ref} = wait_edge_playlist!(stream["id"], generation["id"]) assert_ok( fetch_edge_segment(stream["id"], generation["id"], segment_ref), "edge segment served" ) {:ok, %{"session" => edge_status}} = sender(@edge, "stream.status", %{ "mode" => "hls_edge", "generation_id" => generation["id"] }) assert_ok(edge_status["status"] == "running", "edge session running", &dump_debug/0) :ok = emit_viewer_heartbeats!(stream["id"], generation["id"]) assert_ok( wait_for_sender_metric_export(@edge, 2) == :ok, "sender viewer count exported on /metrics", &dump_metrics_debug/0 ) assert_ok( wait_for_sender_metric_rollup(@edge, 2, fn -> emit_viewer_heartbeats!(stream["id"], generation["id"]) end) == :ok, "sender viewer count appears in metrics rollup", &dump_metrics_debug/0 ) _ = sender(@edge, "stream.stop", %{"mode" => "hls_edge", "generation_id" => generation["id"]}) _ = sender(@origin, "stream.stop", %{"generation_id" => generation["id"]}) IO.puts("sender e2e assertions passed") end defp assert_plugin_loaded!(base_url, label) do {:ok, plugins} = admin_with_retry(base_url, "plugin_list", %{}, attempts: 30, delay_ms: 1_000) loaded? = Enum.any?(plugins["plugins"], fn %{"name" => "tribe-one-sender", "status" => "loaded"} -> true _other -> false end) assert_ok(loaded?, "#{label} sender plugin loaded") end defp sender(base_url, method, params) do admin(base_url, "plugin.call", %{ "plugin" => "tribe-one-sender", "method" => method, "version" => "1", "params" => params }) end defp sender_with_retry(base_url, method, params, opts) do attempts = Keyword.fetch!(opts, :attempts) delay_ms = Keyword.fetch!(opts, :delay_ms) Enum.reduce_while(1..attempts, {:error, :retry_exhausted}, fn attempt, _acc -> case sender(base_url, method, params) do {:ok, _result} = ok -> {:halt, ok} {:error, reason} = error when attempt < attempts -> maybe_log_retry(method, attempt, attempts, reason) Process.sleep(delay_ms) {:cont, error} error -> {:halt, error} end end) end defp maybe_log_retry(method, attempt, attempts, reason) do if attempt == 1 or rem(attempt, 10) == 0 do IO.puts("sender #{method} retry #{attempt}/#{attempts}: #{retry_reason(reason)}") end end defp retry_reason(%{status: status, body: %{"error" => error}}) when is_binary(error) do "#{status} #{String.slice(error, 0, 160)}" end defp retry_reason(reason), do: inspect(reason) defp push_test_stream do args = [ "-hide_banner", "-loglevel", "warning", "-re", "-f", "lavfi", "-i", "testsrc=size=320x180:rate=15", "-f", "lavfi", "-i", "sine=frequency=1000:sample_rate=48000", "-t", "18", "-c:v", "libx264", "-preset", "ultrafast", "-tune", "zerolatency", "-pix_fmt", "yuv420p", "-g", "30", "-keyint_min", "30", "-sc_threshold", "0", "-c:a", "aac", "-f", "flv", @rtmp_push_url ] Enum.reduce_while(1..30, {:error, :retry_exhausted}, fn attempt, _acc -> case System.cmd("ffmpeg", args, stderr_to_stdout: true) do {_output, 0} -> {:halt, :ok} {output, _status} when attempt < 30 -> IO.puts("ffmpeg push retry #{attempt}/30: #{String.trim(output)}") Process.sleep(1_000) {:cont, {:error, output}} {output, status} -> {:halt, {:error, {:ffmpeg_push_failed, status, output}}} end end) |> case do :ok -> :ok {:error, reason} -> assert_ok(false, "ffmpeg push failed: #{inspect(reason)}", &dump_debug/0) end end defp wait_sender_status(base_url, generation_id, role, expected_generation_status) do mode = if role == "hls_edge", do: "hls_edge", else: "origin_ingest" Enum.reduce_while(1..90, {:error, :timeout}, fn _attempt, _acc -> case sender(base_url, "stream.status", %{"mode" => mode, "generation_id" => generation_id}) do {:ok, %{"generation" => %{"status" => ^expected_generation_status}}} -> {:halt, :ok} other -> Process.sleep(1_000) {:cont, other} end end) end defp wait_edge_playlist!(stream_id, generation_id) do path = viewer_playlist_path(stream_id, generation_id, "browser-1") Enum.reduce_while(1..120, {:error, :timeout}, fn _attempt, _acc -> case http_get(@edge <> path, viewer_headers()) do {:ok, 200, "#EXTM3U" <> _rest = playlist} -> case first_segment_ref(playlist) do {:ok, segment_ref} -> {:halt, {playlist, segment_ref}} {:error, _reason} -> retry_playlist({:missing_segment, playlist}) end {:ok, 200, body} -> retry_playlist({:non_hls_playlist, String.slice(body, 0, 200)}) other -> retry_playlist(other) end end) |> case do {playlist, segment_ref} -> {playlist, segment_ref} other -> assert_ok(false, "edge playlist not ready: #{inspect(other)}", &dump_debug/0) end end defp retry_playlist(reason) do Process.sleep(1_000) {:cont, reason} end defp first_segment_ref(playlist) do playlist |> String.split("\n") |> Enum.map(&String.trim/1) |> Enum.find(fn line -> line != "" and not String.starts_with?(line, "#") end) |> case do nil -> {:error, :missing_segment} segment -> {:ok, segment} end end defp fetch_edge_segment(stream_id, generation_id, segment_ref) do path = "/sender/hls/streams/#{stream_id}/#{generation_id}/#{segment_ref}" case http_get(@edge <> path, viewer_headers()) do {:ok, 200, body} -> byte_size(body) > 0 _other -> false end end defp viewer_playlist_path(stream_id, generation_id, viewer_session_id) do "/sender/hls/streams/#{stream_id}/#{generation_id}/media.m3u8?vsid=#{viewer_session_id}" end defp emit_viewer_heartbeats!(stream_id, generation_id) do ["browser-1", "browser-2"] |> Enum.with_index(1) |> Enum.each(fn {viewer_session_id, expected_count} -> {:ok, 202, %{"ok" => true, "viewer_count" => viewer_count}} = post_viewer_heartbeat(stream_id, generation_id, viewer_session_id) assert_ok( viewer_count == expected_count or viewer_count == 2, "viewer heartbeat counted session #{viewer_session_id}, got #{inspect(viewer_count)}" ) end) :ok end defp post_viewer_heartbeat(stream_id, generation_id, viewer_session_id) do http_post_json(@edge <> "/plugins-api/tribe-one-sender/player-events", %{ "event" => "heartbeat", "stream_id" => stream_id, "generation_id" => generation_id, "viewer_session_id" => viewer_session_id }) end defp wait_for_sender_metric_rollup(base_url, expected_value, keepalive_fun) do Enum.reduce_while(1..150, {:error, :timeout}, fn _attempt, _acc -> keepalive_fun.() case admin(base_url, "metrics_rollups.list", %{"limit" => 120}) do {:ok, %{"rollups" => rollups}} -> if Enum.any?(rollups, &sender_metric_rollup?(&1, expected_value)) do {:halt, :ok} else Process.sleep(1_000) {:cont, {:error, rollups}} end other -> Process.sleep(1_000) {:cont, other} end end) end defp wait_for_sender_metric_export(base_url, expected_value) do Enum.reduce_while(1..30, {:error, :timeout}, fn _attempt, _acc -> case http_get(base_url <> "/metrics") do {:ok, 200, body} -> if metrics_body_has_sender_viewer_count?(body, expected_value) do {:halt, :ok} else Process.sleep(1_000) {:cont, {:error, String.slice(body, 0, 2_000)}} end other -> Process.sleep(1_000) {:cont, other} end end) end defp metrics_body_has_sender_viewer_count?(body, expected_value) do expected = Integer.to_string(expected_value) body |> String.split("\n") |> Enum.any?(fn line -> String.starts_with?(line, "plugins_tribe_one_sender_viewer_count ") and String.trim_trailing(line) in [ "plugins_tribe_one_sender_viewer_count #{expected}", "plugins_tribe_one_sender_viewer_count #{expected}.0" ] end) end defp sender_metric_rollup?(%{"plugins" => plugins}, expected_value) when is_map(plugins) do plugins |> get_in(["tribe-one-sender", "viewer_count", "last"]) |> case do nil -> get_in(plugins, ["tribe-one-sender", "viewer_count", "max"]) value -> value end |> numeric_equal?(expected_value) end defp sender_metric_rollup?(_rollup, _expected_value), do: false defp numeric_equal?(value, expected) when is_integer(value) or is_float(value), do: value == expected defp numeric_equal?(value, expected) when is_binary(value) do case Float.parse(value) do {parsed, ""} -> parsed == expected _other -> false end end defp numeric_equal?(_value, _expected), do: false defp connect_cluster_pair(left_base_url, left_node_info, right_base_url, right_node_info) do with {:ok, _} <- upsert_cluster_node(left_base_url, right_node_info), {:ok, _} <- upsert_cluster_node(right_base_url, left_node_info) do :ok else {:error, reason} -> assert_ok(false, "unable to connect cluster pair: #{inspect(reason)}") end end defp upsert_cluster_node(base_url, node_info) do admin(base_url, "cluster_nodes.upsert", %{ "pubkey" => node_pubkey(node_info), "transport_address" => node_info["sync_url"], "scope" => "all", "status" => "active" }) end defp node_pubkey(%{"node_pubkey" => pubkey}) when is_binary(pubkey) and pubkey != "", do: pubkey defp node_pubkey(%{"pubkey" => pubkey}) when is_binary(pubkey) and pubkey != "", do: pubkey defp admin(base_url, method, params) do url = base_url <> "/api/admin/management" body = JSON.encode!(%{"method" => method, "params" => params}) headers = [ {~c"content-type", ~c"application/json"}, {~c"authorization", to_charlist(nip98_authorization("POST", url))} ] case :httpc.request(:post, {to_charlist(url), headers, ~c"application/json", body}, [], body_format: :binary ) do {:ok, {{_version, status, _reason}, _headers, response_body}} -> decoded = decode_json(response_body) if status == 200 and decoded["ok"] == true do {:ok, decoded["result"]} else {:error, %{status: status, body: decoded}} end {:error, reason} -> {:error, reason} end end defp admin_with_retry(base_url, method, params, opts) do attempts = Keyword.get(opts, :attempts, 10) delay_ms = Keyword.get(opts, :delay_ms, 500) Enum.reduce_while(1..attempts, {:error, :retry_exhausted}, fn attempt, _acc -> case admin(base_url, method, params) do {:ok, _result} = ok -> {:halt, ok} error when attempt < attempts -> Process.sleep(delay_ms) {:cont, error} error -> {:halt, error} end end) end defp http_get(url, headers \\ base_headers()) do case :httpc.request(:get, {to_charlist(url), headers}, [], body_format: :binary) do {:ok, {{_version, status, _reason}, _headers, body}} -> {:ok, status, body} {:error, reason} -> {:error, reason} end end defp http_post_json(url, payload) do body = JSON.encode!(payload) headers = [{~c"content-type", ~c"application/json"} | viewer_headers()] case :httpc.request(:post, {to_charlist(url), headers, ~c"application/json", body}, [], body_format: :binary ) do {:ok, {{_version, status, _reason}, _headers, response_body}} -> {:ok, status, decode_json(response_body)} {:error, reason} -> {:error, reason} end end defp base_headers, do: [{~c"x-forwarded-proto", ~c"https"}] defp viewer_headers, do: base_headers() defp wait_http(url) do Enum.reduce_while(1..90, {:error, :timeout}, fn _attempt, _acc -> case http_get(url) do {:ok, status, _body} when status in 200..499 -> {:halt, :ok} other -> Process.sleep(1_000) {:cont, other} end end) end defp wait_victoria_metrics(base_url) do Enum.reduce_while(1..90, {:error, :timeout}, fn _attempt, _acc -> case http_get(base_url <> "/api/v1/query?query=vector(1)") do {:ok, 200, _body} -> {:halt, :ok} other -> Process.sleep(1_000) {:cont, other} end end) end defp nip98_authorization(method, url) do base = %{ "pubkey" => @admin_pubkey, "created_at" => System.system_time(:second), "kind" => 27_235, "tags" => [["method", method], ["u", url]], "content" => "sender-e2e-#{System.unique_integer([:positive, :monotonic])}" } event_id = Auth.compute_event_id(base) {:ok, sig} = Tribes.Keyring.sign_event(Base.decode16!(event_id, case: :lower), @admin_privkey) event = Map.merge(base, %{"id" => event_id, "sig" => sig}) "Nostr " <> Base.encode64(JSON.encode!(event)) end defp decode_json(body) when is_binary(body) do case JSON.decode(body) do {:ok, decoded} -> decoded {:error, _reason} -> %{"raw" => body} end end defp ensure_http_client_started do case :inets.start() do :ok -> :ok {:error, {:already_started, :inets}} -> :ok {:error, reason} -> {:error, reason} end end defp dump_debug do IO.puts(:stderr, "==== sender e2e debug dump ====") Enum.each([origin: @origin, edge: @edge], fn {name, base_url} -> IO.puts(:stderr, "-- #{name} #{base_url} --") IO.puts( :stderr, "node_info: #{inspect(admin(base_url, "node_info", %{}), limit: :infinity)}" ) IO.puts( :stderr, "plugin_list: #{inspect(admin(base_url, "plugin_list", %{}), limit: :infinity)}" ) IO.puts( :stderr, "sender status: #{inspect(sender(base_url, "stream.status", %{}), limit: :infinity)}" ) end) IO.puts(:stderr, "==== end sender e2e debug dump ====") end defp dump_metrics_debug do IO.puts(:stderr, "==== sender e2e metrics debug dump ====") Enum.each([origin: @origin, edge: @edge], fn {name, base_url} -> IO.puts(:stderr, "-- #{name} #{base_url} --") IO.puts( :stderr, "metrics_rollups.list: #{inspect(admin(base_url, "metrics_rollups.list", %{"limit" => 10}), limit: :infinity)}" ) end) IO.puts(:stderr, "==== end sender e2e metrics debug dump ====") end defp assert_ok(true, _message), do: :ok defp assert_ok(:ok, _message), do: :ok defp assert_ok(false, message) do IO.puts(:stderr, "assertion failed: #{message}") System.halt(1) end defp assert_ok(other, message) do IO.puts(:stderr, "assertion failed: #{message} (got: #{inspect(other)})") System.halt(1) end defp assert_ok(true, _message, _on_failure), do: :ok defp assert_ok(false, message, on_failure) when is_function(on_failure, 0) do on_failure.() assert_ok(false, message) end defp assert_ok(other, message, on_failure) when is_function(on_failure, 0) do on_failure.() assert_ok(other, message) end end TribeOne.TribesPlugin.Sender.E2ERunner.main(System.argv())