Implement per-connection subscription lifecycle and EOSE semantics
This commit is contained in:
@@ -15,7 +15,7 @@ Implementation checklist for Parrhesia relay.
|
||||
- [x] Implement message decode/encode for `EVENT`, `REQ`, `CLOSE`
|
||||
- [x] Implement strict event validation (`id`, `sig`, shape, timestamps)
|
||||
- [x] Implement filter evaluation engine (AND/OR semantics)
|
||||
- [ ] Implement subscription lifecycle + `EOSE` behavior
|
||||
- [x] Implement subscription lifecycle + `EOSE` behavior
|
||||
- [x] Implement canonical `OK`, `NOTICE`, `CLOSED` responses + prefixes
|
||||
|
||||
## Phase 2 — storage boundary + postgres adapter
|
||||
|
||||
@@ -8,16 +8,27 @@ defmodule Parrhesia.Web.Connection do
|
||||
alias Parrhesia.Protocol
|
||||
alias Parrhesia.Protocol.Filter
|
||||
|
||||
defstruct subscriptions: MapSet.new(), authenticated_pubkeys: MapSet.new()
|
||||
@default_max_subscriptions_per_connection 32
|
||||
|
||||
defstruct subscriptions: %{},
|
||||
authenticated_pubkeys: MapSet.new(),
|
||||
max_subscriptions_per_connection: @default_max_subscriptions_per_connection
|
||||
|
||||
@type subscription :: %{
|
||||
filters: [map()],
|
||||
eose_sent?: boolean()
|
||||
}
|
||||
|
||||
@type t :: %__MODULE__{
|
||||
subscriptions: MapSet.t(String.t()),
|
||||
authenticated_pubkeys: MapSet.t(String.t())
|
||||
subscriptions: %{String.t() => subscription()},
|
||||
authenticated_pubkeys: MapSet.t(String.t()),
|
||||
max_subscriptions_per_connection: pos_integer()
|
||||
}
|
||||
|
||||
@impl true
|
||||
def init(_opts) do
|
||||
{:ok, %__MODULE__{}}
|
||||
def init(opts) do
|
||||
state = %__MODULE__{max_subscriptions_per_connection: max_subscriptions_per_connection(opts)}
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
@@ -38,19 +49,7 @@ defmodule Parrhesia.Web.Connection do
|
||||
{:push, {:text, response}, state}
|
||||
|
||||
{:ok, {:req, subscription_id, filters}} ->
|
||||
case Filter.validate_filters(filters) do
|
||||
:ok ->
|
||||
next_state = put_subscription(state, subscription_id)
|
||||
response = Protocol.encode_relay({:eose, subscription_id})
|
||||
|
||||
{:push, {:text, response}, next_state}
|
||||
|
||||
{:error, reason} ->
|
||||
response =
|
||||
Protocol.encode_relay({:closed, subscription_id, Filter.error_message(reason)})
|
||||
|
||||
{:push, {:text, response}, state}
|
||||
end
|
||||
handle_req(state, subscription_id, filters)
|
||||
|
||||
{:ok, {:close, subscription_id}} ->
|
||||
next_state = drop_subscription(state, subscription_id)
|
||||
@@ -79,13 +78,79 @@ defmodule Parrhesia.Web.Connection do
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
defp put_subscription(%__MODULE__{} = state, subscription_id) do
|
||||
subscriptions = MapSet.put(state.subscriptions, subscription_id)
|
||||
defp handle_req(%__MODULE__{} = state, subscription_id, filters) do
|
||||
with :ok <- Filter.validate_filters(filters),
|
||||
{:ok, next_state} <- upsert_subscription(state, subscription_id, filters) do
|
||||
response = Protocol.encode_relay({:eose, subscription_id})
|
||||
{:push, {:text, response}, next_state}
|
||||
else
|
||||
{:error, :subscription_limit_reached} ->
|
||||
response =
|
||||
Protocol.encode_relay({
|
||||
:closed,
|
||||
subscription_id,
|
||||
"rate-limited: maximum subscriptions per connection exceeded"
|
||||
})
|
||||
|
||||
{:push, {:text, response}, state}
|
||||
|
||||
{:error, reason} ->
|
||||
response = Protocol.encode_relay({:closed, subscription_id, Filter.error_message(reason)})
|
||||
{:push, {:text, response}, state}
|
||||
end
|
||||
end
|
||||
|
||||
defp upsert_subscription(%__MODULE__{} = state, subscription_id, filters) do
|
||||
subscription = %{filters: filters, eose_sent?: true}
|
||||
|
||||
cond do
|
||||
Map.has_key?(state.subscriptions, subscription_id) ->
|
||||
{:ok, put_subscription(state, subscription_id, subscription)}
|
||||
|
||||
map_size(state.subscriptions) < state.max_subscriptions_per_connection ->
|
||||
{:ok, put_subscription(state, subscription_id, subscription)}
|
||||
|
||||
true ->
|
||||
{:error, :subscription_limit_reached}
|
||||
end
|
||||
end
|
||||
|
||||
defp put_subscription(%__MODULE__{} = state, subscription_id, subscription) do
|
||||
subscriptions = Map.put(state.subscriptions, subscription_id, subscription)
|
||||
%__MODULE__{state | subscriptions: subscriptions}
|
||||
end
|
||||
|
||||
defp drop_subscription(%__MODULE__{} = state, subscription_id) do
|
||||
subscriptions = MapSet.delete(state.subscriptions, subscription_id)
|
||||
subscriptions = Map.delete(state.subscriptions, subscription_id)
|
||||
%__MODULE__{state | subscriptions: subscriptions}
|
||||
end
|
||||
|
||||
defp max_subscriptions_per_connection(opts) when is_list(opts) do
|
||||
opts
|
||||
|> Keyword.get(:max_subscriptions_per_connection)
|
||||
|> normalize_max_subscriptions_per_connection()
|
||||
end
|
||||
|
||||
defp max_subscriptions_per_connection(opts) when is_map(opts) do
|
||||
opts
|
||||
|> Map.get(:max_subscriptions_per_connection)
|
||||
|> normalize_max_subscriptions_per_connection()
|
||||
end
|
||||
|
||||
defp max_subscriptions_per_connection(_opts), do: configured_max_subscriptions_per_connection()
|
||||
|
||||
defp normalize_max_subscriptions_per_connection(value) when is_integer(value) and value > 0,
|
||||
do: value
|
||||
|
||||
defp normalize_max_subscriptions_per_connection(_value),
|
||||
do: configured_max_subscriptions_per_connection()
|
||||
|
||||
defp configured_max_subscriptions_per_connection do
|
||||
:parrhesia
|
||||
|> Application.get_env(:limits, [])
|
||||
|> Keyword.get(
|
||||
:max_subscriptions_per_connection,
|
||||
@default_max_subscriptions_per_connection
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -12,7 +12,30 @@ defmodule Parrhesia.Web.ConnectionTest do
|
||||
assert {:push, {:text, response}, next_state} =
|
||||
Connection.handle_in({req_payload, [opcode: :text]}, state)
|
||||
|
||||
assert MapSet.member?(next_state.subscriptions, "sub-123")
|
||||
assert Map.has_key?(next_state.subscriptions, "sub-123")
|
||||
assert next_state.subscriptions["sub-123"].filters == [%{"kinds" => [1]}]
|
||||
assert next_state.subscriptions["sub-123"].eose_sent?
|
||||
assert Jason.decode!(response) == ["EOSE", "sub-123"]
|
||||
end
|
||||
|
||||
test "REQ with same subscription id replaces existing subscription" do
|
||||
{:ok, state} = Connection.init(%{})
|
||||
|
||||
first_req = Jason.encode!(["REQ", "sub-123", %{"kinds" => [1]}])
|
||||
second_req = Jason.encode!(["REQ", "sub-123", %{"kinds" => [2], "limit" => 5}])
|
||||
|
||||
assert {:push, _, subscribed_state} =
|
||||
Connection.handle_in({first_req, [opcode: :text]}, state)
|
||||
|
||||
assert {:push, {:text, response}, replaced_state} =
|
||||
Connection.handle_in({second_req, [opcode: :text]}, subscribed_state)
|
||||
|
||||
assert map_size(replaced_state.subscriptions) == 1
|
||||
|
||||
assert replaced_state.subscriptions["sub-123"].filters == [
|
||||
%{"kinds" => [2], "limit" => 5}
|
||||
]
|
||||
|
||||
assert Jason.decode!(response) == ["EOSE", "sub-123"]
|
||||
end
|
||||
|
||||
@@ -27,10 +50,31 @@ defmodule Parrhesia.Web.ConnectionTest do
|
||||
assert {:push, {:text, response}, next_state} =
|
||||
Connection.handle_in({close_payload, [opcode: :text]}, subscribed_state)
|
||||
|
||||
refute MapSet.member?(next_state.subscriptions, "sub-123")
|
||||
refute Map.has_key?(next_state.subscriptions, "sub-123")
|
||||
assert Jason.decode!(response) == ["CLOSED", "sub-123", "error: subscription closed"]
|
||||
end
|
||||
|
||||
test "REQ above max subscriptions returns CLOSED and keeps existing subscriptions" do
|
||||
{:ok, state} = Connection.init(max_subscriptions_per_connection: 1)
|
||||
|
||||
req_one = Jason.encode!(["REQ", "sub-1", %{"kinds" => [1]}])
|
||||
req_two = Jason.encode!(["REQ", "sub-2", %{"kinds" => [1]}])
|
||||
|
||||
assert {:push, _, first_state} = Connection.handle_in({req_one, [opcode: :text]}, state)
|
||||
|
||||
assert {:push, {:text, response}, second_state} =
|
||||
Connection.handle_in({req_two, [opcode: :text]}, first_state)
|
||||
|
||||
assert map_size(second_state.subscriptions) == 1
|
||||
assert Map.has_key?(second_state.subscriptions, "sub-1")
|
||||
|
||||
assert Jason.decode!(response) == [
|
||||
"CLOSED",
|
||||
"sub-2",
|
||||
"rate-limited: maximum subscriptions per connection exceeded"
|
||||
]
|
||||
end
|
||||
|
||||
test "invalid input returns NOTICE" do
|
||||
{:ok, state} = Connection.init(%{})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user