From 36365710a8cee5957cc9f9b6274d772278f1e1a5 Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Sat, 14 Mar 2026 04:51:47 +0100 Subject: [PATCH] Harden NEG session handling and gate feature wiring --- config/config.exs | 7 +- lib/parrhesia/negentropy/sessions.ex | 217 ++++++++++++++++++-- lib/parrhesia/subscriptions/supervisor.ex | 24 ++- lib/parrhesia/web/connection.ex | 20 +- lib/parrhesia/web/readiness.ex | 16 +- lib/parrhesia/web/relay_info.ex | 36 ++-- test/parrhesia/application_test.exs | 11 +- test/parrhesia/negentropy/sessions_test.exs | 35 ++++ 8 files changed, 317 insertions(+), 49 deletions(-) diff --git a/config/config.exs b/config/config.exs index 9349c9b..8ecd920 100644 --- a/config/config.exs +++ b/config/config.exs @@ -17,7 +17,12 @@ config :parrhesia, auth_max_age_seconds: 600, max_outbound_queue: 256, outbound_drain_batch_size: 64, - outbound_overflow_strategy: :close + outbound_overflow_strategy: :close, + max_negentropy_payload_bytes: 4096, + max_negentropy_sessions_per_connection: 8, + max_negentropy_total_sessions: 10_000, + negentropy_session_idle_timeout_seconds: 60, + negentropy_session_sweep_interval_seconds: 10 ], policies: [ auth_required_for_writes: false, diff --git a/lib/parrhesia/negentropy/sessions.ex b/lib/parrhesia/negentropy/sessions.ex index aae0778..080f5b6 100644 --- a/lib/parrhesia/negentropy/sessions.ex +++ b/lib/parrhesia/negentropy/sessions.ex @@ -7,10 +7,17 @@ defmodule Parrhesia.Negentropy.Sessions do @type session_key :: {pid(), String.t()} + @default_max_payload_bytes 4096 + @default_max_sessions_per_owner 8 + @default_max_total_sessions 10_000 + @default_max_idle_seconds 60 + @default_sweep_interval_seconds 10 + @sweep_idle_sessions :sweep_idle_sessions + @spec start_link(keyword()) :: GenServer.on_start() def start_link(opts \\ []) do name = Keyword.get(opts, :name, __MODULE__) - GenServer.start_link(__MODULE__, :ok, name: name) + GenServer.start_link(__MODULE__, opts, name: name) end @spec open(GenServer.server(), pid(), String.t(), map()) :: {:ok, map()} | {:error, term()} @@ -32,26 +39,63 @@ defmodule Parrhesia.Negentropy.Sessions do end @impl true - def init(:ok) do - {:ok, %{sessions: %{}, monitors: %{}}} + def init(opts) do + max_idle_ms = + normalize_positive_integer(Keyword.get(opts, :max_idle_seconds), max_idle_seconds()) * 1000 + + sweep_interval_ms = + normalize_positive_integer( + Keyword.get(opts, :sweep_interval_seconds), + sweep_interval_seconds() + ) * + 1000 + + state = %{ + sessions: %{}, + monitors: %{}, + max_payload_bytes: + normalize_positive_integer(Keyword.get(opts, :max_payload_bytes), max_payload_bytes()), + max_sessions_per_owner: + normalize_positive_integer( + Keyword.get(opts, :max_sessions_per_owner), + max_sessions_per_owner() + ), + max_total_sessions: + normalize_positive_integer(Keyword.get(opts, :max_total_sessions), max_total_sessions()), + max_idle_ms: max_idle_ms, + sweep_interval_ms: sweep_interval_ms + } + + :ok = schedule_idle_sweep(sweep_interval_ms) + + {:ok, state} end @impl true def handle_call({:open, owner_pid, subscription_id, params}, _from, state) do key = {owner_pid, subscription_id} - session = %{ - cursor: 0, - params: params, - opened_at: System.system_time(:second) - } + with :ok <- validate_payload_size(params, state.max_payload_bytes), + :ok <- enforce_session_limits(state, owner_pid, key) do + now_ms = System.monotonic_time(:millisecond) - state = - state - |> ensure_monitor(owner_pid) - |> put_in([:sessions, key], session) + session = %{ + cursor: 0, + params: params, + opened_at: System.system_time(:second), + last_active_at_ms: now_ms + } - {:reply, {:ok, %{"status" => "open", "cursor" => 0}}, state} + state = + state + |> ensure_monitor(owner_pid) + |> put_in([:sessions, key], session) + + {:reply, {:ok, %{"status" => "open", "cursor" => 0}}, state} + else + {:error, reason} -> + {:reply, {:error, reason}, state} + end end def handle_call({:message, owner_pid, subscription_id, payload}, _from, state) do @@ -62,22 +106,68 @@ defmodule Parrhesia.Negentropy.Sessions do {:reply, {:error, :unknown_session}, state} session -> - cursor = session.cursor + 1 + case validate_payload_size(payload, state.max_payload_bytes) do + :ok -> + cursor = session.cursor + 1 - next_session = %{session | cursor: cursor, params: Map.merge(session.params, payload)} - state = put_in(state, [:sessions, key], next_session) + next_session = %{ + session + | cursor: cursor, + last_active_at_ms: System.monotonic_time(:millisecond) + } - {:reply, {:ok, %{"status" => "ack", "cursor" => cursor}}, state} + state = put_in(state, [:sessions, key], next_session) + + {:reply, {:ok, %{"status" => "ack", "cursor" => cursor}}, state} + + {:error, reason} -> + {:reply, {:error, reason}, state} + end end end def handle_call({:close, owner_pid, subscription_id}, _from, state) do key = {owner_pid, subscription_id} - state = update_in(state.sessions, &Map.delete(&1, key)) + + state = + state + |> update_in([:sessions], &Map.delete(&1, key)) + |> maybe_remove_monitor_if_owner_has_no_sessions(owner_pid) + {:reply, :ok, state} end @impl true + def handle_info(@sweep_idle_sessions, state) do + now_ms = System.monotonic_time(:millisecond) + + sessions = + Enum.reduce(state.sessions, %{}, fn {key, session}, acc -> + idle_ms = now_ms - Map.get(session, :last_active_at_ms, now_ms) + + if idle_ms >= state.max_idle_ms do + acc + else + Map.put(acc, key, session) + end + end) + + owner_pids = + sessions + |> Map.keys() + |> Enum.map(fn {owner_pid, _subscription_id} -> owner_pid end) + |> MapSet.new() + + state = + state + |> Map.put(:sessions, sessions) + |> clear_monitors_without_sessions(owner_pids) + + :ok = schedule_idle_sweep(state.sweep_interval_ms) + + {:noreply, state} + end + def handle_info({:DOWN, monitor_ref, :process, owner_pid, _reason}, state) do case Map.get(state.monitors, owner_pid) do ^monitor_ref -> @@ -95,6 +185,16 @@ defmodule Parrhesia.Negentropy.Sessions do def handle_info(_message, state), do: {:noreply, state} + defp clear_monitors_without_sessions(state, owner_pids) do + Enum.reduce(Map.keys(state.monitors), state, fn owner_pid, acc -> + if MapSet.member?(owner_pids, owner_pid) do + acc + else + maybe_remove_monitor(acc, owner_pid) + end + end) + end + defp remove_owner_sessions(state, owner_pid) do update_in(state.sessions, fn sessions -> sessions @@ -103,6 +203,39 @@ defmodule Parrhesia.Negentropy.Sessions do end) end + defp validate_payload_size(payload, max_payload_bytes) do + if :erlang.external_size(payload) <= max_payload_bytes do + :ok + else + {:error, :payload_too_large} + end + end + + defp enforce_session_limits(state, owner_pid, key) do + if Map.has_key?(state.sessions, key) do + :ok + else + total_sessions = map_size(state.sessions) + + cond do + total_sessions >= state.max_total_sessions -> + {:error, :session_limit_reached} + + owner_session_count(state.sessions, owner_pid) >= state.max_sessions_per_owner -> + {:error, :owner_session_limit_reached} + + true -> + :ok + end + end + end + + defp owner_session_count(sessions, owner_pid) do + Enum.count(sessions, fn {{session_owner, _subscription_id}, _session} -> + session_owner == owner_pid + end) + end + defp ensure_monitor(state, owner_pid) do case Map.has_key?(state.monitors, owner_pid) do true -> state @@ -110,6 +243,14 @@ defmodule Parrhesia.Negentropy.Sessions do end end + defp maybe_remove_monitor_if_owner_has_no_sessions(state, owner_pid) do + if owner_session_count(state.sessions, owner_pid) == 0 do + maybe_remove_monitor(state, owner_pid) + else + state + end + end + defp maybe_remove_monitor(state, owner_pid) do {monitor_ref, monitors} = Map.pop(state.monitors, owner_pid) @@ -119,4 +260,44 @@ defmodule Parrhesia.Negentropy.Sessions do Map.put(state, :monitors, monitors) end + + defp schedule_idle_sweep(sweep_interval_ms) do + _timer_ref = Process.send_after(self(), @sweep_idle_sessions, sweep_interval_ms) + :ok + end + + defp max_payload_bytes do + :parrhesia + |> Application.get_env(:limits, []) + |> Keyword.get(:max_negentropy_payload_bytes, @default_max_payload_bytes) + end + + defp max_sessions_per_owner do + :parrhesia + |> Application.get_env(:limits, []) + |> Keyword.get(:max_negentropy_sessions_per_connection, @default_max_sessions_per_owner) + end + + defp max_total_sessions do + :parrhesia + |> Application.get_env(:limits, []) + |> Keyword.get(:max_negentropy_total_sessions, @default_max_total_sessions) + end + + defp max_idle_seconds do + :parrhesia + |> Application.get_env(:limits, []) + |> Keyword.get(:negentropy_session_idle_timeout_seconds, @default_max_idle_seconds) + end + + defp sweep_interval_seconds do + :parrhesia + |> Application.get_env(:limits, []) + |> Keyword.get(:negentropy_session_sweep_interval_seconds, @default_sweep_interval_seconds) + end + + defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0, + do: value + + defp normalize_positive_integer(_value, default), do: default end diff --git a/lib/parrhesia/subscriptions/supervisor.ex b/lib/parrhesia/subscriptions/supervisor.ex index 882f41e..6db147c 100644 --- a/lib/parrhesia/subscriptions/supervisor.ex +++ b/lib/parrhesia/subscriptions/supervisor.ex @@ -11,12 +11,26 @@ defmodule Parrhesia.Subscriptions.Supervisor do @impl true def init(_init_arg) do - children = [ - {Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index}, - {Parrhesia.Negentropy.Sessions, name: Parrhesia.Negentropy.Sessions}, - {Parrhesia.Fanout.MultiNode, name: Parrhesia.Fanout.MultiNode} - ] + children = + [ + {Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index} + ] ++ + negentropy_children() ++ [{Parrhesia.Fanout.MultiNode, name: Parrhesia.Fanout.MultiNode}] Supervisor.init(children, strategy: :one_for_one) end + + defp negentropy_children do + if negentropy_enabled?() do + [{Parrhesia.Negentropy.Sessions, name: Parrhesia.Negentropy.Sessions}] + else + [] + end + end + + defp negentropy_enabled? do + :parrhesia + |> Application.get_env(:features, []) + |> Keyword.get(:nip_77_negentropy, true) + end end diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index 38f5fb9..6fbdf40 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -1155,17 +1155,31 @@ defmodule Parrhesia.Web.Connection do defp negentropy_sessions(opts) when is_list(opts) do opts - |> Keyword.get(:negentropy_sessions, Sessions) + |> Keyword.get(:negentropy_sessions, configured_negentropy_sessions()) |> normalize_server_ref() end defp negentropy_sessions(opts) when is_map(opts) do opts - |> Map.get(:negentropy_sessions, Sessions) + |> Map.get(:negentropy_sessions, configured_negentropy_sessions()) |> normalize_server_ref() end - defp negentropy_sessions(_opts), do: Sessions + defp negentropy_sessions(_opts), do: configured_negentropy_sessions() + + defp configured_negentropy_sessions do + if negentropy_enabled?() do + Sessions + else + nil + end + end + + defp negentropy_enabled? do + :parrhesia + |> Application.get_env(:features, []) + |> Keyword.get(:nip_77_negentropy, true) + end defp normalize_server_ref(server_ref) when is_pid(server_ref) or is_atom(server_ref), do: server_ref diff --git a/lib/parrhesia/web/readiness.ex b/lib/parrhesia/web/readiness.ex index ebf4763..df59fd4 100644 --- a/lib/parrhesia/web/readiness.ex +++ b/lib/parrhesia/web/readiness.ex @@ -5,10 +5,24 @@ defmodule Parrhesia.Web.Readiness do def ready? do process_ready?(Parrhesia.Subscriptions.Index) and process_ready?(Parrhesia.Auth.Challenges) and - process_ready?(Parrhesia.Negentropy.Sessions) and + negentropy_ready?() and process_ready?(Parrhesia.Repo) end + defp negentropy_ready? do + if negentropy_enabled?() do + process_ready?(Parrhesia.Negentropy.Sessions) + else + true + end + end + + defp negentropy_enabled? do + :parrhesia + |> Application.get_env(:features, []) + |> Keyword.get(:nip_77_negentropy, true) + end + defp process_ready?(name) do case Process.whereis(name) do pid when is_pid(pid) -> true diff --git a/lib/parrhesia/web/relay_info.ex b/lib/parrhesia/web/relay_info.ex index 62494bc..3f38938 100644 --- a/lib/parrhesia/web/relay_info.ex +++ b/lib/parrhesia/web/relay_info.ex @@ -17,26 +17,16 @@ defmodule Parrhesia.Web.RelayInfo do end defp supported_nips do - [ - 1, - 9, - 11, - 13, - 17, - 40, - 42, - 43, - 44, - 45, - 50, - 59, - 62, - 66, - 70, - 77, - 86, - 98 - ] + base = [1, 9, 11, 13, 17, 40, 42, 43, 44, 45, 50, 59, 62, 66, 70] + + with_negentropy = + if negentropy_enabled?() do + base ++ [77] + else + base + end + + with_negentropy ++ [86, 98] end defp limitations do @@ -48,4 +38,10 @@ defmodule Parrhesia.Web.RelayInfo do "auth_required" => Parrhesia.Config.get([:policies, :auth_required_for_reads], false) } end + + defp negentropy_enabled? do + :parrhesia + |> Application.get_env(:features, []) + |> Keyword.get(:nip_77_negentropy, true) + end end diff --git a/test/parrhesia/application_test.exs b/test/parrhesia/application_test.exs index 2589d2e..9135608 100644 --- a/test/parrhesia/application_test.exs +++ b/test/parrhesia/application_test.exs @@ -18,6 +18,15 @@ defmodule Parrhesia.ApplicationTest do end) assert is_pid(Process.whereis(Parrhesia.Auth.Challenges)) - assert is_pid(Process.whereis(Parrhesia.Negentropy.Sessions)) + + if negentropy_enabled?() do + assert is_pid(Process.whereis(Parrhesia.Negentropy.Sessions)) + end + end + + defp negentropy_enabled? do + :parrhesia + |> Application.get_env(:features, []) + |> Keyword.get(:nip_77_negentropy, true) end end diff --git a/test/parrhesia/negentropy/sessions_test.exs b/test/parrhesia/negentropy/sessions_test.exs index 87037a6..a5f3089 100644 --- a/test/parrhesia/negentropy/sessions_test.exs +++ b/test/parrhesia/negentropy/sessions_test.exs @@ -15,4 +15,39 @@ defmodule Parrhesia.Negentropy.SessionsTest do assert :ok = Sessions.close(server, self(), "sub-neg") assert {:error, :unknown_session} = Sessions.message(server, self(), "sub-neg", %{}) end + + test "rejects oversized NEG payloads" do + server = + start_supervised!( + {Sessions, + name: nil, + max_payload_bytes: 32, + max_sessions_per_owner: 8, + max_total_sessions: 16, + max_idle_seconds: 60, + sweep_interval_seconds: 60} + ) + + assert {:error, :payload_too_large} = + Sessions.open(server, self(), "sub-neg", %{"delta" => String.duplicate("a", 256)}) + end + + test "enforces per-owner session limits" do + server = + start_supervised!( + {Sessions, + name: nil, + max_payload_bytes: 1024, + max_sessions_per_owner: 1, + max_total_sessions: 16, + max_idle_seconds: 60, + sweep_interval_seconds: 60} + ) + + assert {:ok, %{"status" => "open", "cursor" => 0}} = + Sessions.open(server, self(), "sub-1", %{}) + + assert {:error, :owner_session_limit_reached} = + Sessions.open(server, self(), "sub-2", %{}) + end end