bench: Cloud seeding
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s

This commit is contained in:
2026-03-20 14:19:58 +01:00
parent 6bd0143de4
commit 070464f2eb
3 changed files with 216 additions and 38 deletions

View File

@@ -23,15 +23,25 @@ const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const ROOT_DIR = path.resolve(__dirname, "..");
const DEFAULT_TARGETS = ["parrhesia-pg", "parrhesia-memory", "strfry", "nostr-rs-relay", "nostream", "haven"];
const DEFAULT_TARGETS = [
"parrhesia-pg",
"parrhesia-memory",
"strfry",
"nostr-rs-relay",
"nostream",
// "haven", // disabled by default: Haven rejects generic nostr-bench event seeding (auth/whitelist/WoT policies)
];
const ESTIMATE_WINDOW_MINUTES = 30;
const ESTIMATE_WINDOW_HOURS = ESTIMATE_WINDOW_MINUTES / 60;
const ESTIMATE_WINDOW_LABEL = `${ESTIMATE_WINDOW_MINUTES}m`;
const BENCH_BUILD_DIR = path.join(ROOT_DIR, "_build", "bench");
const NOSTREAM_REDIS_IMAGE = "redis:7.0.5-alpine3.16";
const SEED_TOLERANCE_RATIO = 0.01;
const SEED_MAX_ROUNDS = 4;
const SEED_EVENT_RATE = 5000;
const SEED_MAX_ROUNDS = 8;
const SEED_ROUND_DEFICIT_RATIO = 0.2;
const SEED_KEEPALIVE_SECONDS = 10;
const SEED_EVENTS_PER_CONNECTION_FALLBACK = 3;
const PHASE_PREP_OFFSET_MINUTES = 3;
const DEFAULTS = {
datacenter: "fsn1-dc14",
@@ -454,6 +464,34 @@ function formatEuro(value) {
return `${value.toFixed(4)}`;
}
/**
 * Create a logger that prefixes each message with a "T<sign><minutes>m" label,
 * i.e. whole minutes elapsed relative to a movable zero point.
 *
 * Until one of the setters has been called the label is always "T+0m".
 *
 * @param {number} prepOffsetMinutes - how many minutes in the future
 *   `setPrepOffsetNow()` places the zero point.
 * @returns {{setPrepOffsetNow: Function, setZeroNow: Function, logPhase: Function}}
 *   `setPrepOffsetNow()` moves the zero point to now + prepOffsetMinutes,
 *   `setZeroNow()` moves it to now, and `logPhase(message)` prints
 *   "<label> <message>" via console.log.
 */
function createPhaseLogger(prepOffsetMinutes = PHASE_PREP_OFFSET_MINUTES) {
  // Epoch milliseconds treated as "T+0"; null until a setter has run.
  let zeroAtMs = null;

  const currentLabel = () => {
    if (zeroAtMs === null) {
      return "T+0m";
    }
    // Floored, so a point 1.5 minutes before zero is reported as T-2m.
    const minutes = Math.floor((Date.now() - zeroAtMs) / 60000);
    // Negative numbers print their own "-"; only "+" has to be added by hand.
    return minutes >= 0 ? `T+${minutes}m` : `T${minutes}m`;
  };

  const setPrepOffsetNow = () => {
    zeroAtMs = Date.now() + prepOffsetMinutes * 60_000;
  };

  const setZeroNow = () => {
    zeroAtMs = Date.now();
  };

  const logPhase = (message) => {
    console.log(`${currentLabel()} ${message}`);
  };

  return { setPrepOffsetNow, setZeroNow, logPhase };
}
function compatibleDatacenterChoices(datacenters, serverType, clientType, clientCount) {
const compatible = [];
@@ -883,6 +921,8 @@ async function runClientSeedingRound({
relayUrl,
artifactDir,
threads,
seedEventsPerConnection,
seedKeepaliveSeconds,
}) {
const benchThreads = Number.isInteger(threads) && threads >= 0 ? threads : 0;
const clientsForRound = clientInfos.slice(0, Math.min(clientInfos.length, deficit));
@@ -911,10 +951,14 @@ async function runClientSeedingRound({
};
}
const eventConnections = 1;
const eventKeepalive = Math.max(5, Math.ceil(desiredEvents / SEED_EVENT_RATE));
const eventRate = Math.max(1, Math.ceil(desiredEvents / eventKeepalive));
const projectedEvents = eventConnections * eventRate * eventKeepalive;
const eventsPerConnection = Math.max(
0.1,
Number(seedEventsPerConnection) || SEED_EVENTS_PER_CONNECTION_FALLBACK,
);
const eventConnections = Math.max(1, Math.ceil(desiredEvents / eventsPerConnection));
const eventKeepalive = Math.max(2, Number(seedKeepaliveSeconds) || SEED_KEEPALIVE_SECONDS);
const eventRate = Math.max(1, Math.ceil(eventConnections / eventKeepalive));
const projectedEvents = Math.round(eventConnections * eventsPerConnection);
const seedEnvPrefix = [
`PARRHESIA_BENCH_EVENT_COUNT=${eventConnections}`,
@@ -934,7 +978,9 @@ async function runClientSeedingRound({
fs.writeFileSync(stderrPath, benchRes.stderr, "utf8");
const parsed = parseNostrBenchSections(benchRes.stdout);
const acked = Number(parsed?.event?.message_stats?.complete) || 0;
const complete = Number(parsed?.event?.message_stats?.complete) || 0;
const error = Number(parsed?.event?.message_stats?.error) || 0;
const acked = Math.max(0, complete - error);
return {
client_name: client.name,
@@ -945,6 +991,9 @@ async function runClientSeedingRound({
event_connections: eventConnections,
event_rate: eventRate,
event_keepalive_seconds: eventKeepalive,
events_per_connection_estimate: eventsPerConnection,
event_complete: complete,
event_error: error,
acked,
stdout_path: path.relative(ROOT_DIR, stdoutPath),
stderr_path: path.relative(ROOT_DIR, stderrPath),
@@ -964,6 +1013,7 @@ async function runClientSeedingRound({
event_connections: eventConnections,
event_rate: eventRate,
event_keepalive_seconds: eventKeepalive,
events_per_connection_estimate: eventsPerConnection,
acked: 0,
stdout_path: path.relative(ROOT_DIR, stdoutPath),
stderr_path: path.relative(ROOT_DIR, stderrPath),
@@ -992,6 +1042,35 @@ async function runClientSeedingRound({
};
}
/**
 * Ask the server node how many events the target's database currently holds.
 *
 * Runs `/root/cloud-bench-server.sh count-data-<target>` over SSH and takes
 * the last output line that consists solely of digits as the count.
 *
 * @param {object} args
 * @param {string} args.target - relay target name; only "parrhesia-pg" and
 *   "nostream" are queried, every other target yields null without any SSH call.
 * @param {string} args.serverIp - first argument passed through to sshExec.
 * @param {string} args.keyPath - second argument passed through to sshExec.
 * @param {string} args.serverEnvPrefix - text placed in front of the remote command.
 * @returns {Promise<number|null>} the non-negative integer count, or null when
 *   the target is not countable, no numeric line is found, or the command fails
 *   (a failure is reported with console.warn rather than thrown).
 */
async function fetchServerEventCount({ target, serverIp, keyPath, serverEnvPrefix }) {
  if (target !== "parrhesia-pg" && target !== "nostream") {
    return null;
  }

  const remoteSubcommand = `count-data-${target}`;

  try {
    const result = await sshExec(
      serverIp,
      keyPath,
      `${serverEnvPrefix} /root/cloud-bench-server.sh ${shellEscape(remoteSubcommand)}`,
    );

    // Scan from the bottom: the count is the last digits-only line, so any
    // earlier output lines are ignored.
    const outputLines = result.stdout.split(/\r?\n/);
    for (let i = outputLines.length - 1; i >= 0; i -= 1) {
      const candidate = outputLines[i].trim();
      if (!/^\d+$/.test(candidate)) {
        continue;
      }
      const parsed = Number(candidate);
      if (Number.isInteger(parsed) && parsed >= 0) {
        return parsed;
      }
      return null;
    }
    return null;
  } catch (error) {
    console.warn(`[fill] ${target}: failed to fetch server event count (${error.message || error})`);
    return null;
  }
}
// Ensure the relay has approximately `targetCount` events.
// Uses client-side nostr-bench event seeding in parallel and accepts <=1% drift.
async function smartFill({
@@ -1006,8 +1085,17 @@ async function smartFill({
serverEnvPrefix,
artifactDir,
threads,
seedEventsPerConnection,
seedKeepaliveSeconds,
skipFill,
fetchEventCount,
}) {
if (targetCount <= 0) return { eventsInDb, seeded: 0, wiped: false };
if (targetCount <= 0) return { eventsInDb, seeded: 0, wiped: false, skipped: false };
if (skipFill) {
console.log(`[fill] ${target}:${phase}: skipped (target does not support generic event seeding)`);
return { eventsInDb, seeded: 0, wiped: false, skipped: true };
}
let wiped = false;
if (eventsInDb > targetCount) {
@@ -1018,6 +1106,13 @@ async function smartFill({
wiped = true;
}
if (typeof fetchEventCount === "function") {
const authoritative = await fetchEventCount();
if (Number.isInteger(authoritative) && authoritative >= 0) {
eventsInDb = authoritative;
}
}
const tolerance = Math.max(1, Math.floor(targetCount * SEED_TOLERANCE_RATIO));
let deficit = targetCount - eventsInDb;
@@ -1025,43 +1120,69 @@ async function smartFill({
console.log(
`[fill] ${target}: already within tolerance (${eventsInDb}/${targetCount}, tolerance=${tolerance}), skipping`,
);
return { eventsInDb, seeded: 0, wiped };
return { eventsInDb, seeded: 0, wiped, skipped: false };
}
const perConnectionEstimate = Math.max(
0.1,
Number(seedEventsPerConnection) || SEED_EVENTS_PER_CONNECTION_FALLBACK,
);
const keepaliveSeconds = Math.max(2, Number(seedKeepaliveSeconds) || SEED_KEEPALIVE_SECONDS);
console.log(
`[fill] ${target}:${phase}: seeding to ~${targetCount} events from ${eventsInDb} (deficit=${deficit}, tolerance=${tolerance})`,
`[fill] ${target}:${phase}: seeding to ~${targetCount} events from ${eventsInDb} (deficit=${deficit}, tolerance=${tolerance}, events_per_connection≈${perConnectionEstimate.toFixed(2)}, keepalive=${keepaliveSeconds}s)`,
);
let seededTotal = 0;
let roundsExecuted = 0;
for (let round = 1; round <= SEED_MAX_ROUNDS; round += 1) {
if (deficit <= tolerance) break;
roundsExecuted = round;
const roundDeficit = Math.max(1, Math.ceil(deficit * SEED_ROUND_DEFICIT_RATIO));
const roundStartMs = Date.now();
const eventsBeforeRound = eventsInDb;
const roundResult = await runClientSeedingRound({
target,
phase,
round,
deficit,
deficit: roundDeficit,
clientInfos,
keyPath,
relayUrl,
artifactDir,
threads,
seedEventsPerConnection: perConnectionEstimate,
seedKeepaliveSeconds: keepaliveSeconds,
});
const elapsedSec = (Date.now() - roundStartMs) / 1000;
const eventsPerSec = elapsedSec > 0 ? Math.round(roundResult.acked / elapsedSec) : 0;
let observedAdded = roundResult.acked;
eventsInDb += roundResult.acked;
seededTotal += roundResult.acked;
if (typeof fetchEventCount === "function") {
const authoritative = await fetchEventCount();
if (Number.isInteger(authoritative) && authoritative >= 0) {
eventsInDb = authoritative;
observedAdded = Math.max(0, eventsInDb - eventsBeforeRound);
} else {
eventsInDb += roundResult.acked;
}
} else {
eventsInDb += roundResult.acked;
}
const elapsedSec = (Date.now() - roundStartMs) / 1000;
const eventsPerSec = elapsedSec > 0 ? Math.round(observedAdded / elapsedSec) : 0;
seededTotal += observedAdded;
deficit = targetCount - eventsInDb;
console.log(
`[fill] ${target}:${phase} round ${round}: acked ${roundResult.acked} (desired=${roundResult.desired}, projected=${roundResult.projected}) in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s), now ~${eventsInDb}/${targetCount}`,
`[fill] ${target}:${phase} round ${round}: observed ${observedAdded} (acked=${roundResult.acked}, desired=${roundResult.desired}, projected=${roundResult.projected}) in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s), now ~${eventsInDb}/${targetCount}`,
);
if (roundResult.acked <= 0) {
if (observedAdded <= 0) {
console.warn(`[fill] ${target}:${phase} round ${round}: no progress, stopping early`);
break;
}
@@ -1070,11 +1191,11 @@ async function smartFill({
const remaining = Math.max(0, targetCount - eventsInDb);
if (remaining > tolerance) {
console.warn(
`[fill] ${target}:${phase}: remaining deficit ${remaining} exceeds tolerance ${tolerance} after ${SEED_MAX_ROUNDS} rounds`,
`[fill] ${target}:${phase}: remaining deficit ${remaining} exceeds tolerance ${tolerance} after ${roundsExecuted} rounds`,
);
}
return { eventsInDb, seeded: seededTotal, wiped };
return { eventsInDb, seeded: seededTotal, wiped, skipped: false };
}
// Run a single benchmark type across all clients in parallel.
@@ -1361,8 +1482,11 @@ async function main() {
const opts = parseArgs(process.argv.slice(2));
await ensureLocalPrereqs(opts);
const phaseLogger = createPhaseLogger();
const datacenterChoice = await chooseDatacenter(opts);
opts.datacenter = datacenterChoice.name;
phaseLogger.setPrepOffsetNow();
console.log(
`[plan] selected datacenter=${opts.datacenter} (${ESTIMATE_WINDOW_LABEL} est gross=${formatEuro(datacenterChoice.estimatedTotal.gross)} net=${formatEuro(datacenterChoice.estimatedTotal.net)})`,
);
@@ -1394,7 +1518,7 @@ async function main() {
fs.mkdirSync(path.dirname(historyFile), { recursive: true });
console.log(`[run] ${runId}`);
console.log("[phase] local preparation");
phaseLogger.logPhase("[phase] local preparation");
const nostrBench = await buildNostrBenchBinary(tmpDir);
const needsParrhesia = opts.targets.includes("parrhesia-pg") || opts.targets.includes("parrhesia-memory");
@@ -1475,7 +1599,7 @@ async function main() {
});
try {
console.log("[phase] create ssh credentials");
phaseLogger.logPhase("[phase] create ssh credentials");
await runCommand("ssh-keygen", ["-t", "ed25519", "-N", "", "-f", keyPath, "-C", keyName], {
stdio: "inherit",
});
@@ -1485,7 +1609,7 @@ async function main() {
});
sshKeyCreated = true;
console.log("[phase] create cloud servers in parallel");
phaseLogger.logPhase("[phase] create cloud servers in parallel");
const serverName = `${runId}-server`;
const clientNames = Array.from({ length: opts.clients }, (_, i) => `${runId}-client-${i + 1}`);
@@ -1555,7 +1679,9 @@ async function main() {
ip: c.server.public_net.ipv4.ip,
}));
console.log("[phase] wait for SSH");
// Reset phase clock to T+0 when cloud servers are successfully created.
phaseLogger.setZeroNow();
phaseLogger.logPhase("[phase] wait for SSH");
await Promise.all([
waitForSsh(serverIp, keyPath),
...clientInfos.map((client) => waitForSsh(client.ip, keyPath)),
@@ -1597,7 +1723,7 @@ async function main() {
}
console.log(`[firewall] ${firewallName} applied (sources: ${sourceIps.join(", ")})`);
console.log("[phase] install runtime dependencies on server node");
phaseLogger.logPhase("[phase] install runtime dependencies on server node");
const serverInstallCmd = [
"set -euo pipefail",
"export DEBIAN_FRONTEND=noninteractive",
@@ -1614,19 +1740,17 @@ async function main() {
await sshExec(serverIp, keyPath, serverInstallCmd, { stdio: "inherit" });
console.log("[phase] minimal client setup (no apt install)");
phaseLogger.logPhase("[phase] minimal client setup (no apt install)");
const clientBootstrapCmd = [
"set -euo pipefail",
"mkdir -p /usr/local/bin",
"sysctl -w net.ipv4.ip_local_port_range='10000 65535' >/dev/null || true",
"sysctl -w net.core.somaxconn=65535 >/dev/null || true",
"bash --version",
"uname -m",
].join("; ");
await Promise.all(clientInfos.map((client) => sshExec(client.ip, keyPath, clientBootstrapCmd, { stdio: "inherit" })));
console.log("[phase] upload control scripts + nostr-bench binary");
phaseLogger.logPhase("[phase] upload control scripts + nostr-bench binary");
await scpToHost(serverIp, keyPath, localServerScriptPath, "/root/cloud-bench-server.sh");
await sshExec(serverIp, keyPath, "chmod +x /root/cloud-bench-server.sh");
@@ -1637,7 +1761,7 @@ async function main() {
await sshExec(client.ip, keyPath, "chmod +x /root/cloud-bench-client.sh /usr/local/bin/nostr-bench");
}
console.log("[phase] server image setup");
phaseLogger.logPhase("[phase] server image setup");
let parrhesiaImageOnServer = parrhesiaSource.image;
@@ -1726,7 +1850,7 @@ async function main() {
const results = [];
const targetOrderPerRun = [];
console.log(`[phase] benchmark execution (mode=${opts.quick ? "quick" : "phased"})`);
phaseLogger.logPhase(`[phase] benchmark execution (mode=${opts.quick ? "quick" : "phased"})`);
for (let runIndex = 1; runIndex <= opts.runs; runIndex += 1) {
const runTargets = shuffled(opts.targets);
@@ -1782,6 +1906,13 @@ async function main() {
].join(" ");
const benchArgs = { clientInfos, keyPath, benchEnvPrefix, relayUrl };
const fetchEventCountForTarget = async () =>
fetchServerEventCount({
target,
serverIp,
keyPath,
serverEnvPrefix,
});
if (opts.quick) {
// Flat mode: run all benchmarks in one shot (backward compat)
@@ -1830,9 +1961,23 @@ async function main() {
mode: "event",
artifactDir: path.join(runTargetDir, "empty-event"),
});
eventsInDb += countEventsWritten(emptyEventResults);
const estimatedEmptyEventWritten = countEventsWritten(emptyEventResults);
eventsInDb += estimatedEmptyEventWritten;
let observedEmptyEventWritten = estimatedEmptyEventWritten;
const authoritativeAfterEmpty = await fetchEventCountForTarget();
if (Number.isInteger(authoritativeAfterEmpty) && authoritativeAfterEmpty >= 0) {
observedEmptyEventWritten = Math.max(0, authoritativeAfterEmpty);
eventsInDb = authoritativeAfterEmpty;
}
console.log(`[bench] ${target}: ~${eventsInDb} events in DB after empty phase`);
const warmSeedEventsPerConnection = Math.max(
0.1,
observedEmptyEventWritten / Math.max(1, opts.bench.eventCount * clientInfos.length),
);
// Fill to warm
const fillWarm = await smartFill({
target,
@@ -1846,6 +1991,10 @@ async function main() {
serverEnvPrefix,
artifactDir: path.join(runTargetDir, "fill-warm"),
threads: opts.bench.threads,
seedEventsPerConnection: warmSeedEventsPerConnection,
seedKeepaliveSeconds: opts.bench.keepaliveSeconds,
skipFill: target === "haven",
fetchEventCount: fetchEventCountForTarget,
});
eventsInDb = fillWarm.eventsInDb;
@@ -1863,7 +2012,21 @@ async function main() {
mode: "event",
artifactDir: path.join(runTargetDir, "warm-event"),
});
eventsInDb += countEventsWritten(warmEventResults);
const estimatedWarmEventWritten = countEventsWritten(warmEventResults);
const warmEventsBefore = eventsInDb;
eventsInDb += estimatedWarmEventWritten;
let observedWarmEventWritten = estimatedWarmEventWritten;
const authoritativeAfterWarmEvent = await fetchEventCountForTarget();
if (Number.isInteger(authoritativeAfterWarmEvent) && authoritativeAfterWarmEvent >= 0) {
observedWarmEventWritten = Math.max(0, authoritativeAfterWarmEvent - warmEventsBefore);
eventsInDb = authoritativeAfterWarmEvent;
}
const hotSeedEventsPerConnection = Math.max(
0.1,
observedWarmEventWritten / Math.max(1, opts.bench.eventCount * clientInfos.length),
);
// Fill to hot
const fillHot = await smartFill({
@@ -1878,6 +2041,10 @@ async function main() {
serverEnvPrefix,
artifactDir: path.join(runTargetDir, "fill-hot"),
threads: opts.bench.threads,
seedEventsPerConnection: hotSeedEventsPerConnection,
seedKeepaliveSeconds: opts.bench.keepaliveSeconds,
skipFill: target === "haven",
fetchEventCount: fetchEventCountForTarget,
});
eventsInDb = fillHot.eventsInDb;
@@ -1965,7 +2132,7 @@ async function main() {
});
}
console.log("[phase] final server cleanup (containers)");
phaseLogger.logPhase("[phase] final server cleanup (containers)");
await sshExec(serverIp, keyPath, "/root/cloud-bench-server.sh cleanup");
const servers = summariseServersFromResults(results);

View File

@@ -215,9 +215,12 @@ export function countEventsWritten(clientResults) {
for (const cr of clientResults) {
if (cr.status !== "ok") continue;
const eventSection = cr.sections?.event;
if (eventSection?.message_stats?.complete) {
total += Number(eventSection.message_stats.complete) || 0;
}
if (!eventSection?.message_stats) continue;
const complete = Number(eventSection.message_stats.complete) || 0;
const error = Number(eventSection.message_stats.error) || 0;
const accepted = Math.max(0, complete - error);
total += accepted;
}
return total;
}

View File

@@ -265,7 +265,7 @@ common_parrhesia_env+=( -e PARRHESIA_LIMITS_MAX_NEGENTROPY_ITEMS_PER_SESSION=100
cmd="${1:-}"
if [[ -z "$cmd" ]]; then
echo "usage: cloud-bench-server.sh <start-*|wipe-data-*|cleanup>" >&2
echo "usage: cloud-bench-server.sh <start-*|wipe-data-*|count-data-*|cleanup>" >&2
exit 1
fi
@@ -626,6 +626,14 @@ EOF
wait_port 3355 120 haven
;;
count-data-parrhesia-pg)
docker exec pg psql -U parrhesia -d parrhesia -At -c "SELECT count(*) FROM events"
;;
count-data-nostream)
docker exec nostream-db psql -U nostr_ts_relay -d nostr_ts_relay -At -c "SELECT count(*) FROM events"
;;
cleanup)
cleanup_containers
;;