More E2E tests
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s

This commit is contained in:
2026-03-19 02:03:41 +01:00
parent f70d50933d
commit b8f1bea459
3 changed files with 545 additions and 26 deletions

View File

@@ -2,12 +2,14 @@ defmodule NodeSyncE2E.RelayClient do
use WebSockex use WebSockex
def start_link(url, owner, opts \\ []) do def start_link(url, owner, opts \\ []) do
WebSockex.start_link( ws_opts =
url, opts
__MODULE__, |> Keyword.put_new(:handle_initial_conn_failure, true)
owner, |> Keyword.put_new(:async, true)
Keyword.put(opts, :handle_initial_conn_failure, true) |> Keyword.put_new(:socket_connect_timeout, 2_000)
) |> Keyword.put_new(:socket_recv_timeout, 2_000)
WebSockex.start_link(url, __MODULE__, owner, ws_opts)
end end
def send_json(pid, payload) do def send_json(pid, payload) do
@@ -163,6 +165,84 @@ defmodule NodeSyncE2E.Runner do
end end
end end
# Verifies filter selectivity of the sync link: an event that does NOT
# match the configured sync filter must not replicate from node A to
# node B.
#
# Publishes a non-matching event on node A, waits a fixed grace period,
# then asserts the event is absent on node B. On success persists the
# run context (including the non-matching event id) to the state file
# and returns :ok; any failing `with` clause falls through unchanged.
defp dispatch("filter-selectivity", config, opts) do
  with {:ok, state_file} <- fetch_state_file(opts),
       state = load_state(state_file),
       :ok <- ensure_run_matches(config, state),
       :ok <- ensure_nodes_ready(config),
       :ok <- wait_for_sync_connected(config, config.node_b, config.server_id),
       {:ok, non_matching_event} <- publish_non_matching_event(config, config.node_a),
       # Grace period: give replication a chance to (wrongly) happen
       # before asserting absence on node B.
       _ = Process.sleep(2_000),
       :ok <- ensure_event_absent(config, config.node_b, non_matching_event["id"]),
       :ok <-
         save_state(state_file, %{
           "run_id" => config.run_id,
           "resource" => config.resource,
           "server_id" => config.server_id,
           "non_matching_event_id" => non_matching_event["id"]
         }) do
    :ok
  end
end
# Verifies that sync catches up after a stop/restart cycle.
#
# Stops the sync server on node B, publishes an event on node A while
# sync is down, asserts it did NOT replicate, then restarts the server
# and asserts the event eventually arrives on node B (resume/catch-up).
# Persists the run context plus the while-stopped event id to the state
# file. Returns :ok on success; a failing clause falls through as-is.
defp dispatch("sync-stop-restart", config, opts) do
  with {:ok, state_file} <- fetch_state_file(opts),
       state = load_state(state_file),
       :ok <- ensure_run_matches(config, state),
       :ok <- ensure_nodes_ready(config),
       {:ok, %{"ok" => true}} <-
         management_call(config, config.node_b, "sync_stop_server", %{"id" => config.server_id}),
       {:ok, while_stopped_event} <-
         publish_phase_event(config, config.node_a, "while-stopped"),
       # Grace period before asserting the event did not replicate
       # while the sync server was stopped.
       _ = Process.sleep(2_000),
       :ok <- ensure_event_absent(config, config.node_b, while_stopped_event["id"]),
       {:ok, %{"ok" => true}} <-
         management_call(config, config.node_b, "sync_start_server", %{
           "id" => config.server_id
         }),
       :ok <- wait_for_sync_connected(config, config.node_b, config.server_id),
       :ok <- wait_for_event(config, config.node_b, while_stopped_event["id"]),
       :ok <-
         save_state(state_file, %{
           "run_id" => config.run_id,
           "resource" => config.resource,
           "server_id" => config.server_id,
           "while_stopped_event_id" => while_stopped_event["id"]
         }) do
    :ok
  end
end
# Verifies bidirectional sync: in addition to the existing A -> B link,
# configures a reverse link so node A pulls from node B, then checks
# that an event published on node B replicates to node A.
#
# Requires node pubkeys from an earlier phase (read from the state
# file). Sets up the ACLs needed in both directions before configuring
# the reverse sync server ("node-b-upstream") on node A. Persists the
# run context plus the reverse server and event ids to the state file.
# Returns :ok on success; a failing clause falls through as-is.
defp dispatch("bidirectional-sync", config, opts) do
  with {:ok, state_file} <- fetch_state_file(opts),
       state = load_state(state_file),
       :ok <- ensure_run_matches(config, state),
       {:ok, node_a_pubkey} <- fetch_state_value(state, "node_a_pubkey"),
       {:ok, node_b_pubkey} <- fetch_state_value(state, "node_b_pubkey"),
       :ok <- ensure_nodes_ready(config),
       # Node B must let node A read, and the test client write.
       :ok <- ensure_acl(config, config.node_b, node_a_pubkey, "sync_read", config.filter),
       :ok <-
         ensure_acl(config, config.node_b, config.client_pubkey, "sync_write", config.filter),
       # Node A must accept writes from node B, and reads by the client.
       :ok <- ensure_acl(config, config.node_a, node_b_pubkey, "sync_write", config.filter),
       :ok <-
         ensure_acl(config, config.node_a, config.client_pubkey, "sync_read", config.filter),
       reverse_server_id = "node-b-upstream",
       :ok <- configure_reverse_sync(config, node_b_pubkey, reverse_server_id),
       :ok <- wait_for_sync_connected(config, config.node_a, reverse_server_id),
       {:ok, bidir_event} <- publish_phase_event(config, config.node_b, "bidirectional"),
       :ok <- wait_for_event(config, config.node_a, bidir_event["id"]),
       :ok <-
         save_state(state_file, %{
           "run_id" => config.run_id,
           "resource" => config.resource,
           "server_id" => config.server_id,
           "reverse_server_id" => reverse_server_id,
           "bidir_event_id" => bidir_event["id"]
         }) do
    :ok
  end
end
defp dispatch(other, _config, _opts), do: {:error, {:unknown_command, other}} defp dispatch(other, _config, _opts), do: {:error, {:unknown_command, other}}
defp fetch_state_file(opts) do defp fetch_state_file(opts) do
@@ -334,6 +414,77 @@ defmodule NodeSyncE2E.Runner do
%{"mode" => "disabled", "pins" => []} %{"mode" => "disabled", "pins" => []}
end end
# Registers and starts a sync server entry on node A that pulls from
# node B (the reverse of the default direction).
#
# The entry is keyed by `reverse_server_id`, points at node B's sync
# URL, authenticates with node B's pubkey, and reuses the run's filter
# and a TLS config derived from the sync URL. Returns :ok when both
# management calls succeed; otherwise the first non-matching result
# falls through from the `with`.
defp configure_reverse_sync(config, node_b_pubkey, reverse_server_id) do
  params = %{
    "id" => reverse_server_id,
    "url" => config.node_b.sync_url,
    "enabled?" => true,
    "auth_pubkey" => node_b_pubkey,
    "filters" => [config.filter],
    "tls" => sync_tls_config(config.node_b.sync_url)
  }

  with {:ok, _server} <- management_call(config, config.node_a, "sync_put_server", params),
       {:ok, %{"ok" => true}} <-
         management_call(config, config.node_a, "sync_start_server", %{
           "id" => reverse_server_id
         }) do
    :ok
  end
end
# Builds, signs, and publishes an event intended NOT to match the sync
# filter (kind 5001 — presumably outside the configured filter; the
# filter itself is defined elsewhere — TODO confirm), tagged with the
# run's resource, subsystem, run id, and phase.
#
# Opens a throwaway relay client to `node`, publishes through the
# authenticated publish path, and always closes the client afterwards.
# Returns {:ok, signed_event} on success or {:error, reason}.
defp publish_non_matching_event(config, node) do
  event =
    %{
      "created_at" => System.system_time(:second),
      "kind" => 5001,
      "tags" => [
        ["r", config.resource],
        ["t", @subsystem_tag],
        ["run", config.run_id],
        ["phase", "filter-selectivity"]
      ],
      "content" => "filter-selectivity:#{config.run_id}"
    }
    |> sign_event!(config.client_private_key)

  with {:ok, client} <- RelayClient.start_link(node.websocket_url, self()),
       :ok <- await_client_connect(client) do
    try do
      case publish_event(client, node.relay_auth_url, config.client_private_key, event) do
        :ok -> {:ok, event}
        {:error, reason} -> {:error, reason}
      end
    after
      # Always tear the client down, even when publishing fails.
      RelayClient.close(client)
    end
  end
end
# Confirms that `event_id` has NOT replicated to `node`.
#
# Queries the node for the event by id, scoped to kind 5001 and this
# run's resource. Returns :ok when the event is absent,
# {:error, {:unexpected_replication, event_id}} when it is present, and
# {:error, {:event_absence_query_failed, reason}} when the query fails.
defp ensure_event_absent(config, node, event_id) do
  query_filter = %{
    "kinds" => [5001],
    "#r" => [config.resource],
    "ids" => [event_id],
    "limit" => 1
  }

  case query_events(node, config.client_private_key, query_filter) do
    {:ok, events} when is_list(events) ->
      # An empty result list simply means no match was found.
      if Enum.any?(events, &(&1["id"] == event_id)) do
        {:error, {:unexpected_replication, event_id}}
      else
        :ok
      end

    {:error, reason} ->
      {:error, {:event_absence_query_failed, reason}}
  end
end
defp publish_phase_event(config, node, phase) do defp publish_phase_event(config, node, phase) do
event = event =
%{ %{
@@ -428,7 +579,8 @@ defmodule NodeSyncE2E.Runner do
[filter], [filter],
[], [],
false, false,
nil nil,
0
) )
after after
RelayClient.close(client) RelayClient.close(client)
@@ -444,26 +596,32 @@ defmodule NodeSyncE2E.Runner do
filters, filters,
events, events,
authenticated?, authenticated?,
auth_event_id auth_event_id,
auth_attempts
) do ) do
receive do receive do
{:node_sync_e2e_relay_client, ^client, :frame, ["AUTH", challenge]} -> {:node_sync_e2e_relay_client, ^client, :frame, ["AUTH", challenge]} ->
auth_event = if auth_attempts >= 5 do
auth_event(relay_auth_url, challenge) {:error, :too_many_auth_challenges}
|> sign_event!(private_key) else
auth_event =
auth_event(relay_auth_url, challenge)
|> sign_event!(private_key)
:ok = RelayClient.send_json(client, ["AUTH", auth_event]) :ok = RelayClient.send_json(client, ["AUTH", auth_event])
authenticated_query( authenticated_query(
client, client,
relay_auth_url, relay_auth_url,
private_key, private_key,
subscription_id, subscription_id,
filters, filters,
events, events,
authenticated?, authenticated?,
auth_event["id"] auth_event["id"],
) auth_attempts + 1
)
end
{:node_sync_e2e_relay_client, ^client, :frame, ["OK", event_id, true, _message]} {:node_sync_e2e_relay_client, ^client, :frame, ["OK", event_id, true, _message]}
when event_id == auth_event_id -> when event_id == auth_event_id ->
@@ -477,7 +635,8 @@ defmodule NodeSyncE2E.Runner do
filters, filters,
events, events,
true, true,
nil nil,
auth_attempts
) )
{:node_sync_e2e_relay_client, ^client, :frame, ["OK", event_id, false, message]} {:node_sync_e2e_relay_client, ^client, :frame, ["OK", event_id, false, message]}
@@ -493,7 +652,8 @@ defmodule NodeSyncE2E.Runner do
filters, filters,
[event | events], [event | events],
authenticated?, authenticated?,
auth_event_id auth_event_id,
auth_attempts
) )
{:node_sync_e2e_relay_client, ^client, :frame, ["EOSE", ^subscription_id]} -> {:node_sync_e2e_relay_client, ^client, :frame, ["EOSE", ^subscription_id]} ->
@@ -514,7 +674,8 @@ defmodule NodeSyncE2E.Runner do
filters, filters,
events, events,
authenticated?, authenticated?,
auth_event_id auth_event_id,
auth_attempts
) )
true -> true ->
@@ -838,9 +999,12 @@ defmodule NodeSyncE2E.Runner do
defp format_reason({:missing_state_value, key}), defp format_reason({:missing_state_value, key}),
do: "state file is missing #{key}" do: "state file is missing #{key}"
defp format_reason({:unexpected_replication, event_id}),
do: "event #{event_id} should not have replicated but was found"
defp format_reason(:missing_command), defp format_reason(:missing_command),
do: do:
"usage: elixir scripts/node_sync_e2e.exs <bootstrap|publish-resume|verify-resume> --state-file <path>" "usage: elixir scripts/node_sync_e2e.exs <bootstrap|publish-resume|verify-resume|filter-selectivity|sync-stop-restart|bidirectional-sync> --state-file <path>"
defp format_reason(:missing_state_file), defp format_reason(:missing_state_file),
do: "--state-file is required" do: "--state-file is required"

View File

@@ -224,4 +224,8 @@ start_node \
wait_for_health "$NODE_B_PORT" "Node B" wait_for_health "$NODE_B_PORT" "Node B"
run_runner verify-resume run_runner verify-resume
run_runner filter-selectivity
run_runner sync-stop-restart
run_runner bidirectional-sync
printf 'node-sync-e2e local run completed\nlogs: %s\n' "$LOG_DIR" printf 'node-sync-e2e local run completed\nlogs: %s\n' "$LOG_DIR"

View File

@@ -11,6 +11,7 @@ import {
MarmotClient, MarmotClient,
KeyPackageStore, KeyPackageStore,
KeyValueGroupStateBackend, KeyValueGroupStateBackend,
Proposals,
createKeyPackageRelayListEvent, createKeyPackageRelayListEvent,
deserializeApplicationData, deserializeApplicationData,
extractMarmotGroupData, extractMarmotGroupData,
@@ -142,6 +143,129 @@ function createClient(account, network) {
}); });
} }
/**
* Bootstraps a group with one admin and `count` invited members.
*/
/**
 * Bootstraps a group with one admin and `count` invited members.
 *
 * Flow per the code below:
 *  1. Creates an admin account, publishes its key-package relay list.
 *  2. For each member: creates an account, publishes its relay list,
 *     and publishes a key package to the relay.
 *  3. Admin creates the group, then invites each member via their
 *     fetched key-package event.
 *  4. Each member fetches their gift-wrap invite (authenticated),
 *     decrypts it, and joins the group from the welcome rumor.
 *
 * Returns { admin: {account, pubkey, client, group},
 *           members: [{account, pubkey, client, group}, ...] }.
 * Assertions fail the surrounding test if any key package, gift wrap,
 * or invite is missing.
 */
async function createGroupWithMembers(network, count) {
  const adminAccount = PrivateKeyAccount.generateNew();
  const adminPubkey = await adminAccount.signer.getPublicKey();
  const adminClient = createClient(adminAccount, network);
  const adminRelayList = await adminAccount.signer.signEvent(
    createKeyPackageRelayListEvent({
      pubkey: adminPubkey,
      relays: [relayWsUrl],
      client: "parrhesia-marmot-e2e",
    }),
  );
  await network.publish([relayWsUrl], adminRelayList);
  const memberInfos = [];
  for (let i = 0; i < count; i++) {
    const account = PrivateKeyAccount.generateNew();
    const pubkey = await account.signer.getPublicKey();
    const client = createClient(account, network);
    const relayList = await account.signer.signEvent(
      createKeyPackageRelayListEvent({
        pubkey,
        relays: [relayWsUrl],
        client: "parrhesia-marmot-e2e",
      }),
    );
    await network.publish([relayWsUrl], relayList);
    // Publish a key package so the admin can invite this member.
    await client.keyPackages.create({
      relays: [relayWsUrl],
      client: "parrhesia-marmot-e2e",
    });
    memberInfos.push({ account, pubkey, client });
  }
  const adminGroup = await adminClient.createGroup(
    `E2E Group ${randomUUID()}`,
    { relays: [relayWsUrl], adminPubkeys: [adminPubkey] },
  );
  const members = [];
  for (const mi of memberInfos) {
    const keyPackageEvents = await network.request([relayWsUrl], [
      { kinds: [KEY_PACKAGE_KIND], authors: [mi.pubkey], limit: 1 },
    ]);
    assert.equal(keyPackageEvents.length, 1, `key package missing for ${mi.pubkey}`);
    await adminGroup.inviteByKeyPackageEvent(keyPackageEvents[0]);
    // Gift wraps require relay auth to fetch (recipient-scoped).
    const giftWraps = await requestGiftWrapsWithAuth({
      relayUrl: relayWsUrl,
      relayHttpUrl,
      signer: mi.account.signer,
      recipientPubkey: mi.pubkey,
    });
    assert.ok(giftWraps.length >= 1, `gift wrap missing for ${mi.pubkey}`);
    const inviteReader = new InviteReader({
      signer: mi.account.signer,
      store: {
        received: new MemoryBackend(),
        unread: new MemoryBackend(),
        seen: new MemoryBackend(),
      },
    });
    await inviteReader.ingestEvents(giftWraps);
    const invites = await inviteReader.decryptGiftWraps();
    assert.equal(invites.length, 1);
    const { group } = await mi.client.joinGroupFromWelcome({
      welcomeRumor: invites[0],
    });
    members.push({ account: mi.account, pubkey: mi.pubkey, client: mi.client, group });
  }
  return {
    admin: { account: adminAccount, pubkey: adminPubkey, client: adminClient, group: adminGroup },
    members,
  };
}
/**
 * Returns the group's Nostr group id as a hex string, read from the
 * MarmotGroupData attached to the group state. Asserts that the data
 * is present.
 */
function getNostrGroupId(group) {
  const marmotData = extractMarmotGroupData(group.state);
  assert.ok(marmotData, "MarmotGroupData should exist on group");
  return bytesToHex(marmotData.nostrGroupId);
}
/**
 * Fetches up to 100 group events (kind GROUP_EVENT_KIND) for `group`
 * from the relay, selected by the group's "#h" tag.
 */
async function fetchGroupEvents(network, group) {
  const groupIdHex = getNostrGroupId(group);
  const filter = { kinds: [GROUP_EVENT_KIND], "#h": [groupIdHex], limit: 100 };
  return network.request([relayWsUrl], [filter]);
}
/**
 * Issues a NIP-45 COUNT request over a fresh relay connection and
 * resolves with the relay's count payload (the third frame element).
 * Throws if the relay CLOSEs the subscription; unrelated or non-array
 * frames are ignored. The connection is always closed on the way out.
 */
async function countEvents(relayUrl, filters) {
  const relay = await openRelay(relayUrl, 5_000);
  const subId = randomSubId("count");
  const filterList = Array.isArray(filters) ? filters : [filters];
  try {
    relay.send(["COUNT", subId, ...filterList]);
    for (;;) {
      const frame = await relay.nextFrame(5_000);
      if (!Array.isArray(frame)) {
        continue;
      }
      const [type, id, payload] = frame;
      if (id !== subId) {
        continue;
      }
      if (type === "COUNT") {
        return payload;
      }
      if (type === "CLOSED") {
        throw new Error(`COUNT closed: ${payload}`);
      }
    }
  } finally {
    await relay.close();
  }
}
function computeEventId(event) { function computeEventId(event) {
const payload = [ const payload = [
0, 0,
@@ -603,3 +727,230 @@ test("admin invites user, user joins, and message round-trip decrypts", async ()
"admin should decrypt invitee application message", "admin should decrypt invitee application message",
); );
}); });
// Verifies the sendChatMessage convenience API: the invitee sends a
// chat message (kind 9 per the assertion below) and the admin fetches
// the group events and decrypts it.
test("sendChatMessage convenience API works", async () => {
  const network = new RelayNetwork(relayWsUrl, relayHttpUrl);
  const { admin, members } = await createGroupWithMembers(network, 1);
  const invitee = members[0];
  const content = `chat-msg-${randomUUID()}`;
  await invitee.group.sendChatMessage(content);
  const groupEvents = await fetchGroupEvents(network, admin.group);
  const decrypted = [];
  for await (const result of admin.group.ingest(groupEvents)) {
    if (result.kind === "processed" && result.result.kind === "applicationMessage") {
      decrypted.push(deserializeApplicationData(result.result.message));
    }
  }
  assert.ok(
    decrypted.some((r) => r.content === content && r.kind === 9),
    "admin should decrypt kind 9 chat message from invitee",
  );
});
// Verifies member removal: the admin proposes and commits the removal
// of user2, the remaining member (user1) processes the commit, and the
// group keeps working — user1's subsequent message still decrypts for
// the admin.
test("admin removes member from group", async () => {
  const network = new RelayNetwork(relayWsUrl, relayHttpUrl);
  const { admin, members } = await createGroupWithMembers(network, 2);
  const [user1, user2] = members;
  // proposeRemoveUser returns ProposalRemove[] — use propose() which handles arrays
  await admin.group.propose(Proposals.proposeRemoveUser(user2.pubkey));
  await admin.group.commit();
  // user1 ingests the removal commit
  const groupEvents = await fetchGroupEvents(network, admin.group);
  let user1Processed = false;
  for await (const result of user1.group.ingest(groupEvents)) {
    if (result.kind === "processed" && result.result.kind === "newState") {
      user1Processed = true;
    }
  }
  assert.ok(user1Processed, "user1 should process the removal commit");
  // user1 can still send a message that admin decrypts
  const msg = `post-removal-${randomUUID()}`;
  await user1.group.sendChatMessage(msg);
  const updatedEvents = await fetchGroupEvents(network, admin.group);
  const decrypted = [];
  for await (const result of admin.group.ingest(updatedEvents)) {
    if (result.kind === "processed" && result.result.kind === "applicationMessage") {
      decrypted.push(deserializeApplicationData(result.result.message));
    }
  }
  assert.ok(
    decrypted.some((r) => r.content === msg),
    "admin should decrypt message from user1 after user2 removal",
  );
});
// Verifies metadata updates via proposal: the admin commits an
// UpdateMetadata proposal renaming the group; both the admin (locally)
// and the invitee (after ingesting the commit) observe the new name.
test("group metadata update via proposal", async () => {
  const network = new RelayNetwork(relayWsUrl, relayHttpUrl);
  const { admin, members } = await createGroupWithMembers(network, 1);
  const invitee = members[0];
  const updatedName = `Updated-${randomUUID()}`;
  await admin.group.commit({
    extraProposals: [Proposals.proposeUpdateMetadata({ name: updatedName })],
  });
  // Admin sees the updated name locally
  const adminGroupData = extractMarmotGroupData(admin.group.state);
  assert.equal(adminGroupData.name, updatedName);
  // Invitee ingests the commit and sees updated metadata
  const groupEvents = await fetchGroupEvents(network, admin.group);
  for await (const _result of invitee.group.ingest(groupEvents)) {
    // drain ingest
  }
  const inviteeGroupData = extractMarmotGroupData(invitee.group.state);
  assert.equal(inviteeGroupData.name, updatedName, "invitee should see updated group name");
});
// Verifies self-update (leaf key rotation): after the invitee calls
// selfUpdate(), the admin ingests the resulting commit, the group
// epoch advances, and messages sent after the rotation still decrypt.
test("member self-update rotates leaf keys (forward secrecy)", async () => {
  const network = new RelayNetwork(relayWsUrl, relayHttpUrl);
  const { admin, members } = await createGroupWithMembers(network, 1);
  const invitee = members[0];
  const epochBefore = admin.group.state.groupContext.epoch;
  await invitee.group.selfUpdate();
  // Admin ingests the self-update commit. The loop binding is unused
  // (underscore-prefixed for consistency with the other drain loops).
  const groupEvents = await fetchGroupEvents(network, admin.group);
  for await (const _result of admin.group.ingest(groupEvents)) {
    // drain
  }
  const epochAfter = admin.group.state.groupContext.epoch;
  assert.ok(epochAfter > epochBefore, "admin epoch should advance after self-update commit");
  // Both parties can still exchange messages after key rotation
  const msg = `after-selfupdate-${randomUUID()}`;
  await invitee.group.sendChatMessage(msg);
  const updatedEvents = await fetchGroupEvents(network, admin.group);
  const decrypted = [];
  for await (const result of admin.group.ingest(updatedEvents)) {
    if (result.kind === "processed" && result.result.kind === "applicationMessage") {
      decrypted.push(deserializeApplicationData(result.result.message));
    }
  }
  assert.ok(
    decrypted.some((r) => r.content === msg),
    "admin should decrypt message sent after self-update",
  );
});
// Verifies voluntary departure: the leaver publishes self-remove
// proposals and destroys local state, the admin ingests and commits
// those proposals, the staying member catches up, and the group keeps
// functioning (the stayer's message still decrypts for the admin).
test("member leaves group voluntarily", async () => {
  const network = new RelayNetwork(relayWsUrl, relayHttpUrl);
  const { admin, members } = await createGroupWithMembers(network, 2);
  const [stayer, leaver] = members;
  // Leaver publishes self-remove proposals and destroys local state
  await leaver.group.leave();
  // Admin ingests the leave proposals (they become unapplied)
  const groupEventsForAdmin = await fetchGroupEvents(network, admin.group);
  for await (const _result of admin.group.ingest(groupEventsForAdmin)) {
    // drain
  }
  // Admin commits all unapplied proposals (the leave removals)
  await admin.group.commit();
  // Stayer ingests everything and the group continues
  const allEvents = await fetchGroupEvents(network, admin.group);
  for await (const _result of stayer.group.ingest(allEvents)) {
    // drain
  }
  const msgAfterLeave = `after-leave-${randomUUID()}`;
  await stayer.group.sendChatMessage(msgAfterLeave);
  const latestEvents = await fetchGroupEvents(network, admin.group);
  const decrypted = [];
  for await (const result of admin.group.ingest(latestEvents)) {
    if (result.kind === "processed" && result.result.kind === "applicationMessage") {
      decrypted.push(deserializeApplicationData(result.result.message));
    }
  }
  assert.ok(
    decrypted.some((r) => r.content === msgAfterLeave),
    "group should continue functioning after member departure",
  );
});
// Verifies NIP-45 COUNT: publishes three kind-1 events sharing a unique
// "t" tag (distinct created_at values so none dedupe), then asserts the
// relay's COUNT response reports exactly 3.
test("NIP-45 COUNT returns accurate event count", async () => {
  const account = PrivateKeyAccount.generateNew();
  const pubkey = await account.signer.getPublicKey();
  const tag = `count-test-${randomUUID()}`;
  for (let i = 0; i < 3; i++) {
    const event = {
      kind: 1,
      pubkey,
      // Offset created_at per event so each is a distinct event.
      created_at: unixNow() + i,
      tags: [["t", tag]],
      content: `count-event-${i}`,
    };
    event.id = computeEventId(event);
    const signed = await account.signer.signEvent(event);
    const frame = await publishEvent(relayWsUrl, signed);
    assert.equal(frame[2], true, `publish of event ${i} failed: ${frame[3]}`);
  }
  const result = await countEvents(relayWsUrl, { kinds: [1], "#t": [tag] });
  assert.equal(result.count, 3, "COUNT should report 3 events");
});
// Verifies NIP-09 deletion: publishes a kind-1 event, confirms it is
// queryable, publishes a kind-5 deletion referencing it by "e" tag
// (same author), and asserts the original is no longer returned.
test("NIP-9 event deletion removes event from relay", async () => {
  const account = PrivateKeyAccount.generateNew();
  const pubkey = await account.signer.getPublicKey();
  const original = {
    kind: 1,
    pubkey,
    created_at: unixNow(),
    tags: [],
    content: `to-be-deleted-${randomUUID()}`,
  };
  original.id = computeEventId(original);
  const signedOriginal = await account.signer.signEvent(original);
  const publishFrame = await publishEvent(relayWsUrl, signedOriginal);
  // frame layout is ["OK", id, accepted, message] per the assertions.
  assert.equal(publishFrame[2], true, `original publish failed: ${publishFrame[3]}`);
  // Verify it exists
  const beforeDelete = await requestEvents(relayWsUrl, [{ ids: [signedOriginal.id] }]);
  assert.equal(beforeDelete.length, 1, "event should exist before deletion");
  // Publish kind 5 deletion referencing the original
  const deletion = {
    kind: 5,
    pubkey,
    created_at: unixNow(),
    tags: [["e", signedOriginal.id]],
    content: "",
  };
  deletion.id = computeEventId(deletion);
  const signedDeletion = await account.signer.signEvent(deletion);
  const deleteFrame = await publishEvent(relayWsUrl, signedDeletion);
  assert.equal(deleteFrame[2], true, `deletion publish failed: ${deleteFrame[3]}`);
  // Query again -- the original should be gone
  const afterDelete = await requestEvents(relayWsUrl, [{ ids: [signedOriginal.id] }]);
  assert.equal(afterDelete.length, 0, "event should be deleted after kind 5 request");
});