Implement M7 Marmot hardening telemetry and ops checks

This commit is contained in:
2026-03-13 22:40:36 +01:00
parent 99983bbb32
commit f2a6ab5150
7 changed files with 529 additions and 24 deletions

View File

@@ -34,28 +34,51 @@ defmodule Parrhesia.Telemetry do
event_name: [:parrhesia, :ingest, :stop],
measurement: :duration,
unit: {:native, :millisecond},
tags: [:traffic_class],
tag_values: &traffic_class_tag_values/1,
reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
distribution("parrhesia.query.duration.ms",
event_name: [:parrhesia, :query, :stop],
measurement: :duration,
unit: {:native, :millisecond},
tags: [:traffic_class],
tag_values: &traffic_class_tag_values/1,
reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
distribution("parrhesia.fanout.duration.ms",
event_name: [:parrhesia, :fanout, :stop],
measurement: :duration,
unit: {:native, :millisecond},
tags: [:traffic_class],
tag_values: &traffic_class_tag_values/1,
reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]]
),
last_value("parrhesia.connection.outbound_queue.depth",
event_name: [:parrhesia, :connection, :outbound_queue],
measurement: :depth,
tags: [:traffic_class],
tag_values: &traffic_class_tag_values/1,
reporter_options: [prometheus_type: :gauge]
),
last_value("parrhesia.connection.outbound_queue.pressure",
event_name: [:parrhesia, :connection, :outbound_queue],
measurement: :pressure,
tags: [:traffic_class],
tag_values: &traffic_class_tag_values/1,
reporter_options: [prometheus_type: :gauge]
),
counter("parrhesia.connection.outbound_queue.pressure_events.count",
event_name: [:parrhesia, :connection, :outbound_queue, :pressure],
measurement: :count,
tags: [:traffic_class],
tag_values: &traffic_class_tag_values/1
),
counter("parrhesia.connection.outbound_queue.overflow.count",
event_name: [:parrhesia, :connection, :outbound_queue, :overflow],
measurement: :count
measurement: :count,
tags: [:traffic_class],
tag_values: &traffic_class_tag_values/1
),
last_value("parrhesia.vm.memory.total.bytes",
event_name: [:parrhesia, :vm, :memory],
@@ -83,4 +106,9 @@ defmodule Parrhesia.Telemetry do
total = :erlang.memory(:total)
emit([:parrhesia, :vm, :memory], %{total: total}, %{})
end
# Builds the tag map used by the metrics above that declare
# `tags: [:traffic_class]`.
#
# `metadata` is the telemetry event metadata map. The result contains only the
# `:traffic_class` key, rendered as a string. A missing key and an explicit
# `nil` value both fall back to "generic", so a metric is never labelled with
# an empty tag value (`to_string(nil)` would otherwise produce "").
defp traffic_class_tag_values(metadata) do
  traffic_class =
    case Map.get(metadata, :traffic_class) do
      nil -> "generic"
      value -> to_string(value)
    end

  %{traffic_class: traffic_class}
end
end

View File

@@ -21,6 +21,20 @@ defmodule Parrhesia.Web.Connection do
@default_outbound_drain_batch_size 64
@default_outbound_overflow_strategy :close
@drain_outbound_queue :drain_outbound_queue
# Queue fill ratio (outbound_queue_size / max_outbound_queue) at or above which
# emit_outbound_queue_depth additionally emits the
# [:parrhesia, :connection, :outbound_queue, :pressure] telemetry event.
@outbound_queue_pressure_threshold 0.75
# Event kinds treated as Marmot traffic when classifying events and the
# "kinds" lists of filters for the :traffic_class telemetry tag.
@marmot_kinds MapSet.new([
443,
444,
445,
1059,
10_050,
10_051,
446,
447,
448,
449
])
defstruct subscriptions: %{},
authenticated_pubkeys: MapSet.new(),
@@ -173,7 +187,7 @@ defmodule Parrhesia.Web.Connection do
Telemetry.emit(
[:parrhesia, :ingest, :stop],
%{duration: System.monotonic_time() - started_at},
%{}
telemetry_metadata_for_event(event)
)
fanout_event(event)
@@ -205,7 +219,7 @@ defmodule Parrhesia.Web.Connection do
Telemetry.emit(
[:parrhesia, :query, :stop],
%{duration: System.monotonic_time() - started_at},
%{}
telemetry_metadata_for_filters(filters)
)
frames =
@@ -292,7 +306,7 @@ defmodule Parrhesia.Web.Connection do
Telemetry.emit(
[:parrhesia, :query, :stop],
%{duration: System.monotonic_time() - started_at},
%{}
telemetry_metadata_for_filters(filters)
)
response = Protocol.encode_relay({:count, subscription_id, payload})
@@ -485,6 +499,61 @@ defmodule Parrhesia.Web.Connection do
|> Base.encode64()
end
# Telemetry metadata for a single event: a map carrying only the event's
# traffic class under :traffic_class.
defp telemetry_metadata_for_event(event) do
  event_class = traffic_class_for_event(event)
  %{traffic_class: event_class}
end
# Telemetry metadata for a list of filters: a map carrying only the traffic
# class derived from those filters under :traffic_class.
defp telemetry_metadata_for_filters(filters) do
  filters_class = traffic_class_for_filters(filters)
  %{traffic_class: filters_class}
end
# Telemetry metadata for a fanout batch.
#
# The batch is tagged :marmot as soon as one `{subscription_id, event}` entry
# holds a map event classified as :marmot; otherwise it is tagged :generic.
# Entries of any other shape never count as Marmot.
defp telemetry_metadata_for_fanout_events(fanout_events) do
  marmot_entry? = fn
    {_subscription_id, event} when is_map(event) ->
      traffic_class_for_event(event) == :marmot

    _other ->
      false
  end

  case Enum.any?(fanout_events, marmot_entry?) do
    true -> %{traffic_class: :marmot}
    false -> %{traffic_class: :generic}
  end
end
# Classifies a collection of filters: :marmot when at least one filter is a
# Marmot filter according to marmot_filter?/1, :generic otherwise.
defp traffic_class_for_filters(filters) do
  case Enum.any?(filters, &marmot_filter?/1) do
    true -> :marmot
    false -> :generic
  end
end
# Returns true when a filter map targets Marmot traffic, i.e. when either
#   * its "kinds" value is a list containing at least one kind in @marmot_kinds, or
#   * it has a "#h" key, or
#   * it has a "#i" key.
# A "kinds" value that is not a list is ignored. Non-map filters are never
# considered Marmot filters.
defp marmot_filter?(filter) when is_map(filter) do
  requested_kinds =
    case filter do
      %{"kinds" => kinds} when is_list(kinds) -> kinds
      _other -> []
    end

  Enum.any?(requested_kinds, fn kind -> MapSet.member?(@marmot_kinds, kind) end) or
    match?(%{"#h" => _}, filter) or
    match?(%{"#i" => _}, filter)
end

defp marmot_filter?(_filter), do: false
# Classifies a single event by its "kind" value: :marmot when the kind is a
# member of @marmot_kinds, :generic otherwise (including when "kind" is
# absent). Anything that is not a map is classified as :generic.
defp traffic_class_for_event(%{} = event) do
  kind = Map.get(event, "kind")

  case MapSet.member?(@marmot_kinds, kind) do
    true -> :marmot
    false -> :generic
  end
end

defp traffic_class_for_event(_event), do: :generic
defp restricted_close(state, subscription_id, reason) do
response = Protocol.encode_relay({:closed, subscription_id, reason})
with_auth_challenge_frame(state, {:push, {:text, response}, state})
@@ -632,19 +701,25 @@ defmodule Parrhesia.Web.Connection do
defp handle_fanout_events(%__MODULE__{} = state, fanout_events) do
started_at = System.monotonic_time()
telemetry_metadata = telemetry_metadata_for_fanout_events(fanout_events)
case enqueue_fanout_events(state, fanout_events) do
{:ok, next_state} ->
Telemetry.emit(
[:parrhesia, :fanout, :stop],
%{duration: System.monotonic_time() - started_at},
%{}
telemetry_metadata
)
{:ok, maybe_schedule_drain(next_state)}
{:close, next_state} ->
Telemetry.emit([:parrhesia, :connection, :outbound_queue, :overflow], %{count: 1}, %{})
Telemetry.emit(
[:parrhesia, :connection, :outbound_queue, :overflow],
%{count: 1},
telemetry_metadata
)
close_with_outbound_overflow(next_state)
end
end
@@ -671,7 +746,7 @@ defmodule Parrhesia.Web.Connection do
defp maybe_enqueue_fanout_event(state, subscription_id, event) do
if subscription_matches?(state, subscription_id, event) do
enqueue_outbound(state, {subscription_id, event})
enqueue_outbound(state, {subscription_id, event}, traffic_class_for_event(event))
else
{:ok, state}
end
@@ -687,7 +762,8 @@ defmodule Parrhesia.Web.Connection do
defp enqueue_outbound(
%__MODULE__{outbound_queue_size: queue_size, max_outbound_queue: max_outbound_queue} =
state,
queue_entry
queue_entry,
traffic_class
)
when queue_size < max_outbound_queue do
next_state =
@@ -697,29 +773,37 @@ defmodule Parrhesia.Web.Connection do
outbound_queue_size: queue_size + 1
}
emit_outbound_queue_depth(next_state.outbound_queue_size)
emit_outbound_queue_depth(next_state, %{traffic_class: traffic_class})
{:ok, next_state}
end
defp enqueue_outbound(
%__MODULE__{outbound_overflow_strategy: :drop_newest} = state,
_queue_entry
_queue_entry,
_traffic_class
),
do: {:ok, state}
defp enqueue_outbound(
%__MODULE__{outbound_overflow_strategy: :drop_oldest} = state,
queue_entry
queue_entry,
traffic_class
) do
{next_queue, next_size} =
drop_oldest_and_enqueue(state.outbound_queue, state.outbound_queue_size, queue_entry)
emit_outbound_queue_depth(next_size)
{:ok, %__MODULE__{state | outbound_queue: next_queue, outbound_queue_size: next_size}}
next_state = %__MODULE__{state | outbound_queue: next_queue, outbound_queue_size: next_size}
emit_outbound_queue_depth(next_state, %{traffic_class: traffic_class})
{:ok, next_state}
end
defp enqueue_outbound(%__MODULE__{outbound_overflow_strategy: :close} = state, _queue_entry),
do: {:close, state}
defp enqueue_outbound(
%__MODULE__{outbound_overflow_strategy: :close} = state,
_queue_entry,
_traffic_class
),
do: {:close, state}
defp drop_oldest_and_enqueue(queue, queue_size, queue_entry) when queue_size > 0 do
{_dropped, truncated_queue} = :queue.out(queue)
@@ -748,7 +832,7 @@ defmodule Parrhesia.Web.Connection do
}
|> maybe_schedule_drain()
emit_outbound_queue_depth(remaining_size)
emit_outbound_queue_depth(next_state)
{Enum.reverse(frames), next_state}
end
@@ -779,8 +863,29 @@ defmodule Parrhesia.Web.Connection do
%__MODULE__{state | drain_scheduled?: true}
end
defp emit_outbound_queue_depth(depth) do
Telemetry.emit([:parrhesia, :connection, :outbound_queue], %{depth: depth}, %{})
# Emits the outbound queue gauges for `state`.
#
# Always emits [:parrhesia, :connection, :outbound_queue] with the current
# depth (`state.outbound_queue_size`) and pressure (depth divided by
# `state.max_outbound_queue`, or 0.0 when that limit is not greater than zero).
# When the pressure reaches @outbound_queue_pressure_threshold, a second
# [:parrhesia, :connection, :outbound_queue, :pressure] event is emitted.
# `metadata` is passed through unchanged to both events and defaults to `%{}`.
defp emit_outbound_queue_depth(state, metadata \\ %{}) do
  queue_depth = state.outbound_queue_size
  queue_limit = state.max_outbound_queue

  # Guard the division so a zero limit reports no pressure instead of raising.
  fill_ratio = if queue_limit > 0, do: queue_depth / queue_limit, else: 0.0

  Telemetry.emit(
    [:parrhesia, :connection, :outbound_queue],
    %{depth: queue_depth, pressure: fill_ratio},
    metadata
  )

  if fill_ratio >= @outbound_queue_pressure_threshold do
    Telemetry.emit(
      [:parrhesia, :connection, :outbound_queue, :pressure],
      %{count: 1, pressure: fill_ratio},
      metadata
    )
  end
end
defp upsert_subscription(%__MODULE__{} = state, subscription_id, filters) do
@@ -830,7 +935,7 @@ defmodule Parrhesia.Web.Connection do
outbound_queue_size: length(filtered_entries)
}
emit_outbound_queue_depth(next_state.outbound_queue_size)
emit_outbound_queue_depth(next_state)
next_state
end