Files
parrhesia/scripts/node_sync_three_node_gap_e2e.exs
self 35ae17cc05 chore: prepare 0.14.0 release
Document and test SYNC-PAGE behavior, add the NIP-11 advertisement toggle, extend e2e coverage, apply small backfill cleanup fixes, and bump release metadata.
2026-05-25 20:35:06 +02:00

757 lines
24 KiB
Elixir

defmodule ThreeNodeGapE2E.RelayClient do
  # Thin WebSockex client used by the runner. Every lifecycle event is forwarded to
  # the owning process as `{:relay_client, client_pid, :connected}` or
  # `{:relay_client, client_pid, tag, payload}` with tag `:frame` / `:disconnected`.
  # Outgoing traffic is sent asynchronously through casts.
  use WebSockex

  # Starts the client without blocking on the handshake (`async: true`); an initial
  # connection failure is routed through `handle_disconnect/2` instead of failing start.
  def start_link(url, owner) do
    options = [
      handle_initial_conn_failure: true,
      async: true,
      socket_connect_timeout: 2_000,
      socket_recv_timeout: 2_000
    ]

    WebSockex.start_link(url, __MODULE__, owner, options)
  end

  # JSON-encodes `payload` and sends it as a text frame.
  def send_json(pid, payload), do: WebSockex.cast(pid, {:send_json, payload})

  # Asks the client to close the connection.
  def close(pid), do: WebSockex.cast(pid, :close)

  @impl true
  def handle_connect(_conn, owner) do
    send(owner, {:relay_client, self(), :connected})
    {:ok, owner}
  end

  @impl true
  def handle_frame({:text, payload}, owner), do: forward(owner, :frame, decode_text(payload))
  def handle_frame(frame, owner), do: forward(owner, :frame, frame)

  @impl true
  def handle_cast(:close, owner), do: {:close, owner}

  def handle_cast({:send_json, payload}, owner),
    do: {:reply, {:text, JSON.encode!(payload)}, owner}

  @impl true
  def handle_disconnect(status, owner), do: forward(owner, :disconnected, status)

  # Sends a tagged event to the owner and keeps the owner as the callback state.
  defp forward(owner, tag, payload) do
    send(owner, {:relay_client, self(), tag, payload})
    {:ok, owner}
  end

  # Text frames are decoded as JSON; undecodable text is passed on as a tagged error.
  defp decode_text(payload) do
    case JSON.decode(payload) do
      {:ok, decoded} -> decoded
      {:error, reason} -> {:decode_error, reason, payload}
    end
  end
end
defmodule ThreeNodeGapE2E.Runner do
  @moduledoc """
  Three-node sync "gap" end-to-end scenario.

  Nodes A and B each publish `count` events and sync bidirectionally. The B<->A
  sync servers are then stopped and node B is optionally stopped. Node C comes up,
  publishes its own events, syncs only with A, and must receive B's events through A.

  Per-node endpoints come from `PARRHESIA_NODE_{A,B,C}_HTTP_URL` / `_WS_URL`
  (plus optional `_RELAY_AUTH_URL` / `_SYNC_URL`). Optional shell commands
  `PARRHESIA_THREE_NODE_START_B_CMD`, `PARRHESIA_THREE_NODE_STOP_B_CMD` and
  `PARRHESIA_THREE_NODE_START_C_CMD` start/stop nodes between phases.
  """

  alias Parrhesia.API.Auth
  alias ThreeNodeGapE2E.RelayClient

  @kind 5000
  @resource "tribes.logs.entry"
  @admin_private_key String.duplicate("1", 64)
  @node_event_keys %{
    "a" => String.duplicate("2", 64),
    "b" => String.duplicate("3", 64),
    "c" => String.duplicate("4", 64)
  }
  @frame_timeout_ms 20_000
  # Upper bound on how long to wait for a closed relay client process to exit.
  @close_timeout_ms 1_000

  @doc """
  Script entry point (`argv` is ignored).

  Prints a completion line on success. On any failure it prints the reason to
  stderr and halts the VM with exit status 1.
  """
  def main(_argv) do
    with {:ok, _apps} <- Application.ensure_all_started(:req),
         {:ok, _apps} <- Application.ensure_all_started(:websockex),
         {:ok, config} <- load_config(),
         :ok <- run(config) do
      IO.puts("three-node gap e2e completed")
    else
      {:error, reason} ->
        halt_with_failure(reason)

      # Any other shape would otherwise raise a WithClauseError instead of failing cleanly.
      other ->
        halt_with_failure({:unexpected_result, other})
    end
  end

  defp halt_with_failure(reason) do
    IO.puts(:stderr, "three-node gap e2e failed: #{format_reason(reason)}")
    System.halt(1)
  end

  # Builds the scenario config from the environment.
  # Returns `{:ok, map}` or `{:error, reason}` for missing/invalid settings.
  defp load_config do
    with {:ok, count} <- parse_int_env("PARRHESIA_THREE_NODE_EVENTS_PER_NODE", 6_000),
         {:ok, negentropy_timeout_ms} <-
           parse_int_env("PARRHESIA_THREE_NODE_NEGENTROPY_TIMEOUT_MS", 3_000),
         {:ok, backfill_page_size} <-
           parse_int_env("PARRHESIA_THREE_NODE_BACKFILL_PAGE_SIZE", 25),
         {:ok, node_a} <- load_node("A"),
         {:ok, node_b} <- load_node("B"),
         {:ok, node_c} <- load_node("C"),
         {:ok, admin_pubkey} <- derive_pubkey(@admin_private_key),
         {:ok, event_pubkeys} <- event_pubkeys() do
      {:ok,
       %{
         run_id: System.get_env("PARRHESIA_THREE_NODE_RUN_ID", default_run_id()),
         count: count,
         resource: @resource,
         filter: %{"kinds" => [@kind], "#r" => [@resource]},
         admin_private_key: @admin_private_key,
         node_event_keys: @node_event_keys,
         admin_pubkey: admin_pubkey,
         event_pubkeys: event_pubkeys,
         negentropy_timeout_ms: negentropy_timeout_ms,
         backfill_page_size: backfill_page_size,
         sync_mode: System.get_env("PARRHESIA_THREE_NODE_SYNC_MODE", "req_stream"),
         node_a: node_a,
         node_b: node_b,
         node_c: node_c
       }}
    end
  end

  # Runs the whole scenario. Returns `:ok` or the first `{:error, reason}` encountered.
  defp run(config) do
    IO.puts("scenario run_id=#{config.run_id} count=#{config.count}")

    with {:ok, a} <- bootstrap_node(config, config.node_a, "a", :node_a),
         :ok <- maybe_start_node("B"),
         {:ok, b} <- bootstrap_node(config, config.node_b, "b", :node_b),
         :ok <- link_peers(config, a, b),
         :ok <- wait_for_author_count(config, b.node, "a", config.count),
         :ok <- wait_for_author_count(config, a.node, "b", config.count),
         :ok <- stop_sync(config, a.node, "a-from-b"),
         :ok <- stop_sync(config, b.node, "b-from-a"),
         :ok <- maybe_stop_node("B"),
         :ok <- maybe_start_node("C"),
         {:ok, c} <- bootstrap_node(config, config.node_c, "c", :node_c),
         :ok <- link_peers(config, a, c),
         :ok <- wait_for_author_count(config, c.node, "a", config.count),
         # B's events must reach C through A even though C never synced with B.
         :ok <- wait_for_author_count(config, c.node, "b", config.count),
         :ok <- wait_for_author_count(config, a.node, "c", config.count),
         {:ok, summary} <- summarize(config) do
      IO.puts("summary=#{JSON.encode!(summary)}")
      :ok
    end
  end

  # Prepares one node in this order:
  #   1. wait for health
  #   2. fetch its node/relay identities
  #   3. check the NIP-11 sync-page flag
  #   4. grant client ACLs
  #   5. publish `count` events as `label` and wait until they are countable
  # Returns a peer map used by link_peers/3.
  defp bootstrap_node(config, node, label, info_label) do
    with :ok <- wait_for_health(node),
         {:ok, node_pubkey} <- fetch_node_pubkey(config, node),
         {:ok, relay_pubkey} <- fetch_relay_pubkey(config, node),
         :ok <- ensure_sync_page_advertised(node, info_label),
         :ok <- ensure_client_access(config, node),
         :ok <- publish_many(config, node, label, config.count),
         :ok <- wait_for_author_count(config, node, label, config.count) do
      {:ok, %{label: label, node: node, node_pubkey: node_pubkey, relay_pubkey: relay_pubkey}}
    end
  end

  # Grants each node's identity sync ACLs on the other node.
  # Then configures "<peer>-from-<hub>" on the peer and "<hub>-from-<peer>" on the hub.
  defp link_peers(config, hub, peer) do
    with :ok <- grant_pair(config, hub.node, peer.node_pubkey),
         :ok <- grant_pair(config, peer.node, hub.node_pubkey),
         :ok <- configure_sync(config, peer.node, "#{peer.label}-from-#{hub.label}", hub) do
      configure_sync(config, hub.node, "#{hub.label}-from-#{peer.label}", peer)
    end
  end

  defp maybe_start_node(suffix),
    do: maybe_run_node_command("start", suffix, "PARRHESIA_THREE_NODE_START_#{suffix}_CMD")

  defp maybe_stop_node(suffix),
    do: maybe_run_node_command("stop", suffix, "PARRHESIA_THREE_NODE_STOP_#{suffix}_CMD")

  # Runs the shell command in `env_name` (via `sh -lc`) if set and non-empty.
  # A non-zero exit status becomes an error carrying the command output.
  defp maybe_run_node_command(action, suffix, env_name) do
    case System.get_env(env_name) do
      nil ->
        :ok

      "" ->
        :ok

      command ->
        IO.puts("#{action} node #{suffix}: #{command}")

        case System.cmd("sh", ["-lc", command], stderr_to_stdout: true) do
          {output, 0} ->
            if String.trim(output) != "", do: IO.write(output)
            :ok

          {output, status} ->
            {:error, {:node_command_failed, action, suffix, status, output}}
        end
    end
  end

  # Reads the endpoints for node `suffix`. HTTP and WS URLs are required.
  # The relay-auth URL defaults to the WS URL; the sync URL defaults to the relay-auth URL.
  defp load_node(suffix) do
    with {:ok, http_url} <- fetch_node_env(suffix, "HTTP_URL"),
         {:ok, websocket_url} <- fetch_node_env(suffix, "WS_URL") do
      relay_auth_url = System.get_env("PARRHESIA_NODE_#{suffix}_RELAY_AUTH_URL", websocket_url)
      sync_url = System.get_env("PARRHESIA_NODE_#{suffix}_SYNC_URL", relay_auth_url)

      {:ok,
       %{
         http_url: http_url,
         websocket_url: websocket_url,
         relay_auth_url: relay_auth_url,
         sync_url: sync_url
       }}
    end
  end

  # System.fetch_env!/1 raises System.EnvError (not KeyError), so use the
  # non-raising variant and report the missing variable explicitly.
  defp fetch_node_env(suffix, key) do
    name = "PARRHESIA_NODE_#{suffix}_#{key}"

    case System.fetch_env(name) do
      {:ok, value} -> {:ok, value}
      :error -> {:error, {:missing_node_env, suffix, name}}
    end
  end

  # Polls `<http_url>/health` until it answers 200 "ok" (30 s budget, 250 ms interval).
  defp wait_for_health(node) do
    wait_until("health #{node.http_url}", 30_000, 250, fn ->
      case Req.get(
             url: node.http_url <> "/health",
             decode_body: false,
             retry: false,
             connect_options: [timeout: 1_000],
             receive_timeout: 1_000
           ) do
        {:ok, %{status: 200, body: "ok"}} -> {:ok, :ready}
        {:ok, %{status: status}} -> {:retry, {:status, status}}
        {:error, reason} -> {:retry, reason}
      end
    end)
    |> unwrap_wait()
  end

  # Grants sync ACLs on `node` to every event author and to the admin key.
  # Stops at the first failure.
  defp ensure_client_access(config, node) do
    config.event_pubkeys
    |> Map.put("admin", config.admin_pubkey)
    |> Enum.reduce_while(:ok, fn {_label, pubkey}, :ok ->
      case grant_pair(config, node, pubkey) do
        :ok -> {:cont, :ok}
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
  end

  # Grants both `sync_read` and `sync_write` for the scenario filter to `principal`.
  defp grant_pair(config, node, principal) do
    with :ok <- ensure_acl(config, node, principal, "sync_read", config.filter) do
      ensure_acl(config, node, principal, "sync_write", config.filter)
    end
  end

  defp ensure_acl(config, node, principal, capability, match) do
    params = %{
      "principal_type" => "pubkey",
      "principal" => principal,
      "capability" => capability,
      "match" => match
    }

    case management_call(config, node, "acl_grant", params) do
      {:ok, %{"ok" => true}} -> :ok
      {:ok, other} -> {:error, {:unexpected_acl_result, other}}
      {:error, reason} -> {:error, {:acl_grant_failed, capability, principal, reason}}
    end
  end

  # Registers sync server `server_id` on `target_node`, pulling from `source`.
  # `source` is a peer map from bootstrap_node/4. The server is started and we
  # wait until it reports a connected query run.
  defp configure_sync(config, target_node, server_id, source) do
    params = %{
      "id" => server_id,
      "url" => source.node.sync_url,
      "enabled?" => true,
      "auth_pubkey" => source.node_pubkey,
      "relay_pubkey" => source.relay_pubkey,
      "filters" => [config.filter],
      "mode" => config.sync_mode,
      "metadata" => %{
        "negentropy_timeout_ms" => config.negentropy_timeout_ms,
        "backfill_page_size" => config.backfill_page_size
      },
      "tls" => %{"mode" => "disabled", "pins" => []}
    }

    with {:ok, _server} <- management_call(config, target_node, "sync_put_server", params),
         :ok <- start_sync(config, target_node, server_id) do
      wait_for_sync_connected(config, target_node, server_id)
    end
  end

  # Normalizes every sync_start_server outcome to `:ok | {:error, _}`, so the
  # caller's `with` chain never sees an unexpected `{:ok, _}` payload.
  defp start_sync(config, node, server_id) do
    case management_call(config, node, "sync_start_server", %{"id" => server_id}) do
      {:ok, %{"ok" => true}} -> :ok
      {:ok, other} -> {:error, {:unexpected_sync_start_result, server_id, other}}
      {:error, reason} -> {:error, {:sync_start_failed, server_id, reason}}
    end
  end

  # Best effort: failing to stop a sync server never aborts the scenario, but it is reported.
  defp stop_sync(config, node, server_id) do
    case management_call(config, node, "sync_stop_server", %{"id" => server_id}) do
      {:ok, %{"ok" => true}} ->
        :ok

      other ->
        IO.puts(
          :stderr,
          "warning: stopping sync #{server_id} on #{node.http_url} returned #{inspect(other)}"
        )

        :ok
    end
  end

  # Polls sync_server_stats until the server is connected and has run at least
  # one query (30 s budget, 500 ms interval).
  defp wait_for_sync_connected(config, node, server_id) do
    wait_until("sync connected #{server_id}", 30_000, 500, fn ->
      case management_call(config, node, "sync_server_stats", %{"id" => server_id}) do
        # is_integer/1 matters: in term order `nil >= 1` is true.
        {:ok, %{"connected" => true, "query_runs" => query_runs} = stats}
        when is_integer(query_runs) and query_runs >= 1 ->
          {:ok, stats}

        {:ok, stats} ->
          {:retry, stats}

        {:error, reason} ->
          {:retry, reason}
      end
    end)
    |> unwrap_wait()
  end

  # Publishes `count` signed events authored by `label`'s key over one relay connection.
  defp publish_many(config, node, label, count) do
    private_key = Map.fetch!(config.node_event_keys, label)
    created_at = System.system_time(:second)
    IO.puts("publish #{count} events for #{label} to #{node.http_url}")

    with_client(node, fn client ->
      # `//1` keeps the range empty for count < 1; a bare `1..0` would count down.
      Enum.reduce_while(1..count//1, :ok, fn index, :ok ->
        if rem(index, 1_000) == 0, do: IO.puts("published #{label} #{index}/#{count}")

        event =
          %{
            "created_at" => created_at,
            "kind" => @kind,
            "tags" => [
              ["r", config.resource],
              ["t", "three-node-gap-e2e"],
              ["run", config.run_id],
              ["node", label]
            ],
            # The per-event index keeps the batch's events distinct despite a shared timestamp.
            "content" => "#{config.run_id}:#{label}:#{index}"
          }
          |> sign_event!(private_key)

        case publish_event(client, node.relay_auth_url, private_key, event) do
          :ok -> {:cont, :ok}
          {:error, reason} -> {:halt, {:error, {:publish_many_failed, label, index, reason}}}
        end
      end)
    end)
  end

  # Polls COUNT for `label`'s author on `node` until at least `expected`
  # (120 s budget, 1 s interval).
  defp wait_for_author_count(config, node, label, expected) do
    author = Map.fetch!(config.event_pubkeys, label)

    wait_until("#{node.http_url} author #{label} count #{expected}", 120_000, 1_000, fn ->
      case query_count(config, node, author) do
        {:ok, count} when count >= expected ->
          IO.puts("#{node.http_url} has #{count} events from #{label}")
          {:ok, count}

        {:ok, count} ->
          IO.puts("#{node.http_url} has #{count}/#{expected} events from #{label}")
          {:retry, count}

        {:error, reason} ->
          {:retry, reason}
      end
    end)
    |> unwrap_wait()
  end

  # Opens a fresh connection, authenticates as admin, and issues a COUNT for
  # `author` under the scenario filter.
  defp query_count(config, node, author) do
    filter = Map.put(config.filter, "authors", [author])

    with_client(node, fn client ->
      subscription_id = "three-node-gap-count-#{System.unique_integer([:positive, :monotonic])}"

      with :ok <-
             authenticate_connection(
               config_auth_resource(filter),
               client,
               node.relay_auth_url,
               config.admin_private_key
             ) do
        :ok = RelayClient.send_json(client, ["COUNT", subscription_id, filter])
        receive_count(client, subscription_id)
      end
    end)
  end

  defp receive_count(client, subscription_id) do
    receive do
      {:relay_client, ^client, :frame, ["COUNT", ^subscription_id, %{"count" => count}]}
      when is_integer(count) ->
        {:ok, count}

      {:relay_client, ^client, :frame, ["CLOSED", ^subscription_id, message]} ->
        {:error, {:count_closed, message}}

      {:relay_client, ^client, :frame, {:decode_error, reason, payload}} ->
        {:error, {:decode_error, reason, payload}}

      {:relay_client, ^client, :disconnected, status} ->
        {:error, {:disconnected, status.reason}}
    after
      @frame_timeout_ms -> {:error, :count_timeout}
    end
  end

  # Counts every author's events on A and C (B may have been stopped by this point).
  defp summarize(config) do
    summary =
      Map.new([{"a", config.node_a}, {"c", config.node_c}], fn {node_label, node} ->
        counts =
          Map.new(["a", "b", "c"], fn author_label ->
            author = Map.fetch!(config.event_pubkeys, author_label)

            value =
              case query_count(config, node, author) do
                {:ok, count} -> count
                {:error, reason} -> %{"error" => inspect(reason)}
              end

            {author_label, value}
          end)

        {node_label, counts}
      end)

    {:ok, summary}
  end

  # Runs `fun.(client)` on a connected relay client and always closes the client
  # afterwards, including when the connect wait fails.
  # NOTE(review): RelayClient.start_link/2 links the client to this process, which
  # does not trap exits. An abnormal client exit would therefore end the script
  # rather than be retried. Confirm that this is intended.
  defp with_client(node, fun) do
    with {:ok, client} <- RelayClient.start_link(node.websocket_url, self()) do
      try do
        with :ok <- await_client_connect(client), do: fun.(client)
      after
        close_client(client)
      end
    end
  end

  # Closes the client and waits (bounded) for its process to exit. Then it drains
  # any messages it left behind. Without the drain, stale frames and
  # `:disconnected` notices from every finished connection would pile up in this
  # process's mailbox.
  defp close_client(client) do
    ref = Process.monitor(client)
    RelayClient.close(client)

    receive do
      {:DOWN, ^ref, :process, ^client, _reason} -> :ok
    after
      @close_timeout_ms -> Process.demonitor(ref, [:flush])
    end

    drain_client_messages(client)
  end

  defp drain_client_messages(client) do
    receive do
      {:relay_client, ^client, _event} -> drain_client_messages(client)
      {:relay_client, ^client, _event, _payload} -> drain_client_messages(client)
    after
      0 -> :ok
    end
  end

  defp fetch_node_pubkey(config, node), do: fetch_identity_pubkey(config, node, "node")
  defp fetch_relay_pubkey(config, node), do: fetch_identity_pubkey(config, node, "relay")

  # Requires the node's NIP-11 document at `/relay` to set
  # `limitation.parrhesia_sync_page` to true.
  defp ensure_sync_page_advertised(node, label) do
    relay_info_url = node.http_url <> "/relay"

    case Req.get(
           url: relay_info_url,
           headers: [{"accept", "application/nostr+json"}],
           decode_body: false,
           retry: false,
           connect_options: [timeout: 1_000],
           receive_timeout: 1_000
         ) do
      {:ok, %{status: 200, body: body}} ->
        case JSON.decode(body) do
          {:ok, %{"limitation" => %{"parrhesia_sync_page" => true}}} ->
            :ok

          {:ok, document} ->
            {:error, {label, :sync_page_not_advertised, document}}

          {:error, reason} ->
            {:error, {label, :relay_info_decode_failed, reason}}
        end

      {:ok, %{status: status}} ->
        {:error, {label, :relay_info_request_failed, status}}

      {:error, reason} ->
        {:error, {label, :relay_info_request_failed, reason}}
    end
  end

  defp fetch_identity_pubkey(config, node, role) do
    case management_call(config, node, "identity_get", %{"role" => role}) do
      {:ok, %{"pubkey" => pubkey}} when is_binary(pubkey) -> {:ok, String.downcase(pubkey)}
      {:ok, other} -> {:error, {:unexpected_identity_payload, other}}
      {:error, reason} -> {:error, {:identity_get_failed, role, reason}}
    end
  end

  # Publishes a throwaway event signed by `private_key`, so that the relay runs
  # its AUTH handshake on this connection before the caller's real request.
  defp authenticate_connection(resource, client, relay_auth_url, private_key) do
    event =
      %{
        "created_at" => System.system_time(:second),
        "kind" => @kind,
        "tags" => [["r", resource], ["t", "three-node-gap-e2e-auth"]],
        "content" => "auth:#{System.unique_integer([:positive, :monotonic])}"
      }
      |> sign_event!(private_key)

    publish_event(client, relay_auth_url, private_key, event)
  end

  defp config_auth_resource(%{"#r" => [resource | _rest]}) when is_binary(resource), do: resource
  defp config_auth_resource(_filter), do: @resource

  # Sends `event` and waits for the relay's OK.
  # On an AUTH challenge it signs and sends a kind-22242 auth event. Once that is
  # accepted, the original event is sent once more.
  defp publish_event(client, relay_auth_url, private_key, event) do
    :ok = RelayClient.send_json(client, ["EVENT", event])

    await_publish_ack(client, relay_auth_url, private_key, event, %{
      auth_event_id: nil,
      replayed?: false
    })
  end

  # `state.auth_event_id` is the id of the AUTH event awaiting its OK (or nil).
  # `state.replayed?` becomes true once the event was re-sent after a successful AUTH.
  defp await_publish_ack(client, relay_auth_url, private_key, event, state) do
    event_id = event["id"]
    auth_event_id = state.auth_event_id

    receive do
      {:relay_client, ^client, :frame, ["AUTH", challenge]} ->
        auth_event = relay_auth_url |> auth_event(challenge) |> sign_event!(private_key)
        :ok = RelayClient.send_json(client, ["AUTH", auth_event])
        next_state = %{state | auth_event_id: auth_event["id"]}
        await_publish_ack(client, relay_auth_url, private_key, event, next_state)

      {:relay_client, ^client, :frame, ["OK", ^auth_event_id, true, _message]}
      when not is_nil(auth_event_id) ->
        :ok = RelayClient.send_json(client, ["EVENT", event])
        next_state = %{auth_event_id: nil, replayed?: true}
        await_publish_ack(client, relay_auth_url, private_key, event, next_state)

      {:relay_client, ^client, :frame, ["OK", ^auth_event_id, false, message]}
      when not is_nil(auth_event_id) ->
        {:error, {:auth_failed, message}}

      {:relay_client, ^client, :frame, ["OK", ^event_id, true, _message]} ->
        :ok

      {:relay_client, ^client, :frame, ["OK", ^event_id, false, message]} ->
        # Before the post-AUTH replay, an auth rejection just means "wait for the
        # AUTH round-trip". After the replay, any rejection is final; waiting would
        # only run into the frame timeout.
        if not state.replayed? and auth_required_message?(message) do
          await_publish_ack(client, relay_auth_url, private_key, event, state)
        else
          {:error, {:event_rejected, message}}
        end

      {:relay_client, ^client, :frame, {:decode_error, reason, payload}} ->
        {:error, {:decode_error, reason, payload}}

      {:relay_client, ^client, :disconnected, status} ->
        {:error, {:disconnected, status.reason}}
    after
      @frame_timeout_ms -> {:error, :publish_timeout}
    end
  end

  defp await_client_connect(client) do
    receive do
      {:relay_client, ^client, :connected} -> :ok
      {:relay_client, ^client, :disconnected, status} -> {:error, {:disconnected, status.reason}}
    after
      @frame_timeout_ms -> {:error, :connect_timeout}
    end
  end

  # POSTs a management RPC to `<http_url>/management`. It is authorized with a
  # NIP-98 (kind 27235) event signed by the admin key.
  defp management_call(config, node, method, params) do
    url = node.http_url <> "/management"

    auth_header =
      nip98_event("POST", url)
      |> sign_event!(config.admin_private_key)
      |> then(&("Nostr " <> Base.encode64(JSON.encode!(&1))))

    case Req.post(
           url: url,
           headers: [{"authorization", auth_header}],
           json: %{"method" => method, "params" => params},
           decode_body: false,
           connect_options: [timeout: 2_000],
           receive_timeout: 10_000
         ) do
      {:ok, %{status: 200, body: body}} -> decode_management_response(body)
      {:ok, %{status: status, body: body}} -> {:error, {:management_http_error, status, body}}
      {:error, reason} -> {:error, reason}
    end
  end

  defp decode_management_response(body) do
    case JSON.decode(body) do
      {:ok, %{"ok" => true, "result" => result}} -> {:ok, result}
      {:ok, %{"ok" => false, "error" => error}} -> {:error, {:management_error, error}}
      {:ok, other} -> {:error, {:unexpected_management_response, other}}
      {:error, reason} -> {:error, {:invalid_management_json, reason}}
    end
  end

  # Fills in `pubkey`, `id` and a Schnorr `sig` for `event`. Raises if the key is invalid.
  defp sign_event!(event, private_key_hex) do
    {:ok, pubkey} = derive_pubkey(private_key_hex)
    seckey = Base.decode16!(String.downcase(private_key_hex), case: :lower)

    # Placeholder sig so the map has its final shape when the id is computed.
    unsigned_event =
      event |> Map.put("pubkey", pubkey) |> Map.put("sig", String.duplicate("0", 128))

    id = Auth.compute_event_id(unsigned_event)

    signature =
      id
      |> Base.decode16!(case: :lower)
      |> Secp256k1.schnorr_sign(seckey)
      |> Base.encode16(case: :lower)

    unsigned_event |> Map.put("id", id) |> Map.put("sig", signature)
  end

  # Derives `%{label => xonly_pubkey_hex}` for every node event key.
  # Stops at the first invalid key.
  defp event_pubkeys do
    Enum.reduce_while(@node_event_keys, {:ok, %{}}, fn {label, key}, {:ok, acc} ->
      case derive_pubkey(key) do
        {:ok, pubkey} -> {:cont, {:ok, Map.put(acc, label, pubkey)}}
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
  end

  # Returns the lowercase hex x-only public key for a 32-byte hex private key.
  defp derive_pubkey(private_key_hex) do
    normalized = String.downcase(private_key_hex)

    case Base.decode16(normalized, case: :lower) do
      {:ok, <<_::256>> = seckey} ->
        {:ok, seckey |> Secp256k1.pubkey(:xonly) |> Base.encode16(case: :lower)}

      _other ->
        {:error, {:invalid_private_key, private_key_hex}}
    end
  rescue
    # Boundary with the NIF: any raise while deriving is reported as an invalid key.
    _error -> {:error, {:invalid_private_key, private_key_hex}}
  end

  defp nip98_event(method, url),
    do: %{
      "created_at" => System.system_time(:second),
      "kind" => 27_235,
      "tags" => [
        ["method", method],
        ["u", url],
        ["nonce", "#{System.unique_integer([:positive, :monotonic])}"]
      ],
      "content" => ""
    }

  defp auth_event(relay_auth_url, challenge),
    do: %{
      "created_at" => System.system_time(:second),
      "kind" => 22_242,
      "tags" => [["challenge", challenge], ["relay", relay_auth_url]],
      "content" => ""
    }

  # Loose heuristic: any rejection text mentioning "auth" is treated as auth-related.
  defp auth_required_message?(message) when is_binary(message),
    do: String.contains?(String.downcase(message), "auth")

  defp auth_required_message?(_message), do: false

  # Calls `fun` until it returns `{:ok, value}`, sleeping `interval_ms` between
  # `{:retry, reason}` results. Returns `{:error, {:timeout, label, last_reason}}`
  # once `timeout_ms` has elapsed.
  defp wait_until(label, timeout_ms, interval_ms, fun) do
    started_at = System.monotonic_time(:millisecond)
    do_wait_until(label, timeout_ms, interval_ms, started_at, fun)
  end

  defp do_wait_until(label, timeout_ms, interval_ms, started_at, fun) do
    case fun.() do
      {:ok, value} ->
        {:ok, value}

      {:retry, reason} ->
        if System.monotonic_time(:millisecond) - started_at >= timeout_ms do
          {:error, {:timeout, label, reason}}
        else
          Process.sleep(interval_ms)
          do_wait_until(label, timeout_ms, interval_ms, started_at, fun)
        end
    end
  end

  defp unwrap_wait({:ok, _value}), do: :ok
  defp unwrap_wait({:error, reason}), do: {:error, reason}

  # Reads a non-negative integer env var. Unset or empty means `default`, matching
  # how empty node-command variables are treated. A malformed value becomes a
  # config error instead of an ArgumentError crash.
  defp parse_int_env(name, default) do
    case System.get_env(name) do
      value when value in [nil, ""] ->
        {:ok, default}

      value ->
        case Integer.parse(String.trim(value)) do
          {int, ""} when int >= 0 -> {:ok, int}
          _other -> {:error, {:invalid_integer_env, name, value}}
        end
    end
  end

  defp default_run_id, do: "three-node-gap-#{System.system_time(:millisecond)}"

  defp format_reason({:timeout, label, reason}),
    do: "timeout waiting for #{label}: #{inspect(reason)}"

  defp format_reason(reason), do: inspect(reason)
end
# Script entry point: hand the command-line arguments to the runner.
argv = System.argv()
ThreeNodeGapE2E.Runner.main(argv)