Add relay-wide event ingest limiter

This commit is contained in:
2026-03-18 13:42:56 +01:00
parent 57fdb4ed85
commit 1fef184f50
9 changed files with 181 additions and 4 deletions

View File

@@ -17,6 +17,7 @@ defmodule Parrhesia.Runtime do
[
Parrhesia.Telemetry,
Parrhesia.Config,
Parrhesia.Web.EventIngestLimiter,
Parrhesia.Storage.Supervisor,
Parrhesia.Subscriptions.Supervisor,
Parrhesia.Auth.Supervisor,

View File

@@ -16,6 +16,7 @@ defmodule Parrhesia.Web.Connection do
alias Parrhesia.Protocol.Filter
alias Parrhesia.Subscriptions.Index
alias Parrhesia.Telemetry
alias Parrhesia.Web.EventIngestLimiter
alias Parrhesia.Web.Listener
@default_max_subscriptions_per_connection 32
@@ -62,6 +63,7 @@ defmodule Parrhesia.Web.Connection do
drain_scheduled?: false,
max_frame_bytes: @default_max_frame_bytes,
max_event_bytes: @default_max_event_bytes,
event_ingest_limiter: EventIngestLimiter,
max_event_ingest_per_window: @default_event_ingest_rate_limit,
event_ingest_window_seconds: @default_event_ingest_window_seconds,
event_ingest_window_started_at_ms: 0,
@@ -96,6 +98,7 @@ defmodule Parrhesia.Web.Connection do
drain_scheduled?: boolean(),
max_frame_bytes: pos_integer(),
max_event_bytes: pos_integer(),
event_ingest_limiter: GenServer.server() | nil,
max_event_ingest_per_window: pos_integer(),
event_ingest_window_seconds: pos_integer(),
event_ingest_window_started_at_ms: integer(),
@@ -122,6 +125,7 @@ defmodule Parrhesia.Web.Connection do
outbound_drain_batch_size: outbound_drain_batch_size(opts),
max_frame_bytes: max_frame_bytes(opts),
max_event_bytes: max_event_bytes(opts),
event_ingest_limiter: event_ingest_limiter(opts),
max_event_ingest_per_window: max_event_ingest_per_window(opts),
event_ingest_window_seconds: event_ingest_window_seconds(opts),
event_ingest_window_started_at_ms: System.monotonic_time(:millisecond),
@@ -277,16 +281,23 @@ defmodule Parrhesia.Web.Connection do
case maybe_allow_event_ingest(state) do
{:ok, next_state} ->
case authorize_listener_write(next_state, event) do
:ok -> publish_event_response(next_state, event)
{:error, reason} -> ingest_error_response(state, event_id, reason)
end
maybe_publish_ingested_event(next_state, state, event, event_id)
{:error, reason} ->
ingest_error_response(state, event_id, reason)
end
end
defp maybe_publish_ingested_event(next_state, state, event, event_id) do
with :ok <- maybe_allow_relay_event_ingest(next_state.event_ingest_limiter),
:ok <- authorize_listener_write(next_state, event) do
publish_event_response(next_state, event)
else
{:error, reason} ->
ingest_error_response(state, event_id, reason)
end
end
defp publish_event_response(%__MODULE__{} = state, event) do
case Events.publish(
event,
@@ -563,6 +574,9 @@ defmodule Parrhesia.Web.Connection do
defp error_message_for_ingest_failure(:event_rate_limited),
do: "rate-limited: too many EVENT messages"
defp error_message_for_ingest_failure(:relay_event_rate_limited),
do: "rate-limited: relay-wide EVENT ingress exceeded"
defp error_message_for_ingest_failure(:event_too_large),
do: "invalid: event exceeds max event size"
@@ -1547,6 +1561,16 @@ defmodule Parrhesia.Web.Connection do
|> Keyword.get(:max_event_ingest_per_window, @default_event_ingest_rate_limit)
end
defp event_ingest_limiter(opts) when is_list(opts) do
Keyword.get(opts, :event_ingest_limiter, EventIngestLimiter)
end
defp event_ingest_limiter(opts) when is_map(opts) do
Map.get(opts, :event_ingest_limiter, EventIngestLimiter)
end
defp event_ingest_limiter(_opts), do: EventIngestLimiter
defp event_ingest_window_seconds(opts) when is_list(opts) do
opts
|> Keyword.get(:event_ingest_window_seconds)
@@ -1646,4 +1670,13 @@ defmodule Parrhesia.Web.Connection do
{:error, :event_rate_limited}
end
end
defp maybe_allow_relay_event_ingest(nil), do: :ok
defp maybe_allow_relay_event_ingest(server) do
EventIngestLimiter.allow(server)
catch
:exit, {:noproc, _details} -> :ok
:exit, {:normal, _details} -> :ok
end
end

View File

@@ -0,0 +1,76 @@
defmodule Parrhesia.Web.EventIngestLimiter do
@moduledoc """
Relay-wide EVENT ingest rate limiting over a fixed time window.
"""
use GenServer
@default_max_events_per_window 10_000
@default_window_seconds 1
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts \\ []) do
case Keyword.get(opts, :name, __MODULE__) do
nil -> GenServer.start_link(__MODULE__, opts)
name -> GenServer.start_link(__MODULE__, opts, name: name)
end
end
@spec allow(GenServer.server()) :: :ok | {:error, :relay_event_rate_limited}
def allow(server \\ __MODULE__) do
GenServer.call(server, :allow)
end
@impl true
def init(opts) do
{:ok,
%{
max_events_per_window:
normalize_positive_integer(
Keyword.get(opts, :max_events_per_window),
max_events_per_window()
),
window_ms:
normalize_positive_integer(Keyword.get(opts, :window_seconds), window_seconds()) * 1000,
window_started_at_ms: System.monotonic_time(:millisecond),
count: 0
}}
end
@impl true
def handle_call(:allow, _from, state) do
now_ms = System.monotonic_time(:millisecond)
cond do
now_ms - state.window_started_at_ms >= state.window_ms ->
next_state = %{state | window_started_at_ms: now_ms, count: 1}
{:reply, :ok, next_state}
state.count < state.max_events_per_window ->
next_state = %{state | count: state.count + 1}
{:reply, :ok, next_state}
true ->
{:reply, {:error, :relay_event_rate_limited}, state}
end
end
defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0, do: value
defp normalize_positive_integer(_value, default), do: default
defp max_events_per_window do
case Application.get_env(:parrhesia, :limits, [])
|> Keyword.get(:relay_max_event_ingest_per_window) do
value when is_integer(value) and value > 0 -> value
_other -> @default_max_events_per_window
end
end
defp window_seconds do
case Application.get_env(:parrhesia, :limits, [])
|> Keyword.get(:relay_event_ingest_window_seconds) do
value when is_integer(value) and value > 0 -> value
_other -> @default_window_seconds
end
end
end