From 889d630c12ff76df4ab6f03d862c805ff70366d7 Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Sat, 14 Mar 2026 18:09:53 +0100 Subject: [PATCH] Add monthly partition maintenance and retention pruning --- README.md | 12 + config/config.exs | 7 + config/runtime.exs | 46 ++ docs/CLUSTER.md | 234 ++++++++++ docs/MARMOT_OPERATIONS.md | 69 --- docs/slop/LOCAL_API.md | 398 ++++++++++++++++++ lib/parrhesia/storage/archiver.ex | 169 +++++++- .../tasks/partition_retention_worker.ex | 280 ++++++++++++ lib/parrhesia/tasks/supervisor.ex | 21 +- .../partition_retention_stub_archiver.ex | 52 +++ test/parrhesia/storage/archiver_test.exs | 23 + .../tasks/partition_retention_worker_test.exs | 124 ++++++ 12 files changed, 1359 insertions(+), 76 deletions(-) create mode 100644 docs/CLUSTER.md delete mode 100644 docs/MARMOT_OPERATIONS.md create mode 100644 docs/slop/LOCAL_API.md create mode 100644 lib/parrhesia/tasks/partition_retention_worker.ex create mode 100644 lib/parrhesia/test_support/partition_retention_stub_archiver.ex create mode 100644 test/parrhesia/tasks/partition_retention_worker_test.exs diff --git a/README.md b/README.md index e80110b..263f35d 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,7 @@ For runtime overrides, use the `PARRHESIA_...` prefix: - `PARRHESIA_LIMITS_*` - `PARRHESIA_POLICIES_*` - `PARRHESIA_METRICS_*` +- `PARRHESIA_RETENTION_*` - `PARRHESIA_FEATURES_*` - `PARRHESIA_METRICS_ENDPOINT_*` @@ -135,6 +136,7 @@ CSV env vars use comma-separated values. 
Boolean env vars accept `1/0`, `true/fa | `:limits` | `PARRHESIA_LIMITS_*` | see table below | Runtime override group | | `:policies` | `PARRHESIA_POLICIES_*` | see table below | Runtime override group | | `:metrics` | `PARRHESIA_METRICS_*` | see table below | Runtime override group | +| `:retention` | `PARRHESIA_RETENTION_*` | see table below | Partition lifecycle and pruning policy | | `:features` | `PARRHESIA_FEATURES_*` | see table below | Runtime override group | | `:storage.events` | `-` | `Parrhesia.Storage.Adapters.Postgres.Events` | Config-file override only | | `:storage.moderation` | `-` | `Parrhesia.Storage.Adapters.Postgres.Moderation` | Config-file override only | @@ -223,6 +225,16 @@ CSV env vars use comma-separated values. Boolean env vars accept `1/0`, `true/fa | `:allowed_cidrs` | `PARRHESIA_METRICS_ALLOWED_CIDRS` | `[]` | | `:auth_token` | `PARRHESIA_METRICS_AUTH_TOKEN` | `nil` | +#### `:retention` + +| Atom key | ENV | Default | Notes | +| --- | --- | --- | --- | +| `:check_interval_hours` | `PARRHESIA_RETENTION_CHECK_INTERVAL_HOURS` | `24` | Partition maintenance + pruning cadence | +| `:months_ahead` | `PARRHESIA_RETENTION_MONTHS_AHEAD` | `2` | Pre-create current month plus N future monthly partitions | +| `:max_db_bytes` | `PARRHESIA_RETENTION_MAX_DB_BYTES` | `:infinity` | Interpreted as GiB threshold; accepts integer or `infinity` | +| `:max_months_to_keep` | `PARRHESIA_RETENTION_MAX_MONTHS_TO_KEEP` | `:infinity` | Keep at most N months (including current month); accepts integer or `infinity` | +| `:max_partitions_to_drop_per_run` | `PARRHESIA_RETENTION_MAX_PARTITIONS_TO_DROP_PER_RUN` | `1` | Safety cap for each maintenance run | + #### `:features` | Atom key | ENV | Default | diff --git a/config/config.exs b/config/config.exs index 6cd278b..8950b97 100644 --- a/config/config.exs +++ b/config/config.exs @@ -54,6 +54,13 @@ config :parrhesia, allowed_cidrs: [], auth_token: nil ], + retention: [ + check_interval_hours: 24, + months_ahead: 2, + 
# Parses an env var that accepts either an integer or the word "infinity"
# (case-insensitive, surrounding whitespace ignored). Returns `default` when
# the variable is unset; raises ArgumentError on any other value so
# misconfiguration fails fast at boot.
infinity_or_int_env = fn name, default ->
  case System.get_env(name) do
    nil ->
      default

    value ->
      normalized = value |> String.trim() |> String.downcase()

      if normalized == "infinity" do
        :infinity
      else
        # Parse the trimmed form: String.to_integer/1 rejects padded input
        # such as " 42 ", which the raw `value` would otherwise hit here
        # even though the "infinity" branch tolerates the same padding.
        String.to_integer(normalized)
      end
  end
end
) + ] + features = [ verify_event_signatures: bool_env.( @@ -403,6 +448,7 @@ if config_env() == :prod do limits: limits, policies: policies, metrics: metrics, + retention: retention, features: features case System.get_env("PARRHESIA_EXTRA_CONFIG") do diff --git a/docs/CLUSTER.md b/docs/CLUSTER.md new file mode 100644 index 0000000..b34af78 --- /dev/null +++ b/docs/CLUSTER.md @@ -0,0 +1,234 @@ +# Parrhesia clustering and distributed fanout + +This document describes: + +1. the **current** distributed fanout behavior implemented today, and +2. a practical evolution path to a more production-grade clustered relay. + +--- + +## 1) Current state (implemented today) + +### 1.1 What exists right now + +Parrhesia currently includes a lightweight multi-node live fanout path (untested!): + +- `Parrhesia.Fanout.MultiNode` (`lib/parrhesia/fanout/multi_node.ex`) + - GenServer that joins a `:pg` process group. + - Receives locally-published events and forwards them to other group members. + - Receives remote events and performs local fanout lookup. +- `Parrhesia.Web.Connection` (`lib/parrhesia/web/connection.ex`) + - On successful ingest, after ACK scheduling, it does: + 1. local fanout (`fanout_event/1`), then + 2. cross-node publish (`maybe_publish_multi_node/1`). +- `Parrhesia.Subscriptions.Supervisor` (`lib/parrhesia/subscriptions/supervisor.ex`) + - Starts `Parrhesia.Fanout.MultiNode` unconditionally. + +In other words: **if BEAM nodes are connected, live events are fanned out cross-node**. + +### 1.2 What is not included yet + +- No automatic cluster formation/discovery (no `libcluster`, DNS polling, gossip, etc.). +- No durable inter-node event transport. +- No replay/recovery of missed cross-node live events. +- No explicit per-node delivery ACK between relay nodes. + +--- + +## 2) Current runtime behavior in detail + +### 2.1 Local ingest flow and publish ordering + +For an accepted event in `Parrhesia.Web.Connection`: + +1. validate/policy/persist path runs. +2. 
Client receives `OK` reply. +3. A post-ACK message triggers: + - local fanout (`Index.candidate_subscription_keys/1` + send `{:fanout_event, ...}`), + - multi-node publish (`MultiNode.publish/1`). + +Important semantics: + +- Regular persisted events: ACK implies DB persistence succeeded. +- Ephemeral events: ACK implies accepted by policy, but no DB durability. +- Cross-node fanout happens **after** ACK path is scheduled. + +### 2.2 Multi-node transport mechanics + +`Parrhesia.Fanout.MultiNode` uses `:pg` membership: + +- On init: + - ensures `:pg` is started, + - joins group `Parrhesia.Fanout.MultiNode`. +- On publish: + - gets all group members, + - excludes itself, + - sends `{:remote_fanout_event, event}` to each member pid. +- On remote receive: + - runs local subscription candidate narrowing via `Parrhesia.Subscriptions.Index`, + - forwards matching candidates to local connection owners as `{:fanout_event, sub_id, event}`. + +No republish on remote receive, so this path does not create fanout loops. + +### 2.3 Subscription index locality + +The subscription index is local ETS state per node (`Parrhesia.Subscriptions.Index`). + +- Each node only tracks subscriptions of its local websocket processes. +- Each node independently decides which local subscribers match a remote event. +- There is no global cross-node subscription registry. + +### 2.4 Delivery model and guarantees (current) + +Current model is **best-effort live propagation** among connected nodes. + +- If nodes are connected and healthy, remote live subscribers should receive events quickly. +- If there is a netsplit or temporary disconnection: + - remote live subscribers may miss events, + - persisted events can still be recovered by normal `REQ`/history query, + - ephemeral events are not recoverable. 
+ +### 2.5 Cluster preconditions + +For cross-node fanout to work, operators must provide distributed BEAM connectivity: + +- consistent Erlang cookie, +- named nodes (`--name`/`--sname`), +- network reachability for Erlang distribution ports, +- explicit node connections (or external discovery tooling). + +Parrhesia currently does not automate these steps. + +--- + +## 3) Operational characteristics of current design + +### 3.1 Performance shape + +For each accepted event on one node: + +- one local fanout lookup + local sends, +- one cluster publish that sends to `N - 1` remote bus members, +- on each remote node: one local fanout lookup + local sends. + +So inter-node traffic scales roughly linearly with node count per event (full-cluster broadcast). + +This is simple and low-latency for small-to-medium clusters, but can become expensive as node count grows. + +### 3.2 Failure behavior + +- Remote node down: send attempts to that member stop once membership updates; no replay. +- Netsplit: live propagation gap during split. +- Recovery: local clients can catch up via DB-backed queries (except ephemeral kinds). + +### 3.3 Consistency expectations + +- No global total-ordering guarantee for live delivery across nodes. +- Per-connection ordering is preserved by each connection process queue/drain behavior. +- Duplicate suppression for ingestion uses storage semantics (`duplicate_event`), but transport itself is not exactly-once. + +### 3.4 Observability today + +Relevant metrics exist for fanout/queue pressure (see `Parrhesia.Telemetry`), e.g.: + +- `parrhesia.fanout.duration.ms` +- `parrhesia.connection.outbound_queue.depth` +- `parrhesia.connection.outbound_queue.pressure` +- `parrhesia.connection.outbound_queue.overflow.count` + +These are useful but do not yet fully separate local-vs-remote fanout pipeline stages. + +--- + +## 4) Practical extension path to a fully-fledged clustered system + +A realistic path is incremental. 
Suggested phases: + +### Phase A — hardened BEAM cluster control plane + +1. Add cluster discovery/formation (e.g. `libcluster`) with environment-specific topology: + - Kubernetes DNS, + - static nodes, + - cloud VM discovery. +2. Add clear node liveness/partition telemetry and alerts. +3. Provide operator docs for cookie, node naming, and network requirements. + +Outcome: simpler and safer cluster operations, same data plane semantics. + +### Phase B — resilient distributed fanout data plane + +Introduce a durable fanout stream for persisted events. + +Recommended pattern: + +1. On successful DB commit of event, append to a monotonic fanout log (or use DB sequence-based stream view). +2. Each relay node runs a consumer with a stored cursor. +3. On restart/partition recovery, node resumes from cursor and replays missed events. +4. Local fanout remains same (subscription index + per-connection queues). + +Semantics target: + +- **at-least-once** node-to-node propagation, +- replay after downtime, +- idempotent handling keyed by event id. + +Notes: + +- Ephemeral events can remain best-effort (or have a separate short-lived transport), since no storage source exists for replay. + +### Phase C — scale and efficiency improvements + +As cluster size grows, avoid naive full broadcast where possible: + +1. Optional node-level subscription summaries (coarse bloom/bitset or keyed summaries) to reduce unnecessary remote sends. +2. Shard fanout workers for CPU locality and mailbox control. +3. Batch remote delivery payloads. +4. Separate traffic classes (e.g. Marmot-heavy streams vs generic) with independent queues. + +Outcome: higher throughput per node and lower inter-node amplification. 
+ +### Phase D — stronger observability and SLOs + +Add explicit distributed pipeline metrics: + +- publish enqueue/dequeue latency, +- cross-node delivery lag (commit -> remote fanout enqueue), +- replay backlog depth, +- per-node dropped/expired transport messages, +- partition detection counters. + +Define cluster SLO examples: + +- p95 commit->remote-live enqueue under nominal load, +- max replay catch-up time after node restart, +- bounded message loss for best-effort channels. + +--- + +## 5) How a fully-fledged system would behave in practice + +With Phases A-D implemented, expected behavior: + +- **Normal operation:** + - low-latency local fanout, + - remote nodes receive events via stream consumers quickly, + - consistent operational visibility of end-to-end lag. +- **Node restart:** + - node reconnects and replays from stored cursor, + - local subscribers begin receiving new + missed persisted events. +- **Transient partition:** + - live best-effort path may degrade, + - persisted events converge after partition heals via replay. +- **High fanout bursts:** + - batching + sharding keeps queue pressure bounded, + - overflow policies remain connection-local and measurable. + +This approach gives a good trade-off between Nostr relay latency and distributed robustness without requiring strict exactly-once semantics. + +--- + +## 6) Current status summary + +Today, Parrhesia already supports **lightweight distributed live fanout** when BEAM nodes are connected. + +It is intentionally simple and fast for smaller clusters, and provides a solid base for a more durable, observable cluster architecture as relay scale and availability requirements grow. 
diff --git a/docs/MARMOT_OPERATIONS.md b/docs/MARMOT_OPERATIONS.md deleted file mode 100644 index 1b37c2b..0000000 --- a/docs/MARMOT_OPERATIONS.md +++ /dev/null @@ -1,69 +0,0 @@ -# Marmot operations guide (relay operator tuning) - -This document captures practical limits and operational defaults for Marmot-heavy traffic (`443`, `445`, `10051`, wrapped `1059`, optional media/push flows). - -## 1) Recommended baseline limits - -Use these as a starting point and tune from production telemetry. - -```elixir -config :parrhesia, - limits: [ - max_filter_limit: 500, - max_filters_per_req: 16, - max_outbound_queue: 256, - outbound_drain_batch_size: 64 - ], - policies: [ - # Marmot group routing/query guards - marmot_require_h_for_group_queries: true, - marmot_group_max_h_values_per_filter: 32, - marmot_group_max_query_window_seconds: 2_592_000, - - # Kind 445 retention - mls_group_event_ttl_seconds: 300, - - # MIP-04 metadata controls - marmot_media_max_imeta_tags_per_event: 8, - marmot_media_max_field_value_bytes: 1024, - marmot_media_max_url_bytes: 2048, - marmot_media_allowed_mime_prefixes: [], - marmot_media_reject_mip04_v1: true, - - # MIP-05 push controls (optional) - marmot_push_server_pubkeys: [], - marmot_push_max_relay_tags: 16, - marmot_push_max_payload_bytes: 65_536, - marmot_push_max_trigger_age_seconds: 120, - marmot_push_require_expiration: true, - marmot_push_max_expiration_window_seconds: 120, - marmot_push_max_server_recipients: 1 - ] -``` - -## 2) Index expectations for Marmot workloads - -The Postgres adapter relies on dedicated partial tag indexes for hot Marmot selectors: - -- `event_tags_h_value_created_at_idx` for `#h` group routing -- `event_tags_i_value_created_at_idx` for `#i` keypackage reference lookups - -Query-plan regression tests assert these paths remain usable for heavy workloads. 
- -## 3) Telemetry to watch - -Key metrics for Marmot traffic and pressure: - -- `parrhesia.ingest.duration.ms{traffic_class="marmot|generic"}` -- `parrhesia.query.duration.ms{traffic_class="marmot|generic"}` -- `parrhesia.fanout.duration.ms{traffic_class="marmot|generic"}` -- `parrhesia.connection.outbound_queue.depth{traffic_class=...}` -- `parrhesia.connection.outbound_queue.pressure{traffic_class=...}` -- `parrhesia.connection.outbound_queue.pressure_events.count{traffic_class=...}` -- `parrhesia.connection.outbound_queue.overflow.count{traffic_class=...}` - -Operational target: keep queue pressure below sustained 0.75 and avoid overflow spikes during `445` bursts. - -## 4) Fault and recovery expectations - -During storage outages, Marmot group-flow writes must fail with explicit `OK false` errors. After recovery, reordered group events should still query deterministically by `created_at DESC, id ASC`. diff --git a/docs/slop/LOCAL_API.md b/docs/slop/LOCAL_API.md new file mode 100644 index 0000000..d245d06 --- /dev/null +++ b/docs/slop/LOCAL_API.md @@ -0,0 +1,398 @@ +# Parrhesia Shared API + Local API Design (Option 1) + +## 1) Goal + +Expose a stable in-process API for embedding apps **and** refactor server transports to consume the same API. + +Desired end state: + +- WebSocket server, HTTP management, and embedding app all call one shared core API. +- Transport layers (WS/HTTP/local) only do framing, auth header extraction, and response encoding. +- Policy/storage/fanout/business semantics live in one place. + +This keeps everything in the same dependency (`:parrhesia`) and avoids a second package. + +--- + +## 2) Key architectural decision + +Previous direction: `Parrhesia.Local.*` as primary public API. + +Updated direction (this doc): + +- Introduce **shared core API modules** under `Parrhesia.API.*`. +- Make server code (`Parrhesia.Web.Connection`, management handlers) delegate to `Parrhesia.API.*`. 
+- Keep `Parrhesia.Local.*` as optional convenience wrappers over `Parrhesia.API.*`. + +This ensures no divergence between local embedding behavior and websocket behavior. + +--- + +## 3) Layered design + +```text +Transport layer + - Parrhesia.Web.Connection (WS) + - Parrhesia.Web.Management (HTTP) + - Parrhesia.Local.* wrappers (in-process) + +Shared API layer + - Parrhesia.API.Auth + - Parrhesia.API.Events + - Parrhesia.API.Stream (optional) + - Parrhesia.API.Admin (optional, for management methods) + +Domain/runtime dependencies + - Parrhesia.Policy.EventPolicy + - Parrhesia.Storage.* adapters + - Parrhesia.Groups.Flow + - Parrhesia.Subscriptions.Index + - Parrhesia.Fanout.MultiNode + - Parrhesia.Telemetry +``` + +Rule: all ingest/query/count decisions happen in `Parrhesia.API.Events`. + +--- + +## 4) Public module plan + +## 4.1 `Parrhesia.API.Auth` + +Purpose: +- event validation helpers +- NIP-98 verification +- optional embedding account resolution hook + +Proposed functions: + +```elixir +@spec validate_event(map()) :: :ok | {:error, term()} +@spec compute_event_id(map()) :: String.t() + +@spec validate_nip98(String.t() | nil, String.t(), String.t()) :: + {:ok, Parrhesia.API.Auth.Context.t()} | {:error, term()} + +@spec validate_nip98(String.t() | nil, String.t(), String.t(), keyword()) :: + {:ok, Parrhesia.API.Auth.Context.t()} | {:error, term()} +``` + +`validate_nip98/4` options: + +```elixir +account_resolver: (pubkey_hex :: String.t(), auth_event :: map() -> + {:ok, account :: term()} | {:error, term()}) +``` + +Context struct: + +```elixir +defmodule Parrhesia.API.Auth.Context do + @enforce_keys [:pubkey, :auth_event] + defstruct [:pubkey, :auth_event, :account, claims: %{}] +end +``` + +--- + +## 4.2 `Parrhesia.API.Events` + +Purpose: +- canonical ingress/query/count API used by WS + local + HTTP integrations. 
+ +Proposed functions: + +```elixir +@spec publish(map(), keyword()) :: {:ok, Parrhesia.API.Events.PublishResult.t()} | {:error, term()} +@spec query([map()], keyword()) :: {:ok, [map()]} | {:error, term()} +@spec count([map()], keyword()) :: {:ok, non_neg_integer() | map()} | {:error, term()} +``` + +Request context: + +```elixir +defmodule Parrhesia.API.RequestContext do + defstruct authenticated_pubkeys: MapSet.new(), + actor: nil, + metadata: %{} +end +``` + +Publish result: + +```elixir +defmodule Parrhesia.API.Events.PublishResult do + @enforce_keys [:event_id, :accepted, :message] + defstruct [:event_id, :accepted, :message] +end +``` + +### Publish semantics (must match websocket EVENT) + +Pipeline in `publish/2`: + +1. frame/event size limits +2. `Parrhesia.Protocol.validate_event/1` +3. `Parrhesia.Policy.EventPolicy.authorize_write/2` +4. group handling (`Parrhesia.Groups.Flow.handle_event/1`) +5. persistence path (`put_event`, deletion, vanish, ephemeral rules) +6. fanout (local + multi-node) +7. telemetry emit + +Return shape mirrors Nostr `OK` semantics: + +```elixir +{:ok, %PublishResult{event_id: id, accepted: true, message: "ok: event stored"}} +{:ok, %PublishResult{event_id: id, accepted: false, message: "blocked: ..."}} +``` + +### Query/count semantics (must match websocket REQ/COUNT) + +`query/2` and `count/2`: + +1. validate filters +2. run read policy (`EventPolicy.authorize_read/2`) +3. call storage with `requester_pubkeys` from context +4. return ordered events/count payload + +Giftwrap restrictions (`kind 1059`) must remain identical to websocket behavior. + +--- + +## 4.3 `Parrhesia.API.Stream` (optional but recommended) + +Purpose: +- local in-process subscriptions using same subscription index/fanout model. 
+ +Proposed functions: + +```elixir +@spec subscribe(pid(), String.t(), [map()], keyword()) :: {:ok, reference()} | {:error, term()} +@spec unsubscribe(reference()) :: :ok +``` + +Subscriber contract: + +```elixir +{:parrhesia, :event, ref, subscription_id, event} +{:parrhesia, :eose, ref, subscription_id} +{:parrhesia, :closed, ref, subscription_id, reason} +``` + +--- + +## 4.4 `Parrhesia.Local.*` wrappers + +`Parrhesia.Local.*` remain as convenience API for embedding apps, implemented as thin wrappers: + +- `Parrhesia.Local.Auth` -> delegates to `Parrhesia.API.Auth` +- `Parrhesia.Local.Events` -> delegates to `Parrhesia.API.Events` +- `Parrhesia.Local.Stream` -> delegates to `Parrhesia.API.Stream` +- `Parrhesia.Local.Client` -> use-case helpers (posts + private messages) + +No business logic in wrappers. + +--- + +## 5) Server integration plan (critical) + +## 5.1 WebSocket (`Parrhesia.Web.Connection`) + +After decode: +- `EVENT` -> `Parrhesia.API.Events.publish/2` +- `REQ` -> `Parrhesia.API.Events.query/2` +- `COUNT` -> `Parrhesia.API.Events.count/2` +- `AUTH` keep transport-specific challenge/session flow, but can use `API.Auth.validate_event/1` internally + +WebSocket keeps responsibility for: +- websocket framing +- subscription lifecycle per connection +- AUTH challenge rotation protocol frames + +## 5.2 HTTP management (`Parrhesia.Web.Management`) + +- NIP-98 header validation via `Parrhesia.API.Auth.validate_nip98/3` +- command execution via `Parrhesia.API.Admin` (or existing storage admin adapter via API facade) + +--- + +## 6) High-level client helpers for embedding app use case + +These helpers are optional and live in `Parrhesia.Local.Client`. 
+ +## 6.1 Public posts + +```elixir +@spec publish_post(Parrhesia.API.Auth.Context.t(), String.t(), keyword()) :: + {:ok, Parrhesia.API.Events.PublishResult.t()} | {:error, term()} + +@spec list_posts(keyword()) :: {:ok, [map()]} | {:error, term()} +@spec stream_posts(pid(), keyword()) :: {:ok, reference()} | {:error, term()} +``` + +`publish_post/3` options: +- `:tags` +- `:created_at` +- `:signer` callback (required unless fully signed event provided) + +Signer contract: + +```elixir +(unsigned_event_map -> {:ok, signed_event_map} | {:error, term()}) +``` + +Parrhesia does not store or manage private keys. + +## 6.2 Private messages (giftwrap kind 1059) + +```elixir +@spec send_private_message( + Parrhesia.API.Auth.Context.t(), + recipient_pubkey :: String.t(), + encrypted_payload :: String.t(), + keyword() +) :: {:ok, Parrhesia.API.Events.PublishResult.t()} | {:error, term()} + +@spec inbox(Parrhesia.API.Auth.Context.t(), keyword()) :: {:ok, [map()]} | {:error, term()} +@spec stream_inbox(pid(), Parrhesia.API.Auth.Context.t(), keyword()) :: {:ok, reference()} | {:error, term()} +``` + +Behavior: +- `send_private_message/4` builds event template with kind `1059` and `p` tag. +- host signer signs template. +- publish through `API.Events.publish/2`. +- `inbox/2` queries `%{"kinds" => [1059], "#p" => [auth.pubkey]}` with authenticated context. + +--- + +## 7) Error model + +Shared API should normalize output regardless of transport. 
+ +Guideline: +- protocol/policy rejection -> `{:ok, %{accepted: false, message: "..."}}` +- runtime/system failure -> `{:error, term()}` + +Common reason mapping: + +| Reason | Message prefix | +|---|---| +| `:auth_required` | `auth-required:` | +| `:restricted_giftwrap` | `restricted:` | +| `:invalid_event` | `invalid:` | +| `:duplicate_event` | `duplicate:` | +| `:event_rate_limited` | `rate-limited:` | + +--- + +## 8) Telemetry + +Emit shared events in API layer (not transport-specific): + +- `[:parrhesia, :api, :publish, :stop]` +- `[:parrhesia, :api, :query, :stop]` +- `[:parrhesia, :api, :count, :stop]` +- `[:parrhesia, :api, :auth, :stop]` + +Metadata: +- `traffic_class` +- `caller` (`:websocket | :http | :local`) +- optional `account_present?` + +Transport-level telemetry can remain separate where needed. + +--- + +## 9) Refactor sequence + +### Phase 1: Extract shared API +1. Create `Parrhesia.API.Events` with publish/query/count from current `Web.Connection` paths. +2. Create `Parrhesia.API.Auth` wrappers for NIP-98/event validation. +3. Add API-level tests. + +### Phase 2: Migrate transports +1. Update `Parrhesia.Web.Connection` to delegate publish/query/count to `API.Events`. +2. Update `Parrhesia.Web.Management` to use `API.Auth`. +3. Keep behavior unchanged. + +### Phase 3: Add local wrappers/helpers +1. Implement `Parrhesia.Local.Auth/Events/Stream` as thin delegates. +2. Add `Parrhesia.Local.Client` post/inbox/send helpers. +3. Add embedding documentation. + +### Phase 4: Lock parity +1. Add parity tests: WS vs Local API for same inputs and policy outcomes. +2. Add property tests for query/count equivalence where feasible. + +--- + +## 10) Testing requirements + +1. **Transport parity tests** + - Same signed event via WS and API => same accepted/message semantics. +2. **Policy parity tests** + - Giftwrap visibility and auth-required behavior identical across WS/API/local. +3. 
**Auth tests** + - NIP-98 success/failure + account resolver success/failure. +4. **Fanout tests** + - publish via API reaches local stream subscribers and WS subscribers. +5. **Failure tests** + - storage failures surface deterministic errors in all transports. + +--- + +## 11) Backwards compatibility + +- No breaking change to websocket protocol. +- No breaking change to management endpoint contract. +- New API modules are additive. +- Existing apps can ignore local API entirely. + +--- + +## 12) Embedding example flow + +### 12.1 Login/auth + +```elixir +with {:ok, auth} <- Parrhesia.API.Auth.validate_nip98(header, method, url, + account_resolver: &MyApp.Accounts.resolve_nostr_pubkey/2 + ) do + # use auth.pubkey/auth.account in host session +end +``` + +### 12.2 Post publish + +```elixir +Parrhesia.Local.Client.publish_post(auth, "hello", signer: &MyApp.NostrSigner.sign/1) +``` + +### 12.3 Private message + +```elixir +Parrhesia.Local.Client.send_private_message( + auth, + recipient_pubkey, + encrypted_payload, + signer: &MyApp.NostrSigner.sign/1 +) +``` + +### 12.4 Inbox + +```elixir +Parrhesia.Local.Client.inbox(auth, limit: 100) +``` + +--- + +## 13) Summary + +Yes, this can and should be extracted into a shared API module. The server should consume it too. + +That gives: +- one canonical behavior path, +- cleaner embedding, +- easier testing, +- lower long-term maintenance cost. 
@identifier_pattern ~r/^[a-zA-Z_][a-zA-Z0-9_]*$/
@monthly_partition_pattern ~r/^events_(\d{4})_(\d{2})$/
@default_months_ahead 2

# Parsed representation of an `events_YYYY_MM` partition, together with the
# half-open unix-second range [month_start_unix, month_end_unix) it covers.
@type monthly_partition :: %{
        name: String.t(),
        year: pos_integer(),
        month: pos_integer(),
        month_start_unix: non_neg_integer(),
        month_end_unix: non_neg_integer()
      }

@doc """
Lists monthly event partitions that match `events_YYYY_MM` naming, sorted
chronologically. Names whose month component falls outside 1..12 are ignored.
"""
@spec list_monthly_partitions() :: [monthly_partition()]
def list_monthly_partitions do
  list_partitions()
  |> Enum.map(&parse_monthly_partition/1)
  |> Enum.reject(&is_nil/1)
  |> Enum.sort_by(&{&1.year, &1.month})
end

@doc """
Ensures monthly partitions exist for the current month and `months_ahead`
future months.

## Options

  * `:months_ahead` - non-negative integer; invalid values fall back to the
    default of #{@default_months_ahead}
  * `:reference_date` - `Date` treated as "today"; defaults to `Date.utc_today/0`

Creation is idempotent (`CREATE TABLE IF NOT EXISTS`); the run halts on the
first failure and returns that error.
"""
@spec ensure_monthly_partitions(keyword()) :: :ok | {:error, term()}
def ensure_monthly_partitions(opts \\ []) when is_list(opts) do
  months_ahead =
    opts
    |> Keyword.get(:months_ahead, @default_months_ahead)
    |> normalize_non_negative_integer(@default_months_ahead)

  reference_date =
    opts
    |> Keyword.get(:reference_date, Date.utc_today())
    |> normalize_reference_date()

  reference_month = month_start(reference_date)

  # `0..months_ahead` already yields `[0]` when months_ahead == 0, so no
  # special case is needed for that value.
  Enum.reduce_while(0..months_ahead, :ok, fn offset, :ok ->
    case create_monthly_partition(shift_month(reference_month, offset)) do
      :ok -> {:cont, :ok}
      {:error, reason} -> {:halt, {:error, reason}}
    end
  end)
end

@doc """
Returns the current database size in bytes via `pg_database_size/1`.
"""
@spec database_size_bytes() :: {:ok, non_neg_integer()} | {:error, term()}
def database_size_bytes do
  case Repo.query("SELECT pg_database_size(current_database())") do
    {:ok, %{rows: [[size]]}} when is_integer(size) and size >= 0 -> {:ok, size}
    {:ok, _result} -> {:error, :unexpected_result}
    {:error, reason} -> {:error, reason}
  end
end

@doc """
Drops an event partition table by name.

Refuses to drop the parent `events` table or the `events_default` partition;
any other name must pass the SQL-identifier check in `quote_identifier!/1`
before being interpolated into the DROP statement.
"""
@spec drop_partition(String.t()) :: :ok | {:error, term()}
def drop_partition(partition_name) when is_binary(partition_name) do
  if partition_name in ["events", "events_default"] do
    {:error, :protected_partition}
  else
    quoted_partition_name = quote_identifier!(partition_name)

    case Repo.query("DROP TABLE IF EXISTS #{quoted_partition_name}") do
      {:ok, _result} -> :ok
      {:error, reason} -> {:error, reason}
    end
  end
end

@doc """
Returns the monthly partition name (`events_YYYY_MM`) for a date.
"""
@spec month_partition_name(Date.t()) :: String.t()
def month_partition_name(%Date{} = date) do
  month = date.month |> Integer.to_string() |> String.pad_leading(2, "0")
  "events_#{date.year}_#{month}"
end

# Creates the range partition of "events" covering `month_date`'s month.
# Idempotent via IF NOT EXISTS. The interpolated bounds are integers computed
# here (never user input) and the table name goes through quote_identifier!/1.
defp create_monthly_partition(%Date{} = month_date) do
  partition_name = month_partition_name(month_date)
  {start_unix, end_unix} = month_bounds_unix(month_date.year, month_date.month)
  quoted_partition_name = quote_identifier!(partition_name)

  sql =
    """
    CREATE TABLE IF NOT EXISTS #{quoted_partition_name}
    PARTITION OF "events"
    FOR VALUES FROM (#{start_unix}) TO (#{end_unix})
    """

  case Repo.query(sql) do
    {:ok, _result} -> :ok
    {:error, reason} -> {:error, reason}
  end
end

# Returns a monthly_partition map for an `events_YYYY_MM` name, or nil when
# the name does not match the pattern or the month is outside 1..12. The
# Integer.parse matches cannot fail because the regex captures digits only.
defp parse_monthly_partition(partition_name) do
  case Regex.run(@monthly_partition_pattern, partition_name, capture: :all_but_first) do
    [year_text, month_text] ->
      {year, ""} = Integer.parse(year_text)
      {month, ""} = Integer.parse(month_text)

      if month in 1..12 do
        {month_start_unix, month_end_unix} = month_bounds_unix(year, month)

        %{
          name: partition_name,
          year: year,
          month: month,
          month_start_unix: month_start_unix,
          month_end_unix: month_end_unix
        }
      else
        nil
      end

    _other ->
      nil
  end
end

# Half-open [start, end) unix-second bounds for a month, at UTC midnight.
defp month_bounds_unix(year, month) do
  month_date = Date.new!(year, month, 1)
  {date_to_unix(month_date), date_to_unix(shift_month(month_date, 1))}
end

defp date_to_unix(%Date{} = date) do
  date
  |> DateTime.new!(~T[00:00:00], "Etc/UTC")
  |> DateTime.to_unix()
end

defp month_start(%Date{} = date), do: Date.new!(date.year, date.month, 1)

# First day of the month `month_delta` months away, via a flat month index.
# Correct for the non-negative deltas used here (truncated vs floor division
# only diverges for years <= 0).
defp shift_month(%Date{} = date, month_delta) when is_integer(month_delta) do
  month_index = date.year * 12 + date.month - 1 + month_delta
  Date.new!(div(month_index, 12), rem(month_index, 12) + 1, 1)
end

defp normalize_reference_date(%Date{} = date), do: date
defp normalize_reference_date(_other), do: Date.utc_today()

defp normalize_non_negative_integer(value, _default) when is_integer(value) and value >= 0,
  do: value

defp normalize_non_negative_integer(_value, default), do: default
+ {:error, _reason} -> {0, :error} + end + + Telemetry.emit( + [:parrhesia, :maintenance, :partition_retention, :stop], + %{ + duration: System.monotonic_time() - started_at, + dropped_partitions: dropped_count + }, + %{status: status} + ) + + schedule_tick(state.interval_ms) + {:noreply, state} + end + + def handle_info(_message, state), do: {:noreply, state} + + defp run_maintenance(state) do + case state.archiver.ensure_monthly_partitions(months_ahead: state.months_ahead) do + :ok -> maybe_drop_oldest_partitions(state) + {:error, reason} -> {:error, reason} + end + end + + defp maybe_drop_oldest_partitions(%{max_partitions_to_drop_per_run: max_drops}) + when max_drops <= 0, + do: {:ok, 0} + + defp maybe_drop_oldest_partitions(state) do + 1..state.max_partitions_to_drop_per_run + |> Enum.reduce_while({:ok, 0}, fn _attempt, {:ok, dropped_count} -> + drop_oldest_partition_once(state, dropped_count) + end) + end + + defp drop_oldest_partition_once(state, dropped_count) do + case next_partition_to_drop(state) do + {:ok, partition} -> apply_partition_drop(state, partition, dropped_count) + {:error, reason} -> {:halt, {:error, reason}} + end + end + + defp apply_partition_drop(_state, nil, dropped_count), do: {:halt, {:ok, dropped_count}} + + defp apply_partition_drop(state, partition, dropped_count) do + case state.archiver.drop_partition(partition.name) do + :ok -> {:cont, {:ok, dropped_count + 1}} + {:error, reason} -> {:halt, {:error, reason}} + end + end + + defp next_partition_to_drop(state) do + partitions = state.archiver.list_monthly_partitions() + current_month_index = current_month_index(state.today_fun) + + month_limit_candidate = + oldest_partition_exceeding_month_limit( + partitions, + state.max_months_to_keep, + current_month_index + ) + + with {:ok, size_limit_candidate} <- + oldest_partition_exceeding_size_limit( + partitions, + state.max_db_gib, + current_month_index, + state.archiver + ) do + {:ok, pick_oldest_partition(month_limit_candidate, 
size_limit_candidate)} + end + end + + defp oldest_partition_exceeding_month_limit(_partitions, :infinity, _current_month_index), + do: nil + + defp oldest_partition_exceeding_month_limit(partitions, max_months_to_keep, current_month_index) + when is_integer(max_months_to_keep) and max_months_to_keep > 0 do + oldest_month_to_keep_index = current_month_index - (max_months_to_keep - 1) + + partitions + |> Enum.filter(fn partition -> + month_index(partition) < current_month_index and + month_index(partition) < oldest_month_to_keep_index + end) + |> Enum.min_by(&month_index/1, fn -> nil end) + end + + defp oldest_partition_exceeding_month_limit( + _partitions, + _max_months_to_keep, + _current_month_index + ), + do: nil + + defp oldest_partition_exceeding_size_limit( + _partitions, + :infinity, + _current_month_index, + _archiver + ), + do: {:ok, nil} + + defp oldest_partition_exceeding_size_limit( + partitions, + max_db_gib, + current_month_index, + archiver + ) + when is_integer(max_db_gib) and max_db_gib > 0 do + with {:ok, current_size_bytes} <- archiver.database_size_bytes() do + max_size_bytes = max_db_gib * @bytes_per_gib + + if current_size_bytes > max_size_bytes do + {:ok, oldest_completed_partition(partitions, current_month_index)} + else + {:ok, nil} + end + end + end + + defp oldest_partition_exceeding_size_limit( + _partitions, + _max_db_gib, + _current_month_index, + _archiver + ), + do: {:ok, nil} + + defp oldest_completed_partition(partitions, current_month_index) do + partitions + |> Enum.filter(&(month_index(&1) < current_month_index)) + |> Enum.min_by(&month_index/1, fn -> nil end) + end + + defp pick_oldest_partition(nil, nil), do: nil + defp pick_oldest_partition(partition, nil), do: partition + defp pick_oldest_partition(nil, partition), do: partition + + defp pick_oldest_partition(left, right) do + if month_index(left) <= month_index(right) do + left + else + right + end + end + + defp month_index(%{year: year, month: month}) when 
is_integer(year) and is_integer(month) do + year * 12 + month + end + + defp current_month_index(today_fun) do + today = today_fun.() + today.year * 12 + today.month + end + + defp interval_ms(opts, retention_config) do + case Keyword.get(opts, :interval_ms) do + value when is_integer(value) and value > 0 -> + value + + _other -> + retention_config + |> Keyword.get(:check_interval_hours, @default_check_interval_hours) + |> normalize_positive_integer(@default_check_interval_hours) + |> hours_to_ms() + end + end + + defp months_ahead(opts, retention_config) do + opts + |> Keyword.get( + :months_ahead, + Keyword.get(retention_config, :months_ahead, @default_months_ahead) + ) + |> normalize_non_negative_integer(@default_months_ahead) + end + + defp max_db_gib(opts, retention_config) do + opts + |> Keyword.get(:max_db_bytes, Keyword.get(retention_config, :max_db_bytes, :infinity)) + |> normalize_limit() + end + + defp max_months_to_keep(opts, retention_config) do + opts + |> Keyword.get( + :max_months_to_keep, + Keyword.get(retention_config, :max_months_to_keep, :infinity) + ) + |> normalize_limit() + end + + defp max_partitions_to_drop_per_run(opts, retention_config) do + opts + |> Keyword.get( + :max_partitions_to_drop_per_run, + Keyword.get( + retention_config, + :max_partitions_to_drop_per_run, + @default_max_partitions_to_drop_per_run + ) + ) + |> normalize_non_negative_integer(@default_max_partitions_to_drop_per_run) + end + + defp today_fun(opts) do + case Keyword.get(opts, :today_fun, &Date.utc_today/0) do + function when is_function(function, 0) -> function + _other -> &Date.utc_today/0 + end + end + + defp normalize_limit(:infinity), do: :infinity + defp normalize_limit(value) when is_integer(value) and value > 0, do: value + defp normalize_limit(_value), do: :infinity + + defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0, do: value + defp normalize_positive_integer(_value, default), do: default + + defp 
normalize_non_negative_integer(value, _default) when is_integer(value) and value >= 0, + do: value + + defp normalize_non_negative_integer(_value, default), do: default + + defp hours_to_ms(hours), do: hours * 60 * 60 * 1000 + + defp schedule_tick(interval_ms) do + Process.send_after(self(), :tick, interval_ms) + end +end diff --git a/lib/parrhesia/tasks/supervisor.ex b/lib/parrhesia/tasks/supervisor.ex index 99345fb..b95f1d6 100644 --- a/lib/parrhesia/tasks/supervisor.ex +++ b/lib/parrhesia/tasks/supervisor.ex @@ -11,13 +11,22 @@ defmodule Parrhesia.Tasks.Supervisor do @impl true def init(_init_arg) do - children = - if Application.get_env(:parrhesia, :enable_expiration_worker, true) do - [{Parrhesia.Tasks.ExpirationWorker, name: Parrhesia.Tasks.ExpirationWorker}] - else - [] - end + children = expiration_children() ++ partition_retention_children() Supervisor.init(children, strategy: :one_for_one) end + + defp expiration_children do + if Application.get_env(:parrhesia, :enable_expiration_worker, true) do + [{Parrhesia.Tasks.ExpirationWorker, name: Parrhesia.Tasks.ExpirationWorker}] + else + [] + end + end + + defp partition_retention_children do + [ + {Parrhesia.Tasks.PartitionRetentionWorker, name: Parrhesia.Tasks.PartitionRetentionWorker} + ] + end end diff --git a/lib/parrhesia/test_support/partition_retention_stub_archiver.ex b/lib/parrhesia/test_support/partition_retention_stub_archiver.ex new file mode 100644 index 0000000..cda7805 --- /dev/null +++ b/lib/parrhesia/test_support/partition_retention_stub_archiver.ex @@ -0,0 +1,52 @@ +defmodule Parrhesia.TestSupport.PartitionRetentionStubArchiver do + @moduledoc false + + use Agent + + @spec start_link(keyword()) :: Agent.on_start() + def start_link(opts \\ []) do + name = Keyword.get(opts, :name, __MODULE__) + + initial_state = %{ + partitions: Keyword.get(opts, :partitions, []), + db_size_bytes: Keyword.get(opts, :db_size_bytes, 0), + test_pid: Keyword.get(opts, :test_pid) + } + + Agent.start_link(fn -> 
initial_state end, name: name) + end + + @spec ensure_monthly_partitions(keyword()) :: :ok + def ensure_monthly_partitions(opts \\ []) do + notify({:ensure_monthly_partitions, opts}) + :ok + end + + @spec list_monthly_partitions() :: [map()] + def list_monthly_partitions do + Agent.get(__MODULE__, & &1.partitions) + end + + @spec database_size_bytes() :: {:ok, non_neg_integer()} + def database_size_bytes do + notify(:database_size_bytes) + {:ok, Agent.get(__MODULE__, & &1.db_size_bytes)} + end + + @spec drop_partition(String.t()) :: :ok + def drop_partition(partition_name) when is_binary(partition_name) do + Agent.update(__MODULE__, fn state -> + %{state | partitions: Enum.reject(state.partitions, &(&1.name == partition_name))} + end) + + notify({:drop_partition, partition_name}) + :ok + end + + defp notify(message) do + case Agent.get(__MODULE__, & &1.test_pid) do + pid when is_pid(pid) -> send(pid, message) + _other -> :ok + end + end +end diff --git a/test/parrhesia/storage/archiver_test.exs b/test/parrhesia/storage/archiver_test.exs index 1f625ca..a72f6a8 100644 --- a/test/parrhesia/storage/archiver_test.exs +++ b/test/parrhesia/storage/archiver_test.exs @@ -15,11 +15,34 @@ defmodule Parrhesia.Storage.ArchiverTest do assert is_list(partitions) end + test "ensure_monthly_partitions creates named monthly partitions" do + assert :ok = + Archiver.ensure_monthly_partitions(reference_date: ~D[2026-06-14], months_ahead: 1) + + monthly_partition_names = + Archiver.list_monthly_partitions() + |> Enum.map(& &1.name) + + assert "events_2026_06" in monthly_partition_names + assert "events_2026_07" in monthly_partition_names + end + test "archive_sql builds insert-select statement" do assert Archiver.archive_sql("events_2026_03", "events_archive") == ~s(INSERT INTO "events_archive" SELECT * FROM "events_2026_03";) end + test "drop_partition returns an error for protected partitions" do + assert {:error, :protected_partition} = Archiver.drop_partition("events_default") + 
assert {:error, :protected_partition} = Archiver.drop_partition("events") + end + + test "database_size_bytes returns the current database size" do + assert {:ok, size} = Archiver.database_size_bytes() + assert is_integer(size) + assert size >= 0 + end + test "archive_sql rejects invalid SQL identifiers" do assert_raise ArgumentError, fn -> Archiver.archive_sql("events_default; DROP TABLE events", "events_archive") diff --git a/test/parrhesia/tasks/partition_retention_worker_test.exs b/test/parrhesia/tasks/partition_retention_worker_test.exs new file mode 100644 index 0000000..e68f5dd --- /dev/null +++ b/test/parrhesia/tasks/partition_retention_worker_test.exs @@ -0,0 +1,124 @@ +defmodule Parrhesia.Tasks.PartitionRetentionWorkerTest do + use ExUnit.Case, async: false + + alias Parrhesia.Tasks.PartitionRetentionWorker + alias Parrhesia.TestSupport.PartitionRetentionStubArchiver + + @bytes_per_gib 1_073_741_824 + + test "drops oldest partition when max_months_to_keep is exceeded" do + start_supervised!( + {PartitionRetentionStubArchiver, + partitions: [ + partition(2026, 1), + partition(2026, 2), + partition(2026, 3), + partition(2026, 4), + partition(2026, 5) + ], + db_size_bytes: 2 * @bytes_per_gib, + test_pid: self()} + ) + + worker = + start_supervised!( + {PartitionRetentionWorker, + name: nil, + archiver: PartitionRetentionStubArchiver, + interval_ms: :timer.hours(24), + months_ahead: 0, + max_db_bytes: :infinity, + max_months_to_keep: 3, + max_partitions_to_drop_per_run: 1, + today_fun: fn -> ~D[2026-06-15] end} + ) + + assert is_pid(worker) + assert_receive {:ensure_monthly_partitions, [months_ahead: 0]} + assert_receive {:drop_partition, "events_2026_01"} + refute_receive {:drop_partition, _partition_name}, 20 + refute_receive :database_size_bytes, 20 + end + + test "drops oldest completed partition when size exceeds max_db_bytes" do + start_supervised!( + {PartitionRetentionStubArchiver, + partitions: [partition(2026, 3), partition(2026, 4), partition(2026, 
5)], + db_size_bytes: 12 * @bytes_per_gib, + test_pid: self()} + ) + + worker = + start_supervised!( + {PartitionRetentionWorker, + name: nil, + archiver: PartitionRetentionStubArchiver, + interval_ms: :timer.hours(24), + months_ahead: 0, + max_db_bytes: 10, + max_months_to_keep: :infinity, + max_partitions_to_drop_per_run: 1, + today_fun: fn -> ~D[2026-06-15] end} + ) + + assert is_pid(worker) + assert_receive {:ensure_monthly_partitions, [months_ahead: 0]} + assert_receive :database_size_bytes + assert_receive {:drop_partition, "events_2026_03"} + end + + test "does not drop partitions when both limits are infinity" do + start_supervised!( + {PartitionRetentionStubArchiver, + partitions: [partition(2026, 1), partition(2026, 2), partition(2026, 3)], + db_size_bytes: 50 * @bytes_per_gib, + test_pid: self()} + ) + + worker = + start_supervised!( + {PartitionRetentionWorker, + name: nil, + archiver: PartitionRetentionStubArchiver, + interval_ms: :timer.hours(24), + months_ahead: 0, + max_db_bytes: :infinity, + max_months_to_keep: :infinity, + max_partitions_to_drop_per_run: 1, + today_fun: fn -> ~D[2026-06-15] end} + ) + + assert is_pid(worker) + assert_receive {:ensure_monthly_partitions, [months_ahead: 0]} + refute_receive :database_size_bytes, 20 + refute_receive {:drop_partition, _partition_name}, 20 + end + + defp partition(year, month) when is_integer(year) and is_integer(month) do + month_name = month |> Integer.to_string() |> String.pad_leading(2, "0") + month_start = Date.new!(year, month, 1) + next_month_start = shift_month(month_start, 1) + + %{ + name: "events_#{year}_#{month_name}", + year: year, + month: month, + month_start_unix: date_to_unix(month_start), + month_end_unix: date_to_unix(next_month_start) + } + end + + defp shift_month(%Date{} = date, month_delta) when is_integer(month_delta) do + month_index = date.year * 12 + date.month - 1 + month_delta + shifted_year = div(month_index, 12) + shifted_month = rem(month_index, 12) + 1 + + 
Date.new!(shifted_year, shifted_month, 1) + end + + defp date_to_unix(%Date{} = date) do + date + |> DateTime.new!(~T[00:00:00], "Etc/UTC") + |> DateTime.to_unix() + end +end