Lock signature verification and add per-IP ingest limits
This commit is contained in:
@@ -164,7 +164,6 @@ Examples:
|
|||||||
|
|
||||||
```bash
|
```bash
|
||||||
export PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_WRITES=true
|
export PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_WRITES=true
|
||||||
export PARRHESIA_FEATURES_VERIFY_EVENT_SIGNATURES=true
|
|
||||||
export PARRHESIA_METRICS_ALLOWED_CIDRS="10.0.0.0/8,192.168.0.0/16"
|
export PARRHESIA_METRICS_ALLOWED_CIDRS="10.0.0.0/8,192.168.0.0/16"
|
||||||
export PARRHESIA_LIMITS_OUTBOUND_OVERFLOW_STRATEGY=drop_oldest
|
export PARRHESIA_LIMITS_OUTBOUND_OVERFLOW_STRATEGY=drop_oldest
|
||||||
```
|
```
|
||||||
@@ -291,6 +290,8 @@ Parrhesia treats NIP-43 invite requests as synthetic relay output, not stored cl
|
|||||||
| `:max_filter_limit` | `PARRHESIA_LIMITS_MAX_FILTER_LIMIT` | `500` |
|
| `:max_filter_limit` | `PARRHESIA_LIMITS_MAX_FILTER_LIMIT` | `500` |
|
||||||
| `:max_tags_per_event` | `PARRHESIA_LIMITS_MAX_TAGS_PER_EVENT` | `256` |
|
| `:max_tags_per_event` | `PARRHESIA_LIMITS_MAX_TAGS_PER_EVENT` | `256` |
|
||||||
| `:max_tag_values_per_filter` | `PARRHESIA_LIMITS_MAX_TAG_VALUES_PER_FILTER` | `128` |
|
| `:max_tag_values_per_filter` | `PARRHESIA_LIMITS_MAX_TAG_VALUES_PER_FILTER` | `128` |
|
||||||
|
| `:ip_max_event_ingest_per_window` | `PARRHESIA_LIMITS_IP_MAX_EVENT_INGEST_PER_WINDOW` | `1000` |
|
||||||
|
| `:ip_event_ingest_window_seconds` | `PARRHESIA_LIMITS_IP_EVENT_INGEST_WINDOW_SECONDS` | `1` |
|
||||||
| `:relay_max_event_ingest_per_window` | `PARRHESIA_LIMITS_RELAY_MAX_EVENT_INGEST_PER_WINDOW` | `10000` |
|
| `:relay_max_event_ingest_per_window` | `PARRHESIA_LIMITS_RELAY_MAX_EVENT_INGEST_PER_WINDOW` | `10000` |
|
||||||
| `:relay_event_ingest_window_seconds` | `PARRHESIA_LIMITS_RELAY_EVENT_INGEST_WINDOW_SECONDS` | `1` |
|
| `:relay_event_ingest_window_seconds` | `PARRHESIA_LIMITS_RELAY_EVENT_INGEST_WINDOW_SECONDS` | `1` |
|
||||||
| `:max_subscriptions_per_connection` | `PARRHESIA_LIMITS_MAX_SUBSCRIPTIONS_PER_CONNECTION` | `32` |
|
| `:max_subscriptions_per_connection` | `PARRHESIA_LIMITS_MAX_SUBSCRIPTIONS_PER_CONNECTION` | `32` |
|
||||||
@@ -359,12 +360,14 @@ Parrhesia treats NIP-43 invite requests as synthetic relay output, not stored cl
|
|||||||
|
|
||||||
| Atom key | ENV | Default |
|
| Atom key | ENV | Default |
|
||||||
| --- | --- | --- |
|
| --- | --- | --- |
|
||||||
| `:verify_event_signatures` | `PARRHESIA_FEATURES_VERIFY_EVENT_SIGNATURES` | `true` |
|
| `:verify_event_signatures` | `-` | `true` |
|
||||||
| `:nip_45_count` | `PARRHESIA_FEATURES_NIP_45_COUNT` | `true` |
|
| `:nip_45_count` | `PARRHESIA_FEATURES_NIP_45_COUNT` | `true` |
|
||||||
| `:nip_50_search` | `PARRHESIA_FEATURES_NIP_50_SEARCH` | `true` |
|
| `:nip_50_search` | `PARRHESIA_FEATURES_NIP_50_SEARCH` | `true` |
|
||||||
| `:nip_77_negentropy` | `PARRHESIA_FEATURES_NIP_77_NEGENTROPY` | `true` |
|
| `:nip_77_negentropy` | `PARRHESIA_FEATURES_NIP_77_NEGENTROPY` | `true` |
|
||||||
| `:marmot_push_notifications` | `PARRHESIA_FEATURES_MARMOT_PUSH_NOTIFICATIONS` | `false` |
|
| `:marmot_push_notifications` | `PARRHESIA_FEATURES_MARMOT_PUSH_NOTIFICATIONS` | `false` |
|
||||||
|
|
||||||
|
`:verify_event_signatures` is config-file only. Production releases always verify event signatures.
|
||||||
|
|
||||||
#### Extra runtime config
|
#### Extra runtime config
|
||||||
|
|
||||||
| Atom key | ENV | Default | Notes |
|
| Atom key | ENV | Default | Notes |
|
||||||
|
|||||||
@@ -33,6 +33,8 @@ config :parrhesia,
|
|||||||
max_filter_limit: 500,
|
max_filter_limit: 500,
|
||||||
max_tags_per_event: 256,
|
max_tags_per_event: 256,
|
||||||
max_tag_values_per_filter: 128,
|
max_tag_values_per_filter: 128,
|
||||||
|
ip_max_event_ingest_per_window: 1_000,
|
||||||
|
ip_event_ingest_window_seconds: 1,
|
||||||
relay_max_event_ingest_per_window: 10_000,
|
relay_max_event_ingest_per_window: 10_000,
|
||||||
relay_event_ingest_window_seconds: 1,
|
relay_event_ingest_window_seconds: 1,
|
||||||
max_subscriptions_per_connection: 32,
|
max_subscriptions_per_connection: 32,
|
||||||
@@ -103,6 +105,7 @@ config :parrhesia,
|
|||||||
max_partitions_to_drop_per_run: 1
|
max_partitions_to_drop_per_run: 1
|
||||||
],
|
],
|
||||||
features: [
|
features: [
|
||||||
|
verify_event_signatures_locked?: config_env() == :prod,
|
||||||
verify_event_signatures: true,
|
verify_event_signatures: true,
|
||||||
nip_45_count: true,
|
nip_45_count: true,
|
||||||
nip_50_search: true,
|
nip_50_search: true,
|
||||||
|
|||||||
@@ -184,6 +184,16 @@ if config_env() == :prod do
|
|||||||
"PARRHESIA_LIMITS_MAX_TAG_VALUES_PER_FILTER",
|
"PARRHESIA_LIMITS_MAX_TAG_VALUES_PER_FILTER",
|
||||||
Keyword.get(limits_defaults, :max_tag_values_per_filter, 128)
|
Keyword.get(limits_defaults, :max_tag_values_per_filter, 128)
|
||||||
),
|
),
|
||||||
|
ip_max_event_ingest_per_window:
|
||||||
|
int_env.(
|
||||||
|
"PARRHESIA_LIMITS_IP_MAX_EVENT_INGEST_PER_WINDOW",
|
||||||
|
Keyword.get(limits_defaults, :ip_max_event_ingest_per_window, 1_000)
|
||||||
|
),
|
||||||
|
ip_event_ingest_window_seconds:
|
||||||
|
int_env.(
|
||||||
|
"PARRHESIA_LIMITS_IP_EVENT_INGEST_WINDOW_SECONDS",
|
||||||
|
Keyword.get(limits_defaults, :ip_event_ingest_window_seconds, 1)
|
||||||
|
),
|
||||||
relay_max_event_ingest_per_window:
|
relay_max_event_ingest_per_window:
|
||||||
int_env.(
|
int_env.(
|
||||||
"PARRHESIA_LIMITS_RELAY_MAX_EVENT_INGEST_PER_WINDOW",
|
"PARRHESIA_LIMITS_RELAY_MAX_EVENT_INGEST_PER_WINDOW",
|
||||||
@@ -583,11 +593,14 @@ if config_env() == :prod do
|
|||||||
]
|
]
|
||||||
|
|
||||||
features = [
|
features = [
|
||||||
|
verify_event_signatures_locked?:
|
||||||
|
Keyword.get(features_defaults, :verify_event_signatures_locked?, false),
|
||||||
verify_event_signatures:
|
verify_event_signatures:
|
||||||
bool_env.(
|
if Keyword.get(features_defaults, :verify_event_signatures_locked?, false) do
|
||||||
"PARRHESIA_FEATURES_VERIFY_EVENT_SIGNATURES",
|
true
|
||||||
|
else
|
||||||
Keyword.get(features_defaults, :verify_event_signatures, true)
|
Keyword.get(features_defaults, :verify_event_signatures, true)
|
||||||
),
|
end,
|
||||||
nip_45_count:
|
nip_45_count:
|
||||||
bool_env.(
|
bool_env.(
|
||||||
"PARRHESIA_FEATURES_NIP_45_COUNT",
|
"PARRHESIA_FEATURES_NIP_45_COUNT",
|
||||||
|
|||||||
@@ -8,6 +8,12 @@ defmodule Parrhesia.Protocol.EventValidator do
|
|||||||
@default_max_event_future_skew_seconds 900
|
@default_max_event_future_skew_seconds 900
|
||||||
@default_max_tags_per_event 256
|
@default_max_tags_per_event 256
|
||||||
@default_nip43_request_max_age_seconds 300
|
@default_nip43_request_max_age_seconds 300
|
||||||
|
@verify_event_signatures_locked Application.compile_env(
|
||||||
|
:parrhesia,
|
||||||
|
[:features, :verify_event_signatures_locked?],
|
||||||
|
false
|
||||||
|
)
|
||||||
|
|
||||||
@supported_mls_ciphersuites MapSet.new(~w[0x0001 0x0002 0x0003 0x0004 0x0005 0x0006 0x0007])
|
@supported_mls_ciphersuites MapSet.new(~w[0x0001 0x0002 0x0003 0x0004 0x0005 0x0006 0x0007])
|
||||||
@required_mls_extensions MapSet.new(["0xf2ee", "0x000a"])
|
@required_mls_extensions MapSet.new(["0xf2ee", "0x000a"])
|
||||||
@supported_keypackage_ref_sizes [32, 48, 64]
|
@supported_keypackage_ref_sizes [32, 48, 64]
|
||||||
@@ -254,7 +260,7 @@ defmodule Parrhesia.Protocol.EventValidator do
|
|||||||
end
|
end
|
||||||
|
|
||||||
defp validate_signature(event) do
|
defp validate_signature(event) do
|
||||||
if verify_event_signatures?() do
|
if @verify_event_signatures_locked or verify_event_signatures?() do
|
||||||
verify_signature(event)
|
verify_signature(event)
|
||||||
else
|
else
|
||||||
:ok
|
:ok
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ defmodule Parrhesia.Runtime do
|
|||||||
Parrhesia.Telemetry,
|
Parrhesia.Telemetry,
|
||||||
Parrhesia.Config,
|
Parrhesia.Config,
|
||||||
Parrhesia.Web.EventIngestLimiter,
|
Parrhesia.Web.EventIngestLimiter,
|
||||||
|
Parrhesia.Web.IPEventIngestLimiter,
|
||||||
Parrhesia.Storage.Supervisor,
|
Parrhesia.Storage.Supervisor,
|
||||||
Parrhesia.Subscriptions.Supervisor,
|
Parrhesia.Subscriptions.Supervisor,
|
||||||
Parrhesia.Auth.Supervisor,
|
Parrhesia.Auth.Supervisor,
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ defmodule Parrhesia.Web.Connection do
|
|||||||
alias Parrhesia.Subscriptions.Index
|
alias Parrhesia.Subscriptions.Index
|
||||||
alias Parrhesia.Telemetry
|
alias Parrhesia.Telemetry
|
||||||
alias Parrhesia.Web.EventIngestLimiter
|
alias Parrhesia.Web.EventIngestLimiter
|
||||||
|
alias Parrhesia.Web.IPEventIngestLimiter
|
||||||
alias Parrhesia.Web.Listener
|
alias Parrhesia.Web.Listener
|
||||||
|
|
||||||
@default_max_subscriptions_per_connection 32
|
@default_max_subscriptions_per_connection 32
|
||||||
@@ -64,6 +65,7 @@ defmodule Parrhesia.Web.Connection do
|
|||||||
max_frame_bytes: @default_max_frame_bytes,
|
max_frame_bytes: @default_max_frame_bytes,
|
||||||
max_event_bytes: @default_max_event_bytes,
|
max_event_bytes: @default_max_event_bytes,
|
||||||
event_ingest_limiter: EventIngestLimiter,
|
event_ingest_limiter: EventIngestLimiter,
|
||||||
|
remote_ip_event_ingest_limiter: IPEventIngestLimiter,
|
||||||
max_event_ingest_per_window: @default_event_ingest_rate_limit,
|
max_event_ingest_per_window: @default_event_ingest_rate_limit,
|
||||||
event_ingest_window_seconds: @default_event_ingest_window_seconds,
|
event_ingest_window_seconds: @default_event_ingest_window_seconds,
|
||||||
event_ingest_window_started_at_ms: 0,
|
event_ingest_window_started_at_ms: 0,
|
||||||
@@ -99,6 +101,7 @@ defmodule Parrhesia.Web.Connection do
|
|||||||
max_frame_bytes: pos_integer(),
|
max_frame_bytes: pos_integer(),
|
||||||
max_event_bytes: pos_integer(),
|
max_event_bytes: pos_integer(),
|
||||||
event_ingest_limiter: GenServer.server() | nil,
|
event_ingest_limiter: GenServer.server() | nil,
|
||||||
|
remote_ip_event_ingest_limiter: GenServer.server() | nil,
|
||||||
max_event_ingest_per_window: pos_integer(),
|
max_event_ingest_per_window: pos_integer(),
|
||||||
event_ingest_window_seconds: pos_integer(),
|
event_ingest_window_seconds: pos_integer(),
|
||||||
event_ingest_window_started_at_ms: integer(),
|
event_ingest_window_started_at_ms: integer(),
|
||||||
@@ -126,6 +129,7 @@ defmodule Parrhesia.Web.Connection do
|
|||||||
max_frame_bytes: max_frame_bytes(opts),
|
max_frame_bytes: max_frame_bytes(opts),
|
||||||
max_event_bytes: max_event_bytes(opts),
|
max_event_bytes: max_event_bytes(opts),
|
||||||
event_ingest_limiter: event_ingest_limiter(opts),
|
event_ingest_limiter: event_ingest_limiter(opts),
|
||||||
|
remote_ip_event_ingest_limiter: remote_ip_event_ingest_limiter(opts),
|
||||||
max_event_ingest_per_window: max_event_ingest_per_window(opts),
|
max_event_ingest_per_window: max_event_ingest_per_window(opts),
|
||||||
event_ingest_window_seconds: event_ingest_window_seconds(opts),
|
event_ingest_window_seconds: event_ingest_window_seconds(opts),
|
||||||
event_ingest_window_started_at_ms: System.monotonic_time(:millisecond),
|
event_ingest_window_started_at_ms: System.monotonic_time(:millisecond),
|
||||||
@@ -289,7 +293,12 @@ defmodule Parrhesia.Web.Connection do
|
|||||||
end
|
end
|
||||||
|
|
||||||
defp maybe_publish_ingested_event(next_state, state, event, event_id) do
|
defp maybe_publish_ingested_event(next_state, state, event, event_id) do
|
||||||
with :ok <- maybe_allow_relay_event_ingest(next_state.event_ingest_limiter),
|
with :ok <-
|
||||||
|
maybe_allow_remote_ip_event_ingest(
|
||||||
|
next_state.remote_ip,
|
||||||
|
next_state.remote_ip_event_ingest_limiter
|
||||||
|
),
|
||||||
|
:ok <- maybe_allow_relay_event_ingest(next_state.event_ingest_limiter),
|
||||||
:ok <- authorize_listener_write(next_state, event) do
|
:ok <- authorize_listener_write(next_state, event) do
|
||||||
publish_event_response(next_state, event)
|
publish_event_response(next_state, event)
|
||||||
else
|
else
|
||||||
@@ -574,6 +583,9 @@ defmodule Parrhesia.Web.Connection do
|
|||||||
defp error_message_for_ingest_failure(:event_rate_limited),
|
defp error_message_for_ingest_failure(:event_rate_limited),
|
||||||
do: "rate-limited: too many EVENT messages"
|
do: "rate-limited: too many EVENT messages"
|
||||||
|
|
||||||
|
defp error_message_for_ingest_failure(:ip_event_rate_limited),
|
||||||
|
do: "rate-limited: too many EVENT messages from this IP"
|
||||||
|
|
||||||
defp error_message_for_ingest_failure(:relay_event_rate_limited),
|
defp error_message_for_ingest_failure(:relay_event_rate_limited),
|
||||||
do: "rate-limited: relay-wide EVENT ingress exceeded"
|
do: "rate-limited: relay-wide EVENT ingress exceeded"
|
||||||
|
|
||||||
@@ -1571,6 +1583,16 @@ defmodule Parrhesia.Web.Connection do
|
|||||||
|
|
||||||
defp event_ingest_limiter(_opts), do: EventIngestLimiter
|
defp event_ingest_limiter(_opts), do: EventIngestLimiter
|
||||||
|
|
||||||
|
defp remote_ip_event_ingest_limiter(opts) when is_list(opts) do
|
||||||
|
Keyword.get(opts, :remote_ip_event_ingest_limiter, IPEventIngestLimiter)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp remote_ip_event_ingest_limiter(opts) when is_map(opts) do
|
||||||
|
Map.get(opts, :remote_ip_event_ingest_limiter, IPEventIngestLimiter)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp remote_ip_event_ingest_limiter(_opts), do: IPEventIngestLimiter
|
||||||
|
|
||||||
defp event_ingest_window_seconds(opts) when is_list(opts) do
|
defp event_ingest_window_seconds(opts) when is_list(opts) do
|
||||||
opts
|
opts
|
||||||
|> Keyword.get(:event_ingest_window_seconds)
|
|> Keyword.get(:event_ingest_window_seconds)
|
||||||
@@ -1671,6 +1693,15 @@ defmodule Parrhesia.Web.Connection do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp maybe_allow_remote_ip_event_ingest(_remote_ip, nil), do: :ok
|
||||||
|
|
||||||
|
defp maybe_allow_remote_ip_event_ingest(remote_ip, server) do
|
||||||
|
IPEventIngestLimiter.allow(remote_ip, server)
|
||||||
|
catch
|
||||||
|
:exit, {:noproc, _details} -> :ok
|
||||||
|
:exit, {:normal, _details} -> :ok
|
||||||
|
end
|
||||||
|
|
||||||
defp maybe_allow_relay_event_ingest(nil), do: :ok
|
defp maybe_allow_relay_event_ingest(nil), do: :ok
|
||||||
|
|
||||||
defp maybe_allow_relay_event_ingest(server) do
|
defp maybe_allow_relay_event_ingest(server) do
|
||||||
|
|||||||
169
lib/parrhesia/web/ip_event_ingest_limiter.ex
Normal file
169
lib/parrhesia/web/ip_event_ingest_limiter.ex
Normal file
@@ -0,0 +1,169 @@
|
|||||||
|
defmodule Parrhesia.Web.IPEventIngestLimiter do
|
||||||
|
@moduledoc """
|
||||||
|
Per-IP EVENT ingest rate limiting over a fixed time window.
|
||||||
|
"""
|
||||||
|
|
||||||
|
use GenServer
|
||||||
|
|
||||||
|
@default_max_events_per_window 1_000
|
||||||
|
@default_window_seconds 1
|
||||||
|
@named_table :parrhesia_ip_event_ingest_limiter
|
||||||
|
@config_key :config
|
||||||
|
|
||||||
|
@spec start_link(keyword()) :: GenServer.on_start()
|
||||||
|
def start_link(opts \\ []) do
|
||||||
|
max_events_per_window =
|
||||||
|
normalize_positive_integer(
|
||||||
|
Keyword.get(opts, :max_events_per_window),
|
||||||
|
max_events_per_window()
|
||||||
|
)
|
||||||
|
|
||||||
|
window_ms =
|
||||||
|
normalize_positive_integer(Keyword.get(opts, :window_seconds), window_seconds()) * 1000
|
||||||
|
|
||||||
|
init_arg = %{
|
||||||
|
max_events_per_window: max_events_per_window,
|
||||||
|
window_ms: window_ms,
|
||||||
|
named_table?: Keyword.get(opts, :name, __MODULE__) == __MODULE__
|
||||||
|
}
|
||||||
|
|
||||||
|
case Keyword.get(opts, :name, __MODULE__) do
|
||||||
|
nil -> GenServer.start_link(__MODULE__, init_arg)
|
||||||
|
name -> GenServer.start_link(__MODULE__, init_arg, name: name)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec allow(tuple() | String.t() | nil, GenServer.server()) ::
|
||||||
|
:ok | {:error, :ip_event_rate_limited}
|
||||||
|
def allow(remote_ip, server \\ __MODULE__)
|
||||||
|
|
||||||
|
def allow(remote_ip, __MODULE__) do
|
||||||
|
case normalize_remote_ip(remote_ip) do
|
||||||
|
nil ->
|
||||||
|
:ok
|
||||||
|
|
||||||
|
normalized_remote_ip ->
|
||||||
|
case fetch_named_config() do
|
||||||
|
{:ok, max_events_per_window, window_ms} ->
|
||||||
|
allow_counter(@named_table, normalized_remote_ip, max_events_per_window, window_ms)
|
||||||
|
|
||||||
|
:error ->
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def allow(remote_ip, server), do: GenServer.call(server, {:allow, remote_ip})
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def init(%{
|
||||||
|
max_events_per_window: max_events_per_window,
|
||||||
|
window_ms: window_ms,
|
||||||
|
named_table?: named_table?
|
||||||
|
}) do
|
||||||
|
table = create_table(named_table?)
|
||||||
|
|
||||||
|
true = :ets.insert(table, {@config_key, max_events_per_window, window_ms})
|
||||||
|
|
||||||
|
{:ok,
|
||||||
|
%{
|
||||||
|
table: table,
|
||||||
|
max_events_per_window: max_events_per_window,
|
||||||
|
window_ms: window_ms
|
||||||
|
}}
|
||||||
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def handle_call({:allow, remote_ip}, _from, state) do
|
||||||
|
reply =
|
||||||
|
case normalize_remote_ip(remote_ip) do
|
||||||
|
nil ->
|
||||||
|
:ok
|
||||||
|
|
||||||
|
normalized_remote_ip ->
|
||||||
|
allow_counter(
|
||||||
|
state.table,
|
||||||
|
normalized_remote_ip,
|
||||||
|
state.max_events_per_window,
|
||||||
|
state.window_ms
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
{:reply, reply, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0, do: value
|
||||||
|
defp normalize_positive_integer(_value, default), do: default
|
||||||
|
|
||||||
|
defp create_table(true) do
|
||||||
|
:ets.new(@named_table, [
|
||||||
|
:named_table,
|
||||||
|
:set,
|
||||||
|
:public,
|
||||||
|
{:read_concurrency, true},
|
||||||
|
{:write_concurrency, true}
|
||||||
|
])
|
||||||
|
end
|
||||||
|
|
||||||
|
defp create_table(false) do
|
||||||
|
:ets.new(__MODULE__, [:set, :public, {:read_concurrency, true}, {:write_concurrency, true}])
|
||||||
|
end
|
||||||
|
|
||||||
|
defp fetch_named_config do
|
||||||
|
case :ets.lookup(@named_table, @config_key) do
|
||||||
|
[{@config_key, max_events_per_window, window_ms}] -> {:ok, max_events_per_window, window_ms}
|
||||||
|
_other -> :error
|
||||||
|
end
|
||||||
|
rescue
|
||||||
|
ArgumentError -> :error
|
||||||
|
end
|
||||||
|
|
||||||
|
defp allow_counter(table, remote_ip, max_events_per_window, window_ms) do
|
||||||
|
window_id = System.monotonic_time(:millisecond) |> div(window_ms)
|
||||||
|
key = {:window, remote_ip, window_id}
|
||||||
|
|
||||||
|
count = :ets.update_counter(table, key, {2, 1}, {key, 0})
|
||||||
|
|
||||||
|
if count == 1 do
|
||||||
|
prune_expired_windows(table, window_id)
|
||||||
|
end
|
||||||
|
|
||||||
|
if count <= max_events_per_window do
|
||||||
|
:ok
|
||||||
|
else
|
||||||
|
{:error, :ip_event_rate_limited}
|
||||||
|
end
|
||||||
|
rescue
|
||||||
|
ArgumentError -> :ok
|
||||||
|
end
|
||||||
|
|
||||||
|
defp prune_expired_windows(table, window_id) do
|
||||||
|
:ets.select_delete(table, [
|
||||||
|
{{{:window, :"$1", :"$2"}, :_}, [{:<, :"$2", window_id}], [true]}
|
||||||
|
])
|
||||||
|
end
|
||||||
|
|
||||||
|
defp normalize_remote_ip({_, _, _, _} = remote_ip), do: :inet.ntoa(remote_ip) |> to_string()
|
||||||
|
|
||||||
|
defp normalize_remote_ip({_, _, _, _, _, _, _, _} = remote_ip),
|
||||||
|
do: :inet.ntoa(remote_ip) |> to_string()
|
||||||
|
|
||||||
|
defp normalize_remote_ip(remote_ip) when is_binary(remote_ip) and remote_ip != "", do: remote_ip
|
||||||
|
defp normalize_remote_ip(_remote_ip), do: nil
|
||||||
|
|
||||||
|
defp max_events_per_window do
|
||||||
|
case Application.get_env(:parrhesia, :limits, [])
|
||||||
|
|> Keyword.get(:ip_max_event_ingest_per_window) do
|
||||||
|
value when is_integer(value) and value > 0 -> value
|
||||||
|
_other -> @default_max_events_per_window
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp window_seconds do
|
||||||
|
case Application.get_env(:parrhesia, :limits, [])
|
||||||
|
|> Keyword.get(:ip_event_ingest_window_seconds) do
|
||||||
|
value when is_integer(value) and value > 0 -> value
|
||||||
|
_other -> @default_window_seconds
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -6,6 +6,7 @@ defmodule Parrhesia.ApplicationTest do
|
|||||||
assert is_pid(Process.whereis(Parrhesia.Telemetry))
|
assert is_pid(Process.whereis(Parrhesia.Telemetry))
|
||||||
assert is_pid(Process.whereis(Parrhesia.Config))
|
assert is_pid(Process.whereis(Parrhesia.Config))
|
||||||
assert is_pid(Process.whereis(Parrhesia.Web.EventIngestLimiter))
|
assert is_pid(Process.whereis(Parrhesia.Web.EventIngestLimiter))
|
||||||
|
assert is_pid(Process.whereis(Parrhesia.Web.IPEventIngestLimiter))
|
||||||
assert is_pid(Process.whereis(Parrhesia.Storage.Supervisor))
|
assert is_pid(Process.whereis(Parrhesia.Storage.Supervisor))
|
||||||
assert is_pid(Process.whereis(Parrhesia.Subscriptions.Supervisor))
|
assert is_pid(Process.whereis(Parrhesia.Subscriptions.Supervisor))
|
||||||
assert is_pid(Process.whereis(Parrhesia.Fanout.Dispatcher))
|
assert is_pid(Process.whereis(Parrhesia.Fanout.Dispatcher))
|
||||||
|
|||||||
@@ -10,6 +10,8 @@ defmodule Parrhesia.ConfigTest do
|
|||||||
assert Parrhesia.Config.get([:limits, :max_event_ingest_per_window]) == 120
|
assert Parrhesia.Config.get([:limits, :max_event_ingest_per_window]) == 120
|
||||||
assert Parrhesia.Config.get([:limits, :max_tags_per_event]) == 256
|
assert Parrhesia.Config.get([:limits, :max_tags_per_event]) == 256
|
||||||
assert Parrhesia.Config.get([:limits, :max_tag_values_per_filter]) == 128
|
assert Parrhesia.Config.get([:limits, :max_tag_values_per_filter]) == 128
|
||||||
|
assert Parrhesia.Config.get([:limits, :ip_max_event_ingest_per_window]) == 1_000
|
||||||
|
assert Parrhesia.Config.get([:limits, :ip_event_ingest_window_seconds]) == 1
|
||||||
assert Parrhesia.Config.get([:limits, :relay_max_event_ingest_per_window]) == 10_000
|
assert Parrhesia.Config.get([:limits, :relay_max_event_ingest_per_window]) == 10_000
|
||||||
assert Parrhesia.Config.get([:limits, :relay_event_ingest_window_seconds]) == 1
|
assert Parrhesia.Config.get([:limits, :relay_event_ingest_window_seconds]) == 1
|
||||||
assert Parrhesia.Config.get([:limits, :event_ingest_window_seconds]) == 1
|
assert Parrhesia.Config.get([:limits, :event_ingest_window_seconds]) == 1
|
||||||
@@ -21,6 +23,7 @@ defmodule Parrhesia.ConfigTest do
|
|||||||
assert Parrhesia.Config.get([:policies, :marmot_media_max_imeta_tags_per_event]) == 8
|
assert Parrhesia.Config.get([:policies, :marmot_media_max_imeta_tags_per_event]) == 8
|
||||||
assert Parrhesia.Config.get([:policies, :marmot_media_reject_mip04_v1]) == true
|
assert Parrhesia.Config.get([:policies, :marmot_media_reject_mip04_v1]) == true
|
||||||
assert Parrhesia.Config.get([:policies, :marmot_push_max_trigger_age_seconds]) == 120
|
assert Parrhesia.Config.get([:policies, :marmot_push_max_trigger_age_seconds]) == 120
|
||||||
|
assert Parrhesia.Config.get([:features, :verify_event_signatures_locked?]) == false
|
||||||
assert Parrhesia.Config.get([:features, :verify_event_signatures]) == false
|
assert Parrhesia.Config.get([:features, :verify_event_signatures]) == false
|
||||||
assert Parrhesia.Config.get([:features, :nip_50_search]) == true
|
assert Parrhesia.Config.get([:features, :nip_50_search]) == true
|
||||||
assert Parrhesia.Config.get([:features, :marmot_push_notifications]) == false
|
assert Parrhesia.Config.get([:features, :marmot_push_notifications]) == false
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
defmodule Parrhesia.Tasks.Nip66PublisherTest do
|
defmodule Parrhesia.Tasks.Nip66PublisherTest do
|
||||||
use Parrhesia.IntegrationCase, async: false
|
use Parrhesia.IntegrationCase, async: false, sandbox: :shared
|
||||||
|
|
||||||
alias Parrhesia.API.Events
|
alias Parrhesia.API.Events
|
||||||
alias Parrhesia.API.Identity
|
alias Parrhesia.API.Identity
|
||||||
|
|||||||
@@ -225,6 +225,50 @@ defmodule Parrhesia.Web.ConnectionTest do
|
|||||||
]
|
]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "EVENT applies per-IP ingress throttling across connections" do
|
||||||
|
limiter =
|
||||||
|
start_supervised!(
|
||||||
|
{Parrhesia.Web.IPEventIngestLimiter,
|
||||||
|
name: nil, max_events_per_window: 1, window_seconds: 60}
|
||||||
|
)
|
||||||
|
|
||||||
|
first_state =
|
||||||
|
connection_state(
|
||||||
|
remote_ip: "203.0.113.10",
|
||||||
|
remote_ip_event_ingest_limiter: limiter
|
||||||
|
)
|
||||||
|
|
||||||
|
second_state =
|
||||||
|
connection_state(
|
||||||
|
remote_ip: "203.0.113.10",
|
||||||
|
remote_ip_event_ingest_limiter: limiter
|
||||||
|
)
|
||||||
|
|
||||||
|
first_event = valid_event(%{"content" => "first from ip"}) |> recalculate_event_id()
|
||||||
|
second_event = valid_event(%{"content" => "second from ip"}) |> recalculate_event_id()
|
||||||
|
|
||||||
|
assert {:push, {:text, first_response}, _next_state} =
|
||||||
|
Connection.handle_in(
|
||||||
|
{JSON.encode!(["EVENT", first_event]), [opcode: :text]},
|
||||||
|
first_state
|
||||||
|
)
|
||||||
|
|
||||||
|
assert JSON.decode!(first_response) == ["OK", first_event["id"], true, "ok: event stored"]
|
||||||
|
|
||||||
|
assert {:push, {:text, second_response}, ^second_state} =
|
||||||
|
Connection.handle_in(
|
||||||
|
{JSON.encode!(["EVENT", second_event]), [opcode: :text]},
|
||||||
|
second_state
|
||||||
|
)
|
||||||
|
|
||||||
|
assert JSON.decode!(second_response) == [
|
||||||
|
"OK",
|
||||||
|
second_event["id"],
|
||||||
|
false,
|
||||||
|
"rate-limited: too many EVENT messages from this IP"
|
||||||
|
]
|
||||||
|
end
|
||||||
|
|
||||||
test "protected sync REQ requires matching ACL grant" do
|
test "protected sync REQ requires matching ACL grant" do
|
||||||
previous_acl = Application.get_env(:parrhesia, :acl, [])
|
previous_acl = Application.get_env(:parrhesia, :acl, [])
|
||||||
|
|
||||||
|
|||||||
26
test/parrhesia/web/ip_event_ingest_limiter_test.exs
Normal file
26
test/parrhesia/web/ip_event_ingest_limiter_test.exs
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
defmodule Parrhesia.Web.IPEventIngestLimiterTest do
|
||||||
|
use ExUnit.Case, async: true
|
||||||
|
|
||||||
|
alias Parrhesia.Web.IPEventIngestLimiter
|
||||||
|
|
||||||
|
test "allows events up to the configured per-IP window cap" do
|
||||||
|
limiter =
|
||||||
|
start_supervised!(
|
||||||
|
{IPEventIngestLimiter, name: nil, max_events_per_window: 2, window_seconds: 60}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert :ok = IPEventIngestLimiter.allow("203.0.113.10", limiter)
|
||||||
|
assert :ok = IPEventIngestLimiter.allow("203.0.113.10", limiter)
|
||||||
|
assert {:error, :ip_event_rate_limited} = IPEventIngestLimiter.allow("203.0.113.10", limiter)
|
||||||
|
assert :ok = IPEventIngestLimiter.allow("203.0.113.11", limiter)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "allows events without a remote IP" do
|
||||||
|
limiter =
|
||||||
|
start_supervised!(
|
||||||
|
{IPEventIngestLimiter, name: nil, max_events_per_window: 1, window_seconds: 60}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert :ok = IPEventIngestLimiter.allow(nil, limiter)
|
||||||
|
end
|
||||||
|
end
|
||||||
@@ -49,6 +49,11 @@ defmodule Parrhesia.IntegrationCase do
|
|||||||
receive do
|
receive do
|
||||||
:stop ->
|
:stop ->
|
||||||
Sandbox.checkin(Repo)
|
Sandbox.checkin(Repo)
|
||||||
|
|
||||||
|
# Allow the pool to process the checkin before this process
|
||||||
|
# exits, so Postgrex does not see a dead client and log a
|
||||||
|
# spurious disconnect error.
|
||||||
|
Process.sleep(50)
|
||||||
end
|
end
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
|||||||
@@ -5,4 +5,23 @@ exclude_tags =
|
|||||||
[:nak_e2e]
|
[:nak_e2e]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Suppress Postgrex disconnect logs that fire during sandbox teardown.
|
||||||
|
# When a sandbox-owning process exits, the connection pool detects the
|
||||||
|
# dead client and logs an error asynchronously. This is expected cleanup
|
||||||
|
# noise, not a real failure — silence it so test output stays pristine.
|
||||||
|
:logger.add_primary_filter(
|
||||||
|
:suppress_sandbox_disconnect,
|
||||||
|
{fn
|
||||||
|
%{msg: {:string, chars}}, _extra ->
|
||||||
|
if :string.find(IO.chardata_to_string(chars), "(DBConnection.ConnectionError)") != :nomatch do
|
||||||
|
:stop
|
||||||
|
else
|
||||||
|
:ignore
|
||||||
|
end
|
||||||
|
|
||||||
|
_event, _extra ->
|
||||||
|
:ignore
|
||||||
|
end, []}
|
||||||
|
)
|
||||||
|
|
||||||
ExUnit.start(exclude: exclude_tags)
|
ExUnit.start(exclude: exclude_tags)
|
||||||
|
|||||||
Reference in New Issue
Block a user