diff --git a/PROGRESS_MARMOT.md b/PROGRESS_MARMOT.md index d4ee8c4..3d4d88a 100644 --- a/PROGRESS_MARMOT.md +++ b/PROGRESS_MARMOT.md @@ -54,8 +54,8 @@ Spec source: `~/marmot/README.md` + MIP-00..05. ## M7 — hardening + operations -- [ ] Add Marmot-focused telemetry breakdowns (ingest/query/fanout, queue pressure) -- [ ] Add query-plan regression checks for `#h` and `#i` heavy workloads -- [ ] Add fault-injection scenarios for relay outage/reordering behavior in group flows -- [ ] Add docs for operator limits tuned for Marmot traffic patterns -- [ ] Final `mix precommit` before merge +- [x] Add Marmot-focused telemetry breakdowns (ingest/query/fanout, queue pressure) +- [x] Add query-plan regression checks for `#h` and `#i` heavy workloads +- [x] Add fault-injection scenarios for relay outage/reordering behavior in group flows +- [x] Add docs for operator limits tuned for Marmot traffic patterns +- [x] Final `mix precommit` before merge diff --git a/docs/MARMOT_OPERATIONS.md b/docs/MARMOT_OPERATIONS.md new file mode 100644 index 0000000..1b37c2b --- /dev/null +++ b/docs/MARMOT_OPERATIONS.md @@ -0,0 +1,69 @@ +# Marmot operations guide (relay operator tuning) + +This document captures practical limits and operational defaults for Marmot-heavy traffic (`443`, `445`, `10051`, wrapped `1059`, optional media/push flows). + +## 1) Recommended baseline limits + +Use these as a starting point and tune from production telemetry. + +```elixir +config :parrhesia, + limits: [ + max_filter_limit: 500, + max_filters_per_req: 16, + max_outbound_queue: 256, + outbound_drain_batch_size: 64 + ], + policies: [ + # Marmot group routing/query guards + marmot_require_h_for_group_queries: true, + marmot_group_max_h_values_per_filter: 32, + marmot_group_max_query_window_seconds: 2_592_000, + + # Kind 445 retention + mls_group_event_ttl_seconds: 300, + + # MIP-04 metadata controls + marmot_media_max_imeta_tags_per_event: 8, + marmot_media_max_field_value_bytes: 1024, + marmot_media_max_url_bytes: 2048, + marmot_media_allowed_mime_prefixes: [], + marmot_media_reject_mip04_v1: true, + + # MIP-05 push controls (optional) + marmot_push_server_pubkeys: [], + marmot_push_max_relay_tags: 16, + marmot_push_max_payload_bytes: 65_536, + marmot_push_max_trigger_age_seconds: 120, + marmot_push_require_expiration: true, + marmot_push_max_expiration_window_seconds: 120, + marmot_push_max_server_recipients: 1 + ] +``` + +## 2) Index expectations for Marmot workloads + +The Postgres adapter relies on dedicated partial tag indexes for hot Marmot selectors: + +- `event_tags_h_value_created_at_idx` for `#h` group routing +- `event_tags_i_value_created_at_idx` for `#i` keypackage reference lookups + +Query-plan regression tests assert these paths remain usable for heavy workloads. + +## 3) Telemetry to watch + +Key metrics for Marmot traffic and pressure: + +- `parrhesia.ingest.duration.ms{traffic_class="marmot|generic"}` +- `parrhesia.query.duration.ms{traffic_class="marmot|generic"}` +- `parrhesia.fanout.duration.ms{traffic_class="marmot|generic"}` +- `parrhesia.connection.outbound_queue.depth{traffic_class=...}` +- `parrhesia.connection.outbound_queue.pressure{traffic_class=...}` +- `parrhesia.connection.outbound_queue.pressure_events.count{traffic_class=...}` +- `parrhesia.connection.outbound_queue.overflow.count{traffic_class=...}` + +Operational target: keep queue pressure below sustained 0.75 and avoid overflow spikes during `445` bursts. + +## 4) Fault and recovery expectations + +During storage outages, Marmot group-flow writes must fail with explicit `OK false` errors. After recovery, reordered group events should still query deterministically by `created_at DESC, id ASC`. diff --git a/lib/parrhesia/telemetry.ex b/lib/parrhesia/telemetry.ex index a092952..3af231f 100644 --- a/lib/parrhesia/telemetry.ex +++ b/lib/parrhesia/telemetry.ex @@ -34,28 +34,51 @@ defmodule Parrhesia.Telemetry do event_name: [:parrhesia, :ingest, :stop], measurement: :duration, unit: {:native, :millisecond}, + tags: [:traffic_class], + tag_values: &traffic_class_tag_values/1, reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]] ), distribution("parrhesia.query.duration.ms", event_name: [:parrhesia, :query, :stop], measurement: :duration, unit: {:native, :millisecond}, + tags: [:traffic_class], + tag_values: &traffic_class_tag_values/1, reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]] ), distribution("parrhesia.fanout.duration.ms", event_name: [:parrhesia, :fanout, :stop], measurement: :duration, unit: {:native, :millisecond}, + tags: [:traffic_class], + tag_values: &traffic_class_tag_values/1, reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]] ), last_value("parrhesia.connection.outbound_queue.depth", event_name: [:parrhesia, :connection, :outbound_queue], measurement: :depth, + tags: [:traffic_class], + tag_values: &traffic_class_tag_values/1, reporter_options: [prometheus_type: :gauge] ), + last_value("parrhesia.connection.outbound_queue.pressure", + event_name: [:parrhesia, :connection, :outbound_queue], + measurement: :pressure, + tags: [:traffic_class], + tag_values: &traffic_class_tag_values/1, + reporter_options: [prometheus_type: :gauge] + ), + counter("parrhesia.connection.outbound_queue.pressure_events.count", + event_name: [:parrhesia, :connection, :outbound_queue, :pressure], + measurement: :count, + tags: [:traffic_class], + tag_values: &traffic_class_tag_values/1 + ), counter("parrhesia.connection.outbound_queue.overflow.count", event_name: [:parrhesia, :connection, :outbound_queue, :overflow], - measurement: :count + measurement: :count, + tags: [:traffic_class], + tag_values: &traffic_class_tag_values/1 ), last_value("parrhesia.vm.memory.total.bytes", event_name: [:parrhesia, :vm, :memory], @@ -83,4 +106,9 @@ defmodule Parrhesia.Telemetry do total = :erlang.memory(:total) emit([:parrhesia, :vm, :memory], %{total: total}, %{}) end + + defp traffic_class_tag_values(metadata) do + traffic_class = metadata |> Map.get(:traffic_class, :generic) |> to_string() + %{traffic_class: traffic_class} + end end diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index 32ac5f8..a40ec87 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -21,6 +21,20 @@ defmodule Parrhesia.Web.Connection do @default_outbound_drain_batch_size 64 @default_outbound_overflow_strategy :close @drain_outbound_queue :drain_outbound_queue + @outbound_queue_pressure_threshold 0.75 + + @marmot_kinds MapSet.new([ + 443, + 444, + 445, + 1059, + 10_050, + 10_051, + 446, + 447, + 448, + 449 + ]) defstruct subscriptions: %{}, authenticated_pubkeys: MapSet.new(), @@ -173,7 +187,7 @@ defmodule Parrhesia.Web.Connection do Telemetry.emit( [:parrhesia, :ingest, :stop], %{duration: System.monotonic_time() - started_at}, - %{} + telemetry_metadata_for_event(event) ) fanout_event(event) @@ -205,7 +219,7 @@ defmodule Parrhesia.Web.Connection do Telemetry.emit( [:parrhesia, :query, :stop], %{duration: System.monotonic_time() - started_at}, - %{} + telemetry_metadata_for_filters(filters) ) frames = @@ -292,7 +306,7 @@ defmodule Parrhesia.Web.Connection do Telemetry.emit( [:parrhesia, :query, :stop], %{duration: System.monotonic_time() - started_at}, - %{} + telemetry_metadata_for_filters(filters) ) response = Protocol.encode_relay({:count, subscription_id, payload}) @@ -485,6 +499,61 @@ defmodule Parrhesia.Web.Connection do |> Base.encode64() end + defp telemetry_metadata_for_event(event) do + %{traffic_class: traffic_class_for_event(event)} + end + + defp telemetry_metadata_for_filters(filters) do + %{traffic_class: traffic_class_for_filters(filters)} + end + + defp telemetry_metadata_for_fanout_events(fanout_events) do + traffic_class = + if Enum.any?(fanout_events, fn + {_subscription_id, event} when is_map(event) -> + traffic_class_for_event(event) == :marmot + + _other -> + false + end) do + :marmot + else + :generic + end + + %{traffic_class: traffic_class} + end + + defp traffic_class_for_filters(filters) do + if Enum.any?(filters, &marmot_filter?/1) do + :marmot + else + :generic + end + end + + defp marmot_filter?(filter) when is_map(filter) do + has_marmot_kind? = + case Map.get(filter, "kinds") do + kinds when is_list(kinds) -> Enum.any?(kinds, &MapSet.member?(@marmot_kinds, &1)) + _other -> false + end + + has_marmot_kind? or Map.has_key?(filter, "#h") or Map.has_key?(filter, "#i") + end + + defp marmot_filter?(_filter), do: false + + defp traffic_class_for_event(event) when is_map(event) do + if MapSet.member?(@marmot_kinds, Map.get(event, "kind")) do + :marmot + else + :generic + end + end + + defp traffic_class_for_event(_event), do: :generic + defp restricted_close(state, subscription_id, reason) do response = Protocol.encode_relay({:closed, subscription_id, reason}) with_auth_challenge_frame(state, {:push, {:text, response}, state}) @@ -632,19 +701,25 @@ defmodule Parrhesia.Web.Connection do defp handle_fanout_events(%__MODULE__{} = state, fanout_events) do started_at = System.monotonic_time() + telemetry_metadata = telemetry_metadata_for_fanout_events(fanout_events) case enqueue_fanout_events(state, fanout_events) do {:ok, next_state} -> Telemetry.emit( [:parrhesia, :fanout, :stop], %{duration: System.monotonic_time() - started_at}, - %{} + telemetry_metadata ) {:ok, maybe_schedule_drain(next_state)} {:close, next_state} -> - Telemetry.emit([:parrhesia, :connection, :outbound_queue, :overflow], %{count: 1}, %{}) + Telemetry.emit( + [:parrhesia, :connection, :outbound_queue, :overflow], + %{count: 1}, + telemetry_metadata + ) + close_with_outbound_overflow(next_state) end end @@ -671,7 +746,7 @@ defmodule Parrhesia.Web.Connection do defp maybe_enqueue_fanout_event(state, subscription_id, event) do if subscription_matches?(state, subscription_id, event) do - enqueue_outbound(state, {subscription_id, event}) + enqueue_outbound(state, {subscription_id, event}, traffic_class_for_event(event)) else {:ok, state} end @@ -687,7 +762,8 @@ defmodule Parrhesia.Web.Connection do defp enqueue_outbound( %__MODULE__{outbound_queue_size: queue_size, max_outbound_queue: max_outbound_queue} = state, - queue_entry + queue_entry, + traffic_class ) when queue_size < max_outbound_queue do next_state = @@ -697,29 +773,37 @@ defmodule Parrhesia.Web.Connection do outbound_queue_size: queue_size + 1 } - emit_outbound_queue_depth(next_state.outbound_queue_size) + emit_outbound_queue_depth(next_state, %{traffic_class: traffic_class}) {:ok, next_state} end defp enqueue_outbound( %__MODULE__{outbound_overflow_strategy: :drop_newest} = state, - _queue_entry + _queue_entry, + _traffic_class ), do: {:ok, state} defp enqueue_outbound( %__MODULE__{outbound_overflow_strategy: :drop_oldest} = state, - queue_entry + queue_entry, + traffic_class ) do {next_queue, next_size} = drop_oldest_and_enqueue(state.outbound_queue, state.outbound_queue_size, queue_entry) - emit_outbound_queue_depth(next_size) - {:ok, %__MODULE__{state | outbound_queue: next_queue, outbound_queue_size: next_size}} + next_state = %__MODULE__{state | outbound_queue: next_queue, outbound_queue_size: next_size} + + emit_outbound_queue_depth(next_state, %{traffic_class: traffic_class}) + {:ok, next_state} end - defp enqueue_outbound(%__MODULE__{outbound_overflow_strategy: :close} = state, _queue_entry), - do: {:close, state} + defp enqueue_outbound( + %__MODULE__{outbound_overflow_strategy: :close} = state, + _queue_entry, + _traffic_class + ), + do: {:close, state} defp drop_oldest_and_enqueue(queue, queue_size, queue_entry) when queue_size > 0 do {_dropped, truncated_queue} = :queue.out(queue) @@ -748,7 +832,7 @@ defmodule Parrhesia.Web.Connection do } |> maybe_schedule_drain() - emit_outbound_queue_depth(remaining_size) + emit_outbound_queue_depth(next_state) {Enum.reverse(frames), next_state} end @@ -779,8 +863,29 @@ defmodule Parrhesia.Web.Connection do %__MODULE__{state | drain_scheduled?: true} end - defp emit_outbound_queue_depth(depth) do - Telemetry.emit([:parrhesia, :connection, :outbound_queue], %{depth: depth}, %{}) + defp emit_outbound_queue_depth(state, metadata \\ %{}) do + depth = state.outbound_queue_size + + pressure = + if state.max_outbound_queue > 0 do + depth / state.max_outbound_queue + else + 0.0 + end + + Telemetry.emit( + [:parrhesia, :connection, :outbound_queue], + %{depth: depth, pressure: pressure}, + metadata + ) + + if pressure >= @outbound_queue_pressure_threshold do + Telemetry.emit( + [:parrhesia, :connection, :outbound_queue, :pressure], + %{count: 1, pressure: pressure}, + metadata + ) + end end defp upsert_subscription(%__MODULE__{} = state, subscription_id, filters) do @@ -830,7 +935,7 @@ defmodule Parrhesia.Web.Connection do outbound_queue_size: length(filtered_entries) } - emit_outbound_queue_depth(next_state.outbound_queue_size) + emit_outbound_queue_depth(next_state) next_state end diff --git a/test/parrhesia/fault_injection_group_flow_test.exs b/test/parrhesia/fault_injection_group_flow_test.exs new file mode 100644 index 0000000..913e5fc --- /dev/null +++ b/test/parrhesia/fault_injection_group_flow_test.exs @@ -0,0 +1,143 @@ +defmodule Parrhesia.FaultInjectionGroupFlowTest do + use ExUnit.Case, async: false + + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.Protocol.EventValidator + alias Parrhesia.Repo + alias Parrhesia.Storage + alias Parrhesia.TestSupport.FailingEvents + alias Parrhesia.TestSupport.PermissiveModeration + alias Parrhesia.Web.Connection + + setup do + :ok = Sandbox.checkout(Repo) + + previous_storage = Application.get_env(:parrhesia, :storage, []) + + Application.put_env( + :parrhesia, + :storage, + previous_storage + |> Keyword.put(:events, FailingEvents) + |> Keyword.put(:moderation, PermissiveModeration) + ) + + on_exit(fn -> + Application.put_env(:parrhesia, :storage, previous_storage) + end) + + %{previous_storage: previous_storage} + end + + test "kind 445 commit recovers cleanly after storage outage", %{ + previous_storage: previous_storage + } do + {:ok, state} = Connection.init(subscription_index: nil) + + group_event = + build_event(%{ + "kind" => 445, + "tags" => [["h", String.duplicate("a", 64)]], + "content" => Base.encode64("commit") + }) + + payload = Jason.encode!(["EVENT", group_event]) + + assert {:push, {:text, error_response}, ^state} = + Connection.handle_in({payload, [opcode: :text]}, state) + + assert Jason.decode!(error_response) == ["OK", group_event["id"], false, "error: :db_down"] + + Application.put_env( + :parrhesia, + :storage, + previous_storage |> Keyword.put(:moderation, PermissiveModeration) + ) + + assert {:push, {:text, ok_response}, ^state} = + Connection.handle_in({payload, [opcode: :text]}, state) + + assert Jason.decode!(ok_response) == ["OK", group_event["id"], true, "ok: event stored"] + + assert {:ok, persisted_group_event} = Storage.events().get_event(%{}, group_event["id"]) + assert persisted_group_event["id"] == group_event["id"] + end + + test "reordered group flow remains deterministic after outage recovery", %{ + previous_storage: previous_storage + } do + {:ok, state} = Connection.init(subscription_index: nil) + + group_id = String.duplicate("b", 64) + now = System.system_time(:second) + + older_event = + build_event(%{ + "created_at" => now - 10, + "kind" => 445, + "tags" => [["h", group_id]], + "content" => Base.encode64("older") + }) + + newer_event = + build_event(%{ + "created_at" => now - 5, + "kind" => 445, + "tags" => [["h", group_id]], + "content" => Base.encode64("newer") + }) + + assert {:push, {:text, outage_response}, ^state} = + Connection.handle_in( + {Jason.encode!(["EVENT", older_event]), [opcode: :text]}, + state + ) + + assert Jason.decode!(outage_response) == ["OK", older_event["id"], false, "error: :db_down"] + + Application.put_env( + :parrhesia, + :storage, + previous_storage |> Keyword.put(:moderation, PermissiveModeration) + ) + + assert {:push, {:text, newer_response}, ^state} = + Connection.handle_in( + {Jason.encode!(["EVENT", newer_event]), [opcode: :text]}, + state + ) + + assert Jason.decode!(newer_response) == ["OK", newer_event["id"], true, "ok: event stored"] + + assert {:push, {:text, older_response}, ^state} = + Connection.handle_in( + {Jason.encode!(["EVENT", older_event]), [opcode: :text]}, + state + ) + + assert Jason.decode!(older_response) == ["OK", older_event["id"], true, "ok: event stored"] + + assert {:ok, results} = + Storage.events().query( + %{}, + [%{"kinds" => [445], "#h" => [group_id]}], + now: now + 1 + ) + + assert Enum.map(results, & &1["id"]) == [newer_event["id"], older_event["id"]] + end + + defp build_event(overrides) do + base_event = %{ + "pubkey" => String.duplicate("1", 64), + "created_at" => System.system_time(:second), + "kind" => 1, + "tags" => [], + "content" => "fault-group", + "sig" => String.duplicate("2", 128) + } + + event = Map.merge(base_event, overrides) + Map.put(event, "id", EventValidator.compute_id(event)) + end +end diff --git a/test/parrhesia/storage/adapters/postgres/query_plan_regression_test.exs b/test/parrhesia/storage/adapters/postgres/query_plan_regression_test.exs new file mode 100644 index 0000000..f583020 --- /dev/null +++ b/test/parrhesia/storage/adapters/postgres/query_plan_regression_test.exs @@ -0,0 +1,135 @@ +defmodule Parrhesia.Storage.Adapters.Postgres.QueryPlanRegressionTest do + use ExUnit.Case, async: false + + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.Protocol.EventValidator + alias Parrhesia.Repo + alias Parrhesia.Storage.Adapters.Postgres.Events + + setup_all do + if is_nil(Process.whereis(Repo)) do + start_supervised!(Repo) + end + + Sandbox.mode(Repo, :manual) + :ok + end + + setup do + :ok = Sandbox.checkout(Repo) + :ok = Repo.query!("SET enable_seqscan TO off") |> then(fn _ -> :ok end) + end + + test "#h-heavy query plan uses dedicated event_tags h index" do + group_id = String.duplicate("a", 64) + + Enum.each(1..150, fn idx -> + persist_event(%{ + "kind" => 445, + "created_at" => 1_700_010_000 + idx, + "tags" => [["h", group_id]], + "content" => Base.encode64("group-#{idx}") + }) + end) + + Enum.each(1..50, fn idx -> + persist_event(%{ + "kind" => 445, + "created_at" => 1_700_020_000 + idx, + "tags" => [["h", String.duplicate("b", 64)]], + "content" => Base.encode64("other-#{idx}") + }) + end) + + explain = + Repo.query!( + """ + EXPLAIN (FORMAT TEXT) + SELECT e.id + FROM events e + WHERE e.kind = 445 + AND e.deleted_at IS NULL + AND EXISTS ( + SELECT 1 + FROM event_tags t + WHERE t.event_created_at = e.created_at + AND t.event_id = e.id + AND t.name = 'h' + AND t.value = $1 + ) + ORDER BY e.created_at DESC, e.id ASC + LIMIT 100 + """, + [group_id] + ) + + plan = Enum.map_join(explain.rows, "\n", &hd/1) + assert plan =~ "event_tags_h_value_created_at_idx" + end + + test "#i-heavy query plan uses dedicated event_tags i index" do + keypackage_ref = String.duplicate("c", 64) + + Enum.each(1..120, fn idx -> + persist_event(%{ + "kind" => 443, + "created_at" => 1_700_030_000 + idx, + "tags" => [["i", keypackage_ref], ["encoding", "base64"]], + "content" => Base.encode64("keypackage-#{idx}") + }) + end) + + Enum.each(1..40, fn idx -> + persist_event(%{ + "kind" => 443, + "created_at" => 1_700_040_000 + idx, + "tags" => [["i", String.duplicate("d", 64)], ["encoding", "base64"]], + "content" => Base.encode64("other-#{idx}") + }) + end) + + explain = + Repo.query!( + """ + EXPLAIN (FORMAT TEXT) + SELECT e.id + FROM events e + WHERE e.kind = 443 + AND e.deleted_at IS NULL + AND EXISTS ( + SELECT 1 + FROM event_tags t + WHERE t.event_created_at = e.created_at + AND t.event_id = e.id + AND t.name = 'i' + AND t.value = $1 + ) + ORDER BY e.created_at DESC, e.id ASC + LIMIT 100 + """, + [keypackage_ref] + ) + + plan = Enum.map_join(explain.rows, "\n", &hd/1) + assert plan =~ "event_tags_i_value_created_at_idx" + end + + defp persist_event(overrides) do + event = build_event(overrides) + assert {:ok, _persisted} = Events.put_event(%{}, event) + end + + defp build_event(overrides) do + base_event = %{ + "pubkey" => String.duplicate("7", 64), + "created_at" => System.system_time(:second), + "kind" => 1, + "tags" => [], + "content" => "query-plan-#{System.unique_integer([:positive])}", + "sig" => String.duplicate("8", 128) + } + + event = Map.merge(base_event, overrides) + Map.put(event, "id", EventValidator.compute_id(event)) + end +end diff --git a/test/parrhesia/telemetry_test.exs b/test/parrhesia/telemetry_test.exs new file mode 100644 index 0000000..1351091 --- /dev/null +++ b/test/parrhesia/telemetry_test.exs @@ -0,0 +1,25 @@ +defmodule Parrhesia.TelemetryTest do + use ExUnit.Case, async: true + + alias Parrhesia.Telemetry + + test "exposes Marmot-focused telemetry metrics" do + metric_names = Enum.map(Telemetry.metrics(), & &1.name) + + assert [:parrhesia, :ingest, :duration, :ms] in metric_names + assert [:parrhesia, :query, :duration, :ms] in metric_names + assert [:parrhesia, :fanout, :duration, :ms] in metric_names + assert [:parrhesia, :connection, :outbound_queue, :depth] in metric_names + assert [:parrhesia, :connection, :outbound_queue, :pressure] in metric_names + assert [:parrhesia, :connection, :outbound_queue, :pressure_events, :count] in metric_names + end + + test "emit/3 accepts traffic-class metadata" do + assert :ok = + Telemetry.emit( + [:parrhesia, :ingest, :stop], + %{duration: 1}, + %{traffic_class: :marmot} + ) + end +end