defmodule ThreeNodeGapE2E.RelayClient do
  @moduledoc """
  Minimal WebSockex client used by the three-node gap scenario.

  Every connection event is forwarded to the `owner` process as a
  `{:relay_client, client_pid, ...}` message so the runner can drive the
  conversation with plain `receive` blocks.
  """

  use WebSockex

  @doc """
  Starts a client connected to `url`; `owner` receives all forwarded messages.

  The connection is established asynchronously, so callers should wait for the
  `{:relay_client, pid, :connected}` message before sending frames.
  """
  def start_link(url, owner) do
    WebSockex.start_link(url, __MODULE__, owner,
      handle_initial_conn_failure: true,
      async: true,
      socket_connect_timeout: 2_000,
      socket_recv_timeout: 2_000
    )
  end

  @doc "Asynchronously sends `payload` to the relay as a JSON text frame."
  def send_json(pid, payload), do: WebSockex.cast(pid, {:send_json, payload})

  @doc "Asynchronously asks the client to close its connection."
  def close(pid), do: WebSockex.cast(pid, :close)

  # Notifies the owner once the socket is up.
  @impl true
  def handle_connect(_conn, owner) do
    send(owner, {:relay_client, self(), :connected})
    {:ok, owner}
  end

  # Text frames are JSON-decoded before forwarding; undecodable payloads are
  # forwarded as `{:decode_error, reason, payload}` so the owner can fail fast.
  @impl true
  def handle_frame({:text, payload}, owner) do
    frame =
      case JSON.decode(payload) do
        {:ok, decoded} -> decoded
        {:error, reason} -> {:decode_error, reason, payload}
      end

    send(owner, {:relay_client, self(), :frame, frame})
    {:ok, owner}
  end

  # Any non-text frame is forwarded untouched.
  def handle_frame(frame, owner) do
    send(owner, {:relay_client, self(), :frame, frame})
    {:ok, owner}
  end

  @impl true
  def handle_cast({:send_json, payload}, owner) do
    {:reply, {:text, JSON.encode!(payload)}, owner}
  end

  def handle_cast(:close, owner), do: {:close, owner}

  # Forwards the disconnect status map to the owner and does not reconnect.
  @impl true
  def handle_disconnect(status, owner) do
    send(owner, {:relay_client, self(), :disconnected, status})
    {:ok, owner}
  end
end

defmodule ThreeNodeGapE2E.Runner do
  @moduledoc """
  Drives the three-node gap end-to-end scenario.

  The scenario publishes events on node A, brings up node B, syncs A and B in
  both directions, stops B, brings up node C, syncs A and C in both directions,
  and finally waits until C holds the events authored for A, B and C.

  Node URLs are read from `PARRHESIA_NODE_<A|B|C>_*` environment variables and
  tuning knobs from `PARRHESIA_THREE_NODE_*` variables.
  """

  alias Parrhesia.API.Auth
  alias ThreeNodeGapE2E.RelayClient

  @kind 5000
  @resource "tribes.logs.entry"
  @admin_private_key String.duplicate("1", 64)
  @node_event_keys %{
    "a" => String.duplicate("2", 64),
    "b" => String.duplicate("3", 64),
    "c" => String.duplicate("4", 64)
  }
  # Maximum time to wait for any single expected websocket message.
  @frame_timeout_ms 20_000

  @doc """
  Script entry point.

  Starts the HTTP and websocket applications, loads configuration and runs the
  scenario. On failure the reason is printed to stderr and the VM halts with
  exit status 1.
  """
  def main(_argv) do
    with {:ok, _apps} <- Application.ensure_all_started(:req),
         {:ok, _apps} <- Application.ensure_all_started(:websockex),
         {:ok, config} <- load_config(),
         :ok <- run(config) do
      IO.puts("three-node gap e2e completed")
    else
      {:error, reason} ->
        IO.puts(:stderr, "three-node gap e2e failed: #{format_reason(reason)}")
        System.halt(1)
    end
  end

  # Builds the scenario configuration map from environment variables and the
  # compile-time keys. Returns `{:ok, config}` or `{:error, reason}`; raises
  # `ArgumentError` when an integer variable holds a non-integer value.
  defp load_config do
    count = parse_int_env("PARRHESIA_THREE_NODE_EVENTS_PER_NODE", 6_000)
    negentropy_timeout_ms = parse_int_env("PARRHESIA_THREE_NODE_NEGENTROPY_TIMEOUT_MS", 3_000)
    backfill_page_size = parse_int_env("PARRHESIA_THREE_NODE_BACKFILL_PAGE_SIZE", 25)
    sync_mode = System.get_env("PARRHESIA_THREE_NODE_SYNC_MODE", "req_stream")
    run_id = System.get_env("PARRHESIA_THREE_NODE_RUN_ID", default_run_id())

    with {:ok, node_a} <- load_node("A"),
         {:ok, node_b} <- load_node("B"),
         {:ok, node_c} <- load_node("C"),
         {:ok, event_pubkeys} <- event_pubkeys() do
      {:ok,
       %{
         run_id: run_id,
         count: count,
         resource: @resource,
         filter: %{"kinds" => [@kind], "#r" => [@resource]},
         admin_private_key: @admin_private_key,
         node_event_keys: @node_event_keys,
         admin_pubkey: elem(derive_pubkey(@admin_private_key), 1),
         event_pubkeys: event_pubkeys,
         negentropy_timeout_ms: negentropy_timeout_ms,
         backfill_page_size: backfill_page_size,
         sync_mode: sync_mode,
         node_a: node_a,
         node_b: node_b,
         node_c: node_c
       }}
    end
  end

  # Executes the scenario steps strictly in order; the first step that does not
  # return its expected shape aborts the run and is returned to the caller.
  defp run(config) do
    IO.puts("scenario run_id=#{config.run_id} count=#{config.count}")

    with :ok <- wait_for_health(config.node_a),
         {:ok, a_node_pubkey} <- fetch_node_pubkey(config, config.node_a),
         {:ok, a_relay_pubkey} <- fetch_relay_pubkey(config, config.node_a),
         :ok <- ensure_sync_page_advertised(config.node_a, :node_a),
         :ok <- ensure_client_access(config, config.node_a),
         :ok <- publish_many(config, config.node_a, "a", config.count),
         :ok <- wait_for_author_count(config, config.node_a, "a", config.count),
         :ok <- maybe_start_node("B"),
         :ok <- wait_for_health(config.node_b),
         {:ok, b_node_pubkey} <- fetch_node_pubkey(config, config.node_b),
         {:ok, b_relay_pubkey} <- fetch_relay_pubkey(config, config.node_b),
         :ok <- ensure_sync_page_advertised(config.node_b, :node_b),
         :ok <- ensure_client_access(config, config.node_b),
         :ok <- publish_many(config, config.node_b, "b", config.count),
         :ok <- wait_for_author_count(config, config.node_b, "b", config.count),
         :ok <- grant_pair(config, config.node_a, b_node_pubkey),
         :ok <- grant_pair(config, config.node_b, a_node_pubkey),
         :ok <-
           configure_sync(
             config,
             config.node_b,
             "b-from-a",
             config.node_a,
             a_node_pubkey,
             a_relay_pubkey
           ),
         :ok <-
           configure_sync(
             config,
             config.node_a,
             "a-from-b",
             config.node_b,
             b_node_pubkey,
             b_relay_pubkey
           ),
         :ok <- wait_for_author_count(config, config.node_b, "a", config.count),
         :ok <- wait_for_author_count(config, config.node_a, "b", config.count),
         :ok <- stop_sync(config, config.node_a, "a-from-b"),
         :ok <- stop_sync(config, config.node_b, "b-from-a"),
         :ok <- maybe_stop_node("B"),
         :ok <- maybe_start_node("C"),
         :ok <- wait_for_health(config.node_c),
         {:ok, c_node_pubkey} <- fetch_node_pubkey(config, config.node_c),
         {:ok, c_relay_pubkey} <- fetch_relay_pubkey(config, config.node_c),
         :ok <- ensure_sync_page_advertised(config.node_c, :node_c),
         :ok <- ensure_client_access(config, config.node_c),
         :ok <- publish_many(config, config.node_c, "c", config.count),
         :ok <- wait_for_author_count(config, config.node_c, "c", config.count),
         :ok <- grant_pair(config, config.node_a, c_node_pubkey),
         :ok <- grant_pair(config, config.node_c, a_node_pubkey),
         :ok <-
           configure_sync(
             config,
             config.node_c,
             "c-from-a",
             config.node_a,
             a_node_pubkey,
             a_relay_pubkey
           ),
         :ok <-
           configure_sync(
             config,
             config.node_a,
             "a-from-c",
             config.node_c,
             c_node_pubkey,
             c_relay_pubkey
           ),
         :ok <- wait_for_author_count(config, config.node_c, "a", config.count),
         :ok <- wait_for_author_count(config, config.node_c, "b", config.count),
         :ok <- wait_for_author_count(config, config.node_a, "c", config.count),
         {:ok, summary} <- summarize(config) do
      IO.puts("summary=#{JSON.encode!(summary)}")
      :ok
    end
  end

  defp maybe_start_node(suffix),
    do: maybe_run_node_command("start", suffix, "PARRHESIA_THREE_NODE_START_#{suffix}_CMD")

  defp maybe_stop_node(suffix),
    do: maybe_run_node_command("stop", suffix, "PARRHESIA_THREE_NODE_STOP_#{suffix}_CMD")

  # Runs the shell command stored in `env_name`, if any. An unset or empty
  # variable is a no-op. A non-zero exit status becomes a tagged error that
  # carries the combined stdout/stderr output.
  defp maybe_run_node_command(action, suffix, env_name) do
    case System.get_env(env_name) do
      nil ->
        :ok

      "" ->
        :ok

      command ->
        IO.puts("#{action} node #{suffix}: #{command}")

        case System.cmd("sh", ["-lc", command], stderr_to_stdout: true) do
          {output, 0} ->
            if String.trim(output) != "", do: IO.write(output)
            :ok

          {output, status} ->
            {:error, {:node_command_failed, action, suffix, status, output}}
        end
    end
  end

  # Reads the URLs for node `suffix` ("A", "B" or "C").
  #
  # HTTP and WS URLs are required. The relay-auth URL defaults to the WS URL and
  # the sync URL defaults to the relay-auth URL.
  #
  # `System.fetch_env/1` is used instead of rescuing around `System.fetch_env!/1`
  # because the bang variant does not raise `KeyError`, so a `rescue KeyError`
  # clause would never turn a missing variable into the tagged error below.
  defp load_node(suffix) do
    with {:ok, http_url} <- System.fetch_env("PARRHESIA_NODE_#{suffix}_HTTP_URL"),
         {:ok, websocket_url} <- System.fetch_env("PARRHESIA_NODE_#{suffix}_WS_URL") do
      relay_auth_url = System.get_env("PARRHESIA_NODE_#{suffix}_RELAY_AUTH_URL", websocket_url)
      sync_url = System.get_env("PARRHESIA_NODE_#{suffix}_SYNC_URL", relay_auth_url)

      {:ok,
       %{
         http_url: http_url,
         websocket_url: websocket_url,
         relay_auth_url: relay_auth_url,
         sync_url: sync_url
       }}
    else
      :error -> {:error, {:missing_node_env, suffix}}
    end
  end

  # Polls `<http_url>/health` every 250 ms for up to 30 s until it answers
  # 200 with body "ok".
  defp wait_for_health(node) do
    wait_until("health #{node.http_url}", 30_000, 250, fn ->
      case Req.get(
             url: node.http_url <> "/health",
             decode_body: false,
             retry: false,
             connect_options: [timeout: 1_000],
             receive_timeout: 1_000
           ) do
        {:ok, %{status: 200, body: "ok"}} -> {:ok, :ready}
        {:ok, %{status: status}} -> {:retry, {:status, status}}
        {:error, reason} -> {:retry, reason}
      end
    end)
    |> unwrap_wait()
  end

  # Grants sync read/write ACLs on `node` to every event author key and to the
  # admin key, stopping at the first failure.
  defp ensure_client_access(config, node) do
    clients = Map.put(config.event_pubkeys, "admin", config.admin_pubkey)

    Enum.reduce_while(clients, :ok, fn {_label, pubkey}, :ok ->
      case grant_pair(config, node, pubkey) do
        :ok -> {:cont, :ok}
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
  end

  # Grants both the "sync_read" and "sync_write" capabilities to `principal`.
  defp grant_pair(config, node, principal) do
    with :ok <- ensure_acl(config, node, principal, "sync_read", config.filter),
         :ok <- ensure_acl(config, node, principal, "sync_write", config.filter) do
      :ok
    end
  end

  # Issues a single `acl_grant` management call for a pubkey principal.
  defp ensure_acl(config, node, principal, capability, match) do
    params = %{
      "principal_type" => "pubkey",
      "principal" => principal,
      "capability" => capability,
      "match" => match
    }

    case management_call(config, node, "acl_grant", params) do
      {:ok, %{"ok" => true}} -> :ok
      {:ok, other} -> {:error, {:unexpected_acl_result, other}}
      {:error, reason} -> {:error, {:acl_grant_failed, capability, principal, reason}}
    end
  end

  # Registers `source_node` as sync server `server_id` on `target_node`, starts
  # it and waits until it reports a connection with at least one query run.
  #
  # The `else` clauses guarantee that this function only ever returns `:ok` or
  # `{:error, reason}`. Without them an unexpected `sync_start_server` result
  # would leak out as `{:ok, other}`, which no caller handles.
  defp configure_sync(
         config,
         target_node,
         server_id,
         source_node,
         source_node_pubkey,
         source_relay_pubkey
       ) do
    params = %{
      "id" => server_id,
      "url" => source_node.sync_url,
      "enabled?" => true,
      "auth_pubkey" => source_node_pubkey,
      "relay_pubkey" => source_relay_pubkey,
      "filters" => [config.filter],
      "mode" => config.sync_mode,
      "metadata" => %{
        "negentropy_timeout_ms" => config.negentropy_timeout_ms,
        "backfill_page_size" => config.backfill_page_size
      },
      "tls" => %{"mode" => "disabled", "pins" => []}
    }

    with {:ok, _server} <- management_call(config, target_node, "sync_put_server", params),
         {:ok, %{"ok" => true}} <-
           management_call(config, target_node, "sync_start_server", %{"id" => server_id}),
         :ok <- wait_for_sync_connected(config, target_node, server_id) do
      :ok
    else
      {:ok, other} -> {:error, {:unexpected_sync_start_result, server_id, other}}
      {:error, reason} -> {:error, reason}
    end
  end

  # Best-effort stop: every outcome, including errors, is treated as success so
  # that a failed stop never aborts the scenario.
  defp stop_sync(config, node, server_id) do
    case management_call(config, node, "sync_stop_server", %{"id" => server_id}) do
      {:ok, %{"ok" => true}} -> :ok
      {:ok, _other} -> :ok
      {:error, _reason} -> :ok
    end
  end

  # Polls `sync_server_stats` every 500 ms for up to 30 s until the server is
  # connected and has completed at least one query run.
  defp wait_for_sync_connected(config, node, server_id) do
    wait_until("sync connected #{server_id}", 30_000, 500, fn ->
      case management_call(config, node, "sync_server_stats", %{"id" => server_id}) do
        {:ok, %{"connected" => true, "query_runs" => query_runs} = stats}
        when query_runs >= 1 ->
          {:ok, stats}

        {:ok, stats} ->
          {:retry, stats}

        {:error, reason} ->
          {:retry, reason}
      end
    end)
    |> unwrap_wait()
  end

  # Publishes `count` signed events authored by the key for `label` over one
  # websocket connection, halting at the first rejected event. The connection
  # is always closed afterwards.
  defp publish_many(config, node, label, count) do
    private_key = Map.fetch!(config.node_event_keys, label)
    created_at = System.system_time(:second)
    IO.puts("publish #{count} events for #{label} to #{node.http_url}")

    with {:ok, client} <- RelayClient.start_link(node.websocket_url, self()),
         :ok <- await_client_connect(client) do
      try do
        # The explicit `//1` step keeps the range empty when `count` is 0;
        # a bare `1..0` would count downwards and publish two events.
        1..count//1
        |> Enum.reduce_while(:ok, fn index, :ok ->
          if rem(index, 1_000) == 0, do: IO.puts("published #{label} #{index}/#{count}")

          event =
            %{
              "created_at" => created_at,
              "kind" => @kind,
              "tags" => [
                ["r", config.resource],
                ["t", "three-node-gap-e2e"],
                ["run", config.run_id],
                ["node", label]
              ],
              "content" => "#{config.run_id}:#{label}:#{index}"
            }
            |> sign_event!(private_key)

          case publish_event(client, node.relay_auth_url, private_key, event) do
            :ok ->
              {:cont, :ok}

            {:error, reason} ->
              {:halt, {:error, {:publish_many_failed, label, index, reason}}}
          end
        end)
      after
        RelayClient.close(client)
      end
    end
  end

  # Polls once per second for up to 120 s until `node` reports at least
  # `expected` events authored by the key for `label`.
  defp wait_for_author_count(config, node, label, expected) do
    author = Map.fetch!(config.event_pubkeys, label)

    wait_until("#{node.http_url} author #{label} count #{expected}", 120_000, 1_000, fn ->
      case query_count(config, node, author, expected + 100) do
        {:ok, count} when count >= expected ->
          IO.puts("#{node.http_url} has #{count} events from #{label}")
          {:ok, count}

        {:ok, count} ->
          IO.puts("#{node.http_url} has #{count}/#{expected} events from #{label}")
          {:retry, count}

        {:error, reason} ->
          {:retry, reason}
      end
    end)
    |> unwrap_wait()
  end

  # Opens a fresh websocket connection, authenticates as admin and issues a
  # COUNT request for the scenario filter restricted to `author`.
  #
  # The last argument is accepted for call-site compatibility but unused: the
  # filter deliberately carries no "limit".
  defp query_count(config, node, author, _limit) do
    filter =
      config.filter
      |> Map.put("authors", [author])
      |> Map.delete("limit")

    with {:ok, client} <- RelayClient.start_link(node.websocket_url, self()),
         :ok <- await_client_connect(client) do
      subscription_id = "three-node-gap-count-#{System.unique_integer([:positive, :monotonic])}"

      try do
        with :ok <-
               authenticate_connection(
                 config_auth_resource(filter),
                 client,
                 node.relay_auth_url,
                 config.admin_private_key
               ) do
          :ok = RelayClient.send_json(client, ["COUNT", subscription_id, filter])
          receive_count(client, subscription_id)
        end
      after
        RelayClient.close(client)
      end
    end
  end

  # Waits for the COUNT reply (or a terminal failure) for `subscription_id`.
  defp receive_count(client, subscription_id) do
    receive do
      {:relay_client, ^client, :frame, ["COUNT", ^subscription_id, %{"count" => count}]}
      when is_integer(count) ->
        {:ok, count}

      {:relay_client, ^client, :frame, ["CLOSED", ^subscription_id, message]} ->
        {:error, {:count_closed, message}}

      {:relay_client, ^client, :frame, {:decode_error, reason, payload}} ->
        {:error, {:decode_error, reason, payload}}

      {:relay_client, ^client, :disconnected, status} ->
        {:error, {:disconnected, status.reason}}
    after
      @frame_timeout_ms -> {:error, :count_timeout}
    end
  end

  # Collects per-author counts on nodes A and C. A failed count is reported
  # inline as `%{"error" => ...}` rather than failing the summary.
  defp summarize(config) do
    nodes = [{"a", config.node_a}, {"c", config.node_c}]

    summary =
      Map.new(nodes, fn {node_label, node} ->
        counts =
          Map.new(["a", "b", "c"], fn author_label ->
            author = Map.fetch!(config.event_pubkeys, author_label)

            value =
              case query_count(config, node, author, config.count * 3 + 100) do
                {:ok, count} -> count
                {:error, reason} -> %{"error" => inspect(reason)}
              end

            {author_label, value}
          end)

        {node_label, counts}
      end)

    {:ok, summary}
  end

  defp fetch_node_pubkey(config, node), do: fetch_identity_pubkey(config, node, "node")

  defp fetch_relay_pubkey(config, node), do: fetch_identity_pubkey(config, node, "relay")

  # Fetches `<http_url>/relay` and checks that the returned document has
  # `limitation.parrhesia_sync_page` set to `true`. Errors are tagged with
  # `label`.
  defp ensure_sync_page_advertised(node, label) do
    relay_info_url = node.http_url <> "/relay"

    case Req.get(
           url: relay_info_url,
           headers: [{"accept", "application/nostr+json"}],
           decode_body: false,
           retry: false,
           connect_options: [timeout: 1_000],
           receive_timeout: 1_000
         ) do
      {:ok, %{status: 200, body: body}} ->
        case JSON.decode(body) do
          {:ok, %{"limitation" => %{"parrhesia_sync_page" => true}}} ->
            :ok

          {:ok, document} ->
            {:error, {label, :sync_page_not_advertised, document}}

          {:error, reason} ->
            {:error, {label, :relay_info_decode_failed, reason}}
        end

      {:ok, %{status: status}} ->
        {:error, {label, :relay_info_request_failed, status}}

      {:error, reason} ->
        {:error, {label, :relay_info_request_failed, reason}}
    end
  end

  # Returns the lower-cased pubkey reported by `identity_get` for `role`.
  defp fetch_identity_pubkey(config, node, role) do
    case management_call(config, node, "identity_get", %{"role" => role}) do
      {:ok, %{"pubkey" => pubkey}} when is_binary(pubkey) ->
        {:ok, String.downcase(pubkey)}

      {:ok, other} ->
        {:error, {:unexpected_identity_payload, other}}

      {:error, reason} ->
        {:error, {:identity_get_failed, role, reason}}
    end
  end

  # Publishes a throwaway signed event tagged with `resource`; going through
  # `publish_event/4` also answers any AUTH challenge the relay sends.
  defp authenticate_connection(resource, client, relay_auth_url, private_key) do
    event =
      %{
        "created_at" => System.system_time(:second),
        "kind" => @kind,
        "tags" => [["r", resource], ["t", "three-node-gap-e2e-auth"]],
        "content" => "auth:#{System.unique_integer([:positive, :monotonic])}"
      }
      |> sign_event!(private_key)

    publish_event(client, relay_auth_url, private_key, event)
  end

  # Picks the first "#r" value of the filter, falling back to the default
  # resource.
  defp config_auth_resource(%{"#r" => [resource | _rest]}) when is_binary(resource),
    do: resource

  defp config_auth_resource(_filter), do: @resource

  # Sends `event` and waits for the relay's verdict, authenticating on demand.
  defp publish_event(client, relay_auth_url, private_key, event) do
    :ok = RelayClient.send_json(client, ["EVENT", event])
    do_publish_event(client, relay_auth_url, private_key, event, event["id"], false, nil, false)
  end

  # Receive loop behind `publish_event/4`.
  #
  # * An AUTH challenge is answered with a signed auth event whose id is
  #   remembered in `auth_event_id`.
  # * A positive OK for that auth event triggers one replay of `event`.
  # * An OK for `published_event_id` ends the loop, unless the rejection message
  #   mentions "auth" and the event has not yet been replayed after a
  #   successful authentication, in which case the loop keeps waiting.
  defp do_publish_event(
         client,
         relay_auth_url,
         private_key,
         event,
         published_event_id,
         authenticated?,
         auth_event_id,
         replayed_after_auth?
       ) do
    receive do
      {:relay_client, ^client, :frame, ["AUTH", challenge]} ->
        auth_event =
          auth_event(relay_auth_url, challenge)
          |> sign_event!(private_key)

        :ok = RelayClient.send_json(client, ["AUTH", auth_event])

        do_publish_event(
          client,
          relay_auth_url,
          private_key,
          event,
          published_event_id,
          authenticated?,
          auth_event["id"],
          replayed_after_auth?
        )

      {:relay_client, ^client, :frame, ["OK", event_id, true, _message]}
      when event_id == auth_event_id ->
        :ok = RelayClient.send_json(client, ["EVENT", event])

        do_publish_event(
          client,
          relay_auth_url,
          private_key,
          event,
          published_event_id,
          true,
          nil,
          true
        )

      {:relay_client, ^client, :frame, ["OK", event_id, false, message]}
      when event_id == auth_event_id ->
        {:error, {:auth_failed, message}}

      {:relay_client, ^client, :frame, ["OK", event_id, true, _message]}
      when event_id == published_event_id ->
        :ok

      {:relay_client, ^client, :frame, ["OK", event_id, false, message]}
      when event_id == published_event_id ->
        cond do
          authenticated? and replayed_after_auth? and not auth_required_message?(message) ->
            {:error, {:event_rejected, message}}

          auth_required_message?(message) ->
            do_publish_event(
              client,
              relay_auth_url,
              private_key,
              event,
              published_event_id,
              authenticated?,
              auth_event_id,
              replayed_after_auth?
            )

          true ->
            {:error, {:event_rejected, message}}
        end

      {:relay_client, ^client, :frame, {:decode_error, reason, payload}} ->
        {:error, {:decode_error, reason, payload}}

      {:relay_client, ^client, :disconnected, status} ->
        {:error, {:disconnected, status.reason}}
    after
      @frame_timeout_ms -> {:error, :publish_timeout}
    end
  end

  # Blocks until `client` reports that it connected or disconnected.
  defp await_client_connect(client) do
    receive do
      {:relay_client, ^client, :connected} ->
        :ok

      {:relay_client, ^client, :disconnected, status} ->
        {:error, {:disconnected, status.reason}}
    after
      @frame_timeout_ms -> {:error, :connect_timeout}
    end
  end

  # POSTs `%{"method" => method, "params" => params}` to `<http_url>/management`
  # with a `Nostr <base64 signed event>` authorization header signed by the
  # admin key, and decodes the JSON envelope of a 200 response.
  defp management_call(config, node, method, params) do
    url = node.http_url <> "/management"

    auth_header =
      nip98_event("POST", url)
      |> sign_event!(config.admin_private_key)
      |> then(&("Nostr " <> Base.encode64(JSON.encode!(&1))))

    case Req.post(
           url: url,
           headers: [{"authorization", auth_header}],
           json: %{"method" => method, "params" => params},
           decode_body: false,
           connect_options: [timeout: 2_000],
           receive_timeout: 10_000
         ) do
      {:ok, %{status: 200, body: body}} ->
        decode_management_response(body)

      {:ok, %{status: status, body: body}} ->
        {:error, {:management_http_error, status, body}}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Unwraps the `{"ok": true, "result": ...}` management envelope.
  defp decode_management_response(body) do
    with {:ok, %{"ok" => true, "result" => result}} <- JSON.decode(body) do
      {:ok, result}
    else
      {:ok, %{"ok" => false, "error" => error}} -> {:error, {:management_error, error}}
      {:ok, other} -> {:error, {:unexpected_management_response, other}}
      {:error, reason} -> {:error, {:invalid_management_json, reason}}
    end
  end

  # Adds "pubkey", "id" and "sig" to `event`. The id is computed while "sig"
  # holds a 128-character zero placeholder, then the real Schnorr signature over
  # the id replaces it. Raises if the private key is invalid.
  defp sign_event!(event, private_key_hex) do
    {:ok, pubkey} = derive_pubkey(private_key_hex)
    seckey = Base.decode16!(String.downcase(private_key_hex), case: :lower)

    unsigned_event =
      event
      |> Map.put("pubkey", pubkey)
      |> Map.put("sig", String.duplicate("0", 128))

    id = Auth.compute_event_id(unsigned_event)

    signature =
      id
      |> Base.decode16!(case: :lower)
      |> Secp256k1.schnorr_sign(seckey)
      |> Base.encode16(case: :lower)

    unsigned_event
    |> Map.put("id", id)
    |> Map.put("sig", signature)
  end

  # Derives the public key for every node event key, returning
  # `{:ok, %{label => pubkey}}` or the first derivation error.
  defp event_pubkeys do
    @node_event_keys
    |> Enum.map(fn {label, key} ->
      with {:ok, pubkey} <- derive_pubkey(key), do: {label, pubkey}
    end)
    |> Enum.reduce_while({:ok, %{}}, fn
      {label, pubkey}, {:ok, acc} -> {:cont, {:ok, Map.put(acc, label, pubkey)}}
      {:error, reason}, _acc -> {:halt, {:error, reason}}
    end)
  end

  # Converts a 64-hex-character private key into its lower-case hex x-only
  # public key. Any decoding failure or exception yields
  # `{:error, {:invalid_private_key, key}}`.
  defp derive_pubkey(private_key_hex) do
    normalized = String.downcase(private_key_hex)

    case Base.decode16(normalized, case: :lower) do
      {:ok, <<_::256>> = seckey} ->
        {:ok, seckey |> Secp256k1.pubkey(:xonly) |> Base.encode16(case: :lower)}

      _other ->
        {:error, {:invalid_private_key, private_key_hex}}
    end
  rescue
    _error -> {:error, {:invalid_private_key, private_key_hex}}
  end

  # Unsigned kind-27235 event carrying the HTTP method, URL and a unique nonce;
  # used for the management authorization header.
  defp nip98_event(method, url),
    do: %{
      "created_at" => System.system_time(:second),
      "kind" => 27_235,
      "tags" => [
        ["method", method],
        ["u", url],
        ["nonce", "#{System.unique_integer([:positive, :monotonic])}"]
      ],
      "content" => ""
    }

  # Unsigned kind-22242 event answering a relay AUTH `challenge`.
  defp auth_event(relay_auth_url, challenge),
    do: %{
      "created_at" => System.system_time(:second),
      "kind" => 22_242,
      "tags" => [["challenge", challenge], ["relay", relay_auth_url]],
      "content" => ""
    }

  # Heuristic: any rejection message containing "auth" (case-insensitive) is
  # treated as an authentication requirement.
  defp auth_required_message?(message) when is_binary(message),
    do: String.contains?(String.downcase(message), "auth")

  defp auth_required_message?(_message), do: false

  # Calls `fun` until it returns `{:ok, value}`, sleeping `interval_ms` between
  # `{:retry, reason}` results. Gives up with `{:error, {:timeout, label, reason}}`
  # once `timeout_ms` has elapsed.
  defp wait_until(label, timeout_ms, interval_ms, fun) do
    started_at = System.monotonic_time(:millisecond)
    do_wait_until(label, timeout_ms, interval_ms, started_at, fun)
  end

  defp do_wait_until(label, timeout_ms, interval_ms, started_at, fun) do
    case fun.() do
      {:ok, value} ->
        {:ok, value}

      {:retry, reason} ->
        if System.monotonic_time(:millisecond) - started_at >= timeout_ms do
          {:error, {:timeout, label, reason}}
        else
          Process.sleep(interval_ms)
          do_wait_until(label, timeout_ms, interval_ms, started_at, fun)
        end
    end
  end

  # Drops the value of a successful wait so it composes with `:ok <-` clauses.
  defp unwrap_wait({:ok, _value}), do: :ok
  defp unwrap_wait({:error, reason}), do: {:error, reason}

  # Reads an integer from environment variable `name`, or returns `default` when
  # it is unset. Raises `ArgumentError` naming the variable when the value is
  # not a plain integer.
  defp parse_int_env(name, default) do
    case System.get_env(name) do
      nil ->
        default

      value ->
        case Integer.parse(value) do
          {integer, ""} ->
            integer

          _other ->
            raise ArgumentError,
                  "environment variable #{name} must be an integer, got: #{inspect(value)}"
        end
    end
  end

  defp default_run_id, do: "three-node-gap-#{System.system_time(:millisecond)}"

  defp format_reason({:timeout, label, reason}),
    do: "timeout waiting for #{label}: #{inspect(reason)}"

  defp format_reason(reason), do: inspect(reason)
end

ThreeNodeGapE2E.Runner.main(System.argv())