From 070464f2ebf7a74c2ff076e1847c1b77e6b0320e Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Fri, 20 Mar 2026 14:19:58 +0100 Subject: [PATCH] bench: Cloud seeding --- scripts/cloud_bench_orchestrate.mjs | 235 ++++++++++++++++++++++++---- scripts/cloud_bench_results.mjs | 9 +- scripts/cloud_bench_server.sh | 10 +- 3 files changed, 216 insertions(+), 38 deletions(-) diff --git a/scripts/cloud_bench_orchestrate.mjs b/scripts/cloud_bench_orchestrate.mjs index edf2490..dd22048 100755 --- a/scripts/cloud_bench_orchestrate.mjs +++ b/scripts/cloud_bench_orchestrate.mjs @@ -23,15 +23,25 @@ const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); const ROOT_DIR = path.resolve(__dirname, ".."); -const DEFAULT_TARGETS = ["parrhesia-pg", "parrhesia-memory", "strfry", "nostr-rs-relay", "nostream", "haven"]; +const DEFAULT_TARGETS = [ + "parrhesia-pg", + "parrhesia-memory", + "strfry", + "nostr-rs-relay", + "nostream", + // "haven", // disabled by default: Haven rejects generic nostr-bench event seeding (auth/whitelist/WoT policies) +]; const ESTIMATE_WINDOW_MINUTES = 30; const ESTIMATE_WINDOW_HOURS = ESTIMATE_WINDOW_MINUTES / 60; const ESTIMATE_WINDOW_LABEL = `${ESTIMATE_WINDOW_MINUTES}m`; const BENCH_BUILD_DIR = path.join(ROOT_DIR, "_build", "bench"); const NOSTREAM_REDIS_IMAGE = "redis:7.0.5-alpine3.16"; const SEED_TOLERANCE_RATIO = 0.01; -const SEED_MAX_ROUNDS = 4; -const SEED_EVENT_RATE = 5000; +const SEED_MAX_ROUNDS = 8; +const SEED_ROUND_DEFICIT_RATIO = 0.2; +const SEED_KEEPALIVE_SECONDS = 10; +const SEED_EVENTS_PER_CONNECTION_FALLBACK = 3; +const PHASE_PREP_OFFSET_MINUTES = 3; const DEFAULTS = { datacenter: "fsn1-dc14", @@ -454,6 +464,34 @@ function formatEuro(value) { return `€${value.toFixed(4)}`; } +function createPhaseLogger(prepOffsetMinutes = PHASE_PREP_OFFSET_MINUTES) { + let phaseZeroMs = null; + + const prefix = () => { + if (phaseZeroMs === null) { + return "T+0m"; + } + + const elapsedMinutes = Math.floor((Date.now() - phaseZeroMs) / 60000); + const sign = elapsedMinutes >= 0 ? "+" : ""; + return `T${sign}${elapsedMinutes}m`; + }; + + return { + setPrepOffsetNow() { + phaseZeroMs = Date.now() + prepOffsetMinutes * 60_000; + }, + + setZeroNow() { + phaseZeroMs = Date.now(); + }, + + logPhase(message) { + console.log(`${prefix()} ${message}`); + }, + }; +} + function compatibleDatacenterChoices(datacenters, serverType, clientType, clientCount) { const compatible = []; @@ -883,6 +921,8 @@ async function runClientSeedingRound({ relayUrl, artifactDir, threads, + seedEventsPerConnection, + seedKeepaliveSeconds, }) { const benchThreads = Number.isInteger(threads) && threads >= 0 ? threads : 0; const clientsForRound = clientInfos.slice(0, Math.min(clientInfos.length, deficit)); @@ -911,10 +951,14 @@ async function runClientSeedingRound({ }; } - const eventConnections = 1; - const eventKeepalive = Math.max(5, Math.ceil(desiredEvents / SEED_EVENT_RATE)); - const eventRate = Math.max(1, Math.ceil(desiredEvents / eventKeepalive)); - const projectedEvents = eventConnections * eventRate * eventKeepalive; + const eventsPerConnection = Math.max( + 0.1, + Number(seedEventsPerConnection) || SEED_EVENTS_PER_CONNECTION_FALLBACK, + ); + const eventConnections = Math.max(1, Math.ceil(desiredEvents / eventsPerConnection)); + const eventKeepalive = Math.max(2, Number(seedKeepaliveSeconds) || SEED_KEEPALIVE_SECONDS); + const eventRate = Math.max(1, Math.ceil(eventConnections / eventKeepalive)); + const projectedEvents = Math.round(eventConnections * eventsPerConnection); const seedEnvPrefix = [ `PARRHESIA_BENCH_EVENT_COUNT=${eventConnections}`, @@ -934,7 +978,9 @@ async function runClientSeedingRound({ fs.writeFileSync(stderrPath, benchRes.stderr, "utf8"); const parsed = parseNostrBenchSections(benchRes.stdout); - const acked = Number(parsed?.event?.message_stats?.complete) || 0; + const complete = Number(parsed?.event?.message_stats?.complete) || 0; + const error = Number(parsed?.event?.message_stats?.error) || 0; + const acked = Math.max(0, complete - error); return { client_name: client.name, @@ -945,6 +991,9 @@ async function runClientSeedingRound({ event_connections: eventConnections, event_rate: eventRate, event_keepalive_seconds: eventKeepalive, + events_per_connection_estimate: eventsPerConnection, + event_complete: complete, + event_error: error, acked, stdout_path: path.relative(ROOT_DIR, stdoutPath), stderr_path: path.relative(ROOT_DIR, stderrPath), @@ -964,6 +1013,7 @@ async function runClientSeedingRound({ event_connections: eventConnections, event_rate: eventRate, event_keepalive_seconds: eventKeepalive, + events_per_connection_estimate: eventsPerConnection, acked: 0, stdout_path: path.relative(ROOT_DIR, stdoutPath), stderr_path: path.relative(ROOT_DIR, stderrPath), @@ -992,6 +1042,35 @@ async function runClientSeedingRound({ }; } +async function fetchServerEventCount({ target, serverIp, keyPath, serverEnvPrefix }) { + const countableTargets = new Set(["parrhesia-pg", "nostream"]); + if (!countableTargets.has(target)) return null; + + const countCmd = `count-data-${target}`; + + try { + const res = await sshExec( + serverIp, + keyPath, + `${serverEnvPrefix} /root/cloud-bench-server.sh ${shellEscape(countCmd)}`, + ); + + const lines = res.stdout + .split(/\r?\n/) + .map((line) => line.trim()) + .filter(Boolean); + + const numericLine = [...lines].reverse().find((line) => /^\d+$/.test(line)); + if (!numericLine) return null; + + const count = Number(numericLine); + return Number.isInteger(count) && count >= 0 ? count : null; + } catch (error) { + console.warn(`[fill] ${target}: failed to fetch server event count (${error.message || error})`); + return null; + } +} + // Ensure the relay has approximately `targetCount` events. // Uses client-side nostr-bench event seeding in parallel and accepts <=1% drift. async function smartFill({ @@ -1006,8 +1085,17 @@ async function smartFill({ serverEnvPrefix, artifactDir, threads, + seedEventsPerConnection, + seedKeepaliveSeconds, + skipFill, + fetchEventCount, }) { - if (targetCount <= 0) return { eventsInDb, seeded: 0, wiped: false }; + if (targetCount <= 0) return { eventsInDb, seeded: 0, wiped: false, skipped: false }; + + if (skipFill) { + console.log(`[fill] ${target}:${phase}: skipped (target does not support generic event seeding)`); + return { eventsInDb, seeded: 0, wiped: false, skipped: true }; + } let wiped = false; if (eventsInDb > targetCount) { @@ -1018,6 +1106,13 @@ async function smartFill({ wiped = true; } + if (typeof fetchEventCount === "function") { + const authoritative = await fetchEventCount(); + if (Number.isInteger(authoritative) && authoritative >= 0) { + eventsInDb = authoritative; + } + } + const tolerance = Math.max(1, Math.floor(targetCount * SEED_TOLERANCE_RATIO)); let deficit = targetCount - eventsInDb; @@ -1025,43 +1120,69 @@ async function smartFill({ console.log( `[fill] ${target}: already within tolerance (${eventsInDb}/${targetCount}, tolerance=${tolerance}), skipping`, ); - return { eventsInDb, seeded: 0, wiped }; + return { eventsInDb, seeded: 0, wiped, skipped: false }; } + const perConnectionEstimate = Math.max( + 0.1, + Number(seedEventsPerConnection) || SEED_EVENTS_PER_CONNECTION_FALLBACK, + ); + const keepaliveSeconds = Math.max(2, Number(seedKeepaliveSeconds) || SEED_KEEPALIVE_SECONDS); + console.log( - `[fill] ${target}:${phase}: seeding to ~${targetCount} events from ${eventsInDb} (deficit=${deficit}, tolerance=${tolerance})`, + `[fill] ${target}:${phase}: seeding to ~${targetCount} events from ${eventsInDb} (deficit=${deficit}, tolerance=${tolerance}, events_per_connection≈${perConnectionEstimate.toFixed(2)}, keepalive=${keepaliveSeconds}s)`, ); let seededTotal = 0; + let roundsExecuted = 0; for (let round = 1; round <= SEED_MAX_ROUNDS; round += 1) { if (deficit <= tolerance) break; + roundsExecuted = round; + const roundDeficit = Math.max(1, Math.ceil(deficit * SEED_ROUND_DEFICIT_RATIO)); const roundStartMs = Date.now(); + const eventsBeforeRound = eventsInDb; + const roundResult = await runClientSeedingRound({ target, phase, round, - deficit, + deficit: roundDeficit, clientInfos, keyPath, relayUrl, artifactDir, threads, + seedEventsPerConnection: perConnectionEstimate, + seedKeepaliveSeconds: keepaliveSeconds, }); - const elapsedSec = (Date.now() - roundStartMs) / 1000; - const eventsPerSec = elapsedSec > 0 ? Math.round(roundResult.acked / elapsedSec) : 0; + let observedAdded = roundResult.acked; - eventsInDb += roundResult.acked; - seededTotal += roundResult.acked; + if (typeof fetchEventCount === "function") { + const authoritative = await fetchEventCount(); + if (Number.isInteger(authoritative) && authoritative >= 0) { + eventsInDb = authoritative; + observedAdded = Math.max(0, eventsInDb - eventsBeforeRound); + } else { + eventsInDb += roundResult.acked; + } + } else { + eventsInDb += roundResult.acked; + } + + const elapsedSec = (Date.now() - roundStartMs) / 1000; + const eventsPerSec = elapsedSec > 0 ? Math.round(observedAdded / elapsedSec) : 0; + + seededTotal += observedAdded; deficit = targetCount - eventsInDb; console.log( - `[fill] ${target}:${phase} round ${round}: acked ${roundResult.acked} (desired=${roundResult.desired}, projected=${roundResult.projected}) in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s), now ~${eventsInDb}/${targetCount}`, + `[fill] ${target}:${phase} round ${round}: observed ${observedAdded} (acked=${roundResult.acked}, desired=${roundResult.desired}, projected=${roundResult.projected}) in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s), now ~${eventsInDb}/${targetCount}`, ); - if (roundResult.acked <= 0) { + if (observedAdded <= 0) { console.warn(`[fill] ${target}:${phase} round ${round}: no progress, stopping early`); break; } @@ -1070,11 +1191,11 @@ async function smartFill({ const remaining = Math.max(0, targetCount - eventsInDb); if (remaining > tolerance) { console.warn( - `[fill] ${target}:${phase}: remaining deficit ${remaining} exceeds tolerance ${tolerance} after ${SEED_MAX_ROUNDS} rounds`, + `[fill] ${target}:${phase}: remaining deficit ${remaining} exceeds tolerance ${tolerance} after ${roundsExecuted} rounds`, ); } - return { eventsInDb, seeded: seededTotal, wiped }; + return { eventsInDb, seeded: seededTotal, wiped, skipped: false }; } // Run a single benchmark type across all clients in parallel. @@ -1361,8 +1482,11 @@ async function main() { const opts = parseArgs(process.argv.slice(2)); await ensureLocalPrereqs(opts); + const phaseLogger = createPhaseLogger(); + const datacenterChoice = await chooseDatacenter(opts); opts.datacenter = datacenterChoice.name; + phaseLogger.setPrepOffsetNow(); console.log( `[plan] selected datacenter=${opts.datacenter} (${ESTIMATE_WINDOW_LABEL} est gross=${formatEuro(datacenterChoice.estimatedTotal.gross)} net=${formatEuro(datacenterChoice.estimatedTotal.net)})`, ); @@ -1394,7 +1518,7 @@ async function main() { fs.mkdirSync(path.dirname(historyFile), { recursive: true }); console.log(`[run] ${runId}`); - console.log("[phase] local preparation"); + phaseLogger.logPhase("[phase] local preparation"); const nostrBench = await buildNostrBenchBinary(tmpDir); const needsParrhesia = opts.targets.includes("parrhesia-pg") || opts.targets.includes("parrhesia-memory"); @@ -1475,7 +1599,7 @@ async function main() { }); try { - console.log("[phase] create ssh credentials"); + phaseLogger.logPhase("[phase] create ssh credentials"); await runCommand("ssh-keygen", ["-t", "ed25519", "-N", "", "-f", keyPath, "-C", keyName], { stdio: "inherit", }); @@ -1485,7 +1609,7 @@ async function main() { }); sshKeyCreated = true; - console.log("[phase] create cloud servers in parallel"); + phaseLogger.logPhase("[phase] create cloud servers in parallel"); const serverName = `${runId}-server`; const clientNames = Array.from({ length: opts.clients }, (_, i) => `${runId}-client-${i + 1}`); @@ -1555,7 +1679,9 @@ async function main() { ip: c.server.public_net.ipv4.ip, })); - console.log("[phase] wait for SSH"); + // Reset phase clock to T+0 when cloud servers are successfully created. + phaseLogger.setZeroNow(); + phaseLogger.logPhase("[phase] wait for SSH"); await Promise.all([ waitForSsh(serverIp, keyPath), ...clientInfos.map((client) => waitForSsh(client.ip, keyPath)), @@ -1597,7 +1723,7 @@ async function main() { } console.log(`[firewall] ${firewallName} applied (sources: ${sourceIps.join(", ")})`); - console.log("[phase] install runtime dependencies on server node"); + phaseLogger.logPhase("[phase] install runtime dependencies on server node"); const serverInstallCmd = [ "set -euo pipefail", "export DEBIAN_FRONTEND=noninteractive", @@ -1614,19 +1740,17 @@ async function main() { await sshExec(serverIp, keyPath, serverInstallCmd, { stdio: "inherit" }); - console.log("[phase] minimal client setup (no apt install)"); + phaseLogger.logPhase("[phase] minimal client setup (no apt install)"); const clientBootstrapCmd = [ "set -euo pipefail", "mkdir -p /usr/local/bin", "sysctl -w net.ipv4.ip_local_port_range='10000 65535' >/dev/null || true", "sysctl -w net.core.somaxconn=65535 >/dev/null || true", - "bash --version", - "uname -m", ].join("; "); await Promise.all(clientInfos.map((client) => sshExec(client.ip, keyPath, clientBootstrapCmd, { stdio: "inherit" }))); - console.log("[phase] upload control scripts + nostr-bench binary"); + phaseLogger.logPhase("[phase] upload control scripts + nostr-bench binary"); await scpToHost(serverIp, keyPath, localServerScriptPath, "/root/cloud-bench-server.sh"); await sshExec(serverIp, keyPath, "chmod +x /root/cloud-bench-server.sh"); @@ -1637,7 +1761,7 @@ async function main() { await sshExec(client.ip, keyPath, "chmod +x /root/cloud-bench-client.sh /usr/local/bin/nostr-bench"); } - console.log("[phase] server image setup"); + phaseLogger.logPhase("[phase] server image setup"); let parrhesiaImageOnServer = parrhesiaSource.image; @@ -1726,7 +1850,7 @@ async function main() { const results = []; const targetOrderPerRun = []; - console.log(`[phase] benchmark execution (mode=${opts.quick ? "quick" : "phased"})`); + phaseLogger.logPhase(`[phase] benchmark execution (mode=${opts.quick ? "quick" : "phased"})`); for (let runIndex = 1; runIndex <= opts.runs; runIndex += 1) { const runTargets = shuffled(opts.targets); @@ -1782,6 +1906,13 @@ async function main() { ].join(" "); const benchArgs = { clientInfos, keyPath, benchEnvPrefix, relayUrl }; + const fetchEventCountForTarget = async () => + fetchServerEventCount({ + target, + serverIp, + keyPath, + serverEnvPrefix, + }); if (opts.quick) { // Flat mode: run all benchmarks in one shot (backward compat) @@ -1830,9 +1961,23 @@ async function main() { mode: "event", artifactDir: path.join(runTargetDir, "empty-event"), }); - eventsInDb += countEventsWritten(emptyEventResults); + const estimatedEmptyEventWritten = countEventsWritten(emptyEventResults); + eventsInDb += estimatedEmptyEventWritten; + + let observedEmptyEventWritten = estimatedEmptyEventWritten; + const authoritativeAfterEmpty = await fetchEventCountForTarget(); + if (Number.isInteger(authoritativeAfterEmpty) && authoritativeAfterEmpty >= 0) { + observedEmptyEventWritten = Math.max(0, authoritativeAfterEmpty); + eventsInDb = authoritativeAfterEmpty; + } + console.log(`[bench] ${target}: ~${eventsInDb} events in DB after empty phase`); + const warmSeedEventsPerConnection = Math.max( + 0.1, + observedEmptyEventWritten / Math.max(1, opts.bench.eventCount * clientInfos.length), + ); + // Fill to warm const fillWarm = await smartFill({ target, @@ -1846,6 +1991,10 @@ async function main() { serverEnvPrefix, artifactDir: path.join(runTargetDir, "fill-warm"), threads: opts.bench.threads, + seedEventsPerConnection: warmSeedEventsPerConnection, + seedKeepaliveSeconds: opts.bench.keepaliveSeconds, + skipFill: target === "haven", + fetchEventCount: fetchEventCountForTarget, }); eventsInDb = fillWarm.eventsInDb; @@ -1863,7 +2012,21 @@ async function main() { mode: "event", artifactDir: path.join(runTargetDir, "warm-event"), }); - eventsInDb += countEventsWritten(warmEventResults); + const estimatedWarmEventWritten = countEventsWritten(warmEventResults); + const warmEventsBefore = eventsInDb; + eventsInDb += estimatedWarmEventWritten; + + let observedWarmEventWritten = estimatedWarmEventWritten; + const authoritativeAfterWarmEvent = await fetchEventCountForTarget(); + if (Number.isInteger(authoritativeAfterWarmEvent) && authoritativeAfterWarmEvent >= 0) { + observedWarmEventWritten = Math.max(0, authoritativeAfterWarmEvent - warmEventsBefore); + eventsInDb = authoritativeAfterWarmEvent; + } + + const hotSeedEventsPerConnection = Math.max( + 0.1, + observedWarmEventWritten / Math.max(1, opts.bench.eventCount * clientInfos.length), + ); // Fill to hot const fillHot = await smartFill({ @@ -1878,6 +2041,10 @@ async function main() { serverEnvPrefix, artifactDir: path.join(runTargetDir, "fill-hot"), threads: opts.bench.threads, + seedEventsPerConnection: hotSeedEventsPerConnection, + seedKeepaliveSeconds: opts.bench.keepaliveSeconds, + skipFill: target === "haven", + fetchEventCount: fetchEventCountForTarget, }); eventsInDb = fillHot.eventsInDb; @@ -1965,7 +2132,7 @@ async function main() { }); } - console.log("[phase] final server cleanup (containers)"); + phaseLogger.logPhase("[phase] final server cleanup (containers)"); await sshExec(serverIp, keyPath, "/root/cloud-bench-server.sh cleanup"); const servers = summariseServersFromResults(results); diff --git a/scripts/cloud_bench_results.mjs b/scripts/cloud_bench_results.mjs index 37a514a..08fd352 100644 --- a/scripts/cloud_bench_results.mjs +++ b/scripts/cloud_bench_results.mjs @@ -215,9 +215,12 @@ export function countEventsWritten(clientResults) { for (const cr of clientResults) { if (cr.status !== "ok") continue; const eventSection = cr.sections?.event; - if (eventSection?.message_stats?.complete) { - total += Number(eventSection.message_stats.complete) || 0; - } + if (!eventSection?.message_stats) continue; + + const complete = Number(eventSection.message_stats.complete) || 0; + const error = Number(eventSection.message_stats.error) || 0; + const accepted = Math.max(0, complete - error); + total += accepted; } return total; } diff --git a/scripts/cloud_bench_server.sh b/scripts/cloud_bench_server.sh index 32532c9..8b89fa4 100755 --- a/scripts/cloud_bench_server.sh +++ b/scripts/cloud_bench_server.sh @@ -265,7 +265,7 @@ common_parrhesia_env+=( -e PARRHESIA_LIMITS_MAX_NEGENTROPY_ITEMS_PER_SESSION=100 cmd="${1:-}" if [[ -z "$cmd" ]]; then - echo "usage: cloud-bench-server.sh " >&2 + echo "usage: cloud-bench-server.sh " >&2 exit 1 fi @@ -626,6 +626,14 @@ EOF wait_port 3355 120 haven ;; + count-data-parrhesia-pg) + docker exec pg psql -U parrhesia -d parrhesia -At -c "SELECT count(*) FROM events" + ;; + + count-data-nostream) + docker exec nostream-db psql -U nostr_ts_relay -d nostr_ts_relay -At -c "SELECT count(*) FROM events" + ;; + cleanup) cleanup_containers ;;