bench: Multi-temperature cloud bench
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s

This commit is contained in:
2026-03-19 22:14:35 +01:00
parent c45dbadd78
commit e02bd99a43
7 changed files with 1275 additions and 247 deletions

View File

@@ -610,7 +610,7 @@ just bench update all
Current comparison results:
| metric | parrhesia-pg | parrhesia-mem | strfry | nostr-rs-relay | mem/pg | strfry/pg | nostr-rs/pg |
| metric | parrhesia-pg | parrhesia-mem | strfry | nostr-rs-relay | mem/pg | strfry/pg | nostr-rs-relay/pg |
| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: |
| connect avg latency (ms) ↓ | 34.67 | 43.33 | 2.67 | 2.67 | 1.25x | **0.08x** | **0.08x** |
| connect max latency (ms) ↓ | 61.67 | 74.67 | 4.67 | 4.00 | 1.21x | **0.08x** | **0.06x** |

View File

@@ -18,6 +18,7 @@
packages = forAllSystems (
system: let
pkgs = import nixpkgs {inherit system;};
pkgsLinux = import nixpkgs {system = "x86_64-linux";};
lib = pkgs.lib;
parrhesia = pkgs.callPackage ./default.nix {};
nostrBench = pkgs.callPackage ./nix/nostr-bench.nix {};
@@ -25,10 +26,12 @@
{
default = parrhesia;
inherit parrhesia nostrBench;
# Uses x86_64-linux pkgs so it can cross-build via a remote
# builder even when the host is aarch64-darwin.
nostrBenchStaticX86_64Musl = pkgsLinux.callPackage ./nix/nostr-bench.nix {staticX86_64Musl = true;};
}
// lib.optionalAttrs pkgs.stdenv.hostPlatform.isLinux {
nostrBenchStaticX86_64Musl = pkgs.callPackage ./nix/nostr-bench.nix {staticX86_64Musl = true;};
dockerImage = pkgs.dockerTools.buildLayeredImage {
name = "parrhesia";
tag = "latest";

594
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,7 @@
{
"dependencies": {
"@mariozechner/pi-coding-agent": "^0.60.0"
"devDependencies": {
"@mariozechner/pi-coding-agent": "^0.60.0",
"nostr-tools": "^2.10.0",
"ws": "^8.18.0"
}
}

View File

@@ -6,6 +6,7 @@ import path from "node:path";
import { spawn } from "node:child_process";
import readline from "node:readline";
import { fileURLToPath } from "node:url";
import { seedEvents } from "./nostr_seed.mjs";
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
@@ -37,6 +38,9 @@ const DEFAULTS = {
nostreamRef: "main",
havenImage: "holgerhatgarkeinenode/haven-docker:latest",
keep: false,
quick: false,
warmEvents: 100000,
hotEvents: 1000000,
bench: {
connectCount: 1000,
connectRate: 200,
@@ -95,6 +99,11 @@ Options:
--req-limit <n> (default: ${DEFAULTS.bench.reqLimit})
--keepalive-seconds <n> (default: ${DEFAULTS.bench.keepaliveSeconds})
Phased benchmark:
--warm-events <n> DB fill level for warm phase (default: ${DEFAULTS.warmEvents})
--hot-events <n> DB fill level for hot phase (default: ${DEFAULTS.hotEvents})
--quick Skip phased benchmarks, run flat connect→echo→event→req
Output + lifecycle:
--history-file <path> (default: ${DEFAULTS.historyFile})
--artifacts-dir <path> (default: ${DEFAULTS.artifactsDir})
@@ -223,11 +232,24 @@ function parseArgs(argv) {
case "--keep":
opts.keep = true;
break;
case "--quick":
opts.quick = true;
break;
case "--warm-events":
opts.warmEvents = intOpt(arg, argv[++i]);
break;
case "--hot-events":
opts.hotEvents = intOpt(arg, argv[++i]);
break;
default:
throw new Error(`Unknown argument: ${arg}`);
}
}
if (process.env.PARRHESIA_BENCH_WARM_EVENTS) opts.warmEvents = Number(process.env.PARRHESIA_BENCH_WARM_EVENTS);
if (process.env.PARRHESIA_BENCH_HOT_EVENTS) opts.hotEvents = Number(process.env.PARRHESIA_BENCH_HOT_EVENTS);
if (process.env.PARRHESIA_BENCH_QUICK === "1") opts.quick = true;
if (!opts.targets.length) {
throw new Error("--targets must include at least one target");
}
@@ -1053,7 +1075,7 @@ common_parrhesia_env+=( -e PARRHESIA_LIMITS_MAX_NEGENTROPY_ITEMS_PER_SESSION=100
cmd="\${1:-}"
if [[ -z "\$cmd" ]]; then
echo "usage: cloud-bench-server.sh <start-parrhesia-pg|start-parrhesia-memory|start-strfry|start-nostr-rs-relay|start-nostream|start-haven|cleanup>" >&2
echo "usage: cloud-bench-server.sh <start-*|wipe-data-*|cleanup>" >&2
exit 1
fi
@@ -1375,6 +1397,45 @@ EOF
wait_port 3355 120 haven
;;
wipe-data-parrhesia-pg)
docker exec pg psql -U parrhesia -d parrhesia -c \
"TRUNCATE event_ids, event_tags, events, replaceable_event_state, addressable_event_state CASCADE"
;;
wipe-data-parrhesia-memory)
docker restart parrhesia
wait_http "http://127.0.0.1:4413/health" 120 parrhesia
;;
wipe-data-strfry)
docker stop strfry
rm -rf /root/strfry-data/strfry/*
docker start strfry
wait_port 7777 60 strfry
;;
wipe-data-nostr-rs-relay)
docker rm -f nostr-rs
docker run -d --name nostr-rs \
--ulimit nofile=262144:262144 \
-p 8080:8080 \
-v /root/nostr-rs.toml:/usr/src/app/config.toml:ro \
"\$NOSTR_RS_IMAGE" >/dev/null
wait_http "http://127.0.0.1:8080/" 60 nostr-rs
;;
wipe-data-nostream)
docker exec nostream-db psql -U nostr_ts_relay -d nostr_ts_relay -c \
"TRUNCATE events CASCADE"
;;
wipe-data-haven)
docker stop haven
rm -rf /root/haven-bench/db/*
docker start haven
wait_port 3355 120 haven
;;
cleanup)
cleanup_containers
;;
@@ -1392,21 +1453,25 @@ function makeClientScript() {
set -euo pipefail
relay_url="\${1:-}"
mode="\${2:-all}"
if [[ -z "\$relay_url" ]]; then
echo "usage: cloud-bench-client.sh <relay-url>" >&2
echo "usage: cloud-bench-client.sh <relay-url> [connect|echo|event|req|all]" >&2
exit 1
fi
bench_bin="\${NOSTR_BENCH_BIN:-/usr/local/bin/nostr-bench}"
run_connect() {
echo "==> nostr-bench connect \${relay_url}"
"\$bench_bin" connect --json \
-c "\${PARRHESIA_BENCH_CONNECT_COUNT:-200}" \
-r "\${PARRHESIA_BENCH_CONNECT_RATE:-100}" \
-k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \
"\${relay_url}"
}
echo
run_echo() {
echo "==> nostr-bench echo \${relay_url}"
"\$bench_bin" echo --json \
-c "\${PARRHESIA_BENCH_ECHO_COUNT:-100}" \
@@ -1414,16 +1479,18 @@ echo "==> nostr-bench echo \${relay_url}"
-k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \
--size "\${PARRHESIA_BENCH_ECHO_SIZE:-512}" \
"\${relay_url}"
}
echo
run_event() {
echo "==> nostr-bench event \${relay_url}"
"\$bench_bin" event --json \
-c "\${PARRHESIA_BENCH_EVENT_COUNT:-100}" \
-r "\${PARRHESIA_BENCH_EVENT_RATE:-50}" \
-k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \
"\${relay_url}"
}
echo
run_req() {
echo "==> nostr-bench req \${relay_url}"
"\$bench_bin" req --json \
-c "\${PARRHESIA_BENCH_REQ_COUNT:-100}" \
@@ -1431,6 +1498,16 @@ echo "==> nostr-bench req \${relay_url}"
-k "\${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \
--limit "\${PARRHESIA_BENCH_REQ_LIMIT:-10}" \
"\${relay_url}"
}
case "\$mode" in
connect) run_connect ;;
echo) run_echo ;;
event) run_event ;;
req) run_req ;;
all) run_connect; echo; run_echo; echo; run_event; echo; run_req ;;
*) echo "unknown mode: \$mode" >&2; exit 1 ;;
esac
`;
}
@@ -1516,7 +1593,7 @@ function metricFromSections(sections) {
};
}
function summariseServersFromResults(results) {
function summariseFlatResults(results) {
const byServer = new Map();
for (const runEntry of results) {
@@ -1568,6 +1645,210 @@ function summariseServersFromResults(results) {
return out;
}
// Aggregate phased benchmark results into one summary per server.
// Each run contributes one "sample" (connect/echo plus per-fill-level
// req/event metrics); samples for the same target are then averaged.
function summarisePhasedResults(results) {
  // Metrics for the successfully-completed clients of one phase.
  const okMetrics = (phase) =>
    (phase?.clients || [])
      .filter((client) => client.status === "ok")
      .map((client) => metricFromSections(client.sections || {}));

  const samplesByServer = new Map();
  for (const entry of results) {
    if (!samplesByServer.has(entry.target)) samplesByServer.set(entry.target, []);
    const phases = entry.phases;
    if (!phases) continue;

    const sample = {};

    // Connect latencies: average across clients.
    const connectStats = okMetrics(phases.connect);
    if (connectStats.length > 0) {
      sample.connect_avg_ms = mean(connectStats.map((s) => s.connect_avg_ms));
      sample.connect_max_ms = mean(connectStats.map((s) => s.connect_max_ms));
    }

    // Echo throughput: sum across clients (aggregate relay throughput).
    const echoStats = okMetrics(phases.echo);
    if (echoStats.length > 0) {
      sample.echo_tps = sum(echoStats.map((s) => s.echo_tps));
      sample.echo_mibs = sum(echoStats.map((s) => s.echo_mibs));
    }

    // Per-fill-level req and event throughput, keyed e.g. "req_warm_tps".
    for (const level of ["empty", "warm", "hot"]) {
      const phase = phases[level];
      if (!phase) continue;
      const reqStats = okMetrics(phase.req);
      if (reqStats.length > 0) {
        sample[`req_${level}_tps`] = sum(reqStats.map((s) => s.req_tps));
        sample[`req_${level}_mibs`] = sum(reqStats.map((s) => s.req_mibs));
      }
      const eventStats = okMetrics(phase.event);
      if (eventStats.length > 0) {
        sample[`event_${level}_tps`] = sum(eventStats.map((s) => s.event_tps));
        sample[`event_${level}_mibs`] = sum(eventStats.map((s) => s.event_mibs));
      }
    }

    samplesByServer.get(entry.target).push(sample);
  }

  // Average each metric over the samples that actually carry it.
  const out = {};
  for (const [name, samples] of samplesByServer.entries()) {
    if (samples.length === 0) continue;
    const allKeys = new Set(samples.flatMap((s) => Object.keys(s)));
    const summary = {};
    for (const key of allKeys) {
      summary[key] = mean(samples.map((s) => s[key]).filter((v) => v !== undefined));
    }
    out[name] = summary;
  }
  return out;
}
// Dispatch to the phased or flat summariser depending on how the results
// were produced (any entry flagged mode="phased" selects the phased path).
function summariseServersFromResults(results) {
  if (results.some((entry) => entry.mode === "phased")) {
    return summarisePhasedResults(results);
  }
  return summariseFlatResults(results);
}
// Total events successfully written by event benchmarks across all clients.
// Only "ok" clients contribute; a missing or non-numeric `complete` counts 0.
function countEventsWritten(clientResults) {
  return clientResults.reduce((total, result) => {
    if (result.status !== "ok") return total;
    const complete = result.sections?.event?.message_stats?.complete;
    if (!complete) return total;
    return total + (Number(complete) || 0);
  }, 0);
}
// Ensure the relay has approximately `targetCount` events.
// Seeds from the orchestrator via WebSocket, or wipes and reseeds if the
// relay is already over target (relays offer no bulk delete).
//
// @param {object} opts
// @param {string} opts.target          Target name; used for `wipe-data-<target>` and log prefixes
// @param {number} opts.targetCount     Desired approximate event count (<= 0 is a no-op)
// @param {number} opts.eventsInDb      Current (approximate) event count in the relay
// @param {string} opts.relayUrl        WebSocket URL used for seeding
// @param {string} opts.serverIp        Server host for the SSH wipe command
// @param {string} opts.keyPath         SSH key path
// @param {string} opts.serverEnvPrefix Env-var prefix for the server-side script
// @returns {Promise<{eventsInDb: number, seeded: number, wiped: boolean}>}
async function smartFill({
  target,
  targetCount,
  eventsInDb,
  relayUrl,
  serverIp,
  keyPath,
  serverEnvPrefix,
}) {
  if (targetCount <= 0) return { eventsInDb, seeded: 0, wiped: false };
  let wiped = false;
  if (eventsInDb > targetCount) {
    // Over target: wipe and reseed from zero, since events cannot be
    // selectively deleted from the relay.
    console.log(`[fill] ${target}: have ${eventsInDb} > ${targetCount}, wiping and reseeding`);
    const wipeCmd = `wipe-data-${target}`;
    await sshExec(serverIp, keyPath, `${serverEnvPrefix} /root/cloud-bench-server.sh ${shellEscape(wipeCmd)}`);
    eventsInDb = 0;
    wiped = true;
  }
  const deficit = targetCount - eventsInDb;
  if (deficit <= 0) {
    console.log(`[fill] ${target}: already at ${eventsInDb} events (target ${targetCount}), skipping`);
    return { eventsInDb, seeded: 0, wiped };
  }
  // Fix: the range previously rendered as "(<from><to>)" with no separator.
  console.log(`[fill] ${target}: seeding ${deficit} events (${eventsInDb} -> ${targetCount})`);
  const result = await seedEvents({
    url: relayUrl,
    count: deficit,
    concurrency: 16,
    // seedEvents already throttles progress callbacks (~every 10k acks), so
    // log unconditionally here. The previous extra `n % 10000 === 0` check
    // silently dropped all progress output whenever any event errored,
    // because acks and (acks + errors) drift out of alignment.
    onProgress: (n) => {
      console.log(`[fill] ${target}: ${n}/${deficit} seeded`);
    },
  });
  const elapsedSec = result.elapsed_ms / 1000;
  const eventsPerSec = elapsedSec > 0 ? Math.round(result.acked / elapsedSec) : 0;
  console.log(
    `[fill] ${target}: seeded ${result.acked}/${deficit} in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s)` +
      (result.errors > 0 ? ` (${result.errors} errors)` : ""),
  );
  eventsInDb += result.acked;
  return { eventsInDb, seeded: result.acked, wiped };
}
// Run one nostr-bench mode on every client machine in parallel.
// Each client's stdout/stderr is captured into `artifactDir`, and a result
// record is built per client. Throws when any client failed so the caller
// never aggregates partial results.
async function runSingleBenchmark({
  clientInfos,
  keyPath,
  benchEnvPrefix,
  relayUrl,
  mode,
  artifactDir,
}) {
  fs.mkdirSync(artifactDir, { recursive: true });

  const runOnClient = async (client) => {
    const startedAt = new Date().toISOString();
    const startMs = Date.now();
    const stdoutPath = path.join(artifactDir, `${client.name}.stdout.log`);
    const stderrPath = path.join(artifactDir, `${client.name}.stderr.log`);
    // Timing/path fields shared by both the success and the failure record;
    // evaluated at result-construction time so durations are accurate.
    const timing = () => ({
      started_at: startedAt,
      finished_at: new Date().toISOString(),
      duration_ms: Date.now() - startMs,
      stdout_path: path.relative(ROOT_DIR, stdoutPath),
      stderr_path: path.relative(ROOT_DIR, stderrPath),
    });

    try {
      const benchRes = await sshExec(
        client.ip,
        keyPath,
        `${benchEnvPrefix} /root/cloud-bench-client.sh ${shellEscape(relayUrl)} ${shellEscape(mode)}`,
      );
      fs.writeFileSync(stdoutPath, benchRes.stdout, "utf8");
      fs.writeFileSync(stderrPath, benchRes.stderr, "utf8");
      return {
        client_name: client.name,
        client_ip: client.ip,
        status: "ok",
        ...timing(),
        sections: parseNostrBenchSections(benchRes.stdout),
      };
    } catch (error) {
      // Keep whatever partial output the failed command produced.
      const out = error.stdout || "";
      fs.writeFileSync(stdoutPath, out, "utf8");
      fs.writeFileSync(stderrPath, error.stderr || String(error), "utf8");
      return {
        client_name: client.name,
        client_ip: client.ip,
        status: "error",
        ...timing(),
        error: String(error.message || error),
        sections: parseNostrBenchSections(out),
      };
    }
  };

  const clientResults = await Promise.all(clientInfos.map(runOnClient));

  const failed = clientResults.filter((r) => r.status !== "ok");
  if (failed.length > 0) {
    throw new Error(
      `Client benchmark failed: ${failed.map((f) => f.client_name).join(", ")}`,
    );
  }
  return clientResults;
}
async function tryCommandStdout(command, args = [], options = {}) {
try {
const res = await runCommand(command, args, options);
@@ -2082,7 +2363,7 @@ async function main() {
const results = [];
const targetOrderPerRun = [];
console.log("[phase] benchmark execution");
console.log(`[phase] benchmark execution (mode=${opts.quick ? "quick" : "phased"})`);
for (let runIndex = 1; runIndex <= opts.runs; runIndex += 1) {
const runTargets = shuffled(opts.targets);
@@ -2135,70 +2416,142 @@ async function main() {
`PARRHESIA_BENCH_KEEPALIVE_SECONDS=${opts.bench.keepaliveSeconds}`,
].join(" ");
const clientRunResults = await Promise.all(
clientInfos.map(async (client) => {
const startedAt = new Date().toISOString();
const startMs = Date.now();
const stdoutPath = path.join(runTargetDir, `${client.name}.stdout.log`);
const stderrPath = path.join(runTargetDir, `${client.name}.stderr.log`);
const benchArgs = { clientInfos, keyPath, benchEnvPrefix, relayUrl };
try {
const benchRes = await sshExec(
client.ip,
keyPath,
`${benchEnvPrefix} /root/cloud-bench-client.sh ${shellEscape(relayUrl)}`,
);
fs.writeFileSync(stdoutPath, benchRes.stdout, "utf8");
fs.writeFileSync(stderrPath, benchRes.stderr, "utf8");
return {
client_name: client.name,
client_ip: client.ip,
status: "ok",
started_at: startedAt,
finished_at: new Date().toISOString(),
duration_ms: Date.now() - startMs,
stdout_path: path.relative(ROOT_DIR, stdoutPath),
stderr_path: path.relative(ROOT_DIR, stderrPath),
sections: parseNostrBenchSections(benchRes.stdout),
};
} catch (error) {
const out = error.stdout || "";
const err = error.stderr || String(error);
fs.writeFileSync(stdoutPath, out, "utf8");
fs.writeFileSync(stderrPath, err, "utf8");
return {
client_name: client.name,
client_ip: client.ip,
status: "error",
started_at: startedAt,
finished_at: new Date().toISOString(),
duration_ms: Date.now() - startMs,
stdout_path: path.relative(ROOT_DIR, stdoutPath),
stderr_path: path.relative(ROOT_DIR, stderrPath),
error: String(error.message || error),
sections: parseNostrBenchSections(out),
};
}
}),
);
if (opts.quick) {
// Flat mode: run all benchmarks in one shot (backward compat)
const clientRunResults = await runSingleBenchmark({
...benchArgs,
mode: "all",
artifactDir: runTargetDir,
});
results.push({
run: runIndex,
target,
relay_url: relayUrl,
mode: "flat",
clients: clientRunResults,
});
} else {
// Phased mode: separate benchmarks at different DB fill levels
let eventsInDb = 0;
const failed = clientRunResults.filter((r) => r.status !== "ok");
if (failed.length > 0) {
throw new Error(
`Client benchmark failed for target=${target}, run=${runIndex}: ${failed
.map((f) => f.client_name)
.join(", ")}`,
);
console.log(`[bench] ${target}: connect`);
const connectResults = await runSingleBenchmark({
...benchArgs,
mode: "connect",
artifactDir: path.join(runTargetDir, "connect"),
});
console.log(`[bench] ${target}: echo`);
const echoResults = await runSingleBenchmark({
...benchArgs,
mode: "echo",
artifactDir: path.join(runTargetDir, "echo"),
});
// Phase: empty
console.log(`[bench] ${target}: req (empty, ${eventsInDb} events)`);
const emptyReqResults = await runSingleBenchmark({
...benchArgs,
mode: "req",
artifactDir: path.join(runTargetDir, "empty-req"),
});
console.log(`[bench] ${target}: event (empty, ${eventsInDb} events)`);
const emptyEventResults = await runSingleBenchmark({
...benchArgs,
mode: "event",
artifactDir: path.join(runTargetDir, "empty-event"),
});
eventsInDb += countEventsWritten(emptyEventResults);
console.log(`[bench] ${target}: ~${eventsInDb} events in DB after empty phase`);
// Fill to warm
const fillWarm = await smartFill({
target,
targetCount: opts.warmEvents,
eventsInDb,
relayUrl,
serverIp,
keyPath,
serverEnvPrefix,
});
eventsInDb = fillWarm.eventsInDb;
// Phase: warm
console.log(`[bench] ${target}: req (warm, ~${eventsInDb} events)`);
const warmReqResults = await runSingleBenchmark({
...benchArgs,
mode: "req",
artifactDir: path.join(runTargetDir, "warm-req"),
});
console.log(`[bench] ${target}: event (warm, ~${eventsInDb} events)`);
const warmEventResults = await runSingleBenchmark({
...benchArgs,
mode: "event",
artifactDir: path.join(runTargetDir, "warm-event"),
});
eventsInDb += countEventsWritten(warmEventResults);
// Fill to hot
const fillHot = await smartFill({
target,
targetCount: opts.hotEvents,
eventsInDb,
relayUrl,
serverIp,
keyPath,
serverEnvPrefix,
});
eventsInDb = fillHot.eventsInDb;
// Phase: hot
console.log(`[bench] ${target}: req (hot, ~${eventsInDb} events)`);
const hotReqResults = await runSingleBenchmark({
...benchArgs,
mode: "req",
artifactDir: path.join(runTargetDir, "hot-req"),
});
console.log(`[bench] ${target}: event (hot, ~${eventsInDb} events)`);
const hotEventResults = await runSingleBenchmark({
...benchArgs,
mode: "event",
artifactDir: path.join(runTargetDir, "hot-event"),
});
results.push({
run: runIndex,
target,
relay_url: relayUrl,
mode: "phased",
phases: {
connect: { clients: connectResults },
echo: { clients: echoResults },
empty: {
req: { clients: emptyReqResults },
event: { clients: emptyEventResults },
db_events_before: 0,
},
warm: {
req: { clients: warmReqResults },
event: { clients: warmEventResults },
db_events_before: fillWarm.eventsInDb,
seeded: fillWarm.seeded,
wiped: fillWarm.wiped,
},
hot: {
req: { clients: hotReqResults },
event: { clients: hotEventResults },
db_events_before: fillHot.eventsInDb,
seeded: fillHot.seeded,
wiped: fillHot.wiped,
},
},
});
}
}
}
@@ -2222,7 +2575,7 @@ async function main() {
const servers = summariseServersFromResults(results);
const entry = {
schema_version: 2,
schema_version: opts.quick ? 2 : 3,
timestamp,
run_id: runId,
machine_id: os.hostname(),
@@ -2255,6 +2608,9 @@ async function main() {
runs: opts.runs,
targets: opts.targets,
target_order_per_run: targetOrderPerRun,
mode: opts.quick ? "flat" : "phased",
warm_events: opts.warmEvents,
hot_events: opts.hotEvents,
...opts.bench,
},
versions,

185
scripts/nostr_seed.mjs Normal file
View File

@@ -0,0 +1,185 @@
#!/usr/bin/env node
// Nostr event seeder — generates and publishes events matching nostr-bench
// patterns to a relay via WebSocket.
//
// Standalone:
// node scripts/nostr_seed.mjs --url ws://127.0.0.1:4413/relay --count 10000 [--concurrency 8]
//
// As module:
// import { seedEvents } from './nostr_seed.mjs';
// const result = await seedEvents({ url, count, concurrency: 8 });
import { generateSecretKey, getPublicKey, finalizeEvent } from "nostr-tools/pure";
import WebSocket from "ws";
import { parseArgs } from "node:util";
import { fileURLToPath } from "node:url";
// Build one signed kind-1 event with a fresh keypair, matching the event
// shape produced by nostr-bench (util.rs:48-66) so seeded and benchmarked
// events look identical to the relay.
function generateBenchEvent() {
  const secretKey = generateSecretKey();
  const pubkey = getPublicKey(secretKey);
  // Random suffix spreads events across 1000 distinct "t" tag values.
  const randomTag = `nostr-bench-${Math.floor(Math.random() * 1000)}`;
  const template = {
    kind: 1,
    created_at: Math.floor(Date.now() / 1000),
    content: "This is a message from nostr-bench client",
    tags: [
      ["p", pubkey],
      [
        "e",
        "378f145897eea948952674269945e88612420db35791784abf0616b4fed56ef7",
      ],
      ["t", "nostr-bench-"],
      ["t", randomTag],
    ],
  };
  return finalizeEvent(template, secretKey);
}
/**
 * Seed exactly `count` events to a relay.
 *
 * Opens `concurrency` parallel WebSocket connections; each connection sends
 * one event at a time and waits for the relay's OK ack before sending the
 * next, so in-flight load is bounded by the connection count.
 *
 * @param {object} opts
 * @param {string} opts.url Relay WebSocket URL
 * @param {number} opts.count Number of events to send
 * @param {number} [opts.concurrency] Parallel connections (default 8)
 * @param {function} [opts.onProgress] Called with (acked_so_far) roughly every 10k acks
 * @returns {Promise<{sent: number, acked: number, errors: number, elapsed_ms: number}>}
 */
export async function seedEvents({ url, count, concurrency = 8, onProgress }) {
  if (count <= 0) return { sent: 0, acked: 0, errors: 0, elapsed_ms: 0 };
  const startMs = Date.now();
  let sent = 0;
  let acked = 0;
  let errors = 0;
  let nextToSend = 0;

  // Claim the next event index (single-threaded event loop, but the helper
  // makes the shared-counter intent explicit).
  function claimNext() {
    if (nextToSend >= count) return -1;
    return nextToSend++;
  }

  async function runWorker() {
    return new Promise((resolve, reject) => {
      const ws = new WebSocket(url);
      let closed = false;
      // Fix: removed a dead `pendingResolve` variable that was declared and
      // null-checked in the close handler but never assigned anywhere.
      ws.on("error", (err) => {
        if (!closed) {
          closed = true;
          reject(err);
        }
      });
      ws.on("close", () => {
        closed = true;
        resolve();
      });
      ws.on("open", () => {
        sendNext();
      });
      ws.on("message", (data) => {
        const msg = data.toString();
        // String sniffing avoids JSON.parse on the hot path. Event ids are
        // hex, so "true" normally only appears as the OK acceptance flag.
        // NOTE(review): a rejection reason containing the word "true" would
        // be miscounted as an ack — confirm acceptable for bench seeding.
        if (msg.includes('"OK"')) {
          if (msg.includes("true")) {
            acked++;
          } else {
            errors++;
          }
          if (onProgress && (acked + errors) % 10000 === 0) {
            onProgress(acked);
          }
          sendNext();
        }
      });
      // Send the next unclaimed event, or close the socket when done.
      function sendNext() {
        if (closed) return;
        const idx = claimNext();
        if (idx < 0) {
          closed = true;
          ws.close();
          return;
        }
        const event = generateBenchEvent();
        sent++;
        ws.send(JSON.stringify(["EVENT", event]));
      }
    });
  }

  const workers = [];
  const actualConcurrency = Math.min(concurrency, count);
  for (let i = 0; i < actualConcurrency; i++) {
    workers.push(runWorker());
  }
  // allSettled: a failed connection must not abort the whole seed run; its
  // unsent share simply shows up in the returned counters.
  await Promise.allSettled(workers);

  return {
    sent,
    acked,
    errors,
    elapsed_ms: Date.now() - startMs,
  };
}
// CLI entrypoint — runs only when this script is executed directly, not when
// imported as a module (compares argv[1]'s basename against this file's URL).
const isMain =
  process.argv[1] &&
  fileURLToPath(import.meta.url).endsWith(process.argv[1].replace(/^.*\//, ""));

if (isMain) {
  // Print an error and terminate with a failure status.
  const die = (message) => {
    console.error(message);
    process.exit(1);
  };

  const { values } = parseArgs({
    options: {
      url: { type: "string" },
      count: { type: "string" },
      concurrency: { type: "string", default: "8" },
    },
    strict: false,
  });

  if (!values.url || !values.count) {
    die(
      "usage: node scripts/nostr_seed.mjs --url ws://... --count <n> [--concurrency <n>]",
    );
  }

  const count = Number(values.count);
  const concurrency = Number(values.concurrency);
  if (!Number.isInteger(count) || count < 1) {
    die("--count must be a positive integer");
  }

  console.log(
    `[seed] seeding ${count} events to ${values.url} (concurrency=${concurrency})`,
  );
  const result = await seedEvents({
    url: values.url,
    count,
    concurrency,
    onProgress: (n) => {
      process.stdout.write(`\r[seed] ${n}/${count} acked`);
    },
  });
  // Progress lines end in "\r" with no newline; terminate the line before the
  // final JSON summary. Small runs never emit progress, so no newline needed.
  if (count > 500) process.stdout.write("\n");
  console.log(JSON.stringify(result));
}

View File

@@ -207,51 +207,138 @@ const presentBaselines = [
...[...discoveredBaselines].filter((srv) => !preferredBaselineOrder.includes(srv)).sort((a, b) => a.localeCompare(b)),
];
const chartMetrics = [
{ key: "event_tps", label: "Event Throughput (TPS) — higher is better", file: "event_tps.tsv", ylabel: "TPS" },
{ key: "req_tps", label: "Req Throughput (TPS) — higher is better", file: "req_tps.tsv", ylabel: "TPS" },
{ key: "echo_tps", label: "Echo Throughput (TPS) — higher is better", file: "echo_tps.tsv", ylabel: "TPS" },
{ key: "connect_avg_ms", label: "Connect Avg Latency (ms) — lower is better", file: "connect_avg_ms.tsv", ylabel: "ms" },
// --- Colour palette per server: [empty, warm, hot] ---
const serverColours = {
"parrhesia-pg": ["#93c5fd", "#3b82f6", "#1e40af"],
"parrhesia-memory": ["#86efac", "#22c55e", "#166534"],
"strfry": ["#fdba74", "#f97316", "#9a3412"],
"nostr-rs-relay": ["#fca5a5", "#ef4444", "#991b1b"],
"nostream": ["#d8b4fe", "#a855f7", "#6b21a8"],
"haven": ["#fde68a", "#eab308", "#854d0e"],
};
const levelStyles = [
/* empty */ { dt: 3, pt: 6, ps: 0.7, lw: 1.5 },
/* warm */ { dt: 2, pt: 8, ps: 0.8, lw: 1.5 },
/* hot */ { dt: 1, pt: 7, ps: 1.0, lw: 2 },
];
for (const cm of chartMetrics) {
const header = ["tag", "parrhesia-pg", "parrhesia-memory"];
for (const srv of presentBaselines) header.push(srv);
const levels = ["empty", "warm", "hot"];
const shortLabel = {
"parrhesia-pg": "pg", "parrhesia-memory": "mem",
"strfry": "strfry", "nostr-rs-relay": "nostr-rs",
"nostream": "nostream", "haven": "haven",
};
const allServers = ["parrhesia-pg", "parrhesia-memory", ...presentBaselines];
// True when any server summary in the entry carries phased (per-fill-level)
// metrics, detected by the presence of the `event_empty_tps` key.
function isPhased(e) {
  const summaries = Object.values(e.servers || {});
  return summaries.some((summary) => summary.event_empty_tps !== undefined);
}
// Insert a fill level before the final metric suffix:
// phasedKey("event_tps", "empty") -> "event_empty_tps".
function phasedKey(base, level) {
  const cut = base.lastIndexOf("_");
  const prefix = base.slice(0, cut);
  const suffix = base.slice(cut + 1);
  return [prefix, level, suffix].join("_");
}
// --- Emit linetype definitions (server × level) ---
const plotLines = [];
for (let si = 0; si < allServers.length; si++) {
const colours = serverColours[allServers[si]] || ["#888888", "#555555", "#222222"];
for (let li = 0; li < 3; li++) {
const s = levelStyles[li];
plotLines.push(
`set linetype ${si * 3 + li + 1} lc rgb "${colours[li]}" lw ${s.lw} pt ${s.pt} ps ${s.ps} dt ${s.dt}`
);
}
}
plotLines.push("");
// Panel definitions — order matches 4x2 grid (left-to-right, top-to-bottom)
const panels = [
{ kind: "simple", key: "echo_tps", label: "Echo Throughput (TPS) — higher is better", file: "echo_tps.tsv", ylabel: "TPS" },
{ kind: "simple", key: "echo_mibs", label: "Echo Throughput (MiB/s) — higher is better", file: "echo_mibs.tsv", ylabel: "MiB/s" },
{ kind: "fill", base: "event_tps", label: "Event Throughput (TPS) — higher is better", file: "event_tps.tsv", ylabel: "TPS" },
{ kind: "fill", base: "event_mibs", label: "Event Throughput (MiB/s) — higher is better", file: "event_mibs.tsv", ylabel: "MiB/s" },
{ kind: "fill", base: "req_tps", label: "Req Throughput (TPS) — higher is better", file: "req_tps.tsv", ylabel: "TPS" },
{ kind: "fill", base: "req_mibs", label: "Req Throughput (MiB/s) — higher is better", file: "req_mibs.tsv", ylabel: "MiB/s" },
{ kind: "simple", key: "connect_avg_ms", label: "Connect Avg Latency (ms) — lower is better", file: "connect_avg_ms.tsv", ylabel: "ms" },
];
for (const panel of panels) {
if (panel.kind === "simple") {
// One column per server
const header = ["tag", ...allServers.map((s) => shortLabel[s] || s)];
const rows = [header.join("\t")];
for (const e of deduped) {
const row = [
e.git_tag || "untagged",
e.servers?.["parrhesia-pg"]?.[cm.key] ?? "NaN",
e.servers?.["parrhesia-memory"]?.[cm.key] ?? "NaN",
];
for (const srv of presentBaselines) {
row.push(e.servers?.[srv]?.[cm.key] ?? "NaN");
const row = [e.git_tag || "untagged"];
for (const srv of allServers) {
row.push(e.servers?.[srv]?.[panel.key] ?? "NaN");
}
rows.push(row.join("\t"));
}
fs.writeFileSync(path.join(workDir, panel.file), rows.join("\n") + "\n", "utf8");
fs.writeFileSync(path.join(workDir, cm.file), rows.join("\n") + "\n", "utf8");
}
const serverLabels = ["parrhesia-pg", "parrhesia-memory"];
for (const srv of presentBaselines) serverLabels.push(srv);
const plotLines = [];
for (const cm of chartMetrics) {
const dataFile = `data_dir."/${cm.file}"`;
plotLines.push(`set title "${cm.label}"`);
plotLines.push(`set ylabel "${cm.ylabel}"`);
const plotParts = [];
plotParts.push(`${dataFile} using 0:2:xtic(1) lt 1 title "${serverLabels[0]}"`);
plotParts.push(`'' using 0:3 lt 2 title "${serverLabels[1]}"`);
for (let i = 0; i < presentBaselines.length; i += 1) {
plotParts.push(`'' using 0:${4 + i} lt ${3 + i} title "${serverLabels[2 + i]}"`);
}
plotLines.push("plot " + plotParts.join(", \\\n "));
// Plot: one series per server, using its "hot" linetype
const dataFile = `data_dir."/${panel.file}"`;
plotLines.push(`set title "${panel.label}"`);
plotLines.push(`set ylabel "${panel.ylabel}"`);
const parts = allServers.map((srv, si) => {
const src = si === 0 ? dataFile : "''";
const xtic = si === 0 ? ":xtic(1)" : "";
return `${src} using 0:${si + 2}${xtic} lt ${si * 3 + 3} title "${shortLabel[srv] || srv}"`;
});
plotLines.push("plot " + parts.join(", \\\n "));
plotLines.push("");
} else {
// Three columns per server (empty, warm, hot)
const header = ["tag"];
for (const srv of allServers) {
const sl = shortLabel[srv] || srv;
for (const lvl of levels) header.push(`${sl}-${lvl}`);
}
const rows = [header.join("\t")];
for (const e of deduped) {
const row = [e.git_tag || "untagged"];
const phased = isPhased(e);
for (const srv of allServers) {
const d = e.servers?.[srv];
if (!d) { row.push("NaN", "NaN", "NaN"); continue; }
if (phased) {
for (const lvl of levels) row.push(d[phasedKey(panel.base, lvl)] ?? "NaN");
} else {
row.push("NaN", d[panel.base] ?? "NaN", "NaN"); // flat → warm only
}
}
rows.push(row.join("\t"));
}
fs.writeFileSync(path.join(workDir, panel.file), rows.join("\n") + "\n", "utf8");
// Plot: three series per server (empty/warm/hot)
const dataFile = `data_dir."/${panel.file}"`;
plotLines.push(`set title "${panel.label}"`);
plotLines.push(`set ylabel "${panel.ylabel}"`);
const parts = [];
let first = true;
for (let si = 0; si < allServers.length; si++) {
const label = shortLabel[allServers[si]] || allServers[si];
for (let li = 0; li < 3; li++) {
const src = first ? dataFile : "''";
const xtic = first ? ":xtic(1)" : "";
const col = 2 + si * 3 + li;
parts.push(`${src} using 0:${col}${xtic} lt ${si * 3 + li + 1} title "${label} (${levels[li]})"`);
first = false;
}
}
plotLines.push("plot " + parts.join(", \\\n "));
plotLines.push("");
}
}
fs.writeFileSync(path.join(workDir, "plot_commands.gnuplot"), plotLines.join("\n") + "\n", "utf8");
@@ -307,6 +394,18 @@ if (!pg || !mem) {
process.exit(1);
}
// Detect phased entries — use hot fill level as headline metric
const phased = pg.event_empty_tps !== undefined;
// For phased entries, map a flat fill-dependent metric key to its hot-level
// variant ("event_tps" -> "event_hot_tps"); all other keys pass through.
// Reads the module-level `phased` flag.
function resolveKey(key) {
  const fillKeys = new Set(["event_tps", "event_mibs", "req_tps", "req_mibs"]);
  if (!phased || !fillKeys.has(key)) return key;
  const cut = key.lastIndexOf("_");
  return `${key.slice(0, cut)}_hot${key.slice(cut)}`;
}
// Format a finite number to `d` decimal places; anything else (NaN,
// undefined, Infinity) renders as "n/a".
function toFixed(v, d = 2) {
  if (!Number.isFinite(v)) return "n/a";
  return v.toFixed(d);
}
@@ -324,15 +423,17 @@ function boldIf(ratioStr, lowerIsBetter) {
return better ? `**${ratioStr}**` : ratioStr;
}
const fillNote = phased ? " (hot fill level)" : "";
const metricRows = [
["connect avg latency (ms) ↓", "connect_avg_ms", true],
["connect max latency (ms) ↓", "connect_max_ms", true],
["echo throughput (TPS) ↑", "echo_tps", false],
["echo throughput (MiB/s) ↑", "echo_mibs", false],
["event throughput (TPS) ↑", "event_tps", false],
["event throughput (MiB/s) ↑", "event_mibs", false],
["req throughput (TPS) ↑", "req_tps", false],
["req throughput (MiB/s) ↑", "req_mibs", false],
[`event throughput (TPS)${fillNote}`, "event_tps", false],
[`event throughput (MiB/s)${fillNote}`, "event_mibs", false],
[`req throughput (TPS)${fillNote}`, "req_tps", false],
[`req throughput (MiB/s)${fillNote}`, "req_mibs", false],
];
const preferredComparisonOrder = ["strfry", "nostr-rs-relay", "nostream", "haven"];
@@ -354,16 +455,17 @@ const alignRow = ["---"];
for (let i = 1; i < header.length; i += 1) alignRow.push("---:");
const rows = metricRows.map(([label, key, lowerIsBetter]) => {
const row = [label, toFixed(pg[key]), toFixed(mem[key])];
const rk = resolveKey(key);
const row = [label, toFixed(pg[rk]), toFixed(mem[rk])];
for (const serverName of comparisonServers) {
row.push(toFixed(servers?.[serverName]?.[key]));
row.push(toFixed(servers?.[serverName]?.[rk]));
}
row.push(boldIf(ratio(pg[key], mem[key]), lowerIsBetter));
row.push(boldIf(ratio(pg[rk], mem[rk]), lowerIsBetter));
for (const serverName of comparisonServers) {
row.push(boldIf(ratio(pg[key], servers?.[serverName]?.[key]), lowerIsBetter));
row.push(boldIf(ratio(pg[rk], servers?.[serverName]?.[rk]), lowerIsBetter));
}
return row;