bench: use nostr-bench seed mode and expose relay json counters

This commit is contained in:
2026-03-20 17:59:54 +01:00
parent 8f22eb2097
commit f7ff3a4bd7
3 changed files with 179 additions and 97 deletions

View File

@@ -5,7 +5,7 @@ relay_url="${1:-}"
mode="${2:-all}"
if [[ -z "$relay_url" ]]; then
echo "usage: cloud-bench-client.sh <relay-url> [connect|echo|event|req|all]" >&2
echo "usage: cloud-bench-client.sh <relay-url> [connect|echo|event|req|seed|all]" >&2
exit 1
fi
@@ -57,11 +57,33 @@ run_req() {
"${relay_url}"
}
run_seed() {
local target_accepted="${PARRHESIA_BENCH_SEED_TARGET_ACCEPTED:-}"
if [[ -z "$target_accepted" ]]; then
echo "PARRHESIA_BENCH_SEED_TARGET_ACCEPTED must be set for seed mode" >&2
exit 2
fi
echo "==> nostr-bench seed ${relay_url}"
"$bench_bin" seed --json \
--target-accepted "$target_accepted" \
-c "${PARRHESIA_BENCH_SEED_CONNECTION_COUNT:-64}" \
-r "${PARRHESIA_BENCH_SEED_CONNECTION_RATE:-64}" \
-k "${PARRHESIA_BENCH_SEED_KEEPALIVE_SECONDS:-0}" \
-t "${bench_threads}" \
--send-strategy "${PARRHESIA_BENCH_SEED_SEND_STRATEGY:-ack-loop}" \
--inflight "${PARRHESIA_BENCH_SEED_INFLIGHT:-32}" \
--ack-timeout "${PARRHESIA_BENCH_SEED_ACK_TIMEOUT:-30}" \
"${relay_url}"
}
case "$mode" in
connect) run_connect ;;
echo) run_echo ;;
event) run_event ;;
req) run_req ;;
seed) run_seed ;;
all) run_connect; echo; run_echo; echo; run_event; echo; run_req ;;
*) echo "unknown mode: $mode" >&2; exit 1 ;;
esac

View File

@@ -38,10 +38,14 @@ const BENCH_BUILD_DIR = path.join(ROOT_DIR, "_build", "bench");
const NOSTR_BENCH_SUBMODULE_DIR = path.join(ROOT_DIR, "nix", "nostr-bench");
const NOSTREAM_REDIS_IMAGE = "redis:7.0.5-alpine3.16";
const SEED_TOLERANCE_RATIO = 0.01;
const SEED_MAX_ROUNDS = 8;
const SEED_ROUND_DEFICIT_RATIO = 0.2;
const SEED_KEEPALIVE_SECONDS = 10;
const SEED_EVENTS_PER_CONNECTION_FALLBACK = 3;
const SEED_MAX_ROUNDS = 4;
const SEED_KEEPALIVE_SECONDS = 0;
const SEED_EVENTS_PER_CONNECTION_TARGET = 2000;
const SEED_CONNECTIONS_MIN = 1;
const SEED_CONNECTIONS_MAX = 512;
const SEED_SEND_STRATEGY = "ack-loop";
const SEED_INFLIGHT = 32;
const SEED_ACK_TIMEOUT_SECONDS = 30;
const PHASE_PREP_OFFSET_MINUTES = 3;
const DEFAULTS = {
@@ -66,8 +70,8 @@ const DEFAULTS = {
quick: false,
monitoring: true,
yes: false,
warmEvents: 25000,
hotEvents: 250000,
warmEvents: 50000,
hotEvents: 500000,
bench: {
connectCount: 3000,
connectRate: 1500,
@@ -950,6 +954,28 @@ function splitCountAcrossClients(total, clients) {
return Array.from({ length: clients }, (_, i) => base + (i < remainder ? 1 : 0));
}
function extractSeedAccepted(parsed) {
const finalAccepted = Number(parsed?.seed_final?.accepted);
if (Number.isFinite(finalAccepted) && finalAccepted >= 0) {
return Math.floor(finalAccepted);
}
const sectionAccepted = Number(parsed?.seed?.message_stats?.accepted);
if (Number.isFinite(sectionAccepted) && sectionAccepted >= 0) {
return Math.floor(sectionAccepted);
}
return 0;
}
function extractSeedTargetReached(parsed, desiredAccepted) {
if (typeof parsed?.seed_final?.target_reached === "boolean") {
return parsed.seed_final.target_reached;
}
return extractSeedAccepted(parsed) >= desiredAccepted;
}
async function runClientSeedingRound({
target,
phase,
@@ -960,8 +986,6 @@ async function runClientSeedingRound({
relayUrl,
artifactDir,
threads,
seedEventsPerConnection,
seedKeepaliveSeconds,
}) {
const benchThreads = Number.isInteger(threads) && threads >= 0 ? threads : 0;
const clientsForRound = clientInfos.slice(0, Math.min(clientInfos.length, deficit));
@@ -971,38 +995,39 @@ async function runClientSeedingRound({
const seedResults = await Promise.all(
clientsForRound.map(async (client, idx) => {
const desiredEvents = shares[idx] || 0;
const desiredAccepted = shares[idx] || 0;
const stdoutPath = path.join(roundDir, `${client.name}.stdout.log`);
const stderrPath = path.join(roundDir, `${client.name}.stderr.log`);
if (desiredEvents <= 0) {
if (desiredAccepted <= 0) {
fs.writeFileSync(stdoutPath, "", "utf8");
fs.writeFileSync(stderrPath, "", "utf8");
return {
client_name: client.name,
client_ip: client.ip,
status: "skipped",
desired_events: desiredEvents,
projected_events: 0,
acked: 0,
desired_accepted: desiredAccepted,
accepted: 0,
target_reached: true,
stdout_path: path.relative(ROOT_DIR, stdoutPath),
stderr_path: path.relative(ROOT_DIR, stderrPath),
};
}
const eventsPerConnection = Math.max(
0.1,
Number(seedEventsPerConnection) || SEED_EVENTS_PER_CONNECTION_FALLBACK,
const seedConnectionCount = Math.min(
SEED_CONNECTIONS_MAX,
Math.max(SEED_CONNECTIONS_MIN, Math.ceil(desiredAccepted / SEED_EVENTS_PER_CONNECTION_TARGET)),
);
const eventConnections = Math.max(1, Math.ceil(desiredEvents / eventsPerConnection));
const eventKeepalive = Math.max(2, Number(seedKeepaliveSeconds) || SEED_KEEPALIVE_SECONDS);
const eventRate = Math.max(1, Math.ceil(eventConnections / eventKeepalive));
const projectedEvents = Math.round(eventConnections * eventsPerConnection);
const seedConnectionRate = Math.max(1, seedConnectionCount);
const seedEnvPrefix = [
`PARRHESIA_BENCH_EVENT_COUNT=${eventConnections}`,
`PARRHESIA_BENCH_EVENT_RATE=${eventRate}`,
`PARRHESIA_BENCH_KEEPALIVE_SECONDS=${eventKeepalive}`,
`PARRHESIA_BENCH_SEED_TARGET_ACCEPTED=${desiredAccepted}`,
`PARRHESIA_BENCH_SEED_CONNECTION_COUNT=${seedConnectionCount}`,
`PARRHESIA_BENCH_SEED_CONNECTION_RATE=${seedConnectionRate}`,
`PARRHESIA_BENCH_SEED_KEEPALIVE_SECONDS=${SEED_KEEPALIVE_SECONDS}`,
`PARRHESIA_BENCH_SEED_SEND_STRATEGY=${SEED_SEND_STRATEGY}`,
`PARRHESIA_BENCH_SEED_INFLIGHT=${SEED_INFLIGHT}`,
`PARRHESIA_BENCH_SEED_ACK_TIMEOUT=${SEED_ACK_TIMEOUT_SECONDS}`,
`PARRHESIA_BENCH_THREADS=${benchThreads}`,
].join(" ");
@@ -1010,30 +1035,29 @@ async function runClientSeedingRound({
const benchRes = await sshExec(
client.ip,
keyPath,
`${seedEnvPrefix} /root/cloud-bench-client.sh ${shellEscape(relayUrl)} event`,
`${seedEnvPrefix} /root/cloud-bench-client.sh ${shellEscape(relayUrl)} seed`,
);
fs.writeFileSync(stdoutPath, benchRes.stdout, "utf8");
fs.writeFileSync(stderrPath, benchRes.stderr, "utf8");
const parsed = parseNostrBenchSections(benchRes.stdout);
const complete = Number(parsed?.event?.message_stats?.complete) || 0;
const error = Number(parsed?.event?.message_stats?.error) || 0;
const acked = Math.max(0, complete - error);
const accepted = extractSeedAccepted(parsed);
const targetReached = extractSeedTargetReached(parsed, desiredAccepted);
return {
client_name: client.name,
client_ip: client.ip,
status: "ok",
desired_events: desiredEvents,
projected_events: projectedEvents,
event_connections: eventConnections,
event_rate: eventRate,
event_keepalive_seconds: eventKeepalive,
events_per_connection_estimate: eventsPerConnection,
event_complete: complete,
event_error: error,
acked,
status: targetReached ? "ok" : "partial",
desired_accepted: desiredAccepted,
accepted,
target_reached: targetReached,
seed_connection_count: seedConnectionCount,
seed_connection_rate: seedConnectionRate,
seed_keepalive_seconds: SEED_KEEPALIVE_SECONDS,
seed_send_strategy: SEED_SEND_STRATEGY,
seed_inflight: SEED_INFLIGHT,
seed_ack_timeout_seconds: SEED_ACK_TIMEOUT_SECONDS,
stdout_path: path.relative(ROOT_DIR, stdoutPath),
stderr_path: path.relative(ROOT_DIR, stderrPath),
};
@@ -1043,17 +1067,25 @@ async function runClientSeedingRound({
fs.writeFileSync(stdoutPath, out, "utf8");
fs.writeFileSync(stderrPath, err, "utf8");
const parsed = parseNostrBenchSections(out);
const accepted = extractSeedAccepted(parsed);
const targetReached = extractSeedTargetReached(parsed, desiredAccepted);
const targetNotReached = Number(error.code) === 2;
return {
client_name: client.name,
client_ip: client.ip,
status: "error",
desired_events: desiredEvents,
projected_events: projectedEvents,
event_connections: eventConnections,
event_rate: eventRate,
event_keepalive_seconds: eventKeepalive,
events_per_connection_estimate: eventsPerConnection,
acked: 0,
status: targetNotReached ? "partial" : "error",
desired_accepted: desiredAccepted,
accepted,
target_reached: targetReached,
seed_connection_count: seedConnectionCount,
seed_connection_rate: seedConnectionRate,
seed_keepalive_seconds: SEED_KEEPALIVE_SECONDS,
seed_send_strategy: SEED_SEND_STRATEGY,
seed_inflight: SEED_INFLIGHT,
seed_ack_timeout_seconds: SEED_ACK_TIMEOUT_SECONDS,
exit_code: Number(error.code) || null,
stdout_path: path.relative(ROOT_DIR, stdoutPath),
stderr_path: path.relative(ROOT_DIR, stderrPath),
error: error.message || String(error),
@@ -1069,14 +1101,12 @@ async function runClientSeedingRound({
);
}
const acked = seedResults.reduce((sum, r) => sum + (Number(r.acked) || 0), 0);
const desired = seedResults.reduce((sum, r) => sum + (Number(r.desired_events) || 0), 0);
const projected = seedResults.reduce((sum, r) => sum + (Number(r.projected_events) || 0), 0);
const accepted = seedResults.reduce((sum, r) => sum + (Number(r.accepted) || 0), 0);
const desired = seedResults.reduce((sum, r) => sum + (Number(r.desired_accepted) || 0), 0);
return {
desired,
projected,
acked,
accepted,
clients: seedResults,
};
}
@@ -1111,7 +1141,7 @@ async function fetchServerEventCount({ target, serverIp, keyPath, serverEnvPrefi
}
// Ensure the relay has approximately `targetCount` events.
// Uses client-side nostr-bench event seeding in parallel and accepts <=1% drift.
// Uses client-side nostr-bench seed mode and accepts <=1% drift.
async function smartFill({
target,
phase,
@@ -1124,8 +1154,6 @@ async function smartFill({
serverEnvPrefix,
artifactDir,
threads,
seedEventsPerConnection,
seedKeepaliveSeconds,
skipFill,
fetchEventCount,
}) {
@@ -1162,14 +1190,8 @@ async function smartFill({
return { eventsInDb, seeded: 0, wiped, skipped: false };
}
const perConnectionEstimate = Math.max(
0.1,
Number(seedEventsPerConnection) || SEED_EVENTS_PER_CONNECTION_FALLBACK,
);
const keepaliveSeconds = Math.max(2, Number(seedKeepaliveSeconds) || SEED_KEEPALIVE_SECONDS);
console.log(
`[fill] ${target}:${phase}: seeding to ~${targetCount} events from ${eventsInDb} (deficit=${deficit}, tolerance=${tolerance}, events_per_connection≈${perConnectionEstimate.toFixed(2)}, keepalive=${keepaliveSeconds}s)`,
`[fill] ${target}:${phase}: seeding to ~${targetCount} events from ${eventsInDb} (deficit=${deficit}, tolerance=${tolerance})`,
);
let seededTotal = 0;
@@ -1179,7 +1201,7 @@ async function smartFill({
if (deficit <= tolerance) break;
roundsExecuted = round;
const roundDeficit = Math.max(1, Math.ceil(deficit * SEED_ROUND_DEFICIT_RATIO));
const roundDeficit = Math.max(1, deficit);
const roundStartMs = Date.now();
const eventsBeforeRound = eventsInDb;
@@ -1193,11 +1215,9 @@ async function smartFill({
relayUrl,
artifactDir,
threads,
seedEventsPerConnection: perConnectionEstimate,
seedKeepaliveSeconds: keepaliveSeconds,
});
let observedAdded = roundResult.acked;
let observedAdded = roundResult.accepted;
if (typeof fetchEventCount === "function") {
const authoritative = await fetchEventCount();
@@ -1205,10 +1225,10 @@ async function smartFill({
eventsInDb = authoritative;
observedAdded = Math.max(0, eventsInDb - eventsBeforeRound);
} else {
eventsInDb += roundResult.acked;
eventsInDb += roundResult.accepted;
}
} else {
eventsInDb += roundResult.acked;
eventsInDb += roundResult.accepted;
}
const elapsedSec = (Date.now() - roundStartMs) / 1000;
@@ -1218,7 +1238,7 @@ async function smartFill({
deficit = targetCount - eventsInDb;
console.log(
`[fill] ${target}:${phase} round ${round}: observed ${observedAdded} (acked=${roundResult.acked}, desired=${roundResult.desired}, projected=${roundResult.projected}) in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s), now ~${eventsInDb}/${targetCount}`,
`[fill] ${target}:${phase} round ${round}: observed ${observedAdded} (accepted=${roundResult.accepted}, desired=${roundResult.desired}) in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s), now ~${eventsInDb}/${targetCount}`,
);
if (observedAdded <= 0) {
@@ -2003,20 +2023,13 @@ async function main() {
const estimatedEmptyEventWritten = countEventsWritten(emptyEventResults);
eventsInDb += estimatedEmptyEventWritten;
let observedEmptyEventWritten = estimatedEmptyEventWritten;
const authoritativeAfterEmpty = await fetchEventCountForTarget();
if (Number.isInteger(authoritativeAfterEmpty) && authoritativeAfterEmpty >= 0) {
observedEmptyEventWritten = Math.max(0, authoritativeAfterEmpty);
eventsInDb = authoritativeAfterEmpty;
}
console.log(`[bench] ${target}: ~${eventsInDb} events in DB after empty phase`);
const warmSeedEventsPerConnection = Math.max(
0.1,
observedEmptyEventWritten / Math.max(1, opts.bench.eventCount * clientInfos.length),
);
// Fill to warm
const fillWarm = await smartFill({
target,
@@ -2030,8 +2043,6 @@ async function main() {
serverEnvPrefix,
artifactDir: path.join(runTargetDir, "fill-warm"),
threads: opts.bench.threads,
seedEventsPerConnection: warmSeedEventsPerConnection,
seedKeepaliveSeconds: opts.bench.keepaliveSeconds,
skipFill: target === "haven",
fetchEventCount: fetchEventCountForTarget,
});
@@ -2052,21 +2063,13 @@ async function main() {
artifactDir: path.join(runTargetDir, "warm-event"),
});
const estimatedWarmEventWritten = countEventsWritten(warmEventResults);
const warmEventsBefore = eventsInDb;
eventsInDb += estimatedWarmEventWritten;
let observedWarmEventWritten = estimatedWarmEventWritten;
const authoritativeAfterWarmEvent = await fetchEventCountForTarget();
if (Number.isInteger(authoritativeAfterWarmEvent) && authoritativeAfterWarmEvent >= 0) {
observedWarmEventWritten = Math.max(0, authoritativeAfterWarmEvent - warmEventsBefore);
eventsInDb = authoritativeAfterWarmEvent;
}
const hotSeedEventsPerConnection = Math.max(
0.1,
observedWarmEventWritten / Math.max(1, opts.bench.eventCount * clientInfos.length),
);
// Fill to hot
const fillHot = await smartFill({
target,
@@ -2080,8 +2083,6 @@ async function main() {
serverEnvPrefix,
artifactDir: path.join(runTargetDir, "fill-hot"),
threads: opts.bench.threads,
seedEventsPerConnection: hotSeedEventsPerConnection,
seedKeepaliveSeconds: opts.bench.keepaliveSeconds,
skipFill: target === "haven",
fetchEventCount: fetchEventCountForTarget,
});

View File

@@ -10,7 +10,7 @@ export function parseNostrBenchSections(output) {
for (const lineRaw of lines) {
const line = lineRaw.trim();
const header = line.match(/^==>\s+nostr-bench\s+(connect|echo|event|req)\s+/);
const header = line.match(/^==>\s+nostr-bench\s+(connect|echo|event|req|seed)\s+/);
if (header) {
section = header[1];
continue;
@@ -20,9 +20,24 @@ export function parseNostrBenchSections(output) {
try {
const json = JSON.parse(line);
if (section) {
parsed[section] = json;
if (!section) continue;
if (section === "seed" && json?.type === "seed_final") {
parsed.seed_final = json;
continue;
}
const existing = parsed[section];
if (!existing) {
parsed[section] = json;
continue;
}
if (existing?.type === "final" && json?.type !== "final") {
continue;
}
parsed[section] = json;
} catch {
// ignore noisy non-json lines
}
@@ -43,14 +58,21 @@ export function sum(values) {
return valid.reduce((a, b) => a + b, 0);
}
export function throughputFromSection(section) {
export function throughputFromSection(section, options = {}) {
const { preferAccepted = false } = options;
const elapsedMs = Number(section?.elapsed ?? NaN);
const accepted = Number(section?.message_stats?.accepted ?? NaN);
const complete = Number(section?.message_stats?.complete ?? NaN);
const effectiveCount =
preferAccepted && Number.isFinite(accepted)
? accepted
: complete;
const totalBytes = Number(section?.message_stats?.size ?? NaN);
const cumulativeTps =
Number.isFinite(elapsedMs) && elapsedMs > 0 && Number.isFinite(complete)
? complete / (elapsedMs / 1000)
Number.isFinite(elapsedMs) && elapsedMs > 0 && Number.isFinite(effectiveCount)
? effectiveCount / (elapsedMs / 1000)
: NaN;
const cumulativeMibs =
@@ -58,7 +80,11 @@ export function throughputFromSection(section) {
? totalBytes / (1024 * 1024) / (elapsedMs / 1000)
: NaN;
const sampleTps = Number(section?.tps ?? NaN);
const sampleTps = Number(
preferAccepted
? section?.accepted_tps ?? section?.tps
: section?.tps,
);
const sampleMibs = Number(section?.size ?? NaN);
return {
@@ -67,10 +93,16 @@ export function throughputFromSection(section) {
};
}
function messageCounter(section, field) {
const value = Number(section?.message_stats?.[field]);
return Number.isFinite(value) ? value : 0;
}
export function metricFromSections(sections) {
const connect = sections?.connect?.connect_stats?.success_time || {};
const echo = throughputFromSection(sections?.echo || {});
const event = throughputFromSection(sections?.event || {});
const eventSection = sections?.event || {};
const event = throughputFromSection(eventSection, { preferAccepted: true });
const req = throughputFromSection(sections?.req || {});
return {
@@ -80,6 +112,10 @@ export function metricFromSections(sections) {
echo_mibs: echo.mibs,
event_tps: event.tps,
event_mibs: event.mibs,
event_notice: messageCounter(eventSection, "notice"),
event_auth_challenge: messageCounter(eventSection, "auth_challenge"),
event_reply_unrecognized: messageCounter(eventSection, "reply_unrecognized"),
event_ack_timeout: messageCounter(eventSection, "ack_timeout"),
req_tps: req.tps,
req_mibs: req.mibs,
};
@@ -109,6 +145,10 @@ export function summariseFlatResults(results) {
echo_mibs: sum(clientSamples.map((s) => s.echo_mibs)),
event_tps: sum(clientSamples.map((s) => s.event_tps)),
event_mibs: sum(clientSamples.map((s) => s.event_mibs)),
event_notice: sum(clientSamples.map((s) => s.event_notice)),
event_auth_challenge: sum(clientSamples.map((s) => s.event_auth_challenge)),
event_reply_unrecognized: sum(clientSamples.map((s) => s.event_reply_unrecognized)),
event_ack_timeout: sum(clientSamples.map((s) => s.event_ack_timeout)),
req_tps: sum(clientSamples.map((s) => s.req_tps)),
req_mibs: sum(clientSamples.map((s) => s.req_mibs)),
});
@@ -121,6 +161,10 @@ export function summariseFlatResults(results) {
"echo_mibs",
"event_tps",
"event_mibs",
"event_notice",
"event_auth_challenge",
"event_reply_unrecognized",
"event_ack_timeout",
"req_tps",
"req_mibs",
];
@@ -184,6 +228,16 @@ export function summarisePhasedResults(results) {
if (eventClients.length > 0) {
sample[`event_${level}_tps`] = sum(eventClients.map((s) => s.event_tps));
sample[`event_${level}_mibs`] = sum(eventClients.map((s) => s.event_mibs));
sample[`event_${level}_notice`] = sum(eventClients.map((s) => s.event_notice));
sample[`event_${level}_auth_challenge`] = sum(
eventClients.map((s) => s.event_auth_challenge),
);
sample[`event_${level}_reply_unrecognized`] = sum(
eventClients.map((s) => s.event_reply_unrecognized),
);
sample[`event_${level}_ack_timeout`] = sum(
eventClients.map((s) => s.event_ack_timeout),
);
}
}
@@ -217,10 +271,15 @@ export function countEventsWritten(clientResults) {
const eventSection = cr.sections?.event;
if (!eventSection?.message_stats) continue;
const accepted = Number(eventSection.message_stats.accepted);
if (Number.isFinite(accepted)) {
total += Math.max(0, accepted);
continue;
}
const complete = Number(eventSection.message_stats.complete) || 0;
const error = Number(eventSection.message_stats.error) || 0;
const accepted = Math.max(0, complete - error);
total += accepted;
total += Math.max(0, complete - error);
}
return total;
}