Add bounded outbound queue backpressure to connections

This commit is contained in:
2026-03-13 20:58:28 +01:00
parent df3f2dae8d
commit 3b66ed1996
5 changed files with 386 additions and 6 deletions

View File

@@ -10,11 +10,23 @@ defmodule Parrhesia.Web.Connection do
alias Parrhesia.Subscriptions.Index
@default_max_subscriptions_per_connection 32
@default_max_outbound_queue 256
@default_outbound_drain_batch_size 64
@default_outbound_overflow_strategy :close
@drain_outbound_queue :drain_outbound_queue
defstruct subscriptions: %{},
authenticated_pubkeys: MapSet.new(),
max_subscriptions_per_connection: @default_max_subscriptions_per_connection,
subscription_index: Index
subscription_index: Index,
outbound_queue: :queue.new(),
outbound_queue_size: 0,
max_outbound_queue: @default_max_outbound_queue,
outbound_overflow_strategy: @default_outbound_overflow_strategy,
outbound_drain_batch_size: @default_outbound_drain_batch_size,
drain_scheduled?: false
@type overflow_strategy :: :close | :drop_oldest | :drop_newest
@type subscription :: %{
filters: [map()],
@@ -25,14 +37,23 @@ defmodule Parrhesia.Web.Connection do
subscriptions: %{String.t() => subscription()},
authenticated_pubkeys: MapSet.t(String.t()),
max_subscriptions_per_connection: pos_integer(),
subscription_index: GenServer.server() | nil
subscription_index: GenServer.server() | nil,
outbound_queue: :queue.queue({String.t(), map()}),
outbound_queue_size: non_neg_integer(),
max_outbound_queue: pos_integer(),
outbound_overflow_strategy: overflow_strategy(),
outbound_drain_batch_size: pos_integer(),
drain_scheduled?: boolean()
}
@impl true
def init(opts) do
state = %__MODULE__{
max_subscriptions_per_connection: max_subscriptions_per_connection(opts),
subscription_index: subscription_index(opts)
subscription_index: subscription_index(opts),
max_outbound_queue: max_outbound_queue(opts),
outbound_overflow_strategy: outbound_overflow_strategy(opts),
outbound_drain_batch_size: outbound_drain_batch_size(opts)
}
{:ok, state}
@@ -59,7 +80,11 @@ defmodule Parrhesia.Web.Connection do
handle_req(state, subscription_id, filters)
{:ok, {:close, subscription_id}} ->
next_state = drop_subscription(state, subscription_id)
next_state =
state
|> drop_subscription(subscription_id)
|> drop_queued_subscription_events(subscription_id)
:ok = maybe_remove_index_subscription(next_state, subscription_id)
response =
@@ -82,6 +107,26 @@ defmodule Parrhesia.Web.Connection do
end
@impl true
def handle_info({:fanout_event, subscription_id, event}, %__MODULE__{} = state)
when is_binary(subscription_id) and is_map(event) do
handle_fanout_events(state, [{subscription_id, event}])
end
def handle_info({:fanout_events, fanout_events}, %__MODULE__{} = state)
when is_list(fanout_events) do
handle_fanout_events(state, fanout_events)
end
def handle_info(@drain_outbound_queue, %__MODULE__{} = state) do
{frames, next_state} = drain_outbound_frames(state)
if frames == [] do
{:ok, next_state}
else
{:push, frames, next_state}
end
end
def handle_info(_message, %__MODULE__{} = state) do
{:ok, state}
end
@@ -116,6 +161,140 @@ defmodule Parrhesia.Web.Connection do
end
end
defp handle_fanout_events(%__MODULE__{} = state, fanout_events) do
case enqueue_fanout_events(state, fanout_events) do
{:ok, next_state} ->
{:ok, maybe_schedule_drain(next_state)}
{:close, next_state} ->
close_with_outbound_overflow(next_state)
end
end
defp close_with_outbound_overflow(state) do
message = "rate-limited: outbound queue overflow"
notice = Protocol.encode_relay({:notice, message})
{:stop, :normal, {1008, message}, [{:text, notice}], state}
end
defp enqueue_fanout_events(state, fanout_events) do
Enum.reduce_while(fanout_events, {:ok, state}, fn
{subscription_id, event}, {:ok, acc} when is_binary(subscription_id) and is_map(event) ->
case maybe_enqueue_fanout_event(acc, subscription_id, event) do
{:ok, next_acc} -> {:cont, {:ok, next_acc}}
{:close, next_acc} -> {:halt, {:close, next_acc}}
end
_invalid_event, {:ok, acc} ->
{:cont, {:ok, acc}}
end)
end
defp maybe_enqueue_fanout_event(state, subscription_id, event) do
if subscription_matches?(state, subscription_id, event) do
enqueue_outbound(state, {subscription_id, event})
else
{:ok, state}
end
end
defp subscription_matches?(%__MODULE__{} = state, subscription_id, event) do
case Map.get(state.subscriptions, subscription_id) do
nil -> false
%{filters: filters} -> Filter.matches_any?(event, filters)
end
end
defp enqueue_outbound(
%__MODULE__{outbound_queue_size: queue_size, max_outbound_queue: max_outbound_queue} =
state,
queue_entry
)
when queue_size < max_outbound_queue do
{:ok,
%__MODULE__{
state
| outbound_queue: :queue.in(queue_entry, state.outbound_queue),
outbound_queue_size: queue_size + 1
}}
end
defp enqueue_outbound(
%__MODULE__{outbound_overflow_strategy: :drop_newest} = state,
_queue_entry
),
do: {:ok, state}
defp enqueue_outbound(
%__MODULE__{outbound_overflow_strategy: :drop_oldest} = state,
queue_entry
) do
{next_queue, next_size} =
drop_oldest_and_enqueue(state.outbound_queue, state.outbound_queue_size, queue_entry)
{:ok, %__MODULE__{state | outbound_queue: next_queue, outbound_queue_size: next_size}}
end
defp enqueue_outbound(%__MODULE__{outbound_overflow_strategy: :close} = state, _queue_entry),
do: {:close, state}
defp drop_oldest_and_enqueue(queue, queue_size, queue_entry) when queue_size > 0 do
{_dropped, truncated_queue} = :queue.out(queue)
{:queue.in(queue_entry, truncated_queue), queue_size}
end
defp drop_oldest_and_enqueue(queue, queue_size, queue_entry) do
{:queue.in(queue_entry, queue), queue_size + 1}
end
defp drain_outbound_frames(%__MODULE__{} = state) do
{frames, next_queue, remaining_size} =
pop_frames(
state.outbound_queue,
state.outbound_queue_size,
state.outbound_drain_batch_size,
[]
)
next_state =
%__MODULE__{
state
| outbound_queue: next_queue,
outbound_queue_size: remaining_size,
drain_scheduled?: false
}
|> maybe_schedule_drain()
{Enum.reverse(frames), next_state}
end
defp pop_frames(queue, queue_size, _remaining_batch, acc) when queue_size == 0,
do: {acc, queue, queue_size}
defp pop_frames(queue, queue_size, remaining_batch, acc) when remaining_batch <= 0,
do: {acc, queue, queue_size}
defp pop_frames(queue, queue_size, remaining_batch, acc) do
case :queue.out(queue) do
{{:value, {subscription_id, event}}, next_queue} ->
frame = {:text, Protocol.encode_relay({:event, subscription_id, event})}
pop_frames(next_queue, queue_size - 1, remaining_batch - 1, [frame | acc])
{:empty, _same_queue} ->
{acc, :queue.new(), 0}
end
end
defp maybe_schedule_drain(%__MODULE__{drain_scheduled?: true} = state), do: state
defp maybe_schedule_drain(%__MODULE__{outbound_queue_size: 0} = state), do: state
defp maybe_schedule_drain(%__MODULE__{} = state) do
send(self(), @drain_outbound_queue)
%__MODULE__{state | drain_scheduled?: true}
end
defp upsert_subscription(%__MODULE__{} = state, subscription_id, filters) do
subscription = %{filters: filters, eose_sent?: true}
@@ -141,6 +320,28 @@ defmodule Parrhesia.Web.Connection do
%__MODULE__{state | subscriptions: subscriptions}
end
defp drop_queued_subscription_events(
%__MODULE__{outbound_queue_size: 0} = state,
_subscription_id
),
do: state
defp drop_queued_subscription_events(%__MODULE__{} = state, subscription_id) do
filtered_entries =
state.outbound_queue
|> :queue.to_list()
|> Enum.reject(fn
{^subscription_id, _event} -> true
_queue_entry -> false
end)
%__MODULE__{
state
| outbound_queue: :queue.from_list(filtered_entries),
outbound_queue_size: length(filtered_entries)
}
end
defp maybe_upsert_index_subscription(
%__MODULE__{subscription_index: nil},
_subscription_id,
@@ -234,4 +435,78 @@ defmodule Parrhesia.Web.Connection do
@default_max_subscriptions_per_connection
)
end
defp max_outbound_queue(opts) when is_list(opts) do
opts
|> Keyword.get(:max_outbound_queue)
|> normalize_max_outbound_queue()
end
defp max_outbound_queue(opts) when is_map(opts) do
opts
|> Map.get(:max_outbound_queue)
|> normalize_max_outbound_queue()
end
defp max_outbound_queue(_opts), do: configured_max_outbound_queue()
defp normalize_max_outbound_queue(value) when is_integer(value) and value > 0, do: value
defp normalize_max_outbound_queue(_value), do: configured_max_outbound_queue()
defp configured_max_outbound_queue do
:parrhesia
|> Application.get_env(:limits, [])
|> Keyword.get(:max_outbound_queue, @default_max_outbound_queue)
end
defp outbound_drain_batch_size(opts) when is_list(opts) do
opts
|> Keyword.get(:outbound_drain_batch_size)
|> normalize_outbound_drain_batch_size()
end
defp outbound_drain_batch_size(opts) when is_map(opts) do
opts
|> Map.get(:outbound_drain_batch_size)
|> normalize_outbound_drain_batch_size()
end
defp outbound_drain_batch_size(_opts), do: configured_outbound_drain_batch_size()
defp normalize_outbound_drain_batch_size(value) when is_integer(value) and value > 0,
do: value
defp normalize_outbound_drain_batch_size(_value), do: configured_outbound_drain_batch_size()
defp configured_outbound_drain_batch_size do
:parrhesia
|> Application.get_env(:limits, [])
|> Keyword.get(:outbound_drain_batch_size, @default_outbound_drain_batch_size)
end
defp outbound_overflow_strategy(opts) when is_list(opts) do
opts
|> Keyword.get(:outbound_overflow_strategy)
|> normalize_outbound_overflow_strategy()
end
defp outbound_overflow_strategy(opts) when is_map(opts) do
opts
|> Map.get(:outbound_overflow_strategy)
|> normalize_outbound_overflow_strategy()
end
defp outbound_overflow_strategy(_opts), do: configured_outbound_overflow_strategy()
defp normalize_outbound_overflow_strategy(:close), do: :close
defp normalize_outbound_overflow_strategy(:drop_oldest), do: :drop_oldest
defp normalize_outbound_overflow_strategy(:drop_newest), do: :drop_newest
defp normalize_outbound_overflow_strategy(_value), do: configured_outbound_overflow_strategy()
defp configured_outbound_overflow_strategy do
:parrhesia
|> Application.get_env(:limits, [])
|> Keyword.get(:outbound_overflow_strategy, @default_outbound_overflow_strategy)
end
end