bench: Cloud tuning
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s

This commit is contained in:
2026-03-19 23:14:46 +01:00
parent e02bd99a43
commit 57c2c0b822
2 changed files with 243 additions and 31 deletions

View File

@@ -6,7 +6,6 @@ import path from "node:path";
import { spawn } from "node:child_process"; import { spawn } from "node:child_process";
import readline from "node:readline"; import readline from "node:readline";
import { fileURLToPath } from "node:url"; import { fileURLToPath } from "node:url";
import { seedEvents } from "./nostr_seed.mjs";
const __filename = fileURLToPath(import.meta.url); const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename); const __dirname = path.dirname(__filename);
@@ -18,6 +17,9 @@ const ESTIMATE_WINDOW_HOURS = ESTIMATE_WINDOW_MINUTES / 60;
const ESTIMATE_WINDOW_LABEL = `${ESTIMATE_WINDOW_MINUTES}m`; const ESTIMATE_WINDOW_LABEL = `${ESTIMATE_WINDOW_MINUTES}m`;
const BENCH_BUILD_DIR = path.join(ROOT_DIR, "_build", "bench"); const BENCH_BUILD_DIR = path.join(ROOT_DIR, "_build", "bench");
const NOSTREAM_REDIS_IMAGE = "redis:7.0.5-alpine3.16"; const NOSTREAM_REDIS_IMAGE = "redis:7.0.5-alpine3.16";
const SEED_TOLERANCE_RATIO = 0.01;
const SEED_MAX_ROUNDS = 4;
const SEED_EVENT_RATE = 5000;
const DEFAULTS = { const DEFAULTS = {
datacenter: "fsn1-dc14", datacenter: "fsn1-dc14",
@@ -39,20 +41,21 @@ const DEFAULTS = {
havenImage: "holgerhatgarkeinenode/haven-docker:latest", havenImage: "holgerhatgarkeinenode/haven-docker:latest",
keep: false, keep: false,
quick: false, quick: false,
warmEvents: 100000, warmEvents: 25000,
hotEvents: 1000000, hotEvents: 250000,
bench: { bench: {
connectCount: 1000, connectCount: 3000,
connectRate: 200, connectRate: 1500,
echoCount: 1000, echoCount: 3000,
echoRate: 200, echoRate: 1500,
echoSize: 512, echoSize: 512,
eventCount: 2000, eventCount: 5000,
eventRate: 300, eventRate: 2000,
reqCount: 1000, reqCount: 3000,
reqRate: 200, reqRate: 1500,
reqLimit: 50, reqLimit: 50,
keepaliveSeconds: 5, keepaliveSeconds: 10,
threads: 0,
}, },
}; };
@@ -98,6 +101,7 @@ Options:
--req-rate <n> (default: ${DEFAULTS.bench.reqRate}) --req-rate <n> (default: ${DEFAULTS.bench.reqRate})
--req-limit <n> (default: ${DEFAULTS.bench.reqLimit}) --req-limit <n> (default: ${DEFAULTS.bench.reqLimit})
--keepalive-seconds <n> (default: ${DEFAULTS.bench.keepaliveSeconds}) --keepalive-seconds <n> (default: ${DEFAULTS.bench.keepaliveSeconds})
--threads <n> nostr-bench worker threads (0 = auto, default: ${DEFAULTS.bench.threads})
Phased benchmark: Phased benchmark:
--warm-events <n> DB fill level for warm phase (default: ${DEFAULTS.warmEvents}) --warm-events <n> DB fill level for warm phase (default: ${DEFAULTS.warmEvents})
@@ -134,6 +138,14 @@ function parseArgs(argv) {
return n; return n;
}; };
const nonNegativeIntOpt = (name, value) => {
const n = Number(value);
if (!Number.isInteger(n) || n < 0) {
throw new Error(`${name} must be a non-negative integer, got: ${value}`);
}
return n;
};
for (let i = 0; i < argv.length; i += 1) { for (let i = 0; i < argv.length; i += 1) {
const arg = argv[i]; const arg = argv[i];
switch (arg) { switch (arg) {
@@ -223,6 +235,9 @@ function parseArgs(argv) {
case "--keepalive-seconds": case "--keepalive-seconds":
opts.bench.keepaliveSeconds = intOpt(arg, argv[++i]); opts.bench.keepaliveSeconds = intOpt(arg, argv[++i]);
break; break;
case "--threads":
opts.bench.threads = nonNegativeIntOpt(arg, argv[++i]);
break;
case "--history-file": case "--history-file":
opts.historyFile = argv[++i]; opts.historyFile = argv[++i];
break; break;
@@ -248,6 +263,9 @@ function parseArgs(argv) {
if (process.env.PARRHESIA_BENCH_WARM_EVENTS) opts.warmEvents = Number(process.env.PARRHESIA_BENCH_WARM_EVENTS); if (process.env.PARRHESIA_BENCH_WARM_EVENTS) opts.warmEvents = Number(process.env.PARRHESIA_BENCH_WARM_EVENTS);
if (process.env.PARRHESIA_BENCH_HOT_EVENTS) opts.hotEvents = Number(process.env.PARRHESIA_BENCH_HOT_EVENTS); if (process.env.PARRHESIA_BENCH_HOT_EVENTS) opts.hotEvents = Number(process.env.PARRHESIA_BENCH_HOT_EVENTS);
if (process.env.PARRHESIA_BENCH_THREADS) {
opts.bench.threads = nonNegativeIntOpt("PARRHESIA_BENCH_THREADS", process.env.PARRHESIA_BENCH_THREADS);
}
if (process.env.PARRHESIA_BENCH_QUICK === "1") opts.quick = true; if (process.env.PARRHESIA_BENCH_QUICK === "1") opts.quick = true;
if (!opts.targets.length) { if (!opts.targets.length) {
@@ -1461,6 +1479,10 @@ if [[ -z "\$relay_url" ]]; then
fi fi
bench_bin="\${NOSTR_BENCH_BIN:-/usr/local/bin/nostr-bench}" bench_bin="\${NOSTR_BENCH_BIN:-/usr/local/bin/nostr-bench}"
bench_threads="\${PARRHESIA_BENCH_THREADS:-0}"
client_nofile="\${PARRHESIA_BENCH_CLIENT_NOFILE:-262144}"
ulimit -n "\${client_nofile}" >/dev/null 2>&1 || true
run_connect() { run_connect() {
echo "==> nostr-bench connect \${relay_url}" echo "==> nostr-bench connect \${relay_url}"
@@ -1468,6 +1490,7 @@ run_connect() {
-c "\${PARRHESIA_BENCH_CONNECT_COUNT:-200}" \ -c "\${PARRHESIA_BENCH_CONNECT_COUNT:-200}" \
-r "\${PARRHESIA_BENCH_CONNECT_RATE:-100}" \ -r "\${PARRHESIA_BENCH_CONNECT_RATE:-100}" \
-k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \ -k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \
-t "\${bench_threads}" \
"\${relay_url}" "\${relay_url}"
} }
@@ -1477,6 +1500,7 @@ run_echo() {
-c "\${PARRHESIA_BENCH_ECHO_COUNT:-100}" \ -c "\${PARRHESIA_BENCH_ECHO_COUNT:-100}" \
-r "\${PARRHESIA_BENCH_ECHO_RATE:-50}" \ -r "\${PARRHESIA_BENCH_ECHO_RATE:-50}" \
-k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \ -k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \
-t "\${bench_threads}" \
--size "\${PARRHESIA_BENCH_ECHO_SIZE:-512}" \ --size "\${PARRHESIA_BENCH_ECHO_SIZE:-512}" \
"\${relay_url}" "\${relay_url}"
} }
@@ -1487,6 +1511,7 @@ run_event() {
-c "\${PARRHESIA_BENCH_EVENT_COUNT:-100}" \ -c "\${PARRHESIA_BENCH_EVENT_COUNT:-100}" \
-r "\${PARRHESIA_BENCH_EVENT_RATE:-50}" \ -r "\${PARRHESIA_BENCH_EVENT_RATE:-50}" \
-k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \ -k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \
-t "\${bench_threads}" \
"\${relay_url}" "\${relay_url}"
} }
@@ -1496,6 +1521,7 @@ run_req() {
-c "\${PARRHESIA_BENCH_REQ_COUNT:-100}" \ -c "\${PARRHESIA_BENCH_REQ_COUNT:-100}" \
-r "\${PARRHESIA_BENCH_REQ_RATE:-50}" \ -r "\${PARRHESIA_BENCH_REQ_RATE:-50}" \
-k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \ -k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \
-t "\${bench_threads}" \
--limit "\${PARRHESIA_BENCH_REQ_LIMIT:-10}" \ --limit "\${PARRHESIA_BENCH_REQ_LIMIT:-10}" \
"\${relay_url}" "\${relay_url}"
} }
@@ -1730,16 +1756,146 @@ function countEventsWritten(clientResults) {
return total; return total;
} }
// Evenly partition `total` items across `clients` slots. Earlier slots absorb
// the remainder, so any two shares differ by at most one. Returns an empty
// array when either argument is non-positive.
function splitCountAcrossClients(total, clients) {
  if (total <= 0 || clients <= 0) return [];
  const shares = [];
  let remaining = total;
  // Greedy ceil-division: each slot takes an even share of what is left,
  // which front-loads the remainder exactly like floor + leading +1s.
  for (let slotsLeft = clients; slotsLeft > 0; slotsLeft -= 1) {
    const share = Math.ceil(remaining / slotsLeft);
    shares.push(share);
    remaining -= share;
  }
  return shares;
}
/**
 * Run one parallel client-side seeding round: split `deficit` events across
 * the bench clients, invoke the client bench script in `event` mode on each
 * over SSH, capture per-client stdout/stderr under `artifactDir`/round-<n>/,
 * and aggregate the results.
 *
 * @param {string} target - target name; used in error messages only.
 * @param {string} phase - phase label (e.g. "warm"/"hot"); error messages only.
 * @param {number} round - 1-based round number; names the artifact subdir.
 * @param {number} deficit - total events this round should publish.
 * @param {Array<{name: string, ip: string}>} clientInfos - bench client hosts.
 * @param {string} keyPath - SSH key path forwarded to sshExec.
 * @param {string} relayUrl - relay URL handed to the remote client script.
 * @param {string} artifactDir - local directory receiving per-round logs.
 * @param {number} threads - nostr-bench worker threads (0 = auto).
 * @returns {Promise<{desired: number, projected: number, acked: number, clients: Array}>}
 * @throws {Error} if any client's SSH run fails (fail-fast after all settle).
 */
async function runClientSeedingRound({
  target,
  phase,
  round,
  deficit,
  clientInfos,
  keyPath,
  relayUrl,
  artifactDir,
  threads,
}) {
  // Sanitize: anything not a non-negative integer falls back to 0 (= auto).
  const benchThreads = Number.isInteger(threads) && threads >= 0 ? threads : 0;
  // Never enlist more clients than there are events to send.
  const clientsForRound = clientInfos.slice(0, Math.min(clientInfos.length, deficit));
  const shares = splitCountAcrossClients(deficit, clientsForRound.length);
  const roundDir = path.join(artifactDir, `round-${round}`);
  fs.mkdirSync(roundDir, { recursive: true });
  // All clients run concurrently; per-client failures are caught below and
  // returned as status:"error" records so Promise.all itself never rejects.
  const seedResults = await Promise.all(
    clientsForRound.map(async (client, idx) => {
      const desiredEvents = shares[idx] || 0;
      const stdoutPath = path.join(roundDir, `${client.name}.stdout.log`);
      const stderrPath = path.join(roundDir, `${client.name}.stderr.log`);
      if (desiredEvents <= 0) {
        // Zero share: still emit (empty) log files so artifacts are uniform.
        fs.writeFileSync(stdoutPath, "", "utf8");
        fs.writeFileSync(stderrPath, "", "utf8");
        return {
          client_name: client.name,
          client_ip: client.ip,
          status: "skipped",
          desired_events: desiredEvents,
          projected_events: 0,
          acked: 0,
          stdout_path: path.relative(ROOT_DIR, stdoutPath),
          stderr_path: path.relative(ROOT_DIR, stderrPath),
        };
      }
      // Shape the bench run as one connection publishing for `keepalive`
      // seconds at `rate` events/s. Keepalive is sized so the client can emit
      // desiredEvents at SEED_EVENT_RATE (min 5s); the rate is then derived
      // back from that duration. Ceil rounding means projectedEvents may
      // slightly overshoot desiredEvents — the caller's tolerance absorbs it.
      const eventConnections = 1;
      const eventKeepalive = Math.max(5, Math.ceil(desiredEvents / SEED_EVENT_RATE));
      const eventRate = Math.max(1, Math.ceil(desiredEvents / eventKeepalive));
      const projectedEvents = eventConnections * eventRate * eventKeepalive;
      const seedEnvPrefix = [
        `PARRHESIA_BENCH_EVENT_COUNT=${eventConnections}`,
        `PARRHESIA_BENCH_EVENT_RATE=${eventRate}`,
        `PARRHESIA_BENCH_KEEPALIVE_SECONDS=${eventKeepalive}`,
        `PARRHESIA_BENCH_THREADS=${benchThreads}`,
      ].join(" ");
      try {
        const benchRes = await sshExec(
          client.ip,
          keyPath,
          `${seedEnvPrefix} /root/cloud-bench-client.sh ${shellEscape(relayUrl)} event`,
        );
        // Persist raw output before parsing so artifacts survive parse issues.
        fs.writeFileSync(stdoutPath, benchRes.stdout, "utf8");
        fs.writeFileSync(stderrPath, benchRes.stderr, "utf8");
        const parsed = parseNostrBenchSections(benchRes.stdout);
        // Missing/non-numeric stats count as 0 acked rather than NaN.
        const acked = Number(parsed?.event?.message_stats?.complete) || 0;
        return {
          client_name: client.name,
          client_ip: client.ip,
          status: "ok",
          desired_events: desiredEvents,
          projected_events: projectedEvents,
          event_connections: eventConnections,
          event_rate: eventRate,
          event_keepalive_seconds: eventKeepalive,
          acked,
          stdout_path: path.relative(ROOT_DIR, stdoutPath),
          stderr_path: path.relative(ROOT_DIR, stderrPath),
        };
      } catch (error) {
        // sshExec failures may still carry captured output; save what we got.
        const out = error.stdout || "";
        const err = error.stderr || String(error);
        fs.writeFileSync(stdoutPath, out, "utf8");
        fs.writeFileSync(stderrPath, err, "utf8");
        return {
          client_name: client.name,
          client_ip: client.ip,
          status: "error",
          desired_events: desiredEvents,
          projected_events: projectedEvents,
          event_connections: eventConnections,
          event_rate: eventRate,
          event_keepalive_seconds: eventKeepalive,
          acked: 0,
          stdout_path: path.relative(ROOT_DIR, stdoutPath),
          stderr_path: path.relative(ROOT_DIR, stderrPath),
          error: error.message || String(error),
        };
      }
    }),
  );
  // Fail fast only after every client has settled, so all logs are on disk.
  const failed = seedResults.filter((r) => r.status === "error");
  if (failed.length > 0) {
    throw new Error(
      `[fill] ${target}:${phase} round ${round} failed on clients: ${failed.map((f) => f.client_name).join(", ")}`,
    );
  }
  const acked = seedResults.reduce((sum, r) => sum + (Number(r.acked) || 0), 0);
  const desired = seedResults.reduce((sum, r) => sum + (Number(r.desired_events) || 0), 0);
  const projected = seedResults.reduce((sum, r) => sum + (Number(r.projected_events) || 0), 0);
  return {
    desired,
    projected,
    acked,
    clients: seedResults,
  };
}
// Ensure the relay has approximately `targetCount` events. // Ensure the relay has approximately `targetCount` events.
// Seeds from the orchestrator via WebSocket, or wipes and reseeds if over target. // Uses client-side nostr-bench event seeding in parallel and accepts <=1% drift.
async function smartFill({ async function smartFill({
target, target,
phase,
targetCount, targetCount,
eventsInDb, eventsInDb,
relayUrl, relayUrl,
serverIp, serverIp,
keyPath, keyPath,
clientInfos,
serverEnvPrefix, serverEnvPrefix,
artifactDir,
threads,
}) { }) {
if (targetCount <= 0) return { eventsInDb, seeded: 0, wiped: false }; if (targetCount <= 0) return { eventsInDb, seeded: 0, wiped: false };
@@ -1752,30 +1908,63 @@ async function smartFill({
wiped = true; wiped = true;
} }
const deficit = targetCount - eventsInDb; const tolerance = Math.max(1, Math.floor(targetCount * SEED_TOLERANCE_RATIO));
if (deficit <= 0) { let deficit = targetCount - eventsInDb;
console.log(`[fill] ${target}: already at ${eventsInDb} events (target ${targetCount}), skipping`);
if (deficit <= tolerance) {
console.log(
`[fill] ${target}: already within tolerance (${eventsInDb}/${targetCount}, tolerance=${tolerance}), skipping`,
);
return { eventsInDb, seeded: 0, wiped }; return { eventsInDb, seeded: 0, wiped };
} }
console.log(`[fill] ${target}: seeding ${deficit} events (${eventsInDb}${targetCount})`);
const result = await seedEvents({
url: relayUrl,
count: deficit,
concurrency: 16,
onProgress: (n) => {
if (n % 10000 === 0) console.log(`[fill] ${target}: ${n}/${deficit} seeded`);
},
});
const elapsedSec = result.elapsed_ms / 1000;
const eventsPerSec = elapsedSec > 0 ? Math.round(result.acked / elapsedSec) : 0;
console.log( console.log(
`[fill] ${target}: seeded ${result.acked}/${deficit} in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s)` + `[fill] ${target}:${phase}: seeding to ~${targetCount} events from ${eventsInDb} (deficit=${deficit}, tolerance=${tolerance})`,
(result.errors > 0 ? ` (${result.errors} errors)` : ""),
); );
eventsInDb += result.acked;
return { eventsInDb, seeded: result.acked, wiped }; let seededTotal = 0;
for (let round = 1; round <= SEED_MAX_ROUNDS; round += 1) {
if (deficit <= tolerance) break;
const roundStartMs = Date.now();
const roundResult = await runClientSeedingRound({
target,
phase,
round,
deficit,
clientInfos,
keyPath,
relayUrl,
artifactDir,
threads,
});
const elapsedSec = (Date.now() - roundStartMs) / 1000;
const eventsPerSec = elapsedSec > 0 ? Math.round(roundResult.acked / elapsedSec) : 0;
eventsInDb += roundResult.acked;
seededTotal += roundResult.acked;
deficit = targetCount - eventsInDb;
console.log(
`[fill] ${target}:${phase} round ${round}: acked ${roundResult.acked} (desired=${roundResult.desired}, projected=${roundResult.projected}) in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s), now ~${eventsInDb}/${targetCount}`,
);
if (roundResult.acked <= 0) {
console.warn(`[fill] ${target}:${phase} round ${round}: no progress, stopping early`);
break;
}
}
const remaining = Math.max(0, targetCount - eventsInDb);
if (remaining > tolerance) {
console.warn(
`[fill] ${target}:${phase}: remaining deficit ${remaining} exceeds tolerance ${tolerance} after ${SEED_MAX_ROUNDS} rounds`,
);
}
return { eventsInDb, seeded: seededTotal, wiped };
} }
// Run a single benchmark type across all clients in parallel. // Run a single benchmark type across all clients in parallel.
@@ -2256,6 +2445,8 @@ async function main() {
"apt-get update -y >/dev/null", "apt-get update -y >/dev/null",
"apt-get install -y docker.io curl jq git python3 >/dev/null", "apt-get install -y docker.io curl jq git python3 >/dev/null",
"systemctl enable --now docker >/dev/null", "systemctl enable --now docker >/dev/null",
"sysctl -w net.ipv4.ip_local_port_range='10000 65535' >/dev/null || true",
"sysctl -w net.core.somaxconn=65535 >/dev/null || true",
"docker --version", "docker --version",
"python3 --version", "python3 --version",
"git --version", "git --version",
@@ -2268,6 +2459,8 @@ async function main() {
const clientBootstrapCmd = [ const clientBootstrapCmd = [
"set -euo pipefail", "set -euo pipefail",
"mkdir -p /usr/local/bin", "mkdir -p /usr/local/bin",
"sysctl -w net.ipv4.ip_local_port_range='10000 65535' >/dev/null || true",
"sysctl -w net.core.somaxconn=65535 >/dev/null || true",
"bash --version", "bash --version",
"uname -m", "uname -m",
].join("; "); ].join("; ");
@@ -2414,6 +2607,7 @@ async function main() {
`PARRHESIA_BENCH_REQ_RATE=${opts.bench.reqRate}`, `PARRHESIA_BENCH_REQ_RATE=${opts.bench.reqRate}`,
`PARRHESIA_BENCH_REQ_LIMIT=${opts.bench.reqLimit}`, `PARRHESIA_BENCH_REQ_LIMIT=${opts.bench.reqLimit}`,
`PARRHESIA_BENCH_KEEPALIVE_SECONDS=${opts.bench.keepaliveSeconds}`, `PARRHESIA_BENCH_KEEPALIVE_SECONDS=${opts.bench.keepaliveSeconds}`,
`PARRHESIA_BENCH_THREADS=${opts.bench.threads}`,
].join(" "); ].join(" ");
const benchArgs = { clientInfos, keyPath, benchEnvPrefix, relayUrl }; const benchArgs = { clientInfos, keyPath, benchEnvPrefix, relayUrl };
@@ -2471,12 +2665,16 @@ async function main() {
// Fill to warm // Fill to warm
const fillWarm = await smartFill({ const fillWarm = await smartFill({
target, target,
phase: "warm",
targetCount: opts.warmEvents, targetCount: opts.warmEvents,
eventsInDb, eventsInDb,
relayUrl, relayUrl,
serverIp, serverIp,
keyPath, keyPath,
clientInfos,
serverEnvPrefix, serverEnvPrefix,
artifactDir: path.join(runTargetDir, "fill-warm"),
threads: opts.bench.threads,
}); });
eventsInDb = fillWarm.eventsInDb; eventsInDb = fillWarm.eventsInDb;
@@ -2499,12 +2697,16 @@ async function main() {
// Fill to hot // Fill to hot
const fillHot = await smartFill({ const fillHot = await smartFill({
target, target,
phase: "hot",
targetCount: opts.hotEvents, targetCount: opts.hotEvents,
eventsInDb, eventsInDb,
relayUrl, relayUrl,
serverIp, serverIp,
keyPath, keyPath,
clientInfos,
serverEnvPrefix, serverEnvPrefix,
artifactDir: path.join(runTargetDir, "fill-hot"),
threads: opts.bench.threads,
}); });
eventsInDb = fillHot.eventsInDb; eventsInDb = fillHot.eventsInDb;

View File

@@ -31,6 +31,7 @@ Flags:
--nostream-repo URL Override nostream repo (default: Cameri/nostream) --nostream-repo URL Override nostream repo (default: Cameri/nostream)
--nostream-ref REF Override nostream ref (default: main) --nostream-ref REF Override nostream ref (default: main)
--haven-image IMAGE Override Haven image --haven-image IMAGE Override Haven image
--threads N Override nostr-bench worker threads (0 = auto)
--keep Keep cloud resources after run --keep Keep cloud resources after run
-h, --help -h, --help
@@ -59,6 +60,7 @@ Bench knobs (forwarded):
PARRHESIA_BENCH_REQ_RATE PARRHESIA_BENCH_REQ_RATE
PARRHESIA_BENCH_REQ_LIMIT PARRHESIA_BENCH_REQ_LIMIT
PARRHESIA_BENCH_KEEPALIVE_SECONDS PARRHESIA_BENCH_KEEPALIVE_SECONDS
PARRHESIA_BENCH_THREADS
Examples: Examples:
# Default full cloud run # Default full cloud run
@@ -83,6 +85,7 @@ GIT_REF="${PARRHESIA_CLOUD_GIT_REF:-}"
NOSTREAM_REPO="${PARRHESIA_CLOUD_NOSTREAM_REPO:-}" NOSTREAM_REPO="${PARRHESIA_CLOUD_NOSTREAM_REPO:-}"
NOSTREAM_REF="${PARRHESIA_CLOUD_NOSTREAM_REF:-}" NOSTREAM_REF="${PARRHESIA_CLOUD_NOSTREAM_REF:-}"
HAVEN_IMAGE="${PARRHESIA_CLOUD_HAVEN_IMAGE:-}" HAVEN_IMAGE="${PARRHESIA_CLOUD_HAVEN_IMAGE:-}"
THREADS="${PARRHESIA_BENCH_THREADS:-}"
KEEP=0 KEEP=0
QUICK=0 QUICK=0
@@ -142,6 +145,10 @@ while [[ $# -gt 0 ]]; do
HAVEN_IMAGE="$2" HAVEN_IMAGE="$2"
shift 2 shift 2
;; ;;
--threads)
THREADS="$2"
shift 2
;;
--keep) --keep)
KEEP=1 KEEP=1
shift shift
@@ -207,6 +214,9 @@ fi
if [[ -n "$HAVEN_IMAGE" ]]; then if [[ -n "$HAVEN_IMAGE" ]]; then
CMD+=(--haven-image "$HAVEN_IMAGE") CMD+=(--haven-image "$HAVEN_IMAGE")
fi fi
if [[ -n "$THREADS" ]]; then
CMD+=(--threads "$THREADS")
fi
if [[ -n "$PARRHESIA_IMAGE" ]]; then if [[ -n "$PARRHESIA_IMAGE" ]]; then
CMD+=(--parrhesia-image "$PARRHESIA_IMAGE") CMD+=(--parrhesia-image "$PARRHESIA_IMAGE")