diff --git a/lib/parrhesia/api/events.ex b/lib/parrhesia/api/events.ex index 83449e5..b438ecf 100644 --- a/lib/parrhesia/api/events.ex +++ b/lib/parrhesia/api/events.ex @@ -5,13 +5,13 @@ defmodule Parrhesia.API.Events do alias Parrhesia.API.Events.PublishResult alias Parrhesia.API.RequestContext + alias Parrhesia.Fanout.Dispatcher alias Parrhesia.Fanout.MultiNode alias Parrhesia.Groups.Flow alias Parrhesia.Policy.EventPolicy alias Parrhesia.Protocol alias Parrhesia.Protocol.Filter alias Parrhesia.Storage - alias Parrhesia.Subscriptions.Index alias Parrhesia.Telemetry @default_max_event_bytes 262_144 @@ -48,7 +48,7 @@ defmodule Parrhesia.API.Events do telemetry_metadata_for_event(event) ) - fanout_event(event) + Dispatcher.dispatch(event) maybe_publish_multi_node(event) {:ok, @@ -230,20 +230,6 @@ defmodule Parrhesia.API.Events do end end - defp fanout_event(event) do - case Index.candidate_subscription_keys(event) do - candidates when is_list(candidates) -> - Enum.each(candidates, fn {owner_pid, subscription_id} -> - send(owner_pid, {:fanout_event, subscription_id, event}) - end) - - _other -> - :ok - end - catch - :exit, _reason -> :ok - end - defp maybe_publish_multi_node(event) do MultiNode.publish(event) :ok diff --git a/lib/parrhesia/fanout/dispatcher.ex b/lib/parrhesia/fanout/dispatcher.ex new file mode 100644 index 0000000..7125e9d --- /dev/null +++ b/lib/parrhesia/fanout/dispatcher.ex @@ -0,0 +1,46 @@ +defmodule Parrhesia.Fanout.Dispatcher do + @moduledoc """ + Asynchronous local fanout dispatcher. + """ + + use GenServer + + alias Parrhesia.Subscriptions.Index + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + name = Keyword.get(opts, :name, __MODULE__) + GenServer.start_link(__MODULE__, :ok, name: name) + end + + @spec dispatch(map()) :: :ok + def dispatch(event), do: dispatch(__MODULE__, event) + + @spec dispatch(GenServer.server(), map()) :: :ok + def dispatch(server, event) when is_map(event) do + GenServer.cast(server, {:dispatch, event}) + end + + @impl true + def init(:ok), do: {:ok, %{}} + + @impl true + def handle_cast({:dispatch, event}, state) do + dispatch_to_candidates(event) + {:noreply, state} + end + + defp dispatch_to_candidates(event) do + case Index.candidate_subscription_keys(event) do + candidates when is_list(candidates) -> + Enum.each(candidates, fn {owner_pid, subscription_id} -> + send(owner_pid, {:fanout_event, subscription_id, event}) + end) + + _other -> + :ok + end + catch + :exit, _reason -> :ok + end +end diff --git a/lib/parrhesia/fanout/multi_node.ex b/lib/parrhesia/fanout/multi_node.ex index c10fd77..4d88749 100644 --- a/lib/parrhesia/fanout/multi_node.ex +++ b/lib/parrhesia/fanout/multi_node.ex @@ -5,7 +5,7 @@ defmodule Parrhesia.Fanout.MultiNode do use GenServer - alias Parrhesia.Subscriptions.Index + alias Parrhesia.Fanout.Dispatcher @group __MODULE__ @@ -44,11 +44,7 @@ defmodule Parrhesia.Fanout.MultiNode do @impl true def handle_info({:remote_fanout_event, event}, state) do - Index.candidate_subscription_keys(event) - |> Enum.each(fn {owner_pid, subscription_id} -> - send(owner_pid, {:fanout_event, subscription_id, event}) - end) - + Dispatcher.dispatch(event) {:noreply, state} end diff --git a/lib/parrhesia/subscriptions/supervisor.ex b/lib/parrhesia/subscriptions/supervisor.ex index 92b009e..738500c 100644 --- a/lib/parrhesia/subscriptions/supervisor.ex +++ b/lib/parrhesia/subscriptions/supervisor.ex @@ -14,6 +14,7 @@ defmodule Parrhesia.Subscriptions.Supervisor do children = [ {Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index}, + {Parrhesia.Fanout.Dispatcher, name: Parrhesia.Fanout.Dispatcher}, {Registry, keys: :unique, name: Parrhesia.API.Stream.Registry}, {DynamicSupervisor, strategy: :one_for_one, name: Parrhesia.API.Stream.Supervisor} ] ++ diff --git a/lib/parrhesia/web/event_ingest_limiter.ex b/lib/parrhesia/web/event_ingest_limiter.ex index e8f8407..72fcdd8 100644 --- a/lib/parrhesia/web/event_ingest_limiter.ex +++ b/lib/parrhesia/web/event_ingest_limiter.ex @@ -7,57 +7,121 @@ defmodule Parrhesia.Web.EventIngestLimiter do @default_max_events_per_window 10_000 @default_window_seconds 1 + @named_table :parrhesia_event_ingest_limiter + @config_key :config @spec start_link(keyword()) :: GenServer.on_start() def start_link(opts \\ []) do + max_events_per_window = + normalize_positive_integer( + Keyword.get(opts, :max_events_per_window), + max_events_per_window() + ) + + window_ms = + normalize_positive_integer(Keyword.get(opts, :window_seconds), window_seconds()) * 1000 + + init_arg = %{ + max_events_per_window: max_events_per_window, + window_ms: window_ms, + named_table?: Keyword.get(opts, :name, __MODULE__) == __MODULE__ + } + case Keyword.get(opts, :name, __MODULE__) do - nil -> GenServer.start_link(__MODULE__, opts) - name -> GenServer.start_link(__MODULE__, opts, name: name) + nil -> GenServer.start_link(__MODULE__, init_arg) + name -> GenServer.start_link(__MODULE__, init_arg, name: name) end end @spec allow(GenServer.server()) :: :ok | {:error, :relay_event_rate_limited} - def allow(server \\ __MODULE__) do - GenServer.call(server, :allow) + def allow(server \\ __MODULE__) + + def allow(__MODULE__) do + case fetch_named_config() do + {:ok, max_events_per_window, window_ms} -> + allow_counter(@named_table, max_events_per_window, window_ms) + + :error -> + :ok + end end + def allow(server), do: GenServer.call(server, :allow) + @impl true - def init(opts) do + def init(%{ + max_events_per_window: max_events_per_window, + window_ms: window_ms, + named_table?: named_table? + }) do + table = create_table(named_table?) + + true = :ets.insert(table, {@config_key, max_events_per_window, window_ms}) + {:ok, %{ - max_events_per_window: - normalize_positive_integer( - Keyword.get(opts, :max_events_per_window), - max_events_per_window() - ), - window_ms: - normalize_positive_integer(Keyword.get(opts, :window_seconds), window_seconds()) * 1000, - window_started_at_ms: System.monotonic_time(:millisecond), - count: 0 + table: table, + max_events_per_window: max_events_per_window, + window_ms: window_ms }} end @impl true def handle_call(:allow, _from, state) do - now_ms = System.monotonic_time(:millisecond) - - cond do - now_ms - state.window_started_at_ms >= state.window_ms -> - next_state = %{state | window_started_at_ms: now_ms, count: 1} - {:reply, :ok, next_state} - - state.count < state.max_events_per_window -> - next_state = %{state | count: state.count + 1} - {:reply, :ok, next_state} - - true -> - {:reply, {:error, :relay_event_rate_limited}, state} - end + {:reply, allow_counter(state.table, state.max_events_per_window, state.window_ms), state} end defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0, do: value defp normalize_positive_integer(_value, default), do: default + defp create_table(true) do + :ets.new(@named_table, [ + :named_table, + :set, + :public, + {:read_concurrency, true}, + {:write_concurrency, true} + ]) + end + + defp create_table(false) do + :ets.new(__MODULE__, [:set, :public, {:read_concurrency, true}, {:write_concurrency, true}]) + end + + defp fetch_named_config do + case :ets.lookup(@named_table, @config_key) do + [{@config_key, max_events_per_window, window_ms}] -> {:ok, max_events_per_window, window_ms} + _other -> :error + end + rescue + ArgumentError -> :error + end + + defp allow_counter(table, max_events_per_window, window_ms) do + window_id = System.monotonic_time(:millisecond) |> div(window_ms) + key = {:window, window_id} + + count = :ets.update_counter(table, key, {2, 1}, {key, 0}) + + if count == 1 do + prune_expired_windows(table, window_id) + end + + if count <= max_events_per_window do + :ok + else + {:error, :relay_event_rate_limited} + end + rescue + ArgumentError -> :ok + end + + defp prune_expired_windows(table, window_id) do + :ets.select_delete(table, [ + {{{:window, :"$1"}, :_}, [{:<, :"$1", window_id}], [true]} + ]) + end + defp max_events_per_window do case Application.get_env(:parrhesia, :limits, []) |> Keyword.get(:relay_max_event_ingest_per_window) do diff --git a/test/parrhesia/application_test.exs b/test/parrhesia/application_test.exs index 20faf54..a1b853b 100644 --- a/test/parrhesia/application_test.exs +++ b/test/parrhesia/application_test.exs @@ -8,6 +8,7 @@ defmodule Parrhesia.ApplicationTest do assert is_pid(Process.whereis(Parrhesia.Web.EventIngestLimiter)) assert is_pid(Process.whereis(Parrhesia.Storage.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Subscriptions.Supervisor)) + assert is_pid(Process.whereis(Parrhesia.Fanout.Dispatcher)) assert is_pid(Process.whereis(Parrhesia.Auth.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Sync.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Policy.Supervisor))