14 Commits

Author SHA1 Message Date
11d959d0bd build: LICENSE, prepare Hex release
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 1s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 1s
2026-04-06 08:59:47 +02:00
8a4ec953b4 dev: devenv update, fix legacy pre-commit hooks
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
2026-03-26 13:02:08 +01:00
39282c8a59 fix: bypass ACL for local callers 2026-03-26 13:01:37 +01:00
a74106d665 chore: Bump version to 0.8.0
Some checks failed
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
Release / Release Gate (push) Failing after 0s
Release / Build and publish image (push) Has been skipped
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
2026-03-26 01:23:23 +01:00
d34b398eed Merge remote-tracking branch 'public/master' (GH actions, test stability) 2026-03-26 00:49:25 +01:00
b402d95e47 feat: add sync relay guard fanout gating and env config
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
2026-03-26 00:36:00 +01:00
8309a89ba7 perf: tune cloud seeding and lower hot fill target
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
2026-03-20 21:33:17 +01:00
9ed1d80b7f bench: simplify cloud bench flow and align phased naming
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
2026-03-20 20:56:32 +01:00
4bd8663126 bench/fix: prefix bracketed cloud bench logs with T+ timestamps
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
2026-03-20 18:10:29 +01:00
f7ff3a4bd7 bench: use nostr-bench seed mode and expose relay json counters 2026-03-20 18:00:14 +01:00
8f22eb2097 build: pin nostr-bench submodule in nix and cloud bench pipeline 2026-03-20 17:43:31 +01:00
6b59fa6328 build: nostr-bench submodule
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
2026-03-20 14:33:51 +01:00
070464f2eb bench: Cloud seeding
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
2026-03-20 14:19:58 +01:00
6bd0143de4 chore: Bump version to 0.7.0, 1st beta
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + E2E) (push) Failing after 0s
Release / Release Gate (push) Failing after 0s
Release / Build and publish image (push) Has been skipped
2026-03-20 03:44:24 +01:00
30 changed files with 1345 additions and 365 deletions

View File

@@ -13,6 +13,7 @@ POOL_SIZE=20
# PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_WRITES=false
# PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_READS=false
# PARRHESIA_POLICIES_MIN_POW_DIFFICULTY=0
# PARRHESIA_SYNC_RELAY_GUARD=false
# PARRHESIA_FEATURES_VERIFY_EVENT_SIGNATURES=true
# PARRHESIA_METRICS_ENABLED_ON_MAIN_ENDPOINT=true
# PARRHESIA_METRICS_PRIVATE_NETWORKS_ONLY=true

View File

@@ -55,8 +55,11 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: recursive
- name: Init submodules
run: |
git submodule init marmot-ts
git submodule update --recursive
- name: Set up Elixir + OTP
uses: erlef/setup-beam@v1

View File

@@ -54,8 +54,11 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: recursive
- name: Init submodules
run: |
git submodule init marmot-ts
git submodule update --recursive
- name: Set up Elixir + OTP
uses: erlef/setup-beam@v1

3
.gitmodules vendored
View File

@@ -4,3 +4,6 @@
[submodule "docs/nips"]
path = docs/nips
url = https://github.com/nostr-protocol/nips.git
[submodule "nix/nostr-bench"]
path = nix/nostr-bench
url = ssh://gitea@git.teralink.net:10322/self/nostr-bench.git

25
LICENSE Normal file
View File

@@ -0,0 +1,25 @@
BSD 2-Clause License
Copyright (c) 2026, Steffen Beyer <steffen@beyer.io>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -264,6 +264,7 @@ CSV env vars use comma-separated values. Boolean env vars accept `1/0`, `true/fa
| `:nip66` | config-file driven | see table below | Built-in NIP-66 discovery / monitor publisher |
| `:sync.path` | `PARRHESIA_SYNC_PATH` | `nil` | Optional path to sync peer config |
| `:sync.start_workers?` | `PARRHESIA_SYNC_START_WORKERS` | `true` | Start outbound sync workers on boot |
| `:sync.relay_guard` | `PARRHESIA_SYNC_RELAY_GUARD` | `false` | Suppress multi-node re-fanout for sync-originated events |
| `:limits` | `PARRHESIA_LIMITS_*` | see table below | Runtime override group |
| `:policies` | `PARRHESIA_POLICIES_*` | see table below | Runtime override group |
| `:listeners` | config-file driven | see notes below | Ingress listeners with bind, transport, feature, auth, network, and baseline ACL settings |
@@ -667,5 +668,3 @@ For Marmot client end-to-end checks (TypeScript/Node suite using `marmot-ts`, in
```bash
just e2e marmot
```
```

View File

@@ -39,7 +39,8 @@ config :parrhesia,
],
sync: [
path: nil,
start_workers?: true
start_workers?: true,
relay_guard: false
],
limits: [
max_frame_bytes: 1_048_576,

View File

@@ -161,6 +161,7 @@ if config_env() == :prod do
retention_defaults = Application.get_env(:parrhesia, :retention, [])
features_defaults = Application.get_env(:parrhesia, :features, [])
acl_defaults = Application.get_env(:parrhesia, :acl, [])
sync_defaults = Application.get_env(:parrhesia, :sync, [])
default_pool_size = Keyword.get(repo_defaults, :pool_size, 32)
default_queue_target = Keyword.get(repo_defaults, :queue_target, 1_000)
@@ -748,7 +749,12 @@ if config_env() == :prod do
start_workers?:
bool_env.(
"PARRHESIA_SYNC_START_WORKERS",
Keyword.get(Application.get_env(:parrhesia, :sync, []), :start_workers?, true)
Keyword.get(sync_defaults, :start_workers?, true)
),
relay_guard:
bool_env.(
"PARRHESIA_SYNC_RELAY_GUARD",
Keyword.get(sync_defaults, :relay_guard, false)
)
],
moderation_cache_enabled:

View File

@@ -10,7 +10,7 @@
vips,
}: let
pname = "parrhesia";
version = "0.7.0";
version = "0.8.0";
beamPackages = beam.packages.erlang_28.extend (
final: _prev: {

View File

@@ -3,10 +3,10 @@
"devenv": {
"locked": {
"dir": "src/modules",
"lastModified": 1768736080,
"lastModified": 1774475276,
"owner": "cachix",
"repo": "devenv",
"rev": "efa86311444852d24137d14964b449075522d489",
"rev": "f8ca2c061ec2feceee1cf1c5e52c92f58b6aec9c",
"type": "github"
},
"original": {
@@ -40,10 +40,10 @@
]
},
"locked": {
"lastModified": 1767281941,
"lastModified": 1774104215,
"owner": "cachix",
"repo": "git-hooks.nix",
"rev": "f0927703b7b1c8d97511c4116eb9b4ec6645a0fa",
"rev": "f799ae951fde0627157f40aec28dec27b22076d0",
"type": "github"
},
"original": {
@@ -73,11 +73,14 @@
}
},
"nixpkgs": {
"inputs": {
"nixpkgs-src": "nixpkgs-src"
},
"locked": {
"lastModified": 1767052823,
"lastModified": 1774287239,
"owner": "cachix",
"repo": "devenv-nixpkgs",
"rev": "538a5124359f0b3d466e1160378c87887e3b51a4",
"rev": "fa7125ea7f1ae5430010a6e071f68375a39bd24c",
"type": "github"
},
"original": {
@@ -87,11 +90,44 @@
"type": "github"
}
},
"nixpkgs-src": {
"flake": false,
"locked": {
"lastModified": 1769922788,
"narHash": "sha256-H3AfG4ObMDTkTJYkd8cz1/RbY9LatN5Mk4UF48VuSXc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "207d15f1a6603226e1e223dc79ac29c7846da32e",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixpkgs-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nostr-bench-src": {
"flake": false,
"locked": {
"lastModified": 1774020724,
"owner": "serpent213",
"repo": "nostr-bench",
"rev": "8561b84864ce1269b26304808c64219471999caf",
"type": "github"
},
"original": {
"owner": "serpent213",
"repo": "nostr-bench",
"type": "github"
}
},
"root": {
"inputs": {
"devenv": "devenv",
"git-hooks": "git-hooks",
"nixpkgs": "nixpkgs",
"nostr-bench-src": "nostr-bench-src",
"pre-commit-hooks": [
"git-hooks"
]

View File

@@ -73,7 +73,9 @@ in {
vips.overrideAttrs (oldAttrs: {
buildInputs = oldAttrs.buildInputs ++ [mozjpeg];
});
nostr-bench = pkgs.callPackage ./nix/nostr-bench.nix {};
nostr-bench = pkgs.callPackage ./nix/nostr-bench.nix {
nostrBenchSrc = inputs.nostr-bench-src;
};
in
with pkgs;
[
@@ -187,6 +189,21 @@ in {
# https://devenv.sh/scripts/
enterShell = ''
cleanup_stale_git_hook_legacy() {
hooks_dir="$(git rev-parse --git-path hooks 2>/dev/null)" || return 0
for legacy_hook in "$hooks_dir"/*.legacy; do
[ -e "$legacy_hook" ] || continue
if grep -Fq "File generated by pre-commit: https://pre-commit.com" "$legacy_hook"; then
rm -f "$legacy_hook"
echo "Removed stale legacy git hook: $legacy_hook"
fi
done
}
cleanup_stale_git_hook_legacy
echo
elixir --version
echo

View File

@@ -2,6 +2,9 @@
inputs:
nixpkgs:
url: github:cachix/devenv-nixpkgs/rolling
nostr-bench-src:
url: github:serpent213/nostr-bench
flake: false
# If you're using non-OSS software, you can set allowUnfree to true.
# allowUnfree: true

344
docs/BETA_REVIEW.md Normal file
View File

@@ -0,0 +1,344 @@
# Parrhesia Beta: Production-Readiness Gap Assessment
**Date:** 2026-03-20
**Version:** 0.7.0
**Scope:** Delta analysis from beta promotion — what stands between this codebase and confident public-facing production deployment.
---
## Production Readiness Scorecard
| # | Dimension | Rating | Summary |
|---|----------------------------------|--------|----------------------------------------------|
| 1 | Operational Resilience | 🟡 | Graceful shutdown partial; no DB circuit-breaking |
| 2 | Multi-Node / Clustering | 🟡 | Best-effort only; acceptable for single-node prod |
| 3 | Load & Capacity Characterisation | 🟡 | Benchmarks exist but no defined capacity model |
| 4 | Deployment & Infrastructure | 🟡 | Strong Nix/Docker base; missing runbooks and migration strategy |
| 5 | Security Hardening | 🟢 | Solid for production with reverse proxy |
| 6 | Data Integrity & Consistency | 🟢 | Transaction-wrapped writes with dedup; minor multi-node edge cases |
| 7 | Observability Completeness | 🟡 | Excellent metrics; no dashboards, alerts, or tracing |
| 8 | Technical Debt (Prod Impact) | 🟡 | Manageable; connection.ex size is the main concern |
---
## 1. Operational Resilience — 🟡
### What's good
- **No `Process.sleep` on any hot path.** Zero occurrences in `lib/`. Clean async message passing throughout.
- **WebSocket keepalive** implemented: 30s ping, 10s pong timeout, auto-close on timeout.
- **Outbound queue backpressure** well-designed: bounded queue (256 default), configurable overflow strategy (`:close`/`:drop_oldest`/`:drop_newest`), pressure telemetry at 75% threshold.
- **Connection isolation:** Each WebSocket is a separate process; one crash does not propagate.
- **Graceful connection close on shutdown:** `handle_info({:EXIT, _, :shutdown}, ...)` drains outbound frames before closing with code 1012 ("service restart"). This is good.
### Gaps
**G1.1 — No DB circuit-breaking or backoff on PostgreSQL unavailability.**
Ecto's connection pool (`db_connection`/`DBConnection`) will queue checkout requests up to `queue_target` (1000ms) / `queue_interval` (5000ms), then raise `DBConnection.ConnectionError`. These errors propagate as storage failures in the ingest path and return NOTICE errors to clients. However:
- There is no circuit breaker to fast-reject requests when the DB is known-down, meaning every ingest/query attempt during an outage burns a pool checkout timeout slot.
- On DB recovery, all queued checkouts may succeed simultaneously (thundering herd).
- **Impact:** During a PostgreSQL failover (typically 10–30s), connection processes pile up waiting on the pool. Latency spikes for all connected clients. Memory pressure from queued processes.
- **Mitigation:** Ecto's built-in queue management provides partial protection. For a relay with ≤1000 concurrent connections this is likely survivable without circuit-breaking. For higher connection counts, consider a fast-fail wrapper around storage calls when the pool reports consecutive failures.
**G1.2 — Metrics scrape on the hot path.**
`/metrics` calls `TelemetryMetricsPrometheus.Core.scrape/1` synchronously within the HTTP request handler. This serialises metric aggregation and formatting. If the Prometheus reporter's internal state is large (many unique tag combinations), scraping can take 10–100ms. This runs on a Bandit acceptor process — it does not block WebSocket connections directly, but a slow scrape under high cardinality could make the health endpoint unresponsive if metrics and health share the same listener.
- **Current mitigation:** Metrics can be isolated to a dedicated listener via `PARRHESIA_METRICS_ENDPOINT_*` config. If deployed this way, impact is isolated.
- **Recommendation:** Document the dedicated metrics listener as required for production. Consider adding a scrape timeout guard.
**G1.3 — Supervisor shutdown timeout is OTP default (5s).**
The `Parrhesia.Runtime` supervisor uses `:one_for_one` strategy with default child shutdown specs. Bandit listeners have their own shutdown behavior, but there is no explicit `shutdown: N` on the endpoint child spec. Under load with many connections, 5s may not be enough to drain all outbound queues.
- **Recommendation:** Set explicit `shutdown: 15_000` on `Parrhesia.Web.Endpoint` child spec. Bandit supports graceful drain on listener stop.
---
## 2. Multi-Node / Clustering — 🟡
### Current state
Per `docs/CLUSTER.md`, clustering is **implemented but explicitly best-effort and untested**:
- `:pg`-based process groups for cross-node fanout.
- No automatic cluster discovery (no libcluster).
- ETS subscription index is node-local.
- No durable inter-node transport; no replay on reconnect.
- No explicit acknowledgement between nodes.
### Assessment for production
**For single-node production deployment: not a blocker.** The clustering code is unconditionally started (`MultiNode` joins `:pg` on init) but with a single node, `get_members/0` returns only self, and the `Enum.reject(&(&1 == self()))` filter means no remote sends occur. No performance overhead.
**For multi-node production: not ready.** Key issues:
- **Subscription inconsistency on netsplit:** Events ingested on node A during a split are never delivered to subscribers on node B. No catch-up mechanism exists. Clients must reconnect and re-query to recover.
- **Node departure drops subscriptions silently:** When a node leaves the cluster, subscribers on that node lose their connections (normal). Subscribers on other nodes are unaffected. But events that were in-flight from the departed node are lost.
- **No cluster health observability:** No metrics for inter-node fanout lag, message drops, or membership changes.
**Recommendation for initial production:** Deploy single-node. Clustering is a Phase B concern per the documented roadmap.
---
## 3. Load & Capacity Characterisation — 🟡
### What exists
- `LoadSoakTest` asserts p95 fanout enqueue/drain < 25ms.
- `bench/` directory with `nostr-bench` submodule for external load testing.
- Cloud bench orchestration scripts (`scripts/cloud_bench_orchestrate.mjs`, `scripts/cloud_bench_server.sh`).
### Gaps
**G3.1 — No documented capacity model.**
There is no documented answer to: "How many connections / events per second can one node handle before degradation?" The `LoadSoakTest` runs locally with synthetic data — useful for regression detection but not representative of production traffic patterns.
**G3.2 — Multi-filter query scaling is in-memory dedup.**
`Postgres.Events.query/3` runs each filter as a separate SQL query, collects all results into memory, and deduplicates with `deduplicate_events/1` (Map.update accumulation). With many overlapping filters or high-cardinality results, this could produce significant memory pressure per-request.
- At realistic scales (< 10 filters, < 1000 results per filter), this is fine.
- At adversarial scales (32 subscriptions × large result sets), a single REQ could allocate substantial memory.
- **Current mitigation:** `max_tag_values_per_filter` (128) and query `LIMIT` bounds exist. The risk is bounded but not eliminated.
**G3.3 — No query performance benchmarks against large datasets.**
No evidence of testing against 100M+ events with monthly partitions. Partition pruning is implemented, but query plans may degrade if the partition list grows large (PostgreSQL planner overhead scales with partition count).
**Recommendation:** Before production, run `nostr-bench` at target load (e.g., 500 concurrent connections, 100 events/sec ingest, 1000 active subscriptions) and document the resulting latency profile. This becomes the baseline capacity model.
---
## 4. Deployment & Infrastructure Readiness — 🟡
### What's good
- **Docker image via Nix:** Non-root user (65534:65534), minimal base, cacerts bundled, SSL_CERT_FILE set. This is production-quality container hygiene.
- **OTP release:** `mix release` with `Parrhesia.Release.migrate/0` for safe migration execution.
- **CI pipeline:** Multi-matrix testing (OTP 27/28, Elixir 1.18/1.19), format/credo/unused deps checks, E2E tests.
- **Environment-based configuration:** All critical settings overridable via `PARRHESIA_*` env vars in `runtime.exs`.
- **Secrets:** No secrets committed. DB credentials via `DATABASE_URL`, identity key via env or file path.
### Gaps
**G4.1 — No zero-downtime migration strategy.**
`Parrhesia.Release.migrate/0` runs `Ecto.Migrator.run/4` with `:up`. Under replicated deployments (rolling update with 2+ instances), there is no advisory lock or migration guard — two instances starting simultaneously could race on migrations. Ecto's default migrator uses `pg_advisory_lock` via `Ecto.Migration.Runner`, so this is actually safe for PostgreSQL. However:
- **DDL migrations (CREATE INDEX CONCURRENTLY, ALTER TABLE) need careful handling.** The existing migrations use standard `CREATE TABLE` and `CREATE INDEX` which acquire ACCESS EXCLUSIVE locks. Running these against a live database will block reads and writes for the duration.
- **Recommendation:** For production, migrations should be run as a separate step before deploying new code (the compose.yaml already has a `migrate` service — extend this pattern).
**G4.2 — No operational runbooks.**
There are no documented procedures for:
- Rolling restart / blue-green deploy
- Partition pruning and retention tuning
- Runtime pubkey banning (the NIP-86 management API exists but isn't documented for ops use)
- DB failover response
- Scaling (horizontal or vertical)
**G4.3 — No health check in Docker image.**
The Nix-built Docker image has no `HEALTHCHECK` instruction. The `/health` and `/ready` endpoints exist but aren't wired into container orchestration.
- **Recommendation:** Add `HEALTHCHECK CMD curl -f http://localhost:4413/ready || exit 1` to the Docker image definition, or document the readiness endpoint for Kubernetes probes.
**G4.4 — No disaster recovery plan.**
No documented RTO/RPO. If the primary DB is lost, recovery depends entirely on external backup infrastructure. The relay has no built-in data export or snapshot capability.
---
## 5. Security Hardening — 🟢
### Assessment
The security posture is solid for production behind a reverse proxy:
- **TLS:** Full support for server, mutual, and proxy-terminated TLS modes. Cipher suite selection (strong/compatible). Certificate pin verification.
- **Rate limiting:** Three layers — relay-wide (10k/s), per-IP (1k/s), per-connection (120/s). All configurable.
- **Metrics endpoint:** Access-controlled via `metrics_allowed?/2` — supports private-network-only restriction and bearer token auth. Tested.
- **NIP-42 auth:** Constant-time comparison via `Plug.Crypto.secure_compare/2` (addressed in beta).
- **NIP-98:** Replay protection, event freshness check (< 60s), signature verification.
- **Input validation:** Binary field length constraints at DB level (migration 7). Event size limits at WebSocket frame level.
- **IP controls:** Trusted proxy CIDR configuration, X-Forwarded-For parsing, IP blocklist table.
- **Audit logging:** `management_audit_logs` table tracks admin actions.
- **No secrets in git.** Environment variable or file-path based secret injection.
### Minor considerations (not blocking)
- No integration with external threat intel feeds or IP reputation services. This is an infrastructure concern, not an application concern.
- DDoS mitigation assumed to be at load balancer / CDN layer. Application-level rate limiting is defense-in-depth, not primary.
- **Recommendation:** Document the expected deployment topology (Caddy/Nginx → Parrhesia) and which security controls are expected at each layer.
---
## 6. Data Integrity & Consistency — 🟢
### What's good
- **Duplicate event prevention:** Two-layer defence:
1. `event_ids` table with unique PK on `id` → `INSERT ... ON CONFLICT DO NOTHING`.
2. If `inserted == 0`, transaction rolls back with `:duplicate_event`.
3. Separate unique index on `events.id` as belt-and-suspenders.
- **Atomic writes:** `put_event/2` wraps `insert_event_id!`, `insert_event!`, `insert_tags!`, and `upsert_state_tables!` in a single `Repo.transaction/1`. Partial writes (event without tags) cannot occur.
- **Replaceable/addressable event state:** Upsert logic in state tables with correct conflict resolution (higher `created_at` wins, then lower `id` as tiebreaker via `candidate_wins_state?/2`).
### Minor considerations
**G6.1 — Expiration worker concurrency on multi-node.**
`ExpirationWorker` runs `Repo.delete_all/1` against all expired events. If two nodes run this worker against the same database, both execute the same DELETE query. PostgreSQL handles this safely (the second DELETE finds 0 rows), and the worker is idempotent. **Not a problem.**
**G6.2 — Partition pruning and sync.**
`PartitionRetentionWorker.drop_partition/1` drops entire monthly partitions. If negentropy sync is in progress against events in that partition, the sync session's cached refs become stale. The session would fail or return incomplete results.
- **Impact:** Low. Partition drops are infrequent (daily check, at most 1 per run). Negentropy sessions are short-lived (60s idle timeout).
- **Recommendation:** No action needed for initial production. If operating as a sync source relay, consider pausing sync during partition drops.
---
## 7. Observability Completeness — 🟡
### What's good
Metrics coverage is comprehensive — 34+ distinct metrics covering:
- Ingest: event count by outcome/reason, duration distribution
- Query: request count, duration, result cardinality
- Fanout: duration, candidates considered, events enqueued, batch size
- Connection: outbound queue depth/pressure/overflow/drop, mailbox depth
- Rate limiting: hit count by scope
- DB: query count/total_time/queue_time/query_time/decode_time/idle_time by repo role
- Maintenance: expiration purge count/duration, partition retention drops/duration
- VM: memory (total/processes/system/atom/binary/ets)
- Listener: active connections, active subscriptions
Readiness endpoint checks critical process liveness. Health endpoint for basic reachability.
### Gaps
**G7.1 — No dashboards or alerting rules.**
The metrics exist but there are no Grafana dashboard JSON files, no Prometheus alerting rules, and no documented alert thresholds. An operator deploying this relay would need to build observability from scratch.
- **Recommendation:** Ship a `deploy/grafana/` directory with a dashboard JSON and a `deploy/prometheus/alerts.yml` with rules for:
- `parrhesia_db_query_queue_time_ms` p95 > 100ms (pool saturation)
- `parrhesia_connection_outbound_queue_overflow_count` rate > 0 (clients being dropped)
- `parrhesia_rate_limit_hits_count` rate sustained > threshold (potential abuse)
- `parrhesia_vm_memory_total_bytes` > 80% of available
- Listener connection count approaching `max_connections`
**G7.2 — No distributed tracing or request correlation IDs.**
Events flow through validate → policy → persist → fanout without a correlation ID tying the stages together. Log-based debugging of "why didn't this event reach subscriber X" requires manual PID correlation across log lines.
- **Impact:** Tolerable for initial production at moderate scale. Becomes painful at high event rates.
**G7.3 — No synthetic monitoring.**
No built-in probe that ingests a canary event and verifies it arrives at a subscriber. End-to-end relay health depends on external monitoring.
- **Recommendation:** This is best implemented as an external tool. Not blocking.
---
## 8. Technical Debt with Production Impact — 🟡
### G8.1 — `connection.ex` at 2,116 lines
This module is the per-connection state machine handling EVENT, REQ, CLOSE, AUTH, COUNT, NEG-*, keepalive, outbound queue management, rate limiting, and all associated telemetry. It is the single most critical file for production incident response.
**Production risk:** During a production incident involving connection behavior, an on-call engineer needs to quickly navigate this module. At 2,116 lines with interleaved concerns (protocol parsing, policy enforcement, queue management, telemetry emission), this slows incident response.
**Recommendation (M-sized effort):** Extract into focused modules:
- `Connection.Ingest` — EVENT handling and policy application
- `Connection.Subscription` — REQ/CLOSE management and initial query streaming
- `Connection.OutboundQueue` — queue/drain/overflow logic
- `Connection.Keepalive` — ping/pong state machine
The main `Connection` module would become an orchestrator delegating to these. This is a refactor-only change with no behavioral impact.
### G8.2 — Multi-filter in-memory dedup
`deduplicate_events/1` accumulates all query results into a Map before deduplication. With 32 subscriptions (the max) and generous limits, worst case is:
- 32 filters × 5000 result limit = 160,000 events loaded into memory per REQ.
Each event struct is ~500 bytes minimum, so ~80MB per pathological request. This is bounded but could be weaponised by an attacker sending many concurrent REQs with overlapping filters.
**Current mitigation:** Per-connection subscription limit (32) and query result limits bound the damage. Per-IP rate limiting adds friction.
**Recommendation:** Not blocking for production. Monitor `parrhesia.query.results.count` distribution. If p99 > 10,000, investigate query patterns.
### G8.3 — Per-pubkey rate limiting absent
Rate limiting is currently per-IP and relay-wide. An attacker using a botnet (many IPs, one pubkey) bypasses IP-based limits. Per-pubkey rate limiting would catch this.
**Impact:** Medium for a public relay; low for an invite-only (NIP-43) relay.
**Recommendation (S-sized effort):** Add a per-pubkey event ingest limiter similar to `IPEventIngestLimiter`, keyed by `event.pubkey`. Apply after signature verification but before storage.
### G8.4 — Negentropy session memory ceiling
Negentropy session bounds:
- Max 10,000 total sessions (`@default_max_total_sessions`)
- Max 8 per connection (`@default_max_sessions_per_owner`)
- Max 50,000 items per session (`@default_max_items_per_session`)
- 60s idle timeout with 10s sweep interval
Worst case: 10,000 sessions × 50,000 items × ~40 bytes/ref = ~20GB. This is the theoretical maximum under adversarial session creation.
**Realistic ceiling:** The `open/6` path runs a DB query bounded by `max_items_per_session + 1`. At 50k items, this query itself provides backpressure (it takes time). An attacker would need 10,000 concurrent connections each opening 8 sessions, each returning 50k results. The relay-wide connection limit and rate limiting make this implausible in practice.
**Recommendation:** Reduce `@default_max_items_per_session` to 10,000 for production (reduces theoretical ceiling to ~4GB). This is a config change, not a code change.
---
## Critical Path to Production
Ordered by priority. Items above the line are required before production traffic; items below are strongly recommended.
| # | Work Item | Dimension | Effort |
|---|-----------|-----------|--------|
| 1 | Set explicit shutdown timeout on Endpoint child spec | Operational | S |
| 2 | Document dedicated metrics listener as production requirement | Operational | S |
| 3 | Add HEALTHCHECK to Docker image or document K8s probes | Deployment | S |
| 4 | Run capacity benchmark at target load and document results | Load | M |
| 5 | Ship Grafana dashboard + Prometheus alert rules | Observability | M |
| 6 | Write operational runbook (deploy, rollback, ban, failover) | Deployment | M |
| 7 | Document migration strategy (run before deploy, not during) | Deployment | S |
| --- | --- | --- | --- |
| 8 | Add per-pubkey rate limiting | Security | S |
| 9 | Reduce default negentropy items-per-session to 10k | Security | S |
| 10 | Extract connection.ex into sub-modules | Debt | M |
| 11 | Add request correlation IDs to event lifecycle | Observability | M |
| 12 | Add DB pool health fast-fail wrapper | Operational | M |
---
## Production Risk Register
| ID | Risk | Likelihood | Impact | Mitigation |
|----|------|-----------|--------|------------|
| R1 | PostgreSQL failover causes latency spike for all connections | Medium | High | G1.1: Ecto queue management provides partial protection. Add pool health telemetry alerting. Consider circuit breaker at high connection counts. |
| R2 | Slow /metrics scrape blocks health checks | Low | Medium | G1.2: Deploy dedicated metrics listener (already supported). |
| R3 | Ungraceful shutdown drops in-flight events | Low | Medium | G1.3: Set explicit shutdown timeout. Connection drain logic already exists. |
| R4 | Multi-IP spam campaign bypasses rate limiting | Medium | Medium | G8.3: Add per-pubkey rate limiter. NIP-43 invite-only mode mitigates for private relays. |
| R5 | Large REQ with many overlapping filters causes memory spike | Low | Medium | G8.2: Bounded by existing limits. Monitor query result cardinality. |
| R6 | No alerting means silent degradation | Medium | High | G7.1: Ship dashboard and alert rules before production. |
| R7 | DDL migration blocks reads during rolling deploy | Low | High | G4.1: Run migrations as separate pre-deploy step. |
| R8 | Adversarial negentropy session creation exhausts memory | Low | High | G8.4: Reduce max items per session. Existing session limits provide protection. |
| R9 | No runbooks slows incident response | Medium | Medium | G4.2: Write runbooks for common ops tasks. |
| R10 | connection.ex complexity slows debugging | Medium | Low | G8.1: Extract sub-modules. Not urgent but improves maintainability. |
---
## Final Verdict
### 🟡 Ready for Limited Production
**Constraints for initial deployment:**
1. **Single-node only.** Multi-node clustering is best-effort and should not be relied upon for production traffic. Deploy one node with a properly sized PostgreSQL instance.
2. **Behind a reverse proxy.** Deploy behind Caddy, Nginx, or a cloud load balancer for TLS termination, DDoS mitigation, and connection limits. Document the expected topology.
3. **Moderate traffic cap.** Without a validated capacity model, start with conservative limits:
- ≤ 2,000 concurrent WebSocket connections
- ≤ 500 events/second ingest rate
- Monitor `db.query.queue_time.ms` p95 and `connection.outbound_queue.overflow.count` as scaling signals.
4. **Observability must be deployed alongside.** The metrics exist but dashboards and alerts do not. Do not go live without at minimum:
- Prometheus scraping the dedicated metrics listener
- Alerts on DB queue time, outbound queue overflow, and VM memory
- Log aggregation with ERROR-level alerts
5. **Migrations run pre-deploy.** Use the existing compose.yaml `migrate` service pattern. Never run migrations as part of application startup in a multi-replica deployment.
**What's strong:**
- OTP supervision architecture is clean and fault-isolated
- Data integrity layer is well-designed (transactional writes, dedup, constraint enforcement)
- Security posture is production-appropriate
- Telemetry coverage is comprehensive
- Container image follows best practices
- No blocking issues in the hot path (no sleeps, no synchronous calls, bounded queues)
**The codebase is architecturally sound for production.** The gaps are operational (runbooks, dashboards, capacity planning) rather than structural. A focused sprint addressing items 1–7 from the critical path would clear the way for a controlled production launch.

View File

@@ -2,6 +2,8 @@
description = "Parrhesia Nostr relay";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
# Can be re-enabled once the patched nostr-bench fork is published on GitHub
# inputs.self.submodules = true;
outputs = {nixpkgs, ...}: let
systems = [

View File

@@ -71,6 +71,9 @@ defmodule Parrhesia.API.ACL do
`opts[:context]` defaults to an empty `Parrhesia.API.RequestContext`, which means protected
subjects will fail with `{:error, :auth_required}` until authenticated pubkeys are present.
Local callers bypass ACL enforcement entirely. ACL is intended to protect external sync traffic,
not trusted in-process calls.
"""
@spec check(atom(), map(), keyword()) :: :ok | {:error, term()}
def check(capability, subject, opts \\ [])
@@ -80,13 +83,8 @@ defmodule Parrhesia.API.ACL do
context = Keyword.get(opts, :context, %RequestContext{})
with {:ok, normalized_capability} <- normalize_capability(capability),
{:ok, normalized_context} <- normalize_context(context),
{:ok, protected_filters} <- protected_filters() do
if protected_subject?(normalized_capability, subject, protected_filters) do
authorize_subject(normalized_capability, subject, normalized_context)
else
:ok
end
{:ok, normalized_context} <- normalize_context(context) do
maybe_authorize_subject(normalized_capability, subject, normalized_context)
end
end
@@ -134,6 +132,18 @@ defmodule Parrhesia.API.ACL do
end
end
# Local callers are fully trusted: ACL only guards external sync traffic.
defp maybe_authorize_subject(_capability, _subject, %RequestContext{caller: :local}), do: :ok

# For any non-local caller, load the configured protected filters and run the
# authorization check only when the subject matches one of them; subjects that
# match no protected filter pass through with :ok.
defp maybe_authorize_subject(capability, subject, %RequestContext{} = context) do
  case protected_filters() do
    {:ok, filters} ->
      if protected_subject?(capability, subject, filters),
        do: authorize_subject(capability, subject, context),
        else: :ok

    error ->
      # Propagate the failure from protected_filters/0 unchanged, mirroring
      # how a `with` clause falls through on a non-matching result.
      error
  end
end
# Fetches all pubkey-principal ACL rules for `capability` from the configured
# storage backend; no subject filtering happens here — callers narrow further.
defp list_rules_for_capability(capability) do
Storage.acl().list_rules(%{}, principal_type: :pubkey, capability: capability)
end

View File

@@ -87,7 +87,7 @@ defmodule Parrhesia.API.Events do
end
Dispatcher.dispatch(event)
maybe_publish_multi_node(event)
maybe_publish_multi_node(event, context)
{:ok,
%PublishResult{
@@ -312,9 +312,15 @@ defmodule Parrhesia.API.Events do
end
end
defp maybe_publish_multi_node(event) do
MultiNode.publish(event)
:ok
# Fans `event` out to the multi-node group unless the sync relay guard is
# enabled AND the event arrived via a sync caller. Always returns :ok; an
# :exit raised by the publish call is swallowed so fanout problems never
# break the publish path.
defp maybe_publish_multi_node(event, %RequestContext{} = context) do
  guard_enabled? = Parrhesia.Config.get([:sync, :relay_guard], false)
  skip_fanout? = guard_enabled? and context.caller == :sync

  if not skip_fanout? do
    MultiNode.publish(event)
  end

  :ok
catch
  :exit, _reason -> :ok
end

28
mix.exs
View File

@@ -4,13 +4,15 @@ defmodule Parrhesia.MixProject do
def project do
[
app: :parrhesia,
version: "0.7.0",
version: "0.8.0",
elixir: "~> 1.18",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
deps: deps(),
aliases: aliases(),
docs: docs()
docs: docs(),
description: description(),
package: package()
]
end
@@ -52,15 +54,17 @@ defmodule Parrhesia.MixProject do
{:telemetry_poller, "~> 1.0"},
{:telemetry_metrics_prometheus, "~> 1.1"},
# Runtime: outbound WebSocket client (sync transport)
{:websockex, "~> 0.4"},
# Test tooling
{:stream_data, "~> 1.0", only: :test},
{:websockex, "~> 0.4"},
# Project tooling
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
{:ex_doc, "~> 0.34", only: :dev, runtime: false},
{:deps_changelog, "~> 0.3"},
{:igniter, "~> 0.6", only: [:dev, :test]}
{:deps_changelog, "~> 0.3", only: :dev, runtime: false},
{:igniter, "~> 0.6", only: [:dev, :test], runtime: false}
]
end
@@ -82,6 +86,17 @@ defmodule Parrhesia.MixProject do
]
end
# One-line package description shown on Hex.pm.
defp description do
"Nostr event relay with WebSocket fanout, sync, and access control"
end
# Hex package metadata: BSD-2-Clause license and a link back to the
# canonical Gitea repository.
defp package do
[
licenses: ["BSD-2-Clause"],
links: %{"Gitea" => "https://git.teralink.net/tribes/parrhesia"}
]
end
defp docs do
[
main: "readme",
@@ -91,8 +106,7 @@ defmodule Parrhesia.MixProject do
"docs/LOCAL_API.md",
"docs/SYNC.md",
"docs/ARCH.md",
"docs/CLUSTER.md",
"BENCHMARK.md"
"docs/CLUSTER.md"
],
groups_for_modules: [
"Embedded API": [

1
nix/nostr-bench Submodule

Submodule nix/nostr-bench added at 8561b84864

View File

@@ -321,6 +321,12 @@ version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
[[package]]
name = "bech32"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
name = "bitcoin_hashes"
version = "0.11.0"
@@ -1249,6 +1255,7 @@ version = "0.19.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "377785e61e0da6a13226a4244e8c28b9a858aa0a5ed10830109f61ade1c2a3f2"
dependencies = [
"bech32",
"bitcoin_hashes",
"getrandom",
"instant",
@@ -1262,7 +1269,7 @@ dependencies = [
[[package]]
name = "nostr-bench"
version = "0.4.0"
version = "0.5.0-parrhesia"
dependencies = [
"actix",
"actix-web",

View File

@@ -1,22 +1,19 @@
{
lib,
fetchFromGitHub,
rustPlatform,
pkgsCross,
runCommand,
staticX86_64Musl ? false,
nostrBenchSrc ? ./nostr-bench,
}: let
selectedRustPlatform =
if staticX86_64Musl
then pkgsCross.musl64.rustPlatform
else rustPlatform;
srcBase = fetchFromGitHub {
owner = "rnostr";
repo = "nostr-bench";
rev = "d3ab701512b7c871707b209ef3f934936e407963";
hash = "sha256-F2qg1veO1iNlVUKf1b/MV+vexiy4Tt+w2aikDDbp7tU=";
};
# Keep the submodule path as-is so devenv can evaluate it correctly.
# `lib.cleanSource` treats submodule contents as untracked in this context.
srcBase = nostrBenchSrc;
src =
if staticX86_64Musl
@@ -31,7 +28,7 @@ in
selectedRustPlatform.buildRustPackage (
{
pname = "nostr-bench";
version = "0.4.0";
version = "0.5.0-parrhesia";
inherit src;
@@ -46,11 +43,11 @@ in
};
}
// lib.optionalAttrs staticX86_64Musl {
cargoHash = "sha256-aL8XSBJ8sHl7CGh9SkOoI+WlAHKrdij2DfvZAWIKgKY=";
cargoHash = "sha256-098BUjDLiezoFXs7fF+w7NQM+DPPfHMo1HGx3nV2UZM=";
CARGO_BUILD_TARGET = "x86_64-unknown-linux-musl";
RUSTFLAGS = "-C target-feature=+crt-static";
}
// lib.optionalAttrs (!staticX86_64Musl) {
cargoHash = "sha256-mh9UdYhZl6JVJEeDFnY5BjfcK+PrWBBEn1Qh7/ZX17k=";
cargoHash = "sha256-x2pnxJL2nwni4cpSaUerDOfhdcTKJTyABYjPm96dAC0=";
}
)

View File

@@ -5,7 +5,7 @@ relay_url="${1:-}"
mode="${2:-all}"
if [[ -z "$relay_url" ]]; then
echo "usage: cloud-bench-client.sh <relay-url> [connect|echo|event|req|all]" >&2
echo "usage: cloud-bench-client.sh <relay-url> [connect|echo|event|req|seed|all]" >&2
exit 1
fi
@@ -43,6 +43,9 @@ run_event() {
-r "${PARRHESIA_BENCH_EVENT_RATE:-50}" \
-k "${PARRHESIA_BENCH_KEEPALIVE_SECONDS:-5}" \
-t "${bench_threads}" \
--send-strategy "${PARRHESIA_BENCH_EVENT_SEND_STRATEGY:-pipelined}" \
--inflight "${PARRHESIA_BENCH_EVENT_INFLIGHT:-32}" \
--ack-timeout "${PARRHESIA_BENCH_EVENT_ACK_TIMEOUT:-30}" \
"${relay_url}"
}
@@ -57,11 +60,33 @@ run_req() {
"${relay_url}"
}
# Seed the relay with events until the target accepted count is reached.
# Requires PARRHESIA_BENCH_SEED_TARGET_ACCEPTED to be set (exits 2 otherwise);
# every other knob falls back to a default.
run_seed() {
local target_accepted="${PARRHESIA_BENCH_SEED_TARGET_ACCEPTED:-}"
if [[ -z "$target_accepted" ]]; then
echo "PARRHESIA_BENCH_SEED_TARGET_ACCEPTED must be set for seed mode" >&2
exit 2
fi
echo "==> nostr-bench seed ${relay_url}"
# -k defaults to 0 (keepalive seconds) here, unlike the event mode —
# presumably so seeding connections close promptly; confirm against
# nostr-bench docs.
"$bench_bin" seed --json \
--target-accepted "$target_accepted" \
-c "${PARRHESIA_BENCH_SEED_CONNECTION_COUNT:-512}" \
-r "${PARRHESIA_BENCH_SEED_CONNECTION_RATE:-512}" \
-k "${PARRHESIA_BENCH_SEED_KEEPALIVE_SECONDS:-0}" \
-t "${bench_threads}" \
--send-strategy "${PARRHESIA_BENCH_SEED_SEND_STRATEGY:-pipelined}" \
--inflight "${PARRHESIA_BENCH_SEED_INFLIGHT:-128}" \
--ack-timeout "${PARRHESIA_BENCH_SEED_ACK_TIMEOUT:-20}" \
"${relay_url}"
}
# Dispatch on the requested benchmark mode. The bare `echo` calls between
# phases in `all` just print blank separator lines.
# NOTE(review): `all` does not run the new `seed` mode — confirm that seed is
# meant to stay opt-in only.
case "$mode" in
connect) run_connect ;;
echo) run_echo ;;
event) run_event ;;
req) run_req ;;
seed) run_seed ;;
all) run_connect; echo; run_echo; echo; run_event; echo; run_req ;;
*) echo "unknown mode: $mode" >&2; exit 1 ;;
esac

File diff suppressed because it is too large Load Diff

View File

@@ -10,7 +10,7 @@ export function parseNostrBenchSections(output) {
for (const lineRaw of lines) {
const line = lineRaw.trim();
const header = line.match(/^==>\s+nostr-bench\s+(connect|echo|event|req)\s+/);
const header = line.match(/^==>\s+nostr-bench\s+(connect|echo|event|req|seed)\s+/);
if (header) {
section = header[1];
continue;
@@ -20,9 +20,24 @@ export function parseNostrBenchSections(output) {
try {
const json = JSON.parse(line);
if (section) {
parsed[section] = json;
if (!section) continue;
if (section === "seed" && json?.type === "seed_final") {
parsed.seed_final = json;
continue;
}
const existing = parsed[section];
if (!existing) {
parsed[section] = json;
continue;
}
if (existing?.type === "final" && json?.type !== "final") {
continue;
}
parsed[section] = json;
} catch {
// ignore noisy non-json lines
}
@@ -43,14 +58,21 @@ export function sum(values) {
return valid.reduce((a, b) => a + b, 0);
}
export function throughputFromSection(section) {
export function throughputFromSection(section, options = {}) {
const { preferAccepted = false } = options;
const elapsedMs = Number(section?.elapsed ?? NaN);
const accepted = Number(section?.message_stats?.accepted ?? NaN);
const complete = Number(section?.message_stats?.complete ?? NaN);
const effectiveCount =
preferAccepted && Number.isFinite(accepted)
? accepted
: complete;
const totalBytes = Number(section?.message_stats?.size ?? NaN);
const cumulativeTps =
Number.isFinite(elapsedMs) && elapsedMs > 0 && Number.isFinite(complete)
? complete / (elapsedMs / 1000)
Number.isFinite(elapsedMs) && elapsedMs > 0 && Number.isFinite(effectiveCount)
? effectiveCount / (elapsedMs / 1000)
: NaN;
const cumulativeMibs =
@@ -58,7 +80,11 @@ export function throughputFromSection(section) {
? totalBytes / (1024 * 1024) / (elapsedMs / 1000)
: NaN;
const sampleTps = Number(section?.tps ?? NaN);
const sampleTps = Number(
preferAccepted
? section?.accepted_tps ?? section?.tps
: section?.tps,
);
const sampleMibs = Number(section?.size ?? NaN);
return {
@@ -67,10 +93,16 @@ export function throughputFromSection(section) {
};
}
// Read a numeric counter from section.message_stats; anything missing or
// non-finite (NaN, Infinity) counts as zero.
function messageCounter(section, field) {
  const stats = section?.message_stats;
  if (!stats) return 0;
  const count = Number(stats[field]);
  return Number.isFinite(count) ? count : 0;
}
export function metricFromSections(sections) {
const connect = sections?.connect?.connect_stats?.success_time || {};
const echo = throughputFromSection(sections?.echo || {});
const event = throughputFromSection(sections?.event || {});
const eventSection = sections?.event || {};
const event = throughputFromSection(eventSection, { preferAccepted: true });
const req = throughputFromSection(sections?.req || {});
return {
@@ -80,6 +112,10 @@ export function metricFromSections(sections) {
echo_mibs: echo.mibs,
event_tps: event.tps,
event_mibs: event.mibs,
event_notice: messageCounter(eventSection, "notice"),
event_auth_challenge: messageCounter(eventSection, "auth_challenge"),
event_reply_unrecognized: messageCounter(eventSection, "reply_unrecognized"),
event_ack_timeout: messageCounter(eventSection, "ack_timeout"),
req_tps: req.tps,
req_mibs: req.mibs,
};
@@ -109,6 +145,10 @@ export function summariseFlatResults(results) {
echo_mibs: sum(clientSamples.map((s) => s.echo_mibs)),
event_tps: sum(clientSamples.map((s) => s.event_tps)),
event_mibs: sum(clientSamples.map((s) => s.event_mibs)),
event_notice: sum(clientSamples.map((s) => s.event_notice)),
event_auth_challenge: sum(clientSamples.map((s) => s.event_auth_challenge)),
event_reply_unrecognized: sum(clientSamples.map((s) => s.event_reply_unrecognized)),
event_ack_timeout: sum(clientSamples.map((s) => s.event_ack_timeout)),
req_tps: sum(clientSamples.map((s) => s.req_tps)),
req_mibs: sum(clientSamples.map((s) => s.req_mibs)),
});
@@ -121,6 +161,10 @@ export function summariseFlatResults(results) {
"echo_mibs",
"event_tps",
"event_mibs",
"event_notice",
"event_auth_challenge",
"event_reply_unrecognized",
"event_ack_timeout",
"req_tps",
"req_mibs",
];
@@ -166,8 +210,8 @@ export function summarisePhasedResults(results) {
}
// Per-level req and event metrics
for (const level of ["empty", "warm", "hot"]) {
const phase = phases[level];
for (const level of ["cold", "warm", "hot"]) {
const phase = phases[level] || (level === "cold" ? phases.empty : undefined);
if (!phase) continue;
const reqClients = (phase.req?.clients || [])
@@ -184,6 +228,16 @@ export function summarisePhasedResults(results) {
if (eventClients.length > 0) {
sample[`event_${level}_tps`] = sum(eventClients.map((s) => s.event_tps));
sample[`event_${level}_mibs`] = sum(eventClients.map((s) => s.event_mibs));
sample[`event_${level}_notice`] = sum(eventClients.map((s) => s.event_notice));
sample[`event_${level}_auth_challenge`] = sum(
eventClients.map((s) => s.event_auth_challenge),
);
sample[`event_${level}_reply_unrecognized`] = sum(
eventClients.map((s) => s.event_reply_unrecognized),
);
sample[`event_${level}_ack_timeout`] = sum(
eventClients.map((s) => s.event_ack_timeout),
);
}
}
@@ -215,9 +269,17 @@ export function countEventsWritten(clientResults) {
for (const cr of clientResults) {
if (cr.status !== "ok") continue;
const eventSection = cr.sections?.event;
if (eventSection?.message_stats?.complete) {
total += Number(eventSection.message_stats.complete) || 0;
if (!eventSection?.message_stats) continue;
const accepted = Number(eventSection.message_stats.accepted);
if (Number.isFinite(accepted)) {
total += Math.max(0, accepted);
continue;
}
const complete = Number(eventSection.message_stats.complete) || 0;
const error = Number(eventSection.message_stats.error) || 0;
total += Math.max(0, complete - error);
}
return total;
}

View File

@@ -265,7 +265,7 @@ common_parrhesia_env+=( -e PARRHESIA_LIMITS_MAX_NEGENTROPY_ITEMS_PER_SESSION=100
cmd="${1:-}"
if [[ -z "$cmd" ]]; then
echo "usage: cloud-bench-server.sh <start-*|wipe-data-*|cleanup>" >&2
echo "usage: cloud-bench-server.sh <start-*|wipe-data-*|count-data-*|cleanup>" >&2
exit 1
fi
@@ -626,6 +626,14 @@ EOF
wait_port 3355 120 haven
;;
count-data-parrhesia-pg)
docker exec pg psql -U parrhesia -d parrhesia -At -c "SELECT count(*) FROM events"
;;
count-data-nostream)
docker exec nostream-db psql -U nostr_ts_relay -d nostr_ts_relay -At -c "SELECT count(*) FROM events"
;;
cleanup)
cleanup_containers
;;

View File

@@ -27,7 +27,7 @@ Examples:
just e2e marmot
just e2e node-sync
just bench compare
just bench cloud --clients 3 --runs 3
just bench cloud --clients 3
EOF
exit 0
fi
@@ -77,7 +77,7 @@ Examples:
just bench collect
just bench update --machine all
just bench at v0.5.0
just bench cloud --clients 3 --runs 3
just bench cloud --clients 3
just bench cloud --targets parrhesia-pg,nostream,haven --nostream-ref main
EOF
exit 0

View File

@@ -18,7 +18,6 @@ Behavior:
- Adds smoke defaults when --quick is set (unless already provided):
--server-type cx23
--client-type cx23
--runs 1
--clients 1
--connect-count 20
--connect-rate 20
@@ -42,7 +41,7 @@ Everything else is passed through unchanged.
Examples:
just bench cloud
just bench cloud --quick
just bench cloud --clients 2 --runs 1 --targets parrhesia-memory
just bench cloud --clients 2 --targets parrhesia-memory
just bench cloud --image ghcr.io/owner/parrhesia:latest --threads 4
just bench cloud --no-monitoring
just bench cloud --yes --datacenter auto
@@ -106,7 +105,6 @@ done
if [[ "$QUICK" == "1" ]]; then
add_default_if_missing "--server-type" "cx23"
add_default_if_missing "--client-type" "cx23"
add_default_if_missing "--runs" "1"
add_default_if_missing "--clients" "1"
add_default_if_missing "--connect-count" "20"

View File

@@ -207,7 +207,7 @@ const presentBaselines = [
...[...discoveredBaselines].filter((srv) => !preferredBaselineOrder.includes(srv)).sort((a, b) => a.localeCompare(b)),
];
// --- Colour palette per server: [empty, warm, hot] ---
// --- Colour palette per server: [cold, warm, hot] ---
const serverColours = {
"parrhesia-pg": ["#93c5fd", "#3b82f6", "#1e40af"],
"parrhesia-memory": ["#86efac", "#22c55e", "#166534"],
@@ -218,12 +218,12 @@ const serverColours = {
};
const levelStyles = [
/* empty */ { dt: 3, pt: 6, ps: 0.7, lw: 1.5 },
/* warm */ { dt: 2, pt: 8, ps: 0.8, lw: 1.5 },
/* hot */ { dt: 1, pt: 7, ps: 1.0, lw: 2 },
/* cold */ { dt: 3, pt: 6, ps: 0.7, lw: 1.5 },
/* warm */ { dt: 2, pt: 8, ps: 0.8, lw: 1.5 },
/* hot */ { dt: 1, pt: 7, ps: 1.0, lw: 2 },
];
const levels = ["empty", "warm", "hot"];
const levels = ["cold", "warm", "hot"];
const shortLabel = {
"parrhesia-pg": "pg", "parrhesia-memory": "mem",
@@ -233,19 +233,31 @@ const shortLabel = {
const allServers = ["parrhesia-pg", "parrhesia-memory", ...presentBaselines];
function isPhased(e) {
for (const srv of Object.values(e.servers || {})) {
if (srv.event_empty_tps !== undefined) return true;
}
return false;
}
// Build phased key: "event_tps" + "empty" → "event_empty_tps"
// Build a phased metric key by inserting `level` before the last underscore
// segment: "event_tps" + "cold" → "event_cold_tps".
function phasedKey(base, level) {
  const splitAt = base.lastIndexOf("_");
  const prefix = base.slice(0, splitAt);
  const suffix = base.slice(splitAt + 1);
  return [prefix, level, suffix].join("_");
}
// Look up the phased metric for `level`, falling back to the legacy
// "empty"-named key when asked for "cold" (historical entries predate the
// cold/warm/hot rename). Returns undefined when neither key is present.
function phasedValue(d, base, level) {
  const exact = d?.[phasedKey(base, level)];
  if (exact !== undefined) return exact;
  return level === "cold" ? d?.[phasedKey(base, "empty")] : undefined;
}
// An entry is "phased" when any of its servers carries a cold-phase event
// throughput metric (including entries using the legacy `empty` key names).
function isPhased(e) {
  return Object.values(e.servers || {}).some(
    (srv) => phasedValue(srv, "event_tps", "cold") !== undefined,
  );
}
// --- Emit linetype definitions (server × level) ---
const plotLines = [];
for (let si = 0; si < allServers.length; si++) {
@@ -297,7 +309,7 @@ for (const panel of panels) {
plotLines.push("");
} else {
// Three columns per server (empty, warm, hot)
// Three columns per server (cold, warm, hot)
const header = ["tag"];
for (const srv of allServers) {
const sl = shortLabel[srv] || srv;
@@ -311,7 +323,7 @@ for (const panel of panels) {
const d = e.servers?.[srv];
if (!d) { row.push("NaN", "NaN", "NaN"); continue; }
if (phased) {
for (const lvl of levels) row.push(d[phasedKey(panel.base, lvl)] ?? "NaN");
for (const lvl of levels) row.push(phasedValue(d, panel.base, lvl) ?? "NaN");
} else {
row.push("NaN", d[panel.base] ?? "NaN", "NaN"); // flat → warm only
}
@@ -320,7 +332,7 @@ for (const panel of panels) {
}
fs.writeFileSync(path.join(workDir, panel.file), rows.join("\n") + "\n", "utf8");
// Plot: three series per server (empty/warm/hot)
// Plot: three series per server (cold/warm/hot)
const dataFile = `data_dir."/${panel.file}"`;
plotLines.push(`set title "${panel.label}"`);
plotLines.push(`set ylabel "${panel.ylabel}"`);
@@ -395,7 +407,7 @@ if (!pg || !mem) {
}
// Detect phased entries — use hot fill level as headline metric
const phased = pg.event_empty_tps !== undefined;
const phased = pg.event_cold_tps !== undefined || pg.event_empty_tps !== undefined;
// For phased entries, resolve "event_tps" → "event_hot_tps" etc.
function resolveKey(key) {

View File

@@ -41,11 +41,14 @@ defmodule Parrhesia.API.ACLTest do
authenticated_pubkey = String.duplicate("b", 64)
assert {:error, :auth_required} =
ACL.check(:sync_read, filter, context: %RequestContext{})
ACL.check(:sync_read, filter, context: %RequestContext{caller: :websocket})
assert {:error, :sync_read_not_allowed} =
ACL.check(:sync_read, filter,
context: %RequestContext{authenticated_pubkeys: MapSet.new([authenticated_pubkey])}
context: %RequestContext{
caller: :websocket,
authenticated_pubkeys: MapSet.new([authenticated_pubkey])
}
)
assert :ok =
@@ -58,7 +61,10 @@ defmodule Parrhesia.API.ACLTest do
assert :ok =
ACL.check(:sync_read, filter,
context: %RequestContext{authenticated_pubkeys: MapSet.new([authenticated_pubkey])}
context: %RequestContext{
caller: :websocket,
authenticated_pubkeys: MapSet.new([authenticated_pubkey])
}
)
end
@@ -75,7 +81,38 @@ defmodule Parrhesia.API.ACLTest do
assert {:error, :sync_read_not_allowed} =
ACL.check(:sync_read, %{"kinds" => [5000]},
context: %RequestContext{authenticated_pubkeys: MapSet.new([principal])}
context: %RequestContext{
caller: :websocket,
authenticated_pubkeys: MapSet.new([principal])
}
)
end
# A :local caller bypasses ACL entirely — both a protected read and a
# protected write succeed without any authenticated pubkeys — while an
# authenticated :websocket caller is still rejected for the same
# protected filter.
test "check/3 bypasses protected sync ACL for local callers" do
protected_filter = %{"kinds" => [5000], "#r" => ["tribes.accounts.user"]}
assert :ok =
ACL.check(:sync_read, %{"ids" => [String.duplicate("d", 64)]},
context: %RequestContext{caller: :local}
)
assert :ok =
ACL.check(
:sync_write,
%{
"id" => String.duplicate("e", 64),
"kind" => 5000,
"tags" => [["r", "tribes.accounts.user"]]
},
context: %RequestContext{caller: :local}
)
# Control case: same protected subject, non-local caller → still denied.
assert {:error, :sync_read_not_allowed} =
ACL.check(:sync_read, protected_filter,
context: %RequestContext{
caller: :websocket,
authenticated_pubkeys: MapSet.new([String.duplicate("f", 64)])
}
)
end
end

View File

@@ -30,6 +30,45 @@ defmodule Parrhesia.API.EventsTest do
assert second_result.message == "duplicate: event already stored"
end
# With the relay guard off, even sync-originated events are fanned out to
# the multi-node group (the test process joins the group to observe it).
test "publish fanout includes sync-originated events when relay guard is disabled" do
with_sync_relay_guard(false)
join_multi_node_group!()
event = valid_event()
event_id = event["id"]
assert {:ok, %{accepted: true}} =
Events.publish(event, context: %RequestContext{caller: :sync})
assert_receive {:remote_fanout_event, %{"id" => ^event_id}}, 200
end
# With the relay guard on, a sync-originated publish is accepted but NOT
# fanned out to the multi-node group.
test "publish fanout skips sync-originated events when relay guard is enabled" do
with_sync_relay_guard(true)
join_multi_node_group!()
event = valid_event()
event_id = event["id"]
assert {:ok, %{accepted: true}} =
Events.publish(event, context: %RequestContext{caller: :sync})
refute_receive {:remote_fanout_event, %{"id" => ^event_id}}, 200
end
# The guard only gates :sync callers — a :local publish still fans out even
# when the relay guard is enabled.
test "publish fanout still includes local-originated events when relay guard is enabled" do
with_sync_relay_guard(true)
join_multi_node_group!()
event = valid_event()
event_id = event["id"]
assert {:ok, %{accepted: true}} =
Events.publish(event, context: %RequestContext{caller: :local})
assert_receive {:remote_fanout_event, %{"id" => ^event_id}}, 200
end
test "query and count preserve read semantics through the shared API" do
now = System.system_time(:second)
first = valid_event(%{"content" => "first", "created_at" => now})
@@ -53,6 +92,67 @@ defmodule Parrhesia.API.EventsTest do
)
end
# A :local caller can publish and then query back an event matching a
# configured protected filter without any ACL grants. The app env override
# is restored by on_exit so other tests see the original :acl config.
test "local query can read protected sync events without ACL grants or kind scoping" do
previous_acl = Application.get_env(:parrhesia, :acl, [])
Application.put_env(
:parrhesia,
:acl,
protected_filters: [%{"kinds" => [5000], "#r" => ["tribes.accounts.user"]}]
)
on_exit(fn ->
Application.put_env(:parrhesia, :acl, previous_acl)
end)
protected_event =
valid_event(%{
"kind" => 5000,
"tags" => [["r", "tribes.accounts.user"]],
"content" => "protected"
})
assert {:ok, %{accepted: true}} =
Events.publish(protected_event, context: %RequestContext{caller: :local})
assert {:ok, [stored_event]} =
Events.query([%{"ids" => [protected_event["id"]]}],
context: %RequestContext{caller: :local}
)
assert stored_event["id"] == protected_event["id"]
end
# Temporarily overrides [:sync, :relay_guard] in the Parrhesia.Config ETS
# table for the duration of the test; on_exit re-inserts the previous config
# snapshot so later tests are unaffected.
# NOTE(review): this mutates a shared ETS table, so tests using it should not
# run async with others reading the same config — confirm the case setup.
defp with_sync_relay_guard(enabled?) when is_boolean(enabled?) do
[{:config, previous}] = :ets.lookup(Parrhesia.Config, :config)
sync =
previous
|> Map.get(:sync, [])
|> Keyword.put(:relay_guard, enabled?)
:ets.insert(Parrhesia.Config, {:config, Map.put(previous, :sync, sync)})
on_exit(fn ->
:ets.insert(Parrhesia.Config, {:config, previous})
end)
end
# Ensures the :pg scope process is running (starting it if needed and
# tolerating a concurrent start), then subscribes the test process to the
# multi-node fanout group so fanout messages arrive in its mailbox.
defp join_multi_node_group! do
  if is_nil(Process.whereis(:pg)) do
    case :pg.start_link() do
      {:ok, _pid} -> :ok
      {:error, {:already_started, _pid}} -> :ok
    end
  end

  :ok = :pg.join(Parrhesia.Fanout.MultiNode, self())
end
defp valid_event(overrides \\ %{}) do
base_event = %{
"pubkey" => String.duplicate("1", 64),

View File

@@ -22,6 +22,7 @@ defmodule Parrhesia.ConfigTest do
assert Parrhesia.Config.get([:limits, :max_filter_limit]) == 500
assert Parrhesia.Config.get([:database, :separate_read_pool?]) == false
assert Parrhesia.Config.get([:relay_url]) == "ws://localhost:4413/relay"
assert Parrhesia.Config.get([:sync, :relay_guard]) == false
assert Parrhesia.Config.get([:policies, :auth_required_for_writes]) == false
assert Parrhesia.Config.get([:policies, :marmot_media_max_imeta_tags_per_event]) == 8
assert Parrhesia.Config.get([:policies, :marmot_media_reject_mip04_v1]) == true