Implement M4 Marmot group envelope and fanout hardening

This commit is contained in:
2026-03-13 22:12:45 +01:00
parent 1547d00215
commit 619c1a0bdf
7 changed files with 266 additions and 8 deletions

View File

@@ -57,6 +57,57 @@ defmodule Parrhesia.Protocol.EventValidatorMarmotTest do
Protocol.validate_event(event)
end
test "accepts valid kind 445 Marmot group envelope" do
group_event =
valid_keypackage_event(%{
"kind" => 445,
"tags" => [["h", String.duplicate("c", 64)]],
"content" => Base.encode64("mls-message")
})
assert :ok = EventValidator.validate(group_event)
end
test "accepts opaque binary payload for kind 445 without MLS inspection" do
opaque_payload = Base.encode64(<<0, 255, 10, 42, 128, 1, 2, 3>>)
group_event =
valid_keypackage_event(%{
"kind" => 445,
"tags" => [["h", String.duplicate("c", 64)]],
"content" => opaque_payload
})
assert :ok = EventValidator.validate(group_event)
end
test "rejects malformed kind 445 Marmot group envelopes" do
missing_h =
valid_keypackage_event(%{
"kind" => 445,
"tags" => [["p", String.duplicate("c", 64)]],
"content" => Base.encode64("mls-message")
})
invalid_h =
valid_keypackage_event(%{
"kind" => 445,
"tags" => [["h", "not-hex"]],
"content" => Base.encode64("mls-message")
})
invalid_content =
valid_keypackage_event(%{
"kind" => 445,
"tags" => [["h", String.duplicate("d", 64)]],
"content" => "not-base64"
})
assert {:error, :missing_marmot_group_tag} = EventValidator.validate(missing_h)
assert {:error, :invalid_marmot_group_tag} = EventValidator.validate(invalid_h)
assert {:error, :invalid_marmot_group_content} = EventValidator.validate(invalid_content)
end
test "rejects malformed kind 1059 wrapped welcome envelopes" do
invalid_missing_recipient =
valid_keypackage_event(%{

View File

@@ -316,6 +316,36 @@ defmodule Parrhesia.Storage.Adapters.Postgres.EventsQueryCountTest do
assert Enum.map(results, & &1["id"]) == [tie_winner_id, tie_loser_id, older["id"]]
end
test "query/3 keeps deterministic ordering for high-volume kind 445 group traffic" do
group_id = String.duplicate("c", 64)
events =
Enum.map(1..60, fn idx ->
persist_event(%{
"kind" => 445,
"created_at" => 1_700_001_000 + div(idx, 3),
"tags" => [["h", group_id]],
"content" => Base.encode64("group-message-#{idx}")
})
end)
assert {:ok, results} =
Events.query(%{}, [%{"kinds" => [445], "#h" => [group_id]}], [])
expected_ids =
events
|> Enum.sort(fn left, right ->
cond do
left["created_at"] > right["created_at"] -> true
left["created_at"] < right["created_at"] -> false
true -> left["id"] < right["id"]
end
end)
|> Enum.map(& &1["id"])
assert Enum.map(results, & &1["id"]) == expected_ids
end
test "mls keypackage relay list kind 10051 follows replaceable conflict semantics" do
author = String.duplicate("c", 64)

View File

@@ -59,6 +59,37 @@ defmodule Parrhesia.Storage.Adapters.Postgres.EventsTest do
assert normalized.expires_at == 1_700_000_120
end
test "keeps explicit expiration tag for kind 445 when present" do
previous_features = Application.get_env(:parrhesia, :features, [])
previous_policies = Application.get_env(:parrhesia, :policies, [])
Application.put_env(:parrhesia, :features, Keyword.put(previous_features, :nip_ee_mls, true))
Application.put_env(
:parrhesia,
:policies,
Keyword.put(previous_policies, :mls_group_event_ttl_seconds, 120)
)
on_exit(fn ->
Application.put_env(:parrhesia, :features, previous_features)
Application.put_env(:parrhesia, :policies, previous_policies)
end)
event = %{
"id" => String.duplicate("4", 64),
"pubkey" => String.duplicate("5", 64),
"created_at" => 1_700_000_000,
"kind" => 445,
"tags" => [["expiration", "1700000900"]],
"content" => "mls",
"sig" => String.duplicate("6", 128)
}
assert {:ok, normalized} = Events.normalize_event(event)
assert normalized.expires_at == 1_700_000_900
end
test "candidate_wins_state?/2 uses created_at then lexical id tie-break" do
assert Events.candidate_wins_state?(
%{created_at: 11, id: <<2>>},

View File

@@ -111,7 +111,7 @@ defmodule Parrhesia.Web.ConformanceTest do
valid_event(%{
"kind" => 445,
"tags" => [["h", String.duplicate("b", 64)]],
"content" => "commit-envelope"
"content" => Base.encode64("commit-envelope")
})
assert {:push, {:text, commit_ok_frame}, ^state} =

View File

@@ -158,6 +158,37 @@ defmodule Parrhesia.Web.ConnectionTest do
]
end
test "malformed kind 445 envelope EVENT is rejected" do
previous_features = Application.get_env(:parrhesia, :features, [])
Application.put_env(:parrhesia, :features, Keyword.put(previous_features, :nip_ee_mls, true))
on_exit(fn ->
Application.put_env(:parrhesia, :features, previous_features)
end)
state = connection_state()
event =
valid_event()
|> Map.put("kind", 445)
|> Map.put("tags", [["h", "not-hex"]])
|> Map.put("content", "not-base64")
|> then(&Map.put(&1, "id", EventValidator.compute_id(&1)))
payload = Jason.encode!(["EVENT", event])
assert {:push, {:text, response}, ^state} =
Connection.handle_in({payload, [opcode: :text]}, state)
assert Jason.decode!(response) == [
"OK",
event["id"],
false,
"invalid: kind 445 content must be non-empty base64"
]
end
test "NEG sessions open and close" do
state = connection_state()
@@ -205,6 +236,35 @@ defmodule Parrhesia.Web.ConnectionTest do
assert Jason.decode!(payload) == ["EVENT", "sub-1", event]
end
test "high-volume kind 445 fanout drains in order across batches" do
group_id = String.duplicate("c", 64)
state =
subscribed_group_connection_state(group_id,
max_outbound_queue: 256,
outbound_drain_batch_size: 16
)
events =
Enum.map(1..70, fn idx ->
live_group_event("group-event-#{idx}", group_id)
end)
fanout_events = Enum.map(events, &{"sub-group", &1})
assert {:ok, queued_state} = Connection.handle_info({:fanout_events, fanout_events}, state)
assert queued_state.outbound_queue_size == 70
frames = drain_all_event_frames(queued_state)
delivered_ids =
frames
|> Enum.map(fn {:text, payload} -> Jason.decode!(payload) end)
|> Enum.map(fn ["EVENT", "sub-group", event] -> event["id"] end)
assert delivered_ids == Enum.map(events, & &1["id"])
end
test "outbound queue overflow closes connection when strategy is close" do
state =
subscribed_connection_state(
@@ -238,6 +298,16 @@ defmodule Parrhesia.Web.ConnectionTest do
subscribed_state
end
defp subscribed_group_connection_state(group_id, opts) do
state = connection_state(opts)
req_payload = Jason.encode!(["REQ", "sub-group", %{"kinds" => [445], "#h" => [group_id]}])
assert {:push, _, subscribed_state} =
Connection.handle_in({req_payload, [opcode: :text]}, state)
subscribed_state
end
defp connection_state(opts \\ []) do
{:ok, state} = Connection.init(Keyword.put_new(opts, :subscription_index, nil))
state
@@ -255,6 +325,18 @@ defmodule Parrhesia.Web.ConnectionTest do
}
end
defp live_group_event(id, group_id) do
%{
"id" => id,
"pubkey" => String.duplicate("a", 64),
"created_at" => System.system_time(:second),
"kind" => 445,
"tags" => [["h", group_id]],
"content" => Base.encode64("mls-group-message"),
"sig" => String.duplicate("b", 128)
}
end
defp valid_auth_event(challenge) do
now = System.system_time(:second)
@@ -270,6 +352,31 @@ defmodule Parrhesia.Web.ConnectionTest do
Map.put(base, "id", EventValidator.compute_id(base))
end
defp drain_all_event_frames(state), do: drain_all_event_frames(state, [])
defp drain_all_event_frames(%{outbound_queue_size: 0}, acc) do
flush_drain_messages()
Enum.reverse(acc)
end
defp drain_all_event_frames(state, acc) do
case Connection.handle_info(:drain_outbound_queue, state) do
{:push, frames, next_state} ->
drain_all_event_frames(next_state, Enum.reverse(frames) ++ acc)
{:ok, next_state} ->
drain_all_event_frames(next_state, acc)
end
end
defp flush_drain_messages do
receive do
:drain_outbound_queue -> flush_drain_messages()
after
0 -> :ok
end
end
defp valid_event do
base_event = %{
"pubkey" => String.duplicate("1", 64),