Decouple publish fanout and use ETS ingest counters
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 0s
CI / Test (OTP 28.4 / Elixir 1.19.4 + Marmot E2E) (push) Failing after 0s

This commit is contained in:
2026-03-18 14:10:32 +01:00
parent 05718d4b91
commit b56925f413
6 changed files with 144 additions and 50 deletions

View File

@@ -5,13 +5,13 @@ defmodule Parrhesia.API.Events do
alias Parrhesia.API.Events.PublishResult
alias Parrhesia.API.RequestContext
alias Parrhesia.Fanout.Dispatcher
alias Parrhesia.Fanout.MultiNode
alias Parrhesia.Groups.Flow
alias Parrhesia.Policy.EventPolicy
alias Parrhesia.Protocol
alias Parrhesia.Protocol.Filter
alias Parrhesia.Storage
alias Parrhesia.Subscriptions.Index
alias Parrhesia.Telemetry
@default_max_event_bytes 262_144
@@ -48,7 +48,7 @@ defmodule Parrhesia.API.Events do
telemetry_metadata_for_event(event)
)
fanout_event(event)
Dispatcher.dispatch(event)
maybe_publish_multi_node(event)
{:ok,
@@ -230,20 +230,6 @@ defmodule Parrhesia.API.Events do
end
end
defp fanout_event(event) do
case Index.candidate_subscription_keys(event) do
candidates when is_list(candidates) ->
Enum.each(candidates, fn {owner_pid, subscription_id} ->
send(owner_pid, {:fanout_event, subscription_id, event})
end)
_other ->
:ok
end
catch
:exit, _reason -> :ok
end
defp maybe_publish_multi_node(event) do
MultiNode.publish(event)
:ok

View File

@@ -0,0 +1,46 @@
defmodule Parrhesia.Fanout.Dispatcher do
@moduledoc """
Asynchronous local fanout dispatcher.
"""
use GenServer
alias Parrhesia.Subscriptions.Index
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts \\ []) do
name = Keyword.get(opts, :name, __MODULE__)
GenServer.start_link(__MODULE__, :ok, name: name)
end
@spec dispatch(map()) :: :ok
def dispatch(event), do: dispatch(__MODULE__, event)
@spec dispatch(GenServer.server(), map()) :: :ok
def dispatch(server, event) when is_map(event) do
GenServer.cast(server, {:dispatch, event})
end
@impl true
def init(:ok), do: {:ok, %{}}
@impl true
def handle_cast({:dispatch, event}, state) do
dispatch_to_candidates(event)
{:noreply, state}
end
defp dispatch_to_candidates(event) do
case Index.candidate_subscription_keys(event) do
candidates when is_list(candidates) ->
Enum.each(candidates, fn {owner_pid, subscription_id} ->
send(owner_pid, {:fanout_event, subscription_id, event})
end)
_other ->
:ok
end
catch
:exit, _reason -> :ok
end
end

View File

@@ -5,7 +5,7 @@ defmodule Parrhesia.Fanout.MultiNode do
use GenServer
alias Parrhesia.Subscriptions.Index
alias Parrhesia.Fanout.Dispatcher
@group __MODULE__
@@ -44,11 +44,7 @@ defmodule Parrhesia.Fanout.MultiNode do
@impl true
def handle_info({:remote_fanout_event, event}, state) do
Index.candidate_subscription_keys(event)
|> Enum.each(fn {owner_pid, subscription_id} ->
send(owner_pid, {:fanout_event, subscription_id, event})
end)
Dispatcher.dispatch(event)
{:noreply, state}
end

View File

@@ -14,6 +14,7 @@ defmodule Parrhesia.Subscriptions.Supervisor do
children =
[
{Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index},
{Parrhesia.Fanout.Dispatcher, name: Parrhesia.Fanout.Dispatcher},
{Registry, keys: :unique, name: Parrhesia.API.Stream.Registry},
{DynamicSupervisor, strategy: :one_for_one, name: Parrhesia.API.Stream.Supervisor}
] ++

View File

@@ -7,57 +7,121 @@ defmodule Parrhesia.Web.EventIngestLimiter do
@default_max_events_per_window 10_000
@default_window_seconds 1
@named_table :parrhesia_event_ingest_limiter
@config_key :config
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts \\ []) do
max_events_per_window =
normalize_positive_integer(
Keyword.get(opts, :max_events_per_window),
max_events_per_window()
)
window_ms =
normalize_positive_integer(Keyword.get(opts, :window_seconds), window_seconds()) * 1000
init_arg = %{
max_events_per_window: max_events_per_window,
window_ms: window_ms,
named_table?: Keyword.get(opts, :name, __MODULE__) == __MODULE__
}
case Keyword.get(opts, :name, __MODULE__) do
nil -> GenServer.start_link(__MODULE__, opts)
name -> GenServer.start_link(__MODULE__, opts, name: name)
nil -> GenServer.start_link(__MODULE__, init_arg)
name -> GenServer.start_link(__MODULE__, init_arg, name: name)
end
end
@spec allow(GenServer.server()) :: :ok | {:error, :relay_event_rate_limited}
def allow(server \\ __MODULE__) do
GenServer.call(server, :allow)
def allow(server \\ __MODULE__)
def allow(__MODULE__) do
case fetch_named_config() do
{:ok, max_events_per_window, window_ms} ->
allow_counter(@named_table, max_events_per_window, window_ms)
:error ->
:ok
end
end
def allow(server), do: GenServer.call(server, :allow)
@impl true
def init(opts) do
def init(%{
max_events_per_window: max_events_per_window,
window_ms: window_ms,
named_table?: named_table?
}) do
table = create_table(named_table?)
true = :ets.insert(table, {@config_key, max_events_per_window, window_ms})
{:ok,
%{
max_events_per_window:
normalize_positive_integer(
Keyword.get(opts, :max_events_per_window),
max_events_per_window()
),
window_ms:
normalize_positive_integer(Keyword.get(opts, :window_seconds), window_seconds()) * 1000,
window_started_at_ms: System.monotonic_time(:millisecond),
count: 0
table: table,
max_events_per_window: max_events_per_window,
window_ms: window_ms
}}
end
@impl true
def handle_call(:allow, _from, state) do
now_ms = System.monotonic_time(:millisecond)
cond do
now_ms - state.window_started_at_ms >= state.window_ms ->
next_state = %{state | window_started_at_ms: now_ms, count: 1}
{:reply, :ok, next_state}
state.count < state.max_events_per_window ->
next_state = %{state | count: state.count + 1}
{:reply, :ok, next_state}
true ->
{:reply, {:error, :relay_event_rate_limited}, state}
end
{:reply, allow_counter(state.table, state.max_events_per_window, state.window_ms), state}
end
defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0, do: value
defp normalize_positive_integer(_value, default), do: default
defp create_table(true) do
:ets.new(@named_table, [
:named_table,
:set,
:public,
{:read_concurrency, true},
{:write_concurrency, true}
])
end
defp create_table(false) do
:ets.new(__MODULE__, [:set, :public, {:read_concurrency, true}, {:write_concurrency, true}])
end
defp fetch_named_config do
case :ets.lookup(@named_table, @config_key) do
[{@config_key, max_events_per_window, window_ms}] -> {:ok, max_events_per_window, window_ms}
_other -> :error
end
rescue
ArgumentError -> :error
end
defp allow_counter(table, max_events_per_window, window_ms) do
window_id = System.monotonic_time(:millisecond) |> div(window_ms)
key = {:window, window_id}
count = :ets.update_counter(table, key, {2, 1}, {key, 0})
if count == 1 do
prune_expired_windows(table, window_id)
end
if count <= max_events_per_window do
:ok
else
{:error, :relay_event_rate_limited}
end
rescue
ArgumentError -> :ok
end
defp prune_expired_windows(table, window_id) do
:ets.select_delete(table, [
{{{:window, :"$1"}, :_}, [{:<, :"$1", window_id}], [true]}
])
end
defp max_events_per_window do
case Application.get_env(:parrhesia, :limits, [])
|> Keyword.get(:relay_max_event_ingest_per_window) do

View File

@@ -8,6 +8,7 @@ defmodule Parrhesia.ApplicationTest do
assert is_pid(Process.whereis(Parrhesia.Web.EventIngestLimiter))
assert is_pid(Process.whereis(Parrhesia.Storage.Supervisor))
assert is_pid(Process.whereis(Parrhesia.Subscriptions.Supervisor))
assert is_pid(Process.whereis(Parrhesia.Fanout.Dispatcher))
assert is_pid(Process.whereis(Parrhesia.Auth.Supervisor))
assert is_pid(Process.whereis(Parrhesia.Sync.Supervisor))
assert is_pid(Process.whereis(Parrhesia.Policy.Supervisor))