diff --git a/scripts/cloud_bench_client.sh b/scripts/cloud_bench_client.sh index 3319e25..31e7d99 100755 --- a/scripts/cloud_bench_client.sh +++ b/scripts/cloud_bench_client.sh @@ -5,7 +5,7 @@ relay_url="${1:-}" mode="${2:-all}" if [[ -z "$relay_url" ]]; then - echo "usage: cloud-bench-client.sh [connect|echo|event|req|all]" >&2 + echo "usage: cloud-bench-client.sh [connect|echo|event|req|seed|all]" >&2 exit 1 fi @@ -57,11 +57,33 @@ run_req() { "${relay_url}" } +run_seed() { + local target_accepted="${PARRHESIA_BENCH_SEED_TARGET_ACCEPTED:-}" + + if [[ -z "$target_accepted" ]]; then + echo "PARRHESIA_BENCH_SEED_TARGET_ACCEPTED must be set for seed mode" >&2 + exit 2 + fi + + echo "==> nostr-bench seed ${relay_url}" + "$bench_bin" seed --json \ + --target-accepted "$target_accepted" \ + -c "${PARRHESIA_BENCH_SEED_CONNECTION_COUNT:-64}" \ + -r "${PARRHESIA_BENCH_SEED_CONNECTION_RATE:-64}" \ + -k "${PARRHESIA_BENCH_SEED_KEEPALIVE_SECONDS:-0}" \ + -t "${bench_threads}" \ + --send-strategy "${PARRHESIA_BENCH_SEED_SEND_STRATEGY:-ack-loop}" \ + --inflight "${PARRHESIA_BENCH_SEED_INFLIGHT:-32}" \ + --ack-timeout "${PARRHESIA_BENCH_SEED_ACK_TIMEOUT:-30}" \ + "${relay_url}" +} + case "$mode" in connect) run_connect ;; echo) run_echo ;; event) run_event ;; req) run_req ;; + seed) run_seed ;; all) run_connect; echo; run_echo; echo; run_event; echo; run_req ;; *) echo "unknown mode: $mode" >&2; exit 1 ;; esac diff --git a/scripts/cloud_bench_orchestrate.mjs b/scripts/cloud_bench_orchestrate.mjs index 8dcc443..f9aa939 100755 --- a/scripts/cloud_bench_orchestrate.mjs +++ b/scripts/cloud_bench_orchestrate.mjs @@ -38,10 +38,14 @@ const BENCH_BUILD_DIR = path.join(ROOT_DIR, "_build", "bench"); const NOSTR_BENCH_SUBMODULE_DIR = path.join(ROOT_DIR, "nix", "nostr-bench"); const NOSTREAM_REDIS_IMAGE = "redis:7.0.5-alpine3.16"; const SEED_TOLERANCE_RATIO = 0.01; -const SEED_MAX_ROUNDS = 8; -const SEED_ROUND_DEFICIT_RATIO = 0.2; -const SEED_KEEPALIVE_SECONDS = 10; -const SEED_EVENTS_PER_CONNECTION_FALLBACK = 3; +const SEED_MAX_ROUNDS = 4; +const SEED_KEEPALIVE_SECONDS = 0; +const SEED_EVENTS_PER_CONNECTION_TARGET = 2000; +const SEED_CONNECTIONS_MIN = 1; +const SEED_CONNECTIONS_MAX = 512; +const SEED_SEND_STRATEGY = "ack-loop"; +const SEED_INFLIGHT = 32; +const SEED_ACK_TIMEOUT_SECONDS = 30; const PHASE_PREP_OFFSET_MINUTES = 3; const DEFAULTS = { @@ -66,8 +70,8 @@ const DEFAULTS = { quick: false, monitoring: true, yes: false, - warmEvents: 25000, - hotEvents: 250000, + warmEvents: 50000, + hotEvents: 500000, bench: { connectCount: 3000, connectRate: 1500, @@ -950,6 +954,28 @@ function splitCountAcrossClients(total, clients) { return Array.from({ length: clients }, (_, i) => base + (i < remainder ? 1 : 0)); } +function extractSeedAccepted(parsed) { + const finalAccepted = Number(parsed?.seed_final?.accepted); + if (Number.isFinite(finalAccepted) && finalAccepted >= 0) { + return Math.floor(finalAccepted); + } + + const sectionAccepted = Number(parsed?.seed?.message_stats?.accepted); + if (Number.isFinite(sectionAccepted) && sectionAccepted >= 0) { + return Math.floor(sectionAccepted); + } + + return 0; +} + +function extractSeedTargetReached(parsed, desiredAccepted) { + if (typeof parsed?.seed_final?.target_reached === "boolean") { + return parsed.seed_final.target_reached; + } + + return extractSeedAccepted(parsed) >= desiredAccepted; +} + async function runClientSeedingRound({ target, phase, @@ -960,8 +986,6 @@ async function runClientSeedingRound({ relayUrl, artifactDir, threads, - seedEventsPerConnection, - seedKeepaliveSeconds, }) { const benchThreads = Number.isInteger(threads) && threads >= 0 ? threads : 0; const clientsForRound = clientInfos.slice(0, Math.min(clientInfos.length, deficit)); @@ -971,38 +995,39 @@ async function runClientSeedingRound({ const seedResults = await Promise.all( clientsForRound.map(async (client, idx) => { - const desiredEvents = shares[idx] || 0; + const desiredAccepted = shares[idx] || 0; const stdoutPath = path.join(roundDir, `${client.name}.stdout.log`); const stderrPath = path.join(roundDir, `${client.name}.stderr.log`); - if (desiredEvents <= 0) { + if (desiredAccepted <= 0) { fs.writeFileSync(stdoutPath, "", "utf8"); fs.writeFileSync(stderrPath, "", "utf8"); return { client_name: client.name, client_ip: client.ip, status: "skipped", - desired_events: desiredEvents, - projected_events: 0, - acked: 0, + desired_accepted: desiredAccepted, + accepted: 0, + target_reached: true, stdout_path: path.relative(ROOT_DIR, stdoutPath), stderr_path: path.relative(ROOT_DIR, stderrPath), }; } - const eventsPerConnection = Math.max( - 0.1, - Number(seedEventsPerConnection) || SEED_EVENTS_PER_CONNECTION_FALLBACK, + const seedConnectionCount = Math.min( + SEED_CONNECTIONS_MAX, + Math.max(SEED_CONNECTIONS_MIN, Math.ceil(desiredAccepted / SEED_EVENTS_PER_CONNECTION_TARGET)), ); - const eventConnections = Math.max(1, Math.ceil(desiredEvents / eventsPerConnection)); - const eventKeepalive = Math.max(2, Number(seedKeepaliveSeconds) || SEED_KEEPALIVE_SECONDS); - const eventRate = Math.max(1, Math.ceil(eventConnections / eventKeepalive)); - const projectedEvents = Math.round(eventConnections * eventsPerConnection); + const seedConnectionRate = Math.max(1, seedConnectionCount); const seedEnvPrefix = [ - `PARRHESIA_BENCH_EVENT_COUNT=${eventConnections}`, - `PARRHESIA_BENCH_EVENT_RATE=${eventRate}`, - `PARRHESIA_BENCH_KEEPALIVE_SECONDS=${eventKeepalive}`, + `PARRHESIA_BENCH_SEED_TARGET_ACCEPTED=${desiredAccepted}`, + `PARRHESIA_BENCH_SEED_CONNECTION_COUNT=${seedConnectionCount}`, + `PARRHESIA_BENCH_SEED_CONNECTION_RATE=${seedConnectionRate}`, + `PARRHESIA_BENCH_SEED_KEEPALIVE_SECONDS=${SEED_KEEPALIVE_SECONDS}`, + `PARRHESIA_BENCH_SEED_SEND_STRATEGY=${SEED_SEND_STRATEGY}`, + `PARRHESIA_BENCH_SEED_INFLIGHT=${SEED_INFLIGHT}`, + `PARRHESIA_BENCH_SEED_ACK_TIMEOUT=${SEED_ACK_TIMEOUT_SECONDS}`, `PARRHESIA_BENCH_THREADS=${benchThreads}`, ].join(" "); @@ -1010,30 +1035,29 @@ async function runClientSeedingRound({ const benchRes = await sshExec( client.ip, keyPath, - `${seedEnvPrefix} /root/cloud-bench-client.sh ${shellEscape(relayUrl)} event`, + `${seedEnvPrefix} /root/cloud-bench-client.sh ${shellEscape(relayUrl)} seed`, ); fs.writeFileSync(stdoutPath, benchRes.stdout, "utf8"); fs.writeFileSync(stderrPath, benchRes.stderr, "utf8"); const parsed = parseNostrBenchSections(benchRes.stdout); - const complete = Number(parsed?.event?.message_stats?.complete) || 0; - const error = Number(parsed?.event?.message_stats?.error) || 0; - const acked = Math.max(0, complete - error); + const accepted = extractSeedAccepted(parsed); + const targetReached = extractSeedTargetReached(parsed, desiredAccepted); return { client_name: client.name, client_ip: client.ip, - status: "ok", - desired_events: desiredEvents, - projected_events: projectedEvents, - event_connections: eventConnections, - event_rate: eventRate, - event_keepalive_seconds: eventKeepalive, - events_per_connection_estimate: eventsPerConnection, - event_complete: complete, - event_error: error, - acked, + status: targetReached ? "ok" : "partial", + desired_accepted: desiredAccepted, + accepted, + target_reached: targetReached, + seed_connection_count: seedConnectionCount, + seed_connection_rate: seedConnectionRate, + seed_keepalive_seconds: SEED_KEEPALIVE_SECONDS, + seed_send_strategy: SEED_SEND_STRATEGY, + seed_inflight: SEED_INFLIGHT, + seed_ack_timeout_seconds: SEED_ACK_TIMEOUT_SECONDS, stdout_path: path.relative(ROOT_DIR, stdoutPath), stderr_path: path.relative(ROOT_DIR, stderrPath), }; @@ -1043,17 +1067,25 @@ async function runClientSeedingRound({ fs.writeFileSync(stdoutPath, out, "utf8"); fs.writeFileSync(stderrPath, err, "utf8"); + const parsed = parseNostrBenchSections(out); + const accepted = extractSeedAccepted(parsed); + const targetReached = extractSeedTargetReached(parsed, desiredAccepted); + const targetNotReached = Number(error.code) === 2; + return { client_name: client.name, client_ip: client.ip, - status: "error", - desired_events: desiredEvents, - projected_events: projectedEvents, - event_connections: eventConnections, - event_rate: eventRate, - event_keepalive_seconds: eventKeepalive, - events_per_connection_estimate: eventsPerConnection, - acked: 0, + status: targetNotReached ? "partial" : "error", + desired_accepted: desiredAccepted, + accepted, + target_reached: targetReached, + seed_connection_count: seedConnectionCount, + seed_connection_rate: seedConnectionRate, + seed_keepalive_seconds: SEED_KEEPALIVE_SECONDS, + seed_send_strategy: SEED_SEND_STRATEGY, + seed_inflight: SEED_INFLIGHT, + seed_ack_timeout_seconds: SEED_ACK_TIMEOUT_SECONDS, + exit_code: Number(error.code) || null, stdout_path: path.relative(ROOT_DIR, stdoutPath), stderr_path: path.relative(ROOT_DIR, stderrPath), error: error.message || String(error), @@ -1069,14 +1101,12 @@ async function runClientSeedingRound({ ); } - const acked = seedResults.reduce((sum, r) => sum + (Number(r.acked) || 0), 0); - const desired = seedResults.reduce((sum, r) => sum + (Number(r.desired_events) || 0), 0); - const projected = seedResults.reduce((sum, r) => sum + (Number(r.projected_events) || 0), 0); + const accepted = seedResults.reduce((sum, r) => sum + (Number(r.accepted) || 0), 0); + const desired = seedResults.reduce((sum, r) => sum + (Number(r.desired_accepted) || 0), 0); return { desired, - projected, - acked, + accepted, clients: seedResults, }; } @@ -1111,7 +1141,7 @@ async function fetchServerEventCount({ target, serverIp, keyPath, serverEnvPrefi } // Ensure the relay has approximately `targetCount` events. -// Uses client-side nostr-bench event seeding in parallel and accepts <=1% drift. +// Uses client-side nostr-bench seed mode and accepts <=1% drift. async function smartFill({ target, phase, @@ -1124,8 +1154,6 @@ async function smartFill({ serverEnvPrefix, artifactDir, threads, - seedEventsPerConnection, - seedKeepaliveSeconds, skipFill, fetchEventCount, }) { @@ -1162,14 +1190,8 @@ async function smartFill({ return { eventsInDb, seeded: 0, wiped, skipped: false }; } - const perConnectionEstimate = Math.max( - 0.1, - Number(seedEventsPerConnection) || SEED_EVENTS_PER_CONNECTION_FALLBACK, - ); - const keepaliveSeconds = Math.max(2, Number(seedKeepaliveSeconds) || SEED_KEEPALIVE_SECONDS); - console.log( - `[fill] ${target}:${phase}: seeding to ~${targetCount} events from ${eventsInDb} (deficit=${deficit}, tolerance=${tolerance}, events_per_connection≈${perConnectionEstimate.toFixed(2)}, keepalive=${keepaliveSeconds}s)`, + `[fill] ${target}:${phase}: seeding to ~${targetCount} events from ${eventsInDb} (deficit=${deficit}, tolerance=${tolerance})`, ); let seededTotal = 0; @@ -1179,7 +1201,7 @@ async function smartFill({ if (deficit <= tolerance) break; roundsExecuted = round; - const roundDeficit = Math.max(1, Math.ceil(deficit * SEED_ROUND_DEFICIT_RATIO)); + const roundDeficit = Math.max(1, deficit); const roundStartMs = Date.now(); const eventsBeforeRound = eventsInDb; @@ -1193,11 +1215,9 @@ async function smartFill({ relayUrl, artifactDir, threads, - seedEventsPerConnection: perConnectionEstimate, - seedKeepaliveSeconds: keepaliveSeconds, }); - let observedAdded = roundResult.acked; + let observedAdded = roundResult.accepted; if (typeof fetchEventCount === "function") { const authoritative = await fetchEventCount(); @@ -1205,10 +1225,10 @@ async function smartFill({ eventsInDb = authoritative; observedAdded = Math.max(0, eventsInDb - eventsBeforeRound); } else { - eventsInDb += roundResult.acked; + eventsInDb += roundResult.accepted; } } else { - eventsInDb += roundResult.acked; + eventsInDb += roundResult.accepted; } const elapsedSec = (Date.now() - roundStartMs) / 1000; @@ -1218,7 +1238,7 @@ async function smartFill({ deficit = targetCount - eventsInDb; console.log( - `[fill] ${target}:${phase} round ${round}: observed ${observedAdded} (acked=${roundResult.acked}, desired=${roundResult.desired}, projected=${roundResult.projected}) in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s), now ~${eventsInDb}/${targetCount}`, + `[fill] ${target}:${phase} round ${round}: observed ${observedAdded} (accepted=${roundResult.accepted}, desired=${roundResult.desired}) in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s), now ~${eventsInDb}/${targetCount}`, ); if (observedAdded <= 0) { @@ -2003,20 +2023,13 @@ async function main() { const estimatedEmptyEventWritten = countEventsWritten(emptyEventResults); eventsInDb += estimatedEmptyEventWritten; - let observedEmptyEventWritten = estimatedEmptyEventWritten; const authoritativeAfterEmpty = await fetchEventCountForTarget(); if (Number.isInteger(authoritativeAfterEmpty) && authoritativeAfterEmpty >= 0) { - observedEmptyEventWritten = Math.max(0, authoritativeAfterEmpty); eventsInDb = authoritativeAfterEmpty; } console.log(`[bench] ${target}: ~${eventsInDb} events in DB after empty phase`); - const warmSeedEventsPerConnection = Math.max( - 0.1, - observedEmptyEventWritten / Math.max(1, opts.bench.eventCount * clientInfos.length), - ); - // Fill to warm const fillWarm = await smartFill({ target, @@ -2030,8 +2043,6 @@ async function main() { serverEnvPrefix, artifactDir: path.join(runTargetDir, "fill-warm"), threads: opts.bench.threads, - seedEventsPerConnection: warmSeedEventsPerConnection, - seedKeepaliveSeconds: opts.bench.keepaliveSeconds, skipFill: target === "haven", fetchEventCount: fetchEventCountForTarget, }); @@ -2052,21 +2063,13 @@ async function main() { artifactDir: path.join(runTargetDir, "warm-event"), }); const estimatedWarmEventWritten = countEventsWritten(warmEventResults); - const warmEventsBefore = eventsInDb; eventsInDb += estimatedWarmEventWritten; - let observedWarmEventWritten = estimatedWarmEventWritten; const authoritativeAfterWarmEvent = await fetchEventCountForTarget(); if (Number.isInteger(authoritativeAfterWarmEvent) && authoritativeAfterWarmEvent >= 0) { - observedWarmEventWritten = Math.max(0, authoritativeAfterWarmEvent - warmEventsBefore); eventsInDb = authoritativeAfterWarmEvent; } - const hotSeedEventsPerConnection = Math.max( - 0.1, - observedWarmEventWritten / Math.max(1, opts.bench.eventCount * clientInfos.length), - ); - // Fill to hot const fillHot = await smartFill({ target, @@ -2080,8 +2083,6 @@ async function main() { serverEnvPrefix, artifactDir: path.join(runTargetDir, "fill-hot"), threads: opts.bench.threads, - seedEventsPerConnection: hotSeedEventsPerConnection, - seedKeepaliveSeconds: opts.bench.keepaliveSeconds, skipFill: target === "haven", fetchEventCount: fetchEventCountForTarget, }); diff --git a/scripts/cloud_bench_results.mjs b/scripts/cloud_bench_results.mjs index 08fd352..155b28c 100644 --- a/scripts/cloud_bench_results.mjs +++ b/scripts/cloud_bench_results.mjs @@ -10,7 +10,7 @@ export function parseNostrBenchSections(output) { for (const lineRaw of lines) { const line = lineRaw.trim(); - const header = line.match(/^==>\s+nostr-bench\s+(connect|echo|event|req)\s+/); + const header = line.match(/^==>\s+nostr-bench\s+(connect|echo|event|req|seed)\s+/); if (header) { section = header[1]; continue; @@ -20,9 +20,24 @@ export function parseNostrBenchSections(output) { try { const json = JSON.parse(line); - if (section) { - parsed[section] = json; + if (!section) continue; + + if (section === "seed" && json?.type === "seed_final") { + parsed.seed_final = json; + continue; } + + const existing = parsed[section]; + if (!existing) { + parsed[section] = json; + continue; + } + + if (existing?.type === "final" && json?.type !== "final") { + continue; + } + + parsed[section] = json; } catch { // ignore noisy non-json lines } @@ -43,14 +58,21 @@ export function sum(values) { return valid.reduce((a, b) => a + b, 0); } -export function throughputFromSection(section) { +export function throughputFromSection(section, options = {}) { + const { preferAccepted = false } = options; + const elapsedMs = Number(section?.elapsed ?? NaN); + const accepted = Number(section?.message_stats?.accepted ?? NaN); const complete = Number(section?.message_stats?.complete ?? NaN); + const effectiveCount = + preferAccepted && Number.isFinite(accepted) + ? accepted + : complete; const totalBytes = Number(section?.message_stats?.size ?? NaN); const cumulativeTps = - Number.isFinite(elapsedMs) && elapsedMs > 0 && Number.isFinite(complete) - ? complete / (elapsedMs / 1000) + Number.isFinite(elapsedMs) && elapsedMs > 0 && Number.isFinite(effectiveCount) + ? effectiveCount / (elapsedMs / 1000) : NaN; const cumulativeMibs = @@ -58,7 +80,11 @@ export function throughputFromSection(section) { ? totalBytes / (1024 * 1024) / (elapsedMs / 1000) : NaN; - const sampleTps = Number(section?.tps ?? NaN); + const sampleTps = Number( + preferAccepted + ? section?.accepted_tps ?? section?.tps + : section?.tps, + ); const sampleMibs = Number(section?.size ?? NaN); return { @@ -67,10 +93,16 @@ export function throughputFromSection(section) { }; } +function messageCounter(section, field) { + const value = Number(section?.message_stats?.[field]); + return Number.isFinite(value) ? value : 0; +} + export function metricFromSections(sections) { const connect = sections?.connect?.connect_stats?.success_time || {}; const echo = throughputFromSection(sections?.echo || {}); - const event = throughputFromSection(sections?.event || {}); + const eventSection = sections?.event || {}; + const event = throughputFromSection(eventSection, { preferAccepted: true }); const req = throughputFromSection(sections?.req || {}); return { @@ -80,6 +112,10 @@ export function metricFromSections(sections) { echo_mibs: echo.mibs, event_tps: event.tps, event_mibs: event.mibs, + event_notice: messageCounter(eventSection, "notice"), + event_auth_challenge: messageCounter(eventSection, "auth_challenge"), + event_reply_unrecognized: messageCounter(eventSection, "reply_unrecognized"), + event_ack_timeout: messageCounter(eventSection, "ack_timeout"), req_tps: req.tps, req_mibs: req.mibs, }; @@ -109,6 +145,10 @@ export function summariseFlatResults(results) { echo_mibs: sum(clientSamples.map((s) => s.echo_mibs)), event_tps: sum(clientSamples.map((s) => s.event_tps)), event_mibs: sum(clientSamples.map((s) => s.event_mibs)), + event_notice: sum(clientSamples.map((s) => s.event_notice)), + event_auth_challenge: sum(clientSamples.map((s) => s.event_auth_challenge)), + event_reply_unrecognized: sum(clientSamples.map((s) => s.event_reply_unrecognized)), + event_ack_timeout: sum(clientSamples.map((s) => s.event_ack_timeout)), req_tps: sum(clientSamples.map((s) => s.req_tps)), req_mibs: sum(clientSamples.map((s) => s.req_mibs)), }); @@ -121,6 +161,10 @@ export function summariseFlatResults(results) { "echo_mibs", "event_tps", "event_mibs", + "event_notice", + "event_auth_challenge", + "event_reply_unrecognized", + "event_ack_timeout", "req_tps", "req_mibs", ]; @@ -184,6 +228,16 @@ export function summarisePhasedResults(results) { if (eventClients.length > 0) { sample[`event_${level}_tps`] = sum(eventClients.map((s) => s.event_tps)); sample[`event_${level}_mibs`] = sum(eventClients.map((s) => s.event_mibs)); + sample[`event_${level}_notice`] = sum(eventClients.map((s) => s.event_notice)); + sample[`event_${level}_auth_challenge`] = sum( + eventClients.map((s) => s.event_auth_challenge), + ); + sample[`event_${level}_reply_unrecognized`] = sum( + eventClients.map((s) => s.event_reply_unrecognized), + ); + sample[`event_${level}_ack_timeout`] = sum( + eventClients.map((s) => s.event_ack_timeout), + ); } } @@ -217,10 +271,15 @@ export function countEventsWritten(clientResults) { const eventSection = cr.sections?.event; if (!eventSection?.message_stats) continue; + const accepted = Number(eventSection.message_stats.accepted); + if (Number.isFinite(accepted)) { + total += Math.max(0, accepted); + continue; + } + const complete = Number(eventSection.message_stats.complete) || 0; const error = Number(eventSection.message_stats.error) || 0; - const accepted = Math.max(0, complete - error); - total += accepted; + total += Math.max(0, complete - error); } return total; }