11 Commits

Author SHA1 Message Date
a74106d665 chore: Bump version to 0.8.0
Some checks failed
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
Release / Release Gate (push) Failing after 0s
Release / Build and publish image (push) Has been skipped
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
2026-03-26 01:23:23 +01:00
d34b398eed Merge remote-tracking branch 'public/master' (GH actions, test stability) 2026-03-26 00:49:25 +01:00
b402d95e47 feat: add sync relay guard fanout gating and env config
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
2026-03-26 00:36:00 +01:00
8309a89ba7 perf: tune cloud seeding and lower hot fill target
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
2026-03-20 21:33:17 +01:00
9ed1d80b7f bench: simplify cloud bench flow and align phased naming
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
2026-03-20 20:56:32 +01:00
4bd8663126 bench/fix: prefix bracketed cloud bench logs with T+ timestamps
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
2026-03-20 18:10:29 +01:00
f7ff3a4bd7 bench: use nostr-bench seed mode and expose relay json counters 2026-03-20 18:00:14 +01:00
8f22eb2097 build: pin nostr-bench submodule in nix and cloud bench pipeline 2026-03-20 17:43:31 +01:00
6b59fa6328 build: nostr-bench submodule
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
2026-03-20 14:33:51 +01:00
070464f2eb bench: Cloud seeding
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
2026-03-20 14:19:58 +01:00
6bd0143de4 chore: Bump version to 0.7.0, 1st beta
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
Release / Release Gate (push) Failing after 0s
Release / Build and publish image (push) Has been skipped
2026-03-20 03:44:24 +01:00
27 changed files with 1170 additions and 340 deletions

View File

@@ -13,6 +13,7 @@ POOL_SIZE=20
# PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_WRITES=false # PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_WRITES=false
# PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_READS=false # PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_READS=false
# PARRHESIA_POLICIES_MIN_POW_DIFFICULTY=0 # PARRHESIA_POLICIES_MIN_POW_DIFFICULTY=0
# PARRHESIA_SYNC_RELAY_GUARD=false
# PARRHESIA_FEATURES_VERIFY_EVENT_SIGNATURES=true # PARRHESIA_FEATURES_VERIFY_EVENT_SIGNATURES=true
# PARRHESIA_METRICS_ENABLED_ON_MAIN_ENDPOINT=true # PARRHESIA_METRICS_ENABLED_ON_MAIN_ENDPOINT=true
# PARRHESIA_METRICS_PRIVATE_NETWORKS_ONLY=true # PARRHESIA_METRICS_PRIVATE_NETWORKS_ONLY=true

View File

@@ -55,8 +55,11 @@ jobs:
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v4 uses: actions/checkout@v4
with:
submodules: recursive - name: Init submodules
run: |
git submodule init marmot-ts
git submodule update --recursive
- name: Set up Elixir + OTP - name: Set up Elixir + OTP
uses: erlef/setup-beam@v1 uses: erlef/setup-beam@v1

View File

@@ -54,8 +54,11 @@ jobs:
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v4 uses: actions/checkout@v4
with:
submodules: recursive - name: Init submodules
run: |
git submodule init marmot-ts
git submodule update --recursive
- name: Set up Elixir + OTP - name: Set up Elixir + OTP
uses: erlef/setup-beam@v1 uses: erlef/setup-beam@v1

3
.gitmodules vendored
View File

@@ -4,3 +4,6 @@
[submodule "docs/nips"] [submodule "docs/nips"]
path = docs/nips path = docs/nips
url = https://github.com/nostr-protocol/nips.git url = https://github.com/nostr-protocol/nips.git
[submodule "nix/nostr-bench"]
path = nix/nostr-bench
url = ssh://gitea@git.teralink.net:10322/self/nostr-bench.git

View File

@@ -264,6 +264,7 @@ CSV env vars use comma-separated values. Boolean env vars accept `1/0`, `true/fa
| `:nip66` | config-file driven | see table below | Built-in NIP-66 discovery / monitor publisher | | `:nip66` | config-file driven | see table below | Built-in NIP-66 discovery / monitor publisher |
| `:sync.path` | `PARRHESIA_SYNC_PATH` | `nil` | Optional path to sync peer config | | `:sync.path` | `PARRHESIA_SYNC_PATH` | `nil` | Optional path to sync peer config |
| `:sync.start_workers?` | `PARRHESIA_SYNC_START_WORKERS` | `true` | Start outbound sync workers on boot | | `:sync.start_workers?` | `PARRHESIA_SYNC_START_WORKERS` | `true` | Start outbound sync workers on boot |
| `:sync.relay_guard` | `PARRHESIA_SYNC_RELAY_GUARD` | `false` | Suppress multi-node re-fanout for sync-originated events |
| `:limits` | `PARRHESIA_LIMITS_*` | see table below | Runtime override group | | `:limits` | `PARRHESIA_LIMITS_*` | see table below | Runtime override group |
| `:policies` | `PARRHESIA_POLICIES_*` | see table below | Runtime override group | | `:policies` | `PARRHESIA_POLICIES_*` | see table below | Runtime override group |
| `:listeners` | config-file driven | see notes below | Ingress listeners with bind, transport, feature, auth, network, and baseline ACL settings | | `:listeners` | config-file driven | see notes below | Ingress listeners with bind, transport, feature, auth, network, and baseline ACL settings |

View File

@@ -39,7 +39,8 @@ config :parrhesia,
], ],
sync: [ sync: [
path: nil, path: nil,
start_workers?: true start_workers?: true,
relay_guard: false
], ],
limits: [ limits: [
max_frame_bytes: 1_048_576, max_frame_bytes: 1_048_576,

View File

@@ -161,6 +161,7 @@ if config_env() == :prod do
retention_defaults = Application.get_env(:parrhesia, :retention, []) retention_defaults = Application.get_env(:parrhesia, :retention, [])
features_defaults = Application.get_env(:parrhesia, :features, []) features_defaults = Application.get_env(:parrhesia, :features, [])
acl_defaults = Application.get_env(:parrhesia, :acl, []) acl_defaults = Application.get_env(:parrhesia, :acl, [])
sync_defaults = Application.get_env(:parrhesia, :sync, [])
default_pool_size = Keyword.get(repo_defaults, :pool_size, 32) default_pool_size = Keyword.get(repo_defaults, :pool_size, 32)
default_queue_target = Keyword.get(repo_defaults, :queue_target, 1_000) default_queue_target = Keyword.get(repo_defaults, :queue_target, 1_000)
@@ -748,7 +749,12 @@ if config_env() == :prod do
start_workers?: start_workers?:
bool_env.( bool_env.(
"PARRHESIA_SYNC_START_WORKERS", "PARRHESIA_SYNC_START_WORKERS",
Keyword.get(Application.get_env(:parrhesia, :sync, []), :start_workers?, true) Keyword.get(sync_defaults, :start_workers?, true)
),
relay_guard:
bool_env.(
"PARRHESIA_SYNC_RELAY_GUARD",
Keyword.get(sync_defaults, :relay_guard, false)
) )
], ],
moderation_cache_enabled: moderation_cache_enabled:

View File

@@ -10,7 +10,7 @@
vips, vips,
}: let }: let
pname = "parrhesia"; pname = "parrhesia";
version = "0.7.0"; version = "0.8.0";
beamPackages = beam.packages.erlang_28.extend ( beamPackages = beam.packages.erlang_28.extend (
final: _prev: { final: _prev: {

View File

@@ -87,11 +87,27 @@
"type": "github" "type": "github"
} }
}, },
"nostr-bench-src": {
"flake": false,
"locked": {
"lastModified": 1774020724,
"owner": "serpent213",
"repo": "nostr-bench",
"rev": "8561b84864ce1269b26304808c64219471999caf",
"type": "github"
},
"original": {
"owner": "serpent213",
"repo": "nostr-bench",
"type": "github"
}
},
"root": { "root": {
"inputs": { "inputs": {
"devenv": "devenv", "devenv": "devenv",
"git-hooks": "git-hooks", "git-hooks": "git-hooks",
"nixpkgs": "nixpkgs", "nixpkgs": "nixpkgs",
"nostr-bench-src": "nostr-bench-src",
"pre-commit-hooks": [ "pre-commit-hooks": [
"git-hooks" "git-hooks"
] ]

View File

@@ -73,7 +73,9 @@ in {
vips.overrideAttrs (oldAttrs: { vips.overrideAttrs (oldAttrs: {
buildInputs = oldAttrs.buildInputs ++ [mozjpeg]; buildInputs = oldAttrs.buildInputs ++ [mozjpeg];
}); });
nostr-bench = pkgs.callPackage ./nix/nostr-bench.nix {}; nostr-bench = pkgs.callPackage ./nix/nostr-bench.nix {
nostrBenchSrc = inputs.nostr-bench-src;
};
in in
with pkgs; with pkgs;
[ [

View File

@@ -2,6 +2,9 @@
inputs: inputs:
nixpkgs: nixpkgs:
url: github:cachix/devenv-nixpkgs/rolling url: github:cachix/devenv-nixpkgs/rolling
nostr-bench-src:
url: github:serpent213/nostr-bench
flake: false
# If you're using non-OSS software, you can set allowUnfree to true. # If you're using non-OSS software, you can set allowUnfree to true.
# allowUnfree: true # allowUnfree: true

344
docs/BETA_REVIEW.md Normal file
View File

@@ -0,0 +1,344 @@
# Parrhesia Beta: Production-Readiness Gap Assessment
**Date:** 2026-03-20
**Version:** 0.7.0
**Scope:** Delta analysis from beta promotion — what stands between this codebase and confident public-facing production deployment.
---
## Production Readiness Scorecard
| # | Dimension | Rating | Summary |
|---|----------------------------------|--------|----------------------------------------------|
| 1 | Operational Resilience | 🟡 | Graceful shutdown partial; no DB circuit-breaking |
| 2 | Multi-Node / Clustering | 🟡 | Best-effort only; acceptable for single-node prod |
| 3 | Load & Capacity Characterisation | 🟡 | Benchmarks exist but no defined capacity model |
| 4 | Deployment & Infrastructure | 🟡 | Strong Nix/Docker base; missing runbooks and migration strategy |
| 5 | Security Hardening | 🟢 | Solid for production with reverse proxy |
| 6 | Data Integrity & Consistency | 🟢 | Transaction-wrapped writes with dedup; minor multi-node edge cases |
| 7 | Observability Completeness | 🟡 | Excellent metrics; no dashboards, alerts, or tracing |
| 8 | Technical Debt (Prod Impact) | 🟡 | Manageable; connection.ex size is the main concern |
---
## 1. Operational Resilience — 🟡
### What's good
- **No `Process.sleep` on any hot path.** Zero occurrences in `lib/`. Clean async message passing throughout.
- **WebSocket keepalive** implemented: 30s ping, 10s pong timeout, auto-close on timeout.
- **Outbound queue backpressure** well-designed: bounded queue (256 default), configurable overflow strategy (`:close`/`:drop_oldest`/`:drop_newest`), pressure telemetry at 75% threshold.
- **Connection isolation:** Each WebSocket is a separate process; one crash does not propagate.
- **Graceful connection close on shutdown:** `handle_info({:EXIT, _, :shutdown}, ...)` drains outbound frames before closing with code 1012 ("service restart"). This is good.
### Gaps
**G1.1 — No DB circuit-breaking or backoff on PostgreSQL unavailability.**
Ecto's connection pool (`db_connection`/`DBConnection`) will queue checkout requests up to `queue_target` (1000ms) / `queue_interval` (5000ms), then raise `DBConnection.ConnectionError`. These errors propagate as storage failures in the ingest path and return NOTICE errors to clients. However:
- There is no circuit breaker to fast-reject requests when the DB is known-down, meaning every ingest/query attempt during an outage burns a pool checkout timeout slot.
- On DB recovery, all queued checkouts may succeed simultaneously (thundering herd).
- **Impact:** During a PostgreSQL failover (typically 10–30s), connection processes pile up waiting on the pool. Latency spikes for all connected clients. Memory pressure from queued processes.
- **Mitigation:** Ecto's built-in queue management provides partial protection. For a relay with ≤1000 concurrent connections this is likely survivable without circuit-breaking. For higher connection counts, consider a fast-fail wrapper around storage calls when the pool reports consecutive failures.
**G1.2 — Metrics scrape on the hot path.**
`/metrics` calls `TelemetryMetricsPrometheus.Core.scrape/1` synchronously within the HTTP request handler. This serialises metric aggregation and formatting. If the Prometheus reporter's internal state is large (many unique tag combinations), scraping can take 10–100ms. This runs on a Bandit acceptor process — it does not block WebSocket connections directly, but a slow scrape under high cardinality could make the health endpoint unresponsive if metrics and health share the same listener.
- **Current mitigation:** Metrics can be isolated to a dedicated listener via `PARRHESIA_METRICS_ENDPOINT_*` config. If deployed this way, impact is isolated.
- **Recommendation:** Document the dedicated metrics listener as required for production. Consider adding a scrape timeout guard.
**G1.3 — Supervisor shutdown timeout is OTP default (5s).**
The `Parrhesia.Runtime` supervisor uses `:one_for_one` strategy with default child shutdown specs. Bandit listeners have their own shutdown behavior, but there is no explicit `shutdown: N` on the endpoint child spec. Under load with many connections, 5s may not be enough to drain all outbound queues.
- **Recommendation:** Set explicit `shutdown: 15_000` on `Parrhesia.Web.Endpoint` child spec. Bandit supports graceful drain on listener stop.
---
## 2. Multi-Node / Clustering — 🟡
### Current state
Per `docs/CLUSTER.md`, clustering is **implemented but explicitly best-effort and untested**:
- `:pg`-based process groups for cross-node fanout.
- No automatic cluster discovery (no libcluster).
- ETS subscription index is node-local.
- No durable inter-node transport; no replay on reconnect.
- No explicit acknowledgement between nodes.
### Assessment for production
**For single-node production deployment: not a blocker.** The clustering code is unconditionally started (`MultiNode` joins `:pg` on init) but with a single node, `get_members/0` returns only self, and the `Enum.reject(&(&1 == self()))` filter means no remote sends occur. No performance overhead.
**For multi-node production: not ready.** Key issues:
- **Subscription inconsistency on netsplit:** Events ingested on node A during a split are never delivered to subscribers on node B. No catch-up mechanism exists. Clients must reconnect and re-query to recover.
- **Node departure drops subscriptions silently:** When a node leaves the cluster, subscribers on that node lose their connections (normal). Subscribers on other nodes are unaffected. But events that were in-flight from the departed node are lost.
- **No cluster health observability:** No metrics for inter-node fanout lag, message drops, or membership changes.
**Recommendation for initial production:** Deploy single-node. Clustering is a Phase B concern per the documented roadmap.
---
## 3. Load & Capacity Characterisation — 🟡
### What exists
- `LoadSoakTest` asserts p95 fanout enqueue/drain < 25ms.
- `bench/` directory with `nostr-bench` submodule for external load testing.
- Cloud bench orchestration scripts (`scripts/cloud_bench_orchestrate.mjs`, `scripts/cloud_bench_server.sh`).
### Gaps
**G3.1 — No documented capacity model.**
There is no documented answer to: "How many connections / events per second can one node handle before degradation?" The `LoadSoakTest` runs locally with synthetic data — useful for regression detection but not representative of production traffic patterns.
**G3.2 — Multi-filter query scaling is in-memory dedup.**
`Postgres.Events.query/3` runs each filter as a separate SQL query, collects all results into memory, and deduplicates with `deduplicate_events/1` (Map.update accumulation). With many overlapping filters or high-cardinality results, this could produce significant memory pressure per-request.
- At realistic scales (< 10 filters, < 1000 results per filter), this is fine.
- At adversarial scales (32 subscriptions × large result sets), a single REQ could allocate substantial memory.
- **Current mitigation:** `max_tag_values_per_filter` (128) and query `LIMIT` bounds exist. The risk is bounded but not eliminated.
**G3.3 — No query performance benchmarks against large datasets.**
No evidence of testing against 100M+ events with monthly partitions. Partition pruning is implemented, but query plans may degrade if the partition list grows large (PostgreSQL planner overhead scales with partition count).
**Recommendation:** Before production, run `nostr-bench` at target load (e.g., 500 concurrent connections, 100 events/sec ingest, 1000 active subscriptions) and document the resulting latency profile. This becomes the baseline capacity model.
---
## 4. Deployment & Infrastructure Readiness — 🟡
### What's good
- **Docker image via Nix:** Non-root user (65534:65534), minimal base, cacerts bundled, SSL_CERT_FILE set. This is production-quality container hygiene.
- **OTP release:** `mix release` with `Parrhesia.Release.migrate/0` for safe migration execution.
- **CI pipeline:** Multi-matrix testing (OTP 27/28, Elixir 1.18/1.19), format/credo/unused deps checks, E2E tests.
- **Environment-based configuration:** All critical settings overridable via `PARRHESIA_*` env vars in `runtime.exs`.
- **Secrets:** No secrets committed. DB credentials via `DATABASE_URL`, identity key via env or file path.
### Gaps
**G4.1 — No zero-downtime migration strategy.**
`Parrhesia.Release.migrate/0` runs `Ecto.Migrator.run/4` with `:up`. Under replicated deployments (rolling update with 2+ instances), there is no advisory lock or migration guard — two instances starting simultaneously could race on migrations. Ecto's default migrator uses `pg_advisory_lock` via `Ecto.Migration.Runner`, so this is actually safe for PostgreSQL. However:
- **DDL migrations (CREATE INDEX CONCURRENTLY, ALTER TABLE) need careful handling.** The existing migrations use standard `CREATE TABLE` and `CREATE INDEX` which acquire ACCESS EXCLUSIVE locks. Running these against a live database will block reads and writes for the duration.
- **Recommendation:** For production, migrations should be run as a separate step before deploying new code (the compose.yaml already has a `migrate` service — extend this pattern).
**G4.2 — No operational runbooks.**
There are no documented procedures for:
- Rolling restart / blue-green deploy
- Partition pruning and retention tuning
- Runtime pubkey banning (the NIP-86 management API exists but isn't documented for ops use)
- DB failover response
- Scaling (horizontal or vertical)
**G4.3 — No health check in Docker image.**
The Nix-built Docker image has no `HEALTHCHECK` instruction. The `/health` and `/ready` endpoints exist but aren't wired into container orchestration.
- **Recommendation:** Add `HEALTHCHECK CMD curl -f http://localhost:4413/ready || exit 1` to the Docker image definition, or document the readiness endpoint for Kubernetes probes.
**G4.4 — No disaster recovery plan.**
No documented RTO/RPO. If the primary DB is lost, recovery depends entirely on external backup infrastructure. The relay has no built-in data export or snapshot capability.
---
## 5. Security Hardening — 🟢
### Assessment
The security posture is solid for production behind a reverse proxy:
- **TLS:** Full support for server, mutual, and proxy-terminated TLS modes. Cipher suite selection (strong/compatible). Certificate pin verification.
- **Rate limiting:** Three layers — relay-wide (10k/s), per-IP (1k/s), per-connection (120/s). All configurable.
- **Metrics endpoint:** Access-controlled via `metrics_allowed?/2` — supports private-network-only restriction and bearer token auth. Tested.
- **NIP-42 auth:** Constant-time comparison via `Plug.Crypto.secure_compare/2` (addressed in beta).
- **NIP-98:** Replay protection, event freshness check (< 60s), signature verification.
- **Input validation:** Binary field length constraints at DB level (migration 7). Event size limits at WebSocket frame level.
- **IP controls:** Trusted proxy CIDR configuration, X-Forwarded-For parsing, IP blocklist table.
- **Audit logging:** `management_audit_logs` table tracks admin actions.
- **No secrets in git.** Environment variable or file-path based secret injection.
### Minor considerations (not blocking)
- No integration with external threat intel feeds or IP reputation services. This is an infrastructure concern, not an application concern.
- DDoS mitigation assumed to be at load balancer / CDN layer. Application-level rate limiting is defense-in-depth, not primary.
- **Recommendation:** Document the expected deployment topology (Caddy/Nginx → Parrhesia) and which security controls are expected at each layer.
---
## 6. Data Integrity & Consistency — 🟢
### What's good
- **Duplicate event prevention:** Two-layer defence:
  1. `event_ids` table with unique PK on `id` → `INSERT ... ON CONFLICT DO NOTHING`.
2. If `inserted == 0`, transaction rolls back with `:duplicate_event`.
3. Separate unique index on `events.id` as belt-and-suspenders.
- **Atomic writes:** `put_event/2` wraps `insert_event_id!`, `insert_event!`, `insert_tags!`, and `upsert_state_tables!` in a single `Repo.transaction/1`. Partial writes (event without tags) cannot occur.
- **Replaceable/addressable event state:** Upsert logic in state tables with correct conflict resolution (higher `created_at` wins, then lower `id` as tiebreaker via `candidate_wins_state?/2`).
### Minor considerations
**G6.1 — Expiration worker concurrency on multi-node.**
`ExpirationWorker` runs `Repo.delete_all/1` against all expired events. If two nodes run this worker against the same database, both execute the same DELETE query. PostgreSQL handles this safely (the second DELETE finds 0 rows), and the worker is idempotent. **Not a problem.**
**G6.2 — Partition pruning and sync.**
`PartitionRetentionWorker.drop_partition/1` drops entire monthly partitions. If negentropy sync is in progress against events in that partition, the sync session's cached refs become stale. The session would fail or return incomplete results.
- **Impact:** Low. Partition drops are infrequent (daily check, at most 1 per run). Negentropy sessions are short-lived (60s idle timeout).
- **Recommendation:** No action needed for initial production. If operating as a sync source relay, consider pausing sync during partition drops.
---
## 7. Observability Completeness — 🟡
### What's good
Metrics coverage is comprehensive — 34+ distinct metrics covering:
- Ingest: event count by outcome/reason, duration distribution
- Query: request count, duration, result cardinality
- Fanout: duration, candidates considered, events enqueued, batch size
- Connection: outbound queue depth/pressure/overflow/drop, mailbox depth
- Rate limiting: hit count by scope
- DB: query count/total_time/queue_time/query_time/decode_time/idle_time by repo role
- Maintenance: expiration purge count/duration, partition retention drops/duration
- VM: memory (total/processes/system/atom/binary/ets)
- Listener: active connections, active subscriptions
Readiness endpoint checks critical process liveness. Health endpoint for basic reachability.
### Gaps
**G7.1 — No dashboards or alerting rules.**
The metrics exist but there are no Grafana dashboard JSON files, no Prometheus alerting rules, and no documented alert thresholds. An operator deploying this relay would need to build observability from scratch.
- **Recommendation:** Ship a `deploy/grafana/` directory with a dashboard JSON and a `deploy/prometheus/alerts.yml` with rules for:
- `parrhesia_db_query_queue_time_ms` p95 > 100ms (pool saturation)
- `parrhesia_connection_outbound_queue_overflow_count` rate > 0 (clients being dropped)
- `parrhesia_rate_limit_hits_count` rate sustained > threshold (potential abuse)
- `parrhesia_vm_memory_total_bytes` > 80% of available
- Listener connection count approaching `max_connections`
**G7.2 — No distributed tracing or request correlation IDs.**
Events flow through validate → policy → persist → fanout without a correlation ID tying the stages together. Log-based debugging of "why didn't this event reach subscriber X" requires manual PID correlation across log lines.
- **Impact:** Tolerable for initial production at moderate scale. Becomes painful at high event rates.
**G7.3 — No synthetic monitoring.**
No built-in probe that ingests a canary event and verifies it arrives at a subscriber. End-to-end relay health depends on external monitoring.
- **Recommendation:** This is best implemented as an external tool. Not blocking.
---
## 8. Technical Debt with Production Impact — 🟡
### G8.1 — `connection.ex` at 2,116 lines
This module is the per-connection state machine handling EVENT, REQ, CLOSE, AUTH, COUNT, NEG-*, keepalive, outbound queue management, rate limiting, and all associated telemetry. It is the single most critical file for production incident response.
**Production risk:** During a production incident involving connection behavior, an on-call engineer needs to quickly navigate this module. At 2,116 lines with interleaved concerns (protocol parsing, policy enforcement, queue management, telemetry emission), this slows incident response.
**Recommendation (M-sized effort):** Extract into focused modules:
- `Connection.Ingest` — EVENT handling and policy application
- `Connection.Subscription` — REQ/CLOSE management and initial query streaming
- `Connection.OutboundQueue` — queue/drain/overflow logic
- `Connection.Keepalive` — ping/pong state machine
The main `Connection` module would become an orchestrator delegating to these. This is a refactor-only change with no behavioral impact.
### G8.2 — Multi-filter in-memory dedup
`deduplicate_events/1` accumulates all query results into a Map before deduplication. With 32 subscriptions (the max) and generous limits, worst case is:
- 32 filters × 5000 result limit = 160,000 events loaded into memory per REQ.
Each event struct is ~500 bytes minimum, so ~80MB per pathological request. This is bounded but could be weaponised by an attacker sending many concurrent REQs with overlapping filters.
**Current mitigation:** Per-connection subscription limit (32) and query result limits bound the damage. Per-IP rate limiting adds friction.
**Recommendation:** Not blocking for production. Monitor `parrhesia.query.results.count` distribution. If p99 > 10,000, investigate query patterns.
### G8.3 — Per-pubkey rate limiting absent
Rate limiting is currently per-IP and relay-wide. An attacker using a botnet (many IPs, one pubkey) bypasses IP-based limits. Per-pubkey rate limiting would catch this.
**Impact:** Medium for a public relay; low for an invite-only (NIP-43) relay.
**Recommendation (S-sized effort):** Add a per-pubkey event ingest limiter similar to `IPEventIngestLimiter`, keyed by `event.pubkey`. Apply after signature verification but before storage.
### G8.4 — Negentropy session memory ceiling
Negentropy session bounds:
- Max 10,000 total sessions (`@default_max_total_sessions`)
- Max 8 per connection (`@default_max_sessions_per_owner`)
- Max 50,000 items per session (`@default_max_items_per_session`)
- 60s idle timeout with 10s sweep interval
Worst case: 10,000 sessions × 50,000 items × ~40 bytes/ref = ~20GB. This is the theoretical maximum under adversarial session creation.
**Realistic ceiling:** The `open/6` path runs a DB query bounded by `max_items_per_session + 1`. At 50k items, this query itself provides backpressure (it takes time). An attacker would need 10,000 concurrent connections each opening 8 sessions, each returning 50k results. The relay-wide connection limit and rate limiting make this implausible in practice.
**Recommendation:** Reduce `@default_max_items_per_session` to 10,000 for production (reduces theoretical ceiling to ~4GB). This is a config change, not a code change.
---
## Critical Path to Production
Ordered by priority. Items above the line are required before production traffic; items below are strongly recommended.
| # | Work Item | Dimension | Effort |
|---|-----------|-----------|--------|
| 1 | Set explicit shutdown timeout on Endpoint child spec | Operational | S |
| 2 | Document dedicated metrics listener as production requirement | Operational | S |
| 3 | Add HEALTHCHECK to Docker image or document K8s probes | Deployment | S |
| 4 | Run capacity benchmark at target load and document results | Load | M |
| 5 | Ship Grafana dashboard + Prometheus alert rules | Observability | M |
| 6 | Write operational runbook (deploy, rollback, ban, failover) | Deployment | M |
| 7 | Document migration strategy (run before deploy, not during) | Deployment | S |
| --- | --- | --- | --- |
| 8 | Add per-pubkey rate limiting | Security | S |
| 9 | Reduce default negentropy items-per-session to 10k | Security | S |
| 10 | Extract connection.ex into sub-modules | Debt | M |
| 11 | Add request correlation IDs to event lifecycle | Observability | M |
| 12 | Add DB pool health fast-fail wrapper | Operational | M |
---
## Production Risk Register
| ID | Risk | Likelihood | Impact | Mitigation |
|----|------|-----------|--------|------------|
| R1 | PostgreSQL failover causes latency spike for all connections | Medium | High | G1.1: Ecto queue management provides partial protection. Add pool health telemetry alerting. Consider circuit breaker at high connection counts. |
| R2 | Slow /metrics scrape blocks health checks | Low | Medium | G1.2: Deploy dedicated metrics listener (already supported). |
| R3 | Ungraceful shutdown drops in-flight events | Low | Medium | G1.3: Set explicit shutdown timeout. Connection drain logic already exists. |
| R4 | Multi-IP spam campaign bypasses rate limiting | Medium | Medium | G8.3: Add per-pubkey rate limiter. NIP-43 invite-only mode mitigates for private relays. |
| R5 | Large REQ with many overlapping filters causes memory spike | Low | Medium | G8.2: Bounded by existing limits. Monitor query result cardinality. |
| R6 | No alerting means silent degradation | Medium | High | G7.1: Ship dashboard and alert rules before production. |
| R7 | DDL migration blocks reads during rolling deploy | Low | High | G4.1: Run migrations as separate pre-deploy step. |
| R8 | Adversarial negentropy session creation exhausts memory | Low | High | G8.4: Reduce max items per session. Existing session limits provide protection. |
| R9 | No runbooks slows incident response | Medium | Medium | G4.2: Write runbooks for common ops tasks. |
| R10 | connection.ex complexity slows debugging | Medium | Low | G8.1: Extract sub-modules. Not urgent but improves maintainability. |
---
## Final Verdict
### 🟡 Ready for Limited Production
**Constraints for initial deployment:**
1. **Single-node only.** Multi-node clustering is best-effort and should not be relied upon for production traffic. Deploy one node with a properly sized PostgreSQL instance.
2. **Behind a reverse proxy.** Deploy behind Caddy, Nginx, or a cloud load balancer for TLS termination, DDoS mitigation, and connection limits. Document the expected topology.
3. **Moderate traffic cap.** Without a validated capacity model, start with conservative limits:
- ≤ 2,000 concurrent WebSocket connections
- ≤ 500 events/second ingest rate
- Monitor `db.query.queue_time.ms` p95 and `connection.outbound_queue.overflow.count` as scaling signals.
4. **Observability must be deployed alongside.** The metrics exist but dashboards and alerts do not. Do not go live without at minimum:
- Prometheus scraping the dedicated metrics listener
- Alerts on DB queue time, outbound queue overflow, and VM memory
- Log aggregation with ERROR-level alerts
5. **Migrations run pre-deploy.** Use the existing compose.yaml `migrate` service pattern. Never run migrations as part of application startup in a multi-replica deployment.
**What's strong:**
- OTP supervision architecture is clean and fault-isolated
- Data integrity layer is well-designed (transactional writes, dedup, constraint enforcement)
- Security posture is production-appropriate
- Telemetry coverage is comprehensive
- Container image follows best practices
- No blocking issues in the hot path (no sleeps, no synchronous calls, bounded queues)
**The codebase is architecturally sound for production.** The gaps are operational (runbooks, dashboards, capacity planning) rather than structural. A focused sprint addressing items 1–7 from the critical path would clear the way for a controlled production launch.

View File

@@ -2,6 +2,8 @@
description = "Parrhesia Nostr relay"; description = "Parrhesia Nostr relay";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
# Can be reenabled once patched nostr-bench is up on GitHub
# inputs.self.submodules = true;
outputs = {nixpkgs, ...}: let outputs = {nixpkgs, ...}: let
systems = [ systems = [

View File

@@ -87,7 +87,7 @@ defmodule Parrhesia.API.Events do
end end
Dispatcher.dispatch(event) Dispatcher.dispatch(event)
maybe_publish_multi_node(event) maybe_publish_multi_node(event, context)
{:ok, {:ok,
%PublishResult{ %PublishResult{
@@ -312,9 +312,15 @@ defmodule Parrhesia.API.Events do
end end
end end
defp maybe_publish_multi_node(event) do defp maybe_publish_multi_node(event, %RequestContext{} = context) do
relay_guard? = Parrhesia.Config.get([:sync, :relay_guard], false)
if relay_guard? and context.caller == :sync do
:ok
else
MultiNode.publish(event) MultiNode.publish(event)
:ok :ok
end
catch catch
:exit, _reason -> :ok :exit, _reason -> :ok
end end

View File

@@ -4,7 +4,7 @@ defmodule Parrhesia.MixProject do
def project do def project do
[ [
app: :parrhesia, app: :parrhesia,
version: "0.7.0", version: "0.8.0",
elixir: "~> 1.18", elixir: "~> 1.18",
elixirc_paths: elixirc_paths(Mix.env()), elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod, start_permanent: Mix.env() == :prod,

1
nix/nostr-bench Submodule

Submodule nix/nostr-bench added at 8561b84864

View File

@@ -321,6 +321,12 @@ version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
[[package]]
name = "bech32"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]] [[package]]
name = "bitcoin_hashes" name = "bitcoin_hashes"
version = "0.11.0" version = "0.11.0"
@@ -1249,6 +1255,7 @@ version = "0.19.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "377785e61e0da6a13226a4244e8c28b9a858aa0a5ed10830109f61ade1c2a3f2" checksum = "377785e61e0da6a13226a4244e8c28b9a858aa0a5ed10830109f61ade1c2a3f2"
dependencies = [ dependencies = [
"bech32",
"bitcoin_hashes", "bitcoin_hashes",
"getrandom", "getrandom",
"instant", "instant",
@@ -1262,7 +1269,7 @@ dependencies = [
[[package]] [[package]]
name = "nostr-bench" name = "nostr-bench"
version = "0.4.0" version = "0.5.0-parrhesia"
dependencies = [ dependencies = [
"actix", "actix",
"actix-web", "actix-web",

View File

@@ -1,22 +1,19 @@
{ {
lib, lib,
fetchFromGitHub,
rustPlatform, rustPlatform,
pkgsCross, pkgsCross,
runCommand, runCommand,
staticX86_64Musl ? false, staticX86_64Musl ? false,
nostrBenchSrc ? ./nostr-bench,
}: let }: let
selectedRustPlatform = selectedRustPlatform =
if staticX86_64Musl if staticX86_64Musl
then pkgsCross.musl64.rustPlatform then pkgsCross.musl64.rustPlatform
else rustPlatform; else rustPlatform;
srcBase = fetchFromGitHub { # Keep the submodule path as-is so devenv can evaluate it correctly.
owner = "rnostr"; # `lib.cleanSource` treats submodule contents as untracked in this context.
repo = "nostr-bench"; srcBase = nostrBenchSrc;
rev = "d3ab701512b7c871707b209ef3f934936e407963";
hash = "sha256-F2qg1veO1iNlVUKf1b/MV+vexiy4Tt+w2aikDDbp7tU=";
};
src = src =
if staticX86_64Musl if staticX86_64Musl
@@ -31,7 +28,7 @@ in
selectedRustPlatform.buildRustPackage ( selectedRustPlatform.buildRustPackage (
{ {
pname = "nostr-bench"; pname = "nostr-bench";
version = "0.4.0"; version = "0.5.0-parrhesia";
inherit src; inherit src;
@@ -46,11 +43,11 @@ in
}; };
} }
// lib.optionalAttrs staticX86_64Musl { // lib.optionalAttrs staticX86_64Musl {
cargoHash = "sha256-aL8XSBJ8sHl7CGh9SkOoI+WlAHKrdij2DfvZAWIKgKY="; cargoHash = "sha256-098BUjDLiezoFXs7fF+w7NQM+DPPfHMo1HGx3nV2UZM=";
CARGO_BUILD_TARGET = "x86_64-unknown-linux-musl"; CARGO_BUILD_TARGET = "x86_64-unknown-linux-musl";
RUSTFLAGS = "-C target-feature=+crt-static"; RUSTFLAGS = "-C target-feature=+crt-static";
} }
// lib.optionalAttrs (!staticX86_64Musl) { // lib.optionalAttrs (!staticX86_64Musl) {
cargoHash = "sha256-mh9UdYhZl6JVJEeDFnY5BjfcK+PrWBBEn1Qh7/ZX17k="; cargoHash = "sha256-x2pnxJL2nwni4cpSaUerDOfhdcTKJTyABYjPm96dAC0=";
} }
) )

View File

@@ -5,7 +5,7 @@ relay_url="${1:-}"
mode="${2:-all}" mode="${2:-all}"
if [[ -z "$relay_url" ]]; then if [[ -z "$relay_url" ]]; then
echo "usage: cloud-bench-client.sh <relay-url> [connect|echo|event|req|all]" >&2 echo "usage: cloud-bench-client.sh <relay-url> [connect|echo|event|req|seed|all]" >&2
exit 1 exit 1
fi fi
@@ -43,6 +43,9 @@ run_event() {
-r "${PARRHESIA_BENCH_EVENT_RATE:-50}" \ -r "${PARRHESIA_BENCH_EVENT_RATE:-50}" \
-k "${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \ -k "${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \
-t "${bench_threads}" \ -t "${bench_threads}" \
--send-strategy "${PARRHESIA_BENCH_EVENT_SEND_STRATEGY:-pipelined}" \
--inflight "${PARRHESIA_BENCH_EVENT_INFLIGHT:-32}" \
--ack-timeout "${PARRHESIA_BENCH_EVENT_ACK_TIMEOUT:-30}" \
"${relay_url}" "${relay_url}"
} }
@@ -57,11 +60,33 @@ run_req() {
"${relay_url}" "${relay_url}"
} }
run_seed() {
local target_accepted="${PARRHESIA_BENCH_SEED_TARGET_ACCEPTED:-}"
if [[ -z "$target_accepted" ]]; then
echo "PARRHESIA_BENCH_SEED_TARGET_ACCEPTED must be set for seed mode" >&2
exit 2
fi
echo "==> nostr-bench seed ${relay_url}"
"$bench_bin" seed --json \
--target-accepted "$target_accepted" \
-c "${PARRHESIA_BENCH_SEED_CONNECTION_COUNT:-512}" \
-r "${PARRHESIA_BENCH_SEED_CONNECTION_RATE:-512}" \
-k "${PARRHESIA_BENCH_SEED_KEEPALIVE_SECONDS:-0}" \
-t "${bench_threads}" \
--send-strategy "${PARRHESIA_BENCH_SEED_SEND_STRATEGY:-pipelined}" \
--inflight "${PARRHESIA_BENCH_SEED_INFLIGHT:-128}" \
--ack-timeout "${PARRHESIA_BENCH_SEED_ACK_TIMEOUT:-20}" \
"${relay_url}"
}
case "$mode" in case "$mode" in
connect) run_connect ;; connect) run_connect ;;
echo) run_echo ;; echo) run_echo ;;
event) run_event ;; event) run_event ;;
req) run_req ;; req) run_req ;;
seed) run_seed ;;
all) run_connect; echo; run_echo; echo; run_event; echo; run_req ;; all) run_connect; echo; run_echo; echo; run_event; echo; run_req ;;
*) echo "unknown mode: $mode" >&2; exit 1 ;; *) echo "unknown mode: $mode" >&2; exit 1 ;;
esac esac

View File

@@ -23,15 +23,35 @@ const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename); const __dirname = path.dirname(__filename);
const ROOT_DIR = path.resolve(__dirname, ".."); const ROOT_DIR = path.resolve(__dirname, "..");
const DEFAULT_TARGETS = ["parrhesia-pg", "parrhesia-memory", "strfry", "nostr-rs-relay", "nostream", "haven"]; const DEFAULT_TARGETS = [
"parrhesia-pg",
"parrhesia-memory",
"strfry",
"nostr-rs-relay",
"nostream",
// "haven", // disabled by default: Haven rejects generic nostr-bench event seeding (auth/whitelist/WoT policies)
];
const ESTIMATE_WINDOW_MINUTES = 30; const ESTIMATE_WINDOW_MINUTES = 30;
const ESTIMATE_WINDOW_HOURS = ESTIMATE_WINDOW_MINUTES / 60; const ESTIMATE_WINDOW_HOURS = ESTIMATE_WINDOW_MINUTES / 60;
const ESTIMATE_WINDOW_LABEL = `${ESTIMATE_WINDOW_MINUTES}m`; const ESTIMATE_WINDOW_LABEL = `${ESTIMATE_WINDOW_MINUTES}m`;
const BENCH_BUILD_DIR = path.join(ROOT_DIR, "_build", "bench"); const BENCH_BUILD_DIR = path.join(ROOT_DIR, "_build", "bench");
const NOSTR_BENCH_SUBMODULE_DIR = path.join(ROOT_DIR, "nix", "nostr-bench");
const NOSTREAM_REDIS_IMAGE = "redis:7.0.5-alpine3.16"; const NOSTREAM_REDIS_IMAGE = "redis:7.0.5-alpine3.16";
const SEED_TOLERANCE_RATIO = 0.01; const SEED_TOLERANCE_RATIO = 0.01;
const SEED_MAX_ROUNDS = 4; const SEED_MAX_ROUNDS = 4;
const SEED_EVENT_RATE = 5000; const SEED_KEEPALIVE_SECONDS = 0;
const SEED_EVENTS_PER_CONNECTION_TARGET = 500;
const SEED_CONNECTIONS_MIN = 64;
const SEED_CONNECTIONS_MAX = 5000; // set to 0 for no cap
const SEED_CONNECTION_RATE_MIN = 64;
const SEED_CONNECTION_RATE_MAX = 5000; // set to 0 for no cap
const EVENT_SEND_STRATEGY = "pipelined";
const EVENT_INFLIGHT = 32;
const EVENT_ACK_TIMEOUT_SECONDS = 30;
const SEED_SEND_STRATEGY = "pipelined";
const SEED_INFLIGHT = 128;
const SEED_ACK_TIMEOUT_SECONDS = 20;
const PHASE_PREP_OFFSET_MINUTES = 3;
const DEFAULTS = { const DEFAULTS = {
datacenter: "fsn1-dc14", datacenter: "fsn1-dc14",
@@ -39,7 +59,6 @@ const DEFAULTS = {
clientType: "cpx31", clientType: "cpx31",
imageBase: "ubuntu-24.04", imageBase: "ubuntu-24.04",
clients: 3, clients: 3,
runs: 5,
targets: DEFAULT_TARGETS, targets: DEFAULT_TARGETS,
historyFile: "bench/history.jsonl", historyFile: "bench/history.jsonl",
artifactsDir: "bench/cloud_artifacts", artifactsDir: "bench/cloud_artifacts",
@@ -55,20 +74,20 @@ const DEFAULTS = {
quick: false, quick: false,
monitoring: true, monitoring: true,
yes: false, yes: false,
warmEvents: 25000, warmEvents: 50000,
hotEvents: 250000, hotEvents: 250000,
bench: { bench: {
connectCount: 3000, connectCount: 50000,
connectRate: 1500, connectRate: 10000,
echoCount: 3000, echoCount: 50000,
echoRate: 1500, echoRate: 10000,
echoSize: 512, echoSize: 512,
eventCount: 5000, eventCount: 50000,
eventRate: 2000, eventRate: 10000,
reqCount: 3000, reqCount: 50000,
reqRate: 1500, reqRate: 10000,
reqLimit: 50, reqLimit: 50,
keepaliveSeconds: 10, keepaliveSeconds: 120,
threads: 0, threads: 0,
}, },
}; };
@@ -88,7 +107,6 @@ Options:
--client-type <name> (default: ${DEFAULTS.clientType}) --client-type <name> (default: ${DEFAULTS.clientType})
--image-base <name> (default: ${DEFAULTS.imageBase}) --image-base <name> (default: ${DEFAULTS.imageBase})
--clients <n> (default: ${DEFAULTS.clients}) --clients <n> (default: ${DEFAULTS.clients})
--runs <n> (default: ${DEFAULTS.runs})
--targets <csv> (default: ${DEFAULT_TARGETS.join(",")}) --targets <csv> (default: ${DEFAULT_TARGETS.join(",")})
Source selection (choose one style): Source selection (choose one style):
@@ -136,7 +154,7 @@ Notes:
- In interactive terminals, prompts you to pick + confirm the datacenter unless --yes is set. - In interactive terminals, prompts you to pick + confirm the datacenter unless --yes is set.
- Caches built nostr-bench at _build/bench/nostr-bench and reuses it when valid. - Caches built nostr-bench at _build/bench/nostr-bench and reuses it when valid.
- Auto-tunes Postgres/Redis/app pool sizing from server RAM + CPU for DB-backed targets. - Auto-tunes Postgres/Redis/app pool sizing from server RAM + CPU for DB-backed targets.
- Randomizes target order per run and wipes persisted target data directories on each start. - Randomizes target order and wipes persisted target data directories on each start.
- Creates a Hetzner Cloud firewall restricting inbound access to benchmark ports from known IPs only. - Creates a Hetzner Cloud firewall restricting inbound access to benchmark ports from known IPs only.
- Handles Ctrl-C / SIGTERM with best-effort cloud cleanup. - Handles Ctrl-C / SIGTERM with best-effort cloud cleanup.
- Tries nix .#nostrBenchStaticX86_64Musl first; falls back to docker-built portable nostr-bench. - Tries nix .#nostrBenchStaticX86_64Musl first; falls back to docker-built portable nostr-bench.
@@ -186,9 +204,6 @@ function parseArgs(argv) {
case "--clients": case "--clients":
opts.clients = intOpt(arg, argv[++i]); opts.clients = intOpt(arg, argv[++i]);
break; break;
case "--runs":
opts.runs = intOpt(arg, argv[++i]);
break;
case "--targets": case "--targets":
opts.targets = argv[++i] opts.targets = argv[++i]
.split(",") .split(",")
@@ -454,6 +469,73 @@ function formatEuro(value) {
return `${value.toFixed(4)}`; return `${value.toFixed(4)}`;
} }
function createPhaseLogger(prepOffsetMinutes = PHASE_PREP_OFFSET_MINUTES) {
let phaseZeroMs;
const prefix = () => {
if (!Number.isFinite(phaseZeroMs)) {
return null;
}
const elapsedMinutes = Math.floor((Date.now() - phaseZeroMs) / 60000);
const sign = elapsedMinutes >= 0 ? "+" : "";
return `T${sign}${elapsedMinutes}m`;
};
return {
getPrefix() {
return prefix();
},
setPrepOffsetNow() {
phaseZeroMs = Date.now() + prepOffsetMinutes * 60_000;
},
setZeroNow() {
phaseZeroMs = Date.now();
},
logPhase(message) {
console.log(message);
},
};
}
function installBracketedLogPrefix(getPrefix) {
const methods = ["log", "warn", "error"];
const originals = new Map();
for (const method of methods) {
const original = console[method].bind(console);
originals.set(method, original);
console[method] = (...args) => {
if (
args.length > 0
&& typeof args[0] === "string"
&& /^\s*\[[^\]]+\]/.test(args[0])
) {
const prefix = getPrefix();
if (prefix) {
original(`${prefix} ${args[0]}`, ...args.slice(1));
return;
}
}
original(...args);
};
}
return () => {
for (const method of methods) {
const original = originals.get(method);
if (original) {
console[method] = original;
}
}
};
}
function compatibleDatacenterChoices(datacenters, serverType, clientType, clientCount) { function compatibleDatacenterChoices(datacenters, serverType, clientType, clientCount) {
const compatible = []; const compatible = [];
@@ -646,6 +728,32 @@ async function buildNostrBenchBinary(tmpDir) {
fs.mkdirSync(cacheDir, { recursive: true }); fs.mkdirSync(cacheDir, { recursive: true });
if (!fs.existsSync(path.join(NOSTR_BENCH_SUBMODULE_DIR, "Cargo.toml"))) {
throw new Error(
`nostr-bench source not found at ${NOSTR_BENCH_SUBMODULE_DIR}. Run: git submodule update --init --recursive nix/nostr-bench`,
);
}
const resolveSourceFingerprint = async () => {
let revision = "unknown";
let dirty = false;
try {
revision = (await runCommand("git", ["-C", NOSTR_BENCH_SUBMODULE_DIR, "rev-parse", "HEAD"])).stdout.trim();
dirty = (await runCommand("git", ["-C", NOSTR_BENCH_SUBMODULE_DIR, "status", "--porcelain"])).stdout.trim().length > 0;
} catch {
// Fallback for non-git checkouts of the submodule source.
const lockPath = path.join(NOSTR_BENCH_SUBMODULE_DIR, "Cargo.lock");
const lockMtime = fs.existsSync(lockPath) ? fs.statSync(lockPath).mtimeMs : 0;
revision = `mtime-${Math.trunc(lockMtime)}`;
dirty = false;
}
return dirty ? `${revision}-dirty` : revision;
};
const sourceFingerprint = await resolveSourceFingerprint();
const staticLinked = (fileOutput) => fileOutput.includes("statically linked") || fileOutput.includes("static-pie linked"); const staticLinked = (fileOutput) => fileOutput.includes("statically linked") || fileOutput.includes("static-pie linked");
const binaryLooksPortable = (fileOutput) => const binaryLooksPortable = (fileOutput) =>
@@ -698,12 +806,24 @@ async function buildNostrBenchBinary(tmpDir) {
return null; return null;
} }
const metadata = readCacheMetadata();
if (!metadata?.source_fingerprint) {
console.log("[local] cached nostr-bench has no source fingerprint, rebuilding");
return null;
}
if (metadata.source_fingerprint !== sourceFingerprint) {
console.log(
`[local] nostr-bench source changed (${metadata.source_fingerprint} -> ${sourceFingerprint}), rebuilding cache`,
);
return null;
}
try { try {
const fileSummary = await validatePortableBinary(cachedBinaryPath); const fileSummary = await validatePortableBinary(cachedBinaryPath);
fs.chmodSync(cachedBinaryPath, 0o755); fs.chmodSync(cachedBinaryPath, 0o755);
const version = await readVersionIfRunnable(cachedBinaryPath, fileSummary, "cache-reuse"); const version = await readVersionIfRunnable(cachedBinaryPath, fileSummary, "cache-reuse");
const metadata = readCacheMetadata();
console.log(`[local] reusing cached nostr-bench: ${cachedBinaryPath}`); console.log(`[local] reusing cached nostr-bench: ${cachedBinaryPath}`);
if (metadata?.build_mode) { if (metadata?.build_mode) {
@@ -734,6 +854,8 @@ async function buildNostrBenchBinary(tmpDir) {
writeCacheMetadata({ writeCacheMetadata({
build_mode: buildMode, build_mode: buildMode,
built_at: new Date().toISOString(), built_at: new Date().toISOString(),
source_fingerprint: sourceFingerprint,
source_path: path.relative(ROOT_DIR, NOSTR_BENCH_SUBMODULE_DIR),
binary_path: cachedBinaryPath, binary_path: cachedBinaryPath,
file_summary: copiedFileOut.stdout.trim(), file_summary: copiedFileOut.stdout.trim(),
version, version,
@@ -775,10 +897,8 @@ async function buildNostrBenchBinary(tmpDir) {
} }
const srcDir = path.join(tmpDir, "nostr-bench-src"); const srcDir = path.join(tmpDir, "nostr-bench-src");
console.log("[local] cloning nostr-bench source for docker fallback..."); console.log(`[local] preparing nostr-bench source from ${path.relative(ROOT_DIR, NOSTR_BENCH_SUBMODULE_DIR)} for docker fallback...`);
await runCommand("git", ["clone", "--depth", "1", "https://github.com/rnostr/nostr-bench.git", srcDir], { fs.cpSync(NOSTR_BENCH_SUBMODULE_DIR, srcDir, { recursive: true });
stdio: "inherit",
});
const binaryPath = path.join(srcDir, "target", "release", "nostr-bench"); const binaryPath = path.join(srcDir, "target", "release", "nostr-bench");
@@ -873,6 +993,47 @@ function splitCountAcrossClients(total, clients) {
return Array.from({ length: clients }, (_, i) => base + (i < remainder ? 1 : 0)); return Array.from({ length: clients }, (_, i) => base + (i < remainder ? 1 : 0));
} }
function computeSeedConnectionPlan(desiredAccepted) {
const desired = Math.max(1, Number(desiredAccepted) || 1);
let connectionCount = Math.max(
SEED_CONNECTIONS_MIN,
Math.ceil(desired / SEED_EVENTS_PER_CONNECTION_TARGET),
);
if (SEED_CONNECTIONS_MAX > 0) {
connectionCount = Math.min(SEED_CONNECTIONS_MAX, connectionCount);
}
let connectionRate = Math.max(SEED_CONNECTION_RATE_MIN, connectionCount);
if (SEED_CONNECTION_RATE_MAX > 0) {
connectionRate = Math.min(SEED_CONNECTION_RATE_MAX, connectionRate);
}
return { connectionCount, connectionRate };
}
function extractSeedAccepted(parsed) {
const finalAccepted = Number(parsed?.seed_final?.accepted);
if (Number.isFinite(finalAccepted) && finalAccepted >= 0) {
return Math.floor(finalAccepted);
}
const sectionAccepted = Number(parsed?.seed?.message_stats?.accepted);
if (Number.isFinite(sectionAccepted) && sectionAccepted >= 0) {
return Math.floor(sectionAccepted);
}
return 0;
}
function extractSeedTargetReached(parsed, desiredAccepted) {
if (typeof parsed?.seed_final?.target_reached === "boolean") {
return parsed.seed_final.target_reached;
}
return extractSeedAccepted(parsed) >= desiredAccepted;
}
async function runClientSeedingRound({ async function runClientSeedingRound({
target, target,
phase, phase,
@@ -892,34 +1053,36 @@ async function runClientSeedingRound({
const seedResults = await Promise.all( const seedResults = await Promise.all(
clientsForRound.map(async (client, idx) => { clientsForRound.map(async (client, idx) => {
const desiredEvents = shares[idx] || 0; const desiredAccepted = shares[idx] || 0;
const stdoutPath = path.join(roundDir, `${client.name}.stdout.log`); const stdoutPath = path.join(roundDir, `${client.name}.stdout.log`);
const stderrPath = path.join(roundDir, `${client.name}.stderr.log`); const stderrPath = path.join(roundDir, `${client.name}.stderr.log`);
if (desiredEvents <= 0) { if (desiredAccepted <= 0) {
fs.writeFileSync(stdoutPath, "", "utf8"); fs.writeFileSync(stdoutPath, "", "utf8");
fs.writeFileSync(stderrPath, "", "utf8"); fs.writeFileSync(stderrPath, "", "utf8");
return { return {
client_name: client.name, client_name: client.name,
client_ip: client.ip, client_ip: client.ip,
status: "skipped", status: "skipped",
desired_events: desiredEvents, desired_accepted: desiredAccepted,
projected_events: 0, accepted: 0,
acked: 0, target_reached: true,
stdout_path: path.relative(ROOT_DIR, stdoutPath), stdout_path: path.relative(ROOT_DIR, stdoutPath),
stderr_path: path.relative(ROOT_DIR, stderrPath), stderr_path: path.relative(ROOT_DIR, stderrPath),
}; };
} }
const eventConnections = 1; const { connectionCount: seedConnectionCount, connectionRate: seedConnectionRate } =
const eventKeepalive = Math.max(5, Math.ceil(desiredEvents / SEED_EVENT_RATE)); computeSeedConnectionPlan(desiredAccepted);
const eventRate = Math.max(1, Math.ceil(desiredEvents / eventKeepalive));
const projectedEvents = eventConnections * eventRate * eventKeepalive;
const seedEnvPrefix = [ const seedEnvPrefix = [
`PARRHESIA_BENCH_EVENT_COUNT=${eventConnections}`, `PARRHESIA_BENCH_SEED_TARGET_ACCEPTED=${desiredAccepted}`,
`PARRHESIA_BENCH_EVENT_RATE=${eventRate}`, `PARRHESIA_BENCH_SEED_CONNECTION_COUNT=${seedConnectionCount}`,
`PARRHESIA_BENCH_KEEPALIVE_SECONDS=${eventKeepalive}`, `PARRHESIA_BENCH_SEED_CONNECTION_RATE=${seedConnectionRate}`,
`PARRHESIA_BENCH_SEED_KEEPALIVE_SECONDS=${SEED_KEEPALIVE_SECONDS}`,
`PARRHESIA_BENCH_SEED_SEND_STRATEGY=${SEED_SEND_STRATEGY}`,
`PARRHESIA_BENCH_SEED_INFLIGHT=${SEED_INFLIGHT}`,
`PARRHESIA_BENCH_SEED_ACK_TIMEOUT=${SEED_ACK_TIMEOUT_SECONDS}`,
`PARRHESIA_BENCH_THREADS=${benchThreads}`, `PARRHESIA_BENCH_THREADS=${benchThreads}`,
].join(" "); ].join(" ");
@@ -927,25 +1090,29 @@ async function runClientSeedingRound({
const benchRes = await sshExec( const benchRes = await sshExec(
client.ip, client.ip,
keyPath, keyPath,
`${seedEnvPrefix} /root/cloud-bench-client.sh ${shellEscape(relayUrl)} event`, `${seedEnvPrefix} /root/cloud-bench-client.sh ${shellEscape(relayUrl)} seed`,
); );
fs.writeFileSync(stdoutPath, benchRes.stdout, "utf8"); fs.writeFileSync(stdoutPath, benchRes.stdout, "utf8");
fs.writeFileSync(stderrPath, benchRes.stderr, "utf8"); fs.writeFileSync(stderrPath, benchRes.stderr, "utf8");
const parsed = parseNostrBenchSections(benchRes.stdout); const parsed = parseNostrBenchSections(benchRes.stdout);
const acked = Number(parsed?.event?.message_stats?.complete) || 0; const accepted = extractSeedAccepted(parsed);
const targetReached = extractSeedTargetReached(parsed, desiredAccepted);
return { return {
client_name: client.name, client_name: client.name,
client_ip: client.ip, client_ip: client.ip,
status: "ok", status: targetReached ? "ok" : "partial",
desired_events: desiredEvents, desired_accepted: desiredAccepted,
projected_events: projectedEvents, accepted,
event_connections: eventConnections, target_reached: targetReached,
event_rate: eventRate, seed_connection_count: seedConnectionCount,
event_keepalive_seconds: eventKeepalive, seed_connection_rate: seedConnectionRate,
acked, seed_keepalive_seconds: SEED_KEEPALIVE_SECONDS,
seed_send_strategy: SEED_SEND_STRATEGY,
seed_inflight: SEED_INFLIGHT,
seed_ack_timeout_seconds: SEED_ACK_TIMEOUT_SECONDS,
stdout_path: path.relative(ROOT_DIR, stdoutPath), stdout_path: path.relative(ROOT_DIR, stdoutPath),
stderr_path: path.relative(ROOT_DIR, stderrPath), stderr_path: path.relative(ROOT_DIR, stderrPath),
}; };
@@ -955,16 +1122,25 @@ async function runClientSeedingRound({
fs.writeFileSync(stdoutPath, out, "utf8"); fs.writeFileSync(stdoutPath, out, "utf8");
fs.writeFileSync(stderrPath, err, "utf8"); fs.writeFileSync(stderrPath, err, "utf8");
const parsed = parseNostrBenchSections(out);
const accepted = extractSeedAccepted(parsed);
const targetReached = extractSeedTargetReached(parsed, desiredAccepted);
const targetNotReached = Number(error.code) === 2;
return { return {
client_name: client.name, client_name: client.name,
client_ip: client.ip, client_ip: client.ip,
status: "error", status: targetNotReached ? "partial" : "error",
desired_events: desiredEvents, desired_accepted: desiredAccepted,
projected_events: projectedEvents, accepted,
event_connections: eventConnections, target_reached: targetReached,
event_rate: eventRate, seed_connection_count: seedConnectionCount,
event_keepalive_seconds: eventKeepalive, seed_connection_rate: seedConnectionRate,
acked: 0, seed_keepalive_seconds: SEED_KEEPALIVE_SECONDS,
seed_send_strategy: SEED_SEND_STRATEGY,
seed_inflight: SEED_INFLIGHT,
seed_ack_timeout_seconds: SEED_ACK_TIMEOUT_SECONDS,
exit_code: Number(error.code) || null,
stdout_path: path.relative(ROOT_DIR, stdoutPath), stdout_path: path.relative(ROOT_DIR, stdoutPath),
stderr_path: path.relative(ROOT_DIR, stderrPath), stderr_path: path.relative(ROOT_DIR, stderrPath),
error: error.message || String(error), error: error.message || String(error),
@@ -980,20 +1156,47 @@ async function runClientSeedingRound({
); );
} }
const acked = seedResults.reduce((sum, r) => sum + (Number(r.acked) || 0), 0); const accepted = seedResults.reduce((sum, r) => sum + (Number(r.accepted) || 0), 0);
const desired = seedResults.reduce((sum, r) => sum + (Number(r.desired_events) || 0), 0); const desired = seedResults.reduce((sum, r) => sum + (Number(r.desired_accepted) || 0), 0);
const projected = seedResults.reduce((sum, r) => sum + (Number(r.projected_events) || 0), 0);
return { return {
desired, desired,
projected, accepted,
acked,
clients: seedResults, clients: seedResults,
}; };
} }
async function fetchServerEventCount({ target, serverIp, keyPath, serverEnvPrefix }) {
const countableTargets = new Set(["parrhesia-pg", "nostream"]);
if (!countableTargets.has(target)) return null;
const countCmd = `count-data-${target}`;
try {
const res = await sshExec(
serverIp,
keyPath,
`${serverEnvPrefix} /root/cloud-bench-server.sh ${shellEscape(countCmd)}`,
);
const lines = res.stdout
.split(/\r?\n/)
.map((line) => line.trim())
.filter(Boolean);
const numericLine = [...lines].reverse().find((line) => /^\d+$/.test(line));
if (!numericLine) return null;
const count = Number(numericLine);
return Number.isInteger(count) && count >= 0 ? count : null;
} catch (error) {
console.warn(`[fill] ${target}: failed to fetch server event count (${error.message || error})`);
return null;
}
}
// Ensure the relay has approximately `targetCount` events. // Ensure the relay has approximately `targetCount` events.
// Uses client-side nostr-bench event seeding in parallel and accepts <=1% drift. // Uses client-side nostr-bench seed mode and accepts <=1% drift.
async function smartFill({ async function smartFill({
target, target,
phase, phase,
@@ -1006,8 +1209,15 @@ async function smartFill({
serverEnvPrefix, serverEnvPrefix,
artifactDir, artifactDir,
threads, threads,
skipFill,
fetchEventCount,
}) { }) {
if (targetCount <= 0) return { eventsInDb, seeded: 0, wiped: false }; if (targetCount <= 0) return { eventsInDb, seeded: 0, wiped: false, skipped: false };
if (skipFill) {
console.log(`[fill] ${target}:${phase}: skipped (target does not support generic event seeding)`);
return { eventsInDb, seeded: 0, wiped: false, skipped: true };
}
let wiped = false; let wiped = false;
if (eventsInDb > targetCount) { if (eventsInDb > targetCount) {
@@ -1018,6 +1228,13 @@ async function smartFill({
wiped = true; wiped = true;
} }
if (typeof fetchEventCount === "function") {
const authoritative = await fetchEventCount();
if (Number.isInteger(authoritative) && authoritative >= 0) {
eventsInDb = authoritative;
}
}
const tolerance = Math.max(1, Math.floor(targetCount * SEED_TOLERANCE_RATIO)); const tolerance = Math.max(1, Math.floor(targetCount * SEED_TOLERANCE_RATIO));
let deficit = targetCount - eventsInDb; let deficit = targetCount - eventsInDb;
@@ -1025,7 +1242,7 @@ async function smartFill({
console.log( console.log(
`[fill] ${target}: already within tolerance (${eventsInDb}/${targetCount}, tolerance=${tolerance}), skipping`, `[fill] ${target}: already within tolerance (${eventsInDb}/${targetCount}, tolerance=${tolerance}), skipping`,
); );
return { eventsInDb, seeded: 0, wiped }; return { eventsInDb, seeded: 0, wiped, skipped: false };
} }
console.log( console.log(
@@ -1033,16 +1250,21 @@ async function smartFill({
); );
let seededTotal = 0; let seededTotal = 0;
let roundsExecuted = 0;
for (let round = 1; round <= SEED_MAX_ROUNDS; round += 1) { for (let round = 1; round <= SEED_MAX_ROUNDS; round += 1) {
if (deficit <= tolerance) break; if (deficit <= tolerance) break;
roundsExecuted = round;
const roundDeficit = Math.max(1, deficit);
const roundStartMs = Date.now(); const roundStartMs = Date.now();
const eventsBeforeRound = eventsInDb;
const roundResult = await runClientSeedingRound({ const roundResult = await runClientSeedingRound({
target, target,
phase, phase,
round, round,
deficit, deficit: roundDeficit,
clientInfos, clientInfos,
keyPath, keyPath,
relayUrl, relayUrl,
@@ -1050,18 +1272,31 @@ async function smartFill({
threads, threads,
}); });
const elapsedSec = (Date.now() - roundStartMs) / 1000; let observedAdded = roundResult.accepted;
const eventsPerSec = elapsedSec > 0 ? Math.round(roundResult.acked / elapsedSec) : 0;
eventsInDb += roundResult.acked; if (typeof fetchEventCount === "function") {
seededTotal += roundResult.acked; const authoritative = await fetchEventCount();
if (Number.isInteger(authoritative) && authoritative >= 0) {
eventsInDb = authoritative;
observedAdded = Math.max(0, eventsInDb - eventsBeforeRound);
} else {
eventsInDb += roundResult.accepted;
}
} else {
eventsInDb += roundResult.accepted;
}
const elapsedSec = (Date.now() - roundStartMs) / 1000;
const eventsPerSec = elapsedSec > 0 ? Math.round(observedAdded / elapsedSec) : 0;
seededTotal += observedAdded;
deficit = targetCount - eventsInDb; deficit = targetCount - eventsInDb;
console.log( console.log(
`[fill] ${target}:${phase} round ${round}: acked ${roundResult.acked} (desired=${roundResult.desired}, projected=${roundResult.projected}) in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s), now ~${eventsInDb}/${targetCount}`, `[fill] ${target}:${phase} round ${round}: observed ${observedAdded} (accepted=${roundResult.accepted}, desired=${roundResult.desired}) in ${elapsedSec.toFixed(1)}s (${eventsPerSec} events/s), now ~${eventsInDb}/${targetCount}`,
); );
if (roundResult.acked <= 0) { if (observedAdded <= 0) {
console.warn(`[fill] ${target}:${phase} round ${round}: no progress, stopping early`); console.warn(`[fill] ${target}:${phase} round ${round}: no progress, stopping early`);
break; break;
} }
@@ -1070,11 +1305,11 @@ async function smartFill({
const remaining = Math.max(0, targetCount - eventsInDb); const remaining = Math.max(0, targetCount - eventsInDb);
if (remaining > tolerance) { if (remaining > tolerance) {
console.warn( console.warn(
`[fill] ${target}:${phase}: remaining deficit ${remaining} exceeds tolerance ${tolerance} after ${SEED_MAX_ROUNDS} rounds`, `[fill] ${target}:${phase}: remaining deficit ${remaining} exceeds tolerance ${tolerance} after ${roundsExecuted} rounds`,
); );
} }
return { eventsInDb, seeded: seededTotal, wiped }; return { eventsInDb, seeded: seededTotal, wiped, skipped: false };
} }
// Run a single benchmark type across all clients in parallel. // Run a single benchmark type across all clients in parallel.
@@ -1361,8 +1596,12 @@ async function main() {
const opts = parseArgs(process.argv.slice(2)); const opts = parseArgs(process.argv.slice(2));
await ensureLocalPrereqs(opts); await ensureLocalPrereqs(opts);
const phaseLogger = createPhaseLogger();
const restoreBracketedLogPrefix = installBracketedLogPrefix(() => phaseLogger.getPrefix());
const datacenterChoice = await chooseDatacenter(opts); const datacenterChoice = await chooseDatacenter(opts);
opts.datacenter = datacenterChoice.name; opts.datacenter = datacenterChoice.name;
phaseLogger.setPrepOffsetNow();
console.log( console.log(
`[plan] selected datacenter=${opts.datacenter} (${ESTIMATE_WINDOW_LABEL} est gross=${formatEuro(datacenterChoice.estimatedTotal.gross)} net=${formatEuro(datacenterChoice.estimatedTotal.net)})`, `[plan] selected datacenter=${opts.datacenter} (${ESTIMATE_WINDOW_LABEL} est gross=${formatEuro(datacenterChoice.estimatedTotal.gross)} net=${formatEuro(datacenterChoice.estimatedTotal.net)})`,
); );
@@ -1394,7 +1633,7 @@ async function main() {
fs.mkdirSync(path.dirname(historyFile), { recursive: true }); fs.mkdirSync(path.dirname(historyFile), { recursive: true });
console.log(`[run] ${runId}`); console.log(`[run] ${runId}`);
console.log("[phase] local preparation"); phaseLogger.logPhase("[phase] local preparation");
const nostrBench = await buildNostrBenchBinary(tmpDir); const nostrBench = await buildNostrBenchBinary(tmpDir);
const needsParrhesia = opts.targets.includes("parrhesia-pg") || opts.targets.includes("parrhesia-memory"); const needsParrhesia = opts.targets.includes("parrhesia-pg") || opts.targets.includes("parrhesia-memory");
@@ -1475,7 +1714,7 @@ async function main() {
}); });
try { try {
console.log("[phase] create ssh credentials"); phaseLogger.logPhase("[phase] create ssh credentials");
await runCommand("ssh-keygen", ["-t", "ed25519", "-N", "", "-f", keyPath, "-C", keyName], { await runCommand("ssh-keygen", ["-t", "ed25519", "-N", "", "-f", keyPath, "-C", keyName], {
stdio: "inherit", stdio: "inherit",
}); });
@@ -1485,7 +1724,9 @@ async function main() {
}); });
sshKeyCreated = true; sshKeyCreated = true;
console.log("[phase] create cloud servers in parallel"); // Start execution clock at T+0 immediately before cloud server creation.
phaseLogger.setZeroNow();
phaseLogger.logPhase("[phase] create cloud servers in parallel");
const serverName = `${runId}-server`; const serverName = `${runId}-server`;
const clientNames = Array.from({ length: opts.clients }, (_, i) => `${runId}-client-${i + 1}`); const clientNames = Array.from({ length: opts.clients }, (_, i) => `${runId}-client-${i + 1}`);
@@ -1555,7 +1796,7 @@ async function main() {
ip: c.server.public_net.ipv4.ip, ip: c.server.public_net.ipv4.ip,
})); }));
console.log("[phase] wait for SSH"); phaseLogger.logPhase("[phase] wait for SSH");
await Promise.all([ await Promise.all([
waitForSsh(serverIp, keyPath), waitForSsh(serverIp, keyPath),
...clientInfos.map((client) => waitForSsh(client.ip, keyPath)), ...clientInfos.map((client) => waitForSsh(client.ip, keyPath)),
@@ -1597,7 +1838,7 @@ async function main() {
} }
console.log(`[firewall] ${firewallName} applied (sources: ${sourceIps.join(", ")})`); console.log(`[firewall] ${firewallName} applied (sources: ${sourceIps.join(", ")})`);
console.log("[phase] install runtime dependencies on server node"); phaseLogger.logPhase("[phase] install runtime dependencies on server node");
const serverInstallCmd = [ const serverInstallCmd = [
"set -euo pipefail", "set -euo pipefail",
"export DEBIAN_FRONTEND=noninteractive", "export DEBIAN_FRONTEND=noninteractive",
@@ -1614,19 +1855,17 @@ async function main() {
await sshExec(serverIp, keyPath, serverInstallCmd, { stdio: "inherit" }); await sshExec(serverIp, keyPath, serverInstallCmd, { stdio: "inherit" });
console.log("[phase] minimal client setup (no apt install)"); phaseLogger.logPhase("[phase] minimal client setup (no apt install)");
const clientBootstrapCmd = [ const clientBootstrapCmd = [
"set -euo pipefail", "set -euo pipefail",
"mkdir -p /usr/local/bin", "mkdir -p /usr/local/bin",
"sysctl -w net.ipv4.ip_local_port_range='10000 65535' >/dev/null || true", "sysctl -w net.ipv4.ip_local_port_range='10000 65535' >/dev/null || true",
"sysctl -w net.core.somaxconn=65535 >/dev/null || true", "sysctl -w net.core.somaxconn=65535 >/dev/null || true",
"bash --version",
"uname -m",
].join("; "); ].join("; ");
await Promise.all(clientInfos.map((client) => sshExec(client.ip, keyPath, clientBootstrapCmd, { stdio: "inherit" }))); await Promise.all(clientInfos.map((client) => sshExec(client.ip, keyPath, clientBootstrapCmd, { stdio: "inherit" })));
console.log("[phase] upload control scripts + nostr-bench binary"); phaseLogger.logPhase("[phase] upload control scripts + nostr-bench binary");
await scpToHost(serverIp, keyPath, localServerScriptPath, "/root/cloud-bench-server.sh"); await scpToHost(serverIp, keyPath, localServerScriptPath, "/root/cloud-bench-server.sh");
await sshExec(serverIp, keyPath, "chmod +x /root/cloud-bench-server.sh"); await sshExec(serverIp, keyPath, "chmod +x /root/cloud-bench-server.sh");
@@ -1637,7 +1876,7 @@ async function main() {
await sshExec(client.ip, keyPath, "chmod +x /root/cloud-bench-client.sh /usr/local/bin/nostr-bench"); await sshExec(client.ip, keyPath, "chmod +x /root/cloud-bench-client.sh /usr/local/bin/nostr-bench");
} }
console.log("[phase] server image setup"); phaseLogger.logPhase("[phase] server image setup");
let parrhesiaImageOnServer = parrhesiaSource.image; let parrhesiaImageOnServer = parrhesiaSource.image;
@@ -1724,17 +1963,14 @@ async function main() {
}; };
const results = []; const results = [];
const targetOrderPerRun = []; const targetOrder = shuffled(opts.targets);
console.log(`[phase] benchmark execution (mode=${opts.quick ? "quick" : "phased"})`); phaseLogger.logPhase(`[phase] benchmark execution (mode=${opts.quick ? "quick" : "phased"})`);
for (let runIndex = 1; runIndex <= opts.runs; runIndex += 1) { console.log(`[bench] target-order=${targetOrder.join(",")}`);
const runTargets = shuffled(opts.targets);
targetOrderPerRun.push({ run: runIndex, targets: runTargets });
console.log(`[bench] run ${runIndex}/${opts.runs} target-order=${runTargets.join(",")}`);
for (const target of runTargets) { for (const target of targetOrder) {
console.log(`[bench] run ${runIndex}/${opts.runs} target=${target}`); console.log(`[bench] target=${target}`);
const targetStartTime = new Date().toISOString(); const targetStartTime = new Date().toISOString();
const serverEnvPrefix = [ const serverEnvPrefix = [
@@ -1752,7 +1988,7 @@ async function main() {
try { try {
await sshExec(serverIp, keyPath, `${serverEnvPrefix} /root/cloud-bench-server.sh ${shellEscape(startCommands[target])}`); await sshExec(serverIp, keyPath, `${serverEnvPrefix} /root/cloud-bench-server.sh ${shellEscape(startCommands[target])}`);
} catch (error) { } catch (error) {
console.error(`[bench] target startup failed target=${target} run=${runIndex}`); console.error(`[bench] target startup failed target=${target}`);
if (error?.stdout?.trim()) { if (error?.stdout?.trim()) {
console.error(`[bench] server startup stdout:\n${error.stdout.trim()}`); console.error(`[bench] server startup stdout:\n${error.stdout.trim()}`);
} }
@@ -1763,7 +1999,7 @@ async function main() {
} }
const relayUrl = relayUrls[target]; const relayUrl = relayUrls[target];
const runTargetDir = path.join(artifactsDir, target, `run-${runIndex}`); const runTargetDir = path.join(artifactsDir, target);
fs.mkdirSync(runTargetDir, { recursive: true }); fs.mkdirSync(runTargetDir, { recursive: true });
const benchEnvPrefix = [ const benchEnvPrefix = [
@@ -1774,6 +2010,9 @@ async function main() {
`PARRHESIA_BENCH_ECHO_SIZE=${opts.bench.echoSize}`, `PARRHESIA_BENCH_ECHO_SIZE=${opts.bench.echoSize}`,
`PARRHESIA_BENCH_EVENT_COUNT=${opts.bench.eventCount}`, `PARRHESIA_BENCH_EVENT_COUNT=${opts.bench.eventCount}`,
`PARRHESIA_BENCH_EVENT_RATE=${opts.bench.eventRate}`, `PARRHESIA_BENCH_EVENT_RATE=${opts.bench.eventRate}`,
`PARRHESIA_BENCH_EVENT_SEND_STRATEGY=${EVENT_SEND_STRATEGY}`,
`PARRHESIA_BENCH_EVENT_INFLIGHT=${EVENT_INFLIGHT}`,
`PARRHESIA_BENCH_EVENT_ACK_TIMEOUT=${EVENT_ACK_TIMEOUT_SECONDS}`,
`PARRHESIA_BENCH_REQ_COUNT=${opts.bench.reqCount}`, `PARRHESIA_BENCH_REQ_COUNT=${opts.bench.reqCount}`,
`PARRHESIA_BENCH_REQ_RATE=${opts.bench.reqRate}`, `PARRHESIA_BENCH_REQ_RATE=${opts.bench.reqRate}`,
`PARRHESIA_BENCH_REQ_LIMIT=${opts.bench.reqLimit}`, `PARRHESIA_BENCH_REQ_LIMIT=${opts.bench.reqLimit}`,
@@ -1782,9 +2021,16 @@ async function main() {
].join(" "); ].join(" ");
const benchArgs = { clientInfos, keyPath, benchEnvPrefix, relayUrl }; const benchArgs = { clientInfos, keyPath, benchEnvPrefix, relayUrl };
const fetchEventCountForTarget = async () =>
fetchServerEventCount({
target,
serverIp,
keyPath,
serverEnvPrefix,
});
if (opts.quick) { if (opts.quick) {
// Flat mode: run all benchmarks in one shot (backward compat) // Flat mode: run all benchmarks in one shot
const clientRunResults = await runSingleBenchmark({ const clientRunResults = await runSingleBenchmark({
...benchArgs, ...benchArgs,
mode: "all", mode: "all",
@@ -1792,14 +2038,13 @@ async function main() {
}); });
results.push({ results.push({
run: runIndex,
target, target,
relay_url: relayUrl, relay_url: relayUrl,
mode: "flat", mode: "flat",
clients: clientRunResults, clients: clientRunResults,
}); });
} else { } else {
// Phased mode: separate benchmarks at different DB fill levels // Phased mode: one sequence per target (seed each target DB only once)
let eventsInDb = 0; let eventsInDb = 0;
console.log(`[bench] ${target}: connect`); console.log(`[bench] ${target}: connect`);
@@ -1816,22 +2061,29 @@ async function main() {
artifactDir: path.join(runTargetDir, "echo"), artifactDir: path.join(runTargetDir, "echo"),
}); });
// Phase: empty // Phase: cold
console.log(`[bench] ${target}: req (empty, ${eventsInDb} events)`); console.log(`[bench] ${target}: req (cold, ${eventsInDb} events)`);
const emptyReqResults = await runSingleBenchmark({ const coldReqResults = await runSingleBenchmark({
...benchArgs, ...benchArgs,
mode: "req", mode: "req",
artifactDir: path.join(runTargetDir, "empty-req"), artifactDir: path.join(runTargetDir, "cold-req"),
}); });
console.log(`[bench] ${target}: event (empty, ${eventsInDb} events)`); console.log(`[bench] ${target}: event (cold, ${eventsInDb} events)`);
const emptyEventResults = await runSingleBenchmark({ const coldEventResults = await runSingleBenchmark({
...benchArgs, ...benchArgs,
mode: "event", mode: "event",
artifactDir: path.join(runTargetDir, "empty-event"), artifactDir: path.join(runTargetDir, "cold-event"),
}); });
eventsInDb += countEventsWritten(emptyEventResults); const estimatedColdEventWritten = countEventsWritten(coldEventResults);
console.log(`[bench] ${target}: ~${eventsInDb} events in DB after empty phase`); eventsInDb += estimatedColdEventWritten;
const authoritativeAfterCold = await fetchEventCountForTarget();
if (Number.isInteger(authoritativeAfterCold) && authoritativeAfterCold >= 0) {
eventsInDb = authoritativeAfterCold;
}
console.log(`[bench] ${target}: ~${eventsInDb} events in DB after cold phase`);
// Fill to warm // Fill to warm
const fillWarm = await smartFill({ const fillWarm = await smartFill({
@@ -1846,6 +2098,8 @@ async function main() {
serverEnvPrefix, serverEnvPrefix,
artifactDir: path.join(runTargetDir, "fill-warm"), artifactDir: path.join(runTargetDir, "fill-warm"),
threads: opts.bench.threads, threads: opts.bench.threads,
skipFill: target === "haven",
fetchEventCount: fetchEventCountForTarget,
}); });
eventsInDb = fillWarm.eventsInDb; eventsInDb = fillWarm.eventsInDb;
@@ -1863,7 +2117,13 @@ async function main() {
mode: "event", mode: "event",
artifactDir: path.join(runTargetDir, "warm-event"), artifactDir: path.join(runTargetDir, "warm-event"),
}); });
eventsInDb += countEventsWritten(warmEventResults); const estimatedWarmEventWritten = countEventsWritten(warmEventResults);
eventsInDb += estimatedWarmEventWritten;
const authoritativeAfterWarmEvent = await fetchEventCountForTarget();
if (Number.isInteger(authoritativeAfterWarmEvent) && authoritativeAfterWarmEvent >= 0) {
eventsInDb = authoritativeAfterWarmEvent;
}
// Fill to hot // Fill to hot
const fillHot = await smartFill({ const fillHot = await smartFill({
@@ -1878,6 +2138,8 @@ async function main() {
serverEnvPrefix, serverEnvPrefix,
artifactDir: path.join(runTargetDir, "fill-hot"), artifactDir: path.join(runTargetDir, "fill-hot"),
threads: opts.bench.threads, threads: opts.bench.threads,
skipFill: target === "haven",
fetchEventCount: fetchEventCountForTarget,
}); });
eventsInDb = fillHot.eventsInDb; eventsInDb = fillHot.eventsInDb;
@@ -1897,16 +2159,15 @@ async function main() {
}); });
results.push({ results.push({
run: runIndex,
target, target,
relay_url: relayUrl, relay_url: relayUrl,
mode: "phased", mode: "phased",
phases: { phases: {
connect: { clients: connectResults }, connect: { clients: connectResults },
echo: { clients: echoResults }, echo: { clients: echoResults },
empty: { cold: {
req: { clients: emptyReqResults }, req: { clients: coldReqResults },
event: { clients: emptyEventResults }, event: { clients: coldEventResults },
db_events_before: 0, db_events_before: 0,
}, },
warm: { warm: {
@@ -1941,7 +2202,6 @@ async function main() {
} }
} }
} }
}
const gitTag = detectedGitTag || "untagged"; const gitTag = detectedGitTag || "untagged";
const gitCommit = parrhesiaSource.gitCommit || detectedGitCommit || "unknown"; const gitCommit = parrhesiaSource.gitCommit || detectedGitCommit || "unknown";
@@ -1965,7 +2225,7 @@ async function main() {
}); });
} }
console.log("[phase] final server cleanup (containers)"); phaseLogger.logPhase("[phase] final server cleanup (containers)");
await sshExec(serverIp, keyPath, "/root/cloud-bench-server.sh cleanup"); await sshExec(serverIp, keyPath, "/root/cloud-bench-server.sh cleanup");
const servers = summariseServersFromResults(results); const servers = summariseServersFromResults(results);
@@ -1977,7 +2237,6 @@ async function main() {
machine_id: os.hostname(), machine_id: os.hostname(),
git_tag: gitTag, git_tag: gitTag,
git_commit: gitCommit, git_commit: gitCommit,
runs: opts.runs,
source: { source: {
kind: "cloud", kind: "cloud",
mode: parrhesiaSource.mode, mode: parrhesiaSource.mode,
@@ -2001,9 +2260,8 @@ async function main() {
}, },
}, },
bench: { bench: {
runs: opts.runs,
targets: opts.targets, targets: opts.targets,
target_order_per_run: targetOrderPerRun, target_order: targetOrder,
mode: opts.quick ? "flat" : "phased", mode: opts.quick ? "flat" : "phased",
warm_events: opts.warmEvents, warm_events: opts.warmEvents,
hot_events: opts.hotEvents, hot_events: opts.hotEvents,
@@ -2029,6 +2287,7 @@ async function main() {
console.log(`[done] ssh key kept: ${keyName}`); console.log(`[done] ssh key kept: ${keyName}`);
} }
} finally { } finally {
restoreBracketedLogPrefix();
removeSignalHandlers(); removeSignalHandlers();
await cleanup(); await cleanup();
} }

View File

@@ -10,7 +10,7 @@ export function parseNostrBenchSections(output) {
for (const lineRaw of lines) { for (const lineRaw of lines) {
const line = lineRaw.trim(); const line = lineRaw.trim();
const header = line.match(/^==>\s+nostr-bench\s+(connect|echo|event|req)\s+/); const header = line.match(/^==>\s+nostr-bench\s+(connect|echo|event|req|seed)\s+/);
if (header) { if (header) {
section = header[1]; section = header[1];
continue; continue;
@@ -20,9 +20,24 @@ export function parseNostrBenchSections(output) {
try { try {
const json = JSON.parse(line); const json = JSON.parse(line);
if (section) { if (!section) continue;
parsed[section] = json;
if (section === "seed" && json?.type === "seed_final") {
parsed.seed_final = json;
continue;
} }
const existing = parsed[section];
if (!existing) {
parsed[section] = json;
continue;
}
if (existing?.type === "final" && json?.type !== "final") {
continue;
}
parsed[section] = json;
} catch { } catch {
// ignore noisy non-json lines // ignore noisy non-json lines
} }
@@ -43,14 +58,21 @@ export function sum(values) {
return valid.reduce((a, b) => a + b, 0); return valid.reduce((a, b) => a + b, 0);
} }
export function throughputFromSection(section) { export function throughputFromSection(section, options = {}) {
const { preferAccepted = false } = options;
const elapsedMs = Number(section?.elapsed ?? NaN); const elapsedMs = Number(section?.elapsed ?? NaN);
const accepted = Number(section?.message_stats?.accepted ?? NaN);
const complete = Number(section?.message_stats?.complete ?? NaN); const complete = Number(section?.message_stats?.complete ?? NaN);
const effectiveCount =
preferAccepted && Number.isFinite(accepted)
? accepted
: complete;
const totalBytes = Number(section?.message_stats?.size ?? NaN); const totalBytes = Number(section?.message_stats?.size ?? NaN);
const cumulativeTps = const cumulativeTps =
Number.isFinite(elapsedMs) && elapsedMs > 0 && Number.isFinite(complete) Number.isFinite(elapsedMs) && elapsedMs > 0 && Number.isFinite(effectiveCount)
? complete / (elapsedMs / 1000) ? effectiveCount / (elapsedMs / 1000)
: NaN; : NaN;
const cumulativeMibs = const cumulativeMibs =
@@ -58,7 +80,11 @@ export function throughputFromSection(section) {
? totalBytes / (1024 * 1024) / (elapsedMs / 1000) ? totalBytes / (1024 * 1024) / (elapsedMs / 1000)
: NaN; : NaN;
const sampleTps = Number(section?.tps ?? NaN); const sampleTps = Number(
preferAccepted
? section?.accepted_tps ?? section?.tps
: section?.tps,
);
const sampleMibs = Number(section?.size ?? NaN); const sampleMibs = Number(section?.size ?? NaN);
return { return {
@@ -67,10 +93,16 @@ export function throughputFromSection(section) {
}; };
} }
function messageCounter(section, field) {
const value = Number(section?.message_stats?.[field]);
return Number.isFinite(value) ? value : 0;
}
export function metricFromSections(sections) { export function metricFromSections(sections) {
const connect = sections?.connect?.connect_stats?.success_time || {}; const connect = sections?.connect?.connect_stats?.success_time || {};
const echo = throughputFromSection(sections?.echo || {}); const echo = throughputFromSection(sections?.echo || {});
const event = throughputFromSection(sections?.event || {}); const eventSection = sections?.event || {};
const event = throughputFromSection(eventSection, { preferAccepted: true });
const req = throughputFromSection(sections?.req || {}); const req = throughputFromSection(sections?.req || {});
return { return {
@@ -80,6 +112,10 @@ export function metricFromSections(sections) {
echo_mibs: echo.mibs, echo_mibs: echo.mibs,
event_tps: event.tps, event_tps: event.tps,
event_mibs: event.mibs, event_mibs: event.mibs,
event_notice: messageCounter(eventSection, "notice"),
event_auth_challenge: messageCounter(eventSection, "auth_challenge"),
event_reply_unrecognized: messageCounter(eventSection, "reply_unrecognized"),
event_ack_timeout: messageCounter(eventSection, "ack_timeout"),
req_tps: req.tps, req_tps: req.tps,
req_mibs: req.mibs, req_mibs: req.mibs,
}; };
@@ -109,6 +145,10 @@ export function summariseFlatResults(results) {
echo_mibs: sum(clientSamples.map((s) => s.echo_mibs)), echo_mibs: sum(clientSamples.map((s) => s.echo_mibs)),
event_tps: sum(clientSamples.map((s) => s.event_tps)), event_tps: sum(clientSamples.map((s) => s.event_tps)),
event_mibs: sum(clientSamples.map((s) => s.event_mibs)), event_mibs: sum(clientSamples.map((s) => s.event_mibs)),
event_notice: sum(clientSamples.map((s) => s.event_notice)),
event_auth_challenge: sum(clientSamples.map((s) => s.event_auth_challenge)),
event_reply_unrecognized: sum(clientSamples.map((s) => s.event_reply_unrecognized)),
event_ack_timeout: sum(clientSamples.map((s) => s.event_ack_timeout)),
req_tps: sum(clientSamples.map((s) => s.req_tps)), req_tps: sum(clientSamples.map((s) => s.req_tps)),
req_mibs: sum(clientSamples.map((s) => s.req_mibs)), req_mibs: sum(clientSamples.map((s) => s.req_mibs)),
}); });
@@ -121,6 +161,10 @@ export function summariseFlatResults(results) {
"echo_mibs", "echo_mibs",
"event_tps", "event_tps",
"event_mibs", "event_mibs",
"event_notice",
"event_auth_challenge",
"event_reply_unrecognized",
"event_ack_timeout",
"req_tps", "req_tps",
"req_mibs", "req_mibs",
]; ];
@@ -166,8 +210,8 @@ export function summarisePhasedResults(results) {
} }
// Per-level req and event metrics // Per-level req and event metrics
for (const level of ["empty", "warm", "hot"]) { for (const level of ["cold", "warm", "hot"]) {
const phase = phases[level]; const phase = phases[level] || (level === "cold" ? phases.empty : undefined);
if (!phase) continue; if (!phase) continue;
const reqClients = (phase.req?.clients || []) const reqClients = (phase.req?.clients || [])
@@ -184,6 +228,16 @@ export function summarisePhasedResults(results) {
if (eventClients.length > 0) { if (eventClients.length > 0) {
sample[`event_${level}_tps`] = sum(eventClients.map((s) => s.event_tps)); sample[`event_${level}_tps`] = sum(eventClients.map((s) => s.event_tps));
sample[`event_${level}_mibs`] = sum(eventClients.map((s) => s.event_mibs)); sample[`event_${level}_mibs`] = sum(eventClients.map((s) => s.event_mibs));
sample[`event_${level}_notice`] = sum(eventClients.map((s) => s.event_notice));
sample[`event_${level}_auth_challenge`] = sum(
eventClients.map((s) => s.event_auth_challenge),
);
sample[`event_${level}_reply_unrecognized`] = sum(
eventClients.map((s) => s.event_reply_unrecognized),
);
sample[`event_${level}_ack_timeout`] = sum(
eventClients.map((s) => s.event_ack_timeout),
);
} }
} }
@@ -215,9 +269,17 @@ export function countEventsWritten(clientResults) {
for (const cr of clientResults) { for (const cr of clientResults) {
if (cr.status !== "ok") continue; if (cr.status !== "ok") continue;
const eventSection = cr.sections?.event; const eventSection = cr.sections?.event;
if (eventSection?.message_stats?.complete) { if (!eventSection?.message_stats) continue;
total += Number(eventSection.message_stats.complete) || 0;
const accepted = Number(eventSection.message_stats.accepted);
if (Number.isFinite(accepted)) {
total += Math.max(0, accepted);
continue;
} }
const complete = Number(eventSection.message_stats.complete) || 0;
const error = Number(eventSection.message_stats.error) || 0;
total += Math.max(0, complete - error);
} }
return total; return total;
} }

View File

@@ -265,7 +265,7 @@ common_parrhesia_env+=( -e PARRHESIA_LIMITS_MAX_NEGENTROPY_ITEMS_PER_SESSION=100
cmd="${1:-}" cmd="${1:-}"
if [[ -z "$cmd" ]]; then if [[ -z "$cmd" ]]; then
echo "usage: cloud-bench-server.sh <start-*|wipe-data-*|cleanup>" >&2 echo "usage: cloud-bench-server.sh <start-*|wipe-data-*|count-data-*|cleanup>" >&2
exit 1 exit 1
fi fi
@@ -626,6 +626,14 @@ EOF
wait_port 3355 120 haven wait_port 3355 120 haven
;; ;;
count-data-parrhesia-pg)
docker exec pg psql -U parrhesia -d parrhesia -At -c "SELECT count(*) FROM events"
;;
count-data-nostream)
docker exec nostream-db psql -U nostr_ts_relay -d nostr_ts_relay -At -c "SELECT count(*) FROM events"
;;
cleanup) cleanup)
cleanup_containers cleanup_containers
;; ;;

View File

@@ -27,7 +27,7 @@ Examples:
just e2e marmot just e2e marmot
just e2e node-sync just e2e node-sync
just bench compare just bench compare
just bench cloud --clients 3 --runs 3 just bench cloud --clients 3
EOF EOF
exit 0 exit 0
fi fi
@@ -77,7 +77,7 @@ Examples:
just bench collect just bench collect
just bench update --machine all just bench update --machine all
just bench at v0.5.0 just bench at v0.5.0
just bench cloud --clients 3 --runs 3 just bench cloud --clients 3
just bench cloud --targets parrhesia-pg,nostream,haven --nostream-ref main just bench cloud --targets parrhesia-pg,nostream,haven --nostream-ref main
EOF EOF
exit 0 exit 0

View File

@@ -18,7 +18,6 @@ Behavior:
- Adds smoke defaults when --quick is set (unless already provided): - Adds smoke defaults when --quick is set (unless already provided):
--server-type cx23 --server-type cx23
--client-type cx23 --client-type cx23
--runs 1
--clients 1 --clients 1
--connect-count 20 --connect-count 20
--connect-rate 20 --connect-rate 20
@@ -42,7 +41,7 @@ Everything else is passed through unchanged.
Examples: Examples:
just bench cloud just bench cloud
just bench cloud --quick just bench cloud --quick
just bench cloud --clients 2 --runs 1 --targets parrhesia-memory just bench cloud --clients 2 --targets parrhesia-memory
just bench cloud --image ghcr.io/owner/parrhesia:latest --threads 4 just bench cloud --image ghcr.io/owner/parrhesia:latest --threads 4
just bench cloud --no-monitoring just bench cloud --no-monitoring
just bench cloud --yes --datacenter auto just bench cloud --yes --datacenter auto
@@ -106,7 +105,6 @@ done
if [[ "$QUICK" == "1" ]]; then if [[ "$QUICK" == "1" ]]; then
add_default_if_missing "--server-type" "cx23" add_default_if_missing "--server-type" "cx23"
add_default_if_missing "--client-type" "cx23" add_default_if_missing "--client-type" "cx23"
add_default_if_missing "--runs" "1"
add_default_if_missing "--clients" "1" add_default_if_missing "--clients" "1"
add_default_if_missing "--connect-count" "20" add_default_if_missing "--connect-count" "20"

View File

@@ -207,7 +207,7 @@ const presentBaselines = [
...[...discoveredBaselines].filter((srv) => !preferredBaselineOrder.includes(srv)).sort((a, b) => a.localeCompare(b)), ...[...discoveredBaselines].filter((srv) => !preferredBaselineOrder.includes(srv)).sort((a, b) => a.localeCompare(b)),
]; ];
// --- Colour palette per server: [empty, warm, hot] --- // --- Colour palette per server: [cold, warm, hot] ---
const serverColours = { const serverColours = {
"parrhesia-pg": ["#93c5fd", "#3b82f6", "#1e40af"], "parrhesia-pg": ["#93c5fd", "#3b82f6", "#1e40af"],
"parrhesia-memory": ["#86efac", "#22c55e", "#166534"], "parrhesia-memory": ["#86efac", "#22c55e", "#166534"],
@@ -218,12 +218,12 @@ const serverColours = {
}; };
const levelStyles = [ const levelStyles = [
/* empty */ { dt: 3, pt: 6, ps: 0.7, lw: 1.5 }, /* cold */ { dt: 3, pt: 6, ps: 0.7, lw: 1.5 },
/* warm */ { dt: 2, pt: 8, ps: 0.8, lw: 1.5 }, /* warm */ { dt: 2, pt: 8, ps: 0.8, lw: 1.5 },
/* hot */ { dt: 1, pt: 7, ps: 1.0, lw: 2 }, /* hot */ { dt: 1, pt: 7, ps: 1.0, lw: 2 },
]; ];
const levels = ["empty", "warm", "hot"]; const levels = ["cold", "warm", "hot"];
const shortLabel = { const shortLabel = {
"parrhesia-pg": "pg", "parrhesia-memory": "mem", "parrhesia-pg": "pg", "parrhesia-memory": "mem",
@@ -233,19 +233,31 @@ const shortLabel = {
const allServers = ["parrhesia-pg", "parrhesia-memory", ...presentBaselines]; const allServers = ["parrhesia-pg", "parrhesia-memory", ...presentBaselines];
function isPhased(e) {
for (const srv of Object.values(e.servers || {})) {
if (srv.event_empty_tps !== undefined) return true;
}
return false;
}
// Build phased key: "event_tps" + "empty" → "event_empty_tps"
function phasedKey(base, level) { function phasedKey(base, level) {
const idx = base.lastIndexOf("_"); const idx = base.lastIndexOf("_");
return `${base.slice(0, idx)}_${level}_${base.slice(idx + 1)}`; return `${base.slice(0, idx)}_${level}_${base.slice(idx + 1)}`;
} }
function phasedValue(d, base, level) {
const direct = d?.[phasedKey(base, level)];
if (direct !== undefined) return direct;
if (level === "cold") {
// Backward compatibility for historical entries written with `empty` phase names.
const legacy = d?.[phasedKey(base, "empty")];
if (legacy !== undefined) return legacy;
}
return undefined;
}
function isPhased(e) {
for (const srv of Object.values(e.servers || {})) {
if (phasedValue(srv, "event_tps", "cold") !== undefined) return true;
}
return false;
}
// --- Emit linetype definitions (server × level) --- // --- Emit linetype definitions (server × level) ---
const plotLines = []; const plotLines = [];
for (let si = 0; si < allServers.length; si++) { for (let si = 0; si < allServers.length; si++) {
@@ -297,7 +309,7 @@ for (const panel of panels) {
plotLines.push(""); plotLines.push("");
} else { } else {
// Three columns per server (empty, warm, hot) // Three columns per server (cold, warm, hot)
const header = ["tag"]; const header = ["tag"];
for (const srv of allServers) { for (const srv of allServers) {
const sl = shortLabel[srv] || srv; const sl = shortLabel[srv] || srv;
@@ -311,7 +323,7 @@ for (const panel of panels) {
const d = e.servers?.[srv]; const d = e.servers?.[srv];
if (!d) { row.push("NaN", "NaN", "NaN"); continue; } if (!d) { row.push("NaN", "NaN", "NaN"); continue; }
if (phased) { if (phased) {
for (const lvl of levels) row.push(d[phasedKey(panel.base, lvl)] ?? "NaN"); for (const lvl of levels) row.push(phasedValue(d, panel.base, lvl) ?? "NaN");
} else { } else {
row.push("NaN", d[panel.base] ?? "NaN", "NaN"); // flat → warm only row.push("NaN", d[panel.base] ?? "NaN", "NaN"); // flat → warm only
} }
@@ -320,7 +332,7 @@ for (const panel of panels) {
} }
fs.writeFileSync(path.join(workDir, panel.file), rows.join("\n") + "\n", "utf8"); fs.writeFileSync(path.join(workDir, panel.file), rows.join("\n") + "\n", "utf8");
// Plot: three series per server (empty/warm/hot) // Plot: three series per server (cold/warm/hot)
const dataFile = `data_dir."/${panel.file}"`; const dataFile = `data_dir."/${panel.file}"`;
plotLines.push(`set title "${panel.label}"`); plotLines.push(`set title "${panel.label}"`);
plotLines.push(`set ylabel "${panel.ylabel}"`); plotLines.push(`set ylabel "${panel.ylabel}"`);
@@ -395,7 +407,7 @@ if (!pg || !mem) {
} }
// Detect phased entries — use hot fill level as headline metric // Detect phased entries — use hot fill level as headline metric
const phased = pg.event_empty_tps !== undefined; const phased = pg.event_cold_tps !== undefined || pg.event_empty_tps !== undefined;
// For phased entries, resolve "event_tps" → "event_hot_tps" etc. // For phased entries, resolve "event_tps" → "event_hot_tps" etc.
function resolveKey(key) { function resolveKey(key) {

View File

@@ -30,6 +30,45 @@ defmodule Parrhesia.API.EventsTest do
assert second_result.message == "duplicate: event already stored" assert second_result.message == "duplicate: event already stored"
end end
test "publish fanout includes sync-originated events when relay guard is disabled" do
with_sync_relay_guard(false)
join_multi_node_group!()
event = valid_event()
event_id = event["id"]
assert {:ok, %{accepted: true}} =
Events.publish(event, context: %RequestContext{caller: :sync})
assert_receive {:remote_fanout_event, %{"id" => ^event_id}}, 200
end
test "publish fanout skips sync-originated events when relay guard is enabled" do
with_sync_relay_guard(true)
join_multi_node_group!()
event = valid_event()
event_id = event["id"]
assert {:ok, %{accepted: true}} =
Events.publish(event, context: %RequestContext{caller: :sync})
refute_receive {:remote_fanout_event, %{"id" => ^event_id}}, 200
end
test "publish fanout still includes local-originated events when relay guard is enabled" do
with_sync_relay_guard(true)
join_multi_node_group!()
event = valid_event()
event_id = event["id"]
assert {:ok, %{accepted: true}} =
Events.publish(event, context: %RequestContext{caller: :local})
assert_receive {:remote_fanout_event, %{"id" => ^event_id}}, 200
end
test "query and count preserve read semantics through the shared API" do test "query and count preserve read semantics through the shared API" do
now = System.system_time(:second) now = System.system_time(:second)
first = valid_event(%{"content" => "first", "created_at" => now}) first = valid_event(%{"content" => "first", "created_at" => now})
@@ -53,6 +92,36 @@ defmodule Parrhesia.API.EventsTest do
) )
end end
defp with_sync_relay_guard(enabled?) when is_boolean(enabled?) do
[{:config, previous}] = :ets.lookup(Parrhesia.Config, :config)
sync =
previous
|> Map.get(:sync, [])
|> Keyword.put(:relay_guard, enabled?)
:ets.insert(Parrhesia.Config, {:config, Map.put(previous, :sync, sync)})
on_exit(fn ->
:ets.insert(Parrhesia.Config, {:config, previous})
end)
end
defp join_multi_node_group! do
case Process.whereis(:pg) do
nil ->
case :pg.start_link() do
{:ok, _pid} -> :ok
{:error, {:already_started, _pid}} -> :ok
end
_pid ->
:ok
end
:ok = :pg.join(Parrhesia.Fanout.MultiNode, self())
end
defp valid_event(overrides \\ %{}) do defp valid_event(overrides \\ %{}) do
base_event = %{ base_event = %{
"pubkey" => String.duplicate("1", 64), "pubkey" => String.duplicate("1", 64),

View File

@@ -22,6 +22,7 @@ defmodule Parrhesia.ConfigTest do
assert Parrhesia.Config.get([:limits, :max_filter_limit]) == 500 assert Parrhesia.Config.get([:limits, :max_filter_limit]) == 500
assert Parrhesia.Config.get([:database, :separate_read_pool?]) == false assert Parrhesia.Config.get([:database, :separate_read_pool?]) == false
assert Parrhesia.Config.get([:relay_url]) == "ws://localhost:4413/relay" assert Parrhesia.Config.get([:relay_url]) == "ws://localhost:4413/relay"
assert Parrhesia.Config.get([:sync, :relay_guard]) == false
assert Parrhesia.Config.get([:policies, :auth_required_for_writes]) == false assert Parrhesia.Config.get([:policies, :auth_required_for_writes]) == false
assert Parrhesia.Config.get([:policies, :marmot_media_max_imeta_tags_per_event]) == 8 assert Parrhesia.Config.get([:policies, :marmot_media_max_imeta_tags_per_event]) == 8
assert Parrhesia.Config.get([:policies, :marmot_media_reject_mip04_v1]) == true assert Parrhesia.Config.get([:policies, :marmot_media_reject_mip04_v1]) == true