diff --git a/compose.node-sync-e2e.yaml b/compose.node-sync-e2e.yaml new file mode 100644 index 0000000..c5b34e8 --- /dev/null +++ b/compose.node-sync-e2e.yaml @@ -0,0 +1,92 @@ +services: + db-a: + image: postgres:17 + restart: unless-stopped + environment: + POSTGRES_DB: parrhesia_a + POSTGRES_USER: parrhesia + POSTGRES_PASSWORD: parrhesia + healthcheck: + test: ["CMD-SHELL", "pg_isready -U $$POSTGRES_USER -d $$POSTGRES_DB"] + interval: 5s + timeout: 5s + retries: 12 + volumes: + - postgres-a-data:/var/lib/postgresql/data + + db-b: + image: postgres:17 + restart: unless-stopped + environment: + POSTGRES_DB: parrhesia_b + POSTGRES_USER: parrhesia + POSTGRES_PASSWORD: parrhesia + healthcheck: + test: ["CMD-SHELL", "pg_isready -U $$POSTGRES_USER -d $$POSTGRES_DB"] + interval: 5s + timeout: 5s + retries: 12 + volumes: + - postgres-b-data:/var/lib/postgresql/data + + migrate-a: + image: ${PARRHESIA_IMAGE:-parrhesia:latest} + restart: "no" + depends_on: + db-a: + condition: service_healthy + environment: + DATABASE_URL: ecto://parrhesia:parrhesia@db-a:5432/parrhesia_a + POOL_SIZE: ${POOL_SIZE:-20} + PARRHESIA_ACL_PROTECTED_FILTERS: ${PARRHESIA_ACL_PROTECTED_FILTERS} + command: ["eval", "Parrhesia.Release.migrate()"] + + migrate-b: + image: ${PARRHESIA_IMAGE:-parrhesia:latest} + restart: "no" + depends_on: + db-b: + condition: service_healthy + environment: + DATABASE_URL: ecto://parrhesia:parrhesia@db-b:5432/parrhesia_b + POOL_SIZE: ${POOL_SIZE:-20} + PARRHESIA_ACL_PROTECTED_FILTERS: ${PARRHESIA_ACL_PROTECTED_FILTERS} + command: ["eval", "Parrhesia.Release.migrate()"] + + parrhesia-a: + image: ${PARRHESIA_IMAGE:-parrhesia:latest} + restart: unless-stopped + depends_on: + db-a: + condition: service_healthy + environment: + DATABASE_URL: ecto://parrhesia:parrhesia@db-a:5432/parrhesia_a + POOL_SIZE: ${POOL_SIZE:-20} + PORT: 4413 + PARRHESIA_RELAY_URL: ${PARRHESIA_NODE_A_RELAY_URL:-ws://parrhesia-a:4413/relay} + PARRHESIA_ACL_PROTECTED_FILTERS: 
${PARRHESIA_ACL_PROTECTED_FILTERS} + PARRHESIA_IDENTITY_PATH: /tmp/parrhesia-a/server_identity.json + PARRHESIA_SYNC_PATH: /tmp/parrhesia-a/sync_servers.json + ports: + - "${PARRHESIA_NODE_A_HOST_PORT:-45131}:4413" + + parrhesia-b: + image: ${PARRHESIA_IMAGE:-parrhesia:latest} + restart: unless-stopped + depends_on: + db-b: + condition: service_healthy + environment: + DATABASE_URL: ecto://parrhesia:parrhesia@db-b:5432/parrhesia_b + POOL_SIZE: ${POOL_SIZE:-20} + PORT: 4413 + PARRHESIA_RELAY_URL: ${PARRHESIA_NODE_B_RELAY_URL:-ws://parrhesia-b:4413/relay} + PARRHESIA_ACL_PROTECTED_FILTERS: ${PARRHESIA_ACL_PROTECTED_FILTERS} + PARRHESIA_IDENTITY_PATH: /tmp/parrhesia-b/server_identity.json + PARRHESIA_SYNC_PATH: /tmp/parrhesia-b/sync_servers.json + ports: + - "${PARRHESIA_NODE_B_HOST_PORT:-45132}:4413" + +volumes: + postgres-a-data: + postgres-b-data: diff --git a/config/runtime.exs b/config/runtime.exs index 52e32e9..aafeca5 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -48,6 +48,25 @@ csv_env = fn name, default -> end end +json_env = fn name, default -> + case System.get_env(name) do + nil -> + default + + "" -> + default + + value -> + case JSON.decode(value) do + {:ok, decoded} -> + decoded + + {:error, reason} -> + raise "environment variable #{name} must contain valid JSON: #{inspect(reason)}" + end + end +end + infinity_or_int_env = fn name, default -> case System.get_env(name) do nil -> @@ -124,6 +143,7 @@ if config_env() == :prod do listeners_defaults = Application.get_env(:parrhesia, :listeners, %{}) retention_defaults = Application.get_env(:parrhesia, :retention, []) features_defaults = Application.get_env(:parrhesia, :features, []) + acl_defaults = Application.get_env(:parrhesia, :acl, []) default_pool_size = Keyword.get(repo_defaults, :pool_size, 32) default_queue_target = Keyword.get(repo_defaults, :queue_target, 1_000) @@ -568,6 +588,13 @@ if config_env() == :prod do config :parrhesia, relay_url: string_env.("PARRHESIA_RELAY_URL", 
relay_url_default), + acl: [ + protected_filters: + json_env.( + "PARRHESIA_ACL_PROTECTED_FILTERS", + Keyword.get(acl_defaults, :protected_filters, []) + ) + ], identity: [ path: string_env.("PARRHESIA_IDENTITY_PATH", nil), private_key: string_env.("PARRHESIA_IDENTITY_PRIVATE_KEY", nil) diff --git a/default.nix b/default.nix index f3a8a71..ff1ae35 100644 --- a/default.nix +++ b/default.nix @@ -10,7 +10,7 @@ vips, }: let pname = "parrhesia"; - version = "0.4.0"; + version = "0.5.0"; beamPackages = beam.packages.erlang_28.extend ( final: _prev: { @@ -48,7 +48,7 @@ beamPackages.fetchMixDeps { pname = "${pname}-mix-deps"; inherit version src; - hash = "sha256-I09Q2PG22lOrZjjXoq8Py3P3o5dgaz9LhKJSmP+/r6k="; + hash = "sha256-D69wuFnIChQzm1PmpIW+X/1sPpsIcDHe4V5fKmFeJ3k="; } else null; diff --git a/flake.nix b/flake.nix index 56733ae..132e36c 100644 --- a/flake.nix +++ b/flake.nix @@ -45,7 +45,7 @@ config = { Entrypoint = ["${parrhesia}/bin/parrhesia"]; - Cmd = ["foreground"]; + Cmd = ["start"]; ExposedPorts = { "4413/tcp" = {}; }; diff --git a/mix.exs b/mix.exs index cdb92c3..439d27e 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Parrhesia.MixProject do def project do [ app: :parrhesia, - version: "0.4.0", + version: "0.5.0", elixir: "~> 1.18", start_permanent: Mix.env() == :prod, deps: deps(), @@ -36,6 +36,7 @@ defmodule Parrhesia.MixProject do # Runtime: storage adapter (Postgres first) {:ecto_sql, "~> 3.12"}, {:postgrex, ">= 0.0.0"}, + {:req, "~> 0.5"}, # Runtime: telemetry + prometheus exporter (/metrics) {:telemetry_metrics, "~> 1.0"}, diff --git a/scripts/node_sync_e2e.exs b/scripts/node_sync_e2e.exs new file mode 100644 index 0000000..d2ecb80 --- /dev/null +++ b/scripts/node_sync_e2e.exs @@ -0,0 +1,847 @@ +defmodule NodeSyncE2E.RelayClient do + use WebSockex + + def start_link(url, owner, opts \\ []) do + WebSockex.start_link( + url, + __MODULE__, + owner, + Keyword.put(opts, :handle_initial_conn_failure, true) + ) + end + + def send_json(pid, payload) 
do + WebSockex.cast(pid, {:send_json, payload}) + end + + def close(pid) do + WebSockex.cast(pid, :close) + end + + @impl true + def handle_connect(_conn, owner) do + send(owner, {:node_sync_e2e_relay_client, self(), :connected}) + {:ok, owner} + end + + @impl true + def handle_frame({:text, payload}, owner) do + frame = + case JSON.decode(payload) do + {:ok, decoded} -> decoded + {:error, reason} -> {:decode_error, reason, payload} + end + + send(owner, {:node_sync_e2e_relay_client, self(), :frame, frame}) + {:ok, owner} + end + + def handle_frame(frame, owner) do + send(owner, {:node_sync_e2e_relay_client, self(), :frame, frame}) + {:ok, owner} + end + + @impl true + def handle_cast({:send_json, payload}, owner) do + {:reply, {:text, JSON.encode!(payload)}, owner} + end + + def handle_cast(:close, owner) do + {:close, owner} + end + + @impl true + def handle_disconnect(status, owner) do + send(owner, {:node_sync_e2e_relay_client, self(), :disconnected, status}) + {:ok, owner} + end +end + +defmodule NodeSyncE2E.Runner do + alias NodeSyncE2E.RelayClient + alias Parrhesia.API.Auth + + @kind 5000 + @subsystem_tag "node-sync-e2e" + @default_resource "tribes.accounts.user" + @default_server_id "node-a-upstream" + @default_admin_private_key String.duplicate("1", 64) + @default_client_private_key String.duplicate("2", 64) + @frame_timeout_ms 5_000 + + def main(argv) do + with {:ok, _apps} <- Application.ensure_all_started(:req), + {:ok, _apps} <- Application.ensure_all_started(:websockex), + {:ok, command, opts} <- parse_args(argv), + {:ok, config} <- load_config(), + :ok <- dispatch(command, config, opts) do + IO.puts("node-sync-e2e #{command} completed") + else + {:error, reason} -> + IO.puts(:stderr, "node-sync-e2e failed: #{format_reason(reason)}") + System.halt(1) + end + end + + defp parse_args(argv) do + {opts, rest, invalid} = OptionParser.parse(argv, strict: [state_file: :string]) + + cond do + invalid != [] -> + {:error, {:invalid_arguments, invalid}} + + 
length(rest) != 1 -> + {:error, :missing_command} + + true -> + {:ok, hd(rest), opts} + end + end + + defp dispatch("bootstrap", config, opts) do + with {:ok, state_file} <- fetch_state_file(opts), + :ok <- ensure_nodes_ready(config), + {:ok, node_a_pubkey} <- fetch_node_pubkey(config, config.node_a), + {:ok, node_b_pubkey} <- fetch_node_pubkey(config, config.node_b), + :ok <- ensure_identity_matches(config.node_a, node_a_pubkey, :node_a), + :ok <- ensure_identity_matches(config.node_b, node_b_pubkey, :node_b), + :ok <- ensure_acl(config, config.node_a, node_b_pubkey, "sync_read", config.filter), + :ok <- + ensure_acl(config, config.node_a, config.client_pubkey, "sync_write", config.filter), + :ok <- ensure_acl(config, config.node_b, node_a_pubkey, "sync_write", config.filter), + :ok <- + ensure_acl(config, config.node_b, config.client_pubkey, "sync_read", config.filter), + {:ok, catchup_event} <- publish_phase_event(config, config.node_a, "catchup"), + :ok <- configure_sync(config, node_a_pubkey), + :ok <- wait_for_sync_connected(config, config.node_b, config.server_id), + :ok <- wait_for_event(config, config.node_b, catchup_event["id"]), + {:ok, live_event} <- publish_phase_event(config, config.node_a, "live"), + :ok <- wait_for_event(config, config.node_b, live_event["id"]), + {:ok, stats} <- fetch_sync_server_stats(config, config.node_b, config.server_id), + :ok <- ensure_minimum_counter(stats, "events_accepted", 2), + :ok <- + save_state(state_file, %{ + "run_id" => config.run_id, + "resource" => config.resource, + "server_id" => config.server_id, + "node_a_pubkey" => node_a_pubkey, + "node_b_pubkey" => node_b_pubkey, + "catchup_event_id" => catchup_event["id"], + "live_event_id" => live_event["id"] + }) do + :ok + end + end + + defp dispatch("publish-resume", config, opts) do + with {:ok, state_file} <- fetch_state_file(opts), + :ok <- ensure_run_matches(config, load_state(state_file)), + {:ok, resume_event} <- publish_phase_event(config, config.node_a, 
"resume"), + :ok <- + save_state(state_file, %{ + "run_id" => config.run_id, + "resource" => config.resource, + "server_id" => config.server_id, + "resume_event_id" => resume_event["id"] + }) do + :ok + end + end + + defp dispatch("verify-resume", config, opts) do + with {:ok, state_file} <- fetch_state_file(opts), + state = load_state(state_file), + :ok <- ensure_run_matches(config, state), + {:ok, resume_event_id} <- fetch_state_value(state, "resume_event_id"), + :ok <- ensure_nodes_ready(config), + :ok <- wait_for_sync_connected(config, config.node_b, config.server_id), + :ok <- wait_for_event(config, config.node_b, resume_event_id), + {:ok, stats} <- fetch_sync_server_stats(config, config.node_b, config.server_id), + :ok <- ensure_minimum_counter(stats, "events_accepted", 3), + :ok <- ensure_minimum_counter(stats, "query_runs", 2) do + :ok + end + end + + defp dispatch(other, _config, _opts), do: {:error, {:unknown_command, other}} + + defp fetch_state_file(opts) do + case Keyword.get(opts, :state_file) do + nil -> {:error, :missing_state_file} + path -> {:ok, path} + end + end + + defp load_config do + resource = System.get_env("PARRHESIA_NODE_SYNC_E2E_RESOURCE", @default_resource) + + admin_private_key = + System.get_env("PARRHESIA_NODE_SYNC_E2E_ADMIN_PRIVATE_KEY", @default_admin_private_key) + + client_private_key = + System.get_env("PARRHESIA_NODE_SYNC_E2E_CLIENT_PRIVATE_KEY", @default_client_private_key) + + with {:ok, node_a} <- load_node("A"), + {:ok, node_b} <- load_node("B"), + {:ok, client_pubkey} <- derive_pubkey(client_private_key) do + {:ok, + %{ + run_id: System.get_env("PARRHESIA_NODE_SYNC_E2E_RUN_ID", default_run_id()), + resource: resource, + filter: %{"kinds" => [@kind], "#r" => [resource]}, + admin_private_key: admin_private_key, + client_private_key: client_private_key, + client_pubkey: client_pubkey, + server_id: System.get_env("PARRHESIA_NODE_SYNC_E2E_SERVER_ID", @default_server_id), + node_a: node_a, + node_b: node_b + }} + end + end + + 
defp load_node(suffix) do + http_url = + System.get_env("PARRHESIA_NODE_#{suffix}_HTTP_URL") || + System.get_env("PARRHESIA_NODE_#{suffix}_MANAGEMENT_BASE_URL") + + websocket_url = System.get_env("PARRHESIA_NODE_#{suffix}_WS_URL") + relay_auth_url = System.get_env("PARRHESIA_NODE_#{suffix}_RELAY_AUTH_URL", websocket_url) + sync_url = System.get_env("PARRHESIA_NODE_#{suffix}_SYNC_URL", relay_auth_url) + + cond do + is_nil(http_url) or http_url == "" -> + {:error, {:missing_env, "PARRHESIA_NODE_#{suffix}_HTTP_URL"}} + + is_nil(websocket_url) or websocket_url == "" -> + {:error, {:missing_env, "PARRHESIA_NODE_#{suffix}_WS_URL"}} + + true -> + {:ok, + %{ + http_url: http_url, + websocket_url: websocket_url, + relay_auth_url: relay_auth_url, + sync_url: sync_url + }} + end + end + + defp ensure_nodes_ready(config) do + with :ok <- wait_for_health(config.node_a), + :ok <- wait_for_health(config.node_b) do + :ok + end + end + + defp wait_for_health(node) do + case wait_until("node health #{node.http_url}", 15_000, 250, fn -> + health_url = node.http_url <> "/health" + + case Req.get( + url: health_url, + decode_body: false, + connect_options: [timeout: 1_000], + receive_timeout: 1_000 + ) do + {:ok, %{status: 200, body: "ok"}} -> {:ok, :ready} + {:ok, %{status: status}} -> {:retry, {:unexpected_status, status}} + {:error, reason} -> {:retry, reason} + end + end) do + {:ok, _value} -> :ok + {:error, reason} -> {:error, reason} + end + end + + defp fetch_node_pubkey(config, node) do + case management_call(config, node, "identity_get", %{}) do + {:ok, %{"pubkey" => pubkey}} when is_binary(pubkey) -> {:ok, String.downcase(pubkey)} + {:ok, other} -> {:error, {:unexpected_identity_payload, other}} + {:error, reason} -> {:error, {:identity_get_failed, reason}} + end + end + + defp ensure_identity_matches(node, expected_pubkey, label) do + if fetch_nip11_pubkey(node) == expected_pubkey do + :ok + else + {:error, {label, :identity_mismatch}} + end + end + + defp 
fetch_nip11_pubkey(node) do + relay_url = node.http_url <> "/relay" + + case Req.get( + url: relay_url, + headers: [{"accept", "application/nostr+json"}], + decode_body: false, + connect_options: [timeout: 1_000], + receive_timeout: 1_000 + ) do + {:ok, %{status: 200, body: body}} -> + case JSON.decode(body) do + {:ok, %{"pubkey" => pubkey}} when is_binary(pubkey) -> String.downcase(pubkey) + {:ok, other} -> raise "unexpected relay info payload: #{inspect(other)}" + {:error, reason} -> raise "relay info JSON decode failed: #{inspect(reason)}" + end + + {:ok, %{status: status}} -> + raise "relay info request failed with status #{status}" + + {:error, reason} -> + raise "relay info request failed: #{inspect(reason)}" + end + end + + defp ensure_acl(config, node, principal, capability, match) do + params = %{ + "principal_type" => "pubkey", + "principal" => principal, + "capability" => capability, + "match" => match + } + + case management_call(config, node, "acl_grant", params) do + {:ok, %{"ok" => true}} -> :ok + {:ok, other} -> {:error, {:unexpected_acl_result, other}} + {:error, reason} -> {:error, {:acl_grant_failed, capability, principal, reason}} + end + end + + defp configure_sync(config, node_a_pubkey) do + params = %{ + "id" => config.server_id, + "url" => config.node_a.sync_url, + "enabled?" 
=> true, + "auth_pubkey" => node_a_pubkey, + "filters" => [config.filter], + "tls" => sync_tls_config(config.node_a.sync_url) + } + + with {:ok, _server} <- management_call(config, config.node_b, "sync_put_server", params), + {:ok, %{"ok" => true}} <- + management_call(config, config.node_b, "sync_start_server", %{"id" => config.server_id}) do + :ok + end + end + + defp sync_tls_config("wss://" <> _rest) do + raise "wss sync URLs are not supported by this harness without explicit pin configuration" + end + + defp sync_tls_config(_url) do + %{"mode" => "disabled", "pins" => []} + end + + defp publish_phase_event(config, node, phase) do + event = + %{ + "created_at" => System.system_time(:second), + "kind" => @kind, + "tags" => [ + ["r", config.resource], + ["t", @subsystem_tag], + ["run", config.run_id], + ["phase", phase] + ], + "content" => "#{phase}:#{config.run_id}" + } + |> sign_event!(config.client_private_key) + + with {:ok, client} <- RelayClient.start_link(node.websocket_url, self()), + :ok <- await_client_connect(client) do + try do + case publish_event(client, node.relay_auth_url, config.client_private_key, event) do + :ok -> {:ok, event} + {:error, reason} -> {:error, reason} + end + after + RelayClient.close(client) + end + end + end + + defp wait_for_event(config, node, event_id) do + case wait_until("event #{event_id} on #{node.websocket_url}", 20_000, 250, fn -> + filter = + config.filter + |> Map.put("ids", [event_id]) + |> Map.put("limit", 1) + + case query_events(node, config.client_private_key, filter) do + {:ok, events} -> + if Enum.any?(events, &(&1["id"] == event_id)) do + {:ok, :replicated} + else + {:retry, :missing_event} + end + + {:error, reason} -> + {:retry, reason} + end + end) do + {:ok, _value} -> :ok + {:error, reason} -> {:error, reason} + end + end + + defp wait_for_sync_connected(config, node, server_id) do + case wait_until("sync connected #{server_id}", 20_000, 250, fn -> + case management_call(config, node, 
"sync_server_stats", %{"id" => server_id}) do + {:ok, %{"connected" => true, "query_runs" => query_runs} = stats} + when query_runs >= 1 -> + {:ok, stats} + + {:ok, stats} -> + {:retry, stats} + + {:error, reason} -> + {:retry, reason} + end + end) do + {:ok, _value} -> :ok + {:error, reason} -> {:error, reason} + end + end + + defp fetch_sync_server_stats(config, node, server_id) do + case management_call(config, node, "sync_server_stats", %{"id" => server_id}) do + {:ok, stats} -> {:ok, stats} + {:error, reason} -> {:error, {:sync_server_stats_failed, reason}} + end + end + + defp query_events(node, private_key, filter) do + with {:ok, client} <- RelayClient.start_link(node.websocket_url, self()), + :ok <- await_client_connect(client) do + subscription_id = "node-sync-e2e-#{System.unique_integer([:positive, :monotonic])}" + + try do + :ok = RelayClient.send_json(client, ["REQ", subscription_id, filter]) + + authenticated_query( + client, + node.relay_auth_url, + private_key, + subscription_id, + [filter], + [], + false, + nil + ) + after + RelayClient.close(client) + end + end + end + + defp authenticated_query( + client, + relay_auth_url, + private_key, + subscription_id, + filters, + events, + authenticated?, + auth_event_id + ) do + receive do + {:node_sync_e2e_relay_client, ^client, :frame, ["AUTH", challenge]} -> + auth_event = + auth_event(relay_auth_url, challenge) + |> sign_event!(private_key) + + :ok = RelayClient.send_json(client, ["AUTH", auth_event]) + + authenticated_query( + client, + relay_auth_url, + private_key, + subscription_id, + filters, + events, + authenticated?, + auth_event["id"] + ) + + {:node_sync_e2e_relay_client, ^client, :frame, ["OK", event_id, true, _message]} + when event_id == auth_event_id -> + :ok = RelayClient.send_json(client, ["REQ", subscription_id | filters]) + + authenticated_query( + client, + relay_auth_url, + private_key, + subscription_id, + filters, + events, + true, + nil + ) + + {:node_sync_e2e_relay_client, 
^client, :frame, ["OK", event_id, false, message]} + when event_id == auth_event_id -> + {:error, {:auth_failed, message}} + + {:node_sync_e2e_relay_client, ^client, :frame, ["EVENT", ^subscription_id, event]} -> + authenticated_query( + client, + relay_auth_url, + private_key, + subscription_id, + filters, + [event | events], + authenticated?, + auth_event_id + ) + + {:node_sync_e2e_relay_client, ^client, :frame, ["EOSE", ^subscription_id]} -> + :ok = RelayClient.send_json(client, ["CLOSE", subscription_id]) + {:ok, Enum.reverse(events)} + + {:node_sync_e2e_relay_client, ^client, :frame, ["CLOSED", ^subscription_id, message]} -> + cond do + authenticated? and not auth_required_message?(message) -> + {:error, {:subscription_closed, message}} + + auth_required_message?(message) and not is_nil(auth_event_id) -> + authenticated_query( + client, + relay_auth_url, + private_key, + subscription_id, + filters, + events, + authenticated?, + auth_event_id + ) + + true -> + {:error, {:subscription_closed, message}} + end + + {:node_sync_e2e_relay_client, ^client, :frame, {:decode_error, reason, payload}} -> + {:error, {:decode_error, reason, payload}} + + {:node_sync_e2e_relay_client, ^client, :disconnected, status} -> + {:error, {:disconnected, status.reason}} + after + @frame_timeout_ms -> {:error, :query_timeout} + end + end + + defp publish_event(client, relay_auth_url, private_key, event) do + :ok = RelayClient.send_json(client, ["EVENT", event]) + + do_publish_event( + client, + relay_auth_url, + private_key, + event, + Map.fetch!(event, "id"), + false, + nil, + false + ) + end + + defp do_publish_event( + client, + relay_auth_url, + private_key, + event, + published_event_id, + authenticated?, + auth_event_id, + replayed_after_auth? 
+ ) do + receive do + {:node_sync_e2e_relay_client, ^client, :frame, ["AUTH", challenge]} -> + auth_event = + auth_event(relay_auth_url, challenge) + |> sign_event!(private_key) + + :ok = RelayClient.send_json(client, ["AUTH", auth_event]) + + do_publish_event( + client, + relay_auth_url, + private_key, + event, + published_event_id, + authenticated?, + auth_event["id"], + replayed_after_auth? + ) + + {:node_sync_e2e_relay_client, ^client, :frame, ["OK", event_id, true, _message]} + when event_id == auth_event_id -> + :ok = RelayClient.send_json(client, ["EVENT", event]) + + do_publish_event( + client, + relay_auth_url, + private_key, + event, + published_event_id, + true, + nil, + true + ) + + {:node_sync_e2e_relay_client, ^client, :frame, ["OK", event_id, false, message]} + when event_id == auth_event_id -> + {:error, {:auth_failed, message}} + + {:node_sync_e2e_relay_client, ^client, :frame, ["OK", event_id, true, _message]} + when event_id == published_event_id -> + :ok + + {:node_sync_e2e_relay_client, ^client, :frame, ["OK", event_id, false, message]} + when event_id == published_event_id -> + cond do + authenticated? and replayed_after_auth? and not auth_required_message?(message) -> + {:error, {:event_rejected, message}} + + auth_required_message?(message) -> + do_publish_event( + client, + relay_auth_url, + private_key, + event, + published_event_id, + authenticated?, + auth_event_id, + replayed_after_auth? 
+ ) + + true -> + {:error, {:event_rejected, message}} + end + + {:node_sync_e2e_relay_client, ^client, :frame, {:decode_error, reason, payload}} -> + {:error, {:decode_error, reason, payload}} + + {:node_sync_e2e_relay_client, ^client, :disconnected, status} -> + {:error, {:disconnected, status.reason}} + after + @frame_timeout_ms -> {:error, :publish_timeout} + end + end + + defp await_client_connect(client) do + receive do + {:node_sync_e2e_relay_client, ^client, :connected} -> + :ok + + {:node_sync_e2e_relay_client, ^client, :disconnected, status} -> + {:error, {:disconnected, status.reason}} + after + @frame_timeout_ms -> {:error, :connect_timeout} + end + end + + defp management_call(config, node, method, params) do + url = node.http_url <> "/management" + + auth_header = + nip98_event("POST", url) + |> sign_event!(config.admin_private_key) + |> then(&("Nostr " <> Base.encode64(JSON.encode!(&1)))) + + case Req.post( + url: url, + headers: [{"authorization", auth_header}], + json: %{"method" => method, "params" => params}, + decode_body: false, + connect_options: [timeout: 1_000], + receive_timeout: 5_000 + ) do + {:ok, %{status: 200, body: body}} -> + decode_management_response(body) + + {:ok, %{status: status, body: body}} -> + {:error, {:management_http_error, status, body}} + + {:error, reason} -> + {:error, reason} + end + end + + defp decode_management_response(body) when is_binary(body) do + with {:ok, %{"ok" => true, "result" => result}} <- JSON.decode(body) do + {:ok, result} + else + {:ok, %{"ok" => false, "error" => error}} -> {:error, {:management_error, error}} + {:ok, other} -> {:error, {:unexpected_management_response, other}} + {:error, reason} -> {:error, {:invalid_management_json, reason}} + end + end + + defp sign_event!(event, private_key_hex) do + {:ok, pubkey} = derive_pubkey(private_key_hex) + seckey = Base.decode16!(String.downcase(private_key_hex), case: :lower) + + unsigned_event = + event + |> Map.put("pubkey", pubkey) + |> 
Map.put("sig", String.duplicate("0", 128)) + + id = Auth.compute_event_id(unsigned_event) + + signature = + id + |> Base.decode16!(case: :lower) + |> Secp256k1.schnorr_sign(seckey) + |> Base.encode16(case: :lower) + + unsigned_event + |> Map.put("id", id) + |> Map.put("sig", signature) + end + + defp derive_pubkey(private_key_hex) do + normalized = String.downcase(private_key_hex) + + case Base.decode16(normalized, case: :lower) do + {:ok, <<_::256>> = seckey} -> + pubkey = + seckey + |> Secp256k1.pubkey(:xonly) + |> Base.encode16(case: :lower) + + {:ok, pubkey} + + _other -> + {:error, {:invalid_private_key, private_key_hex}} + end + rescue + _error -> {:error, {:invalid_private_key, private_key_hex}} + end + + defp nip98_event(method, url) do + %{ + "created_at" => System.system_time(:second), + "kind" => 27_235, + "tags" => [["method", method], ["u", url]], + "content" => "" + } + end + + defp auth_event(relay_auth_url, challenge) do + %{ + "created_at" => System.system_time(:second), + "kind" => 22_242, + "tags" => [["challenge", challenge], ["relay", relay_auth_url]], + "content" => "" + } + end + + defp wait_until(label, timeout_ms, interval_ms, fun) do + started_at = System.monotonic_time(:millisecond) + do_wait_until(label, timeout_ms, interval_ms, started_at, fun) + end + + defp do_wait_until(label, timeout_ms, interval_ms, started_at, fun) do + case fun.() do + {:ok, value} -> + {:ok, value} + + {:retry, reason} -> + if System.monotonic_time(:millisecond) - started_at >= timeout_ms do + {:error, {:timeout, label, reason}} + else + Process.sleep(interval_ms) + do_wait_until(label, timeout_ms, interval_ms, started_at, fun) + end + end + end + + defp load_state(path) do + case File.read(path) do + {:ok, body} -> + case JSON.decode(body) do + {:ok, state} when is_map(state) -> state + {:ok, _other} -> %{} + {:error, _reason} -> %{} + end + + {:error, :enoent} -> + %{} + + {:error, reason} -> + raise "failed to read state file #{path}: #{inspect(reason)}" + 
end + end + + defp save_state(path, attrs) when is_binary(path) and is_map(attrs) do + existing = load_state(path) + merged = Map.merge(existing, attrs) + + with :ok <- File.mkdir_p(Path.dirname(path)), + :ok <- File.write(path, JSON.encode!(merged)) do + :ok + end + end + + defp ensure_run_matches(config, %{"run_id" => run_id}) when run_id == config.run_id, do: :ok + + defp ensure_run_matches(config, %{"run_id" => run_id}), + do: {:error, {:run_id_mismatch, run_id, config.run_id}} + + defp ensure_run_matches(_config, %{}), do: :ok + + defp fetch_state_value(state, key) do + case Map.fetch(state, key) do + {:ok, value} -> {:ok, value} + :error -> {:error, {:missing_state_value, key}} + end + end + + defp ensure_minimum_counter(stats, key, minimum) do + case Map.get(stats, key) do + value when is_integer(value) and value >= minimum -> :ok + _other -> {:error, {:unexpected_sync_stats, stats}} + end + end + + defp auth_required_message?(message) when is_binary(message) do + String.contains?(String.downcase(message), "auth") + end + + defp auth_required_message?(_message), do: false + + defp default_run_id do + "run-#{System.system_time(:millisecond)}-#{System.unique_integer([:positive, :monotonic])}" + end + + defp format_reason({:timeout, label, reason}), + do: "timeout waiting for #{label}: #{inspect(reason)}" + + defp format_reason({:invalid_arguments, invalid}), + do: "invalid arguments: #{inspect(invalid)}" + + defp format_reason({:missing_env, env_var}), + do: "missing environment variable #{env_var}" + + defp format_reason({:unknown_command, command}), + do: "unknown command #{command}" + + defp format_reason({:run_id_mismatch, stored_run_id, requested_run_id}), + do: "state file run id #{stored_run_id} does not match requested run id #{requested_run_id}" + + defp format_reason({:missing_state_value, key}), + do: "state file is missing #{key}" + + defp format_reason(:missing_command), + do: + "usage: elixir scripts/node_sync_e2e.exs <command> --state-file <path>" + + defp 
format_reason(:missing_state_file),
+    do: "--state-file is required"
+
+  defp format_reason(reason), do: inspect(reason)
+end
+
+NodeSyncE2E.Runner.main(System.argv())
diff --git a/scripts/run_node_sync_docker_e2e.sh b/scripts/run_node_sync_docker_e2e.sh
new file mode 100755
index 0000000..7b80da2
--- /dev/null
+++ b/scripts/run_node_sync_docker_e2e.sh
@@ -0,0 +1,113 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
+cd "$ROOT_DIR"
+
+RUN_ID="${PARRHESIA_NODE_SYNC_E2E_RUN_ID:-docker-$(date +%s)}"
+RESOURCE="${PARRHESIA_NODE_SYNC_E2E_RESOURCE:-tribes.accounts.user}"
+RUNNER_MIX_ENV="${PARRHESIA_NODE_SYNC_E2E_RUNNER_MIX_ENV:-test}"
+TMP_DIR="${PARRHESIA_NODE_SYNC_E2E_TMP_DIR:-$(mktemp -d "${TMPDIR:-/tmp}/parrhesia-node-sync-docker-e2e.XXXXXX")}"
+STATE_FILE="$TMP_DIR/state.json"
+COMPOSE_FILE="$ROOT_DIR/compose.node-sync-e2e.yaml"
+# Keep "\n" out of tr's complement set so basename's trailing newline is not turned into "_".
+COMPOSE_PROJECT_SUFFIX="$(basename "$TMP_DIR" | tr '[:upper:]' '[:lower:]' | tr -c 'a-z0-9\n' '_')"
+COMPOSE_PROJECT_NAME="${COMPOSE_PROJECT_NAME:-parrhesia-node-sync-e2e-${COMPOSE_PROJECT_SUFFIX}}"
+export COMPOSE_PROJECT_NAME
+
+NODE_A_HOST_PORT="${PARRHESIA_NODE_A_HOST_PORT:-45131}"
+NODE_B_HOST_PORT="${PARRHESIA_NODE_B_HOST_PORT:-45132}"
+NODE_A_HTTP_URL="http://127.0.0.1:${NODE_A_HOST_PORT}"
+NODE_B_HTTP_URL="http://127.0.0.1:${NODE_B_HOST_PORT}"
+NODE_A_WS_URL="ws://127.0.0.1:${NODE_A_HOST_PORT}/relay"
+NODE_B_WS_URL="ws://127.0.0.1:${NODE_B_HOST_PORT}/relay"
+NODE_A_INTERNAL_RELAY_URL="${PARRHESIA_NODE_A_INTERNAL_RELAY_URL:-ws://parrhesia-a:4413/relay}"
+NODE_B_INTERNAL_RELAY_URL="${PARRHESIA_NODE_B_INTERNAL_RELAY_URL:-ws://parrhesia-b:4413/relay}"
+printf -v PROTECTED_FILTERS_JSON '[{"kinds":[5000],"#r":["%s"]}]' "$RESOURCE"
+
+cleanup() {
+  docker compose -f "$COMPOSE_FILE" down -v >/dev/null 2>&1 || true
+
+  if [[ "${PARRHESIA_NODE_SYNC_E2E_KEEP_TMP:-0}" != "1" ]]; then
+    rm -rf "$TMP_DIR"
+  fi
+}
+
+# Route signals through EXIT so cleanup runs exactly once on Ctrl-C/TERM.
+trap cleanup EXIT; trap 'exit 130' INT TERM
+
+load_docker_image() {
+  if [[ -n "${PARRHESIA_IMAGE:-}" ]]; then
+    return
+  fi
+
+  if [[ "$(uname -s)" != "Linux" ]]; then
+    echo "PARRHESIA_IMAGE must be set on non-Linux hosts; .#dockerImage is Linux-only." >&2
+    exit 1
+  fi
+
+  local image_path
+  image_path="$(nix build .#dockerImage --print-out-paths --no-link)"
+  docker load <"$image_path" >/dev/null
+  export PARRHESIA_IMAGE="parrhesia:latest"
+}
+
+wait_for_health() {
+  local url="$1"
+  local label="$2"
+
+  for _ in {1..150}; do
+    if curl -fsS "$url/health" >/dev/null 2>&1; then
+      return
+    fi
+
+    sleep 0.2
+  done
+
+  echo "${label} did not become healthy at ${url}" >&2
+  exit 1
+}
+
+run_runner() {
+  ERL_LIBS="_build/${RUNNER_MIX_ENV}/lib" \
+    elixir scripts/node_sync_e2e.exs "$@" --state-file "$STATE_FILE"
+}
+
+load_docker_image
+MIX_ENV="$RUNNER_MIX_ENV" mix compile >/dev/null
+
+export PARRHESIA_NODE_A_HOST_PORT
+export PARRHESIA_NODE_B_HOST_PORT
+export PARRHESIA_NODE_A_RELAY_URL="$NODE_A_INTERNAL_RELAY_URL"
+export PARRHESIA_NODE_B_RELAY_URL="$NODE_B_INTERNAL_RELAY_URL"
+export PARRHESIA_ACL_PROTECTED_FILTERS="$PROTECTED_FILTERS_JSON"
+
+docker compose -f "$COMPOSE_FILE" up -d db-a db-b
+docker compose -f "$COMPOSE_FILE" run -T --rm migrate-a
+docker compose -f "$COMPOSE_FILE" run -T --rm migrate-b
+docker compose -f "$COMPOSE_FILE" up -d parrhesia-a parrhesia-b
+
+wait_for_health "$NODE_A_HTTP_URL" "Node A"
+wait_for_health "$NODE_B_HTTP_URL" "Node B"
+
+export PARRHESIA_NODE_SYNC_E2E_RUN_ID="$RUN_ID"
+export PARRHESIA_NODE_SYNC_E2E_RESOURCE="$RESOURCE"
+export PARRHESIA_NODE_A_HTTP_URL="$NODE_A_HTTP_URL"
+export PARRHESIA_NODE_B_HTTP_URL="$NODE_B_HTTP_URL"
+export PARRHESIA_NODE_A_WS_URL="$NODE_A_WS_URL"
+export PARRHESIA_NODE_B_WS_URL="$NODE_B_WS_URL"
+export PARRHESIA_NODE_A_RELAY_AUTH_URL="$NODE_A_INTERNAL_RELAY_URL"
+export PARRHESIA_NODE_B_RELAY_AUTH_URL="$NODE_B_INTERNAL_RELAY_URL"
+export PARRHESIA_NODE_A_SYNC_URL="$NODE_A_INTERNAL_RELAY_URL"
+export PARRHESIA_NODE_B_SYNC_URL="$NODE_B_INTERNAL_RELAY_URL"
+
+run_runner bootstrap
+
+docker compose -f "$COMPOSE_FILE" stop parrhesia-b
+run_runner publish-resume
+docker compose -f "$COMPOSE_FILE" up -d parrhesia-b
+
+wait_for_health "$NODE_B_HTTP_URL" "Node B"
+run_runner verify-resume
+
+printf 'node-sync-e2e docker run completed\nstate: %s\n' "$STATE_FILE"
diff --git a/scripts/run_node_sync_e2e.sh b/scripts/run_node_sync_e2e.sh
new file mode 100755
index 0000000..807b1be
--- /dev/null
+++ b/scripts/run_node_sync_e2e.sh
@@ -0,0 +1,229 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
+cd "$ROOT_DIR"
+
+RUN_ID="${PARRHESIA_NODE_SYNC_E2E_RUN_ID:-local-$(date +%s)}"
+RESOURCE="${PARRHESIA_NODE_SYNC_E2E_RESOURCE:-tribes.accounts.user}"
+RUNNER_MIX_ENV="${PARRHESIA_NODE_SYNC_E2E_RUNNER_MIX_ENV:-test}"
+TMP_DIR="${PARRHESIA_NODE_SYNC_E2E_TMP_DIR:-$(mktemp -d "${TMPDIR:-/tmp}/parrhesia-node-sync-e2e.XXXXXX")}"
+STATE_FILE="$TMP_DIR/state.json"
+LOG_DIR="$TMP_DIR/logs"
+mkdir -p "$LOG_DIR"
+
+# Keep "\n" out of tr's complement set so basename's trailing newline is not turned into "_".
+SUFFIX="$(basename "$TMP_DIR" | tr -c 'a-zA-Z0-9\n' '_')"
+DB_NAME_A="${PARRHESIA_NODE_SYNC_E2E_DB_A:-parrhesia_node_sync_a_${SUFFIX}}"
+DB_NAME_B="${PARRHESIA_NODE_SYNC_E2E_DB_B:-parrhesia_node_sync_b_${SUFFIX}}"
+
+port_in_use() {
+  local port="$1"
+
+  if command -v ss >/dev/null 2>&1; then
+    ss -ltn "( sport = :${port} )" | tail -n +2 | grep -q .
+    return
+  fi
+
+  if command -v lsof >/dev/null 2>&1; then
+    lsof -nP -iTCP:"${port}" -sTCP:LISTEN >/dev/null 2>&1
+    return
+  fi
+
+  echo "Neither ss nor lsof is available for checking port usage." >&2
+  exit 1
+}
+
+pick_port() {
+  local port
+
+  while true; do
+    port="$(( (RANDOM % 10000) + 40000 ))"
+
+    if ! port_in_use "$port"; then
+      printf '%s\n' "$port"
+      return
+    fi
+  done
+}
+
+NODE_A_PORT="${PARRHESIA_NODE_A_PORT:-$(pick_port)}"
+NODE_B_PORT="${PARRHESIA_NODE_B_PORT:-$(pick_port)}"
+
+if [[ "$NODE_A_PORT" == "$NODE_B_PORT" ]]; then
+  echo "Node A and Node B ports must differ." >&2
+  exit 1
+fi
+
+database_url_for() {
+  local database_name="$1"
+  local pg_user="${PGUSER:-${USER:-agent}}"
+  local pg_host="${PGHOST:-localhost}"
+  local pg_port="${PGPORT:-5432}"
+
+  if [[ "$pg_host" == /* ]]; then
+    if [[ -n "${PGPASSWORD:-}" ]]; then
+      printf 'ecto://%s:%s@localhost/%s?socket_dir=%s&port=%s\n' \
+        "$pg_user" "$PGPASSWORD" "$database_name" "$pg_host" "$pg_port"
+    else
+      printf 'ecto://%s@localhost/%s?socket_dir=%s&port=%s\n' \
+        "$pg_user" "$database_name" "$pg_host" "$pg_port"
+    fi
+  else
+    if [[ -n "${PGPASSWORD:-}" ]]; then
+      printf 'ecto://%s:%s@%s:%s/%s\n' \
+        "$pg_user" "$PGPASSWORD" "$pg_host" "$pg_port" "$database_name"
+    else
+      printf 'ecto://%s@%s:%s/%s\n' \
+        "$pg_user" "$pg_host" "$pg_port" "$database_name"
+    fi
+  fi
+}
+
+DATABASE_URL_A="$(database_url_for "$DB_NAME_A")"
+DATABASE_URL_B="$(database_url_for "$DB_NAME_B")"
+printf -v PROTECTED_FILTERS_JSON '[{"kinds":[5000],"#r":["%s"]}]' "$RESOURCE"
+
+cleanup() {
+  if [[ -n "${NODE_A_PID:-}" ]] && kill -0 "$NODE_A_PID" 2>/dev/null; then
+    kill "$NODE_A_PID" 2>/dev/null || true
+    wait "$NODE_A_PID" 2>/dev/null || true
+  fi
+
+  if [[ -n "${NODE_B_PID:-}" ]] && kill -0 "$NODE_B_PID" 2>/dev/null; then
+    kill "$NODE_B_PID" 2>/dev/null || true
+    wait "$NODE_B_PID" 2>/dev/null || true
+  fi
+
+  if [[ "${PARRHESIA_NODE_SYNC_E2E_DROP_DB_ON_EXIT:-1}" == "1" ]]; then
+    DATABASE_URL="$DATABASE_URL_A" MIX_ENV=prod mix ecto.drop --quiet --force || true
+    DATABASE_URL="$DATABASE_URL_B" MIX_ENV=prod mix ecto.drop --quiet --force || true
+  fi
+
+  if [[ "${PARRHESIA_NODE_SYNC_E2E_KEEP_TMP:-0}" != "1" ]]; then
+    rm -rf "$TMP_DIR"
+  fi
+}
+
+# Route signals through EXIT so cleanup runs exactly once on Ctrl-C/TERM.
+trap cleanup EXIT; trap 'exit 130' INT TERM
+
+wait_for_health() {
+  local port="$1"
+  local label="$2"
+
+  for _ in {1..150}; do
+    if curl -fsS "http://127.0.0.1:${port}/health" >/dev/null 2>&1; then
+      return
+    fi
+
+    sleep 0.1
+  done
+
+  echo "${label} did not become healthy on port ${port}" >&2
+  exit 1
+}
+
+setup_database() {
+  local database_url="$1"
+
+  DATABASE_URL="$database_url" MIX_ENV=prod mix ecto.drop --quiet --force || true
+  DATABASE_URL="$database_url" MIX_ENV=prod mix ecto.create --quiet
+  DATABASE_URL="$database_url" MIX_ENV=prod mix ecto.migrate --quiet
+}
+
+start_node() {
+  local node_name="$1"
+  local port="$2"
+  local database_url="$3"
+  local relay_url="$4"
+  local identity_path="$5"
+  local sync_path="$6"
+  local log_path="$7"
+
+  DATABASE_URL="$database_url" \
+    PORT="$port" \
+    PARRHESIA_RELAY_URL="$relay_url" \
+    PARRHESIA_ACL_PROTECTED_FILTERS="$PROTECTED_FILTERS_JSON" \
+    PARRHESIA_IDENTITY_PATH="$identity_path" \
+    PARRHESIA_SYNC_PATH="$sync_path" \
+    MIX_ENV=prod \
+    mix run --no-halt >"$log_path" 2>&1 &
+
+  if [[ "$node_name" == "a" ]]; then
+    NODE_A_PID=$!
+  else
+    NODE_B_PID=$!
+  fi
+}
+
+run_runner() {
+  ERL_LIBS="_build/${RUNNER_MIX_ENV}/lib" \
+    elixir scripts/node_sync_e2e.exs "$@" --state-file "$STATE_FILE"
+}
+
+export DATABASE_URL="$DATABASE_URL_A"
+MIX_ENV=prod mix compile
+MIX_ENV="$RUNNER_MIX_ENV" mix compile >/dev/null
+
+setup_database "$DATABASE_URL_A"
+setup_database "$DATABASE_URL_B"
+
+NODE_A_HTTP_URL="http://127.0.0.1:${NODE_A_PORT}"
+NODE_B_HTTP_URL="http://127.0.0.1:${NODE_B_PORT}"
+NODE_A_WS_URL="ws://127.0.0.1:${NODE_A_PORT}/relay"
+NODE_B_WS_URL="ws://127.0.0.1:${NODE_B_PORT}/relay"
+
+start_node \
+  a \
+  "$NODE_A_PORT" \
+  "$DATABASE_URL_A" \
+  "$NODE_A_WS_URL" \
+  "$TMP_DIR/node-a-identity.json" \
+  "$TMP_DIR/node-a-sync.json" \
+  "$LOG_DIR/node-a.log"
+
+start_node \
+  b \
+  "$NODE_B_PORT" \
+  "$DATABASE_URL_B" \
+  "$NODE_B_WS_URL" \
+  "$TMP_DIR/node-b-identity.json" \
+  "$TMP_DIR/node-b-sync.json" \
+  "$LOG_DIR/node-b.log"
+
+wait_for_health "$NODE_A_PORT" "Node A"
+wait_for_health "$NODE_B_PORT" "Node B"
+
+export PARRHESIA_NODE_SYNC_E2E_RUN_ID="$RUN_ID"
+export PARRHESIA_NODE_SYNC_E2E_RESOURCE="$RESOURCE"
+export PARRHESIA_NODE_A_HTTP_URL="$NODE_A_HTTP_URL"
+export PARRHESIA_NODE_B_HTTP_URL="$NODE_B_HTTP_URL"
+export PARRHESIA_NODE_A_WS_URL="$NODE_A_WS_URL"
+export PARRHESIA_NODE_B_WS_URL="$NODE_B_WS_URL"
+export PARRHESIA_NODE_A_RELAY_AUTH_URL="$NODE_A_WS_URL"
+export PARRHESIA_NODE_B_RELAY_AUTH_URL="$NODE_B_WS_URL"
+export PARRHESIA_NODE_A_SYNC_URL="$NODE_A_WS_URL"
+export PARRHESIA_NODE_B_SYNC_URL="$NODE_B_WS_URL"
+
+run_runner bootstrap
+
+kill "$NODE_B_PID"
+wait "$NODE_B_PID" 2>/dev/null || true
+unset NODE_B_PID
+
+run_runner publish-resume
+
+start_node \
+  b \
+  "$NODE_B_PORT" \
+  "$DATABASE_URL_B" \
+  "$NODE_B_WS_URL" \
+  "$TMP_DIR/node-b-identity.json" \
+  "$TMP_DIR/node-b-sync.json" \
+  "$LOG_DIR/node-b.log"
+
+wait_for_health "$NODE_B_PORT" "Node B"
+run_runner verify-resume
+
+printf 'node-sync-e2e local run completed\nlogs: %s\n' "$LOG_DIR"
diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs
index 4c21ffc..82802c6 100644
--- a/test/parrhesia/web/connection_test.exs
+++ b/test/parrhesia/web/connection_test.exs
@@ -12,6 +12,7 @@ defmodule Parrhesia.Web.ConnectionTest do
   alias Parrhesia.Web.Connection
 
   setup do
+    ensure_repo_started()
     :ok = Sandbox.checkout(Repo)
     ensure_stream_runtime_started()
     :ok
@@ -895,6 +896,12 @@ defmodule Parrhesia.Web.ConnectionTest do
     end
   end
 
+  defp ensure_repo_started do
+    if is_nil(Process.whereis(Repo)) do
+      start_supervised!(Repo)
+    end
+  end
+
   defp listener(overrides) do
     base = %{
       id: :test,