From dce473662f46af4175491c44b2b1ec3060a25b0a Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Wed, 18 Mar 2026 16:46:32 +0100 Subject: [PATCH] Lock signature verification and add per-IP ingest limits --- README.md | 7 +- config/config.exs | 3 + config/runtime.exs | 19 +- lib/parrhesia/protocol/event_validator.ex | 8 +- lib/parrhesia/runtime.ex | 1 + lib/parrhesia/web/connection.ex | 33 +++- lib/parrhesia/web/ip_event_ingest_limiter.ex | 169 ++++++++++++++++++ test/parrhesia/application_test.exs | 1 + test/parrhesia/config_test.exs | 3 + test/parrhesia/tasks/nip66_publisher_test.exs | 2 +- test/parrhesia/web/connection_test.exs | 44 +++++ .../web/ip_event_ingest_limiter_test.exs | 26 +++ test/support/integration_case.ex | 5 + test/test_helper.exs | 19 ++ 14 files changed, 332 insertions(+), 8 deletions(-) create mode 100644 lib/parrhesia/web/ip_event_ingest_limiter.ex create mode 100644 test/parrhesia/web/ip_event_ingest_limiter_test.exs diff --git a/README.md b/README.md index 7448508..63e0437 100644 --- a/README.md +++ b/README.md @@ -164,7 +164,6 @@ Examples: ```bash export PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_WRITES=true -export PARRHESIA_FEATURES_VERIFY_EVENT_SIGNATURES=true export PARRHESIA_METRICS_ALLOWED_CIDRS="10.0.0.0/8,192.168.0.0/16" export PARRHESIA_LIMITS_OUTBOUND_OVERFLOW_STRATEGY=drop_oldest ``` @@ -291,6 +290,8 @@ Parrhesia treats NIP-43 invite requests as synthetic relay output, not stored cl | `:max_filter_limit` | `PARRHESIA_LIMITS_MAX_FILTER_LIMIT` | `500` | | `:max_tags_per_event` | `PARRHESIA_LIMITS_MAX_TAGS_PER_EVENT` | `256` | | `:max_tag_values_per_filter` | `PARRHESIA_LIMITS_MAX_TAG_VALUES_PER_FILTER` | `128` | +| `:ip_max_event_ingest_per_window` | `PARRHESIA_LIMITS_IP_MAX_EVENT_INGEST_PER_WINDOW` | `1000` | +| `:ip_event_ingest_window_seconds` | `PARRHESIA_LIMITS_IP_EVENT_INGEST_WINDOW_SECONDS` | `1` | | `:relay_max_event_ingest_per_window` | `PARRHESIA_LIMITS_RELAY_MAX_EVENT_INGEST_PER_WINDOW` | `10000` | | `:relay_event_ingest_window_seconds` | `PARRHESIA_LIMITS_RELAY_EVENT_INGEST_WINDOW_SECONDS` | `1` | | `:max_subscriptions_per_connection` | `PARRHESIA_LIMITS_MAX_SUBSCRIPTIONS_PER_CONNECTION` | `32` | @@ -359,12 +360,14 @@ Parrhesia treats NIP-43 invite requests as synthetic relay output, not stored cl | Atom key | ENV | Default | | --- | --- | --- | -| `:verify_event_signatures` | `PARRHESIA_FEATURES_VERIFY_EVENT_SIGNATURES` | `true` | +| `:verify_event_signatures` | `-` | `true` | | `:nip_45_count` | `PARRHESIA_FEATURES_NIP_45_COUNT` | `true` | | `:nip_50_search` | `PARRHESIA_FEATURES_NIP_50_SEARCH` | `true` | | `:nip_77_negentropy` | `PARRHESIA_FEATURES_NIP_77_NEGENTROPY` | `true` | | `:marmot_push_notifications` | `PARRHESIA_FEATURES_MARMOT_PUSH_NOTIFICATIONS` | `false` | +`:verify_event_signatures` is config-file only. Production releases always verify event signatures. + #### Extra runtime config | Atom key | ENV | Default | Notes | diff --git a/config/config.exs b/config/config.exs index 5ad85d4..79f1299 100644 --- a/config/config.exs +++ b/config/config.exs @@ -33,6 +33,8 @@ config :parrhesia, max_filter_limit: 500, max_tags_per_event: 256, max_tag_values_per_filter: 128, + ip_max_event_ingest_per_window: 1_000, + ip_event_ingest_window_seconds: 1, relay_max_event_ingest_per_window: 10_000, relay_event_ingest_window_seconds: 1, max_subscriptions_per_connection: 32, @@ -103,6 +105,7 @@ config :parrhesia, max_partitions_to_drop_per_run: 1 ], features: [ + verify_event_signatures_locked?: config_env() == :prod, verify_event_signatures: true, nip_45_count: true, nip_50_search: true, diff --git a/config/runtime.exs b/config/runtime.exs index 5af3ff8..9d9c800 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -184,6 +184,16 @@ if config_env() == :prod do "PARRHESIA_LIMITS_MAX_TAG_VALUES_PER_FILTER", Keyword.get(limits_defaults, :max_tag_values_per_filter, 128) ), + ip_max_event_ingest_per_window: + int_env.( + "PARRHESIA_LIMITS_IP_MAX_EVENT_INGEST_PER_WINDOW", + Keyword.get(limits_defaults, :ip_max_event_ingest_per_window, 1_000) + ), + ip_event_ingest_window_seconds: + int_env.( + "PARRHESIA_LIMITS_IP_EVENT_INGEST_WINDOW_SECONDS", + Keyword.get(limits_defaults, :ip_event_ingest_window_seconds, 1) + ), relay_max_event_ingest_per_window: int_env.( "PARRHESIA_LIMITS_RELAY_MAX_EVENT_INGEST_PER_WINDOW", @@ -583,11 +593,14 @@ if config_env() == :prod do ] features = [ + verify_event_signatures_locked?: + Keyword.get(features_defaults, :verify_event_signatures_locked?, false), verify_event_signatures: - bool_env.( - "PARRHESIA_FEATURES_VERIFY_EVENT_SIGNATURES", + if Keyword.get(features_defaults, :verify_event_signatures_locked?, false) do + true + else Keyword.get(features_defaults, :verify_event_signatures, true) - ), + end, nip_45_count: bool_env.( "PARRHESIA_FEATURES_NIP_45_COUNT", diff --git a/lib/parrhesia/protocol/event_validator.ex b/lib/parrhesia/protocol/event_validator.ex index 94c92df..287e888 100644 --- a/lib/parrhesia/protocol/event_validator.ex +++ b/lib/parrhesia/protocol/event_validator.ex @@ -8,6 +8,12 @@ defmodule Parrhesia.Protocol.EventValidator do @default_max_event_future_skew_seconds 900 @default_max_tags_per_event 256 @default_nip43_request_max_age_seconds 300 + @verify_event_signatures_locked Application.compile_env( + :parrhesia, + [:features, :verify_event_signatures_locked?], + false + ) + @supported_mls_ciphersuites MapSet.new(~w[0x0001 0x0002 0x0003 0x0004 0x0005 0x0006 0x0007]) @required_mls_extensions MapSet.new(["0xf2ee", "0x000a"]) @supported_keypackage_ref_sizes [32, 48, 64] @@ -254,7 +260,7 @@ defmodule Parrhesia.Protocol.EventValidator do end defp validate_signature(event) do - if verify_event_signatures?() do + if @verify_event_signatures_locked or verify_event_signatures?() do verify_signature(event) else :ok diff --git a/lib/parrhesia/runtime.ex b/lib/parrhesia/runtime.ex index ce220db..907d279 100644 --- a/lib/parrhesia/runtime.ex +++ b/lib/parrhesia/runtime.ex @@ -18,6 +18,7 @@ defmodule Parrhesia.Runtime do Parrhesia.Telemetry, Parrhesia.Config, Parrhesia.Web.EventIngestLimiter, + Parrhesia.Web.IPEventIngestLimiter, Parrhesia.Storage.Supervisor, Parrhesia.Subscriptions.Supervisor, Parrhesia.Auth.Supervisor, diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index 5fc025f..8d470d7 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -17,6 +17,7 @@ defmodule Parrhesia.Web.Connection do alias Parrhesia.Subscriptions.Index alias Parrhesia.Telemetry alias Parrhesia.Web.EventIngestLimiter + alias Parrhesia.Web.IPEventIngestLimiter alias Parrhesia.Web.Listener @default_max_subscriptions_per_connection 32 @@ -64,6 +65,7 @@ defmodule Parrhesia.Web.Connection do max_frame_bytes: @default_max_frame_bytes, max_event_bytes: @default_max_event_bytes, event_ingest_limiter: EventIngestLimiter, + remote_ip_event_ingest_limiter: IPEventIngestLimiter, max_event_ingest_per_window: @default_event_ingest_rate_limit, event_ingest_window_seconds: @default_event_ingest_window_seconds, event_ingest_window_started_at_ms: 0, @@ -99,6 +101,7 @@ defmodule Parrhesia.Web.Connection do max_frame_bytes: pos_integer(), max_event_bytes: pos_integer(), event_ingest_limiter: GenServer.server() | nil, + remote_ip_event_ingest_limiter: GenServer.server() | nil, max_event_ingest_per_window: pos_integer(), event_ingest_window_seconds: pos_integer(), event_ingest_window_started_at_ms: integer(), @@ -126,6 +129,7 @@ defmodule Parrhesia.Web.Connection do max_frame_bytes: max_frame_bytes(opts), max_event_bytes: max_event_bytes(opts), event_ingest_limiter: event_ingest_limiter(opts), + remote_ip_event_ingest_limiter: remote_ip_event_ingest_limiter(opts), max_event_ingest_per_window: max_event_ingest_per_window(opts), event_ingest_window_seconds: event_ingest_window_seconds(opts), event_ingest_window_started_at_ms: System.monotonic_time(:millisecond), @@ -289,7 +293,12 @@ defmodule Parrhesia.Web.Connection do end defp maybe_publish_ingested_event(next_state, state, event, event_id) do - with :ok <- maybe_allow_relay_event_ingest(next_state.event_ingest_limiter), + with :ok <- + maybe_allow_remote_ip_event_ingest( + next_state.remote_ip, + next_state.remote_ip_event_ingest_limiter + ), + :ok <- maybe_allow_relay_event_ingest(next_state.event_ingest_limiter), :ok <- authorize_listener_write(next_state, event) do publish_event_response(next_state, event) else @@ -574,6 +583,9 @@ defmodule Parrhesia.Web.Connection do defp error_message_for_ingest_failure(:event_rate_limited), do: "rate-limited: too many EVENT messages" + defp error_message_for_ingest_failure(:ip_event_rate_limited), + do: "rate-limited: too many EVENT messages from this IP" + defp error_message_for_ingest_failure(:relay_event_rate_limited), do: "rate-limited: relay-wide EVENT ingress exceeded" @@ -1571,6 +1583,16 @@ defmodule Parrhesia.Web.Connection do defp event_ingest_limiter(_opts), do: EventIngestLimiter + defp remote_ip_event_ingest_limiter(opts) when is_list(opts) do + Keyword.get(opts, :remote_ip_event_ingest_limiter, IPEventIngestLimiter) + end + + defp remote_ip_event_ingest_limiter(opts) when is_map(opts) do + Map.get(opts, :remote_ip_event_ingest_limiter, IPEventIngestLimiter) + end + + defp remote_ip_event_ingest_limiter(_opts), do: IPEventIngestLimiter + defp event_ingest_window_seconds(opts) when is_list(opts) do opts |> Keyword.get(:event_ingest_window_seconds) @@ -1671,6 +1693,15 @@ defmodule Parrhesia.Web.Connection do end end + defp maybe_allow_remote_ip_event_ingest(_remote_ip, nil), do: :ok + + defp maybe_allow_remote_ip_event_ingest(remote_ip, server) do + IPEventIngestLimiter.allow(remote_ip, server) + catch + :exit, {:noproc, _details} -> :ok + :exit, {:normal, _details} -> :ok + end + defp maybe_allow_relay_event_ingest(nil), do: :ok defp maybe_allow_relay_event_ingest(server) do diff --git a/lib/parrhesia/web/ip_event_ingest_limiter.ex b/lib/parrhesia/web/ip_event_ingest_limiter.ex new file mode 100644 index 0000000..8997130 --- /dev/null +++ b/lib/parrhesia/web/ip_event_ingest_limiter.ex @@ -0,0 +1,169 @@ +defmodule Parrhesia.Web.IPEventIngestLimiter do + @moduledoc """ + Per-IP EVENT ingest rate limiting over a fixed time window. + """ + + use GenServer + + @default_max_events_per_window 1_000 + @default_window_seconds 1 + @named_table :parrhesia_ip_event_ingest_limiter + @config_key :config + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + max_events_per_window = + normalize_positive_integer( + Keyword.get(opts, :max_events_per_window), + max_events_per_window() + ) + + window_ms = + normalize_positive_integer(Keyword.get(opts, :window_seconds), window_seconds()) * 1000 + + init_arg = %{ + max_events_per_window: max_events_per_window, + window_ms: window_ms, + named_table?: Keyword.get(opts, :name, __MODULE__) == __MODULE__ + } + + case Keyword.get(opts, :name, __MODULE__) do + nil -> GenServer.start_link(__MODULE__, init_arg) + name -> GenServer.start_link(__MODULE__, init_arg, name: name) + end + end + + @spec allow(tuple() | String.t() | nil, GenServer.server()) :: + :ok | {:error, :ip_event_rate_limited} + def allow(remote_ip, server \\ __MODULE__) + + def allow(remote_ip, __MODULE__) do + case normalize_remote_ip(remote_ip) do + nil -> + :ok + + normalized_remote_ip -> + case fetch_named_config() do + {:ok, max_events_per_window, window_ms} -> + allow_counter(@named_table, normalized_remote_ip, max_events_per_window, window_ms) + + :error -> + :ok + end + end + end + + def allow(remote_ip, server), do: GenServer.call(server, {:allow, remote_ip}) + + @impl true + def init(%{ + max_events_per_window: max_events_per_window, + window_ms: window_ms, + named_table?: named_table? + }) do + table = create_table(named_table?) + + true = :ets.insert(table, {@config_key, max_events_per_window, window_ms}) + + {:ok, + %{ + table: table, + max_events_per_window: max_events_per_window, + window_ms: window_ms + }} + end + + @impl true + def handle_call({:allow, remote_ip}, _from, state) do + reply = + case normalize_remote_ip(remote_ip) do + nil -> + :ok + + normalized_remote_ip -> + allow_counter( + state.table, + normalized_remote_ip, + state.max_events_per_window, + state.window_ms + ) + end + + {:reply, reply, state} + end + + defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0, do: value + defp normalize_positive_integer(_value, default), do: default + + defp create_table(true) do + :ets.new(@named_table, [ + :named_table, + :set, + :public, + {:read_concurrency, true}, + {:write_concurrency, true} + ]) + end + + defp create_table(false) do + :ets.new(__MODULE__, [:set, :public, {:read_concurrency, true}, {:write_concurrency, true}]) + end + + defp fetch_named_config do + case :ets.lookup(@named_table, @config_key) do + [{@config_key, max_events_per_window, window_ms}] -> {:ok, max_events_per_window, window_ms} + _other -> :error + end + rescue + ArgumentError -> :error + end + + defp allow_counter(table, remote_ip, max_events_per_window, window_ms) do + window_id = System.monotonic_time(:millisecond) |> div(window_ms) + key = {:window, remote_ip, window_id} + + count = :ets.update_counter(table, key, {2, 1}, {key, 0}) + + if count == 1 do + prune_expired_windows(table, window_id) + end + + if count <= max_events_per_window do + :ok + else + {:error, :ip_event_rate_limited} + end + rescue + ArgumentError -> :ok + end + + defp prune_expired_windows(table, window_id) do + :ets.select_delete(table, [ + {{{:window, :"$1", :"$2"}, :_}, [{:<, :"$2", window_id}], [true]} + ]) + end + + defp normalize_remote_ip({_, _, _, _} = remote_ip), do: :inet.ntoa(remote_ip) |> to_string() + + defp normalize_remote_ip({_, _, _, _, _, _, _, _} = remote_ip), + do: :inet.ntoa(remote_ip) |> to_string() + + defp normalize_remote_ip(remote_ip) when is_binary(remote_ip) and remote_ip != "", do: remote_ip + defp normalize_remote_ip(_remote_ip), do: nil + + defp max_events_per_window do + case Application.get_env(:parrhesia, :limits, []) + |> Keyword.get(:ip_max_event_ingest_per_window) do + value when is_integer(value) and value > 0 -> value + _other -> @default_max_events_per_window + end + end + + defp window_seconds do + case Application.get_env(:parrhesia, :limits, []) + |> Keyword.get(:ip_event_ingest_window_seconds) do + value when is_integer(value) and value > 0 -> value + _other -> @default_window_seconds + end + end +end diff --git a/test/parrhesia/application_test.exs b/test/parrhesia/application_test.exs index a1b853b..6f79298 100644 --- a/test/parrhesia/application_test.exs +++ b/test/parrhesia/application_test.exs @@ -6,6 +6,7 @@ defmodule Parrhesia.ApplicationTest do assert is_pid(Process.whereis(Parrhesia.Telemetry)) assert is_pid(Process.whereis(Parrhesia.Config)) assert is_pid(Process.whereis(Parrhesia.Web.EventIngestLimiter)) + assert is_pid(Process.whereis(Parrhesia.Web.IPEventIngestLimiter)) assert is_pid(Process.whereis(Parrhesia.Storage.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Subscriptions.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Fanout.Dispatcher)) diff --git a/test/parrhesia/config_test.exs b/test/parrhesia/config_test.exs index 82333ff..93c41b3 100644 --- a/test/parrhesia/config_test.exs +++ b/test/parrhesia/config_test.exs @@ -10,6 +10,8 @@ defmodule Parrhesia.ConfigTest do assert Parrhesia.Config.get([:limits, :max_event_ingest_per_window]) == 120 assert Parrhesia.Config.get([:limits, :max_tags_per_event]) == 256 assert Parrhesia.Config.get([:limits, :max_tag_values_per_filter]) == 128 + assert Parrhesia.Config.get([:limits, :ip_max_event_ingest_per_window]) == 1_000 + assert Parrhesia.Config.get([:limits, :ip_event_ingest_window_seconds]) == 1 assert Parrhesia.Config.get([:limits, :relay_max_event_ingest_per_window]) == 10_000 assert Parrhesia.Config.get([:limits, :relay_event_ingest_window_seconds]) == 1 assert Parrhesia.Config.get([:limits, :event_ingest_window_seconds]) == 1 @@ -21,6 +23,7 @@ defmodule Parrhesia.ConfigTest do assert Parrhesia.Config.get([:policies, :marmot_media_max_imeta_tags_per_event]) == 8 assert Parrhesia.Config.get([:policies, :marmot_media_reject_mip04_v1]) == true assert Parrhesia.Config.get([:policies, :marmot_push_max_trigger_age_seconds]) == 120 + assert Parrhesia.Config.get([:features, :verify_event_signatures_locked?]) == false assert Parrhesia.Config.get([:features, :verify_event_signatures]) == false assert Parrhesia.Config.get([:features, :nip_50_search]) == true assert Parrhesia.Config.get([:features, :marmot_push_notifications]) == false diff --git a/test/parrhesia/tasks/nip66_publisher_test.exs b/test/parrhesia/tasks/nip66_publisher_test.exs index ac34cf1..d845dc7 100644 --- a/test/parrhesia/tasks/nip66_publisher_test.exs +++ b/test/parrhesia/tasks/nip66_publisher_test.exs @@ -1,5 +1,5 @@ defmodule Parrhesia.Tasks.Nip66PublisherTest do - use Parrhesia.IntegrationCase, async: false + use Parrhesia.IntegrationCase, async: false, sandbox: :shared alias Parrhesia.API.Events alias Parrhesia.API.Identity diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs index 848933d..da4823a 100644 --- a/test/parrhesia/web/connection_test.exs +++ b/test/parrhesia/web/connection_test.exs @@ -225,6 +225,50 @@ defmodule Parrhesia.Web.ConnectionTest do ] end + test "EVENT applies per-IP ingress throttling across connections" do + limiter = + start_supervised!( + {Parrhesia.Web.IPEventIngestLimiter, + name: nil, max_events_per_window: 1, window_seconds: 60} + ) + + first_state = + connection_state( + remote_ip: "203.0.113.10", + remote_ip_event_ingest_limiter: limiter + ) + + second_state = + connection_state( + remote_ip: "203.0.113.10", + remote_ip_event_ingest_limiter: limiter + ) + + first_event = valid_event(%{"content" => "first from ip"}) |> recalculate_event_id() + second_event = valid_event(%{"content" => "second from ip"}) |> recalculate_event_id() + + assert {:push, {:text, first_response}, _next_state} = + Connection.handle_in( + {JSON.encode!(["EVENT", first_event]), [opcode: :text]}, + first_state + ) + + assert JSON.decode!(first_response) == ["OK", first_event["id"], true, "ok: event stored"] + + assert {:push, {:text, second_response}, ^second_state} = + Connection.handle_in( + {JSON.encode!(["EVENT", second_event]), [opcode: :text]}, + second_state + ) + + assert JSON.decode!(second_response) == [ + "OK", + second_event["id"], + false, + "rate-limited: too many EVENT messages from this IP" + ] + end + test "protected sync REQ requires matching ACL grant" do previous_acl = Application.get_env(:parrhesia, :acl, []) diff --git a/test/parrhesia/web/ip_event_ingest_limiter_test.exs b/test/parrhesia/web/ip_event_ingest_limiter_test.exs new file mode 100644 index 0000000..4c63de0 --- /dev/null +++ b/test/parrhesia/web/ip_event_ingest_limiter_test.exs @@ -0,0 +1,26 @@ +defmodule Parrhesia.Web.IPEventIngestLimiterTest do + use ExUnit.Case, async: true + + alias Parrhesia.Web.IPEventIngestLimiter + + test "allows events up to the configured per-IP window cap" do + limiter = + start_supervised!( + {IPEventIngestLimiter, name: nil, max_events_per_window: 2, window_seconds: 60} + ) + + assert :ok = IPEventIngestLimiter.allow("203.0.113.10", limiter) + assert :ok = IPEventIngestLimiter.allow("203.0.113.10", limiter) + assert {:error, :ip_event_rate_limited} = IPEventIngestLimiter.allow("203.0.113.10", limiter) + assert :ok = IPEventIngestLimiter.allow("203.0.113.11", limiter) + end + + test "allows events without a remote IP" do + limiter = + start_supervised!( + {IPEventIngestLimiter, name: nil, max_events_per_window: 1, window_seconds: 60} + ) + + assert :ok = IPEventIngestLimiter.allow(nil, limiter) + end +end diff --git a/test/support/integration_case.ex b/test/support/integration_case.ex index f3f96ad..153c118 100644 --- a/test/support/integration_case.ex +++ b/test/support/integration_case.ex @@ -49,6 +49,11 @@ defmodule Parrhesia.IntegrationCase do receive do :stop -> Sandbox.checkin(Repo) + + # Allow the pool to process the checkin before this process + # exits, so Postgrex does not see a dead client and log a + # spurious disconnect error. + Process.sleep(50) end end) diff --git a/test/test_helper.exs b/test/test_helper.exs index eaf59ee..5423d39 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -5,4 +5,23 @@ exclude_tags = [:nak_e2e] end +# Suppress Postgrex disconnect logs that fire during sandbox teardown. +# When a sandbox-owning process exits, the connection pool detects the +# dead client and logs an error asynchronously. This is expected cleanup +# noise, not a real failure — silence it so test output stays pristine. +:logger.add_primary_filter( + :suppress_sandbox_disconnect, + {fn + %{msg: {:string, chars}}, _extra -> + if :string.find(IO.chardata_to_string(chars), "(DBConnection.ConnectionError)") != :nomatch do + :stop + else + :ignore + end + + _event, _extra -> + :ignore + end, []} +) + ExUnit.start(exclude: exclude_tags)