Harden NEG session handling and gate feature wiring
This commit is contained in:
@@ -17,7 +17,12 @@ config :parrhesia,
|
||||
auth_max_age_seconds: 600,
|
||||
max_outbound_queue: 256,
|
||||
outbound_drain_batch_size: 64,
|
||||
outbound_overflow_strategy: :close
|
||||
outbound_overflow_strategy: :close,
|
||||
max_negentropy_payload_bytes: 4096,
|
||||
max_negentropy_sessions_per_connection: 8,
|
||||
max_negentropy_total_sessions: 10_000,
|
||||
negentropy_session_idle_timeout_seconds: 60,
|
||||
negentropy_session_sweep_interval_seconds: 10
|
||||
],
|
||||
policies: [
|
||||
auth_required_for_writes: false,
|
||||
|
||||
@@ -7,10 +7,17 @@ defmodule Parrhesia.Negentropy.Sessions do
|
||||
|
||||
@type session_key :: {pid(), String.t()}
|
||||
|
||||
@default_max_payload_bytes 4096
|
||||
@default_max_sessions_per_owner 8
|
||||
@default_max_total_sessions 10_000
|
||||
@default_max_idle_seconds 60
|
||||
@default_sweep_interval_seconds 10
|
||||
@sweep_idle_sessions :sweep_idle_sessions
|
||||
|
||||
@spec start_link(keyword()) :: GenServer.on_start()
|
||||
def start_link(opts \\ []) do
|
||||
name = Keyword.get(opts, :name, __MODULE__)
|
||||
GenServer.start_link(__MODULE__, :ok, name: name)
|
||||
GenServer.start_link(__MODULE__, opts, name: name)
|
||||
end
|
||||
|
||||
@spec open(GenServer.server(), pid(), String.t(), map()) :: {:ok, map()} | {:error, term()}
|
||||
@@ -32,26 +39,63 @@ defmodule Parrhesia.Negentropy.Sessions do
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(:ok) do
|
||||
{:ok, %{sessions: %{}, monitors: %{}}}
|
||||
def init(opts) do
|
||||
max_idle_ms =
|
||||
normalize_positive_integer(Keyword.get(opts, :max_idle_seconds), max_idle_seconds()) * 1000
|
||||
|
||||
sweep_interval_ms =
|
||||
normalize_positive_integer(
|
||||
Keyword.get(opts, :sweep_interval_seconds),
|
||||
sweep_interval_seconds()
|
||||
) *
|
||||
1000
|
||||
|
||||
state = %{
|
||||
sessions: %{},
|
||||
monitors: %{},
|
||||
max_payload_bytes:
|
||||
normalize_positive_integer(Keyword.get(opts, :max_payload_bytes), max_payload_bytes()),
|
||||
max_sessions_per_owner:
|
||||
normalize_positive_integer(
|
||||
Keyword.get(opts, :max_sessions_per_owner),
|
||||
max_sessions_per_owner()
|
||||
),
|
||||
max_total_sessions:
|
||||
normalize_positive_integer(Keyword.get(opts, :max_total_sessions), max_total_sessions()),
|
||||
max_idle_ms: max_idle_ms,
|
||||
sweep_interval_ms: sweep_interval_ms
|
||||
}
|
||||
|
||||
:ok = schedule_idle_sweep(sweep_interval_ms)
|
||||
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:open, owner_pid, subscription_id, params}, _from, state) do
|
||||
key = {owner_pid, subscription_id}
|
||||
|
||||
session = %{
|
||||
cursor: 0,
|
||||
params: params,
|
||||
opened_at: System.system_time(:second)
|
||||
}
|
||||
with :ok <- validate_payload_size(params, state.max_payload_bytes),
|
||||
:ok <- enforce_session_limits(state, owner_pid, key) do
|
||||
now_ms = System.monotonic_time(:millisecond)
|
||||
|
||||
state =
|
||||
state
|
||||
|> ensure_monitor(owner_pid)
|
||||
|> put_in([:sessions, key], session)
|
||||
session = %{
|
||||
cursor: 0,
|
||||
params: params,
|
||||
opened_at: System.system_time(:second),
|
||||
last_active_at_ms: now_ms
|
||||
}
|
||||
|
||||
{:reply, {:ok, %{"status" => "open", "cursor" => 0}}, state}
|
||||
state =
|
||||
state
|
||||
|> ensure_monitor(owner_pid)
|
||||
|> put_in([:sessions, key], session)
|
||||
|
||||
{:reply, {:ok, %{"status" => "open", "cursor" => 0}}, state}
|
||||
else
|
||||
{:error, reason} ->
|
||||
{:reply, {:error, reason}, state}
|
||||
end
|
||||
end
|
||||
|
||||
def handle_call({:message, owner_pid, subscription_id, payload}, _from, state) do
|
||||
@@ -62,22 +106,68 @@ defmodule Parrhesia.Negentropy.Sessions do
|
||||
{:reply, {:error, :unknown_session}, state}
|
||||
|
||||
session ->
|
||||
cursor = session.cursor + 1
|
||||
case validate_payload_size(payload, state.max_payload_bytes) do
|
||||
:ok ->
|
||||
cursor = session.cursor + 1
|
||||
|
||||
next_session = %{session | cursor: cursor, params: Map.merge(session.params, payload)}
|
||||
state = put_in(state, [:sessions, key], next_session)
|
||||
next_session = %{
|
||||
session
|
||||
| cursor: cursor,
|
||||
last_active_at_ms: System.monotonic_time(:millisecond)
|
||||
}
|
||||
|
||||
{:reply, {:ok, %{"status" => "ack", "cursor" => cursor}}, state}
|
||||
state = put_in(state, [:sessions, key], next_session)
|
||||
|
||||
{:reply, {:ok, %{"status" => "ack", "cursor" => cursor}}, state}
|
||||
|
||||
{:error, reason} ->
|
||||
{:reply, {:error, reason}, state}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def handle_call({:close, owner_pid, subscription_id}, _from, state) do
|
||||
key = {owner_pid, subscription_id}
|
||||
state = update_in(state.sessions, &Map.delete(&1, key))
|
||||
|
||||
state =
|
||||
state
|
||||
|> update_in([:sessions], &Map.delete(&1, key))
|
||||
|> maybe_remove_monitor_if_owner_has_no_sessions(owner_pid)
|
||||
|
||||
{:reply, :ok, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(@sweep_idle_sessions, state) do
|
||||
now_ms = System.monotonic_time(:millisecond)
|
||||
|
||||
sessions =
|
||||
Enum.reduce(state.sessions, %{}, fn {key, session}, acc ->
|
||||
idle_ms = now_ms - Map.get(session, :last_active_at_ms, now_ms)
|
||||
|
||||
if idle_ms >= state.max_idle_ms do
|
||||
acc
|
||||
else
|
||||
Map.put(acc, key, session)
|
||||
end
|
||||
end)
|
||||
|
||||
owner_pids =
|
||||
sessions
|
||||
|> Map.keys()
|
||||
|> Enum.map(fn {owner_pid, _subscription_id} -> owner_pid end)
|
||||
|> MapSet.new()
|
||||
|
||||
state =
|
||||
state
|
||||
|> Map.put(:sessions, sessions)
|
||||
|> clear_monitors_without_sessions(owner_pids)
|
||||
|
||||
:ok = schedule_idle_sweep(state.sweep_interval_ms)
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info({:DOWN, monitor_ref, :process, owner_pid, _reason}, state) do
|
||||
case Map.get(state.monitors, owner_pid) do
|
||||
^monitor_ref ->
|
||||
@@ -95,6 +185,16 @@ defmodule Parrhesia.Negentropy.Sessions do
|
||||
|
||||
def handle_info(_message, state), do: {:noreply, state}
|
||||
|
||||
defp clear_monitors_without_sessions(state, owner_pids) do
|
||||
Enum.reduce(Map.keys(state.monitors), state, fn owner_pid, acc ->
|
||||
if MapSet.member?(owner_pids, owner_pid) do
|
||||
acc
|
||||
else
|
||||
maybe_remove_monitor(acc, owner_pid)
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp remove_owner_sessions(state, owner_pid) do
|
||||
update_in(state.sessions, fn sessions ->
|
||||
sessions
|
||||
@@ -103,6 +203,39 @@ defmodule Parrhesia.Negentropy.Sessions do
|
||||
end)
|
||||
end
|
||||
|
||||
defp validate_payload_size(payload, max_payload_bytes) do
|
||||
if :erlang.external_size(payload) <= max_payload_bytes do
|
||||
:ok
|
||||
else
|
||||
{:error, :payload_too_large}
|
||||
end
|
||||
end
|
||||
|
||||
defp enforce_session_limits(state, owner_pid, key) do
|
||||
if Map.has_key?(state.sessions, key) do
|
||||
:ok
|
||||
else
|
||||
total_sessions = map_size(state.sessions)
|
||||
|
||||
cond do
|
||||
total_sessions >= state.max_total_sessions ->
|
||||
{:error, :session_limit_reached}
|
||||
|
||||
owner_session_count(state.sessions, owner_pid) >= state.max_sessions_per_owner ->
|
||||
{:error, :owner_session_limit_reached}
|
||||
|
||||
true ->
|
||||
:ok
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp owner_session_count(sessions, owner_pid) do
|
||||
Enum.count(sessions, fn {{session_owner, _subscription_id}, _session} ->
|
||||
session_owner == owner_pid
|
||||
end)
|
||||
end
|
||||
|
||||
defp ensure_monitor(state, owner_pid) do
|
||||
case Map.has_key?(state.monitors, owner_pid) do
|
||||
true -> state
|
||||
@@ -110,6 +243,14 @@ defmodule Parrhesia.Negentropy.Sessions do
|
||||
end
|
||||
end
|
||||
|
||||
defp maybe_remove_monitor_if_owner_has_no_sessions(state, owner_pid) do
|
||||
if owner_session_count(state.sessions, owner_pid) == 0 do
|
||||
maybe_remove_monitor(state, owner_pid)
|
||||
else
|
||||
state
|
||||
end
|
||||
end
|
||||
|
||||
defp maybe_remove_monitor(state, owner_pid) do
|
||||
{monitor_ref, monitors} = Map.pop(state.monitors, owner_pid)
|
||||
|
||||
@@ -119,4 +260,44 @@ defmodule Parrhesia.Negentropy.Sessions do
|
||||
|
||||
Map.put(state, :monitors, monitors)
|
||||
end
|
||||
|
||||
defp schedule_idle_sweep(sweep_interval_ms) do
|
||||
_timer_ref = Process.send_after(self(), @sweep_idle_sessions, sweep_interval_ms)
|
||||
:ok
|
||||
end
|
||||
|
||||
defp max_payload_bytes do
|
||||
:parrhesia
|
||||
|> Application.get_env(:limits, [])
|
||||
|> Keyword.get(:max_negentropy_payload_bytes, @default_max_payload_bytes)
|
||||
end
|
||||
|
||||
defp max_sessions_per_owner do
|
||||
:parrhesia
|
||||
|> Application.get_env(:limits, [])
|
||||
|> Keyword.get(:max_negentropy_sessions_per_connection, @default_max_sessions_per_owner)
|
||||
end
|
||||
|
||||
defp max_total_sessions do
|
||||
:parrhesia
|
||||
|> Application.get_env(:limits, [])
|
||||
|> Keyword.get(:max_negentropy_total_sessions, @default_max_total_sessions)
|
||||
end
|
||||
|
||||
defp max_idle_seconds do
|
||||
:parrhesia
|
||||
|> Application.get_env(:limits, [])
|
||||
|> Keyword.get(:negentropy_session_idle_timeout_seconds, @default_max_idle_seconds)
|
||||
end
|
||||
|
||||
defp sweep_interval_seconds do
|
||||
:parrhesia
|
||||
|> Application.get_env(:limits, [])
|
||||
|> Keyword.get(:negentropy_session_sweep_interval_seconds, @default_sweep_interval_seconds)
|
||||
end
|
||||
|
||||
defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0,
|
||||
do: value
|
||||
|
||||
defp normalize_positive_integer(_value, default), do: default
|
||||
end
|
||||
|
||||
@@ -11,12 +11,26 @@ defmodule Parrhesia.Subscriptions.Supervisor do
|
||||
|
||||
@impl true
|
||||
def init(_init_arg) do
|
||||
children = [
|
||||
{Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index},
|
||||
{Parrhesia.Negentropy.Sessions, name: Parrhesia.Negentropy.Sessions},
|
||||
{Parrhesia.Fanout.MultiNode, name: Parrhesia.Fanout.MultiNode}
|
||||
]
|
||||
children =
|
||||
[
|
||||
{Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index}
|
||||
] ++
|
||||
negentropy_children() ++ [{Parrhesia.Fanout.MultiNode, name: Parrhesia.Fanout.MultiNode}]
|
||||
|
||||
Supervisor.init(children, strategy: :one_for_one)
|
||||
end
|
||||
|
||||
defp negentropy_children do
|
||||
if negentropy_enabled?() do
|
||||
[{Parrhesia.Negentropy.Sessions, name: Parrhesia.Negentropy.Sessions}]
|
||||
else
|
||||
[]
|
||||
end
|
||||
end
|
||||
|
||||
defp negentropy_enabled? do
|
||||
:parrhesia
|
||||
|> Application.get_env(:features, [])
|
||||
|> Keyword.get(:nip_77_negentropy, true)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1155,17 +1155,31 @@ defmodule Parrhesia.Web.Connection do
|
||||
|
||||
defp negentropy_sessions(opts) when is_list(opts) do
|
||||
opts
|
||||
|> Keyword.get(:negentropy_sessions, Sessions)
|
||||
|> Keyword.get(:negentropy_sessions, configured_negentropy_sessions())
|
||||
|> normalize_server_ref()
|
||||
end
|
||||
|
||||
defp negentropy_sessions(opts) when is_map(opts) do
|
||||
opts
|
||||
|> Map.get(:negentropy_sessions, Sessions)
|
||||
|> Map.get(:negentropy_sessions, configured_negentropy_sessions())
|
||||
|> normalize_server_ref()
|
||||
end
|
||||
|
||||
defp negentropy_sessions(_opts), do: Sessions
|
||||
defp negentropy_sessions(_opts), do: configured_negentropy_sessions()
|
||||
|
||||
defp configured_negentropy_sessions do
|
||||
if negentropy_enabled?() do
|
||||
Sessions
|
||||
else
|
||||
nil
|
||||
end
|
||||
end
|
||||
|
||||
defp negentropy_enabled? do
|
||||
:parrhesia
|
||||
|> Application.get_env(:features, [])
|
||||
|> Keyword.get(:nip_77_negentropy, true)
|
||||
end
|
||||
|
||||
defp normalize_server_ref(server_ref) when is_pid(server_ref) or is_atom(server_ref),
|
||||
do: server_ref
|
||||
|
||||
@@ -5,10 +5,24 @@ defmodule Parrhesia.Web.Readiness do
|
||||
def ready? do
|
||||
process_ready?(Parrhesia.Subscriptions.Index) and
|
||||
process_ready?(Parrhesia.Auth.Challenges) and
|
||||
process_ready?(Parrhesia.Negentropy.Sessions) and
|
||||
negentropy_ready?() and
|
||||
process_ready?(Parrhesia.Repo)
|
||||
end
|
||||
|
||||
defp negentropy_ready? do
|
||||
if negentropy_enabled?() do
|
||||
process_ready?(Parrhesia.Negentropy.Sessions)
|
||||
else
|
||||
true
|
||||
end
|
||||
end
|
||||
|
||||
defp negentropy_enabled? do
|
||||
:parrhesia
|
||||
|> Application.get_env(:features, [])
|
||||
|> Keyword.get(:nip_77_negentropy, true)
|
||||
end
|
||||
|
||||
defp process_ready?(name) do
|
||||
case Process.whereis(name) do
|
||||
pid when is_pid(pid) -> true
|
||||
|
||||
@@ -17,26 +17,16 @@ defmodule Parrhesia.Web.RelayInfo do
|
||||
end
|
||||
|
||||
defp supported_nips do
|
||||
[
|
||||
1,
|
||||
9,
|
||||
11,
|
||||
13,
|
||||
17,
|
||||
40,
|
||||
42,
|
||||
43,
|
||||
44,
|
||||
45,
|
||||
50,
|
||||
59,
|
||||
62,
|
||||
66,
|
||||
70,
|
||||
77,
|
||||
86,
|
||||
98
|
||||
]
|
||||
base = [1, 9, 11, 13, 17, 40, 42, 43, 44, 45, 50, 59, 62, 66, 70]
|
||||
|
||||
with_negentropy =
|
||||
if negentropy_enabled?() do
|
||||
base ++ [77]
|
||||
else
|
||||
base
|
||||
end
|
||||
|
||||
with_negentropy ++ [86, 98]
|
||||
end
|
||||
|
||||
defp limitations do
|
||||
@@ -48,4 +38,10 @@ defmodule Parrhesia.Web.RelayInfo do
|
||||
"auth_required" => Parrhesia.Config.get([:policies, :auth_required_for_reads], false)
|
||||
}
|
||||
end
|
||||
|
||||
defp negentropy_enabled? do
|
||||
:parrhesia
|
||||
|> Application.get_env(:features, [])
|
||||
|> Keyword.get(:nip_77_negentropy, true)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -18,6 +18,15 @@ defmodule Parrhesia.ApplicationTest do
|
||||
end)
|
||||
|
||||
assert is_pid(Process.whereis(Parrhesia.Auth.Challenges))
|
||||
assert is_pid(Process.whereis(Parrhesia.Negentropy.Sessions))
|
||||
|
||||
if negentropy_enabled?() do
|
||||
assert is_pid(Process.whereis(Parrhesia.Negentropy.Sessions))
|
||||
end
|
||||
end
|
||||
|
||||
defp negentropy_enabled? do
|
||||
:parrhesia
|
||||
|> Application.get_env(:features, [])
|
||||
|> Keyword.get(:nip_77_negentropy, true)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -15,4 +15,39 @@ defmodule Parrhesia.Negentropy.SessionsTest do
|
||||
assert :ok = Sessions.close(server, self(), "sub-neg")
|
||||
assert {:error, :unknown_session} = Sessions.message(server, self(), "sub-neg", %{})
|
||||
end
|
||||
|
||||
test "rejects oversized NEG payloads" do
|
||||
server =
|
||||
start_supervised!(
|
||||
{Sessions,
|
||||
name: nil,
|
||||
max_payload_bytes: 32,
|
||||
max_sessions_per_owner: 8,
|
||||
max_total_sessions: 16,
|
||||
max_idle_seconds: 60,
|
||||
sweep_interval_seconds: 60}
|
||||
)
|
||||
|
||||
assert {:error, :payload_too_large} =
|
||||
Sessions.open(server, self(), "sub-neg", %{"delta" => String.duplicate("a", 256)})
|
||||
end
|
||||
|
||||
test "enforces per-owner session limits" do
|
||||
server =
|
||||
start_supervised!(
|
||||
{Sessions,
|
||||
name: nil,
|
||||
max_payload_bytes: 1024,
|
||||
max_sessions_per_owner: 1,
|
||||
max_total_sessions: 16,
|
||||
max_idle_seconds: 60,
|
||||
sweep_interval_seconds: 60}
|
||||
)
|
||||
|
||||
assert {:ok, %{"status" => "open", "cursor" => 0}} =
|
||||
Sessions.open(server, self(), "sub-1", %{})
|
||||
|
||||
assert {:error, :owner_session_limit_reached} =
|
||||
Sessions.open(server, self(), "sub-2", %{})
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user