diff --git a/lib/parrhesia/api/events.ex b/lib/parrhesia/api/events.ex index 5f2b45e..c795744 100644 --- a/lib/parrhesia/api/events.ex +++ b/lib/parrhesia/api/events.ex @@ -35,6 +35,7 @@ defmodule Parrhesia.API.Events do def publish(event, opts) when is_map(event) and is_list(opts) do started_at = System.monotonic_time() event_id = Map.get(event, "id", "") + telemetry_metadata = telemetry_metadata_for_event(event) with {:ok, context} <- fetch_context(opts), :ok <- validate_event_payload_size(event, max_event_bytes(opts)), @@ -45,9 +46,11 @@ defmodule Parrhesia.API.Events do Telemetry.emit( [:parrhesia, :ingest, :stop], %{duration: System.monotonic_time() - started_at}, - telemetry_metadata_for_event(event) + telemetry_metadata ) + emit_ingest_result(telemetry_metadata, :accepted, :accepted) + message = case NIP43.finalize_publish(event, publish_state, nip43_opts(opts, context)) do {:ok, override} when is_binary(override) -> override @@ -66,9 +69,12 @@ defmodule Parrhesia.API.Events do }} else {:error, :invalid_context} = error -> + emit_ingest_result(telemetry_metadata, :rejected, :invalid_context) error {:error, reason} -> + emit_ingest_result(telemetry_metadata, :rejected, reason) + {:ok, %PublishResult{ event_id: event_id, @@ -86,6 +92,7 @@ defmodule Parrhesia.API.Events do def query(filters, opts) when is_list(filters) and is_list(opts) do started_at = System.monotonic_time() + telemetry_metadata = telemetry_metadata_for_filters(filters, :query) with {:ok, context} <- fetch_context(opts), :ok <- maybe_validate_filters(filters, opts), @@ -95,11 +102,17 @@ defmodule Parrhesia.API.Events do Telemetry.emit( [:parrhesia, :query, :stop], - %{duration: System.monotonic_time() - started_at}, - telemetry_metadata_for_filters(filters) + %{duration: System.monotonic_time() - started_at, result_count: length(events)}, + telemetry_metadata ) + emit_query_result(telemetry_metadata, :ok) + {:ok, events} + else + {:error, reason} = error -> + emit_query_result(telemetry_metadata, :error, reason) + error end end @@ -110,6 +123,7 @@ defmodule Parrhesia.API.Events do def count(filters, opts) when is_list(filters) and is_list(opts) do started_at = System.monotonic_time() + telemetry_metadata = telemetry_metadata_for_filters(filters, :count) with {:ok, context} <- fetch_context(opts), :ok <- maybe_validate_filters(filters, opts), @@ -120,11 +134,17 @@ defmodule Parrhesia.API.Events do {:ok, result} <- maybe_build_count_result(filters, count, Keyword.get(opts, :options)) do Telemetry.emit( [:parrhesia, :query, :stop], - %{duration: System.monotonic_time() - started_at}, - telemetry_metadata_for_filters(filters) + %{duration: System.monotonic_time() - started_at, result_count: count}, + telemetry_metadata ) + emit_query_result(telemetry_metadata, :ok) + {:ok, result} + else + {:error, reason} = error -> + emit_query_result(telemetry_metadata, :error, reason) + error end end @@ -242,8 +262,8 @@ defmodule Parrhesia.API.Events do %{traffic_class: traffic_class_for_event(event)} end - defp telemetry_metadata_for_filters(filters) do - %{traffic_class: traffic_class_for_filters(filters)} + defp telemetry_metadata_for_filters(filters, operation) do + %{traffic_class: traffic_class_for_filters(filters), operation: operation} end defp traffic_class_for_filters(filters) do @@ -276,6 +296,30 @@ defmodule Parrhesia.API.Events do defp traffic_class_for_event(_event), do: :generic + defp emit_ingest_result(metadata, outcome, reason) do + Telemetry.emit( + [:parrhesia, :ingest, :result], + %{count: 1}, + Map.merge(metadata, %{outcome: outcome, reason: normalize_reason(reason)}) + ) + end + + defp emit_query_result(metadata, outcome, reason \\ nil) do + Telemetry.emit( + [:parrhesia, :query, :result], + %{count: 1}, + Map.merge( + metadata, + %{outcome: outcome, reason: normalize_reason(reason || outcome)} + ) + ) + end + + defp normalize_reason(reason) when is_atom(reason), do: reason + defp normalize_reason(reason) when is_binary(reason), do: reason + defp normalize_reason(nil), do: :none + defp normalize_reason(_reason), do: :unknown + defp fetch_context(opts) do case Keyword.get(opts, :context) do %RequestContext{} = context -> {:ok, context} diff --git a/lib/parrhesia/connection_stats.ex b/lib/parrhesia/connection_stats.ex new file mode 100644 index 0000000..fc0f615 --- /dev/null +++ b/lib/parrhesia/connection_stats.ex @@ -0,0 +1,84 @@ +defmodule Parrhesia.ConnectionStats do + @moduledoc false + + use GenServer + + alias Parrhesia.Telemetry + + defstruct connections: %{}, subscriptions: %{} + + @type state :: %__MODULE__{ + connections: %{(atom() | String.t()) => non_neg_integer()}, + subscriptions: %{(atom() | String.t()) => non_neg_integer()} + } + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + name = Keyword.get(opts, :name, __MODULE__) + GenServer.start_link(__MODULE__, %__MODULE__{}, name: name) + end + + @spec connection_open(atom() | String.t()) :: :ok + def connection_open(listener_id), do: cast({:connection_open, listener_id}) + + @spec connection_close(atom() | String.t()) :: :ok + def connection_close(listener_id), do: cast({:connection_close, listener_id}) + + @spec subscriptions_change(atom() | String.t(), integer()) :: :ok + def subscriptions_change(listener_id, delta) when is_integer(delta) do + cast({:subscriptions_change, listener_id, delta}) + end + + @impl true + def init(%__MODULE__{} = state), do: {:ok, state} + + @impl true + def handle_cast({:connection_open, listener_id}, %__MODULE__{} = state) do + listener_id = normalize_listener_id(listener_id) + next_state = %{state | connections: increment(state.connections, listener_id, 1)} + emit_population(listener_id, next_state) + {:noreply, next_state} + end + + def handle_cast({:connection_close, listener_id}, %__MODULE__{} = state) do + listener_id = normalize_listener_id(listener_id) + next_state = %{state | connections: increment(state.connections, listener_id, -1)} + emit_population(listener_id, next_state) + {:noreply, next_state} + end + + def handle_cast({:subscriptions_change, listener_id, delta}, %__MODULE__{} = state) do + listener_id = normalize_listener_id(listener_id) + next_state = %{state | subscriptions: increment(state.subscriptions, listener_id, delta)} + emit_population(listener_id, next_state) + {:noreply, next_state} + end + + defp cast(message) do + GenServer.cast(__MODULE__, message) + :ok + catch + :exit, {:noproc, _details} -> :ok + :exit, {:normal, _details} -> :ok + end + + defp increment(counts, key, delta) do + current = Map.get(counts, key, 0) + Map.put(counts, key, max(current + delta, 0)) + end + + defp emit_population(listener_id, %__MODULE__{} = state) do + Telemetry.emit( + [:parrhesia, :listener, :population], + %{ + connections: Map.get(state.connections, listener_id, 0), + subscriptions: Map.get(state.subscriptions, listener_id, 0) + }, + %{listener_id: listener_id} + ) + end + + defp normalize_listener_id(listener_id) when is_atom(listener_id), do: listener_id + defp normalize_listener_id(listener_id) when is_binary(listener_id), do: listener_id + defp normalize_listener_id(_listener_id), do: :unknown +end diff --git a/lib/parrhesia/runtime.ex b/lib/parrhesia/runtime.ex index 907d279..f03edc1 100644 --- a/lib/parrhesia/runtime.ex +++ b/lib/parrhesia/runtime.ex @@ -16,6 +16,7 @@ defmodule Parrhesia.Runtime do def children do [ Parrhesia.Telemetry, + Parrhesia.ConnectionStats, Parrhesia.Config, Parrhesia.Web.EventIngestLimiter, Parrhesia.Web.IPEventIngestLimiter, diff --git a/lib/parrhesia/tasks/expiration_worker.ex b/lib/parrhesia/tasks/expiration_worker.ex index d9069b7..1b0d9f9 100644 --- a/lib/parrhesia/tasks/expiration_worker.ex +++ b/lib/parrhesia/tasks/expiration_worker.ex @@ -30,10 +30,19 @@ defmodule Parrhesia.Tasks.ExpirationWorker do def handle_info(:tick, state) do started_at = System.monotonic_time() - _result = Storage.events().purge_expired([]) + purged_events = + case Storage.events().purge_expired([]) do + {:ok, count} when is_integer(count) and count >= 0 -> count + _other -> 0 + end duration = System.monotonic_time() - started_at - Telemetry.emit([:parrhesia, :maintenance, :purge_expired, :stop], %{duration: duration}, %{}) + + Telemetry.emit( + [:parrhesia, :maintenance, :purge_expired, :stop], + %{duration: duration, purged_events: purged_events}, + %{} + ) schedule_tick(state.interval_ms) {:noreply, state} diff --git a/lib/parrhesia/telemetry.ex b/lib/parrhesia/telemetry.ex index bea907c..5a5958d 100644 --- a/lib/parrhesia/telemetry.ex +++ b/lib/parrhesia/telemetry.ex @@ -7,6 +7,7 @@ defmodule Parrhesia.Telemetry do import Telemetry.Metrics + @repo_query_handler_id "parrhesia-repo-query-handler" @prometheus_reporter __MODULE__.Prometheus @spec start_link(keyword()) :: Supervisor.on_start() @@ -16,6 +17,8 @@ defmodule Parrhesia.Telemetry do @impl true def init(_init_arg) do + :ok = attach_repo_query_handlers() + children = [ {TelemetryMetricsPrometheus.Core, name: @prometheus_reporter, metrics: metrics()}, {:telemetry_poller, measurements: periodic_measurements(), period: 10_000} @@ -30,6 +33,12 @@ defmodule Parrhesia.Telemetry do @spec metrics() :: [Telemetry.Metrics.t()] def metrics do [ + counter("parrhesia.ingest.events.count", + event_name: [:parrhesia, :ingest, :result], + measurement: :count, + tags: [:traffic_class, :outcome, :reason], + tag_values: &ingest_result_tag_values/1 + ), distribution("parrhesia.ingest.duration.ms", event_name: [:parrhesia, :ingest, :stop], measurement: :duration, @@ -38,14 +47,27 @@ defmodule Parrhesia.Telemetry do tag_values: &traffic_class_tag_values/1, reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]] ), + counter("parrhesia.query.requests.count", + event_name: [:parrhesia, :query, :result], + measurement: :count, + tags: [:traffic_class, :operation, :outcome], + tag_values: &query_result_tag_values/1 + ), distribution("parrhesia.query.duration.ms", event_name: [:parrhesia, :query, :stop], measurement: :duration, unit: {:native, :millisecond}, - tags: [:traffic_class], - tag_values: &traffic_class_tag_values/1, + tags: [:traffic_class, :operation], + tag_values: &query_stop_tag_values/1, reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]] ), + distribution("parrhesia.query.results.count", + event_name: [:parrhesia, :query, :stop], + measurement: :result_count, + tags: [:traffic_class, :operation], + tag_values: &query_stop_tag_values/1, + reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000]] + ), distribution("parrhesia.fanout.duration.ms", event_name: [:parrhesia, :fanout, :stop], measurement: :duration, @@ -54,6 +76,25 @@ defmodule Parrhesia.Telemetry do tag_values: &traffic_class_tag_values/1, reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]] ), + counter("parrhesia.fanout.events_considered.count", + event_name: [:parrhesia, :fanout, :stop], + measurement: :considered, + tags: [:traffic_class], + tag_values: &traffic_class_tag_values/1 + ), + counter("parrhesia.fanout.events_enqueued.count", + event_name: [:parrhesia, :fanout, :stop], + measurement: :enqueued, + tags: [:traffic_class], + tag_values: &traffic_class_tag_values/1 + ), + distribution("parrhesia.fanout.batch_size", + event_name: [:parrhesia, :fanout, :stop], + measurement: :enqueued, + tags: [:traffic_class], + tag_values: &traffic_class_tag_values/1, + reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500, 1000]] + ), last_value("parrhesia.connection.outbound_queue.depth", event_name: [:parrhesia, :connection, :outbound_queue], measurement: :depth, @@ -80,6 +121,41 @@ defmodule Parrhesia.Telemetry do tags: [:traffic_class], tag_values: &traffic_class_tag_values/1 ), + counter("parrhesia.connection.outbound_queue.drained_frames.count", + event_name: [:parrhesia, :connection, :outbound_queue, :drain], + measurement: :count + ), + distribution("parrhesia.connection.outbound_queue.drain_batch_size", + event_name: [:parrhesia, :connection, :outbound_queue, :drain], + measurement: :count, + reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250]] + ), + counter("parrhesia.connection.outbound_queue.dropped_events.count", + event_name: [:parrhesia, :connection, :outbound_queue, :drop], + measurement: :count, + tags: [:strategy], + tag_values: &strategy_tag_values/1 + ), + last_value("parrhesia.listener.connections.active", + event_name: [:parrhesia, :listener, :population], + measurement: :connections, + tags: [:listener_id], + tag_values: &listener_tag_values/1, + reporter_options: [prometheus_type: :gauge] + ), + last_value("parrhesia.listener.subscriptions.active", + event_name: [:parrhesia, :listener, :population], + measurement: :subscriptions, + tags: [:listener_id], + tag_values: &listener_tag_values/1, + reporter_options: [prometheus_type: :gauge] + ), + counter("parrhesia.rate_limit.hits.count", + event_name: [:parrhesia, :rate_limit, :hit], + measurement: :count, + tags: [:scope, :traffic_class], + tag_values: &rate_limit_tag_values/1 + ), last_value("parrhesia.process.mailbox.depth", event_name: [:parrhesia, :process, :mailbox], measurement: :depth, @@ -87,11 +163,111 @@ defmodule Parrhesia.Telemetry do tag_values: &process_tag_values/1, reporter_options: [prometheus_type: :gauge] ), + counter("parrhesia.db.query.count", + event_name: [:parrhesia, :db, :query], + measurement: :count, + tags: [:repo_role], + tag_values: &repo_query_tag_values/1 + ), + distribution("parrhesia.db.query.total_time.ms", + event_name: [:parrhesia, :db, :query], + measurement: :total_time, + unit: {:native, :millisecond}, + tags: [:repo_role], + tag_values: &repo_query_tag_values/1, + reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]] + ), + distribution("parrhesia.db.query.queue_time.ms", + event_name: [:parrhesia, :db, :query], + measurement: :queue_time, + unit: {:native, :millisecond}, + tags: [:repo_role], + tag_values: &repo_query_tag_values/1, + reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500, 1000]] + ), + distribution("parrhesia.db.query.query_time.ms", + event_name: [:parrhesia, :db, :query], + measurement: :query_time, + unit: {:native, :millisecond}, + tags: [:repo_role], + tag_values: &repo_query_tag_values/1, + reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500, 1000]] + ), + distribution("parrhesia.db.query.decode_time.ms", + event_name: [:parrhesia, :db, :query], + measurement: :decode_time, + unit: {:native, :millisecond}, + tags: [:repo_role], + tag_values: &repo_query_tag_values/1, + reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500, 1000]] + ), + distribution("parrhesia.db.query.idle_time.ms", + event_name: [:parrhesia, :db, :query], + measurement: :idle_time, + unit: {:native, :millisecond}, + tags: [:repo_role], + tag_values: &repo_query_tag_values/1, + reporter_options: [buckets: [0, 1, 5, 10, 25, 50, 100, 250, 500, 1000]] + ), + distribution("parrhesia.maintenance.purge_expired.duration.ms", + event_name: [:parrhesia, :maintenance, :purge_expired, :stop], + measurement: :duration, + unit: {:native, :millisecond}, + reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]] + ), + counter("parrhesia.maintenance.purge_expired.events.count", + event_name: [:parrhesia, :maintenance, :purge_expired, :stop], + measurement: :purged_events + ), + distribution("parrhesia.maintenance.partition_retention.duration.ms", + event_name: [:parrhesia, :maintenance, :partition_retention, :stop], + measurement: :duration, + unit: {:native, :millisecond}, + tags: [:status], + tag_values: &status_tag_values/1, + reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]] + ), + counter("parrhesia.maintenance.partition_retention.dropped_partitions.count", + event_name: [:parrhesia, :maintenance, :partition_retention, :stop], + measurement: :dropped_partitions, + tags: [:status], + tag_values: &status_tag_values/1 + ), last_value("parrhesia.vm.memory.total.bytes", event_name: [:parrhesia, :vm, :memory], measurement: :total, unit: :byte, reporter_options: [prometheus_type: :gauge] + ), + last_value("parrhesia.vm.memory.processes.bytes", + event_name: [:parrhesia, :vm, :memory], + measurement: :processes, + unit: :byte, + reporter_options: [prometheus_type: :gauge] + ), + last_value("parrhesia.vm.memory.system.bytes", + event_name: [:parrhesia, :vm, :memory], + measurement: :system, + unit: :byte, + reporter_options: [prometheus_type: :gauge] + ), + last_value("parrhesia.vm.memory.atom.bytes", + event_name: [:parrhesia, :vm, :memory], + measurement: :atom, + unit: :byte, + reporter_options: [prometheus_type: :gauge] + ), + last_value("parrhesia.vm.memory.binary.bytes", + event_name: [:parrhesia, :vm, :memory], + measurement: :binary, + unit: :byte, + reporter_options: [prometheus_type: :gauge] + ), + last_value("parrhesia.vm.memory.ets.bytes", + event_name: [:parrhesia, :vm, :memory], + measurement: :ets, + unit: :byte, + reporter_options: [prometheus_type: :gauge] ) ] end @@ -126,8 +302,18 @@ defmodule Parrhesia.Telemetry do @doc false def emit_vm_memory do - total = :erlang.memory(:total) - emit([:parrhesia, :vm, :memory], %{total: total}, %{}) + emit( + [:parrhesia, :vm, :memory], + %{ + total: :erlang.memory(:total), + processes: :erlang.memory(:processes), + system: :erlang.memory(:system), + atom: :erlang.memory(:atom), + binary: :erlang.memory(:binary), + ets: :erlang.memory(:ets) + }, + %{} + ) end defp traffic_class_tag_values(metadata) do @@ -135,8 +321,100 @@ defmodule Parrhesia.Telemetry do %{traffic_class: traffic_class} end + defp ingest_result_tag_values(metadata) do + %{ + traffic_class: metadata |> Map.get(:traffic_class, :generic) |> to_string(), + outcome: metadata |> Map.get(:outcome, :unknown) |> to_string(), + reason: metadata |> Map.get(:reason, :unknown) |> to_string() + } + end + + defp query_stop_tag_values(metadata) do + %{ + traffic_class: metadata |> Map.get(:traffic_class, :generic) |> to_string(), + operation: metadata |> Map.get(:operation, :query) |> to_string() + } + end + + defp query_result_tag_values(metadata) do + %{ + traffic_class: metadata |> Map.get(:traffic_class, :generic) |> to_string(), + operation: metadata |> Map.get(:operation, :query) |> to_string(), + outcome: metadata |> Map.get(:outcome, :unknown) |> to_string() + } + end + + defp strategy_tag_values(metadata) do + %{strategy: metadata |> Map.get(:strategy, :unknown) |> to_string()} + end + + defp listener_tag_values(metadata) do + %{listener_id: metadata |> Map.get(:listener_id, :unknown) |> to_string()} + end + + defp rate_limit_tag_values(metadata) do + %{ + scope: metadata |> Map.get(:scope, :unknown) |> to_string(), + traffic_class: metadata |> Map.get(:traffic_class, :generic) |> to_string() + } + end + defp process_tag_values(metadata) do process_type = metadata |> Map.get(:process_type, :unknown) |> to_string() %{process_type: process_type} end + + defp repo_query_tag_values(metadata) do + %{repo_role: metadata |> Map.get(:repo_role, :unknown) |> to_string()} + end + + defp status_tag_values(metadata) do + %{status: metadata |> Map.get(:status, :unknown) |> to_string()} + end + + defp attach_repo_query_handlers do + :telemetry.detach(@repo_query_handler_id) + + :telemetry.attach_many( + @repo_query_handler_id, + [[:parrhesia, :repo, :query], [:parrhesia, :read_repo, :query]], + &__MODULE__.handle_repo_query_event/4, + nil + ) + + :ok + rescue + ArgumentError -> :ok + end + + @doc false + def handle_repo_query_event(event_name, measurements, _metadata, _config) do + repo_role = + case event_name do + [:parrhesia, :read_repo, :query] -> :read + [:parrhesia, :repo, :query] -> :write + end + + total_time = + Map.get( + measurements, + :total_time, + Map.get(measurements, :queue_time, 0) + + Map.get(measurements, :query_time, 0) + + Map.get(measurements, :decode_time, 0) + ) + + emit( + [:parrhesia, :db, :query], + %{ + count: 1, + total_time: total_time, + queue_time: Map.get(measurements, :queue_time, 0), + query_time: Map.get(measurements, :query_time, 0), + decode_time: Map.get(measurements, :decode_time, 0), + idle_time: Map.get(measurements, :idle_time, 0) + }, + %{repo_role: repo_role} + ) + end end diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index d096d96..3f0abff 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -9,6 +9,7 @@ defmodule Parrhesia.Web.Connection do alias Parrhesia.API.RequestContext alias Parrhesia.API.Stream alias Parrhesia.Auth.Challenges + alias Parrhesia.ConnectionStats alias Parrhesia.Negentropy.Sessions alias Parrhesia.Policy.ConnectionPolicy alias Parrhesia.Policy.EventPolicy @@ -70,7 +71,8 @@ defmodule Parrhesia.Web.Connection do event_ingest_window_seconds: @default_event_ingest_window_seconds, event_ingest_window_started_at_ms: 0, event_ingest_count: 0, - auth_max_age_seconds: @default_auth_max_age_seconds + auth_max_age_seconds: @default_auth_max_age_seconds, + track_population?: true @type overflow_strategy :: :close | :drop_oldest | :drop_newest @@ -106,7 +108,8 @@ defmodule Parrhesia.Web.Connection do event_ingest_window_seconds: pos_integer(), event_ingest_window_started_at_ms: integer(), event_ingest_count: non_neg_integer(), - auth_max_age_seconds: pos_integer() + auth_max_age_seconds: pos_integer(), + track_population?: boolean() } @impl true @@ -134,9 +137,11 @@ defmodule Parrhesia.Web.Connection do max_event_ingest_per_window: max_event_ingest_per_window(opts), event_ingest_window_seconds: event_ingest_window_seconds(opts), event_ingest_window_started_at_ms: System.monotonic_time(:millisecond), - auth_max_age_seconds: auth_max_age_seconds(opts) + auth_max_age_seconds: auth_max_age_seconds(opts), + track_population?: track_population?(opts) } + :ok = maybe_track_connection_open(state) Telemetry.emit_process_mailbox_depth(:connection) {:ok, state} end @@ -303,6 +308,8 @@ defmodule Parrhesia.Web.Connection do @impl true def terminate(_reason, %__MODULE__{} = state) do + :ok = maybe_track_subscription_delta(state, -map_size(state.subscriptions)) + :ok = maybe_track_connection_close(state) :ok = maybe_unsubscribe_all_stream_subscriptions(state) :ok = maybe_remove_index_owner(state) :ok = maybe_clear_auth_challenge(state) @@ -311,17 +318,21 @@ defmodule Parrhesia.Web.Connection do defp handle_event_ingest(%__MODULE__{} = state, event) do event_id = Map.get(event, "id", "") + traffic_class = traffic_class_for_event(event) case maybe_allow_event_ingest(state) do {:ok, next_state} -> maybe_publish_ingested_event(next_state, state, event, event_id) {:error, reason} -> + maybe_emit_rate_limit_hit(reason, traffic_class) ingest_error_response(state, event_id, reason) end end defp maybe_publish_ingested_event(next_state, state, event, event_id) do + traffic_class = traffic_class_for_event(event) + with :ok <- maybe_allow_remote_ip_event_ingest( next_state.remote_ip, @@ -332,6 +343,7 @@ defmodule Parrhesia.Web.Connection do publish_event_response(next_state, event) else {:error, reason} -> + maybe_emit_rate_limit_hit(reason, traffic_class) ingest_error_response(state, event_id, reason) end end @@ -437,6 +449,8 @@ defmodule Parrhesia.Web.Connection do ) {:error, :subscription_limit_reached} -> + maybe_emit_rate_limit_hit(:subscription_limit_reached) + response = Protocol.encode_relay({ :closed, @@ -990,22 +1004,38 @@ defmodule Parrhesia.Web.Connection do telemetry_metadata = telemetry_metadata_for_fanout_events(fanout_events) case enqueue_fanout_events(state, fanout_events) do - {:ok, next_state} -> + {:ok, next_state, stats} -> Telemetry.emit( [:parrhesia, :fanout, :stop], - %{duration: System.monotonic_time() - started_at}, + %{ + duration: System.monotonic_time() - started_at, + considered: stats.considered, + enqueued: stats.enqueued + }, telemetry_metadata ) {:ok, maybe_schedule_drain(next_state)} - {:close, next_state} -> + {:close, next_state, stats} -> Telemetry.emit( [:parrhesia, :connection, :outbound_queue, :overflow], %{count: 1}, telemetry_metadata ) + Telemetry.emit( + [:parrhesia, :fanout, :stop], + %{ + duration: System.monotonic_time() - started_at, + considered: stats.considered, + enqueued: stats.enqueued + }, + telemetry_metadata + ) + + maybe_emit_rate_limit_hit(:outbound_queue_overflow, telemetry_metadata.traffic_class) + close_with_outbound_overflow(next_state) end end @@ -1023,15 +1053,27 @@ defmodule Parrhesia.Web.Connection do end defp enqueue_fanout_events(state, fanout_events) do - Enum.reduce_while(fanout_events, {:ok, state}, fn - {subscription_id, event}, {:ok, acc} when is_binary(subscription_id) and is_map(event) -> + initial_stats = %{considered: 0, enqueued: 0} + + Enum.reduce_while(fanout_events, {:ok, state, initial_stats}, fn + {subscription_id, event}, {:ok, acc, stats} + when is_binary(subscription_id) and is_map(event) -> case maybe_enqueue_fanout_event(acc, subscription_id, event) do - {:ok, next_acc} -> {:cont, {:ok, next_acc}} - {:close, next_acc} -> {:halt, {:close, next_acc}} + {:ok, next_acc, enqueued?} -> + next_stats = %{ + considered: stats.considered + 1, + enqueued: stats.enqueued + if(enqueued?, do: 1, else: 0) + } + + {:cont, {:ok, next_acc, next_stats}} + + {:close, next_acc} -> + next_stats = %{stats | considered: stats.considered + 1} + {:halt, {:close, next_acc, next_stats}} end - _invalid_event, {:ok, acc} -> - {:cont, {:ok, acc}} + _invalid_event, {:ok, acc, stats} -> + {:cont, {:ok, acc, stats}} end) end @@ -1039,7 +1081,7 @@ defmodule Parrhesia.Web.Connection do if subscription_matches?(state, subscription_id, event) do enqueue_outbound(state, {subscription_id, event}, traffic_class_for_event(event)) else - {:ok, state} + {:ok, state, false} end end @@ -1065,15 +1107,17 @@ defmodule Parrhesia.Web.Connection do } emit_outbound_queue_depth(next_state, %{traffic_class: traffic_class}) - {:ok, next_state} + {:ok, next_state, true} end defp enqueue_outbound( %__MODULE__{outbound_overflow_strategy: :drop_newest} = state, _queue_entry, _traffic_class - ), - do: {:ok, state} + ) do + emit_outbound_queue_drop(:drop_newest) + {:ok, state, false} + end defp enqueue_outbound( %__MODULE__{outbound_overflow_strategy: :drop_oldest} = state, @@ -1086,7 +1130,8 @@ defmodule Parrhesia.Web.Connection do next_state = %__MODULE__{state | outbound_queue: next_queue, outbound_queue_size: next_size} emit_outbound_queue_depth(next_state, %{traffic_class: traffic_class}) - {:ok, next_state} + emit_outbound_queue_drop(:drop_oldest) + {:ok, next_state, true} end defp enqueue_outbound( @@ -1123,6 +1168,7 @@ defmodule Parrhesia.Web.Connection do } |> maybe_schedule_drain() + emit_outbound_queue_drain(length(frames)) emit_outbound_queue_depth(next_state) {Enum.reverse(frames), next_state} @@ -1140,6 +1186,7 @@ defmodule Parrhesia.Web.Connection do drain_scheduled?: false } + emit_outbound_queue_drain(length(frames)) emit_outbound_queue_depth(next_state) {Enum.reverse(frames), next_state} @@ -1227,12 +1274,26 @@ defmodule Parrhesia.Web.Connection do defp put_subscription(%__MODULE__{} = state, subscription_id, subscription) do subscriptions = Map.put(state.subscriptions, subscription_id, subscription) - %__MODULE__{state | subscriptions: subscriptions} + next_state = %__MODULE__{state | subscriptions: subscriptions} + + if Map.has_key?(state.subscriptions, subscription_id) do + next_state + else + :ok = maybe_track_subscription_delta(next_state, 1) + next_state + end end defp drop_subscription(%__MODULE__{} = state, subscription_id) do subscriptions = Map.delete(state.subscriptions, subscription_id) - %__MODULE__{state | subscriptions: subscriptions} + next_state = %__MODULE__{state | subscriptions: subscriptions} + + if Map.has_key?(state.subscriptions, subscription_id) do + :ok = maybe_track_subscription_delta(next_state, -1) + next_state + else + next_state + end end defp drop_queued_subscription_events( @@ -1708,6 +1769,10 @@ defmodule Parrhesia.Web.Connection do |> Keyword.get(:auth_max_age_seconds, @default_auth_max_age_seconds) end + defp track_population?(opts) when is_list(opts), do: Keyword.get(opts, :track_population?, true) + defp track_population?(opts) when is_map(opts), do: Map.get(opts, :track_population?, true) + defp track_population?(_opts), do: true + defp maybe_configure_exit_trapping(opts) do if trap_exit?(opts) do Process.flag(:trap_exit, true) @@ -1789,4 +1854,72 @@ defmodule Parrhesia.Web.Connection do :exit, {:noproc, _details} -> :ok :exit, {:normal, _details} -> :ok end + + defp maybe_track_connection_open(%__MODULE__{track_population?: false}), do: :ok + + defp maybe_track_connection_open(%__MODULE__{} = state) do + ConnectionStats.connection_open(listener_id(state)) + end + + defp maybe_track_connection_close(%__MODULE__{track_population?: false}), do: :ok + + defp maybe_track_connection_close(%__MODULE__{} = state) do + ConnectionStats.connection_close(listener_id(state)) + end + + defp maybe_track_subscription_delta(_state, 0), do: :ok + defp maybe_track_subscription_delta(%__MODULE__{track_population?: false}, _delta), do: :ok + + defp maybe_track_subscription_delta(%__MODULE__{} = state, delta) do + ConnectionStats.subscriptions_change(listener_id(state), delta) + end + + defp listener_id(%__MODULE__{listener: %{id: id}}), do: id + defp listener_id(_state), do: :unknown + + defp emit_outbound_queue_drain(0), do: :ok + + defp emit_outbound_queue_drain(count) when is_integer(count) and count > 0 do + Telemetry.emit([:parrhesia, :connection, :outbound_queue, :drain], %{count: count}, %{}) + end + + defp emit_outbound_queue_drop(strategy) do + Telemetry.emit( + [:parrhesia, :connection, :outbound_queue, :drop], + %{count: 1}, + %{strategy: strategy} + ) + end + + defp maybe_emit_rate_limit_hit(reason, traffic_class \\ :generic) + + defp maybe_emit_rate_limit_hit(:event_rate_limited, traffic_class) do + emit_rate_limit_hit(:event_ingest_per_connection, traffic_class) + end + + defp maybe_emit_rate_limit_hit(:ip_event_rate_limited, traffic_class) do + emit_rate_limit_hit(:event_ingest_per_ip, traffic_class) + end + + defp maybe_emit_rate_limit_hit(:relay_event_rate_limited, traffic_class) do + emit_rate_limit_hit(:event_ingest_relay, traffic_class) + end + + defp maybe_emit_rate_limit_hit(:subscription_limit_reached, traffic_class) do + emit_rate_limit_hit(:subscriptions_per_connection, traffic_class) + end + + defp maybe_emit_rate_limit_hit(:outbound_queue_overflow, traffic_class) do + emit_rate_limit_hit(:outbound_queue, traffic_class) + end + + defp maybe_emit_rate_limit_hit(_reason, _traffic_class), do: :ok + + defp emit_rate_limit_hit(scope, traffic_class) do + Telemetry.emit( + [:parrhesia, :rate_limit, :hit], + %{count: 1}, + %{scope: scope, traffic_class: traffic_class} + ) + end end diff --git a/test/parrhesia/application_test.exs b/test/parrhesia/application_test.exs index fdcf44e..bceae88 100644 --- a/test/parrhesia/application_test.exs +++ b/test/parrhesia/application_test.exs @@ -6,6 +6,7 @@ defmodule Parrhesia.ApplicationTest do test "starts the core supervision tree" do assert is_pid(Process.whereis(Parrhesia.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Telemetry)) + assert is_pid(Process.whereis(Parrhesia.ConnectionStats)) assert is_pid(Process.whereis(Parrhesia.Config)) assert is_pid(Process.whereis(Parrhesia.Web.EventIngestLimiter)) assert is_pid(Process.whereis(Parrhesia.Web.IPEventIngestLimiter)) diff --git a/test/parrhesia/fault_injection_group_flow_test.exs b/test/parrhesia/fault_injection_group_flow_test.exs index c4a070e..07d7f9a 100644 --- a/test/parrhesia/fault_injection_group_flow_test.exs +++ b/test/parrhesia/fault_injection_group_flow_test.exs @@ -28,7 +28,7 @@ defmodule Parrhesia.FaultInjectionGroupFlowTest do test "kind 445 commit recovers cleanly after storage outage", %{ previous_storage: previous_storage } do - {:ok, state} = Connection.init(subscription_index: nil) + {:ok, state} = Connection.init(subscription_index: nil, track_population?: false) group_event = build_event(%{ @@ -62,7 +62,7 @@ defmodule Parrhesia.FaultInjectionGroupFlowTest do test "reordered group flow remains deterministic after outage recovery", %{ previous_storage: previous_storage } do - {:ok, state} = Connection.init(subscription_index: nil) + {:ok, state} = Connection.init(subscription_index: nil, track_population?: false) group_id = String.duplicate("b", 64) now = System.system_time(:second) diff --git a/test/parrhesia/fault_injection_test.exs b/test/parrhesia/fault_injection_test.exs index 5d976ac..b4acf55 100644 --- a/test/parrhesia/fault_injection_test.exs +++ b/test/parrhesia/fault_injection_test.exs @@ -26,7 +26,7 @@ defmodule Parrhesia.FaultInjectionTest do end test "EVENT responds with error prefix when storage is unavailable" do - {:ok, state} = Connection.init(subscription_index: nil) + {:ok, state} = Connection.init(subscription_index: nil, track_population?: false) event = valid_event() assert {:push, {:text, response}, _next_state} = @@ -36,7 +36,7 @@ defmodule Parrhesia.FaultInjectionTest do end test "REQ closes with storage error when query fails" do - {:ok, state} = Connection.init(subscription_index: nil) + {:ok, state} = Connection.init(subscription_index: nil, track_population?: false) payload = JSON.encode!(["REQ", "sub-db-down", %{"kinds" => [1]}]) assert {:push, {:text, response}, ^state} = diff --git a/test/parrhesia/performance/load_soak_test.exs b/test/parrhesia/performance/load_soak_test.exs index 1a3926c..6ed4f4b 100644 --- a/test/parrhesia/performance/load_soak_test.exs +++ b/test/parrhesia/performance/load_soak_test.exs @@ -6,7 +6,12 @@ defmodule Parrhesia.Performance.LoadSoakTest do @tag :performance test "fanout enqueue/drain stays within relaxed p95 budget" do {:ok, state} = - Connection.init(subscription_index: nil, max_outbound_queue: 10_000, trap_exit?: false) + Connection.init( + subscription_index: nil, + max_outbound_queue: 10_000, + trap_exit?: false, + track_population?: false + ) req_payload = JSON.encode!(["REQ", "sub-load", %{"kinds" => [1]}]) diff --git a/test/parrhesia/telemetry_test.exs b/test/parrhesia/telemetry_test.exs index 85dd8ee..f7f527f 100644 --- a/test/parrhesia/telemetry_test.exs +++ b/test/parrhesia/telemetry_test.exs @@ -9,10 +9,25 @@ defmodule Parrhesia.TelemetryTest do assert [:parrhesia, :ingest, :duration, :ms] in metric_names assert [:parrhesia, :query, :duration, :ms] in metric_names assert [:parrhesia, :fanout, :duration, :ms] in metric_names + assert [:parrhesia, :fanout, :events_enqueued, :count] in metric_names + assert [:parrhesia, :ingest, :events, :count] in metric_names + assert [:parrhesia, :query, :requests, :count] in metric_names + assert [:parrhesia, :query, :results, :count] in metric_names assert [:parrhesia, :connection, :outbound_queue, :depth] in metric_names + assert [:parrhesia, :connection, :outbound_queue, :drained_frames, :count] in metric_names + assert [:parrhesia, :connection, :outbound_queue, :dropped_events, :count] in metric_names assert [:parrhesia, :connection, :outbound_queue, :pressure] in metric_names assert [:parrhesia, :connection, :outbound_queue, :pressure_events, :count] in metric_names + assert [:parrhesia, :listener, :connections, :active] in metric_names + assert [:parrhesia, :listener, :subscriptions, :active] in metric_names + assert [:parrhesia, :rate_limit, :hits, :count] in metric_names + assert [:parrhesia, :db, :query, :count] in metric_names assert [:parrhesia, :process, :mailbox, :depth] in metric_names + assert [:parrhesia, :maintenance, :purge_expired, :events, :count] in metric_names + + assert [:parrhesia, :maintenance, :partition_retention, :dropped_partitions, :count] in metric_names + + assert [:parrhesia, :vm, :memory, :binary, :bytes] in metric_names end test "emit/3 accepts traffic-class metadata" do diff --git a/test/parrhesia/web/conformance_test.exs b/test/parrhesia/web/conformance_test.exs index 0fb08d1..e86042c 100644 --- a/test/parrhesia/web/conformance_test.exs +++ b/test/parrhesia/web/conformance_test.exs @@ -6,7 +6,7 @@ defmodule Parrhesia.Web.ConformanceTest do alias Parrhesia.Web.Connection test "REQ -> EOSE emitted once and CLOSE emits CLOSED" do - {:ok, state} = Connection.init(subscription_index: nil) + {:ok, state} = Connection.init(subscription_index: nil, track_population?: false) req_payload = JSON.encode!(["REQ", "sub-e2e", %{"kinds" => [1]}]) @@ -26,7 +26,7 @@ defmodule Parrhesia.Web.ConformanceTest do end test "EVENT accepted path returns canonical OK frame" do - {:ok, state} = Connection.init(subscription_index: nil) + {:ok, state} = Connection.init(subscription_index: nil, track_population?: false) event = valid_event() @@ -37,7 +37,7 @@ defmodule Parrhesia.Web.ConformanceTest do end test "wrapped kind 1059 welcome delivery is recipient-gated" do - {:ok, state} = Connection.init(subscription_index: nil) + {:ok, state} = Connection.init(subscription_index: nil, track_population?: false) recipient = String.duplicate("9", 64) wrapped_welcome = @@ -90,7 +90,7 @@ defmodule Parrhesia.Web.ConformanceTest do end test "kind 445 commit ACK implies durable visibility before wrapped welcome ACK" do - {:ok, state} = Connection.init(subscription_index: nil) + {:ok, state} = Connection.init(subscription_index: nil, track_population?: false) commit_event = valid_event(%{ @@ -161,7 +161,7 @@ defmodule Parrhesia.Web.ConformanceTest do Application.put_env(:parrhesia, :policies, previous_policies) end) - {:ok, state} = Connection.init(subscription_index: nil) + {:ok, state} = Connection.init(subscription_index: nil, track_population?: false) relay_list_event = valid_event(%{ diff --git a/test/parrhesia/web/connection_nip43_test.exs b/test/parrhesia/web/connection_nip43_test.exs index 7cb1712..f6f2426 100644 --- a/test/parrhesia/web/connection_nip43_test.exs +++ b/test/parrhesia/web/connection_nip43_test.exs @@ -251,7 +251,12 @@ defmodule Parrhesia.Web.ConnectionNIP43Test do end defp connection_state(opts \\ []) do - {:ok, state} = Connection.init(Keyword.put_new(opts, :subscription_index, nil)) + opts = + opts + |> Keyword.put_new(:subscription_index, nil) + |> Keyword.put_new(:track_population?, false) + + {:ok, state} = Connection.init(opts) state end diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs index d0f7af3..b050321 100644 --- a/test/parrhesia/web/connection_test.exs +++ b/test/parrhesia/web/connection_test.exs @@ -1003,6 +1003,7 @@ defmodule Parrhesia.Web.ConnectionTest do opts |> Keyword.put_new(:subscription_index, nil) |> Keyword.put_new(:trap_exit?, false) + |> Keyword.put_new(:track_population?, false) {:ok, state} = Connection.init(opts) state diff --git a/test/parrhesia/web/router_test.exs b/test/parrhesia/web/router_test.exs index d577ff8..e2d69a5 100644 --- a/test/parrhesia/web/router_test.exs +++ b/test/parrhesia/web/router_test.exs @@ -6,6 +6,8 @@ defmodule Parrhesia.Web.RouterTest do alias Parrhesia.API.Sync alias Parrhesia.Protocol.EventValidator + alias Parrhesia.Repo + alias Parrhesia.Telemetry alias Parrhesia.Web.Listener alias Parrhesia.Web.Router @@ -51,6 +53,47 @@ defmodule Parrhesia.Web.RouterTest do assert get_resp_header(conn, "content-type") == ["text/plain; charset=utf-8"] end + test "GET /metrics includes exported relay counters and gauges" do + Telemetry.emit( + [:parrhesia, :ingest, :result], + %{count: 1}, + %{traffic_class: :generic, outcome: :accepted, reason: :accepted} + ) + + Telemetry.emit( + [:parrhesia, :listener, :population], + %{connections: 2, subscriptions: 3}, + %{listener_id: :public} + ) + + Telemetry.emit( + [:parrhesia, :rate_limit, :hit], + %{count: 1}, + %{scope: :event_ingest_per_ip, traffic_class: :generic} + ) + + Telemetry.emit_vm_memory() + _ = Repo.query!("SELECT 1") + + conn = + conn(:get, "/metrics") + |> route_conn( + listener(%{ + features: %{metrics: %{enabled: true, access: %{private_networks_only: true}}} + }) + ) + + assert conn.status == 200 + assert String.contains?(conn.resp_body, "parrhesia_ingest_events_count") + assert String.contains?(conn.resp_body, "parrhesia_listener_connections_active") + assert String.contains?(conn.resp_body, "listener_id=\"public\"") + assert String.contains?(conn.resp_body, "parrhesia_rate_limit_hits_count") + assert String.contains?(conn.resp_body, "scope=\"event_ingest_per_ip\"") + assert String.contains?(conn.resp_body, "parrhesia_db_query_count") + assert String.contains?(conn.resp_body, "repo_role=\"write\"") + assert String.contains?(conn.resp_body, "parrhesia_vm_memory_binary_bytes") + end + test "GET /metrics denies public-network clients by default" do conn = conn(:get, "/metrics") conn = %{conn | remote_ip: {8, 8, 8, 8}} diff --git a/test/support/runtime.ex b/test/support/runtime.ex index 210725e..271b74f 100644 --- a/test/support/runtime.ex +++ b/test/support/runtime.ex @@ -29,6 +29,7 @@ defmodule Parrhesia.TestSupport.Runtime do defp required_processes do [ Parrhesia.Supervisor, + Parrhesia.ConnectionStats, Parrhesia.Config, Parrhesia.Subscriptions.Supervisor, Parrhesia.API.Stream.Supervisor,