diff --git a/scripts/node_sync_e2e.exs b/scripts/node_sync_e2e.exs index 654553d..e753eec 100644 --- a/scripts/node_sync_e2e.exs +++ b/scripts/node_sync_e2e.exs @@ -2,12 +2,14 @@ defmodule NodeSyncE2E.RelayClient do use WebSockex def start_link(url, owner, opts \\ []) do - WebSockex.start_link( - url, - __MODULE__, - owner, - Keyword.put(opts, :handle_initial_conn_failure, true) - ) + ws_opts = + opts + |> Keyword.put_new(:handle_initial_conn_failure, true) + |> Keyword.put_new(:async, true) + |> Keyword.put_new(:socket_connect_timeout, 2_000) + |> Keyword.put_new(:socket_recv_timeout, 2_000) + + WebSockex.start_link(url, __MODULE__, owner, ws_opts) end def send_json(pid, payload) do @@ -163,6 +165,84 @@ defmodule NodeSyncE2E.Runner do end end + defp dispatch("filter-selectivity", config, opts) do + with {:ok, state_file} <- fetch_state_file(opts), + state = load_state(state_file), + :ok <- ensure_run_matches(config, state), + :ok <- ensure_nodes_ready(config), + :ok <- wait_for_sync_connected(config, config.node_b, config.server_id), + {:ok, non_matching_event} <- publish_non_matching_event(config, config.node_a), + _ = Process.sleep(2_000), + :ok <- ensure_event_absent(config, config.node_b, non_matching_event["id"]), + :ok <- + save_state(state_file, %{ + "run_id" => config.run_id, + "resource" => config.resource, + "server_id" => config.server_id, + "non_matching_event_id" => non_matching_event["id"] + }) do + :ok + end + end + + defp dispatch("sync-stop-restart", config, opts) do + with {:ok, state_file} <- fetch_state_file(opts), + state = load_state(state_file), + :ok <- ensure_run_matches(config, state), + :ok <- ensure_nodes_ready(config), + {:ok, %{"ok" => true}} <- + management_call(config, config.node_b, "sync_stop_server", %{"id" => config.server_id}), + {:ok, while_stopped_event} <- + publish_phase_event(config, config.node_a, "while-stopped"), + _ = Process.sleep(2_000), + :ok <- ensure_event_absent(config, config.node_b, while_stopped_event["id"]), + {:ok, %{"ok" => true}} <- + management_call(config, config.node_b, "sync_start_server", %{ + "id" => config.server_id + }), + :ok <- wait_for_sync_connected(config, config.node_b, config.server_id), + :ok <- wait_for_event(config, config.node_b, while_stopped_event["id"]), + :ok <- + save_state(state_file, %{ + "run_id" => config.run_id, + "resource" => config.resource, + "server_id" => config.server_id, + "while_stopped_event_id" => while_stopped_event["id"] + }) do + :ok + end + end + + defp dispatch("bidirectional-sync", config, opts) do + with {:ok, state_file} <- fetch_state_file(opts), + state = load_state(state_file), + :ok <- ensure_run_matches(config, state), + {:ok, node_a_pubkey} <- fetch_state_value(state, "node_a_pubkey"), + {:ok, node_b_pubkey} <- fetch_state_value(state, "node_b_pubkey"), + :ok <- ensure_nodes_ready(config), + :ok <- ensure_acl(config, config.node_b, node_a_pubkey, "sync_read", config.filter), + :ok <- + ensure_acl(config, config.node_b, config.client_pubkey, "sync_write", config.filter), + :ok <- ensure_acl(config, config.node_a, node_b_pubkey, "sync_write", config.filter), + :ok <- + ensure_acl(config, config.node_a, config.client_pubkey, "sync_read", config.filter), + reverse_server_id = "node-b-upstream", + :ok <- configure_reverse_sync(config, node_b_pubkey, reverse_server_id), + :ok <- wait_for_sync_connected(config, config.node_a, reverse_server_id), + {:ok, bidir_event} <- publish_phase_event(config, config.node_b, "bidirectional"), + :ok <- wait_for_event(config, config.node_a, bidir_event["id"]), + :ok <- + save_state(state_file, %{ + "run_id" => config.run_id, + "resource" => config.resource, + "server_id" => config.server_id, + "reverse_server_id" => reverse_server_id, + "bidir_event_id" => bidir_event["id"] + }) do + :ok + end + end + defp dispatch(other, _config, _opts), do: {:error, {:unknown_command, other}} defp fetch_state_file(opts) do @@ -334,6 +414,77 @@ defmodule NodeSyncE2E.Runner do %{"mode" => "disabled", "pins" => []} end + defp configure_reverse_sync(config, node_b_pubkey, reverse_server_id) do + params = %{ + "id" => reverse_server_id, + "url" => config.node_b.sync_url, + "enabled?" => true, + "auth_pubkey" => node_b_pubkey, + "filters" => [config.filter], + "tls" => sync_tls_config(config.node_b.sync_url) + } + + with {:ok, _server} <- management_call(config, config.node_a, "sync_put_server", params), + {:ok, %{"ok" => true}} <- + management_call(config, config.node_a, "sync_start_server", %{ + "id" => reverse_server_id + }) do + :ok + end + end + + defp publish_non_matching_event(config, node) do + event = + %{ + "created_at" => System.system_time(:second), + "kind" => 5001, + "tags" => [ + ["r", config.resource], + ["t", @subsystem_tag], + ["run", config.run_id], + ["phase", "filter-selectivity"] + ], + "content" => "filter-selectivity:#{config.run_id}" + } + |> sign_event!(config.client_private_key) + + with {:ok, client} <- RelayClient.start_link(node.websocket_url, self()), + :ok <- await_client_connect(client) do + try do + case publish_event(client, node.relay_auth_url, config.client_private_key, event) do + :ok -> {:ok, event} + {:error, reason} -> {:error, reason} + end + after + RelayClient.close(client) + end + end + end + + defp ensure_event_absent(config, node, event_id) do + filter = %{ + "kinds" => [5001], + "#r" => [config.resource], + "ids" => [event_id], + "limit" => 1 + } + + case query_events(node, config.client_private_key, filter) do + {:ok, []} -> + :ok + + {:ok, events} when is_list(events) -> + if Enum.any?(events, &(&1["id"] == event_id)) do + {:error, {:unexpected_replication, event_id}} + else + :ok + end + + {:error, reason} -> + {:error, {:event_absence_query_failed, reason}} + end + end + defp publish_phase_event(config, node, phase) do event = %{ @@ -428,7 +579,8 @@ defmodule NodeSyncE2E.Runner do [filter], [], false, - nil + nil, + 0 ) after RelayClient.close(client) @@ -444,26 +596,32 @@ defmodule NodeSyncE2E.Runner do filters, events, authenticated?, - auth_event_id + auth_event_id, + auth_attempts ) do receive do {:node_sync_e2e_relay_client, ^client, :frame, ["AUTH", challenge]} -> - auth_event = - auth_event(relay_auth_url, challenge) - |> sign_event!(private_key) + if auth_attempts >= 5 do + {:error, :too_many_auth_challenges} + else + auth_event = + auth_event(relay_auth_url, challenge) + |> sign_event!(private_key) - :ok = RelayClient.send_json(client, ["AUTH", auth_event]) + :ok = RelayClient.send_json(client, ["AUTH", auth_event]) - authenticated_query( - client, - relay_auth_url, - private_key, - subscription_id, - filters, - events, - authenticated?, - auth_event["id"] - ) + authenticated_query( + client, + relay_auth_url, + private_key, + subscription_id, + filters, + events, + authenticated?, + auth_event["id"], + auth_attempts + 1 + ) + end {:node_sync_e2e_relay_client, ^client, :frame, ["OK", event_id, true, _message]} when event_id == auth_event_id -> @@ -477,7 +635,8 @@ defmodule NodeSyncE2E.Runner do filters, events, true, - nil + nil, + auth_attempts ) {:node_sync_e2e_relay_client, ^client, :frame, ["OK", event_id, false, message]} @@ -493,7 +652,8 @@ defmodule NodeSyncE2E.Runner do filters, [event | events], authenticated?, - auth_event_id + auth_event_id, + auth_attempts ) {:node_sync_e2e_relay_client, ^client, :frame, ["EOSE", ^subscription_id]} -> @@ -514,7 +674,8 @@ defmodule NodeSyncE2E.Runner do filters, events, authenticated?, - auth_event_id + auth_event_id, + auth_attempts ) true -> @@ -838,9 +999,12 @@ defmodule NodeSyncE2E.Runner do defp format_reason({:missing_state_value, key}), do: "state file is missing #{key}" + defp format_reason({:unexpected_replication, event_id}), + do: "event #{event_id} should not have replicated but was found" + defp format_reason(:missing_command), do: - "usage: elixir scripts/node_sync_e2e.exs --state-file " + "usage: elixir scripts/node_sync_e2e.exs --state-file " defp format_reason(:missing_state_file), do: "--state-file is required" diff --git a/scripts/run_node_sync_e2e.sh b/scripts/run_node_sync_e2e.sh index f34654c..65510ee 100755 --- a/scripts/run_node_sync_e2e.sh +++ b/scripts/run_node_sync_e2e.sh @@ -115,7 +115,7 @@ wait_for_health() { return fi - sleep 0.1 + sleep 0.2 done echo "${label} did not become healthy on port ${port}" >&2 @@ -224,4 +224,8 @@ start_node \ wait_for_health "$NODE_B_PORT" "Node B" run_runner verify-resume +run_runner filter-selectivity +run_runner sync-stop-restart +run_runner bidirectional-sync + printf 'node-sync-e2e local run completed\nlogs: %s\n' "$LOG_DIR" diff --git a/test/marmot_e2e/marmot_client_e2e.test.mjs b/test/marmot_e2e/marmot_client_e2e.test.mjs index 6350cdd..712c7f7 100644 --- a/test/marmot_e2e/marmot_client_e2e.test.mjs +++ b/test/marmot_e2e/marmot_client_e2e.test.mjs @@ -11,6 +11,7 @@ import { MarmotClient, KeyPackageStore, KeyValueGroupStateBackend, + Proposals, createKeyPackageRelayListEvent, deserializeApplicationData, extractMarmotGroupData, @@ -142,6 +143,129 @@ function createClient(account, network) { }); } +/** + * Bootstraps a group with one admin and `count` invited members. + */ +async function createGroupWithMembers(network, count) { + const adminAccount = PrivateKeyAccount.generateNew(); + const adminPubkey = await adminAccount.signer.getPublicKey(); + const adminClient = createClient(adminAccount, network); + + const adminRelayList = await adminAccount.signer.signEvent( + createKeyPackageRelayListEvent({ + pubkey: adminPubkey, + relays: [relayWsUrl], + client: "parrhesia-marmot-e2e", + }), + ); + await network.publish([relayWsUrl], adminRelayList); + + const memberInfos = []; + for (let i = 0; i < count; i++) { + const account = PrivateKeyAccount.generateNew(); + const pubkey = await account.signer.getPublicKey(); + const client = createClient(account, network); + + const relayList = await account.signer.signEvent( + createKeyPackageRelayListEvent({ + pubkey, + relays: [relayWsUrl], + client: "parrhesia-marmot-e2e", + }), + ); + await network.publish([relayWsUrl], relayList); + await client.keyPackages.create({ + relays: [relayWsUrl], + client: "parrhesia-marmot-e2e", + }); + + memberInfos.push({ account, pubkey, client }); + } + + const adminGroup = await adminClient.createGroup( + `E2E Group ${randomUUID()}`, + { relays: [relayWsUrl], adminPubkeys: [adminPubkey] }, + ); + + const members = []; + for (const mi of memberInfos) { + const keyPackageEvents = await network.request([relayWsUrl], [ + { kinds: [KEY_PACKAGE_KIND], authors: [mi.pubkey], limit: 1 }, + ]); + assert.equal(keyPackageEvents.length, 1, `key package missing for ${mi.pubkey}`); + await adminGroup.inviteByKeyPackageEvent(keyPackageEvents[0]); + + const giftWraps = await requestGiftWrapsWithAuth({ + relayUrl: relayWsUrl, + relayHttpUrl, + signer: mi.account.signer, + recipientPubkey: mi.pubkey, + }); + assert.ok(giftWraps.length >= 1, `gift wrap missing for ${mi.pubkey}`); + + const inviteReader = new InviteReader({ + signer: mi.account.signer, + store: { + received: new MemoryBackend(), + unread: new MemoryBackend(), + seen: new MemoryBackend(), + }, + }); + await inviteReader.ingestEvents(giftWraps); + const invites = await inviteReader.decryptGiftWraps(); + assert.equal(invites.length, 1); + + const { group } = await mi.client.joinGroupFromWelcome({ + welcomeRumor: invites[0], + }); + + members.push({ account: mi.account, pubkey: mi.pubkey, client: mi.client, group }); + } + + return { + admin: { account: adminAccount, pubkey: adminPubkey, client: adminClient, group: adminGroup }, + members, + }; +} + +function getNostrGroupId(group) { + const data = extractMarmotGroupData(group.state); + assert.ok(data, "MarmotGroupData should exist on group"); + return bytesToHex(data.nostrGroupId); +} + +async function fetchGroupEvents(network, group) { + const nostrGroupId = getNostrGroupId(group); + return network.request([relayWsUrl], [ + { kinds: [GROUP_EVENT_KIND], "#h": [nostrGroupId], limit: 100 }, + ]); +} + +async function countEvents(relayUrl, filters) { + const relay = await openRelay(relayUrl, 5_000); + const subId = randomSubId("count"); + const filtersArray = Array.isArray(filters) ? filters : [filters]; + + try { + relay.send(["COUNT", subId, ...filtersArray]); + + while (true) { + const frame = await relay.nextFrame(5_000); + if (!Array.isArray(frame)) continue; + + if (frame[0] === "COUNT" && frame[1] === subId) { + return frame[2]; + } + + if (frame[0] === "CLOSED" && frame[1] === subId) { + throw new Error(`COUNT closed: ${frame[2]}`); + } + } + } finally { + await relay.close(); + } +} + function computeEventId(event) { const payload = [ 0, @@ -603,3 +727,230 @@ test("admin invites user, user joins, and message round-trip decrypts", async () "admin should decrypt invitee application message", ); }); + +test("sendChatMessage convenience API works", async () => { + const network = new RelayNetwork(relayWsUrl, relayHttpUrl); + const { admin, members } = await createGroupWithMembers(network, 1); + const invitee = members[0]; + + const content = `chat-msg-${randomUUID()}`; + await invitee.group.sendChatMessage(content); + + const groupEvents = await fetchGroupEvents(network, admin.group); + + const decrypted = []; + for await (const result of admin.group.ingest(groupEvents)) { + if (result.kind === "processed" && result.result.kind === "applicationMessage") { + decrypted.push(deserializeApplicationData(result.result.message)); + } + } + + assert.ok( + decrypted.some((r) => r.content === content && r.kind === 9), + "admin should decrypt kind 9 chat message from invitee", + ); +}); + +test("admin removes member from group", async () => { + const network = new RelayNetwork(relayWsUrl, relayHttpUrl); + const { admin, members } = await createGroupWithMembers(network, 2); + const [user1, user2] = members; + + // proposeRemoveUser returns ProposalRemove[] — use propose() which handles arrays + await admin.group.propose(Proposals.proposeRemoveUser(user2.pubkey)); + await admin.group.commit(); + + // user1 ingests the removal commit + const groupEvents = await fetchGroupEvents(network, admin.group); + + let user1Processed = false; + for await (const result of user1.group.ingest(groupEvents)) { + if (result.kind === "processed" && result.result.kind === "newState") { + user1Processed = true; + } + } + assert.ok(user1Processed, "user1 should process the removal commit"); + + // user1 can still send a message that admin decrypts + const msg = `post-removal-${randomUUID()}`; + await user1.group.sendChatMessage(msg); + + const updatedEvents = await fetchGroupEvents(network, admin.group); + + const decrypted = []; + for await (const result of admin.group.ingest(updatedEvents)) { + if (result.kind === "processed" && result.result.kind === "applicationMessage") { + decrypted.push(deserializeApplicationData(result.result.message)); + } + } + + assert.ok( + decrypted.some((r) => r.content === msg), + "admin should decrypt message from user1 after user2 removal", + ); +}); + +test("group metadata update via proposal", async () => { + const network = new RelayNetwork(relayWsUrl, relayHttpUrl); + const { admin, members } = await createGroupWithMembers(network, 1); + const invitee = members[0]; + + const updatedName = `Updated-${randomUUID()}`; + await admin.group.commit({ + extraProposals: [Proposals.proposeUpdateMetadata({ name: updatedName })], + }); + + // Admin sees the updated name locally + const adminGroupData = extractMarmotGroupData(admin.group.state); + assert.equal(adminGroupData.name, updatedName); + + // Invitee ingests the commit and sees updated metadata + const groupEvents = await fetchGroupEvents(network, admin.group); + + for await (const _result of invitee.group.ingest(groupEvents)) { + // drain ingest + } + + const inviteeGroupData = extractMarmotGroupData(invitee.group.state); + assert.equal(inviteeGroupData.name, updatedName, "invitee should see updated group name"); +}); + +test("member self-update rotates leaf keys (forward secrecy)", async () => { + const network = new RelayNetwork(relayWsUrl, relayHttpUrl); + const { admin, members } = await createGroupWithMembers(network, 1); + const invitee = members[0]; + + const epochBefore = admin.group.state.groupContext.epoch; + + await invitee.group.selfUpdate(); + + // Admin ingests the self-update commit + const groupEvents = await fetchGroupEvents(network, admin.group); + + for await (const result of admin.group.ingest(groupEvents)) { + // drain + } + + const epochAfter = admin.group.state.groupContext.epoch; + assert.ok(epochAfter > epochBefore, "admin epoch should advance after self-update commit"); + + // Both parties can still exchange messages after key rotation + const msg = `after-selfupdate-${randomUUID()}`; + await invitee.group.sendChatMessage(msg); + + const updatedEvents = await fetchGroupEvents(network, admin.group); + + const decrypted = []; + for await (const result of admin.group.ingest(updatedEvents)) { + if (result.kind === "processed" && result.result.kind === "applicationMessage") { + decrypted.push(deserializeApplicationData(result.result.message)); + } + } + + assert.ok( + decrypted.some((r) => r.content === msg), + "admin should decrypt message sent after self-update", + ); +}); + +test("member leaves group voluntarily", async () => { + const network = new RelayNetwork(relayWsUrl, relayHttpUrl); + const { admin, members } = await createGroupWithMembers(network, 2); + const [stayer, leaver] = members; + + // Leaver publishes self-remove proposals and destroys local state + await leaver.group.leave(); + + // Admin ingests the leave proposals (they become unapplied) + const groupEventsForAdmin = await fetchGroupEvents(network, admin.group); + for await (const _result of admin.group.ingest(groupEventsForAdmin)) { + // drain + } + + // Admin commits all unapplied proposals (the leave removals) + await admin.group.commit(); + + // Stayer ingests everything and the group continues + const allEvents = await fetchGroupEvents(network, admin.group); + for await (const _result of stayer.group.ingest(allEvents)) { + // drain + } + + const msgAfterLeave = `after-leave-${randomUUID()}`; + await stayer.group.sendChatMessage(msgAfterLeave); + + const latestEvents = await fetchGroupEvents(network, admin.group); + + const decrypted = []; + for await (const result of admin.group.ingest(latestEvents)) { + if (result.kind === "processed" && result.result.kind === "applicationMessage") { + decrypted.push(deserializeApplicationData(result.result.message)); + } + } + + assert.ok( + decrypted.some((r) => r.content === msgAfterLeave), + "group should continue functioning after member departure", + ); +}); + +test("NIP-45 COUNT returns accurate event count", async () => { + const account = PrivateKeyAccount.generateNew(); + const pubkey = await account.signer.getPublicKey(); + const tag = `count-test-${randomUUID()}`; + + for (let i = 0; i < 3; i++) { + const event = { + kind: 1, + pubkey, + created_at: unixNow() + i, + tags: [["t", tag]], + content: `count-event-${i}`, + }; + event.id = computeEventId(event); + const signed = await account.signer.signEvent(event); + const frame = await publishEvent(relayWsUrl, signed); + assert.equal(frame[2], true, `publish of event ${i} failed: ${frame[3]}`); + } + + const result = await countEvents(relayWsUrl, { kinds: [1], "#t": [tag] }); + assert.equal(result.count, 3, "COUNT should report 3 events"); +}); + +test("NIP-9 event deletion removes event from relay", async () => { + const account = PrivateKeyAccount.generateNew(); + const pubkey = await account.signer.getPublicKey(); + + const original = { + kind: 1, + pubkey, + created_at: unixNow(), + tags: [], + content: `to-be-deleted-${randomUUID()}`, + }; + original.id = computeEventId(original); + const signedOriginal = await account.signer.signEvent(original); + const publishFrame = await publishEvent(relayWsUrl, signedOriginal); + assert.equal(publishFrame[2], true, `original publish failed: ${publishFrame[3]}`); + + // Verify it exists + const beforeDelete = await requestEvents(relayWsUrl, [{ ids: [signedOriginal.id] }]); + assert.equal(beforeDelete.length, 1, "event should exist before deletion"); + + // Publish kind 5 deletion referencing the original + const deletion = { + kind: 5, + pubkey, + created_at: unixNow(), + tags: [["e", signedOriginal.id]], + content: "", + }; + deletion.id = computeEventId(deletion); + const signedDeletion = await account.signer.signEvent(deletion); + const deleteFrame = await publishEvent(relayWsUrl, signedDeletion); + assert.equal(deleteFrame[2], true, `deletion publish failed: ${deleteFrame[3]}`); + + // Query again -- the original should be gone + const afterDelete = await requestEvents(relayWsUrl, [{ ids: [signedOriginal.id] }]); + assert.equal(afterDelete.length, 0, "event should be deleted after kind 5 request"); +});