diff --git a/README.md b/README.md index 1edd20d..66748d2 100644 --- a/README.md +++ b/README.md @@ -381,6 +381,8 @@ Parrhesia treats NIP-43 invite requests as synthetic relay output, not stored cl | `:max_event_ingest_per_window` | `PARRHESIA_LIMITS_MAX_EVENT_INGEST_PER_WINDOW` | `120` | | `:event_ingest_window_seconds` | `PARRHESIA_LIMITS_EVENT_INGEST_WINDOW_SECONDS` | `1` | | `:auth_max_age_seconds` | `PARRHESIA_LIMITS_AUTH_MAX_AGE_SECONDS` | `600` | +| `:websocket_ping_interval_seconds` | `PARRHESIA_LIMITS_WEBSOCKET_PING_INTERVAL_SECONDS` | `30` | +| `:websocket_pong_timeout_seconds` | `PARRHESIA_LIMITS_WEBSOCKET_PONG_TIMEOUT_SECONDS` | `10` | | `:max_outbound_queue` | `PARRHESIA_LIMITS_MAX_OUTBOUND_QUEUE` | `256` | | `:outbound_drain_batch_size` | `PARRHESIA_LIMITS_OUTBOUND_DRAIN_BATCH_SIZE` | `64` | | `:outbound_overflow_strategy` | `PARRHESIA_LIMITS_OUTBOUND_OVERFLOW_STRATEGY` | `:close` | diff --git a/config/config.exs b/config/config.exs index 6aa99df..b37d7e3 100644 --- a/config/config.exs +++ b/config/config.exs @@ -57,6 +57,8 @@ config :parrhesia, max_event_ingest_per_window: 120, event_ingest_window_seconds: 1, auth_max_age_seconds: 600, + websocket_ping_interval_seconds: 30, + websocket_pong_timeout_seconds: 10, max_outbound_queue: 256, outbound_drain_batch_size: 64, outbound_overflow_strategy: :close, diff --git a/config/runtime.exs b/config/runtime.exs index f099c15..fb8200f 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -277,6 +277,16 @@ if config_env() == :prod do "PARRHESIA_LIMITS_AUTH_MAX_AGE_SECONDS", Keyword.get(limits_defaults, :auth_max_age_seconds, 600) ), + websocket_ping_interval_seconds: + int_env.( + "PARRHESIA_LIMITS_WEBSOCKET_PING_INTERVAL_SECONDS", + Keyword.get(limits_defaults, :websocket_ping_interval_seconds, 30) + ), + websocket_pong_timeout_seconds: + int_env.( + "PARRHESIA_LIMITS_WEBSOCKET_PONG_TIMEOUT_SECONDS", + Keyword.get(limits_defaults, :websocket_pong_timeout_seconds, 10) + ), max_outbound_queue: int_env.( "PARRHESIA_LIMITS_MAX_OUTBOUND_QUEUE", diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index 3f0abff..5997a82 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -30,7 +30,11 @@ defmodule Parrhesia.Web.Connection do @default_event_ingest_rate_limit 120 @default_event_ingest_window_seconds 1 @default_auth_max_age_seconds 600 + @default_websocket_ping_interval_seconds 30 + @default_websocket_pong_timeout_seconds 10 @drain_outbound_queue :drain_outbound_queue + @websocket_keepalive_ping :websocket_keepalive_ping + @websocket_keepalive_timeout :websocket_keepalive_timeout @outbound_queue_pressure_threshold 0.75 @marmot_kinds MapSet.new([ @@ -72,6 +76,10 @@ defmodule Parrhesia.Web.Connection do event_ingest_window_started_at_ms: 0, event_ingest_count: 0, auth_max_age_seconds: @default_auth_max_age_seconds, + websocket_ping_interval_seconds: @default_websocket_ping_interval_seconds, + websocket_pong_timeout_seconds: @default_websocket_pong_timeout_seconds, + websocket_keepalive_timeout_timer_ref: nil, + websocket_awaiting_pong_payload: nil, track_population?: true @type overflow_strategy :: :close | :drop_oldest | :drop_newest @@ -109,6 +117,10 @@ defmodule Parrhesia.Web.Connection do event_ingest_window_started_at_ms: integer(), event_ingest_count: non_neg_integer(), auth_max_age_seconds: pos_integer(), + websocket_ping_interval_seconds: non_neg_integer(), + websocket_pong_timeout_seconds: pos_integer(), + websocket_keepalive_timeout_timer_ref: reference() | nil, + websocket_awaiting_pong_payload: binary() | nil, track_population?: boolean() } @@ -117,29 +129,33 @@ defmodule Parrhesia.Web.Connection do maybe_configure_exit_trapping(opts) auth_challenges = auth_challenges(opts) - state = %__MODULE__{ - listener: Listener.from_opts(opts), - transport_identity: transport_identity(opts), - max_subscriptions_per_connection: max_subscriptions_per_connection(opts), - subscription_index: subscription_index(opts), - auth_challenges: auth_challenges, - auth_challenge: maybe_issue_auth_challenge(auth_challenges), - relay_url: relay_url(opts), - remote_ip: remote_ip(opts), - negentropy_sessions: negentropy_sessions(opts), - max_outbound_queue: max_outbound_queue(opts), - outbound_overflow_strategy: outbound_overflow_strategy(opts), - outbound_drain_batch_size: outbound_drain_batch_size(opts), - max_frame_bytes: max_frame_bytes(opts), - max_event_bytes: max_event_bytes(opts), - event_ingest_limiter: event_ingest_limiter(opts), - remote_ip_event_ingest_limiter: remote_ip_event_ingest_limiter(opts), - max_event_ingest_per_window: max_event_ingest_per_window(opts), - event_ingest_window_seconds: event_ingest_window_seconds(opts), - event_ingest_window_started_at_ms: System.monotonic_time(:millisecond), - auth_max_age_seconds: auth_max_age_seconds(opts), - track_population?: track_population?(opts) - } + state = + %__MODULE__{ + listener: Listener.from_opts(opts), + transport_identity: transport_identity(opts), + max_subscriptions_per_connection: max_subscriptions_per_connection(opts), + subscription_index: subscription_index(opts), + auth_challenges: auth_challenges, + auth_challenge: maybe_issue_auth_challenge(auth_challenges), + relay_url: relay_url(opts), + remote_ip: remote_ip(opts), + negentropy_sessions: negentropy_sessions(opts), + max_outbound_queue: max_outbound_queue(opts), + outbound_overflow_strategy: outbound_overflow_strategy(opts), + outbound_drain_batch_size: outbound_drain_batch_size(opts), + max_frame_bytes: max_frame_bytes(opts), + max_event_bytes: max_event_bytes(opts), + event_ingest_limiter: event_ingest_limiter(opts), + remote_ip_event_ingest_limiter: remote_ip_event_ingest_limiter(opts), + max_event_ingest_per_window: max_event_ingest_per_window(opts), + event_ingest_window_seconds: event_ingest_window_seconds(opts), + event_ingest_window_started_at_ms: System.monotonic_time(:millisecond), + auth_max_age_seconds: auth_max_age_seconds(opts), + websocket_ping_interval_seconds: websocket_ping_interval_seconds(opts), + websocket_pong_timeout_seconds: websocket_pong_timeout_seconds(opts), + track_population?: track_population?(opts) + } + |> maybe_schedule_next_websocket_ping() :ok = maybe_track_connection_open(state) Telemetry.emit_process_mailbox_depth(:connection) @@ -180,6 +196,17 @@ defmodule Parrhesia.Web.Connection do |> emit_connection_mailbox_depth() end + @impl true + def handle_control({payload, [opcode: :pong]}, %__MODULE__{} = state) when is_binary(payload) do + {:ok, maybe_acknowledge_websocket_pong(state, payload)} + |> emit_connection_mailbox_depth() + end + + def handle_control({_payload, [opcode: :ping]}, %__MODULE__{} = state) do + {:ok, state} + |> emit_connection_mailbox_depth() + end + defp handle_decoded_message({:event, event}, state), do: handle_event_ingest(state, event) defp handle_decoded_message({:req, subscription_id, filters}, state), @@ -291,6 +318,24 @@ defmodule Parrhesia.Web.Connection do end end + def handle_info(@websocket_keepalive_ping, %__MODULE__{} = state) do + state + |> maybe_schedule_next_websocket_ping() + |> maybe_send_websocket_keepalive_ping() + |> emit_connection_mailbox_depth() + end + + def handle_info({@websocket_keepalive_timeout, payload}, %__MODULE__{} = state) + when is_binary(payload) do + if websocket_keepalive_timeout_payload?(state, payload) do + {:stop, :normal, {1001, "keepalive timeout"}, state} + |> emit_connection_mailbox_depth() + else + {:ok, state} + |> emit_connection_mailbox_depth() + end + end + def handle_info({:EXIT, _from, :shutdown}, %__MODULE__{} = state) do close_with_drained_outbound_frames(state) |> emit_connection_mailbox_depth() @@ -313,6 +358,7 @@ defmodule Parrhesia.Web.Connection do :ok = maybe_unsubscribe_all_stream_subscriptions(state) :ok = maybe_remove_index_owner(state) :ok = maybe_clear_auth_challenge(state) + :ok = cancel_websocket_keepalive_timers(state) :ok end @@ -1229,6 +1275,93 @@ defmodule Parrhesia.Web.Connection do %__MODULE__{state | drain_scheduled?: true} end + defp maybe_schedule_next_websocket_ping( + %__MODULE__{websocket_ping_interval_seconds: interval_seconds} = state + ) + when interval_seconds <= 0, + do: state + + defp maybe_schedule_next_websocket_ping( + %__MODULE__{websocket_ping_interval_seconds: interval_seconds} = state + ) do + _timer_ref = Process.send_after(self(), @websocket_keepalive_ping, interval_seconds * 1_000) + state + end + + defp maybe_send_websocket_keepalive_ping( + %__MODULE__{websocket_ping_interval_seconds: interval_seconds} = state + ) + when interval_seconds <= 0, + do: {:ok, state} + + defp maybe_send_websocket_keepalive_ping( + %__MODULE__{websocket_awaiting_pong_payload: awaiting_payload} = state + ) + when is_binary(awaiting_payload), + do: {:ok, state} + + defp maybe_send_websocket_keepalive_ping(%__MODULE__{} = state) do + payload = Base.encode16(:crypto.strong_rand_bytes(8), case: :lower) + + timeout_timer_ref = + Process.send_after( + self(), + {@websocket_keepalive_timeout, payload}, + state.websocket_pong_timeout_seconds * 1_000 + ) + + next_state = + %__MODULE__{ + state + | websocket_keepalive_timeout_timer_ref: timeout_timer_ref, + websocket_awaiting_pong_payload: payload + } + + {:push, {:ping, payload}, next_state} + end + + defp websocket_keepalive_timeout_payload?( + %__MODULE__{websocket_awaiting_pong_payload: awaiting_payload}, + payload + ) + when is_binary(awaiting_payload) and is_binary(payload) do + Plug.Crypto.secure_compare(awaiting_payload, payload) + end + + defp websocket_keepalive_timeout_payload?(_state, _payload), do: false + + defp maybe_acknowledge_websocket_pong( + %__MODULE__{websocket_awaiting_pong_payload: awaiting_payload} = state, + payload + ) + when is_binary(awaiting_payload) and is_binary(payload) do + if Plug.Crypto.secure_compare(awaiting_payload, payload) do + :ok = cancel_timer(state.websocket_keepalive_timeout_timer_ref) + + %__MODULE__{ + state + | websocket_keepalive_timeout_timer_ref: nil, + websocket_awaiting_pong_payload: nil + } + else + state + end + end + + defp maybe_acknowledge_websocket_pong(%__MODULE__{} = state, _payload), do: state + + defp cancel_websocket_keepalive_timers(%__MODULE__{} = state) do + :ok = cancel_timer(state.websocket_keepalive_timeout_timer_ref) + :ok + end + + defp cancel_timer(timer_ref) when is_reference(timer_ref) do + _ = Process.cancel_timer(timer_ref, async: true, info: false) + :ok + end + + defp cancel_timer(_timer_ref), do: :ok + defp emit_outbound_queue_depth(state, metadata \\ %{}) do depth = state.outbound_queue_size @@ -1769,6 +1902,64 @@ defmodule Parrhesia.Web.Connection do |> Keyword.get(:auth_max_age_seconds, @default_auth_max_age_seconds) end + defp websocket_ping_interval_seconds(opts) when is_list(opts) do + opts + |> Keyword.get(:websocket_ping_interval_seconds) + |> normalize_websocket_ping_interval_seconds() + end + + defp websocket_ping_interval_seconds(opts) when is_map(opts) do + opts + |> Map.get(:websocket_ping_interval_seconds) + |> normalize_websocket_ping_interval_seconds() + end + + defp websocket_ping_interval_seconds(_opts), do: configured_websocket_ping_interval_seconds() + + defp normalize_websocket_ping_interval_seconds(value) + when is_integer(value) and value >= 0, + do: value + + defp normalize_websocket_ping_interval_seconds(_value), + do: configured_websocket_ping_interval_seconds() + + defp configured_websocket_ping_interval_seconds do + case Application.get_env(:parrhesia, :limits, []) + |> Keyword.get(:websocket_ping_interval_seconds) do + value when is_integer(value) and value >= 0 -> value + _other -> @default_websocket_ping_interval_seconds + end + end + + defp websocket_pong_timeout_seconds(opts) when is_list(opts) do + opts + |> Keyword.get(:websocket_pong_timeout_seconds) + |> normalize_websocket_pong_timeout_seconds() + end + + defp websocket_pong_timeout_seconds(opts) when is_map(opts) do + opts + |> Map.get(:websocket_pong_timeout_seconds) + |> normalize_websocket_pong_timeout_seconds() + end + + defp websocket_pong_timeout_seconds(_opts), do: configured_websocket_pong_timeout_seconds() + + defp normalize_websocket_pong_timeout_seconds(value) + when is_integer(value) and value > 0, + do: value + + defp normalize_websocket_pong_timeout_seconds(_value), + do: configured_websocket_pong_timeout_seconds() + + defp configured_websocket_pong_timeout_seconds do + case Application.get_env(:parrhesia, :limits, []) + |> Keyword.get(:websocket_pong_timeout_seconds) do + value when is_integer(value) and value > 0 -> value + _other -> @default_websocket_pong_timeout_seconds + end + end + defp track_population?(opts) when is_list(opts), do: Keyword.get(opts, :track_population?, true) defp track_population?(opts) when is_map(opts), do: Map.get(opts, :track_population?, true) defp track_population?(_opts), do: true diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs index b050321..4f94ac0 100644 --- a/test/parrhesia/web/connection_test.exs +++ b/test/parrhesia/web/connection_test.exs @@ -978,6 +978,41 @@ defmodule Parrhesia.Web.ConnectionTest do assert JSON.decode!(notice_payload) == ["NOTICE", message] end + test "websocket keepalive ping waits for matching pong" do + state = + connection_state(websocket_ping_interval_seconds: 30, websocket_pong_timeout_seconds: 5) + + assert {:push, {:ping, payload}, ping_state} = + Connection.handle_info(:websocket_keepalive_ping, state) + + assert is_binary(payload) + assert ping_state.websocket_awaiting_pong_payload == payload + assert is_reference(ping_state.websocket_keepalive_timeout_timer_ref) + + assert {:ok, acknowledged_state} = + Connection.handle_control({payload, [opcode: :pong]}, ping_state) + + assert acknowledged_state.websocket_awaiting_pong_payload == nil + assert acknowledged_state.websocket_keepalive_timeout_timer_ref == nil + end + + test "websocket keepalive timeout closes the connection" do + state = + connection_state(websocket_ping_interval_seconds: 30, websocket_pong_timeout_seconds: 5) + + assert {:push, {:ping, payload}, ping_state} = + Connection.handle_info(:websocket_keepalive_ping, state) + + assert {:stop, :normal, {1001, "keepalive timeout"}, _timeout_state} = + Connection.handle_info({:websocket_keepalive_timeout, payload}, ping_state) + end + + test "websocket keepalive can be disabled" do + state = connection_state(websocket_ping_interval_seconds: 0) + + assert {:ok, ^state} = Connection.handle_info(:websocket_keepalive_ping, state) + end + defp subscribed_connection_state(opts) do state = connection_state(opts) req_payload = JSON.encode!(["REQ", "sub-1", %{"kinds" => [1]}]) @@ -1004,6 +1039,8 @@ defmodule Parrhesia.Web.ConnectionTest do |> Keyword.put_new(:subscription_index, nil) |> Keyword.put_new(:trap_exit?, false) |> Keyword.put_new(:track_population?, false) + |> Keyword.put_new(:websocket_ping_interval_seconds, 0) + |> Keyword.put_new(:websocket_pong_timeout_seconds, 10) {:ok, state} = Connection.init(opts) state