More E2E tests

This commit is contained in:
2026-03-19 02:03:41 +01:00
parent f70d50933d
commit 9947635855
4 changed files with 550 additions and 27 deletions

View File

@@ -2,12 +2,14 @@ defmodule NodeSyncE2E.RelayClient do
use WebSockex
def start_link(url, owner, opts \\ []) do
WebSockex.start_link(
url,
__MODULE__,
owner,
Keyword.put(opts, :handle_initial_conn_failure, true)
)
ws_opts =
opts
|> Keyword.put_new(:handle_initial_conn_failure, true)
|> Keyword.put_new(:async, true)
|> Keyword.put_new(:socket_connect_timeout, 2_000)
|> Keyword.put_new(:socket_recv_timeout, 2_000)
WebSockex.start_link(url, __MODULE__, owner, ws_opts)
end
def send_json(pid, payload) do
@@ -163,6 +165,84 @@ defmodule NodeSyncE2E.Runner do
end
end
defp dispatch("filter-selectivity", config, opts) do
with {:ok, state_file} <- fetch_state_file(opts),
state = load_state(state_file),
:ok <- ensure_run_matches(config, state),
:ok <- ensure_nodes_ready(config),
:ok <- wait_for_sync_connected(config, config.node_b, config.server_id),
{:ok, non_matching_event} <- publish_non_matching_event(config, config.node_a),
_ = Process.sleep(2_000),
:ok <- ensure_event_absent(config, config.node_b, non_matching_event["id"]),
:ok <-
save_state(state_file, %{
"run_id" => config.run_id,
"resource" => config.resource,
"server_id" => config.server_id,
"non_matching_event_id" => non_matching_event["id"]
}) do
:ok
end
end
defp dispatch("sync-stop-restart", config, opts) do
with {:ok, state_file} <- fetch_state_file(opts),
state = load_state(state_file),
:ok <- ensure_run_matches(config, state),
:ok <- ensure_nodes_ready(config),
{:ok, %{"ok" => true}} <-
management_call(config, config.node_b, "sync_stop_server", %{"id" => config.server_id}),
{:ok, while_stopped_event} <-
publish_phase_event(config, config.node_a, "while-stopped"),
_ = Process.sleep(2_000),
:ok <- ensure_event_absent(config, config.node_b, while_stopped_event["id"]),
{:ok, %{"ok" => true}} <-
management_call(config, config.node_b, "sync_start_server", %{
"id" => config.server_id
}),
:ok <- wait_for_sync_connected(config, config.node_b, config.server_id),
:ok <- wait_for_event(config, config.node_b, while_stopped_event["id"]),
:ok <-
save_state(state_file, %{
"run_id" => config.run_id,
"resource" => config.resource,
"server_id" => config.server_id,
"while_stopped_event_id" => while_stopped_event["id"]
}) do
:ok
end
end
defp dispatch("bidirectional-sync", config, opts) do
with {:ok, state_file} <- fetch_state_file(opts),
state = load_state(state_file),
:ok <- ensure_run_matches(config, state),
{:ok, node_a_pubkey} <- fetch_state_value(state, "node_a_pubkey"),
{:ok, node_b_pubkey} <- fetch_state_value(state, "node_b_pubkey"),
:ok <- ensure_nodes_ready(config),
:ok <- ensure_acl(config, config.node_b, node_a_pubkey, "sync_read", config.filter),
:ok <-
ensure_acl(config, config.node_b, config.client_pubkey, "sync_write", config.filter),
:ok <- ensure_acl(config, config.node_a, node_b_pubkey, "sync_write", config.filter),
:ok <-
ensure_acl(config, config.node_a, config.client_pubkey, "sync_read", config.filter),
reverse_server_id = "node-b-upstream",
:ok <- configure_reverse_sync(config, node_b_pubkey, reverse_server_id),
:ok <- wait_for_sync_connected(config, config.node_a, reverse_server_id),
{:ok, bidir_event} <- publish_phase_event(config, config.node_b, "bidirectional"),
:ok <- wait_for_event(config, config.node_a, bidir_event["id"]),
:ok <-
save_state(state_file, %{
"run_id" => config.run_id,
"resource" => config.resource,
"server_id" => config.server_id,
"reverse_server_id" => reverse_server_id,
"bidir_event_id" => bidir_event["id"]
}) do
:ok
end
end
defp dispatch(other, _config, _opts), do: {:error, {:unknown_command, other}}
defp fetch_state_file(opts) do
@@ -334,6 +414,77 @@ defmodule NodeSyncE2E.Runner do
%{"mode" => "disabled", "pins" => []}
end
defp configure_reverse_sync(config, node_b_pubkey, reverse_server_id) do
params = %{
"id" => reverse_server_id,
"url" => config.node_b.sync_url,
"enabled?" => true,
"auth_pubkey" => node_b_pubkey,
"filters" => [config.filter],
"tls" => sync_tls_config(config.node_b.sync_url)
}
with {:ok, _server} <- management_call(config, config.node_a, "sync_put_server", params),
{:ok, %{"ok" => true}} <-
management_call(config, config.node_a, "sync_start_server", %{
"id" => reverse_server_id
}) do
:ok
end
end
defp publish_non_matching_event(config, node) do
event =
%{
"created_at" => System.system_time(:second),
"kind" => 5001,
"tags" => [
["r", config.resource],
["t", @subsystem_tag],
["run", config.run_id],
["phase", "filter-selectivity"]
],
"content" => "filter-selectivity:#{config.run_id}"
}
|> sign_event!(config.client_private_key)
with {:ok, client} <- RelayClient.start_link(node.websocket_url, self()),
:ok <- await_client_connect(client) do
try do
case publish_event(client, node.relay_auth_url, config.client_private_key, event) do
:ok -> {:ok, event}
{:error, reason} -> {:error, reason}
end
after
RelayClient.close(client)
end
end
end
defp ensure_event_absent(config, node, event_id) do
filter = %{
"kinds" => [5001],
"#r" => [config.resource],
"ids" => [event_id],
"limit" => 1
}
case query_events(node, config.client_private_key, filter) do
{:ok, []} ->
:ok
{:ok, events} when is_list(events) ->
if Enum.any?(events, &(&1["id"] == event_id)) do
{:error, {:unexpected_replication, event_id}}
else
:ok
end
{:error, reason} ->
{:error, {:event_absence_query_failed, reason}}
end
end
defp publish_phase_event(config, node, phase) do
event =
%{
@@ -428,7 +579,8 @@ defmodule NodeSyncE2E.Runner do
[filter],
[],
false,
nil
nil,
0
)
after
RelayClient.close(client)
@@ -444,26 +596,32 @@ defmodule NodeSyncE2E.Runner do
filters,
events,
authenticated?,
auth_event_id
auth_event_id,
auth_attempts
) do
receive do
{:node_sync_e2e_relay_client, ^client, :frame, ["AUTH", challenge]} ->
auth_event =
auth_event(relay_auth_url, challenge)
|> sign_event!(private_key)
if auth_attempts >= 5 do
{:error, :too_many_auth_challenges}
else
auth_event =
auth_event(relay_auth_url, challenge)
|> sign_event!(private_key)
:ok = RelayClient.send_json(client, ["AUTH", auth_event])
:ok = RelayClient.send_json(client, ["AUTH", auth_event])
authenticated_query(
client,
relay_auth_url,
private_key,
subscription_id,
filters,
events,
authenticated?,
auth_event["id"]
)
authenticated_query(
client,
relay_auth_url,
private_key,
subscription_id,
filters,
events,
authenticated?,
auth_event["id"],
auth_attempts + 1
)
end
{:node_sync_e2e_relay_client, ^client, :frame, ["OK", event_id, true, _message]}
when event_id == auth_event_id ->
@@ -477,7 +635,8 @@ defmodule NodeSyncE2E.Runner do
filters,
events,
true,
nil
nil,
auth_attempts
)
{:node_sync_e2e_relay_client, ^client, :frame, ["OK", event_id, false, message]}
@@ -493,7 +652,8 @@ defmodule NodeSyncE2E.Runner do
filters,
[event | events],
authenticated?,
auth_event_id
auth_event_id,
auth_attempts
)
{:node_sync_e2e_relay_client, ^client, :frame, ["EOSE", ^subscription_id]} ->
@@ -514,7 +674,8 @@ defmodule NodeSyncE2E.Runner do
filters,
events,
authenticated?,
auth_event_id
auth_event_id,
auth_attempts
)
true ->
@@ -838,9 +999,12 @@ defmodule NodeSyncE2E.Runner do
defp format_reason({:missing_state_value, key}),
do: "state file is missing #{key}"
defp format_reason({:unexpected_replication, event_id}),
do: "event #{event_id} should not have replicated but was found"
defp format_reason(:missing_command),
do:
"usage: elixir scripts/node_sync_e2e.exs <bootstrap|publish-resume|verify-resume> --state-file <path>"
"usage: elixir scripts/node_sync_e2e.exs <bootstrap|publish-resume|verify-resume|filter-selectivity|sync-stop-restart|bidirectional-sync> --state-file <path>"
defp format_reason(:missing_state_file),
do: "--state-file is required"

View File

@@ -108,4 +108,8 @@ docker compose -f "$COMPOSE_FILE" up -d parrhesia-b
wait_for_health "$NODE_B_HTTP_URL" "Node B"
run_runner verify-resume
run_runner filter-selectivity
run_runner sync-stop-restart
run_runner bidirectional-sync
printf 'node-sync-e2e docker run completed\nstate: %s\n' "$STATE_FILE"

View File

@@ -115,7 +115,7 @@ wait_for_health() {
return
fi
sleep 0.1
sleep 0.2
done
echo "${label} did not become healthy on port ${port}" >&2
@@ -224,4 +224,8 @@ start_node \
wait_for_health "$NODE_B_PORT" "Node B"
run_runner verify-resume
run_runner filter-selectivity
run_runner sync-stop-restart
run_runner bidirectional-sync
printf 'node-sync-e2e local run completed\nlogs: %s\n' "$LOG_DIR"