diff --git a/PROGRESS_MARMOT.md b/PROGRESS_MARMOT.md index 1331896..50c591d 100644 --- a/PROGRESS_MARMOT.md +++ b/PROGRESS_MARMOT.md @@ -35,10 +35,10 @@ Spec source: `~/marmot/README.md` + MIP-00..05. ## M4 — MIP-03 (group events) -- [ ] Enforce kind `445` envelope validation (`#h` tag presence/shape, base64 content shape) -- [ ] Keep relay MLS-agnostic (no MLS decrypt/inspect in relay hot path) -- [ ] Add configurable retention policy for kind `445` traffic -- [ ] Add tests for high-volume fanout behavior and deterministic query results +- [x] Enforce kind `445` envelope validation (`#h` tag presence/shape, base64 content shape) +- [x] Keep relay MLS-agnostic (no MLS decrypt/inspect in relay hot path) +- [x] Add configurable retention policy for kind `445` traffic +- [x] Add tests for high-volume fanout behavior and deterministic query results ## M5 — optional MIP-04 (encrypted media) diff --git a/lib/parrhesia/protocol/event_validator.ex b/lib/parrhesia/protocol/event_validator.ex index 1ef5426..b871d51 100644 --- a/lib/parrhesia/protocol/event_validator.ex +++ b/lib/parrhesia/protocol/event_validator.ex @@ -40,6 +40,9 @@ defmodule Parrhesia.Protocol.EventValidator do | :invalid_giftwrap_content | :missing_giftwrap_recipient_tag | :invalid_giftwrap_recipient_tag + | :missing_marmot_group_tag + | :invalid_marmot_group_tag + | :invalid_marmot_group_content @spec validate(map()) :: :ok | {:error, error_reason()} def validate(event) when is_map(event) do @@ -117,7 +120,11 @@ defmodule Parrhesia.Protocol.EventValidator do missing_giftwrap_recipient_tag: "invalid: kind 1059 must include at least one recipient p tag", invalid_giftwrap_recipient_tag: - "invalid: kind 1059 recipient p tags must contain lowercase hex pubkeys" + "invalid: kind 1059 recipient p tags must contain lowercase hex pubkeys", + missing_marmot_group_tag: "invalid: kind 445 must include at least one h tag with a group id", + invalid_marmot_group_tag: + "invalid: kind 445 h tags must contain 32-byte lowercase hex group ids", + invalid_marmot_group_content: "invalid: kind 445 content must be non-empty base64" } @spec error_message(error_reason()) :: String.t() @@ -198,6 +205,9 @@ defmodule Parrhesia.Protocol.EventValidator do defp validate_kind_specific(%{"kind" => 10_051} = event), do: validate_marmot_keypackage_relay_list(event) + defp validate_kind_specific(%{"kind" => 445} = event), + do: validate_marmot_group_event(event) + defp validate_kind_specific(%{"kind" => 444}), do: {:error, :invalid_marmot_direct_welcome_event} @@ -261,6 +271,14 @@ defmodule Parrhesia.Protocol.EventValidator do end end + defp validate_marmot_group_event(event) do + tags = Map.get(event, "tags", []) + + with :ok <- validate_non_empty_base64_content(event, :invalid_marmot_group_content) do + validate_marmot_group_tags(tags) + end + end + defp validate_giftwrap_event(event) do tags = Map.get(event, "tags", []) @@ -269,10 +287,13 @@ defmodule Parrhesia.Protocol.EventValidator do end end - defp validate_non_empty_base64_content(event) do + defp validate_non_empty_base64_content(event), + do: validate_non_empty_base64_content(event, :invalid_marmot_keypackage_content) + + defp validate_non_empty_base64_content(event, error_reason) do case Base.decode64(Map.get(event, "content", "")) do {:ok, decoded} when byte_size(decoded) > 0 -> :ok - _other -> {:error, :invalid_marmot_keypackage_content} + _other -> {:error, error_reason} end end @@ -283,6 +304,21 @@ defmodule Parrhesia.Protocol.EventValidator do end end + defp validate_marmot_group_tags(tags) do + group_tags = Enum.filter(tags, &match_tag_name?(&1, "h")) + + cond do + group_tags == [] -> + {:error, :missing_marmot_group_tag} + + Enum.all?(group_tags, &valid_marmot_group_tag?/1) -> + :ok + + true -> + {:error, :invalid_marmot_group_tag} + end + end + defp validate_giftwrap_recipient_tags(tags) do recipient_tags = Enum.filter(tags, &match_tag_name?(&1, "p")) @@ -405,6 +441,9 @@ defmodule Parrhesia.Protocol.EventValidator do defp valid_single_relay_tag?(["relay", relay_url]), do: valid_websocket_url?(relay_url) defp valid_single_relay_tag?(_tag), do: false + defp valid_marmot_group_tag?(["h", group_id | _rest]), do: lowercase_hex?(group_id, 32) + defp valid_marmot_group_tag?(_tag), do: false + defp valid_giftwrap_recipient_tag?(["p", recipient_pubkey | _rest]), do: lowercase_hex?(recipient_pubkey, 32) diff --git a/test/parrhesia/protocol/event_validator_marmot_test.exs b/test/parrhesia/protocol/event_validator_marmot_test.exs index 53d8dce..ffbf81e 100644 --- a/test/parrhesia/protocol/event_validator_marmot_test.exs +++ b/test/parrhesia/protocol/event_validator_marmot_test.exs @@ -57,6 +57,57 @@ defmodule Parrhesia.Protocol.EventValidatorMarmotTest do Protocol.validate_event(event) end + test "accepts valid kind 445 Marmot group envelope" do + group_event = + valid_keypackage_event(%{ + "kind" => 445, + "tags" => [["h", String.duplicate("c", 64)]], + "content" => Base.encode64("mls-message") + }) + + assert :ok = EventValidator.validate(group_event) + end + + test "accepts opaque binary payload for kind 445 without MLS inspection" do + opaque_payload = Base.encode64(<<0, 255, 10, 42, 128, 1, 2, 3>>) + + group_event = + valid_keypackage_event(%{ + "kind" => 445, + "tags" => [["h", String.duplicate("c", 64)]], + "content" => opaque_payload + }) + + assert :ok = EventValidator.validate(group_event) + end + + test "rejects malformed kind 445 Marmot group envelopes" do + missing_h = + valid_keypackage_event(%{ + "kind" => 445, + "tags" => [["p", String.duplicate("c", 64)]], + "content" => Base.encode64("mls-message") + }) + + invalid_h = + valid_keypackage_event(%{ + "kind" => 445, + "tags" => [["h", "not-hex"]], + "content" => Base.encode64("mls-message") + }) + + invalid_content = + valid_keypackage_event(%{ + "kind" => 445, + "tags" => [["h", String.duplicate("d", 64)]], + "content" => "not-base64" + }) + + assert {:error, :missing_marmot_group_tag} = EventValidator.validate(missing_h) + assert {:error, :invalid_marmot_group_tag} = EventValidator.validate(invalid_h) + assert {:error, :invalid_marmot_group_content} = EventValidator.validate(invalid_content) + end + test "rejects malformed kind 1059 wrapped welcome envelopes" do invalid_missing_recipient = valid_keypackage_event(%{ diff --git a/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs b/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs index ac091f8..66f3d70 100644 --- a/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs +++ b/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs @@ -316,6 +316,36 @@ defmodule Parrhesia.Storage.Adapters.Postgres.EventsQueryCountTest do assert Enum.map(results, & &1["id"]) == [tie_winner_id, tie_loser_id, older["id"]] end + test "query/3 keeps deterministic ordering for high-volume kind 445 group traffic" do + group_id = String.duplicate("c", 64) + + events = + Enum.map(1..60, fn idx -> + persist_event(%{ + "kind" => 445, + "created_at" => 1_700_001_000 + div(idx, 3), + "tags" => [["h", group_id]], + "content" => Base.encode64("group-message-#{idx}") + }) + end) + + assert {:ok, results} = + Events.query(%{}, [%{"kinds" => [445], "#h" => [group_id]}], []) + + expected_ids = + events + |> Enum.sort(fn left, right -> + cond do + left["created_at"] > right["created_at"] -> true + left["created_at"] < right["created_at"] -> false + true -> left["id"] < right["id"] + end + end) + |> Enum.map(& &1["id"]) + + assert Enum.map(results, & &1["id"]) == expected_ids + end + test "mls keypackage relay list kind 10051 follows replaceable conflict semantics" do author = String.duplicate("c", 64) diff --git a/test/parrhesia/storage/adapters/postgres/events_test.exs b/test/parrhesia/storage/adapters/postgres/events_test.exs index dc0d30c..421aa0e 100644 --- a/test/parrhesia/storage/adapters/postgres/events_test.exs +++ b/test/parrhesia/storage/adapters/postgres/events_test.exs @@ -59,6 +59,37 @@ defmodule Parrhesia.Storage.Adapters.Postgres.EventsTest do assert normalized.expires_at == 1_700_000_120 end + test "keeps explicit expiration tag for kind 445 when present" do + previous_features = Application.get_env(:parrhesia, :features, []) + previous_policies = Application.get_env(:parrhesia, :policies, []) + + Application.put_env(:parrhesia, :features, Keyword.put(previous_features, :nip_ee_mls, true)) + + Application.put_env( + :parrhesia, + :policies, + Keyword.put(previous_policies, :mls_group_event_ttl_seconds, 120) + ) + + on_exit(fn -> + Application.put_env(:parrhesia, :features, previous_features) + Application.put_env(:parrhesia, :policies, previous_policies) + end) + + event = %{ + "id" => String.duplicate("4", 64), + "pubkey" => String.duplicate("5", 64), + "created_at" => 1_700_000_000, + "kind" => 445, + "tags" => [["expiration", "1700000900"]], + "content" => "mls", + "sig" => String.duplicate("6", 128) + } + + assert {:ok, normalized} = Events.normalize_event(event) + assert normalized.expires_at == 1_700_000_900 + end + test "candidate_wins_state?/2 uses created_at then lexical id tie-break" do assert Events.candidate_wins_state?( %{created_at: 11, id: <<2>>}, diff --git a/test/parrhesia/web/conformance_test.exs b/test/parrhesia/web/conformance_test.exs index d80c6a0..5fb9a09 100644 --- a/test/parrhesia/web/conformance_test.exs +++ b/test/parrhesia/web/conformance_test.exs @@ -111,7 +111,7 @@ defmodule Parrhesia.Web.ConformanceTest do valid_event(%{ "kind" => 445, "tags" => [["h", String.duplicate("b", 64)]], - "content" => "commit-envelope" + "content" => Base.encode64("commit-envelope") }) assert {:push, {:text, commit_ok_frame}, ^state} = diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs index f9ed209..75604c7 100644 --- a/test/parrhesia/web/connection_test.exs +++ b/test/parrhesia/web/connection_test.exs @@ -158,6 +158,37 @@ defmodule Parrhesia.Web.ConnectionTest do ] end + test "malformed kind 445 envelope EVENT is rejected" do + previous_features = Application.get_env(:parrhesia, :features, []) + + Application.put_env(:parrhesia, :features, Keyword.put(previous_features, :nip_ee_mls, true)) + + on_exit(fn -> + Application.put_env(:parrhesia, :features, previous_features) + end) + + state = connection_state() + + event = + valid_event() + |> Map.put("kind", 445) + |> Map.put("tags", [["h", "not-hex"]]) + |> Map.put("content", "not-base64") + |> then(&Map.put(&1, "id", EventValidator.compute_id(&1))) + + payload = Jason.encode!(["EVENT", event]) + + assert {:push, {:text, response}, ^state} = + Connection.handle_in({payload, [opcode: :text]}, state) + + assert Jason.decode!(response) == [ + "OK", + event["id"], + false, + "invalid: kind 445 content must be non-empty base64" + ] + end + test "NEG sessions open and close" do state = connection_state() @@ -205,6 +236,35 @@ defmodule Parrhesia.Web.ConnectionTest do assert Jason.decode!(payload) == ["EVENT", "sub-1", event] end + test "high-volume kind 445 fanout drains in order across batches" do + group_id = String.duplicate("c", 64) + + state = + subscribed_group_connection_state(group_id, + max_outbound_queue: 256, + outbound_drain_batch_size: 16 + ) + + events = + Enum.map(1..70, fn idx -> + live_group_event("group-event-#{idx}", group_id) + end) + + fanout_events = Enum.map(events, &{"sub-group", &1}) + + assert {:ok, queued_state} = Connection.handle_info({:fanout_events, fanout_events}, state) + assert queued_state.outbound_queue_size == 70 + + frames = drain_all_event_frames(queued_state) + + delivered_ids = + frames + |> Enum.map(fn {:text, payload} -> Jason.decode!(payload) end) + |> Enum.map(fn ["EVENT", "sub-group", event] -> event["id"] end) + + assert delivered_ids == Enum.map(events, & &1["id"]) + end + test "outbound queue overflow closes connection when strategy is close" do state = subscribed_connection_state( @@ -238,6 +298,16 @@ defmodule Parrhesia.Web.ConnectionTest do subscribed_state end + defp subscribed_group_connection_state(group_id, opts) do + state = connection_state(opts) + req_payload = Jason.encode!(["REQ", "sub-group", %{"kinds" => [445], "#h" => [group_id]}]) + + assert {:push, _, subscribed_state} = + Connection.handle_in({req_payload, [opcode: :text]}, state) + + subscribed_state + end + defp connection_state(opts \\ []) do {:ok, state} = Connection.init(Keyword.put_new(opts, :subscription_index, nil)) state @@ -255,6 +325,18 @@ defmodule Parrhesia.Web.ConnectionTest do } end + defp live_group_event(id, group_id) do + %{ + "id" => id, + "pubkey" => String.duplicate("a", 64), + "created_at" => System.system_time(:second), + "kind" => 445, + "tags" => [["h", group_id]], + "content" => Base.encode64("mls-group-message"), + "sig" => String.duplicate("b", 128) + } + end + defp valid_auth_event(challenge) do now = System.system_time(:second) @@ -270,6 +352,31 @@ defmodule Parrhesia.Web.ConnectionTest do Map.put(base, "id", EventValidator.compute_id(base)) end + defp drain_all_event_frames(state), do: drain_all_event_frames(state, []) + + defp drain_all_event_frames(%{outbound_queue_size: 0}, acc) do + flush_drain_messages() + Enum.reverse(acc) + end + + defp drain_all_event_frames(state, acc) do + case Connection.handle_info(:drain_outbound_queue, state) do + {:push, frames, next_state} -> + drain_all_event_frames(next_state, Enum.reverse(frames) ++ acc) + + {:ok, next_state} -> + drain_all_event_frames(next_state, acc) + end + end + + defp flush_drain_messages do + receive do + :drain_outbound_queue -> flush_drain_messages() + after + 0 -> :ok + end + end + defp valid_event do base_event = %{ "pubkey" => String.duplicate("1", 64),