Expand relay metrics and observability

This commit is contained in:
2026-03-18 17:39:13 +01:00
parent c377ed4b62
commit c30449b318
16 changed files with 663 additions and 43 deletions

View File

@@ -35,6 +35,7 @@ defmodule Parrhesia.API.Events do
def publish(event, opts) when is_map(event) and is_list(opts) do
started_at = System.monotonic_time()
event_id = Map.get(event, "id", "")
telemetry_metadata = telemetry_metadata_for_event(event)
with {:ok, context} <- fetch_context(opts),
:ok <- validate_event_payload_size(event, max_event_bytes(opts)),
@@ -45,9 +46,11 @@ defmodule Parrhesia.API.Events do
Telemetry.emit(
[:parrhesia, :ingest, :stop],
%{duration: System.monotonic_time() - started_at},
telemetry_metadata_for_event(event)
telemetry_metadata
)
emit_ingest_result(telemetry_metadata, :accepted, :accepted)
message =
case NIP43.finalize_publish(event, publish_state, nip43_opts(opts, context)) do
{:ok, override} when is_binary(override) -> override
@@ -66,9 +69,12 @@ defmodule Parrhesia.API.Events do
}}
else
{:error, :invalid_context} = error ->
emit_ingest_result(telemetry_metadata, :rejected, :invalid_context)
error
{:error, reason} ->
emit_ingest_result(telemetry_metadata, :rejected, reason)
{:ok,
%PublishResult{
event_id: event_id,
@@ -86,6 +92,7 @@ defmodule Parrhesia.API.Events do
def query(filters, opts) when is_list(filters) and is_list(opts) do
started_at = System.monotonic_time()
telemetry_metadata = telemetry_metadata_for_filters(filters, :query)
with {:ok, context} <- fetch_context(opts),
:ok <- maybe_validate_filters(filters, opts),
@@ -95,11 +102,17 @@ defmodule Parrhesia.API.Events do
Telemetry.emit(
[:parrhesia, :query, :stop],
%{duration: System.monotonic_time() - started_at},
telemetry_metadata_for_filters(filters)
%{duration: System.monotonic_time() - started_at, result_count: length(events)},
telemetry_metadata
)
emit_query_result(telemetry_metadata, :ok)
{:ok, events}
else
{:error, reason} = error ->
emit_query_result(telemetry_metadata, :error, reason)
error
end
end
@@ -110,6 +123,7 @@ defmodule Parrhesia.API.Events do
def count(filters, opts) when is_list(filters) and is_list(opts) do
started_at = System.monotonic_time()
telemetry_metadata = telemetry_metadata_for_filters(filters, :count)
with {:ok, context} <- fetch_context(opts),
:ok <- maybe_validate_filters(filters, opts),
@@ -120,11 +134,17 @@ defmodule Parrhesia.API.Events do
{:ok, result} <- maybe_build_count_result(filters, count, Keyword.get(opts, :options)) do
Telemetry.emit(
[:parrhesia, :query, :stop],
%{duration: System.monotonic_time() - started_at},
telemetry_metadata_for_filters(filters)
%{duration: System.monotonic_time() - started_at, result_count: count},
telemetry_metadata
)
emit_query_result(telemetry_metadata, :ok)
{:ok, result}
else
{:error, reason} = error ->
emit_query_result(telemetry_metadata, :error, reason)
error
end
end
@@ -242,8 +262,8 @@ defmodule Parrhesia.API.Events do
%{traffic_class: traffic_class_for_event(event)}
end
defp telemetry_metadata_for_filters(filters) do
%{traffic_class: traffic_class_for_filters(filters)}
defp telemetry_metadata_for_filters(filters, operation) do
%{traffic_class: traffic_class_for_filters(filters), operation: operation}
end
defp traffic_class_for_filters(filters) do
@@ -276,6 +296,30 @@ defmodule Parrhesia.API.Events do
defp traffic_class_for_event(_event), do: :generic
# Emits a single [:parrhesia, :ingest, :result] count for one publish attempt,
# tagging it with the outcome (:accepted | :rejected) and a normalized reason.
defp emit_ingest_result(metadata, outcome, reason) do
  result_metadata = Map.merge(metadata, %{outcome: outcome, reason: normalize_reason(reason)})

  Telemetry.emit([:parrhesia, :ingest, :result], %{count: 1}, result_metadata)
end
# Emits a single [:parrhesia, :query, :result] count for a query/count call.
# When no explicit reason is given, the outcome itself is used as the reason tag.
defp emit_query_result(metadata, outcome, reason \\ nil) do
  reason_tag = normalize_reason(reason || outcome)
  result_metadata = Map.merge(metadata, %{outcome: outcome, reason: reason_tag})

  Telemetry.emit([:parrhesia, :query, :result], %{count: 1}, result_metadata)
end
# Maps an arbitrary failure reason onto a value safe to use as a telemetry tag.
#
# The `nil` clause must come FIRST: `nil` is itself an atom in Elixir, so with
# the `is_atom/1` clause listed first (as originally written) the
# `normalize_reason(nil) -> :none` clause was unreachable and `nil` leaked
# through as a tag value.
defp normalize_reason(nil), do: :none
defp normalize_reason(reason) when is_atom(reason), do: reason
defp normalize_reason(reason) when is_binary(reason), do: reason
defp normalize_reason(_reason), do: :unknown
defp fetch_context(opts) do
case Keyword.get(opts, :context) do
%RequestContext{} = context -> {:ok, context}

View File

@@ -0,0 +1,84 @@
defmodule Parrhesia.ConnectionStats do
  # Tracks per-listener connection and subscription population counters and
  # publishes them as [:parrhesia, :listener, :population] telemetry each time
  # a counter changes. All client calls are fire-and-forget casts so callers
  # (websocket connection processes) never block on metrics bookkeeping.
  @moduledoc false

  use GenServer

  alias Parrhesia.Telemetry

  # listener_id => current count; values are clamped at zero (see increment/3).
  defstruct connections: %{}, subscriptions: %{}

  @type state :: %__MODULE__{
          connections: %{(atom() | String.t()) => non_neg_integer()},
          subscriptions: %{(atom() | String.t()) => non_neg_integer()}
        }

  # Starts the stats server. The :name option is honoured here, but note the
  # cast-based client API below always targets __MODULE__ — a custom name is
  # only useful for tests that talk to the server via GenServer directly.
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    name = Keyword.get(opts, :name, __MODULE__)
    GenServer.start_link(__MODULE__, %__MODULE__{}, name: name)
  end

  # Records one opened connection for the listener.
  @spec connection_open(atom() | String.t()) :: :ok
  def connection_open(listener_id), do: cast({:connection_open, listener_id})

  # Records one closed connection for the listener.
  @spec connection_close(atom() | String.t()) :: :ok
  def connection_close(listener_id), do: cast({:connection_close, listener_id})

  # Adjusts the listener's subscription count by `delta` (may be negative,
  # e.g. -N when a connection terminates with N live subscriptions).
  @spec subscriptions_change(atom() | String.t(), integer()) :: :ok
  def subscriptions_change(listener_id, delta) when is_integer(delta) do
    cast({:subscriptions_change, listener_id, delta})
  end

  @impl true
  def init(%__MODULE__{} = state), do: {:ok, state}

  @impl true
  def handle_cast({:connection_open, listener_id}, %__MODULE__{} = state) do
    listener_id = normalize_listener_id(listener_id)
    next_state = %{state | connections: increment(state.connections, listener_id, 1)}
    # Re-emit the gauges for this listener after every counter change.
    emit_population(listener_id, next_state)
    {:noreply, next_state}
  end

  def handle_cast({:connection_close, listener_id}, %__MODULE__{} = state) do
    listener_id = normalize_listener_id(listener_id)
    next_state = %{state | connections: increment(state.connections, listener_id, -1)}
    emit_population(listener_id, next_state)
    {:noreply, next_state}
  end

  def handle_cast({:subscriptions_change, listener_id, delta}, %__MODULE__{} = state) do
    listener_id = normalize_listener_id(listener_id)
    next_state = %{state | subscriptions: increment(state.subscriptions, listener_id, delta)}
    emit_population(listener_id, next_state)
    {:noreply, next_state}
  end

  # Fire-and-forget cast; never lets a metrics failure crash the caller.
  # NOTE(review): GenServer.cast/2 is documented to always return :ok even for
  # a dead/unregistered server, so these catch clauses look like defensive
  # belt-and-braces — confirm before relying on them.
  defp cast(message) do
    GenServer.cast(__MODULE__, message)
    :ok
  catch
    :exit, {:noproc, _details} -> :ok
    :exit, {:normal, _details} -> :ok
  end

  # Bumps a counter, clamping at zero so unbalanced close/open casts
  # (e.g. a close arriving for an untracked listener) cannot go negative.
  defp increment(counts, key, delta) do
    current = Map.get(counts, key, 0)
    Map.put(counts, key, max(current + delta, 0))
  end

  # Emits the current connection/subscription gauges for one listener.
  defp emit_population(listener_id, %__MODULE__{} = state) do
    Telemetry.emit(
      [:parrhesia, :listener, :population],
      %{
        connections: Map.get(state.connections, listener_id, 0),
        subscriptions: Map.get(state.subscriptions, listener_id, 0)
      },
      %{listener_id: listener_id}
    )
  end

  # Collapses unexpected id types to :unknown; atoms and strings pass through.
  defp normalize_listener_id(listener_id) when is_atom(listener_id), do: listener_id
  defp normalize_listener_id(listener_id) when is_binary(listener_id), do: listener_id
  defp normalize_listener_id(_listener_id), do: :unknown
end

View File

@@ -16,6 +16,7 @@ defmodule Parrhesia.Runtime do
def children do
[
Parrhesia.Telemetry,
Parrhesia.ConnectionStats,
Parrhesia.Config,
Parrhesia.Web.EventIngestLimiter,
Parrhesia.Web.IPEventIngestLimiter,

View File

@@ -30,10 +30,19 @@ defmodule Parrhesia.Tasks.ExpirationWorker do
def handle_info(:tick, state) do
started_at = System.monotonic_time()
_result = Storage.events().purge_expired([])
purged_events =
case Storage.events().purge_expired([]) do
{:ok, count} when is_integer(count) and count >= 0 -> count
_other -> 0
end
duration = System.monotonic_time() - started_at
Telemetry.emit([:parrhesia, :maintenance, :purge_expired, :stop], %{duration: duration}, %{})
Telemetry.emit(
[:parrhesia, :maintenance, :purge_expired, :stop],
%{duration: duration, purged_events: purged_events},
%{}
)
schedule_tick(state.interval_ms)
{:noreply, state}

View File

@@ -7,6 +7,7 @@ defmodule Parrhesia.Telemetry do
import Telemetry.Metrics
@repo_query_handler_id "parrhesia-repo-query-handler"
@prometheus_reporter __MODULE__.Prometheus
@spec start_link(keyword()) :: Supervisor.on_start()
@@ -16,6 +17,8 @@ defmodule Parrhesia.Telemetry do
@impl true
def init(_init_arg) do
:ok = attach_repo_query_handlers()
children = [
{TelemetryMetricsPrometheus.Core, name: @prometheus_reporter, metrics: metrics()},
{:telemetry_poller, measurements: periodic_measurements(), period: 10_000}
@@ -30,6 +33,12 @@ defmodule Parrhesia.Telemetry do
@spec metrics() :: [Telemetry.Metrics.t()]
def metrics do
[
counter("parrhesia.ingest.events.count",
event_name: [:parrhesia, :ingest, :result],
measurement: :count,
tags: [:traffic_class, :outcome, :reason],
tag_values: &ingest_result_tag_values/1
),
distribution("parrhesia.ingest.duration.ms",
event_name: [:parrhesia, :ingest, :stop],
measurement: :duration,
@@ -38,14 +47,27 @@ defmodule Parrhesia.Telemetry do
tag_values: &traffic_class_tag_values/1,
reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
counter("parrhesia.query.requests.count",
event_name: [:parrhesia, :query, :result],
measurement: :count,
tags: [:traffic_class, :operation, :outcome],
tag_values: &query_result_tag_values/1
),
distribution("parrhesia.query.duration.ms",
event_name: [:parrhesia, :query, :stop],
measurement: :duration,
unit: {:native, :millisecond},
tags: [:traffic_class],
tag_values: &traffic_class_tag_values/1,
tags: [:traffic_class, :operation],
tag_values: &query_stop_tag_values/1,
reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
distribution("parrhesia.query.results.count",
event_name: [:parrhesia, :query, :stop],
measurement: :result_count,
tags: [:traffic_class, :operation],
tag_values: &query_stop_tag_values/1,
reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000]]
),
distribution("parrhesia.fanout.duration.ms",
event_name: [:parrhesia, :fanout, :stop],
measurement: :duration,
@@ -54,6 +76,25 @@ defmodule Parrhesia.Telemetry do
tag_values: &traffic_class_tag_values/1,
reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
counter("parrhesia.fanout.events_considered.count",
event_name: [:parrhesia, :fanout, :stop],
measurement: :considered,
tags: [:traffic_class],
tag_values: &traffic_class_tag_values/1
),
counter("parrhesia.fanout.events_enqueued.count",
event_name: [:parrhesia, :fanout, :stop],
measurement: :enqueued,
tags: [:traffic_class],
tag_values: &traffic_class_tag_values/1
),
distribution("parrhesia.fanout.batch_size",
event_name: [:parrhesia, :fanout, :stop],
measurement: :enqueued,
tags: [:traffic_class],
tag_values: &traffic_class_tag_values/1,
reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
last_value("parrhesia.connection.outbound_queue.depth",
event_name: [:parrhesia, :connection, :outbound_queue],
measurement: :depth,
@@ -80,6 +121,41 @@ defmodule Parrhesia.Telemetry do
tags: [:traffic_class],
tag_values: &traffic_class_tag_values/1
),
counter("parrhesia.connection.outbound_queue.drained_frames.count",
event_name: [:parrhesia, :connection, :outbound_queue, :drain],
measurement: :count
),
distribution("parrhesia.connection.outbound_queue.drain_batch_size",
event_name: [:parrhesia, :connection, :outbound_queue, :drain],
measurement: :count,
reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250]]
),
counter("parrhesia.connection.outbound_queue.dropped_events.count",
event_name: [:parrhesia, :connection, :outbound_queue, :drop],
measurement: :count,
tags: [:strategy],
tag_values: &strategy_tag_values/1
),
last_value("parrhesia.listener.connections.active",
event_name: [:parrhesia, :listener, :population],
measurement: :connections,
tags: [:listener_id],
tag_values: &listener_tag_values/1,
reporter_options: [prometheus_type: :gauge]
),
last_value("parrhesia.listener.subscriptions.active",
event_name: [:parrhesia, :listener, :population],
measurement: :subscriptions,
tags: [:listener_id],
tag_values: &listener_tag_values/1,
reporter_options: [prometheus_type: :gauge]
),
counter("parrhesia.rate_limit.hits.count",
event_name: [:parrhesia, :rate_limit, :hit],
measurement: :count,
tags: [:scope, :traffic_class],
tag_values: &rate_limit_tag_values/1
),
last_value("parrhesia.process.mailbox.depth",
event_name: [:parrhesia, :process, :mailbox],
measurement: :depth,
@@ -87,11 +163,111 @@ defmodule Parrhesia.Telemetry do
tag_values: &process_tag_values/1,
reporter_options: [prometheus_type: :gauge]
),
counter("parrhesia.db.query.count",
event_name: [:parrhesia, :db, :query],
measurement: :count,
tags: [:repo_role],
tag_values: &repo_query_tag_values/1
),
distribution("parrhesia.db.query.total_time.ms",
event_name: [:parrhesia, :db, :query],
measurement: :total_time,
unit: {:native, :millisecond},
tags: [:repo_role],
tag_values: &repo_query_tag_values/1,
reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
distribution("parrhesia.db.query.queue_time.ms",
event_name: [:parrhesia, :db, :query],
measurement: :queue_time,
unit: {:native, :millisecond},
tags: [:repo_role],
tag_values: &repo_query_tag_values/1,
reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
distribution("parrhesia.db.query.query_time.ms",
event_name: [:parrhesia, :db, :query],
measurement: :query_time,
unit: {:native, :millisecond},
tags: [:repo_role],
tag_values: &repo_query_tag_values/1,
reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
distribution("parrhesia.db.query.decode_time.ms",
event_name: [:parrhesia, :db, :query],
measurement: :decode_time,
unit: {:native, :millisecond},
tags: [:repo_role],
tag_values: &repo_query_tag_values/1,
reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
distribution("parrhesia.db.query.idle_time.ms",
event_name: [:parrhesia, :db, :query],
measurement: :idle_time,
unit: {:native, :millisecond},
tags: [:repo_role],
tag_values: &repo_query_tag_values/1,
reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
distribution("parrhesia.maintenance.purge_expired.duration.ms",
event_name: [:parrhesia, :maintenance, :purge_expired, :stop],
measurement: :duration,
unit: {:native, :millisecond},
reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
counter("parrhesia.maintenance.purge_expired.events.count",
event_name: [:parrhesia, :maintenance, :purge_expired, :stop],
measurement: :purged_events
),
distribution("parrhesia.maintenance.partition_retention.duration.ms",
event_name: [:parrhesia, :maintenance, :partition_retention, :stop],
measurement: :duration,
unit: {:native, :millisecond},
tags: [:status],
tag_values: &status_tag_values/1,
reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
counter("parrhesia.maintenance.partition_retention.dropped_partitions.count",
event_name: [:parrhesia, :maintenance, :partition_retention, :stop],
measurement: :dropped_partitions,
tags: [:status],
tag_values: &status_tag_values/1
),
last_value("parrhesia.vm.memory.total.bytes",
event_name: [:parrhesia, :vm, :memory],
measurement: :total,
unit: :byte,
reporter_options: [prometheus_type: :gauge]
),
last_value("parrhesia.vm.memory.processes.bytes",
event_name: [:parrhesia, :vm, :memory],
measurement: :processes,
unit: :byte,
reporter_options: [prometheus_type: :gauge]
),
last_value("parrhesia.vm.memory.system.bytes",
event_name: [:parrhesia, :vm, :memory],
measurement: :system,
unit: :byte,
reporter_options: [prometheus_type: :gauge]
),
last_value("parrhesia.vm.memory.atom.bytes",
event_name: [:parrhesia, :vm, :memory],
measurement: :atom,
unit: :byte,
reporter_options: [prometheus_type: :gauge]
),
last_value("parrhesia.vm.memory.binary.bytes",
event_name: [:parrhesia, :vm, :memory],
measurement: :binary,
unit: :byte,
reporter_options: [prometheus_type: :gauge]
),
last_value("parrhesia.vm.memory.ets.bytes",
event_name: [:parrhesia, :vm, :memory],
measurement: :ets,
unit: :byte,
reporter_options: [prometheus_type: :gauge]
)
]
end
@@ -126,8 +302,18 @@ defmodule Parrhesia.Telemetry do
@doc false
def emit_vm_memory do
total = :erlang.memory(:total)
emit([:parrhesia, :vm, :memory], %{total: total}, %{})
emit(
[:parrhesia, :vm, :memory],
%{
total: :erlang.memory(:total),
processes: :erlang.memory(:processes),
system: :erlang.memory(:system),
atom: :erlang.memory(:atom),
binary: :erlang.memory(:binary),
ets: :erlang.memory(:ets)
},
%{}
)
end
defp traffic_class_tag_values(metadata) do
@@ -135,8 +321,100 @@ defmodule Parrhesia.Telemetry do
%{traffic_class: traffic_class}
end
# Converts ingest-result metadata into string-valued metric tags,
# substituting a default for any missing key.
defp ingest_result_tag_values(metadata) do
  tag = fn key, default -> metadata |> Map.get(key, default) |> to_string() end

  %{
    traffic_class: tag.(:traffic_class, :generic),
    outcome: tag.(:outcome, :unknown),
    reason: tag.(:reason, :unknown)
  }
end
# Builds the {traffic_class, operation} tag pair for query stop events,
# falling back to :generic / :query when the metadata omits a key.
defp query_stop_tag_values(metadata) do
  %{traffic_class: :generic, operation: :query}
  |> Map.merge(Map.take(metadata, [:traffic_class, :operation]))
  |> Map.new(fn {key, value} -> {key, to_string(value)} end)
end
# Builds the {traffic_class, operation, outcome} tags for query result counts.
defp query_result_tag_values(metadata) do
  %{traffic_class: :generic, operation: :query, outcome: :unknown}
  |> Map.merge(Map.take(metadata, [:traffic_class, :operation, :outcome]))
  |> Map.new(fn {key, value} -> {key, to_string(value)} end)
end
# Stringifies the overflow strategy tag for dropped-event counters.
defp strategy_tag_values(metadata) do
  strategy = Map.get(metadata, :strategy, :unknown)
  %{strategy: to_string(strategy)}
end
# Stringifies the listener id tag for population gauges.
defp listener_tag_values(metadata) do
  id = Map.get(metadata, :listener_id, :unknown)
  %{listener_id: to_string(id)}
end
# Builds the {scope, traffic_class} tags for rate-limit hit counters.
defp rate_limit_tag_values(metadata) do
  scope = metadata |> Map.get(:scope, :unknown) |> to_string()
  class = metadata |> Map.get(:traffic_class, :generic) |> to_string()

  %{scope: scope, traffic_class: class}
end
# Stringifies the process type tag for mailbox-depth gauges.
defp process_tag_values(metadata) do
  %{process_type: metadata |> Map.get(:process_type, :unknown) |> to_string()}
end
# Stringifies the repo role tag (:read | :write) for DB query metrics.
defp repo_query_tag_values(metadata) do
  role = Map.get(metadata, :repo_role, :unknown)
  %{repo_role: to_string(role)}
end
# Stringifies the status tag for maintenance-task metrics.
defp status_tag_values(metadata) do
  status = Map.get(metadata, :status, :unknown)
  %{status: to_string(status)}
end
# (Re)attaches one handler for both repos' Ecto query telemetry events.
# Detaching first makes this idempotent across supervisor restarts, since
# :telemetry.attach_many/4 refuses duplicate handler ids.
defp attach_repo_query_handlers do
  :telemetry.detach(@repo_query_handler_id)

  :telemetry.attach_many(
    @repo_query_handler_id,
    [[:parrhesia, :repo, :query], [:parrhesia, :read_repo, :query]],
    &__MODULE__.handle_repo_query_event/4,
    nil
  )

  :ok
rescue
  # NOTE(review): attach_many/4 signals duplicates via {:error, :already_exists}
  # rather than raising; this rescue appears to be defensive against
  # ArgumentError from :telemetry internals — confirm before removing.
  ArgumentError -> :ok
end
@doc false
# Normalizes Ecto query telemetry from both repos into a single
# [:parrhesia, :db, :query] event tagged with the repo role. When the source
# event omits :total_time, it is reconstructed from the component timings.
def handle_repo_query_event(event_name, measurements, _metadata, _config) do
  repo_role =
    case event_name do
      [:parrhesia, :read_repo, :query] -> :read
      [:parrhesia, :repo, :query] -> :write
    end

  queue_time = Map.get(measurements, :queue_time, 0)
  query_time = Map.get(measurements, :query_time, 0)
  decode_time = Map.get(measurements, :decode_time, 0)
  total_time = Map.get(measurements, :total_time, queue_time + query_time + decode_time)

  emit(
    [:parrhesia, :db, :query],
    %{
      count: 1,
      total_time: total_time,
      queue_time: queue_time,
      query_time: query_time,
      decode_time: decode_time,
      idle_time: Map.get(measurements, :idle_time, 0)
    },
    %{repo_role: repo_role}
  )
end
end

View File

@@ -9,6 +9,7 @@ defmodule Parrhesia.Web.Connection do
alias Parrhesia.API.RequestContext
alias Parrhesia.API.Stream
alias Parrhesia.Auth.Challenges
alias Parrhesia.ConnectionStats
alias Parrhesia.Negentropy.Sessions
alias Parrhesia.Policy.ConnectionPolicy
alias Parrhesia.Policy.EventPolicy
@@ -70,7 +71,8 @@ defmodule Parrhesia.Web.Connection do
event_ingest_window_seconds: @default_event_ingest_window_seconds,
event_ingest_window_started_at_ms: 0,
event_ingest_count: 0,
auth_max_age_seconds: @default_auth_max_age_seconds
auth_max_age_seconds: @default_auth_max_age_seconds,
track_population?: true
@type overflow_strategy :: :close | :drop_oldest | :drop_newest
@@ -106,7 +108,8 @@ defmodule Parrhesia.Web.Connection do
event_ingest_window_seconds: pos_integer(),
event_ingest_window_started_at_ms: integer(),
event_ingest_count: non_neg_integer(),
auth_max_age_seconds: pos_integer()
auth_max_age_seconds: pos_integer(),
track_population?: boolean()
}
@impl true
@@ -134,9 +137,11 @@ defmodule Parrhesia.Web.Connection do
max_event_ingest_per_window: max_event_ingest_per_window(opts),
event_ingest_window_seconds: event_ingest_window_seconds(opts),
event_ingest_window_started_at_ms: System.monotonic_time(:millisecond),
auth_max_age_seconds: auth_max_age_seconds(opts)
auth_max_age_seconds: auth_max_age_seconds(opts),
track_population?: track_population?(opts)
}
:ok = maybe_track_connection_open(state)
Telemetry.emit_process_mailbox_depth(:connection)
{:ok, state}
end
@@ -303,6 +308,8 @@ defmodule Parrhesia.Web.Connection do
@impl true
def terminate(_reason, %__MODULE__{} = state) do
:ok = maybe_track_subscription_delta(state, -map_size(state.subscriptions))
:ok = maybe_track_connection_close(state)
:ok = maybe_unsubscribe_all_stream_subscriptions(state)
:ok = maybe_remove_index_owner(state)
:ok = maybe_clear_auth_challenge(state)
@@ -311,17 +318,21 @@ defmodule Parrhesia.Web.Connection do
defp handle_event_ingest(%__MODULE__{} = state, event) do
event_id = Map.get(event, "id", "")
traffic_class = traffic_class_for_event(event)
case maybe_allow_event_ingest(state) do
{:ok, next_state} ->
maybe_publish_ingested_event(next_state, state, event, event_id)
{:error, reason} ->
maybe_emit_rate_limit_hit(reason, traffic_class)
ingest_error_response(state, event_id, reason)
end
end
defp maybe_publish_ingested_event(next_state, state, event, event_id) do
traffic_class = traffic_class_for_event(event)
with :ok <-
maybe_allow_remote_ip_event_ingest(
next_state.remote_ip,
@@ -332,6 +343,7 @@ defmodule Parrhesia.Web.Connection do
publish_event_response(next_state, event)
else
{:error, reason} ->
maybe_emit_rate_limit_hit(reason, traffic_class)
ingest_error_response(state, event_id, reason)
end
end
@@ -437,6 +449,8 @@ defmodule Parrhesia.Web.Connection do
)
{:error, :subscription_limit_reached} ->
maybe_emit_rate_limit_hit(:subscription_limit_reached)
response =
Protocol.encode_relay({
:closed,
@@ -990,22 +1004,38 @@ defmodule Parrhesia.Web.Connection do
telemetry_metadata = telemetry_metadata_for_fanout_events(fanout_events)
case enqueue_fanout_events(state, fanout_events) do
{:ok, next_state} ->
{:ok, next_state, stats} ->
Telemetry.emit(
[:parrhesia, :fanout, :stop],
%{duration: System.monotonic_time() - started_at},
%{
duration: System.monotonic_time() - started_at,
considered: stats.considered,
enqueued: stats.enqueued
},
telemetry_metadata
)
{:ok, maybe_schedule_drain(next_state)}
{:close, next_state} ->
{:close, next_state, stats} ->
Telemetry.emit(
[:parrhesia, :connection, :outbound_queue, :overflow],
%{count: 1},
telemetry_metadata
)
Telemetry.emit(
[:parrhesia, :fanout, :stop],
%{
duration: System.monotonic_time() - started_at,
considered: stats.considered,
enqueued: stats.enqueued
},
telemetry_metadata
)
maybe_emit_rate_limit_hit(:outbound_queue_overflow, telemetry_metadata.traffic_class)
close_with_outbound_overflow(next_state)
end
end
@@ -1023,15 +1053,27 @@ defmodule Parrhesia.Web.Connection do
end
defp enqueue_fanout_events(state, fanout_events) do
Enum.reduce_while(fanout_events, {:ok, state}, fn
{subscription_id, event}, {:ok, acc} when is_binary(subscription_id) and is_map(event) ->
initial_stats = %{considered: 0, enqueued: 0}
Enum.reduce_while(fanout_events, {:ok, state, initial_stats}, fn
{subscription_id, event}, {:ok, acc, stats}
when is_binary(subscription_id) and is_map(event) ->
case maybe_enqueue_fanout_event(acc, subscription_id, event) do
{:ok, next_acc} -> {:cont, {:ok, next_acc}}
{:close, next_acc} -> {:halt, {:close, next_acc}}
{:ok, next_acc, enqueued?} ->
next_stats = %{
considered: stats.considered + 1,
enqueued: stats.enqueued + if(enqueued?, do: 1, else: 0)
}
{:cont, {:ok, next_acc, next_stats}}
{:close, next_acc} ->
next_stats = %{stats | considered: stats.considered + 1}
{:halt, {:close, next_acc, next_stats}}
end
_invalid_event, {:ok, acc} ->
{:cont, {:ok, acc}}
_invalid_event, {:ok, acc, stats} ->
{:cont, {:ok, acc, stats}}
end)
end
@@ -1039,7 +1081,7 @@ defmodule Parrhesia.Web.Connection do
if subscription_matches?(state, subscription_id, event) do
enqueue_outbound(state, {subscription_id, event}, traffic_class_for_event(event))
else
{:ok, state}
{:ok, state, false}
end
end
@@ -1065,15 +1107,17 @@ defmodule Parrhesia.Web.Connection do
}
emit_outbound_queue_depth(next_state, %{traffic_class: traffic_class})
{:ok, next_state}
{:ok, next_state, true}
end
defp enqueue_outbound(
%__MODULE__{outbound_overflow_strategy: :drop_newest} = state,
_queue_entry,
_traffic_class
),
do: {:ok, state}
) do
emit_outbound_queue_drop(:drop_newest)
{:ok, state, false}
end
defp enqueue_outbound(
%__MODULE__{outbound_overflow_strategy: :drop_oldest} = state,
@@ -1086,7 +1130,8 @@ defmodule Parrhesia.Web.Connection do
next_state = %__MODULE__{state | outbound_queue: next_queue, outbound_queue_size: next_size}
emit_outbound_queue_depth(next_state, %{traffic_class: traffic_class})
{:ok, next_state}
emit_outbound_queue_drop(:drop_oldest)
{:ok, next_state, true}
end
defp enqueue_outbound(
@@ -1123,6 +1168,7 @@ defmodule Parrhesia.Web.Connection do
}
|> maybe_schedule_drain()
emit_outbound_queue_drain(length(frames))
emit_outbound_queue_depth(next_state)
{Enum.reverse(frames), next_state}
@@ -1140,6 +1186,7 @@ defmodule Parrhesia.Web.Connection do
drain_scheduled?: false
}
emit_outbound_queue_drain(length(frames))
emit_outbound_queue_depth(next_state)
{Enum.reverse(frames), next_state}
@@ -1227,12 +1274,26 @@ defmodule Parrhesia.Web.Connection do
defp put_subscription(%__MODULE__{} = state, subscription_id, subscription) do
subscriptions = Map.put(state.subscriptions, subscription_id, subscription)
%__MODULE__{state | subscriptions: subscriptions}
next_state = %__MODULE__{state | subscriptions: subscriptions}
if Map.has_key?(state.subscriptions, subscription_id) do
next_state
else
:ok = maybe_track_subscription_delta(next_state, 1)
next_state
end
end
defp drop_subscription(%__MODULE__{} = state, subscription_id) do
subscriptions = Map.delete(state.subscriptions, subscription_id)
%__MODULE__{state | subscriptions: subscriptions}
next_state = %__MODULE__{state | subscriptions: subscriptions}
if Map.has_key?(state.subscriptions, subscription_id) do
:ok = maybe_track_subscription_delta(next_state, -1)
next_state
else
next_state
end
end
defp drop_queued_subscription_events(
@@ -1708,6 +1769,10 @@ defmodule Parrhesia.Web.Connection do
|> Keyword.get(:auth_max_age_seconds, @default_auth_max_age_seconds)
end
# Reads the :track_population? flag from connection opts; defaults to true
# for keyword lists, maps, and any other (absent) opts value.
defp track_population?(opts) do
  case opts do
    opts when is_list(opts) -> Keyword.get(opts, :track_population?, true)
    opts when is_map(opts) -> Map.get(opts, :track_population?, true)
    _other -> true
  end
end
defp maybe_configure_exit_trapping(opts) do
if trap_exit?(opts) do
Process.flag(:trap_exit, true)
@@ -1789,4 +1854,72 @@ defmodule Parrhesia.Web.Connection do
:exit, {:noproc, _details} -> :ok
:exit, {:normal, _details} -> :ok
end
# Reports a newly opened connection to ConnectionStats unless population
# tracking is disabled for this connection.
defp maybe_track_connection_open(%__MODULE__{} = state) do
  case state.track_population? do
    false -> :ok
    _tracked -> ConnectionStats.connection_open(listener_id(state))
  end
end
# Reports a closed connection to ConnectionStats unless population tracking
# is disabled for this connection.
defp maybe_track_connection_close(%__MODULE__{} = state) do
  case state.track_population? do
    false -> :ok
    _tracked -> ConnectionStats.connection_close(listener_id(state))
  end
end
# Reports a subscription-count change to ConnectionStats. No-op for a zero
# delta or when population tracking is disabled.
defp maybe_track_subscription_delta(_state, 0), do: :ok
defp maybe_track_subscription_delta(%__MODULE__{track_population?: false}, _delta), do: :ok

defp maybe_track_subscription_delta(%__MODULE__{} = state, delta) do
  state
  |> listener_id()
  |> ConnectionStats.subscriptions_change(delta)
end
# Extracts the listener id from connection state; :unknown when absent.
defp listener_id(state) do
  case state do
    %__MODULE__{listener: %{id: id}} -> id
    _other -> :unknown
  end
end
# Emits one drain event per outbound-queue flush; silent for empty flushes.
defp emit_outbound_queue_drain(0), do: :ok

defp emit_outbound_queue_drain(drained) when is_integer(drained) and drained > 0 do
  Telemetry.emit(
    [:parrhesia, :connection, :outbound_queue, :drain],
    %{count: drained},
    %{}
  )
end
# Emits one drop event for an outbound-queue overflow, tagged with the
# overflow strategy that caused the drop (:drop_oldest | :drop_newest).
defp emit_outbound_queue_drop(strategy) do
  measurements = %{count: 1}
  metadata = %{strategy: strategy}

  Telemetry.emit([:parrhesia, :connection, :outbound_queue, :drop], measurements, metadata)
end
# Translates an internal rate-limit rejection reason into a telemetry scope
# tag and emits a hit event; unrecognized reasons are silently ignored.
defp maybe_emit_rate_limit_hit(reason, traffic_class \\ :generic) do
  scope =
    case reason do
      :event_rate_limited -> :event_ingest_per_connection
      :ip_event_rate_limited -> :event_ingest_per_ip
      :relay_event_rate_limited -> :event_ingest_relay
      :subscription_limit_reached -> :subscriptions_per_connection
      :outbound_queue_overflow -> :outbound_queue
      _other -> nil
    end

  if is_nil(scope) do
    :ok
  else
    emit_rate_limit_hit(scope, traffic_class)
  end
end
# Emits one [:parrhesia, :rate_limit, :hit] count tagged with the limiter
# scope and the traffic class of the rejected work.
defp emit_rate_limit_hit(scope, traffic_class) do
  measurements = %{count: 1}
  metadata = %{scope: scope, traffic_class: traffic_class}

  Telemetry.emit([:parrhesia, :rate_limit, :hit], measurements, metadata)
end
end