diff --git a/README.md b/README.md index 64a38e9..b2fa6a0 100644 --- a/README.md +++ b/README.md @@ -13,11 +13,11 @@ Parrhesia is a Nostr relay server written in Elixir/OTP with PostgreSQL storage. It exposes: -- a WebSocket relay endpoint at `/relay` +- listener-configurable WS/HTTP ingress, with a default `public` listener on port `4413` +- a WebSocket relay endpoint at `/relay` on listeners that enable the `nostr` feature - NIP-11 relay info on `GET /relay` with `Accept: application/nostr+json` -- operational HTTP endpoints (`/health`, `/ready`, `/metrics`) - - `/metrics` is restricted by default to private/loopback source IPs -- a NIP-86-style management API at `POST /management` (NIP-98 auth) +- operational HTTP endpoints such as `/health`, `/ready`, and `/metrics` on listeners that enable them +- a NIP-86-style management API at `POST /management` on listeners that enable the `admin` feature ## Supported NIPs @@ -56,7 +56,7 @@ mix setup mix run --no-halt ``` -Server listens on `http://localhost:4413` by default. +The default `public` listener binds to `http://localhost:4413`. WebSocket clients should connect to: @@ -83,8 +83,8 @@ Before a Nostr client can publish its first event successfully, make sure these 1. PostgreSQL is reachable from Parrhesia. Set `DATABASE_URL` and create/migrate the database with `Parrhesia.Release.migrate()` or `mix ecto.migrate`. -2. Parrhesia is reachable behind your reverse proxy. - Parrhesia itself listens on plain HTTP on port `4413`, and the reverse proxy is expected to terminate TLS and forward WebSocket traffic to `/relay`. +2. Parrhesia listeners are configured for your deployment. + The default config exposes a `public` listener on plain HTTP port `4413`, and a reverse proxy can terminate TLS and forward WebSocket traffic to `/relay`. Additional listeners can be defined in `config/*.exs`. 3. `:relay_url` matches the public relay URL clients should use. Set `PARRHESIA_RELAY_URL` to the public relay URL exposed by the reverse proxy. @@ -100,7 +100,7 @@ In `prod`, these environment variables are used: - `DATABASE_URL` (**required**), e.g. `ecto://USER:PASS@HOST/parrhesia_prod` - `POOL_SIZE` (optional, default `32`) - `PORT` (optional, default `4413`) -- `PARRHESIA_*` runtime overrides for relay config, limits, policies, metrics, and features +- `PARRHESIA_*` runtime overrides for relay config, limits, policies, listener-related metrics helpers, and features - `PARRHESIA_EXTRA_CONFIG` (optional path to an extra runtime config file) `config/runtime.exs` reads these values at runtime in production releases. @@ -110,6 +110,7 @@ In `prod`, these environment variables are used: For runtime overrides, use the `PARRHESIA_...` prefix: - `PARRHESIA_RELAY_URL` +- `PARRHESIA_TRUSTED_PROXIES` - `PARRHESIA_MODERATION_CACHE_ENABLED` - `PARRHESIA_ENABLE_EXPIRATION_WORKER` - `PARRHESIA_LIMITS_*` @@ -128,6 +129,8 @@ export PARRHESIA_METRICS_ALLOWED_CIDRS="10.0.0.0/8,192.168.0.0/16" export PARRHESIA_LIMITS_OUTBOUND_OVERFLOW_STRATEGY=drop_oldest ``` +Listeners themselves are primarily configured under `config :parrhesia, :listeners, ...`. The current runtime env helpers tune the default public listener and the optional dedicated metrics listener. + For settings that are awkward to express as env vars, mount an extra config file and set `PARRHESIA_EXTRA_CONFIG` to its path inside the container. ### Config reference @@ -143,7 +146,7 @@ CSV env vars use comma-separated values. Boolean env vars accept `1/0`, `true/fa | `:enable_expiration_worker` | `PARRHESIA_ENABLE_EXPIRATION_WORKER` | `true` | Toggle background expiration worker | | `:limits` | `PARRHESIA_LIMITS_*` | see table below | Runtime override group | | `:policies` | `PARRHESIA_POLICIES_*` | see table below | Runtime override group | -| `:metrics` | `PARRHESIA_METRICS_*` | see table below | Runtime override group | +| `:listeners` | config-file driven | see notes below | Ingress listeners with bind, transport, feature, auth, network, and baseline ACL settings | | `:retention` | `PARRHESIA_RETENTION_*` | see table below | Partition lifecycle and pruning policy | | `:features` | `PARRHESIA_FEATURES_*` | see table below | Runtime override group | | `:storage.events` | `-` | `Parrhesia.Storage.Adapters.Postgres.Events` | Config-file override only | @@ -161,19 +164,15 @@ CSV env vars use comma-separated values. Boolean env vars accept `1/0`, `true/fa | `:queue_interval` | `DB_QUEUE_INTERVAL_MS` | `5000` | Ecto queue interval in ms | | `:types` | `-` | `Parrhesia.PostgresTypes` | Internal config-file setting | -#### `Parrhesia.Web.Endpoint` +#### `:listeners` | Atom key | ENV | Default | Notes | | --- | --- | --- | --- | -| `:port` | `PORT` | `4413` | Main HTTP/WebSocket listener | - -#### `Parrhesia.Web.MetricsEndpoint` - -| Atom key | ENV | Default | Notes | -| --- | --- | --- | --- | -| `:enabled` | `PARRHESIA_METRICS_ENDPOINT_ENABLED` | `false` | Enables dedicated metrics listener | -| `:ip` | `PARRHESIA_METRICS_ENDPOINT_IP` | `127.0.0.1` | IPv4 only | -| `:port` | `PARRHESIA_METRICS_ENDPOINT_PORT` | `9568` | Dedicated metrics port | +| `:public.bind.port` | `PORT` | `4413` | Default public listener port | +| `:public.proxy.trusted_cidrs` | `PARRHESIA_TRUSTED_PROXIES` | `[]` | Trusted reverse proxies for forwarded IP handling | +| `:public.features.metrics.*` | `PARRHESIA_METRICS_*` | see below | Convenience runtime overrides for metrics on the public listener | +| `:metrics.bind.port` | `PARRHESIA_METRICS_ENDPOINT_PORT` | `9568` | Optional dedicated metrics listener port | +| `:metrics.enabled` | `PARRHESIA_METRICS_ENDPOINT_ENABLED` | `false` | Enables the optional dedicated metrics listener | #### `:limits` @@ -223,11 +222,11 @@ CSV env vars use comma-separated values. Boolean env vars accept `1/0`, `true/fa | `:marmot_push_max_server_recipients` | `PARRHESIA_POLICIES_MARMOT_PUSH_MAX_SERVER_RECIPIENTS` | `1` | | `:management_auth_required` | `PARRHESIA_POLICIES_MANAGEMENT_AUTH_REQUIRED` | `true` | -#### `:metrics` +#### Listener-related Metrics Helpers | Atom key | ENV | Default | | --- | --- | --- | -| `:enabled_on_main_endpoint` | `PARRHESIA_METRICS_ENABLED_ON_MAIN_ENDPOINT` | `true` | +| `:public.features.metrics.enabled` | `PARRHESIA_METRICS_ENABLED_ON_MAIN_ENDPOINT` | `true` | | `:public` | `PARRHESIA_METRICS_PUBLIC` | `false` | | `:private_networks_only` | `PARRHESIA_METRICS_PRIVATE_NETWORKS_ONLY` | `true` | | `:allowed_cidrs` | `PARRHESIA_METRICS_ALLOWED_CIDRS` | `[]` | diff --git a/config/config.exs b/config/config.exs index fe9e944..9cf6496 100644 --- a/config/config.exs +++ b/config/config.exs @@ -57,13 +57,26 @@ config :parrhesia, marmot_push_max_server_recipients: 1, management_auth_required: true ], - metrics: [ - enabled_on_main_endpoint: true, - public: false, - private_networks_only: true, - allowed_cidrs: [], - auth_token: nil - ], + listeners: %{ + public: %{ + enabled: true, + bind: %{ip: {0, 0, 0, 0}, port: 4413}, + transport: %{scheme: :http, tls: %{mode: :disabled}}, + proxy: %{trusted_cidrs: [], honor_x_forwarded_for: true}, + network: %{allow_all: true}, + features: %{ + nostr: %{enabled: true}, + admin: %{enabled: true}, + metrics: %{ + enabled: true, + access: %{private_networks_only: true}, + auth_token: nil + } + }, + auth: %{nip42_required: false, nip98_required_for_admin: true}, + baseline_acl: %{read: [], write: []} + } + }, retention: [ check_interval_hours: 24, months_ahead: 2, @@ -85,13 +98,6 @@ config :parrhesia, admin: Parrhesia.Storage.Adapters.Postgres.Admin ] -config :parrhesia, Parrhesia.Web.Endpoint, port: 4413 - -config :parrhesia, Parrhesia.Web.MetricsEndpoint, - enabled: false, - ip: {127, 0, 0, 1}, - port: 9568 - config :parrhesia, Parrhesia.Repo, types: Parrhesia.PostgresTypes config :parrhesia, ecto_repos: [Parrhesia.Repo] diff --git a/config/runtime.exs b/config/runtime.exs index 462981e..52e32e9 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -121,10 +121,9 @@ if config_env() == :prod do limits_defaults = Application.get_env(:parrhesia, :limits, []) policies_defaults = Application.get_env(:parrhesia, :policies, []) - metrics_defaults = Application.get_env(:parrhesia, :metrics, []) + listeners_defaults = Application.get_env(:parrhesia, :listeners, %{}) retention_defaults = Application.get_env(:parrhesia, :retention, []) features_defaults = Application.get_env(:parrhesia, :features, []) - metrics_endpoint_defaults = Application.get_env(:parrhesia, Parrhesia.Web.MetricsEndpoint, []) default_pool_size = Keyword.get(repo_defaults, :pool_size, 32) default_queue_target = Keyword.get(repo_defaults, :queue_target, 1_000) @@ -340,33 +339,170 @@ if config_env() == :prod do ) ] - metrics = [ - enabled_on_main_endpoint: - bool_env.( - "PARRHESIA_METRICS_ENABLED_ON_MAIN_ENDPOINT", - Keyword.get(metrics_defaults, :enabled_on_main_endpoint, true) - ), - public: - bool_env.( - "PARRHESIA_METRICS_PUBLIC", - Keyword.get(metrics_defaults, :public, false) - ), - private_networks_only: - bool_env.( - "PARRHESIA_METRICS_PRIVATE_NETWORKS_ONLY", - Keyword.get(metrics_defaults, :private_networks_only, true) - ), - allowed_cidrs: - csv_env.( - "PARRHESIA_METRICS_ALLOWED_CIDRS", - Keyword.get(metrics_defaults, :allowed_cidrs, []) - ), - auth_token: - string_env.( - "PARRHESIA_METRICS_AUTH_TOKEN", - Keyword.get(metrics_defaults, :auth_token) + public_listener_defaults = Map.get(listeners_defaults, :public, %{}) + public_bind_defaults = Map.get(public_listener_defaults, :bind, %{}) + public_transport_defaults = Map.get(public_listener_defaults, :transport, %{}) + public_proxy_defaults = Map.get(public_listener_defaults, :proxy, %{}) + public_network_defaults = Map.get(public_listener_defaults, :network, %{}) + public_features_defaults = Map.get(public_listener_defaults, :features, %{}) + public_auth_defaults = Map.get(public_listener_defaults, :auth, %{}) + public_metrics_defaults = Map.get(public_features_defaults, :metrics, %{}) + public_metrics_access_defaults = Map.get(public_metrics_defaults, :access, %{}) + + metrics_listener_defaults = Map.get(listeners_defaults, :metrics, %{}) + metrics_listener_bind_defaults = Map.get(metrics_listener_defaults, :bind, %{}) + metrics_listener_transport_defaults = Map.get(metrics_listener_defaults, :transport, %{}) + metrics_listener_network_defaults = Map.get(metrics_listener_defaults, :network, %{}) + + metrics_listener_metrics_defaults = + metrics_listener_defaults + |> Map.get(:features, %{}) + |> Map.get(:metrics, %{}) + + metrics_listener_metrics_access_defaults = + Map.get(metrics_listener_metrics_defaults, :access, %{}) + + public_listener = %{ + enabled: Map.get(public_listener_defaults, :enabled, true), + bind: %{ + ip: Map.get(public_bind_defaults, :ip, {0, 0, 0, 0}), + port: int_env.("PORT", Map.get(public_bind_defaults, :port, 4413)) + }, + transport: %{ + scheme: Map.get(public_transport_defaults, :scheme, :http), + tls: Map.get(public_transport_defaults, :tls, %{mode: :disabled}) + }, + proxy: %{ + trusted_cidrs: + csv_env.( + "PARRHESIA_TRUSTED_PROXIES", + Map.get(public_proxy_defaults, :trusted_cidrs, []) + ), + honor_x_forwarded_for: Map.get(public_proxy_defaults, :honor_x_forwarded_for, true) + }, + network: %{ + allow_cidrs: Map.get(public_network_defaults, :allow_cidrs, []), + private_networks_only: Map.get(public_network_defaults, :private_networks_only, false), + public: Map.get(public_network_defaults, :public, false), + allow_all: Map.get(public_network_defaults, :allow_all, true) + }, + features: %{ + nostr: %{ + enabled: public_features_defaults |> Map.get(:nostr, %{}) |> Map.get(:enabled, true) + }, + admin: %{ + enabled: public_features_defaults |> Map.get(:admin, %{}) |> Map.get(:enabled, true) + }, + metrics: %{ + enabled: + bool_env.( + "PARRHESIA_METRICS_ENABLED_ON_MAIN_ENDPOINT", + Map.get(public_metrics_defaults, :enabled, true) + ), + auth_token: + string_env.( + "PARRHESIA_METRICS_AUTH_TOKEN", + Map.get(public_metrics_defaults, :auth_token) + ), + access: %{ + public: + bool_env.( + "PARRHESIA_METRICS_PUBLIC", + Map.get(public_metrics_access_defaults, :public, false) + ), + private_networks_only: + bool_env.( + "PARRHESIA_METRICS_PRIVATE_NETWORKS_ONLY", + Map.get(public_metrics_access_defaults, :private_networks_only, true) + ), + allow_cidrs: + csv_env.( + "PARRHESIA_METRICS_ALLOWED_CIDRS", + Map.get(public_metrics_access_defaults, :allow_cidrs, []) + ), + allow_all: Map.get(public_metrics_access_defaults, :allow_all, true) + } + } + }, + auth: %{ + nip42_required: Map.get(public_auth_defaults, :nip42_required, false), + nip98_required_for_admin: + bool_env.( + "PARRHESIA_POLICIES_MANAGEMENT_AUTH_REQUIRED", + Map.get(public_auth_defaults, :nip98_required_for_admin, true) + ) + }, + baseline_acl: Map.get(public_listener_defaults, :baseline_acl, %{read: [], write: []}) + } + + listeners = + if Map.get(metrics_listener_defaults, :enabled, false) or + bool_env.("PARRHESIA_METRICS_ENDPOINT_ENABLED", false) do + Map.put( + %{public: public_listener}, + :metrics, + %{ + enabled: true, + bind: %{ + ip: Map.get(metrics_listener_bind_defaults, :ip, {127, 0, 0, 1}), + port: + int_env.( + "PARRHESIA_METRICS_ENDPOINT_PORT", + Map.get(metrics_listener_bind_defaults, :port, 9568) + ) + }, + transport: %{ + scheme: Map.get(metrics_listener_transport_defaults, :scheme, :http), + tls: Map.get(metrics_listener_transport_defaults, :tls, %{mode: :disabled}) + }, + network: %{ + allow_cidrs: Map.get(metrics_listener_network_defaults, :allow_cidrs, []), + private_networks_only: + Map.get(metrics_listener_network_defaults, :private_networks_only, false), + public: Map.get(metrics_listener_network_defaults, :public, false), + allow_all: Map.get(metrics_listener_network_defaults, :allow_all, true) + }, + features: %{ + nostr: %{enabled: false}, + admin: %{enabled: false}, + metrics: %{ + enabled: true, + auth_token: + string_env.( + "PARRHESIA_METRICS_AUTH_TOKEN", + Map.get(metrics_listener_metrics_defaults, :auth_token) + ), + access: %{ + public: + bool_env.( + "PARRHESIA_METRICS_PUBLIC", + Map.get(metrics_listener_metrics_access_defaults, :public, false) + ), + private_networks_only: + bool_env.( + "PARRHESIA_METRICS_PRIVATE_NETWORKS_ONLY", + Map.get( + metrics_listener_metrics_access_defaults, + :private_networks_only, + true + ) + ), + allow_cidrs: + csv_env.( + "PARRHESIA_METRICS_ALLOWED_CIDRS", + Map.get(metrics_listener_metrics_access_defaults, :allow_cidrs, []) + ), + allow_all: Map.get(metrics_listener_metrics_access_defaults, :allow_all, true) + } + } + }, + auth: %{nip42_required: false, nip98_required_for_admin: true}, + baseline_acl: %{read: [], write: []} + } ) - ] + else + %{public: public_listener} + end retention = [ check_interval_hours: @@ -430,25 +566,6 @@ if config_env() == :prod do queue_target: queue_target, queue_interval: queue_interval - config :parrhesia, Parrhesia.Web.Endpoint, port: int_env.("PORT", 4413) - - config :parrhesia, Parrhesia.Web.MetricsEndpoint, - enabled: - bool_env.( - "PARRHESIA_METRICS_ENDPOINT_ENABLED", - Keyword.get(metrics_endpoint_defaults, :enabled, false) - ), - ip: - ipv4_env.( - "PARRHESIA_METRICS_ENDPOINT_IP", - Keyword.get(metrics_endpoint_defaults, :ip, {127, 0, 0, 1}) - ), - port: - int_env.( - "PARRHESIA_METRICS_ENDPOINT_PORT", - Keyword.get(metrics_endpoint_defaults, :port, 9568) - ) - config :parrhesia, relay_url: string_env.("PARRHESIA_RELAY_URL", relay_url_default), identity: [ @@ -467,9 +584,9 @@ if config_env() == :prod do bool_env.("PARRHESIA_MODERATION_CACHE_ENABLED", moderation_cache_enabled_default), enable_expiration_worker: bool_env.("PARRHESIA_ENABLE_EXPIRATION_WORKER", enable_expiration_worker_default), + listeners: listeners, limits: limits, policies: policies, - metrics: metrics, retention: retention, features: features diff --git a/config/test.exs b/config/test.exs index 235eed7..86aeac8 100644 --- a/config/test.exs +++ b/config/test.exs @@ -8,9 +8,21 @@ test_endpoint_port = value -> String.to_integer(value) end -config :parrhesia, Parrhesia.Web.Endpoint, - port: test_endpoint_port, - ip: {127, 0, 0, 1} +config :parrhesia, :listeners, + public: %{ + enabled: true, + bind: %{ip: {127, 0, 0, 1}, port: test_endpoint_port}, + transport: %{scheme: :http, tls: %{mode: :disabled}}, + proxy: %{trusted_cidrs: [], honor_x_forwarded_for: true}, + network: %{allow_all: true}, + features: %{ + nostr: %{enabled: true}, + admin: %{enabled: true}, + metrics: %{enabled: true, access: %{private_networks_only: true}, auth_token: nil} + }, + auth: %{nip42_required: false, nip98_required_for_admin: true}, + baseline_acl: %{read: [], write: []} + } config :parrhesia, enable_expiration_worker: false, diff --git a/docs/ARCH.md b/docs/ARCH.md index 92acb1a..c78650b 100644 --- a/docs/ARCH.md +++ b/docs/ARCH.md @@ -68,10 +68,10 @@ Notes: ## 3) System architecture (high level) ```text -WS/HTTP Edge (Bandit/Plug) +Configured WS/HTTP Listeners (Bandit/Plug) -> Protocol Decoder/Encoder -> Command Router (EVENT/REQ/CLOSE/AUTH/COUNT/NEG-*) - -> Policy Pipeline (validation, auth, ACL, PoW, NIP-70) + -> Policy Pipeline (listener baseline, validation, auth, ACL, PoW, NIP-70) -> Event Service / Query Service -> Storage Port (behavior) -> Postgres Adapter (Ecto) @@ -90,15 +90,22 @@ WS/HTTP Edge (Bandit/Plug) 4. `Parrhesia.Subscriptions.Supervisor` – subscription index + fanout workers 5. `Parrhesia.Auth.Supervisor` – AUTH challenge/session tracking 6. `Parrhesia.Policy.Supervisor` – rate limiters / ACL caches -7. `Parrhesia.Web.Endpoint` – WS + HTTP ingress +7. `Parrhesia.Web.Endpoint` – supervises configured WS + HTTP listeners 8. `Parrhesia.Tasks.Supervisor` – background jobs (expiry purge, maintenance) Failure model: - Connection failures are isolated per socket process. +- Listener failures are isolated per Bandit child and restarted independently. - Storage outages degrade with explicit `OK/CLOSED` error prefixes (`error:`) per NIP-01. - Non-critical workers are `:transient`; core infra is `:permanent`. +Ingress model: + +- Ingress is defined through `config :parrhesia, :listeners, ...`. +- Each listener has its own bind/transport settings, proxy trust, network allowlist, enabled features (`nostr`, `admin`, `metrics`), auth requirements, and baseline read/write ACL. +- Listeners can therefore expose different security postures, for example a public relay listener and a VPN-only sync-capable listener. + ## 5) Core runtime components ### 5.1 Connection process diff --git a/lib/parrhesia/application.ex b/lib/parrhesia/application.ex index 1ed720b..e232308 100644 --- a/lib/parrhesia/application.ex +++ b/lib/parrhesia/application.ex @@ -14,7 +14,6 @@ defmodule Parrhesia.Application do Parrhesia.Sync.Supervisor, Parrhesia.Policy.Supervisor, Parrhesia.Web.Endpoint, - Parrhesia.Web.MetricsEndpoint, Parrhesia.Tasks.Supervisor ] diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index 87d518d..b89bdaa 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -15,6 +15,7 @@ defmodule Parrhesia.Web.Connection do alias Parrhesia.Protocol.Filter alias Parrhesia.Subscriptions.Index alias Parrhesia.Telemetry + alias Parrhesia.Web.Listener @default_max_subscriptions_per_connection 32 @default_max_outbound_queue 256 @@ -43,6 +44,7 @@ defmodule Parrhesia.Web.Connection do defstruct subscriptions: %{}, authenticated_pubkeys: MapSet.new(), + listener: nil, max_subscriptions_per_connection: @default_max_subscriptions_per_connection, subscription_index: Index, auth_challenges: Challenges, @@ -74,6 +76,7 @@ defmodule Parrhesia.Web.Connection do @type t :: %__MODULE__{ subscriptions: %{String.t() => subscription()}, authenticated_pubkeys: MapSet.t(String.t()), + listener: map() | nil, max_subscriptions_per_connection: pos_integer(), subscription_index: GenServer.server() | nil, auth_challenges: GenServer.server() | nil, @@ -101,6 +104,7 @@ defmodule Parrhesia.Web.Connection do auth_challenges = auth_challenges(opts) state = %__MODULE__{ + listener: Listener.from_opts(opts), max_subscriptions_per_connection: max_subscriptions_per_connection(opts), subscription_index: subscription_index(opts), auth_challenges: auth_challenges, @@ -222,7 +226,10 @@ defmodule Parrhesia.Web.Connection do case maybe_allow_event_ingest(state) do {:ok, next_state} -> - publish_event_response(next_state, event) + case authorize_listener_write(next_state, event) do + :ok -> publish_event_response(next_state, event) + {:error, reason} -> ingest_error_response(state, event_id, reason) + end {:error, reason} -> ingest_error_response(state, event_id, reason) @@ -265,6 +272,7 @@ defmodule Parrhesia.Web.Connection do defp handle_req(%__MODULE__{} = state, subscription_id, filters) do with :ok <- Filter.validate_filters(filters), + :ok <- authorize_listener_read(state, filters), :ok <- EventPolicy.authorize_read( filters, @@ -302,6 +310,13 @@ defmodule Parrhesia.Web.Connection do EventPolicy.error_message(:sync_read_not_allowed) ) + {:error, :listener_read_not_allowed} -> + restricted_close( + state, + subscription_id, + "restricted: listener baseline denies requested filters" + ) + {:error, :marmot_group_h_tag_required} -> restricted_close( state, @@ -364,13 +379,21 @@ defmodule Parrhesia.Web.Connection do end defp handle_count(%__MODULE__{} = state, subscription_id, filters, options) do - case Events.count(filters, - context: request_context(state, subscription_id), - options: options - ) do - {:ok, payload} -> - response = Protocol.encode_relay({:count, subscription_id, payload}) - {:push, {:text, response}, state} + with :ok <- authorize_listener_read(state, filters), + {:ok, payload} <- + Events.count(filters, + context: request_context(state, subscription_id), + options: options + ) do + response = Protocol.encode_relay({:count, subscription_id, payload}) + {:push, {:text, response}, state} + else + {:error, :listener_read_not_allowed} -> + restricted_count_notice( + state, + subscription_id, + "restricted: listener baseline denies requested filters" + ) {:error, reason} -> handle_count_error(state, subscription_id, reason) @@ -422,6 +445,7 @@ defmodule Parrhesia.Web.Connection do defp handle_neg_open(%__MODULE__{} = state, subscription_id, filter, message) do with :ok <- Filter.validate_filters([filter]), + :ok <- authorize_listener_read(state, [filter]), :ok <- EventPolicy.authorize_read( [filter], @@ -471,6 +495,9 @@ defmodule Parrhesia.Web.Connection do defp error_message_for_ingest_failure(:event_too_large), do: "invalid: event exceeds max event size" + defp error_message_for_ingest_failure(:listener_write_not_allowed), + do: "restricted: listener baseline denies event" + defp error_message_for_ingest_failure(:ephemeral_events_disabled), do: "blocked: ephemeral events are disabled" @@ -480,6 +507,7 @@ defmodule Parrhesia.Web.Connection do :pubkey_not_allowed, :restricted_giftwrap, :sync_write_not_allowed, + :listener_write_not_allowed, :protected_event_requires_auth, :protected_event_pubkey_mismatch, :pow_below_minimum, @@ -576,6 +604,7 @@ defmodule Parrhesia.Web.Connection do :pubkey_not_allowed, :restricted_giftwrap, :sync_read_not_allowed, + :listener_read_not_allowed, :marmot_group_h_tag_required, :marmot_group_h_values_exceeded, :marmot_group_filter_window_too_wide @@ -627,6 +656,9 @@ defmodule Parrhesia.Web.Connection do ], do: Filter.error_message(reason) + defp negentropy_policy_or_filter_error_message(:listener_read_not_allowed), + do: "restricted: listener baseline denies requested filters" + defp negentropy_policy_or_filter_error_message(reason), do: EventPolicy.error_message(reason) defp validate_auth_event(%__MODULE__{} = state, %{"kind" => 22_242} = auth_event) do @@ -706,6 +738,31 @@ defmodule Parrhesia.Web.Connection do defp auth_error_message(reason) when is_binary(reason), do: reason defp auth_error_message(reason), do: "invalid: #{inspect(reason)}" + defp authorize_listener_read(%__MODULE__{} = state, filters) do + case maybe_require_listener_auth(state) do + :ok -> Listener.authorize_read(state.listener, filters) + error -> error + end + end + + defp authorize_listener_write(%__MODULE__{} = state, event) do + case maybe_require_listener_auth(state) do + :ok -> Listener.authorize_write(state.listener, event) + error -> error + end + end + + defp maybe_require_listener_auth(%__MODULE__{ + listener: listener, + authenticated_pubkeys: pubkeys + }) do + if Listener.nip42_required?(listener) and MapSet.size(pubkeys) == 0 do + {:error, :auth_required} + else + :ok + end + end + defp with_auth_challenge_frame( %__MODULE__{auth_challenge: nil}, result @@ -1417,7 +1474,8 @@ defmodule Parrhesia.Web.Connection do authenticated_pubkeys: state.authenticated_pubkeys, caller: :websocket, remote_ip: state.remote_ip, - subscription_id: subscription_id + subscription_id: subscription_id, + metadata: %{listener_id: state.listener.id} } end diff --git a/lib/parrhesia/web/endpoint.ex b/lib/parrhesia/web/endpoint.ex index 5f0ee3b..210ff07 100644 --- a/lib/parrhesia/web/endpoint.ex +++ b/lib/parrhesia/web/endpoint.ex @@ -1,29 +1,27 @@ defmodule Parrhesia.Web.Endpoint do @moduledoc """ - Supervision entrypoint for WS/HTTP ingress. + Supervision entrypoint for configured ingress listeners. """ use Supervisor - def start_link(init_arg \\ []) do - Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) + alias Parrhesia.Web.Listener + + def start_link(_init_arg \\ []) do + Supervisor.start_link(__MODULE__, :ok, name: __MODULE__) end @impl true - def init(init_arg) do - children = [ - {Bandit, bandit_options(init_arg)} - ] + def init(:ok) do + children = + Listener.all() + |> Enum.map(fn listener -> + %{ + id: {:listener, listener.id}, + start: {Bandit, :start_link, [Listener.bandit_options(listener)]} + } + end) Supervisor.init(children, strategy: :one_for_one) end - - defp bandit_options(overrides) do - configured = Application.get_env(:parrhesia, __MODULE__, []) - - configured - |> Keyword.merge(overrides) - |> Keyword.put_new(:scheme, :http) - |> Keyword.put_new(:plug, Parrhesia.Web.Router) - end end diff --git a/lib/parrhesia/web/listener.ex b/lib/parrhesia/web/listener.ex new file mode 100644 index 0000000..1bc0e52 --- /dev/null +++ b/lib/parrhesia/web/listener.ex @@ -0,0 +1,627 @@ +defmodule Parrhesia.Web.Listener do + @moduledoc false + + import Bitwise + + alias Parrhesia.Protocol.Filter + + @private_cidrs [ + "127.0.0.0/8", + "10.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + "169.254.0.0/16", + "::1/128", + "fc00::/7", + "fe80::/10" + ] + + @type t :: %{ + id: atom(), + enabled: boolean(), + bind: %{ip: tuple(), port: pos_integer()}, + transport: map(), + proxy: map(), + network: map(), + features: map(), + auth: map(), + baseline_acl: map(), + bandit_options: keyword() + } + + @spec all() :: [t()] + def all do + :parrhesia + |> Application.get_env(:listeners, %{}) + |> normalize_listeners() + |> Enum.filter(& &1.enabled) + end + + @spec from_opts(keyword() | map()) :: t() + def from_opts(opts) when is_list(opts) do + opts + |> Keyword.get(:listener, default_listener()) + |> normalize_listener() + end + + def from_opts(opts) when is_map(opts) do + opts + |> Map.get(:listener, default_listener()) + |> normalize_listener() + end + + @spec from_conn(Plug.Conn.t()) :: t() + def from_conn(conn) do + conn.private + |> Map.get(:parrhesia_listener, default_listener()) + |> normalize_listener() + end + + @spec put_conn(Plug.Conn.t(), keyword()) :: Plug.Conn.t() + def put_conn(conn, opts) when is_list(opts) do + Plug.Conn.put_private(conn, :parrhesia_listener, from_opts(opts)) + end + + @spec feature_enabled?(t(), atom()) :: boolean() + def feature_enabled?(listener, feature) when is_map(listener) and is_atom(feature) do + listener + |> Map.get(:features, %{}) + |> Map.get(feature, %{}) + |> Map.get(:enabled, false) + end + + @spec nip42_required?(t()) :: boolean() + def nip42_required?(listener), do: listener.auth.nip42_required + + @spec admin_auth_required?(t()) :: boolean() + def admin_auth_required?(listener), do: listener.auth.nip98_required_for_admin + + @spec trusted_proxies(t()) :: [String.t()] + def trusted_proxies(listener) do + listener.proxy.trusted_cidrs + end + + @spec remote_ip_allowed?(t(), tuple() | String.t() | nil) :: boolean() + def remote_ip_allowed?(listener, remote_ip) do + access_allowed?(listener.network, remote_ip) + end + + @spec metrics_allowed?(t(), Plug.Conn.t()) :: boolean() + def metrics_allowed?(listener, conn) do + metrics = Map.get(listener.features, :metrics, %{}) + + feature_enabled?(listener, :metrics) and + access_allowed?(Map.get(metrics, :access, %{}), conn.remote_ip) and + metrics_token_allowed?(metrics, conn) + end + + @spec relay_url(t(), Plug.Conn.t()) :: String.t() + def relay_url(listener, conn) do + scheme = listener.transport.scheme + ws_scheme = if scheme == :https, do: "wss", else: "ws" + + port_segment = + if default_http_port?(scheme, conn.port) do + "" + else + ":#{conn.port}" + end + + "#{ws_scheme}://#{conn.host}#{port_segment}#{conn.request_path}" + end + + @spec relay_auth_required?(t()) :: boolean() + def relay_auth_required?(listener), do: listener.auth.nip42_required + + @spec authorize_read(t(), [map()]) :: :ok | {:error, :listener_read_not_allowed} + def authorize_read(listener, filters) when is_list(filters) do + case evaluate_rules(listener.baseline_acl.read, filters, :read) do + :allow -> :ok + :deny -> {:error, :listener_read_not_allowed} + end + end + + @spec authorize_write(t(), map()) :: :ok | {:error, :listener_write_not_allowed} + def authorize_write(listener, event) when is_map(event) do + case evaluate_rules(listener.baseline_acl.write, event, :write) do + :allow -> :ok + :deny -> {:error, :listener_write_not_allowed} + end + end + + @spec bandit_options(t()) :: keyword() + def bandit_options(listener) do + [ + ip: listener.bind.ip, + port: listener.bind.port, + scheme: listener.transport.scheme, + plug: {Parrhesia.Web.ListenerPlug, listener: listener} + ] ++ listener.bandit_options + end + + defp normalize_listeners(listeners) when is_list(listeners) do + Enum.map(listeners, fn + {id, listener} when is_atom(id) and is_map(listener) -> + normalize_listener(Map.put(listener, :id, id)) + + listener when is_map(listener) -> + normalize_listener(listener) + end) + end + + defp normalize_listeners(listeners) when is_map(listeners) do + listeners + |> Enum.map(fn {id, listener} -> normalize_listener(Map.put(listener, :id, id)) end) + |> Enum.sort_by(& &1.id) + end + + defp normalize_listener(listener) when is_map(listener) do + id = normalize_atom(fetch_value(listener, :id), :listener) + enabled = normalize_boolean(fetch_value(listener, :enabled), true) + bind = normalize_bind(fetch_value(listener, :bind), listener) + transport = normalize_transport(fetch_value(listener, :transport)) + proxy = normalize_proxy(fetch_value(listener, :proxy)) + network = normalize_access(fetch_value(listener, :network), %{allow_all?: true}) + features = normalize_features(fetch_value(listener, :features)) + auth = normalize_auth(fetch_value(listener, :auth)) + baseline_acl = normalize_baseline_acl(fetch_value(listener, :baseline_acl)) + bandit_options = normalize_bandit_options(fetch_value(listener, :bandit_options)) + + %{ + id: id, + enabled: enabled, + bind: bind, + transport: transport, + proxy: proxy, + network: network, + features: features, + auth: auth, + baseline_acl: baseline_acl, + bandit_options: bandit_options + } + end + + defp normalize_listener(_listener), do: default_listener() + + defp normalize_bind(bind, listener) when is_map(bind) do + %{ + ip: normalize_ip(fetch_value(bind, :ip), default_bind_ip(listener)), + port: normalize_port(fetch_value(bind, :port), 4413) + } + end + + defp normalize_bind(_bind, listener) do + %{ + ip: default_bind_ip(listener), + port: normalize_port(fetch_value(listener, :port), 4413) + } + end + + defp default_bind_ip(listener) do + normalize_ip(fetch_value(listener, :ip), {0, 0, 0, 0}) + end + + defp normalize_transport(transport) when is_map(transport) do + %{ + scheme: normalize_scheme(fetch_value(transport, :scheme), :http), + tls: normalize_map(fetch_value(transport, :tls)) + } + end + + defp normalize_transport(_transport), do: %{scheme: :http, tls: %{}} + + defp normalize_proxy(proxy) when is_map(proxy) do + %{ + trusted_cidrs: normalize_string_list(fetch_value(proxy, :trusted_cidrs)), + honor_x_forwarded_for: normalize_boolean(fetch_value(proxy, :honor_x_forwarded_for), true) + } + end + + defp normalize_proxy(_proxy), do: %{trusted_cidrs: [], honor_x_forwarded_for: true} + + defp normalize_features(features) when is_map(features) do + %{ + nostr: normalize_simple_feature(fetch_value(features, :nostr), true), + admin: normalize_simple_feature(fetch_value(features, :admin), true), + metrics: normalize_metrics_feature(fetch_value(features, :metrics)) + } + end + + defp normalize_features(_features) do + %{ + nostr: %{enabled: true}, + admin: %{enabled: true}, + metrics: %{enabled: false, access: default_feature_access()} + } + end + + defp normalize_simple_feature(feature, default_enabled) when is_map(feature) do + %{enabled: normalize_boolean(fetch_value(feature, :enabled), default_enabled)} + end + + defp normalize_simple_feature(feature, _default_enabled) when is_boolean(feature) do + %{enabled: feature} + end + + defp normalize_simple_feature(_feature, default_enabled), do: %{enabled: default_enabled} + + defp normalize_metrics_feature(feature) when is_map(feature) do + %{ + enabled: normalize_boolean(fetch_value(feature, :enabled), false), + auth_token: normalize_optional_string(fetch_value(feature, :auth_token)), + access: + normalize_access(fetch_value(feature, :access), %{ + private_networks_only?: false, + allow_all?: true + }) + } + end + + defp normalize_metrics_feature(feature) when is_boolean(feature) do + %{enabled: feature, auth_token: nil, access: default_feature_access()} + end + + defp normalize_metrics_feature(_feature), + do: %{enabled: false, auth_token: nil, access: default_feature_access()} + + defp default_feature_access do + %{public?: false, private_networks_only?: false, allow_cidrs: [], allow_all?: true} + end + + defp normalize_auth(auth) when is_map(auth) do + %{ + nip42_required: normalize_boolean(fetch_value(auth, :nip42_required), false), + nip98_required_for_admin: + normalize_boolean(fetch_value(auth, :nip98_required_for_admin), true) + } + end + + defp normalize_auth(_auth), do: %{nip42_required: false, nip98_required_for_admin: true} + + defp normalize_baseline_acl(acl) when is_map(acl) do + %{ + read: normalize_baseline_rules(fetch_value(acl, :read)), + write: normalize_baseline_rules(fetch_value(acl, :write)) + } + end + + defp normalize_baseline_acl(_acl), do: %{read: [], write: []} + + defp normalize_baseline_rules(rules) when is_list(rules) do + Enum.flat_map(rules, fn + %{match: match} = rule when is_map(match) -> + [ + %{ + action: normalize_rule_action(fetch_value(rule, :action)), + match: normalize_filter_map(match) + } + ] + + _other -> + [] + end) + end + + defp normalize_baseline_rules(_rules), do: [] + + defp normalize_rule_action(:deny), do: :deny + defp normalize_rule_action("deny"), do: :deny + defp normalize_rule_action(_action), do: :allow + + defp normalize_bandit_options(options) when is_list(options), do: options + defp normalize_bandit_options(_options), do: [] + + defp normalize_access(access, defaults) when is_map(access) do + %{ + public?: + normalize_boolean( + first_present(access, [:public, :public?]), + Map.get(defaults, :public?, false) + ), + private_networks_only?: + normalize_boolean( + first_present(access, [:private_networks_only, :private_networks_only?]), + Map.get(defaults, :private_networks_only?, false) + ), + allow_cidrs: normalize_string_list(fetch_value(access, :allow_cidrs)), + allow_all?: + normalize_boolean( + first_present(access, [:allow_all, :allow_all?]), + Map.get(defaults, :allow_all?, false) + ) + } + end + + defp normalize_access(_access, defaults) do + %{ + public?: Map.get(defaults, :public?, false), + private_networks_only?: Map.get(defaults, :private_networks_only?, false), + allow_cidrs: [], + allow_all?: Map.get(defaults, :allow_all?, false) + } + end + + defp access_allowed?(%{public?: true}, _remote_ip), do: true + + defp access_allowed?(%{allow_cidrs: allow_cidrs}, remote_ip) when allow_cidrs != [] do + Enum.any?(allow_cidrs, &ip_in_cidr?(remote_ip, &1)) + end + + defp access_allowed?(%{private_networks_only?: true}, remote_ip) do + Enum.any?(@private_cidrs, &ip_in_cidr?(remote_ip, &1)) + end + + defp access_allowed?(%{allow_all?: allow_all?}, _remote_ip), do: allow_all? + + defp metrics_token_allowed?(metrics, conn) do + case metrics.auth_token do + nil -> + true + + token -> + conn + |> Plug.Conn.get_req_header("authorization") + |> List.first() + |> normalize_authorization_header() + |> Kernel.==(token) + end + end + + defp normalize_authorization_header("Bearer " <> token), do: token + defp normalize_authorization_header(token) when is_binary(token), do: token + defp normalize_authorization_header(_header), do: nil + + defp evaluate_rules([], _subject, _mode), do: :allow + + defp evaluate_rules(rules, subject, mode) do + has_allow_rules? = Enum.any?(rules, &(&1.action == :allow)) + + case Enum.find(rules, &rule_matches?(&1, subject, mode)) do + %{action: :deny} -> :deny + %{action: :allow} -> :allow + nil when has_allow_rules? -> :deny + nil -> :allow + end + end + + defp rule_matches?(rule, filters, :read) when is_list(filters) do + Enum.any?(filters, &filters_overlap?(&1, rule.match)) + end + + defp rule_matches?(rule, event, :write) when is_map(event) do + Filter.matches_filter?(event, rule.match) + end + + defp rule_matches?(_rule, _subject, _mode), do: false + + defp filters_overlap?(left, right) when is_map(left) and is_map(right) do + comparable_keys = + left + |> Map.keys() + |> Kernel.++(Map.keys(right)) + |> Enum.uniq() + |> Enum.reject(&(&1 in ["limit", "search", "since", "until"])) + + Enum.all?(comparable_keys, fn key -> + filter_constraint_compatible?(Map.get(left, key), Map.get(right, key)) + end) and filter_ranges_overlap?(left, right) + end + + defp filter_constraint_compatible?(nil, _right), do: true + defp filter_constraint_compatible?(_left, nil), do: true + + defp filter_constraint_compatible?(left, right) when is_list(left) and is_list(right) do + not MapSet.disjoint?(MapSet.new(left), MapSet.new(right)) + end + + defp filter_constraint_compatible?(left, right), do: left == right + + defp filter_ranges_overlap?(left, right) do + since = max(Map.get(left, "since", 0), Map.get(right, "since", 0)) + + until = + min( + Map.get(left, "until", 9_223_372_036_854_775_807), + Map.get(right, "until", 9_223_372_036_854_775_807) + ) + + since <= until + end + + defp default_listener do + case configured_default_listener() do + nil -> fallback_listener() + listener -> normalize_listener(listener) + end + end + + defp configured_default_listener do + listeners = Application.get_env(:parrhesia, :listeners, %{}) + + case fetch_public_listener(listeners) do + nil -> first_configured_listener(listeners) + listener -> listener + end + end + + defp fetch_public_listener(%{public: listener}) when is_map(listener), + do: Map.put_new(listener, :id, :public) + + defp fetch_public_listener(listeners) when is_list(listeners) do + case Keyword.fetch(listeners, :public) do + {:ok, listener} when is_map(listener) -> Map.put_new(listener, :id, :public) + _other -> nil + end + end + + defp fetch_public_listener(_listeners), do: nil + + defp first_configured_listener(listeners) when is_list(listeners) do + case listeners do + [{id, listener} | _rest] when is_atom(id) and is_map(listener) -> + Map.put_new(listener, :id, id) + + _other -> + nil + end + end + + defp first_configured_listener(listeners) when is_map(listeners) and map_size(listeners) > 0 do + {id, listener} = Enum.at(Enum.sort_by(listeners, fn {key, _value} -> key end), 0) + Map.put_new(listener, :id, id) + end + + defp first_configured_listener(_listeners), do: nil + + defp fallback_listener do + %{ + id: :public, + enabled: true, + bind: %{ip: {0, 0, 0, 0}, port: 4413}, + transport: %{scheme: :http, tls: %{}}, + proxy: %{trusted_cidrs: [], honor_x_forwarded_for: true}, + network: %{public?: false, private_networks_only?: false, allow_cidrs: [], allow_all?: true}, + features: %{ + nostr: %{enabled: true}, + admin: %{enabled: true}, + metrics: %{enabled: false, auth_token: nil, access: default_feature_access()} + }, + auth: %{nip42_required: false, nip98_required_for_admin: true}, + baseline_acl: %{read: [], write: []}, + bandit_options: [] + } + end + + defp fetch_value(map, key) when is_map(map) do + cond do + Map.has_key?(map, key) -> + Map.get(map, key) + + is_atom(key) and Map.has_key?(map, Atom.to_string(key)) -> + Map.get(map, Atom.to_string(key)) + + true -> + nil + end + end + + defp first_present(map, keys) do + Enum.find_value(keys, fn key -> + cond do + Map.has_key?(map, key) -> + {:present, Map.get(map, key)} + + is_atom(key) and Map.has_key?(map, Atom.to_string(key)) -> + {:present, Map.get(map, Atom.to_string(key))} + + true -> + nil + end + end) + |> case do + {:present, value} -> value + nil -> nil + end + end + + defp normalize_map(value) when is_map(value), do: value + defp normalize_map(_value), do: %{} + + defp normalize_boolean(value, _default) when is_boolean(value), do: value + defp normalize_boolean(nil, default), do: default + defp normalize_boolean(_value, default), do: default + + defp normalize_optional_string(value) when is_binary(value) and value != "", do: value + defp normalize_optional_string(_value), do: nil + + defp normalize_string_list(values) when is_list(values) do + Enum.filter(values, &(is_binary(&1) and &1 != "")) + end + + defp normalize_string_list(_values), do: [] + + defp normalize_ip({_, _, _, _} = ip, _default), do: ip + defp normalize_ip({_, _, _, _, _, _, _, _} = ip, _default), do: ip + defp normalize_ip(_ip, default), do: default + + defp normalize_port(port, _default) when is_integer(port) and port > 0, do: port + defp normalize_port(0, _default), do: 0 + defp normalize_port(_port, default), do: default + + defp normalize_scheme(:https, _default), do: :https + defp normalize_scheme("https", _default), do: :https + defp normalize_scheme(_scheme, default), do: default + + defp normalize_atom(value, _default) when is_atom(value), do: value + defp normalize_atom(_value, default), do: default + + defp normalize_filter_map(filter) when is_map(filter) do + Map.new(filter, fn + {key, value} when is_atom(key) -> {Atom.to_string(key), value} + {key, value} -> {key, value} + end) + end + + defp normalize_filter_map(filter), do: filter + + defp default_http_port?(:http, 80), do: true + defp default_http_port?(:https, 443), do: true + defp default_http_port?(_scheme, _port), do: false + + defp ip_in_cidr?(ip, cidr) do + with {network, prefix_len} <- parse_cidr(cidr), + {:ok, ip_size, ip_value} <- ip_to_int(ip), + {:ok, network_size, network_value} <- ip_to_int(network), + true <- ip_size == network_size, + true <- prefix_len >= 0, + true <- prefix_len <= ip_size do + mask = network_mask(ip_size, prefix_len) + (ip_value &&& mask) == (network_value &&& mask) + else + _other -> false + end + end + + defp parse_cidr(cidr) when is_binary(cidr) do + case String.split(cidr, "/", parts: 2) do + [address, prefix_str] -> + with {prefix_len, ""} <- Integer.parse(prefix_str), + {:ok, ip} <- :inet.parse_address(String.to_charlist(address)) do + {ip, prefix_len} + else + _other -> :error + end + + [address] -> + case :inet.parse_address(String.to_charlist(address)) do + {:ok, {_, _, _, _} = ip} -> {ip, 32} + {:ok, {_, _, _, _, _, _, _, _} = ip} -> {ip, 128} + _other -> :error + end + + _other -> + :error + end + end + + defp parse_cidr(_cidr), do: :error + + defp ip_to_int({a, b, c, d}) do + {:ok, 32, (a <<< 24) + (b <<< 16) + (c <<< 8) + d} + end + + defp ip_to_int({a, b, c, d, e, f, g, h}) do + {:ok, 128, + (a <<< 112) + (b <<< 96) + (c <<< 80) + (d <<< 64) + (e <<< 48) + (f <<< 32) + (g <<< 16) + + h} + end + + defp ip_to_int(_ip), do: :error + + defp network_mask(_size, 0), do: 0 + + defp network_mask(size, prefix_len) do + all_ones = (1 <<< size) - 1 + all_ones <<< (size - prefix_len) + end +end diff --git a/lib/parrhesia/web/listener_plug.ex b/lib/parrhesia/web/listener_plug.ex new file mode 100644 index 0000000..c6fd76a --- /dev/null +++ b/lib/parrhesia/web/listener_plug.ex @@ -0,0 +1,14 @@ +defmodule Parrhesia.Web.ListenerPlug do + @moduledoc false + + alias Parrhesia.Web.Listener + alias Parrhesia.Web.Router + + def init(opts), do: opts + + def call(conn, opts) do + conn + |> Listener.put_conn(opts) + |> Router.call([]) + end +end diff --git a/lib/parrhesia/web/management.ex b/lib/parrhesia/web/management.ex index efbe1d7..4f261b4 100644 --- a/lib/parrhesia/web/management.ex +++ b/lib/parrhesia/web/management.ex @@ -8,13 +8,15 @@ defmodule Parrhesia.Web.Management do alias Parrhesia.API.Admin alias Parrhesia.API.Auth - @spec handle(Plug.Conn.t()) :: Plug.Conn.t() - def handle(conn) do + @spec handle(Plug.Conn.t(), keyword()) :: Plug.Conn.t() + def handle(conn, opts \\ []) do full_url = full_request_url(conn) method = conn.method authorization = get_req_header(conn, "authorization") |> List.first() + auth_required? = admin_auth_required?(opts) - with {:ok, auth_context} <- Auth.validate_nip98(authorization, method, full_url), + with {:ok, auth_context} <- + maybe_validate_nip98(auth_required?, authorization, method, full_url), {:ok, payload} <- parse_payload(conn.body_params), {:ok, result} <- execute_method(payload), :ok <- append_audit_log(auth_context, payload, result) do @@ -46,6 +48,14 @@ defmodule Parrhesia.Web.Management do end end + defp maybe_validate_nip98(true, authorization, method, url) do + Auth.validate_nip98(authorization, method, url) + end + + defp maybe_validate_nip98(false, _authorization, _method, _url) do + {:ok, %{pubkey: nil}} + end + defp parse_payload(%{"method" => method} = payload) when is_binary(method) do params = Map.get(payload, "params", %{}) @@ -99,4 +109,13 @@ defmodule Parrhesia.Web.Management do "#{scheme}://#{host}#{port_suffix}#{conn.request_path}#{query_suffix}" end + + defp admin_auth_required?(opts) do + opts + |> Keyword.get(:listener) + |> case do + %{auth: %{nip98_required_for_admin: value}} -> value + _other -> true + end + end end diff --git a/lib/parrhesia/web/metrics.ex b/lib/parrhesia/web/metrics.ex index 8dd82cd..ff3b0cb 100644 --- a/lib/parrhesia/web/metrics.ex +++ b/lib/parrhesia/web/metrics.ex @@ -4,18 +4,13 @@ defmodule Parrhesia.Web.Metrics do import Plug.Conn alias Parrhesia.Telemetry - alias Parrhesia.Web.MetricsAccess - - @spec enabled_on_main_endpoint?() :: boolean() - def enabled_on_main_endpoint? do - :parrhesia - |> Application.get_env(:metrics, []) - |> Keyword.get(:enabled_on_main_endpoint, true) - end + alias Parrhesia.Web.Listener @spec handle(Plug.Conn.t()) :: Plug.Conn.t() def handle(conn) do - if MetricsAccess.allowed?(conn) do + listener = Listener.from_conn(conn) + + if Listener.metrics_allowed?(listener, conn) do body = TelemetryMetricsPrometheus.Core.scrape(Telemetry.prometheus_reporter()) conn diff --git a/lib/parrhesia/web/relay_info.ex b/lib/parrhesia/web/relay_info.ex index 79d2f82..fe92f47 100644 --- a/lib/parrhesia/web/relay_info.ex +++ b/lib/parrhesia/web/relay_info.ex @@ -4,9 +4,10 @@ defmodule Parrhesia.Web.RelayInfo do """ alias Parrhesia.API.Identity + alias Parrhesia.Web.Listener - @spec document() :: map() - def document do + @spec document(Listener.t()) :: map() + def document(listener) do %{ "name" => "Parrhesia", "description" => "Nostr/Marmot relay", @@ -14,7 +15,7 @@ defmodule Parrhesia.Web.RelayInfo do "supported_nips" => supported_nips(), "software" => "https://git.teralink.net/self/parrhesia", "version" => Application.spec(:parrhesia, :vsn) |> to_string(), - "limitation" => limitations() + "limitation" => limitations(listener) } end @@ -31,13 +32,13 @@ defmodule Parrhesia.Web.RelayInfo do with_negentropy ++ [86, 98] end - defp limitations do + defp limitations(listener) do %{ "max_message_length" => Parrhesia.Config.get([:limits, :max_frame_bytes], 1_048_576), "max_subscriptions" => Parrhesia.Config.get([:limits, :max_subscriptions_per_connection], 32), "max_filters" => Parrhesia.Config.get([:limits, :max_filters_per_req], 16), - "auth_required" => Parrhesia.Config.get([:policies, :auth_required_for_reads], false) + "auth_required" => Listener.relay_auth_required?(listener) } end diff --git a/lib/parrhesia/web/remote_ip.ex b/lib/parrhesia/web/remote_ip.ex index 13e999e..5264312 100644 --- a/lib/parrhesia/web/remote_ip.ex +++ b/lib/parrhesia/web/remote_ip.ex @@ -3,12 +3,14 @@ defmodule Parrhesia.Web.RemoteIp do import Bitwise + alias Parrhesia.Web.Listener + @spec init(term()) :: term() def init(opts), do: opts @spec call(Plug.Conn.t(), term()) :: Plug.Conn.t() def call(conn, _opts) do - if trusted_proxy?(conn.remote_ip) do + if trusted_proxy?(conn) do case forwarded_ip(conn) do nil -> conn forwarded_ip -> %{conn | remote_ip: forwarded_ip} @@ -50,14 +52,22 @@ defmodule Parrhesia.Web.RemoteIp do defp fallback_real_ip(ip, _conn), do: ip - defp trusted_proxy?(remote_ip) do - Enum.any?(trusted_proxies(), &ip_in_cidr?(remote_ip, &1)) + defp trusted_proxy?(conn) do + Enum.any?(trusted_proxies(conn), &ip_in_cidr?(conn.remote_ip, &1)) end - defp trusted_proxies do - :parrhesia - |> Application.get_env(:trusted_proxies, []) - |> Enum.filter(&is_binary/1) + defp trusted_proxies(conn) do + listener = Listener.from_conn(conn) + + case Listener.trusted_proxies(listener) do + [] -> + :parrhesia + |> Application.get_env(:trusted_proxies, []) + |> Enum.filter(&is_binary/1) + + trusted_proxies -> + trusted_proxies + end end defp parse_x_forwarded_for(value) when is_binary(value) do diff --git a/lib/parrhesia/web/router.ex b/lib/parrhesia/web/router.ex index 58834a3..9860044 100644 --- a/lib/parrhesia/web/router.ex +++ b/lib/parrhesia/web/router.ex @@ -4,11 +4,14 @@ defmodule Parrhesia.Web.Router do use Plug.Router alias Parrhesia.Policy.ConnectionPolicy + alias Parrhesia.Web.Listener alias Parrhesia.Web.Management alias Parrhesia.Web.Metrics alias Parrhesia.Web.Readiness alias Parrhesia.Web.RelayInfo + plug(:put_listener) + plug(Plug.Parsers, parsers: [:json], pass: ["application/json"], @@ -32,42 +35,63 @@ defmodule Parrhesia.Web.Router do end get "/metrics" do - if Metrics.enabled_on_main_endpoint?() do - Metrics.handle(conn) + listener = Listener.from_conn(conn) + + if Listener.feature_enabled?(listener, :metrics) do + case authorize_listener_request(conn, listener) do + :ok -> Metrics.handle(conn) + {:error, :forbidden} -> send_resp(conn, 403, "forbidden") + end else send_resp(conn, 404, "not found") end end post "/management" do - case ConnectionPolicy.authorize_remote_ip(conn.remote_ip) do - :ok -> Management.handle(conn) - {:error, :ip_blocked} -> send_resp(conn, 403, "forbidden") + listener = Listener.from_conn(conn) + + if Listener.feature_enabled?(listener, :admin) do + case authorize_listener_request(conn, listener) do + :ok -> Management.handle(conn, listener: listener) + {:error, :forbidden} -> send_resp(conn, 403, "forbidden") + end + else + send_resp(conn, 404, "not found") end end get "/relay" do - case ConnectionPolicy.authorize_remote_ip(conn.remote_ip) do - :ok -> - if accepts_nip11?(conn) do - body = JSON.encode!(RelayInfo.document()) + listener = Listener.from_conn(conn) - conn - |> put_resp_content_type("application/nostr+json") - |> send_resp(200, body) - else - conn - |> WebSockAdapter.upgrade( - Parrhesia.Web.Connection, - %{relay_url: relay_url(conn), remote_ip: remote_ip(conn)}, - timeout: 60_000, - max_frame_size: max_frame_bytes() - ) - |> halt() - end + if Listener.feature_enabled?(listener, :nostr) do + case authorize_listener_request(conn, listener) do + :ok -> + if accepts_nip11?(conn) do + body = JSON.encode!(RelayInfo.document(listener)) - {:error, :ip_blocked} -> - send_resp(conn, 403, "forbidden") + conn + |> put_resp_content_type("application/nostr+json") + |> send_resp(200, body) + else + conn + |> WebSockAdapter.upgrade( + Parrhesia.Web.Connection, + %{ + listener: listener, + relay_url: Listener.relay_url(listener, conn), + remote_ip: remote_ip(conn) + }, + timeout: 60_000, + max_frame_size: max_frame_bytes() + ) + |> halt() + end + + {:error, :forbidden} -> + send_resp(conn, 403, "forbidden") + end + else + send_resp(conn, 404, "not found") end end @@ -75,33 +99,37 @@ defmodule Parrhesia.Web.Router do send_resp(conn, 404, "not found") end + defp put_listener(conn, opts) do + case conn.private do + %{parrhesia_listener: _listener} -> conn + _other -> Listener.put_conn(conn, opts) + end + end + defp accepts_nip11?(conn) do conn |> get_req_header("accept") |> Enum.any?(&String.contains?(&1, "application/nostr+json")) end - defp relay_url(conn) do - ws_scheme = if conn.scheme == :https, do: "wss", else: "ws" - - port_segment = - if default_http_port?(conn.scheme, conn.port) do - "" - else - ":#{conn.port}" - end - - "#{ws_scheme}://#{conn.host}#{port_segment}#{conn.request_path}" - end - - defp default_http_port?(:http, 80), do: true - defp default_http_port?(:https, 443), do: true - defp default_http_port?(_scheme, _port), do: false - defp max_frame_bytes do Parrhesia.Config.get([:limits, :max_frame_bytes], 1_048_576) end + defp authorize_listener_request(conn, listener) do + with :ok <- authorize_remote_ip(conn), + true <- Listener.remote_ip_allowed?(listener, conn.remote_ip) do + :ok + else + {:error, :ip_blocked} -> {:error, :forbidden} + false -> {:error, :forbidden} + end + end + + defp authorize_remote_ip(conn) do + ConnectionPolicy.authorize_remote_ip(conn.remote_ip) + end + defp remote_ip(conn) do case conn.remote_ip do {_, _, _, _} = remote_ip -> :inet.ntoa(remote_ip) |> to_string() diff --git a/test/parrhesia/application_test.exs b/test/parrhesia/application_test.exs index bb77ee4..9834cf5 100644 --- a/test/parrhesia/application_test.exs +++ b/test/parrhesia/application_test.exs @@ -11,7 +11,6 @@ defmodule Parrhesia.ApplicationTest do assert is_pid(Process.whereis(Parrhesia.Sync.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Policy.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Web.Endpoint)) - assert is_pid(Process.whereis(Parrhesia.Web.MetricsEndpoint)) assert is_pid(Process.whereis(Parrhesia.Tasks.Supervisor)) assert Enum.any?(Supervisor.which_children(Parrhesia.Web.Endpoint), fn {_id, pid, _type, diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs index 1667399..f02dfef 100644 --- a/test/parrhesia/web/connection_test.exs +++ b/test/parrhesia/web/connection_test.exs @@ -123,6 +123,69 @@ defmodule Parrhesia.Web.ConnectionTest do Enum.find(decoded, fn frame -> List.first(frame) == "OK" end) end + test "listener can require NIP-42 for reads and writes" do + listener = + listener(%{ + auth: %{nip42_required: true, nip98_required_for_admin: true} + }) + + state = connection_state(listener: listener) + + req_payload = JSON.encode!(["REQ", "sub-auth", %{"kinds" => [1]}]) + + assert {:push, frames, ^state} = Connection.handle_in({req_payload, [opcode: :text]}, state) + + assert Enum.map(frames, fn {:text, frame} -> JSON.decode!(frame) end) == [ + ["AUTH", state.auth_challenge], + ["CLOSED", "sub-auth", "auth-required: authentication required"] + ] + + event = valid_event(%{"content" => "auth required"}) + + assert {:push, event_frames, ^state} = + Connection.handle_in({JSON.encode!(["EVENT", event]), [opcode: :text]}, state) + + decoded = Enum.map(event_frames, fn {:text, frame} -> JSON.decode!(frame) end) + + assert ["AUTH", state.auth_challenge] in decoded + assert ["OK", event["id"], false, "auth-required: authentication required"] in decoded + end + + test "listener baseline ACL can deny read and write shapes before sync ACLs" do + listener = + listener(%{ + baseline_acl: %{ + read: [%{action: :deny, match: %{"kinds" => [5000]}}], + write: [%{action: :deny, match: %{"kinds" => [5000]}}] + } + }) + + state = connection_state(listener: listener) + + req_payload = JSON.encode!(["REQ", "sub-baseline", %{"kinds" => [5000]}]) + + assert {:push, req_frames, ^state} = + Connection.handle_in({req_payload, [opcode: :text]}, state) + + assert Enum.map(req_frames, fn {:text, frame} -> JSON.decode!(frame) end) == [ + ["AUTH", state.auth_challenge], + ["CLOSED", "sub-baseline", "restricted: listener baseline denies requested filters"] + ] + + event = + valid_event(%{"kind" => 5000, "content" => "baseline blocked"}) |> recalculate_event_id() + + assert {:push, {:text, response}, ^state} = + Connection.handle_in({JSON.encode!(["EVENT", event]), [opcode: :text]}, state) + + assert JSON.decode!(response) == [ + "OK", + event["id"], + false, + "restricted: listener baseline denies event" + ] + end + test "protected sync REQ requires matching ACL grant" do previous_acl = Application.get_env(:parrhesia, :acl, []) @@ -766,6 +829,27 @@ defmodule Parrhesia.Web.ConnectionTest do state end + defp listener(overrides) do + base = %{ + id: :test, + enabled: true, + bind: %{ip: {127, 0, 0, 1}, port: 4413}, + transport: %{scheme: :http, tls: %{mode: :disabled}}, + proxy: %{trusted_cidrs: [], honor_x_forwarded_for: true}, + network: %{allow_all: true}, + features: %{ + nostr: %{enabled: true}, + admin: %{enabled: true}, + metrics: %{enabled: false, access: %{allow_all: true}, auth_token: nil} + }, + auth: %{nip42_required: false, nip98_required_for_admin: true}, + baseline_acl: %{read: [], write: []}, + bandit_options: [] + } + + Map.merge(base, overrides) + end + defp live_event(id, kind) do %{ "id" => id, diff --git a/test/parrhesia/web/router_test.exs b/test/parrhesia/web/router_test.exs index 4783297..1a3e320 100644 --- a/test/parrhesia/web/router_test.exs +++ b/test/parrhesia/web/router_test.exs @@ -7,6 +7,7 @@ defmodule Parrhesia.Web.RouterTest do alias Ecto.Adapters.SQL.Sandbox alias Parrhesia.Protocol.EventValidator alias Parrhesia.Repo + alias Parrhesia.Web.Listener alias Parrhesia.Web.Router setup do @@ -44,7 +45,13 @@ defmodule Parrhesia.Web.RouterTest do end test "GET /metrics returns prometheus payload for private-network clients" do - conn = conn(:get, "/metrics") |> Router.call([]) + conn = + conn(:get, "/metrics") + |> route_conn( + listener(%{ + features: %{metrics: %{enabled: true, access: %{private_networks_only: true}}} + }) + ) assert conn.status == 200 assert get_resp_header(conn, "content-type") == ["text/plain; charset=utf-8"] @@ -53,6 +60,14 @@ defmodule Parrhesia.Web.RouterTest do test "GET /metrics denies public-network clients by default" do conn = conn(:get, "/metrics") conn = %{conn | remote_ip: {8, 8, 8, 8}} + + test_listener = + listener(%{features: %{metrics: %{enabled: true, access: %{private_networks_only: true}}}}) + + conn = Listener.put_conn(conn, listener: test_listener) + + refute Listener.metrics_allowed?(Listener.from_conn(conn), conn) + conn = Router.call(conn, []) assert conn.status == 403 @@ -60,47 +75,34 @@ defmodule Parrhesia.Web.RouterTest do end test "GET /metrics can be disabled on the main endpoint" do - previous_metrics = Application.get_env(:parrhesia, :metrics, []) - - Application.put_env( - :parrhesia, - :metrics, - Keyword.put(previous_metrics, :enabled_on_main_endpoint, false) - ) - - on_exit(fn -> - Application.put_env(:parrhesia, :metrics, previous_metrics) - end) - - conn = conn(:get, "/metrics") |> Router.call([]) + conn = + conn(:get, "/metrics") + |> route_conn(listener(%{features: %{metrics: %{enabled: false}}})) assert conn.status == 404 assert conn.resp_body == "not found" end test "GET /metrics accepts bearer auth when configured" do - previous_metrics = Application.get_env(:parrhesia, :metrics, []) + test_listener = + listener(%{ + features: %{ + metrics: %{ + enabled: true, + access: %{private_networks_only: false}, + auth_token: "secret-token" + } + } + }) - Application.put_env( - :parrhesia, - :metrics, - previous_metrics - |> Keyword.put(:private_networks_only, false) - |> Keyword.put(:auth_token, "secret-token") - ) - - on_exit(fn -> - Application.put_env(:parrhesia, :metrics, previous_metrics) - end) - - denied_conn = conn(:get, "/metrics") |> Router.call([]) + denied_conn = conn(:get, "/metrics") |> route_conn(test_listener) assert denied_conn.status == 403 allowed_conn = conn(:get, "/metrics") |> put_req_header("authorization", "Bearer secret-token") - |> Router.call([]) + |> route_conn(test_listener) assert allowed_conn.status == 200 end @@ -247,6 +249,15 @@ defmodule Parrhesia.Web.RouterTest do assert byte_size(pubkey) == 64 end + test "POST /management returns not found when admin feature is disabled on the listener" do + conn = + conn(:post, "/management", JSON.encode!(%{"method" => "ping", "params" => %{}})) + |> put_req_header("content-type", "application/json") + |> route_conn(listener(%{features: %{admin: %{enabled: false}}})) + + assert conn.status == 404 + end + defp nip98_event(method, url) do now = System.system_time(:second) @@ -261,4 +272,41 @@ defmodule Parrhesia.Web.RouterTest do Map.put(base, "id", EventValidator.compute_id(base)) end + + defp listener(overrides) do + deep_merge( + %{ + id: :test, + enabled: true, + bind: %{ip: {127, 0, 0, 1}, port: 4413}, + transport: %{scheme: :http, tls: %{mode: :disabled}}, + proxy: %{trusted_cidrs: [], honor_x_forwarded_for: true}, + network: %{allow_all: true}, + features: %{ + nostr: %{enabled: true}, + admin: %{enabled: true}, + metrics: %{enabled: true, access: %{private_networks_only: true}, auth_token: nil} + }, + auth: %{nip42_required: false, nip98_required_for_admin: true}, + baseline_acl: %{read: [], write: []} + }, + overrides + ) + end + + defp deep_merge(left, right) when is_map(left) and is_map(right) do + Map.merge(left, right, fn _key, left_value, right_value -> + if is_map(left_value) and is_map(right_value) do + deep_merge(left_value, right_value) + else + right_value + end + end) + end + + defp route_conn(conn, listener) do + conn + |> Listener.put_conn(listener: listener) + |> Router.call([]) + end end