From 57c2c0b822435555409b05dfffb62a50ad8d7890 Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Thu, 19 Mar 2026 23:14:46 +0100 Subject: [PATCH] bench: Cloud tuning --- scripts/cloud_bench_orchestrate.mjs | 264 ++++++++++++++++++++++++---- scripts/run_bench_cloud.sh | 10 ++ 2 files changed, 243 insertions(+), 31 deletions(-) diff --git a/scripts/cloud_bench_orchestrate.mjs b/scripts/cloud_bench_orchestrate.mjs index 1f0b211..f534968 100755 --- a/scripts/cloud_bench_orchestrate.mjs +++ b/scripts/cloud_bench_orchestrate.mjs @@ -6,7 +6,6 @@ import path from "node:path"; import { spawn } from "node:child_process"; import readline from "node:readline"; import { fileURLToPath } from "node:url"; -import { seedEvents } from "./nostr_seed.mjs"; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); @@ -18,6 +17,9 @@ const ESTIMATE_WINDOW_HOURS = ESTIMATE_WINDOW_MINUTES / 60; const ESTIMATE_WINDOW_LABEL = `${ESTIMATE_WINDOW_MINUTES}m`; const BENCH_BUILD_DIR = path.join(ROOT_DIR, "_build", "bench"); const NOSTREAM_REDIS_IMAGE = "redis:7.0.5-alpine3.16"; +const SEED_TOLERANCE_RATIO = 0.01; +const SEED_MAX_ROUNDS = 4; +const SEED_EVENT_RATE = 5000; const DEFAULTS = { datacenter: "fsn1-dc14", @@ -39,20 +41,21 @@ const DEFAULTS = { havenImage: "holgerhatgarkeinenode/haven-docker:latest", keep: false, quick: false, - warmEvents: 100000, - hotEvents: 1000000, + warmEvents: 25000, + hotEvents: 250000, bench: { - connectCount: 1000, - connectRate: 200, - echoCount: 1000, - echoRate: 200, + connectCount: 3000, + connectRate: 1500, + echoCount: 3000, + echoRate: 1500, echoSize: 512, - eventCount: 2000, - eventRate: 300, - reqCount: 1000, - reqRate: 200, + eventCount: 5000, + eventRate: 2000, + reqCount: 3000, + reqRate: 1500, reqLimit: 50, - keepaliveSeconds: 5, + keepaliveSeconds: 10, + threads: 0, }, }; @@ -98,6 +101,7 @@ Options: --req-rate (default: ${DEFAULTS.bench.reqRate}) --req-limit (default: ${DEFAULTS.bench.reqLimit}) --keepalive-seconds (default: ${DEFAULTS.bench.keepaliveSeconds}) + --threads nostr-bench worker threads (0 = auto, default: ${DEFAULTS.bench.threads}) Phased benchmark: --warm-events DB fill level for warm phase (default: ${DEFAULTS.warmEvents}) @@ -134,6 +138,14 @@ function parseArgs(argv) { return n; }; + const nonNegativeIntOpt = (name, value) => { + const n = Number(value); + if (!Number.isInteger(n) || n < 0) { + throw new Error(`${name} must be a non-negative integer, got: ${value}`); + } + return n; + }; + for (let i = 0; i < argv.length; i += 1) { const arg = argv[i]; switch (arg) { @@ -223,6 +235,9 @@ function parseArgs(argv) { case "--keepalive-seconds": opts.bench.keepaliveSeconds = intOpt(arg, argv[++i]); break; + case "--threads": + opts.bench.threads = nonNegativeIntOpt(arg, argv[++i]); + break; case "--history-file": opts.historyFile = argv[++i]; break; @@ -248,6 +263,9 @@ function parseArgs(argv) { if (process.env.PARRHESIA_BENCH_WARM_EVENTS) opts.warmEvents = Number(process.env.PARRHESIA_BENCH_WARM_EVENTS); if (process.env.PARRHESIA_BENCH_HOT_EVENTS) opts.hotEvents = Number(process.env.PARRHESIA_BENCH_HOT_EVENTS); + if (process.env.PARRHESIA_BENCH_THREADS) { + opts.bench.threads = nonNegativeIntOpt("PARRHESIA_BENCH_THREADS", process.env.PARRHESIA_BENCH_THREADS); + } if (process.env.PARRHESIA_BENCH_QUICK === "1") opts.quick = true; if (!opts.targets.length) { @@ -1461,6 +1479,10 @@ if [[ -z "\$relay_url" ]]; then fi bench_bin="\${NOSTR_BENCH_BIN:-/usr/local/bin/nostr-bench}" +bench_threads="\${PARRHESIA_BENCH_THREADS:-0}" +client_nofile="\${PARRHESIA_BENCH_CLIENT_NOFILE:-262144}" + +ulimit -n "\${client_nofile}" >/dev/null 2>&1 || true run_connect() { echo "==> nostr-bench connect \${relay_url}" @@ -1468,6 +1490,7 @@ run_connect() { -c "\${PARRHESIA_BENCH_CONNECT_COUNT:-200}" \ -r "\${PARRHESIA_BENCH_CONNECT_RATE:-100}" \ -k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \ + -t "\${bench_threads}" \ "\${relay_url}" } @@ -1477,6 +1500,7 @@ run_echo() { -c "\${PARRHESIA_BENCH_ECHO_COUNT:-100}" \ -r "\${PARRHESIA_BENCH_ECHO_RATE:-50}" \ -k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \ + -t "\${bench_threads}" \ --size "\${PARRHESIA_BENCH_ECHO_SIZE:-512}" \ "\${relay_url}" } @@ -1487,6 +1511,7 @@ run_event() { -c "\${PARRHESIA_BENCH_EVENT_COUNT:-100}" \ -r "\${PARRHESIA_BENCH_EVENT_RATE:-50}" \ -k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \ + -t "\${bench_threads}" \ "\${relay_url}" } @@ -1496,6 +1521,7 @@ run_req() { -c "\${PARRHESIA_BENCH_REQ_COUNT:-100}" \ -r "\${PARRHESIA_BENCH_REQ_RATE:-50}" \ -k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \ + -t "\${bench_threads}" \ --limit "\${PARRHESIA_BENCH_REQ_LIMIT:-10}" \ "\${relay_url}" } @@ -1730,16 +1756,146 @@ function countEventsWritten(clientResults) { return total; } +function splitCountAcrossClients(total, clients) { + if (clients <= 0 || total <= 0) return []; + const base = Math.floor(total / clients); + const remainder = total % clients; + return Array.from({ length: clients }, (_, i) => base + (i < remainder ? 1 : 0)); +} + +async function runClientSeedingRound({ + target, + phase, + round, + deficit, + clientInfos, + keyPath, + relayUrl, + artifactDir, + threads, +}) { + const benchThreads = Number.isInteger(threads) && threads >= 0 ? threads : 0; + const clientsForRound = clientInfos.slice(0, Math.min(clientInfos.length, deficit)); + const shares = splitCountAcrossClients(deficit, clientsForRound.length); + const roundDir = path.join(artifactDir, `round-${round}`); + fs.mkdirSync(roundDir, { recursive: true }); + + const seedResults = await Promise.all( + clientsForRound.map(async (client, idx) => { + const desiredEvents = shares[idx] || 0; + const stdoutPath = path.join(roundDir, `${client.name}.stdout.log`); + const stderrPath = path.join(roundDir, `${client.name}.stderr.log`); + + if (desiredEvents <= 0) { + fs.writeFileSync(stdoutPath, "", "utf8"); + fs.writeFileSync(stderrPath, "", "utf8"); + return { + client_name: client.name, + client_ip: client.ip, + status: "skipped", + desired_events: desiredEvents, + projected_events: 0, + acked: 0, + stdout_path: path.relative(ROOT_DIR, stdoutPath), + stderr_path: path.relative(ROOT_DIR, stderrPath), + }; + } + + const eventConnections = 1; + const eventKeepalive = Math.max(5, Math.ceil(desiredEvents / SEED_EVENT_RATE)); + const eventRate = Math.max(1, Math.ceil(desiredEvents / eventKeepalive)); + const projectedEvents = eventConnections * eventRate * eventKeepalive; + + const seedEnvPrefix = [ + `PARRHESIA_BENCH_EVENT_COUNT=${eventConnections}`, + `PARRHESIA_BENCH_EVENT_RATE=${eventRate}`, + `PARRHESIA_BENCH_KEEPALIVE_SECONDS=${eventKeepalive}`, + `PARRHESIA_BENCH_THREADS=${benchThreads}`, + ].join(" "); + + try { + const benchRes = await sshExec( + client.ip, + keyPath, + `${seedEnvPrefix} /root/cloud-bench-client.sh ${shellEscape(relayUrl)} event`, + ); + + fs.writeFileSync(stdoutPath, benchRes.stdout, "utf8"); + fs.writeFileSync(stderrPath, benchRes.stderr, "utf8"); + + const parsed = parseNostrBenchSections(benchRes.stdout); + const acked = Number(parsed?.event?.message_stats?.complete) || 0; + + return { + client_name: client.name, + client_ip: client.ip, + status: "ok", + desired_events: desiredEvents, + projected_events: projectedEvents, + event_connections: eventConnections, + event_rate: eventRate, + event_keepalive_seconds: eventKeepalive, + acked, + stdout_path: path.relative(ROOT_DIR, stdoutPath), + stderr_path: path.relative(ROOT_DIR, stderrPath), + }; + } catch (error) { + const out = error.stdout || ""; + const err = error.stderr || String(error); + fs.writeFileSync(stdoutPath, out, "utf8"); + fs.writeFileSync(stderrPath, err, "utf8"); + + return { + client_name: client.name, + client_ip: client.ip, + status: "error", + desired_events: desiredEvents, + projected_events: projectedEvents, + event_connections: eventConnections, + event_rate: eventRate, + event_keepalive_seconds: eventKeepalive, + acked: 0, + stdout_path: path.relative(ROOT_DIR, stdoutPath), + stderr_path: path.relative(ROOT_DIR, stderrPath), + error: error.message || String(error), + }; + } + }), + ); + + const failed = seedResults.filter((r) => r.status === "error"); + if (failed.length > 0) { + throw new Error( + `[fill] ${target}:${phase} round ${round} failed on clients: ${failed.map((f) => f.client_name).join(", ")}`, + ); + } + + const acked = seedResults.reduce((sum, r) => sum + (Number(r.acked) || 0), 0); + const desired = seedResults.reduce((sum, r) => sum + (Number(r.desired_events) || 0), 0); + const projected = seedResults.reduce((sum, r) => sum + (Number(r.projected_events) || 0), 0); + + return { + desired, + projected, + acked, + clients: seedResults, + }; +} + // Ensure the relay has approximately `targetCount` events. -// Seeds from the orchestrator via WebSocket, or wipes and reseeds if over target. +// Uses client-side nostr-bench event seeding in parallel and accepts <=1% drift. async function smartFill({ target, + phase, targetCount, eventsInDb, relayUrl, serverIp, keyPath, + clientInfos, serverEnvPrefix, + artifactDir, + threads, }) { if (targetCount <= 0) return { eventsInDb, seeded: 0, wiped: false }; @@ -1752,30 +1908,63 @@ async function smartFill({ wiped = true; } - const deficit = targetCount - eventsInDb; - if (deficit <= 0) { - console.log(`[fill] ${target}: already at ${eventsInDb} events (target ${targetCount}), skipping`); + const tolerance = Math.max(1, Math.floor(targetCount * SEED_TOLERANCE_RATIO)); + let deficit = targetCount - eventsInDb; + + if (deficit <= tolerance) { + console.log( + `[fill] ${target}: already within tolerance (${eventsInDb}/${targetCount}, tolerance=${tolerance}), skipping`, + ); return { eventsInDb, seeded: 0, wiped }; } - console.log(`[fill] ${target}: seeding ${deficit} events (${eventsInDb} → ${targetCount})`); - const result = await seedEvents({ - url: relayUrl, - count: deficit, - concurrency: 16, - onProgress: (n) => { - if (n % 10000 === 0) console.log(`[fill] ${target}: ${n}/${deficit} seeded`); - }, - }); - const elapsedSec = result.elapsed_ms / 1000; - const eventsPerSec = elapsedSec > 0 ? Math.round(result.acked / elapsedSec) : 0; console.log( - `[fill] ${target}: seeded ${result.acked}/${deficit} in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s)` + - (result.errors > 0 ? ` (${result.errors} errors)` : ""), + `[fill] ${target}:${phase}: seeding to ~${targetCount} events from ${eventsInDb} (deficit=${deficit}, tolerance=${tolerance})`, ); - eventsInDb += result.acked; - return { eventsInDb, seeded: result.acked, wiped }; + let seededTotal = 0; + + for (let round = 1; round <= SEED_MAX_ROUNDS; round += 1) { + if (deficit <= tolerance) break; + + const roundStartMs = Date.now(); + const roundResult = await runClientSeedingRound({ + target, + phase, + round, + deficit, + clientInfos, + keyPath, + relayUrl, + artifactDir, + threads, + }); + + const elapsedSec = (Date.now() - roundStartMs) / 1000; + const eventsPerSec = elapsedSec > 0 ? Math.round(roundResult.acked / elapsedSec) : 0; + + eventsInDb += roundResult.acked; + seededTotal += roundResult.acked; + deficit = targetCount - eventsInDb; + + console.log( + `[fill] ${target}:${phase} round ${round}: acked ${roundResult.acked} (desired=${roundResult.desired}, projected=${roundResult.projected}) in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s), now ~${eventsInDb}/${targetCount}`, + ); + + if (roundResult.acked <= 0) { + console.warn(`[fill] ${target}:${phase} round ${round}: no progress, stopping early`); + break; + } + } + + const remaining = Math.max(0, targetCount - eventsInDb); + if (remaining > tolerance) { + console.warn( + `[fill] ${target}:${phase}: remaining deficit ${remaining} exceeds tolerance ${tolerance} after ${SEED_MAX_ROUNDS} rounds`, + ); + } + + return { eventsInDb, seeded: seededTotal, wiped }; } // Run a single benchmark type across all clients in parallel. @@ -2256,6 +2445,8 @@ async function main() { "apt-get update -y >/dev/null", "apt-get install -y docker.io curl jq git python3 >/dev/null", "systemctl enable --now docker >/dev/null", + "sysctl -w net.ipv4.ip_local_port_range='10000 65535' >/dev/null || true", + "sysctl -w net.core.somaxconn=65535 >/dev/null || true", "docker --version", "python3 --version", "git --version", @@ -2268,6 +2459,8 @@ async function main() { const clientBootstrapCmd = [ "set -euo pipefail", "mkdir -p /usr/local/bin", + "sysctl -w net.ipv4.ip_local_port_range='10000 65535' >/dev/null || true", + "sysctl -w net.core.somaxconn=65535 >/dev/null || true", "bash --version", "uname -m", ].join("; "); @@ -2414,6 +2607,7 @@ async function main() { `PARRHESIA_BENCH_REQ_RATE=${opts.bench.reqRate}`, `PARRHESIA_BENCH_REQ_LIMIT=${opts.bench.reqLimit}`, `PARRHESIA_BENCH_KEEPALIVE_SECONDS=${opts.bench.keepaliveSeconds}`, + `PARRHESIA_BENCH_THREADS=${opts.bench.threads}`, ].join(" "); const benchArgs = { clientInfos, keyPath, benchEnvPrefix, relayUrl }; @@ -2471,12 +2665,16 @@ async function main() { // Fill to warm const fillWarm = await smartFill({ target, + phase: "warm", targetCount: opts.warmEvents, eventsInDb, relayUrl, serverIp, keyPath, + clientInfos, serverEnvPrefix, + artifactDir: path.join(runTargetDir, "fill-warm"), + threads: opts.bench.threads, }); eventsInDb = fillWarm.eventsInDb; @@ -2499,12 +2697,16 @@ async function main() { // Fill to hot const fillHot = await smartFill({ target, + phase: "hot", targetCount: opts.hotEvents, eventsInDb, relayUrl, serverIp, keyPath, + clientInfos, serverEnvPrefix, + artifactDir: path.join(runTargetDir, "fill-hot"), + threads: opts.bench.threads, }); eventsInDb = fillHot.eventsInDb; diff --git a/scripts/run_bench_cloud.sh b/scripts/run_bench_cloud.sh index df12d52..600e450 100755 --- a/scripts/run_bench_cloud.sh +++ b/scripts/run_bench_cloud.sh @@ -31,6 +31,7 @@ Flags: --nostream-repo URL Override nostream repo (default: Cameri/nostream) --nostream-ref REF Override nostream ref (default: main) --haven-image IMAGE Override Haven image + --threads N Override nostr-bench worker threads (0 = auto) --keep Keep cloud resources after run -h, --help @@ -59,6 +60,7 @@ Bench knobs (forwarded): PARRHESIA_BENCH_REQ_RATE PARRHESIA_BENCH_REQ_LIMIT PARRHESIA_BENCH_KEEPALIVE_SECONDS + PARRHESIA_BENCH_THREADS Examples: # Default full cloud run @@ -83,6 +85,7 @@ GIT_REF="${PARRHESIA_CLOUD_GIT_REF:-}" NOSTREAM_REPO="${PARRHESIA_CLOUD_NOSTREAM_REPO:-}" NOSTREAM_REF="${PARRHESIA_CLOUD_NOSTREAM_REF:-}" HAVEN_IMAGE="${PARRHESIA_CLOUD_HAVEN_IMAGE:-}" +THREADS="${PARRHESIA_BENCH_THREADS:-}" KEEP=0 QUICK=0 @@ -142,6 +145,10 @@ while [[ $# -gt 0 ]]; do HAVEN_IMAGE="$2" shift 2 ;; + --threads) + THREADS="$2" + shift 2 + ;; --keep) KEEP=1 shift @@ -207,6 +214,9 @@ fi if [[ -n "$HAVEN_IMAGE" ]]; then CMD+=(--haven-image "$HAVEN_IMAGE") fi +if [[ -n "$THREADS" ]]; then + CMD+=(--threads "$THREADS") +fi if [[ -n "$PARRHESIA_IMAGE" ]]; then CMD+=(--parrhesia-image "$PARRHESIA_IMAGE")