From 73811c0772471efb9f5c3cd80c53ba77092c94b3 Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Fri, 13 Mar 2026 20:03:14 +0100 Subject: [PATCH] Implement per-connection subscription lifecycle and EOSE semantics --- PROGRESS.md | 2 +- lib/parrhesia/web/connection.ex | 107 ++++++++++++++++++++----- test/parrhesia/web/connection_test.exs | 48 ++++++++++- 3 files changed, 133 insertions(+), 24 deletions(-) diff --git a/PROGRESS.md b/PROGRESS.md index 7b21e44..5c42b4b 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -15,7 +15,7 @@ Implementation checklist for Parrhesia relay. - [x] Implement message decode/encode for `EVENT`, `REQ`, `CLOSE` - [x] Implement strict event validation (`id`, `sig`, shape, timestamps) - [x] Implement filter evaluation engine (AND/OR semantics) -- [ ] Implement subscription lifecycle + `EOSE` behavior +- [x] Implement subscription lifecycle + `EOSE` behavior - [x] Implement canonical `OK`, `NOTICE`, `CLOSED` responses + prefixes ## Phase 2 — storage boundary + postgres adapter diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index ad0f99a..bafaba9 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -8,16 +8,27 @@ defmodule Parrhesia.Web.Connection do alias Parrhesia.Protocol alias Parrhesia.Protocol.Filter - defstruct subscriptions: MapSet.new(), authenticated_pubkeys: MapSet.new() + @default_max_subscriptions_per_connection 32 + + defstruct subscriptions: %{}, + authenticated_pubkeys: MapSet.new(), + max_subscriptions_per_connection: @default_max_subscriptions_per_connection + + @type subscription :: %{ + filters: [map()], + eose_sent?: boolean() + } @type t :: %__MODULE__{ - subscriptions: MapSet.t(String.t()), - authenticated_pubkeys: MapSet.t(String.t()) + subscriptions: %{String.t() => subscription()}, + authenticated_pubkeys: MapSet.t(String.t()), + max_subscriptions_per_connection: pos_integer() } @impl true - def init(_opts) do - {:ok, %__MODULE__{}} + def init(opts) do + state = %__MODULE__{max_subscriptions_per_connection: max_subscriptions_per_connection(opts)} + {:ok, state} end @impl true @@ -38,19 +49,7 @@ defmodule Parrhesia.Web.Connection do {:push, {:text, response}, state} {:ok, {:req, subscription_id, filters}} -> - case Filter.validate_filters(filters) do - :ok -> - next_state = put_subscription(state, subscription_id) - response = Protocol.encode_relay({:eose, subscription_id}) - - {:push, {:text, response}, next_state} - - {:error, reason} -> - response = - Protocol.encode_relay({:closed, subscription_id, Filter.error_message(reason)}) - - {:push, {:text, response}, state} - end + handle_req(state, subscription_id, filters) {:ok, {:close, subscription_id}} -> next_state = drop_subscription(state, subscription_id) @@ -79,13 +78,79 @@ defmodule Parrhesia.Web.Connection do {:ok, state} end - defp put_subscription(%__MODULE__{} = state, subscription_id) do - subscriptions = MapSet.put(state.subscriptions, subscription_id) + defp handle_req(%__MODULE__{} = state, subscription_id, filters) do + with :ok <- Filter.validate_filters(filters), + {:ok, next_state} <- upsert_subscription(state, subscription_id, filters) do + response = Protocol.encode_relay({:eose, subscription_id}) + {:push, {:text, response}, next_state} + else + {:error, :subscription_limit_reached} -> + response = + Protocol.encode_relay({ + :closed, + subscription_id, + "rate-limited: maximum subscriptions per connection exceeded" + }) + + {:push, {:text, response}, state} + + {:error, reason} -> + response = Protocol.encode_relay({:closed, subscription_id, Filter.error_message(reason)}) + {:push, {:text, response}, state} + end + end + + defp upsert_subscription(%__MODULE__{} = state, subscription_id, filters) do + subscription = %{filters: filters, eose_sent?: true} + + cond do + Map.has_key?(state.subscriptions, subscription_id) -> + {:ok, put_subscription(state, subscription_id, subscription)} + + map_size(state.subscriptions) < state.max_subscriptions_per_connection -> + {:ok, put_subscription(state, subscription_id, subscription)} + + true -> + {:error, :subscription_limit_reached} + end + end + + defp put_subscription(%__MODULE__{} = state, subscription_id, subscription) do + subscriptions = Map.put(state.subscriptions, subscription_id, subscription) %__MODULE__{state | subscriptions: subscriptions} end defp drop_subscription(%__MODULE__{} = state, subscription_id) do - subscriptions = MapSet.delete(state.subscriptions, subscription_id) + subscriptions = Map.delete(state.subscriptions, subscription_id) %__MODULE__{state | subscriptions: subscriptions} end + + defp max_subscriptions_per_connection(opts) when is_list(opts) do + opts + |> Keyword.get(:max_subscriptions_per_connection) + |> normalize_max_subscriptions_per_connection() + end + + defp max_subscriptions_per_connection(opts) when is_map(opts) do + opts + |> Map.get(:max_subscriptions_per_connection) + |> normalize_max_subscriptions_per_connection() + end + + defp max_subscriptions_per_connection(_opts), do: configured_max_subscriptions_per_connection() + + defp normalize_max_subscriptions_per_connection(value) when is_integer(value) and value > 0, + do: value + + defp normalize_max_subscriptions_per_connection(_value), + do: configured_max_subscriptions_per_connection() + + defp configured_max_subscriptions_per_connection do + :parrhesia + |> Application.get_env(:limits, []) + |> Keyword.get( + :max_subscriptions_per_connection, + @default_max_subscriptions_per_connection + ) + end end diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs index 1781eb0..bb991ca 100644 --- a/test/parrhesia/web/connection_test.exs +++ b/test/parrhesia/web/connection_test.exs @@ -12,7 +12,30 @@ defmodule Parrhesia.Web.ConnectionTest do assert {:push, {:text, response}, next_state} = Connection.handle_in({req_payload, [opcode: :text]}, state) - assert MapSet.member?(next_state.subscriptions, "sub-123") + assert Map.has_key?(next_state.subscriptions, "sub-123") + assert next_state.subscriptions["sub-123"].filters == [%{"kinds" => [1]}] + assert next_state.subscriptions["sub-123"].eose_sent? + assert Jason.decode!(response) == ["EOSE", "sub-123"] + end + + test "REQ with same subscription id replaces existing subscription" do + {:ok, state} = Connection.init(%{}) + + first_req = Jason.encode!(["REQ", "sub-123", %{"kinds" => [1]}]) + second_req = Jason.encode!(["REQ", "sub-123", %{"kinds" => [2], "limit" => 5}]) + + assert {:push, _, subscribed_state} = + Connection.handle_in({first_req, [opcode: :text]}, state) + + assert {:push, {:text, response}, replaced_state} = + Connection.handle_in({second_req, [opcode: :text]}, subscribed_state) + + assert map_size(replaced_state.subscriptions) == 1 + + assert replaced_state.subscriptions["sub-123"].filters == [ + %{"kinds" => [2], "limit" => 5} + ] + assert Jason.decode!(response) == ["EOSE", "sub-123"] end @@ -27,10 +50,31 @@ defmodule Parrhesia.Web.ConnectionTest do assert {:push, {:text, response}, next_state} = Connection.handle_in({close_payload, [opcode: :text]}, subscribed_state) - refute MapSet.member?(next_state.subscriptions, "sub-123") + refute Map.has_key?(next_state.subscriptions, "sub-123") assert Jason.decode!(response) == ["CLOSED", "sub-123", "error: subscription closed"] end + test "REQ above max subscriptions returns CLOSED and keeps existing subscriptions" do + {:ok, state} = Connection.init(max_subscriptions_per_connection: 1) + + req_one = Jason.encode!(["REQ", "sub-1", %{"kinds" => [1]}]) + req_two = Jason.encode!(["REQ", "sub-2", %{"kinds" => [1]}]) + + assert {:push, _, first_state} = Connection.handle_in({req_one, [opcode: :text]}, state) + + assert {:push, {:text, response}, second_state} = + Connection.handle_in({req_two, [opcode: :text]}, first_state) + + assert map_size(second_state.subscriptions) == 1 + assert Map.has_key?(second_state.subscriptions, "sub-1") + + assert Jason.decode!(response) == [ + "CLOSED", + "sub-2", + "rate-limited: maximum subscriptions per connection exceeded" + ] + end + test "invalid input returns NOTICE" do {:ok, state} = Connection.init(%{})