diff --git a/config/config.exs b/config/config.exs index 3540a88..69e3395 100644 --- a/config/config.exs +++ b/config/config.exs @@ -20,6 +20,8 @@ config :parrhesia, max_filter_limit: 500, max_tags_per_event: 256, max_tag_values_per_filter: 128, + relay_max_event_ingest_per_window: 10_000, + relay_event_ingest_window_seconds: 1, max_subscriptions_per_connection: 32, max_event_future_skew_seconds: 900, max_event_ingest_per_window: 120, diff --git a/config/runtime.exs b/config/runtime.exs index dbbf002..a4d7669 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -184,6 +184,16 @@ if config_env() == :prod do "PARRHESIA_LIMITS_MAX_TAG_VALUES_PER_FILTER", Keyword.get(limits_defaults, :max_tag_values_per_filter, 128) ), + relay_max_event_ingest_per_window: + int_env.( + "PARRHESIA_LIMITS_RELAY_MAX_EVENT_INGEST_PER_WINDOW", + Keyword.get(limits_defaults, :relay_max_event_ingest_per_window, 10_000) + ), + relay_event_ingest_window_seconds: + int_env.( + "PARRHESIA_LIMITS_RELAY_EVENT_INGEST_WINDOW_SECONDS", + Keyword.get(limits_defaults, :relay_event_ingest_window_seconds, 1) + ), max_subscriptions_per_connection: int_env.( "PARRHESIA_LIMITS_MAX_SUBSCRIPTIONS_PER_CONNECTION", diff --git a/lib/parrhesia/runtime.ex b/lib/parrhesia/runtime.ex index 77bd2f4..ce220db 100644 --- a/lib/parrhesia/runtime.ex +++ b/lib/parrhesia/runtime.ex @@ -17,6 +17,7 @@ defmodule Parrhesia.Runtime do [ Parrhesia.Telemetry, Parrhesia.Config, + Parrhesia.Web.EventIngestLimiter, Parrhesia.Storage.Supervisor, Parrhesia.Subscriptions.Supervisor, Parrhesia.Auth.Supervisor, diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index 586ba05..5fc025f 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -16,6 +16,7 @@ defmodule Parrhesia.Web.Connection do alias Parrhesia.Protocol.Filter alias Parrhesia.Subscriptions.Index alias Parrhesia.Telemetry + alias Parrhesia.Web.EventIngestLimiter alias Parrhesia.Web.Listener @default_max_subscriptions_per_connection 32 @@ -62,6 +63,7 @@ defmodule Parrhesia.Web.Connection do drain_scheduled?: false, max_frame_bytes: @default_max_frame_bytes, max_event_bytes: @default_max_event_bytes, + event_ingest_limiter: EventIngestLimiter, max_event_ingest_per_window: @default_event_ingest_rate_limit, event_ingest_window_seconds: @default_event_ingest_window_seconds, event_ingest_window_started_at_ms: 0, @@ -96,6 +98,7 @@ defmodule Parrhesia.Web.Connection do drain_scheduled?: boolean(), max_frame_bytes: pos_integer(), max_event_bytes: pos_integer(), + event_ingest_limiter: GenServer.server() | nil, max_event_ingest_per_window: pos_integer(), event_ingest_window_seconds: pos_integer(), event_ingest_window_started_at_ms: integer(), @@ -122,6 +125,7 @@ defmodule Parrhesia.Web.Connection do outbound_drain_batch_size: outbound_drain_batch_size(opts), max_frame_bytes: max_frame_bytes(opts), max_event_bytes: max_event_bytes(opts), + event_ingest_limiter: event_ingest_limiter(opts), max_event_ingest_per_window: max_event_ingest_per_window(opts), event_ingest_window_seconds: event_ingest_window_seconds(opts), event_ingest_window_started_at_ms: System.monotonic_time(:millisecond), @@ -277,16 +281,23 @@ defmodule Parrhesia.Web.Connection do case maybe_allow_event_ingest(state) do {:ok, next_state} -> - case authorize_listener_write(next_state, event) do - :ok -> publish_event_response(next_state, event) - {:error, reason} -> ingest_error_response(state, event_id, reason) - end + maybe_publish_ingested_event(next_state, state, event, event_id) {:error, reason} -> ingest_error_response(state, event_id, reason) end end + defp maybe_publish_ingested_event(next_state, state, event, event_id) do + with :ok <- maybe_allow_relay_event_ingest(next_state.event_ingest_limiter), + :ok <- authorize_listener_write(next_state, event) do + publish_event_response(next_state, event) + else + {:error, reason} -> + ingest_error_response(state, event_id, reason) + end + end + defp publish_event_response(%__MODULE__{} = state, event) do case Events.publish( event, @@ -563,6 +574,9 @@ defmodule Parrhesia.Web.Connection do defp error_message_for_ingest_failure(:event_rate_limited), do: "rate-limited: too many EVENT messages" + defp error_message_for_ingest_failure(:relay_event_rate_limited), + do: "rate-limited: relay-wide EVENT ingress exceeded" + defp error_message_for_ingest_failure(:event_too_large), do: "invalid: event exceeds max event size" @@ -1547,6 +1561,16 @@ defmodule Parrhesia.Web.Connection do |> Keyword.get(:max_event_ingest_per_window, @default_event_ingest_rate_limit) end + defp event_ingest_limiter(opts) when is_list(opts) do + Keyword.get(opts, :event_ingest_limiter, EventIngestLimiter) + end + + defp event_ingest_limiter(opts) when is_map(opts) do + Map.get(opts, :event_ingest_limiter, EventIngestLimiter) + end + + defp event_ingest_limiter(_opts), do: EventIngestLimiter + defp event_ingest_window_seconds(opts) when is_list(opts) do opts |> Keyword.get(:event_ingest_window_seconds) @@ -1646,4 +1670,13 @@ defmodule Parrhesia.Web.Connection do {:error, :event_rate_limited} end end + + defp maybe_allow_relay_event_ingest(nil), do: :ok + + defp maybe_allow_relay_event_ingest(server) do + EventIngestLimiter.allow(server) + catch + :exit, {:noproc, _details} -> :ok + :exit, {:normal, _details} -> :ok + end end diff --git a/lib/parrhesia/web/event_ingest_limiter.ex b/lib/parrhesia/web/event_ingest_limiter.ex new file mode 100644 index 0000000..e8f8407 --- /dev/null +++ b/lib/parrhesia/web/event_ingest_limiter.ex @@ -0,0 +1,76 @@ +defmodule Parrhesia.Web.EventIngestLimiter do + @moduledoc """ + Relay-wide EVENT ingest rate limiting over a fixed time window. + """ + + use GenServer + + @default_max_events_per_window 10_000 + @default_window_seconds 1 + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + case Keyword.get(opts, :name, __MODULE__) do + nil -> GenServer.start_link(__MODULE__, opts) + name -> GenServer.start_link(__MODULE__, opts, name: name) + end + end + + @spec allow(GenServer.server()) :: :ok | {:error, :relay_event_rate_limited} + def allow(server \\ __MODULE__) do + GenServer.call(server, :allow) + end + + @impl true + def init(opts) do + {:ok, + %{ + max_events_per_window: + normalize_positive_integer( + Keyword.get(opts, :max_events_per_window), + max_events_per_window() + ), + window_ms: + normalize_positive_integer(Keyword.get(opts, :window_seconds), window_seconds()) * 1000, + window_started_at_ms: System.monotonic_time(:millisecond), + count: 0 + }} + end + + @impl true + def handle_call(:allow, _from, state) do + now_ms = System.monotonic_time(:millisecond) + + cond do + now_ms - state.window_started_at_ms >= state.window_ms -> + next_state = %{state | window_started_at_ms: now_ms, count: 1} + {:reply, :ok, next_state} + + state.count < state.max_events_per_window -> + next_state = %{state | count: state.count + 1} + {:reply, :ok, next_state} + + true -> + {:reply, {:error, :relay_event_rate_limited}, state} + end + end + + defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0, do: value + defp normalize_positive_integer(_value, default), do: default + + defp max_events_per_window do + case Application.get_env(:parrhesia, :limits, []) + |> Keyword.get(:relay_max_event_ingest_per_window) do + value when is_integer(value) and value > 0 -> value + _other -> @default_max_events_per_window + end + end + + defp window_seconds do + case Application.get_env(:parrhesia, :limits, []) + |> Keyword.get(:relay_event_ingest_window_seconds) do + value when is_integer(value) and value > 0 -> value + _other -> @default_window_seconds + end + end +end diff --git a/test/parrhesia/application_test.exs b/test/parrhesia/application_test.exs index 3c9043a..55bb135 100644 --- a/test/parrhesia/application_test.exs +++ b/test/parrhesia/application_test.exs @@ -5,6 +5,7 @@ defmodule Parrhesia.ApplicationTest do assert is_pid(Process.whereis(Parrhesia.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Telemetry)) assert is_pid(Process.whereis(Parrhesia.Config)) + assert is_pid(Process.whereis(Parrhesia.Web.EventIngestLimiter)) assert is_pid(Process.whereis(Parrhesia.Storage.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Subscriptions.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Auth.Supervisor)) diff --git a/test/parrhesia/config_test.exs b/test/parrhesia/config_test.exs index 2d160d0..402a7d8 100644 --- a/test/parrhesia/config_test.exs +++ b/test/parrhesia/config_test.exs @@ -8,6 +8,8 @@ defmodule Parrhesia.ConfigTest do assert Parrhesia.Config.get([:limits, :max_event_ingest_per_window]) == 120 assert Parrhesia.Config.get([:limits, :max_tags_per_event]) == 256 assert Parrhesia.Config.get([:limits, :max_tag_values_per_filter]) == 128 + assert Parrhesia.Config.get([:limits, :relay_max_event_ingest_per_window]) == 10_000 + assert Parrhesia.Config.get([:limits, :relay_event_ingest_window_seconds]) == 1 assert Parrhesia.Config.get([:limits, :event_ingest_window_seconds]) == 1 assert Parrhesia.Config.get([:limits, :auth_max_age_seconds]) == 600 assert Parrhesia.Config.get([:limits, :max_outbound_queue]) == 256 diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs index 0d5efc5..848933d 100644 --- a/test/parrhesia/web/connection_test.exs +++ b/test/parrhesia/web/connection_test.exs @@ -424,6 +424,42 @@ defmodule Parrhesia.Web.ConnectionTest do ] end + test "EVENT ingest enforces relay-wide rate limits" do + limiter = + start_supervised!( + {Parrhesia.Web.EventIngestLimiter, + name: nil, max_events_per_window: 1, window_seconds: 60} + ) + + state = + connection_state( + event_ingest_limiter: limiter, + max_event_ingest_per_window: 10, + event_ingest_window_seconds: 60 + ) + + first_event = valid_event(%{"content" => "first"}) + second_event = valid_event(%{"content" => "second"}) + + assert {:push, {:text, first_response}, next_state} = + Connection.handle_in({JSON.encode!(["EVENT", first_event]), [opcode: :text]}, state) + + assert JSON.decode!(first_response) == ["OK", first_event["id"], true, "ok: event stored"] + + assert {:push, {:text, second_response}, ^next_state} = + Connection.handle_in( + {JSON.encode!(["EVENT", second_event]), [opcode: :text]}, + next_state + ) + + assert JSON.decode!(second_response) == [ + "OK", + second_event["id"], + false, + "rate-limited: relay-wide EVENT ingress exceeded" + ] + end + test "EVENT ingest enforces max event bytes" do state = connection_state(max_event_bytes: 128) diff --git a/test/parrhesia/web/event_ingest_limiter_test.exs b/test/parrhesia/web/event_ingest_limiter_test.exs new file mode 100644 index 0000000..9890016 --- /dev/null +++ b/test/parrhesia/web/event_ingest_limiter_test.exs @@ -0,0 +1,16 @@ +defmodule Parrhesia.Web.EventIngestLimiterTest do + use ExUnit.Case, async: true + + alias Parrhesia.Web.EventIngestLimiter + + test "allows events up to the configured relay-wide window cap" do + limiter = + start_supervised!( + {EventIngestLimiter, name: nil, max_events_per_window: 2, window_seconds: 60} + ) + + assert :ok = EventIngestLimiter.allow(limiter) + assert :ok = EventIngestLimiter.allow(limiter) + assert {:error, :relay_event_rate_limited} = EventIngestLimiter.allow(limiter) + end +end