websocket: add configurable ping/pong keepalive

This commit is contained in:
2026-03-20 02:32:34 +01:00
parent b22fe98ab0
commit a15856bdac
5 changed files with 265 additions and 23 deletions

View File

@@ -381,6 +381,8 @@ Parrhesia treats NIP-43 invite requests as synthetic relay output, not stored cl
| `:max_event_ingest_per_window` | `PARRHESIA_LIMITS_MAX_EVENT_INGEST_PER_WINDOW` | `120` | | `:max_event_ingest_per_window` | `PARRHESIA_LIMITS_MAX_EVENT_INGEST_PER_WINDOW` | `120` |
| `:event_ingest_window_seconds` | `PARRHESIA_LIMITS_EVENT_INGEST_WINDOW_SECONDS` | `1` | | `:event_ingest_window_seconds` | `PARRHESIA_LIMITS_EVENT_INGEST_WINDOW_SECONDS` | `1` |
| `:auth_max_age_seconds` | `PARRHESIA_LIMITS_AUTH_MAX_AGE_SECONDS` | `600` | | `:auth_max_age_seconds` | `PARRHESIA_LIMITS_AUTH_MAX_AGE_SECONDS` | `600` |
| `:websocket_ping_interval_seconds` | `PARRHESIA_LIMITS_WEBSOCKET_PING_INTERVAL_SECONDS` | `30` |
| `:websocket_pong_timeout_seconds` | `PARRHESIA_LIMITS_WEBSOCKET_PONG_TIMEOUT_SECONDS` | `10` |
| `:max_outbound_queue` | `PARRHESIA_LIMITS_MAX_OUTBOUND_QUEUE` | `256` | | `:max_outbound_queue` | `PARRHESIA_LIMITS_MAX_OUTBOUND_QUEUE` | `256` |
| `:outbound_drain_batch_size` | `PARRHESIA_LIMITS_OUTBOUND_DRAIN_BATCH_SIZE` | `64` | | `:outbound_drain_batch_size` | `PARRHESIA_LIMITS_OUTBOUND_DRAIN_BATCH_SIZE` | `64` |
| `:outbound_overflow_strategy` | `PARRHESIA_LIMITS_OUTBOUND_OVERFLOW_STRATEGY` | `:close` | | `:outbound_overflow_strategy` | `PARRHESIA_LIMITS_OUTBOUND_OVERFLOW_STRATEGY` | `:close` |

View File

@@ -57,6 +57,8 @@ config :parrhesia,
max_event_ingest_per_window: 120, max_event_ingest_per_window: 120,
event_ingest_window_seconds: 1, event_ingest_window_seconds: 1,
auth_max_age_seconds: 600, auth_max_age_seconds: 600,
websocket_ping_interval_seconds: 30,
websocket_pong_timeout_seconds: 10,
max_outbound_queue: 256, max_outbound_queue: 256,
outbound_drain_batch_size: 64, outbound_drain_batch_size: 64,
outbound_overflow_strategy: :close, outbound_overflow_strategy: :close,

View File

@@ -277,6 +277,16 @@ if config_env() == :prod do
"PARRHESIA_LIMITS_AUTH_MAX_AGE_SECONDS", "PARRHESIA_LIMITS_AUTH_MAX_AGE_SECONDS",
Keyword.get(limits_defaults, :auth_max_age_seconds, 600) Keyword.get(limits_defaults, :auth_max_age_seconds, 600)
), ),
websocket_ping_interval_seconds:
int_env.(
"PARRHESIA_LIMITS_WEBSOCKET_PING_INTERVAL_SECONDS",
Keyword.get(limits_defaults, :websocket_ping_interval_seconds, 30)
),
websocket_pong_timeout_seconds:
int_env.(
"PARRHESIA_LIMITS_WEBSOCKET_PONG_TIMEOUT_SECONDS",
Keyword.get(limits_defaults, :websocket_pong_timeout_seconds, 10)
),
max_outbound_queue: max_outbound_queue:
int_env.( int_env.(
"PARRHESIA_LIMITS_MAX_OUTBOUND_QUEUE", "PARRHESIA_LIMITS_MAX_OUTBOUND_QUEUE",

View File

@@ -30,7 +30,11 @@ defmodule Parrhesia.Web.Connection do
@default_event_ingest_rate_limit 120 @default_event_ingest_rate_limit 120
@default_event_ingest_window_seconds 1 @default_event_ingest_window_seconds 1
@default_auth_max_age_seconds 600 @default_auth_max_age_seconds 600
@default_websocket_ping_interval_seconds 30
@default_websocket_pong_timeout_seconds 10
@drain_outbound_queue :drain_outbound_queue @drain_outbound_queue :drain_outbound_queue
@websocket_keepalive_ping :websocket_keepalive_ping
@websocket_keepalive_timeout :websocket_keepalive_timeout
@outbound_queue_pressure_threshold 0.75 @outbound_queue_pressure_threshold 0.75
@marmot_kinds MapSet.new([ @marmot_kinds MapSet.new([
@@ -72,6 +76,10 @@ defmodule Parrhesia.Web.Connection do
event_ingest_window_started_at_ms: 0, event_ingest_window_started_at_ms: 0,
event_ingest_count: 0, event_ingest_count: 0,
auth_max_age_seconds: @default_auth_max_age_seconds, auth_max_age_seconds: @default_auth_max_age_seconds,
websocket_ping_interval_seconds: @default_websocket_ping_interval_seconds,
websocket_pong_timeout_seconds: @default_websocket_pong_timeout_seconds,
websocket_keepalive_timeout_timer_ref: nil,
websocket_awaiting_pong_payload: nil,
track_population?: true track_population?: true
@type overflow_strategy :: :close | :drop_oldest | :drop_newest @type overflow_strategy :: :close | :drop_oldest | :drop_newest
@@ -109,6 +117,10 @@ defmodule Parrhesia.Web.Connection do
event_ingest_window_started_at_ms: integer(), event_ingest_window_started_at_ms: integer(),
event_ingest_count: non_neg_integer(), event_ingest_count: non_neg_integer(),
auth_max_age_seconds: pos_integer(), auth_max_age_seconds: pos_integer(),
websocket_ping_interval_seconds: non_neg_integer(),
websocket_pong_timeout_seconds: pos_integer(),
websocket_keepalive_timeout_timer_ref: reference() | nil,
websocket_awaiting_pong_payload: binary() | nil,
track_population?: boolean() track_population?: boolean()
} }
@@ -117,29 +129,33 @@ defmodule Parrhesia.Web.Connection do
maybe_configure_exit_trapping(opts) maybe_configure_exit_trapping(opts)
auth_challenges = auth_challenges(opts) auth_challenges = auth_challenges(opts)
state = %__MODULE__{ state =
listener: Listener.from_opts(opts), %__MODULE__{
transport_identity: transport_identity(opts), listener: Listener.from_opts(opts),
max_subscriptions_per_connection: max_subscriptions_per_connection(opts), transport_identity: transport_identity(opts),
subscription_index: subscription_index(opts), max_subscriptions_per_connection: max_subscriptions_per_connection(opts),
auth_challenges: auth_challenges, subscription_index: subscription_index(opts),
auth_challenge: maybe_issue_auth_challenge(auth_challenges), auth_challenges: auth_challenges,
relay_url: relay_url(opts), auth_challenge: maybe_issue_auth_challenge(auth_challenges),
remote_ip: remote_ip(opts), relay_url: relay_url(opts),
negentropy_sessions: negentropy_sessions(opts), remote_ip: remote_ip(opts),
max_outbound_queue: max_outbound_queue(opts), negentropy_sessions: negentropy_sessions(opts),
outbound_overflow_strategy: outbound_overflow_strategy(opts), max_outbound_queue: max_outbound_queue(opts),
outbound_drain_batch_size: outbound_drain_batch_size(opts), outbound_overflow_strategy: outbound_overflow_strategy(opts),
max_frame_bytes: max_frame_bytes(opts), outbound_drain_batch_size: outbound_drain_batch_size(opts),
max_event_bytes: max_event_bytes(opts), max_frame_bytes: max_frame_bytes(opts),
event_ingest_limiter: event_ingest_limiter(opts), max_event_bytes: max_event_bytes(opts),
remote_ip_event_ingest_limiter: remote_ip_event_ingest_limiter(opts), event_ingest_limiter: event_ingest_limiter(opts),
max_event_ingest_per_window: max_event_ingest_per_window(opts), remote_ip_event_ingest_limiter: remote_ip_event_ingest_limiter(opts),
event_ingest_window_seconds: event_ingest_window_seconds(opts), max_event_ingest_per_window: max_event_ingest_per_window(opts),
event_ingest_window_started_at_ms: System.monotonic_time(:millisecond), event_ingest_window_seconds: event_ingest_window_seconds(opts),
auth_max_age_seconds: auth_max_age_seconds(opts), event_ingest_window_started_at_ms: System.monotonic_time(:millisecond),
track_population?: track_population?(opts) auth_max_age_seconds: auth_max_age_seconds(opts),
} websocket_ping_interval_seconds: websocket_ping_interval_seconds(opts),
websocket_pong_timeout_seconds: websocket_pong_timeout_seconds(opts),
track_population?: track_population?(opts)
}
|> maybe_schedule_next_websocket_ping()
:ok = maybe_track_connection_open(state) :ok = maybe_track_connection_open(state)
Telemetry.emit_process_mailbox_depth(:connection) Telemetry.emit_process_mailbox_depth(:connection)
@@ -180,6 +196,17 @@ defmodule Parrhesia.Web.Connection do
|> emit_connection_mailbox_depth() |> emit_connection_mailbox_depth()
end end
@impl true
def handle_control({payload, [opcode: :pong]}, %__MODULE__{} = state) when is_binary(payload) do
{:ok, maybe_acknowledge_websocket_pong(state, payload)}
|> emit_connection_mailbox_depth()
end
def handle_control({_payload, [opcode: :ping]}, %__MODULE__{} = state) do
{:ok, state}
|> emit_connection_mailbox_depth()
end
defp handle_decoded_message({:event, event}, state), do: handle_event_ingest(state, event) defp handle_decoded_message({:event, event}, state), do: handle_event_ingest(state, event)
defp handle_decoded_message({:req, subscription_id, filters}, state), defp handle_decoded_message({:req, subscription_id, filters}, state),
@@ -291,6 +318,24 @@ defmodule Parrhesia.Web.Connection do
end end
end end
def handle_info(@websocket_keepalive_ping, %__MODULE__{} = state) do
state
|> maybe_schedule_next_websocket_ping()
|> maybe_send_websocket_keepalive_ping()
|> emit_connection_mailbox_depth()
end
def handle_info({@websocket_keepalive_timeout, payload}, %__MODULE__{} = state)
when is_binary(payload) do
if websocket_keepalive_timeout_payload?(state, payload) do
{:stop, :normal, {1001, "keepalive timeout"}, state}
|> emit_connection_mailbox_depth()
else
{:ok, state}
|> emit_connection_mailbox_depth()
end
end
def handle_info({:EXIT, _from, :shutdown}, %__MODULE__{} = state) do def handle_info({:EXIT, _from, :shutdown}, %__MODULE__{} = state) do
close_with_drained_outbound_frames(state) close_with_drained_outbound_frames(state)
|> emit_connection_mailbox_depth() |> emit_connection_mailbox_depth()
@@ -313,6 +358,7 @@ defmodule Parrhesia.Web.Connection do
:ok = maybe_unsubscribe_all_stream_subscriptions(state) :ok = maybe_unsubscribe_all_stream_subscriptions(state)
:ok = maybe_remove_index_owner(state) :ok = maybe_remove_index_owner(state)
:ok = maybe_clear_auth_challenge(state) :ok = maybe_clear_auth_challenge(state)
:ok = cancel_websocket_keepalive_timers(state)
:ok :ok
end end
@@ -1229,6 +1275,93 @@ defmodule Parrhesia.Web.Connection do
%__MODULE__{state | drain_scheduled?: true} %__MODULE__{state | drain_scheduled?: true}
end end
defp maybe_schedule_next_websocket_ping(
%__MODULE__{websocket_ping_interval_seconds: interval_seconds} = state
)
when interval_seconds <= 0,
do: state
defp maybe_schedule_next_websocket_ping(
%__MODULE__{websocket_ping_interval_seconds: interval_seconds} = state
) do
_timer_ref = Process.send_after(self(), @websocket_keepalive_ping, interval_seconds * 1_000)
state
end
defp maybe_send_websocket_keepalive_ping(
%__MODULE__{websocket_ping_interval_seconds: interval_seconds} = state
)
when interval_seconds <= 0,
do: {:ok, state}
defp maybe_send_websocket_keepalive_ping(
%__MODULE__{websocket_awaiting_pong_payload: awaiting_payload} = state
)
when is_binary(awaiting_payload),
do: {:ok, state}
defp maybe_send_websocket_keepalive_ping(%__MODULE__{} = state) do
payload = Base.encode16(:crypto.strong_rand_bytes(8), case: :lower)
timeout_timer_ref =
Process.send_after(
self(),
{@websocket_keepalive_timeout, payload},
state.websocket_pong_timeout_seconds * 1_000
)
next_state =
%__MODULE__{
state
| websocket_keepalive_timeout_timer_ref: timeout_timer_ref,
websocket_awaiting_pong_payload: payload
}
{:push, {:ping, payload}, next_state}
end
defp websocket_keepalive_timeout_payload?(
%__MODULE__{websocket_awaiting_pong_payload: awaiting_payload},
payload
)
when is_binary(awaiting_payload) and is_binary(payload) do
Plug.Crypto.secure_compare(awaiting_payload, payload)
end
defp websocket_keepalive_timeout_payload?(_state, _payload), do: false
defp maybe_acknowledge_websocket_pong(
%__MODULE__{websocket_awaiting_pong_payload: awaiting_payload} = state,
payload
)
when is_binary(awaiting_payload) and is_binary(payload) do
if Plug.Crypto.secure_compare(awaiting_payload, payload) do
:ok = cancel_timer(state.websocket_keepalive_timeout_timer_ref)
%__MODULE__{
state
| websocket_keepalive_timeout_timer_ref: nil,
websocket_awaiting_pong_payload: nil
}
else
state
end
end
defp maybe_acknowledge_websocket_pong(%__MODULE__{} = state, _payload), do: state
defp cancel_websocket_keepalive_timers(%__MODULE__{} = state) do
:ok = cancel_timer(state.websocket_keepalive_timeout_timer_ref)
:ok
end
defp cancel_timer(timer_ref) when is_reference(timer_ref) do
_ = Process.cancel_timer(timer_ref, async: true, info: false)
:ok
end
defp cancel_timer(_timer_ref), do: :ok
defp emit_outbound_queue_depth(state, metadata \\ %{}) do defp emit_outbound_queue_depth(state, metadata \\ %{}) do
depth = state.outbound_queue_size depth = state.outbound_queue_size
@@ -1769,6 +1902,64 @@ defmodule Parrhesia.Web.Connection do
|> Keyword.get(:auth_max_age_seconds, @default_auth_max_age_seconds) |> Keyword.get(:auth_max_age_seconds, @default_auth_max_age_seconds)
end end
defp websocket_ping_interval_seconds(opts) when is_list(opts) do
opts
|> Keyword.get(:websocket_ping_interval_seconds)
|> normalize_websocket_ping_interval_seconds()
end
defp websocket_ping_interval_seconds(opts) when is_map(opts) do
opts
|> Map.get(:websocket_ping_interval_seconds)
|> normalize_websocket_ping_interval_seconds()
end
defp websocket_ping_interval_seconds(_opts), do: configured_websocket_ping_interval_seconds()
defp normalize_websocket_ping_interval_seconds(value)
when is_integer(value) and value >= 0,
do: value
defp normalize_websocket_ping_interval_seconds(_value),
do: configured_websocket_ping_interval_seconds()
defp configured_websocket_ping_interval_seconds do
case Application.get_env(:parrhesia, :limits, [])
|> Keyword.get(:websocket_ping_interval_seconds) do
value when is_integer(value) and value >= 0 -> value
_other -> @default_websocket_ping_interval_seconds
end
end
defp websocket_pong_timeout_seconds(opts) when is_list(opts) do
opts
|> Keyword.get(:websocket_pong_timeout_seconds)
|> normalize_websocket_pong_timeout_seconds()
end
defp websocket_pong_timeout_seconds(opts) when is_map(opts) do
opts
|> Map.get(:websocket_pong_timeout_seconds)
|> normalize_websocket_pong_timeout_seconds()
end
defp websocket_pong_timeout_seconds(_opts), do: configured_websocket_pong_timeout_seconds()
defp normalize_websocket_pong_timeout_seconds(value)
when is_integer(value) and value > 0,
do: value
defp normalize_websocket_pong_timeout_seconds(_value),
do: configured_websocket_pong_timeout_seconds()
defp configured_websocket_pong_timeout_seconds do
case Application.get_env(:parrhesia, :limits, [])
|> Keyword.get(:websocket_pong_timeout_seconds) do
value when is_integer(value) and value > 0 -> value
_other -> @default_websocket_pong_timeout_seconds
end
end
defp track_population?(opts) when is_list(opts), do: Keyword.get(opts, :track_population?, true) defp track_population?(opts) when is_list(opts), do: Keyword.get(opts, :track_population?, true)
defp track_population?(opts) when is_map(opts), do: Map.get(opts, :track_population?, true) defp track_population?(opts) when is_map(opts), do: Map.get(opts, :track_population?, true)
defp track_population?(_opts), do: true defp track_population?(_opts), do: true

View File

@@ -978,6 +978,41 @@ defmodule Parrhesia.Web.ConnectionTest do
assert JSON.decode!(notice_payload) == ["NOTICE", message] assert JSON.decode!(notice_payload) == ["NOTICE", message]
end end
test "websocket keepalive ping waits for matching pong" do
state =
connection_state(websocket_ping_interval_seconds: 30, websocket_pong_timeout_seconds: 5)
assert {:push, {:ping, payload}, ping_state} =
Connection.handle_info(:websocket_keepalive_ping, state)
assert is_binary(payload)
assert ping_state.websocket_awaiting_pong_payload == payload
assert is_reference(ping_state.websocket_keepalive_timeout_timer_ref)
assert {:ok, acknowledged_state} =
Connection.handle_control({payload, [opcode: :pong]}, ping_state)
assert acknowledged_state.websocket_awaiting_pong_payload == nil
assert acknowledged_state.websocket_keepalive_timeout_timer_ref == nil
end
test "websocket keepalive timeout closes the connection" do
state =
connection_state(websocket_ping_interval_seconds: 30, websocket_pong_timeout_seconds: 5)
assert {:push, {:ping, payload}, ping_state} =
Connection.handle_info(:websocket_keepalive_ping, state)
assert {:stop, :normal, {1001, "keepalive timeout"}, _timeout_state} =
Connection.handle_info({:websocket_keepalive_timeout, payload}, ping_state)
end
test "websocket keepalive can be disabled" do
state = connection_state(websocket_ping_interval_seconds: 0)
assert {:ok, ^state} = Connection.handle_info(:websocket_keepalive_ping, state)
end
defp subscribed_connection_state(opts) do defp subscribed_connection_state(opts) do
state = connection_state(opts) state = connection_state(opts)
req_payload = JSON.encode!(["REQ", "sub-1", %{"kinds" => [1]}]) req_payload = JSON.encode!(["REQ", "sub-1", %{"kinds" => [1]}])
@@ -1004,6 +1039,8 @@ defmodule Parrhesia.Web.ConnectionTest do
|> Keyword.put_new(:subscription_index, nil) |> Keyword.put_new(:subscription_index, nil)
|> Keyword.put_new(:trap_exit?, false) |> Keyword.put_new(:trap_exit?, false)
|> Keyword.put_new(:track_population?, false) |> Keyword.put_new(:track_population?, false)
|> Keyword.put_new(:websocket_ping_interval_seconds, 0)
|> Keyword.put_new(:websocket_pong_timeout_seconds, 10)
{:ok, state} = Connection.init(opts) {:ok, state} = Connection.init(opts)
state state