From 3b66ed19966534bd533baed839d0e896e8846736 Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Fri, 13 Mar 2026 20:58:28 +0100 Subject: [PATCH] Add bounded outbound queue backpressure to connections --- PROGRESS.md | 2 +- config/config.exs | 5 +- lib/parrhesia/web/connection.ex | 283 ++++++++++++++++++++++++- test/parrhesia/config_test.exs | 1 + test/parrhesia/web/connection_test.exs | 101 +++++++++ 5 files changed, 386 insertions(+), 6 deletions(-) diff --git a/PROGRESS.md b/PROGRESS.md index ac6a5bc..bc0364e 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -30,7 +30,7 @@ Implementation checklist for Parrhesia relay. - [x] Build ETS-backed subscription index - [x] Implement candidate narrowing by kind/author/tag -- [ ] Add bounded outbound queues/backpressure per connection +- [x] Add bounded outbound queues/backpressure per connection - [ ] Add telemetry for ingest/query/fanout latency + queue depth ## Phase 4 — relay metadata and auth diff --git a/config/config.exs b/config/config.exs index c9d0891..a0cb173 100644 --- a/config/config.exs +++ b/config/config.exs @@ -6,7 +6,10 @@ config :parrhesia, max_event_bytes: 262_144, max_filters_per_req: 16, max_subscriptions_per_connection: 32, - max_event_future_skew_seconds: 900 + max_event_future_skew_seconds: 900, + max_outbound_queue: 256, + outbound_drain_batch_size: 64, + outbound_overflow_strategy: :close ], policies: [ auth_required_for_writes: false, diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index ccf26d0..8117199 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -10,11 +10,23 @@ defmodule Parrhesia.Web.Connection do alias Parrhesia.Subscriptions.Index @default_max_subscriptions_per_connection 32 + @default_max_outbound_queue 256 + @default_outbound_drain_batch_size 64 + @default_outbound_overflow_strategy :close + @drain_outbound_queue :drain_outbound_queue defstruct subscriptions: %{}, authenticated_pubkeys: MapSet.new(), max_subscriptions_per_connection: @default_max_subscriptions_per_connection, - subscription_index: Index + subscription_index: Index, + outbound_queue: :queue.new(), + outbound_queue_size: 0, + max_outbound_queue: @default_max_outbound_queue, + outbound_overflow_strategy: @default_outbound_overflow_strategy, + outbound_drain_batch_size: @default_outbound_drain_batch_size, + drain_scheduled?: false + + @type overflow_strategy :: :close | :drop_oldest | :drop_newest @type subscription :: %{ filters: [map()], @@ -25,14 +37,23 @@ defmodule Parrhesia.Web.Connection do subscriptions: %{String.t() => subscription()}, authenticated_pubkeys: MapSet.t(String.t()), max_subscriptions_per_connection: pos_integer(), - subscription_index: GenServer.server() | nil + subscription_index: GenServer.server() | nil, + outbound_queue: :queue.queue({String.t(), map()}), + outbound_queue_size: non_neg_integer(), + max_outbound_queue: pos_integer(), + outbound_overflow_strategy: overflow_strategy(), + outbound_drain_batch_size: pos_integer(), + drain_scheduled?: boolean() } @impl true def init(opts) do state = %__MODULE__{ max_subscriptions_per_connection: max_subscriptions_per_connection(opts), - subscription_index: subscription_index(opts) + subscription_index: subscription_index(opts), + max_outbound_queue: max_outbound_queue(opts), + outbound_overflow_strategy: outbound_overflow_strategy(opts), + outbound_drain_batch_size: outbound_drain_batch_size(opts) } {:ok, state} @@ -59,7 +80,11 @@ defmodule Parrhesia.Web.Connection do handle_req(state, subscription_id, filters) {:ok, {:close, subscription_id}} -> - next_state = drop_subscription(state, subscription_id) + next_state = + state + |> drop_subscription(subscription_id) + |> drop_queued_subscription_events(subscription_id) + :ok = maybe_remove_index_subscription(next_state, subscription_id) response = @@ -82,6 +107,26 @@ defmodule Parrhesia.Web.Connection do end @impl true + def handle_info({:fanout_event, subscription_id, event}, %__MODULE__{} = state) + when is_binary(subscription_id) and is_map(event) do + handle_fanout_events(state, [{subscription_id, event}]) + end + + def handle_info({:fanout_events, fanout_events}, %__MODULE__{} = state) + when is_list(fanout_events) do + handle_fanout_events(state, fanout_events) + end + + def handle_info(@drain_outbound_queue, %__MODULE__{} = state) do + {frames, next_state} = drain_outbound_frames(state) + + if frames == [] do + {:ok, next_state} + else + {:push, frames, next_state} + end + end + def handle_info(_message, %__MODULE__{} = state) do {:ok, state} end @@ -116,6 +161,140 @@ defmodule Parrhesia.Web.Connection do end end + defp handle_fanout_events(%__MODULE__{} = state, fanout_events) do + case enqueue_fanout_events(state, fanout_events) do + {:ok, next_state} -> + {:ok, maybe_schedule_drain(next_state)} + + {:close, next_state} -> + close_with_outbound_overflow(next_state) + end + end + + defp close_with_outbound_overflow(state) do + message = "rate-limited: outbound queue overflow" + notice = Protocol.encode_relay({:notice, message}) + + {:stop, :normal, {1008, message}, [{:text, notice}], state} + end + + defp enqueue_fanout_events(state, fanout_events) do + Enum.reduce_while(fanout_events, {:ok, state}, fn + {subscription_id, event}, {:ok, acc} when is_binary(subscription_id) and is_map(event) -> + case maybe_enqueue_fanout_event(acc, subscription_id, event) do + {:ok, next_acc} -> {:cont, {:ok, next_acc}} + {:close, next_acc} -> {:halt, {:close, next_acc}} + end + + _invalid_event, {:ok, acc} -> + {:cont, {:ok, acc}} + end) + end + + defp maybe_enqueue_fanout_event(state, subscription_id, event) do + if subscription_matches?(state, subscription_id, event) do + enqueue_outbound(state, {subscription_id, event}) + else + {:ok, state} + end + end + + defp subscription_matches?(%__MODULE__{} = state, subscription_id, event) do + case Map.get(state.subscriptions, subscription_id) do + nil -> false + %{filters: filters} -> Filter.matches_any?(event, filters) + end + end + + defp enqueue_outbound( + %__MODULE__{outbound_queue_size: queue_size, max_outbound_queue: max_outbound_queue} = + state, + queue_entry + ) + when queue_size < max_outbound_queue do + {:ok, + %__MODULE__{ + state + | outbound_queue: :queue.in(queue_entry, state.outbound_queue), + outbound_queue_size: queue_size + 1 + }} + end + + defp enqueue_outbound( + %__MODULE__{outbound_overflow_strategy: :drop_newest} = state, + _queue_entry + ), + do: {:ok, state} + + defp enqueue_outbound( + %__MODULE__{outbound_overflow_strategy: :drop_oldest} = state, + queue_entry + ) do + {next_queue, next_size} = + drop_oldest_and_enqueue(state.outbound_queue, state.outbound_queue_size, queue_entry) + + {:ok, %__MODULE__{state | outbound_queue: next_queue, outbound_queue_size: next_size}} + end + + defp enqueue_outbound(%__MODULE__{outbound_overflow_strategy: :close} = state, _queue_entry), + do: {:close, state} + + defp drop_oldest_and_enqueue(queue, queue_size, queue_entry) when queue_size > 0 do + {_dropped, truncated_queue} = :queue.out(queue) + {:queue.in(queue_entry, truncated_queue), queue_size} + end + + defp drop_oldest_and_enqueue(queue, queue_size, queue_entry) do + {:queue.in(queue_entry, queue), queue_size + 1} + end + + defp drain_outbound_frames(%__MODULE__{} = state) do + {frames, next_queue, remaining_size} = + pop_frames( + state.outbound_queue, + state.outbound_queue_size, + state.outbound_drain_batch_size, + [] + ) + + next_state = + %__MODULE__{ + state + | outbound_queue: next_queue, + outbound_queue_size: remaining_size, + drain_scheduled?: false + } + |> maybe_schedule_drain() + + {Enum.reverse(frames), next_state} + end + + defp pop_frames(queue, queue_size, _remaining_batch, acc) when queue_size == 0, + do: {acc, queue, queue_size} + + defp pop_frames(queue, queue_size, remaining_batch, acc) when remaining_batch <= 0, + do: {acc, queue, queue_size} + + defp pop_frames(queue, queue_size, remaining_batch, acc) do + case :queue.out(queue) do + {{:value, {subscription_id, event}}, next_queue} -> + frame = {:text, Protocol.encode_relay({:event, subscription_id, event})} + pop_frames(next_queue, queue_size - 1, remaining_batch - 1, [frame | acc]) + + {:empty, _same_queue} -> + {acc, :queue.new(), 0} + end + end + + defp maybe_schedule_drain(%__MODULE__{drain_scheduled?: true} = state), do: state + + defp maybe_schedule_drain(%__MODULE__{outbound_queue_size: 0} = state), do: state + + defp maybe_schedule_drain(%__MODULE__{} = state) do + send(self(), @drain_outbound_queue) + %__MODULE__{state | drain_scheduled?: true} + end + defp upsert_subscription(%__MODULE__{} = state, subscription_id, filters) do subscription = %{filters: filters, eose_sent?: true} @@ -141,6 +320,28 @@ defmodule Parrhesia.Web.Connection do %__MODULE__{state | subscriptions: subscriptions} end + defp drop_queued_subscription_events( + %__MODULE__{outbound_queue_size: 0} = state, + _subscription_id + ), + do: state + + defp drop_queued_subscription_events(%__MODULE__{} = state, subscription_id) do + filtered_entries = + state.outbound_queue + |> :queue.to_list() + |> Enum.reject(fn + {^subscription_id, _event} -> true + _queue_entry -> false + end) + + %__MODULE__{ + state + | outbound_queue: :queue.from_list(filtered_entries), + outbound_queue_size: length(filtered_entries) + } + end + defp maybe_upsert_index_subscription( %__MODULE__{subscription_index: nil}, _subscription_id, @@ -234,4 +435,78 @@ defmodule Parrhesia.Web.Connection do @default_max_subscriptions_per_connection ) end + + defp max_outbound_queue(opts) when is_list(opts) do + opts + |> Keyword.get(:max_outbound_queue) + |> normalize_max_outbound_queue() + end + + defp max_outbound_queue(opts) when is_map(opts) do + opts + |> Map.get(:max_outbound_queue) + |> normalize_max_outbound_queue() + end + + defp max_outbound_queue(_opts), do: configured_max_outbound_queue() + + defp normalize_max_outbound_queue(value) when is_integer(value) and value > 0, do: value + defp normalize_max_outbound_queue(_value), do: configured_max_outbound_queue() + + defp configured_max_outbound_queue do + :parrhesia + |> Application.get_env(:limits, []) + |> Keyword.get(:max_outbound_queue, @default_max_outbound_queue) + end + + defp outbound_drain_batch_size(opts) when is_list(opts) do + opts + |> Keyword.get(:outbound_drain_batch_size) + |> normalize_outbound_drain_batch_size() + end + + defp outbound_drain_batch_size(opts) when is_map(opts) do + opts + |> Map.get(:outbound_drain_batch_size) + |> normalize_outbound_drain_batch_size() + end + + defp outbound_drain_batch_size(_opts), do: configured_outbound_drain_batch_size() + + defp normalize_outbound_drain_batch_size(value) when is_integer(value) and value > 0, + do: value + + defp normalize_outbound_drain_batch_size(_value), do: configured_outbound_drain_batch_size() + + defp configured_outbound_drain_batch_size do + :parrhesia + |> Application.get_env(:limits, []) + |> Keyword.get(:outbound_drain_batch_size, @default_outbound_drain_batch_size) + end + + defp outbound_overflow_strategy(opts) when is_list(opts) do + opts + |> Keyword.get(:outbound_overflow_strategy) + |> normalize_outbound_overflow_strategy() + end + + defp outbound_overflow_strategy(opts) when is_map(opts) do + opts + |> Map.get(:outbound_overflow_strategy) + |> normalize_outbound_overflow_strategy() + end + + defp outbound_overflow_strategy(_opts), do: configured_outbound_overflow_strategy() + + defp normalize_outbound_overflow_strategy(:close), do: :close + defp normalize_outbound_overflow_strategy(:drop_oldest), do: :drop_oldest + defp normalize_outbound_overflow_strategy(:drop_newest), do: :drop_newest + + defp normalize_outbound_overflow_strategy(_value), do: configured_outbound_overflow_strategy() + + defp configured_outbound_overflow_strategy do + :parrhesia + |> Application.get_env(:limits, []) + |> Keyword.get(:outbound_overflow_strategy, @default_outbound_overflow_strategy) + end end diff --git a/test/parrhesia/config_test.exs b/test/parrhesia/config_test.exs index 955ad1f..aff5b32 100644 --- a/test/parrhesia/config_test.exs +++ b/test/parrhesia/config_test.exs @@ -5,6 +5,7 @@ defmodule Parrhesia.ConfigTest do assert Parrhesia.Config.get([:limits, :max_frame_bytes]) == 1_048_576 assert Parrhesia.Config.get([:limits, :max_event_bytes]) == 262_144 assert Parrhesia.Config.get([:limits, :max_event_future_skew_seconds]) == 900 + assert Parrhesia.Config.get([:limits, :max_outbound_queue]) == 256 assert Parrhesia.Config.get([:policies, :auth_required_for_writes]) == false assert Parrhesia.Config.get([:features, :nip_ee_mls]) == false end diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs index bb991ca..a6a8457 100644 --- a/test/parrhesia/web/connection_test.exs +++ b/test/parrhesia/web/connection_test.exs @@ -133,6 +133,107 @@ defmodule Parrhesia.Web.ConnectionTest do ] end + test "fanout_event enqueues and drains matching events" do + state = subscribed_connection_state([]) + event = live_event("event-1", 1) + + assert {:ok, queued_state} = Connection.handle_info({:fanout_event, "sub-1", event}, state) + assert queued_state.outbound_queue_size == 1 + + assert_receive :drain_outbound_queue + + assert {:push, [{:text, payload}], drained_state} = + Connection.handle_info(:drain_outbound_queue, queued_state) + + assert drained_state.outbound_queue_size == 0 + assert Jason.decode!(payload) == ["EVENT", "sub-1", event] + end + + test "fanout_event ignores non-matching subscription filters" do + state = subscribed_connection_state([]) + + assert {:ok, next_state} = + Connection.handle_info({:fanout_event, "sub-1", live_event("event-2", 2)}, state) + + assert next_state.outbound_queue_size == 0 + refute_received :drain_outbound_queue + end + + test "outbound queue overflow closes connection when strategy is close" do + state = + subscribed_connection_state( + max_outbound_queue: 1, + outbound_overflow_strategy: :close, + outbound_drain_batch_size: 1 + ) + + event_one = live_event("event-1", 1) + event_two = live_event("event-2", 1) + + assert {:ok, queued_state} = + Connection.handle_info({:fanout_event, "sub-1", event_one}, state) + + assert queued_state.outbound_queue_size == 1 + assert_receive :drain_outbound_queue + + assert {:stop, :normal, {1008, message}, [{:text, notice_payload}], _overflow_state} = + Connection.handle_info({:fanout_event, "sub-1", event_two}, queued_state) + + assert message == "rate-limited: outbound queue overflow" + assert Jason.decode!(notice_payload) == ["NOTICE", message] + end + + test "outbound queue overflow drops oldest event when strategy is drop_oldest" do + state = + subscribed_connection_state( + max_outbound_queue: 1, + outbound_overflow_strategy: :drop_oldest, + outbound_drain_batch_size: 1 + ) + + event_one = live_event("event-1", 1) + event_two = live_event("event-2", 1) + + assert {:ok, queued_state} = + Connection.handle_info({:fanout_event, "sub-1", event_one}, state) + + assert queued_state.outbound_queue_size == 1 + assert_receive :drain_outbound_queue + + assert {:ok, replaced_state} = + Connection.handle_info({:fanout_event, "sub-1", event_two}, queued_state) + + assert replaced_state.outbound_queue_size == 1 + + assert {:push, [{:text, payload}], drained_state} = + Connection.handle_info(:drain_outbound_queue, replaced_state) + + assert drained_state.outbound_queue_size == 0 + assert Jason.decode!(payload) == ["EVENT", "sub-1", event_two] + end + + defp subscribed_connection_state(opts) do + {:ok, initial_state} = Connection.init(Keyword.put_new(opts, :subscription_index, nil)) + req_payload = Jason.encode!(["REQ", "sub-1", %{"kinds" => [1]}]) + + assert {:push, _, subscribed_state} = + Connection.handle_in({req_payload, [opcode: :text]}, initial_state) + + subscribed_state + end + + defp live_event(id, kind) do + %{ + "id" => id, + "pubkey" => String.duplicate("a", 64), + "created_at" => System.system_time(:second), + "kind" => kind, + "tags" => [], + "content" => "live", + "sig" => String.duplicate("b", 128) + } + end + defp valid_event do base_event = %{ "pubkey" => String.duplicate("1", 64),