35ae17cc05
Document and test SYNC-PAGE behavior, add the NIP-11 advertisement toggle, extend e2e coverage, apply small backfill cleanup fixes, and bump release metadata.
757 lines
24 KiB
Elixir
757 lines
24 KiB
Elixir
defmodule ThreeNodeGapE2E.RelayClient do
|
|
use WebSockex
|
|
|
|
def start_link(url, owner) do
|
|
WebSockex.start_link(url, __MODULE__, owner,
|
|
handle_initial_conn_failure: true,
|
|
async: true,
|
|
socket_connect_timeout: 2_000,
|
|
socket_recv_timeout: 2_000
|
|
)
|
|
end
|
|
|
|
def send_json(pid, payload), do: WebSockex.cast(pid, {:send_json, payload})
|
|
def close(pid), do: WebSockex.cast(pid, :close)
|
|
|
|
@impl true
|
|
def handle_connect(_conn, owner) do
|
|
send(owner, {:relay_client, self(), :connected})
|
|
{:ok, owner}
|
|
end
|
|
|
|
@impl true
|
|
def handle_frame({:text, payload}, owner) do
|
|
frame =
|
|
case JSON.decode(payload) do
|
|
{:ok, decoded} -> decoded
|
|
{:error, reason} -> {:decode_error, reason, payload}
|
|
end
|
|
|
|
send(owner, {:relay_client, self(), :frame, frame})
|
|
{:ok, owner}
|
|
end
|
|
|
|
def handle_frame(frame, owner) do
|
|
send(owner, {:relay_client, self(), :frame, frame})
|
|
{:ok, owner}
|
|
end
|
|
|
|
@impl true
|
|
def handle_cast({:send_json, payload}, owner) do
|
|
{:reply, {:text, JSON.encode!(payload)}, owner}
|
|
end
|
|
|
|
def handle_cast(:close, owner), do: {:close, owner}
|
|
|
|
@impl true
|
|
def handle_disconnect(status, owner) do
|
|
send(owner, {:relay_client, self(), :disconnected, status})
|
|
{:ok, owner}
|
|
end
|
|
end
|
|
|
|
defmodule ThreeNodeGapE2E.Runner do
|
|
alias Parrhesia.API.Auth
|
|
alias ThreeNodeGapE2E.RelayClient
|
|
|
|
@kind 5000
|
|
@resource "tribes.logs.entry"
|
|
@admin_private_key String.duplicate("1", 64)
|
|
@node_event_keys %{
|
|
"a" => String.duplicate("2", 64),
|
|
"b" => String.duplicate("3", 64),
|
|
"c" => String.duplicate("4", 64)
|
|
}
|
|
@frame_timeout_ms 20_000
|
|
|
|
def main(_argv) do
|
|
with {:ok, _apps} <- Application.ensure_all_started(:req),
|
|
{:ok, _apps} <- Application.ensure_all_started(:websockex),
|
|
{:ok, config} <- load_config(),
|
|
:ok <- run(config) do
|
|
IO.puts("three-node gap e2e completed")
|
|
else
|
|
{:error, reason} ->
|
|
IO.puts(:stderr, "three-node gap e2e failed: #{format_reason(reason)}")
|
|
System.halt(1)
|
|
end
|
|
end
|
|
|
|
defp load_config do
|
|
count = parse_int_env("PARRHESIA_THREE_NODE_EVENTS_PER_NODE", 6_000)
|
|
negentropy_timeout_ms = parse_int_env("PARRHESIA_THREE_NODE_NEGENTROPY_TIMEOUT_MS", 3_000)
|
|
backfill_page_size = parse_int_env("PARRHESIA_THREE_NODE_BACKFILL_PAGE_SIZE", 25)
|
|
sync_mode = System.get_env("PARRHESIA_THREE_NODE_SYNC_MODE", "req_stream")
|
|
run_id = System.get_env("PARRHESIA_THREE_NODE_RUN_ID", default_run_id())
|
|
|
|
with {:ok, node_a} <- load_node("A"),
|
|
{:ok, node_b} <- load_node("B"),
|
|
{:ok, node_c} <- load_node("C"),
|
|
{:ok, event_pubkeys} <- event_pubkeys() do
|
|
{:ok,
|
|
%{
|
|
run_id: run_id,
|
|
count: count,
|
|
resource: @resource,
|
|
filter: %{"kinds" => [@kind], "#r" => [@resource]},
|
|
admin_private_key: @admin_private_key,
|
|
node_event_keys: @node_event_keys,
|
|
admin_pubkey: elem(derive_pubkey(@admin_private_key), 1),
|
|
event_pubkeys: event_pubkeys,
|
|
negentropy_timeout_ms: negentropy_timeout_ms,
|
|
backfill_page_size: backfill_page_size,
|
|
sync_mode: sync_mode,
|
|
node_a: node_a,
|
|
node_b: node_b,
|
|
node_c: node_c
|
|
}}
|
|
end
|
|
end
|
|
|
|
defp run(config) do
|
|
IO.puts("scenario run_id=#{config.run_id} count=#{config.count}")
|
|
|
|
with :ok <- wait_for_health(config.node_a),
|
|
{:ok, a_node_pubkey} <- fetch_node_pubkey(config, config.node_a),
|
|
{:ok, a_relay_pubkey} <- fetch_relay_pubkey(config, config.node_a),
|
|
:ok <- ensure_sync_page_advertised(config.node_a, :node_a),
|
|
:ok <- ensure_client_access(config, config.node_a),
|
|
:ok <- publish_many(config, config.node_a, "a", config.count),
|
|
:ok <- wait_for_author_count(config, config.node_a, "a", config.count),
|
|
:ok <- maybe_start_node("B"),
|
|
:ok <- wait_for_health(config.node_b),
|
|
{:ok, b_node_pubkey} <- fetch_node_pubkey(config, config.node_b),
|
|
{:ok, b_relay_pubkey} <- fetch_relay_pubkey(config, config.node_b),
|
|
:ok <- ensure_sync_page_advertised(config.node_b, :node_b),
|
|
:ok <- ensure_client_access(config, config.node_b),
|
|
:ok <- publish_many(config, config.node_b, "b", config.count),
|
|
:ok <- wait_for_author_count(config, config.node_b, "b", config.count),
|
|
:ok <- grant_pair(config, config.node_a, b_node_pubkey),
|
|
:ok <- grant_pair(config, config.node_b, a_node_pubkey),
|
|
:ok <-
|
|
configure_sync(
|
|
config,
|
|
config.node_b,
|
|
"b-from-a",
|
|
config.node_a,
|
|
a_node_pubkey,
|
|
a_relay_pubkey
|
|
),
|
|
:ok <-
|
|
configure_sync(
|
|
config,
|
|
config.node_a,
|
|
"a-from-b",
|
|
config.node_b,
|
|
b_node_pubkey,
|
|
b_relay_pubkey
|
|
),
|
|
:ok <- wait_for_author_count(config, config.node_b, "a", config.count),
|
|
:ok <- wait_for_author_count(config, config.node_a, "b", config.count),
|
|
:ok <- stop_sync(config, config.node_a, "a-from-b"),
|
|
:ok <- stop_sync(config, config.node_b, "b-from-a"),
|
|
:ok <- maybe_stop_node("B"),
|
|
:ok <- maybe_start_node("C"),
|
|
:ok <- wait_for_health(config.node_c),
|
|
{:ok, c_node_pubkey} <- fetch_node_pubkey(config, config.node_c),
|
|
{:ok, c_relay_pubkey} <- fetch_relay_pubkey(config, config.node_c),
|
|
:ok <- ensure_sync_page_advertised(config.node_c, :node_c),
|
|
:ok <- ensure_client_access(config, config.node_c),
|
|
:ok <- publish_many(config, config.node_c, "c", config.count),
|
|
:ok <- wait_for_author_count(config, config.node_c, "c", config.count),
|
|
:ok <- grant_pair(config, config.node_a, c_node_pubkey),
|
|
:ok <- grant_pair(config, config.node_c, a_node_pubkey),
|
|
:ok <-
|
|
configure_sync(
|
|
config,
|
|
config.node_c,
|
|
"c-from-a",
|
|
config.node_a,
|
|
a_node_pubkey,
|
|
a_relay_pubkey
|
|
),
|
|
:ok <-
|
|
configure_sync(
|
|
config,
|
|
config.node_a,
|
|
"a-from-c",
|
|
config.node_c,
|
|
c_node_pubkey,
|
|
c_relay_pubkey
|
|
),
|
|
:ok <- wait_for_author_count(config, config.node_c, "a", config.count),
|
|
:ok <- wait_for_author_count(config, config.node_c, "b", config.count),
|
|
:ok <- wait_for_author_count(config, config.node_a, "c", config.count),
|
|
{:ok, summary} <- summarize(config) do
|
|
IO.puts("summary=#{JSON.encode!(summary)}")
|
|
:ok
|
|
end
|
|
end
|
|
|
|
defp maybe_start_node(suffix),
|
|
do: maybe_run_node_command("start", suffix, "PARRHESIA_THREE_NODE_START_#{suffix}_CMD")
|
|
|
|
defp maybe_stop_node(suffix),
|
|
do: maybe_run_node_command("stop", suffix, "PARRHESIA_THREE_NODE_STOP_#{suffix}_CMD")
|
|
|
|
defp maybe_run_node_command(action, suffix, env_name) do
|
|
case System.get_env(env_name) do
|
|
nil ->
|
|
:ok
|
|
|
|
"" ->
|
|
:ok
|
|
|
|
command ->
|
|
IO.puts("#{action} node #{suffix}: #{command}")
|
|
|
|
case System.cmd("sh", ["-lc", command], stderr_to_stdout: true) do
|
|
{output, 0} ->
|
|
if String.trim(output) != "", do: IO.write(output)
|
|
:ok
|
|
|
|
{output, status} ->
|
|
{:error, {:node_command_failed, action, suffix, status, output}}
|
|
end
|
|
end
|
|
end
|
|
|
|
defp load_node(suffix) do
|
|
http_url = System.fetch_env!("PARRHESIA_NODE_#{suffix}_HTTP_URL")
|
|
websocket_url = System.fetch_env!("PARRHESIA_NODE_#{suffix}_WS_URL")
|
|
relay_auth_url = System.get_env("PARRHESIA_NODE_#{suffix}_RELAY_AUTH_URL", websocket_url)
|
|
sync_url = System.get_env("PARRHESIA_NODE_#{suffix}_SYNC_URL", relay_auth_url)
|
|
|
|
{:ok,
|
|
%{
|
|
http_url: http_url,
|
|
websocket_url: websocket_url,
|
|
relay_auth_url: relay_auth_url,
|
|
sync_url: sync_url
|
|
}}
|
|
rescue
|
|
KeyError -> {:error, {:missing_node_env, suffix}}
|
|
end
|
|
|
|
defp wait_for_health(node) do
|
|
wait_until("health #{node.http_url}", 30_000, 250, fn ->
|
|
case Req.get(
|
|
url: node.http_url <> "/health",
|
|
decode_body: false,
|
|
retry: false,
|
|
connect_options: [timeout: 1_000],
|
|
receive_timeout: 1_000
|
|
) do
|
|
{:ok, %{status: 200, body: "ok"}} -> {:ok, :ready}
|
|
{:ok, %{status: status}} -> {:retry, {:status, status}}
|
|
{:error, reason} -> {:retry, reason}
|
|
end
|
|
end)
|
|
|> unwrap_wait()
|
|
end
|
|
|
|
defp ensure_client_access(config, node) do
|
|
clients = Map.put(config.event_pubkeys, "admin", config.admin_pubkey)
|
|
|
|
Enum.reduce_while(clients, :ok, fn {_label, pubkey}, :ok ->
|
|
case grant_pair(config, node, pubkey) do
|
|
:ok -> {:cont, :ok}
|
|
{:error, reason} -> {:halt, {:error, reason}}
|
|
end
|
|
end)
|
|
end
|
|
|
|
defp grant_pair(config, node, principal) do
|
|
with :ok <- ensure_acl(config, node, principal, "sync_read", config.filter),
|
|
:ok <- ensure_acl(config, node, principal, "sync_write", config.filter) do
|
|
:ok
|
|
end
|
|
end
|
|
|
|
defp ensure_acl(config, node, principal, capability, match) do
|
|
params = %{
|
|
"principal_type" => "pubkey",
|
|
"principal" => principal,
|
|
"capability" => capability,
|
|
"match" => match
|
|
}
|
|
|
|
case management_call(config, node, "acl_grant", params) do
|
|
{:ok, %{"ok" => true}} -> :ok
|
|
{:ok, other} -> {:error, {:unexpected_acl_result, other}}
|
|
{:error, reason} -> {:error, {:acl_grant_failed, capability, principal, reason}}
|
|
end
|
|
end
|
|
|
|
defp configure_sync(
|
|
config,
|
|
target_node,
|
|
server_id,
|
|
source_node,
|
|
source_node_pubkey,
|
|
source_relay_pubkey
|
|
) do
|
|
params = %{
|
|
"id" => server_id,
|
|
"url" => source_node.sync_url,
|
|
"enabled?" => true,
|
|
"auth_pubkey" => source_node_pubkey,
|
|
"relay_pubkey" => source_relay_pubkey,
|
|
"filters" => [config.filter],
|
|
"mode" => config.sync_mode,
|
|
"metadata" => %{
|
|
"negentropy_timeout_ms" => config.negentropy_timeout_ms,
|
|
"backfill_page_size" => config.backfill_page_size
|
|
},
|
|
"tls" => %{"mode" => "disabled", "pins" => []}
|
|
}
|
|
|
|
with {:ok, _server} <- management_call(config, target_node, "sync_put_server", params),
|
|
{:ok, %{"ok" => true}} <-
|
|
management_call(config, target_node, "sync_start_server", %{"id" => server_id}),
|
|
:ok <- wait_for_sync_connected(config, target_node, server_id) do
|
|
:ok
|
|
end
|
|
end
|
|
|
|
defp stop_sync(config, node, server_id) do
|
|
case management_call(config, node, "sync_stop_server", %{"id" => server_id}) do
|
|
{:ok, %{"ok" => true}} -> :ok
|
|
{:ok, _other} -> :ok
|
|
{:error, _reason} -> :ok
|
|
end
|
|
end
|
|
|
|
defp wait_for_sync_connected(config, node, server_id) do
|
|
wait_until("sync connected #{server_id}", 30_000, 500, fn ->
|
|
case management_call(config, node, "sync_server_stats", %{"id" => server_id}) do
|
|
{:ok, %{"connected" => true, "query_runs" => query_runs} = stats} when query_runs >= 1 ->
|
|
{:ok, stats}
|
|
|
|
{:ok, stats} ->
|
|
{:retry, stats}
|
|
|
|
{:error, reason} ->
|
|
{:retry, reason}
|
|
end
|
|
end)
|
|
|> unwrap_wait()
|
|
end
|
|
|
|
defp publish_many(config, node, label, count) do
|
|
private_key = Map.fetch!(config.node_event_keys, label)
|
|
created_at = System.system_time(:second)
|
|
IO.puts("publish #{count} events for #{label} to #{node.http_url}")
|
|
|
|
with {:ok, client} <- RelayClient.start_link(node.websocket_url, self()),
|
|
:ok <- await_client_connect(client) do
|
|
try do
|
|
1..count
|
|
|> Enum.reduce_while(:ok, fn index, :ok ->
|
|
if rem(index, 1_000) == 0, do: IO.puts("published #{label} #{index}/#{count}")
|
|
|
|
event =
|
|
%{
|
|
"created_at" => created_at,
|
|
"kind" => @kind,
|
|
"tags" => [
|
|
["r", config.resource],
|
|
["t", "three-node-gap-e2e"],
|
|
["run", config.run_id],
|
|
["node", label]
|
|
],
|
|
"content" => "#{config.run_id}:#{label}:#{index}"
|
|
}
|
|
|> sign_event!(private_key)
|
|
|
|
case publish_event(client, node.relay_auth_url, private_key, event) do
|
|
:ok -> {:cont, :ok}
|
|
{:error, reason} -> {:halt, {:error, {:publish_many_failed, label, index, reason}}}
|
|
end
|
|
end)
|
|
after
|
|
RelayClient.close(client)
|
|
end
|
|
end
|
|
end
|
|
|
|
defp wait_for_author_count(config, node, label, expected) do
|
|
author = Map.fetch!(config.event_pubkeys, label)
|
|
|
|
wait_until("#{node.http_url} author #{label} count #{expected}", 120_000, 1_000, fn ->
|
|
case query_count(config, node, author, expected + 100) do
|
|
{:ok, count} when count >= expected ->
|
|
IO.puts("#{node.http_url} has #{count} events from #{label}")
|
|
{:ok, count}
|
|
|
|
{:ok, count} ->
|
|
IO.puts("#{node.http_url} has #{count}/#{expected} events from #{label}")
|
|
{:retry, count}
|
|
|
|
{:error, reason} ->
|
|
{:retry, reason}
|
|
end
|
|
end)
|
|
|> unwrap_wait()
|
|
end
|
|
|
|
defp query_count(config, node, author, _limit) do
|
|
filter = config.filter |> Map.put("authors", [author]) |> Map.delete("limit")
|
|
|
|
with {:ok, client} <- RelayClient.start_link(node.websocket_url, self()),
|
|
:ok <- await_client_connect(client) do
|
|
subscription_id = "three-node-gap-count-#{System.unique_integer([:positive, :monotonic])}"
|
|
|
|
try do
|
|
with :ok <-
|
|
authenticate_connection(
|
|
config_auth_resource(filter),
|
|
client,
|
|
node.relay_auth_url,
|
|
config.admin_private_key
|
|
) do
|
|
:ok = RelayClient.send_json(client, ["COUNT", subscription_id, filter])
|
|
receive_count(client, subscription_id)
|
|
end
|
|
after
|
|
RelayClient.close(client)
|
|
end
|
|
end
|
|
end
|
|
|
|
defp receive_count(client, subscription_id) do
|
|
receive do
|
|
{:relay_client, ^client, :frame, ["COUNT", ^subscription_id, %{"count" => count}]}
|
|
when is_integer(count) ->
|
|
{:ok, count}
|
|
|
|
{:relay_client, ^client, :frame, ["CLOSED", ^subscription_id, message]} ->
|
|
{:error, {:count_closed, message}}
|
|
|
|
{:relay_client, ^client, :frame, {:decode_error, reason, payload}} ->
|
|
{:error, {:decode_error, reason, payload}}
|
|
|
|
{:relay_client, ^client, :disconnected, status} ->
|
|
{:error, {:disconnected, status.reason}}
|
|
after
|
|
@frame_timeout_ms -> {:error, :count_timeout}
|
|
end
|
|
end
|
|
|
|
defp summarize(config) do
|
|
nodes = [{"a", config.node_a}, {"c", config.node_c}]
|
|
|
|
summary =
|
|
Map.new(nodes, fn {node_label, node} ->
|
|
counts =
|
|
Map.new(["a", "b", "c"], fn author_label ->
|
|
author = Map.fetch!(config.event_pubkeys, author_label)
|
|
|
|
value =
|
|
case query_count(config, node, author, config.count * 3 + 100) do
|
|
{:ok, count} -> count
|
|
{:error, reason} -> %{"error" => inspect(reason)}
|
|
end
|
|
|
|
{author_label, value}
|
|
end)
|
|
|
|
{node_label, counts}
|
|
end)
|
|
|
|
{:ok, summary}
|
|
end
|
|
|
|
defp fetch_node_pubkey(config, node), do: fetch_identity_pubkey(config, node, "node")
|
|
defp fetch_relay_pubkey(config, node), do: fetch_identity_pubkey(config, node, "relay")
|
|
|
|
defp ensure_sync_page_advertised(node, label) do
|
|
relay_info_url = node.http_url <> "/relay"
|
|
|
|
case Req.get(
|
|
url: relay_info_url,
|
|
headers: [{"accept", "application/nostr+json"}],
|
|
decode_body: false,
|
|
retry: false,
|
|
connect_options: [timeout: 1_000],
|
|
receive_timeout: 1_000
|
|
) do
|
|
{:ok, %{status: 200, body: body}} ->
|
|
case JSON.decode(body) do
|
|
{:ok, %{"limitation" => %{"parrhesia_sync_page" => true}}} ->
|
|
:ok
|
|
|
|
{:ok, document} ->
|
|
{:error, {label, :sync_page_not_advertised, document}}
|
|
|
|
{:error, reason} ->
|
|
{:error, {label, :relay_info_decode_failed, reason}}
|
|
end
|
|
|
|
{:ok, %{status: status}} ->
|
|
{:error, {label, :relay_info_request_failed, status}}
|
|
|
|
{:error, reason} ->
|
|
{:error, {label, :relay_info_request_failed, reason}}
|
|
end
|
|
end
|
|
|
|
defp fetch_identity_pubkey(config, node, role) do
|
|
case management_call(config, node, "identity_get", %{"role" => role}) do
|
|
{:ok, %{"pubkey" => pubkey}} when is_binary(pubkey) -> {:ok, String.downcase(pubkey)}
|
|
{:ok, other} -> {:error, {:unexpected_identity_payload, other}}
|
|
{:error, reason} -> {:error, {:identity_get_failed, role, reason}}
|
|
end
|
|
end
|
|
|
|
defp authenticate_connection(resource, client, relay_auth_url, private_key) do
|
|
event =
|
|
%{
|
|
"created_at" => System.system_time(:second),
|
|
"kind" => @kind,
|
|
"tags" => [["r", resource], ["t", "three-node-gap-e2e-auth"]],
|
|
"content" => "auth:#{System.unique_integer([:positive, :monotonic])}"
|
|
}
|
|
|> sign_event!(private_key)
|
|
|
|
publish_event(client, relay_auth_url, private_key, event)
|
|
end
|
|
|
|
defp config_auth_resource(%{"#r" => [resource | _rest]}) when is_binary(resource), do: resource
|
|
defp config_auth_resource(_filter), do: @resource
|
|
|
|
defp publish_event(client, relay_auth_url, private_key, event) do
|
|
:ok = RelayClient.send_json(client, ["EVENT", event])
|
|
do_publish_event(client, relay_auth_url, private_key, event, event["id"], false, nil, false)
|
|
end
|
|
|
|
defp do_publish_event(
|
|
client,
|
|
relay_auth_url,
|
|
private_key,
|
|
event,
|
|
published_event_id,
|
|
authenticated?,
|
|
auth_event_id,
|
|
replayed_after_auth?
|
|
) do
|
|
receive do
|
|
{:relay_client, ^client, :frame, ["AUTH", challenge]} ->
|
|
auth_event = auth_event(relay_auth_url, challenge) |> sign_event!(private_key)
|
|
:ok = RelayClient.send_json(client, ["AUTH", auth_event])
|
|
|
|
do_publish_event(
|
|
client,
|
|
relay_auth_url,
|
|
private_key,
|
|
event,
|
|
published_event_id,
|
|
authenticated?,
|
|
auth_event["id"],
|
|
replayed_after_auth?
|
|
)
|
|
|
|
{:relay_client, ^client, :frame, ["OK", event_id, true, _message]}
|
|
when event_id == auth_event_id ->
|
|
:ok = RelayClient.send_json(client, ["EVENT", event])
|
|
|
|
do_publish_event(
|
|
client,
|
|
relay_auth_url,
|
|
private_key,
|
|
event,
|
|
published_event_id,
|
|
true,
|
|
nil,
|
|
true
|
|
)
|
|
|
|
{:relay_client, ^client, :frame, ["OK", event_id, false, message]}
|
|
when event_id == auth_event_id ->
|
|
{:error, {:auth_failed, message}}
|
|
|
|
{:relay_client, ^client, :frame, ["OK", event_id, true, _message]}
|
|
when event_id == published_event_id ->
|
|
:ok
|
|
|
|
{:relay_client, ^client, :frame, ["OK", event_id, false, message]}
|
|
when event_id == published_event_id ->
|
|
cond do
|
|
authenticated? and replayed_after_auth? and not auth_required_message?(message) ->
|
|
{:error, {:event_rejected, message}}
|
|
|
|
auth_required_message?(message) ->
|
|
do_publish_event(
|
|
client,
|
|
relay_auth_url,
|
|
private_key,
|
|
event,
|
|
published_event_id,
|
|
authenticated?,
|
|
auth_event_id,
|
|
replayed_after_auth?
|
|
)
|
|
|
|
true ->
|
|
{:error, {:event_rejected, message}}
|
|
end
|
|
|
|
{:relay_client, ^client, :frame, {:decode_error, reason, payload}} ->
|
|
{:error, {:decode_error, reason, payload}}
|
|
|
|
{:relay_client, ^client, :disconnected, status} ->
|
|
{:error, {:disconnected, status.reason}}
|
|
after
|
|
@frame_timeout_ms -> {:error, :publish_timeout}
|
|
end
|
|
end
|
|
|
|
defp await_client_connect(client) do
|
|
receive do
|
|
{:relay_client, ^client, :connected} -> :ok
|
|
{:relay_client, ^client, :disconnected, status} -> {:error, {:disconnected, status.reason}}
|
|
after
|
|
@frame_timeout_ms -> {:error, :connect_timeout}
|
|
end
|
|
end
|
|
|
|
defp management_call(config, node, method, params) do
|
|
url = node.http_url <> "/management"
|
|
|
|
auth_header =
|
|
nip98_event("POST", url)
|
|
|> sign_event!(config.admin_private_key)
|
|
|> then(&("Nostr " <> Base.encode64(JSON.encode!(&1))))
|
|
|
|
case Req.post(
|
|
url: url,
|
|
headers: [{"authorization", auth_header}],
|
|
json: %{"method" => method, "params" => params},
|
|
decode_body: false,
|
|
connect_options: [timeout: 2_000],
|
|
receive_timeout: 10_000
|
|
) do
|
|
{:ok, %{status: 200, body: body}} -> decode_management_response(body)
|
|
{:ok, %{status: status, body: body}} -> {:error, {:management_http_error, status, body}}
|
|
{:error, reason} -> {:error, reason}
|
|
end
|
|
end
|
|
|
|
defp decode_management_response(body) do
|
|
with {:ok, %{"ok" => true, "result" => result}} <- JSON.decode(body) do
|
|
{:ok, result}
|
|
else
|
|
{:ok, %{"ok" => false, "error" => error}} -> {:error, {:management_error, error}}
|
|
{:ok, other} -> {:error, {:unexpected_management_response, other}}
|
|
{:error, reason} -> {:error, {:invalid_management_json, reason}}
|
|
end
|
|
end
|
|
|
|
defp sign_event!(event, private_key_hex) do
|
|
{:ok, pubkey} = derive_pubkey(private_key_hex)
|
|
seckey = Base.decode16!(String.downcase(private_key_hex), case: :lower)
|
|
|
|
unsigned_event =
|
|
event |> Map.put("pubkey", pubkey) |> Map.put("sig", String.duplicate("0", 128))
|
|
|
|
id = Auth.compute_event_id(unsigned_event)
|
|
|
|
signature =
|
|
id
|
|
|> Base.decode16!(case: :lower)
|
|
|> Secp256k1.schnorr_sign(seckey)
|
|
|> Base.encode16(case: :lower)
|
|
|
|
unsigned_event |> Map.put("id", id) |> Map.put("sig", signature)
|
|
end
|
|
|
|
defp event_pubkeys do
|
|
@node_event_keys
|
|
|> Enum.map(fn {label, key} ->
|
|
with {:ok, pubkey} <- derive_pubkey(key), do: {label, pubkey}
|
|
end)
|
|
|> Enum.reduce_while({:ok, %{}}, fn
|
|
{label, pubkey}, {:ok, acc} -> {:cont, {:ok, Map.put(acc, label, pubkey)}}
|
|
{:error, reason}, _acc -> {:halt, {:error, reason}}
|
|
end)
|
|
end
|
|
|
|
defp derive_pubkey(private_key_hex) do
|
|
normalized = String.downcase(private_key_hex)
|
|
|
|
case Base.decode16(normalized, case: :lower) do
|
|
{:ok, <<_::256>> = seckey} ->
|
|
{:ok, seckey |> Secp256k1.pubkey(:xonly) |> Base.encode16(case: :lower)}
|
|
|
|
_other ->
|
|
{:error, {:invalid_private_key, private_key_hex}}
|
|
end
|
|
rescue
|
|
_error -> {:error, {:invalid_private_key, private_key_hex}}
|
|
end
|
|
|
|
defp nip98_event(method, url),
|
|
do: %{
|
|
"created_at" => System.system_time(:second),
|
|
"kind" => 27_235,
|
|
"tags" => [
|
|
["method", method],
|
|
["u", url],
|
|
["nonce", "#{System.unique_integer([:positive, :monotonic])}"]
|
|
],
|
|
"content" => ""
|
|
}
|
|
|
|
defp auth_event(relay_auth_url, challenge),
|
|
do: %{
|
|
"created_at" => System.system_time(:second),
|
|
"kind" => 22_242,
|
|
"tags" => [["challenge", challenge], ["relay", relay_auth_url]],
|
|
"content" => ""
|
|
}
|
|
|
|
defp auth_required_message?(message) when is_binary(message),
|
|
do: String.contains?(String.downcase(message), "auth")
|
|
|
|
defp auth_required_message?(_message), do: false
|
|
|
|
defp wait_until(label, timeout_ms, interval_ms, fun) do
|
|
started_at = System.monotonic_time(:millisecond)
|
|
do_wait_until(label, timeout_ms, interval_ms, started_at, fun)
|
|
end
|
|
|
|
defp do_wait_until(label, timeout_ms, interval_ms, started_at, fun) do
|
|
case fun.() do
|
|
{:ok, value} ->
|
|
{:ok, value}
|
|
|
|
{:retry, reason} ->
|
|
if System.monotonic_time(:millisecond) - started_at >= timeout_ms do
|
|
{:error, {:timeout, label, reason}}
|
|
else
|
|
Process.sleep(interval_ms)
|
|
do_wait_until(label, timeout_ms, interval_ms, started_at, fun)
|
|
end
|
|
end
|
|
end
|
|
|
|
defp unwrap_wait({:ok, _value}), do: :ok
|
|
defp unwrap_wait({:error, reason}), do: {:error, reason}
|
|
|
|
defp parse_int_env(name, default) do
|
|
case System.get_env(name) do
|
|
nil -> default
|
|
value -> String.to_integer(value)
|
|
end
|
|
end
|
|
|
|
defp default_run_id, do: "three-node-gap-#{System.system_time(:millisecond)}"
|
|
|
|
defp format_reason({:timeout, label, reason}),
|
|
do: "timeout waiting for #{label}: #{inspect(reason)}"
|
|
|
|
defp format_reason(reason), do: inspect(reason)
|
|
end
|
|
|
|
ThreeNodeGapE2E.Runner.main(System.argv())
|