From 39dbc069a7c0615c8eae3ab2803e24b86e9031be Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Mon, 16 Mar 2026 16:00:15 +0100 Subject: [PATCH] feat: NIF-77 negentropy sync --- config/config.exs | 2 + config/runtime.exs | 10 + docs/SYNC.md | 12 +- docs/slop/LOCAL_API.md | 6 +- lib/parrhesia/negentropy/engine.ex | 136 +++++++ lib/parrhesia/negentropy/message.ex | 349 ++++++++++++++++++ lib/parrhesia/negentropy/sessions.ex | 108 ++++-- lib/parrhesia/protocol.ex | 41 +- .../storage/adapters/memory/events.ex | 35 ++ .../storage/adapters/postgres/events.ex | 59 +++ lib/parrhesia/storage/events.ex | 3 + .../test_support/expiration_stub_events.ex | 3 + lib/parrhesia/test_support/failing_events.ex | 3 + lib/parrhesia/web/connection.ex | 160 ++++++-- test/parrhesia/negentropy/engine_test.exs | 42 +++ test/parrhesia/negentropy/message_test.exs | 28 ++ test/parrhesia/negentropy/sessions_test.exs | 125 ++++++- test/parrhesia/protocol_test.exs | 16 +- .../storage/adapters/memory/adapter_test.exs | 13 +- .../postgres/events_query_count_test.exs | 31 ++ .../storage/behaviour_contracts_test.exs | 11 +- test/parrhesia/web/connection_test.exs | 102 ++++- 22 files changed, 1194 insertions(+), 101 deletions(-) create mode 100644 lib/parrhesia/negentropy/engine.ex create mode 100644 lib/parrhesia/negentropy/message.ex create mode 100644 test/parrhesia/negentropy/engine_test.exs create mode 100644 test/parrhesia/negentropy/message_test.exs diff --git a/config/config.exs b/config/config.exs index 8950b97..54795d4 100644 --- a/config/config.exs +++ b/config/config.exs @@ -21,6 +21,8 @@ config :parrhesia, max_negentropy_payload_bytes: 4096, max_negentropy_sessions_per_connection: 8, max_negentropy_total_sessions: 10_000, + max_negentropy_items_per_session: 50_000, + negentropy_id_list_threshold: 32, negentropy_session_idle_timeout_seconds: 60, negentropy_session_sweep_interval_seconds: 10 ], diff --git a/config/runtime.exs b/config/runtime.exs index 54d31bf..bb66bcb 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -210,6 +210,16 @@ if config_env() == :prod do "PARRHESIA_LIMITS_MAX_NEGENTROPY_TOTAL_SESSIONS", Keyword.get(limits_defaults, :max_negentropy_total_sessions, 10_000) ), + max_negentropy_items_per_session: + int_env.( + "PARRHESIA_LIMITS_MAX_NEGENTROPY_ITEMS_PER_SESSION", + Keyword.get(limits_defaults, :max_negentropy_items_per_session, 50_000) + ), + negentropy_id_list_threshold: + int_env.( + "PARRHESIA_LIMITS_NEGENTROPY_ID_LIST_THRESHOLD", + Keyword.get(limits_defaults, :negentropy_id_list_threshold, 32) + ), negentropy_session_idle_timeout_seconds: int_env.( "PARRHESIA_LIMITS_NEGENTROPY_SESSION_IDLE_TIMEOUT_SECONDS", diff --git a/docs/SYNC.md b/docs/SYNC.md index a965ae0..4483b87 100644 --- a/docs/SYNC.md +++ b/docs/SYNC.md @@ -131,14 +131,16 @@ This is enough for Tribes and keeps the first version simple. ### NIP-77 -NIP-77 is **not required** for the first sync implementation. +Parrhesia now has a real reusable relay-side NIP-77 engine: -Reason: +- proper `NEG-OPEN` / `NEG-MSG` / `NEG-CLOSE` / `NEG-ERR` framing, +- a reusable negentropy codec and reconciliation engine, +- bounded local `(created_at, id)` snapshot enumeration for matching filters, +- connection/session integration with policy checks and resource limits. -- Parrhesia currently only has `NEG-*` session tracking, not real negentropy reconciliation. -- The current Tribes sync profile already assumes catch-up plus live replay, not negentropy. +That means NIP-77 can be used for bandwidth-efficient catch-up between trusted nodes. -NIP-77 should be treated as a later optimization for bandwidth-efficient reconciliation once Parrhesia has a real reusable implementation. +The first sync worker implementation may still default to ordinary NIP-01 catch-up plus live replay, because that path is operationally simpler and already matches the current Tribes sync profile. `:negentropy` can now be introduced as an optimization mode rather than a future prerequisite. --- diff --git a/docs/slop/LOCAL_API.md b/docs/slop/LOCAL_API.md index e514384..d204d72 100644 --- a/docs/slop/LOCAL_API.md +++ b/docs/slop/LOCAL_API.md @@ -359,8 +359,8 @@ Initial mode should be `:req_stream`: Future optimization: -- `:negentropy` may be added when real NIP-77 reconciliation exists. -- It is not required for the first implementation. +- `:negentropy` may be added as an optimization mode on top of the simpler `:req_stream` baseline. +- Parrhesia now has a reusable NIP-77 engine, but a sync worker does not need to depend on it for the first implementation. --- @@ -372,7 +372,7 @@ Future optimization: - `REQ` -> `Parrhesia.API.Stream.subscribe/4` - `COUNT` -> `Parrhesia.API.Events.count/2` - `AUTH` stays connection-specific, but validation helpers may move to `API.Auth` -- `NEG-*` remains transport-specific until Parrhesia has a real reusable NIP-77 engine +- `NEG-*` maps to the reusable NIP-77 engine and remains exposed through the websocket transport boundary ### HTTP management diff --git a/lib/parrhesia/negentropy/engine.ex b/lib/parrhesia/negentropy/engine.ex new file mode 100644 index 0000000..04b7921 --- /dev/null +++ b/lib/parrhesia/negentropy/engine.ex @@ -0,0 +1,136 @@ +defmodule Parrhesia.Negentropy.Engine do + @moduledoc """ + Relay/client-agnostic negentropy reconciliation engine. + """ + + alias Parrhesia.Negentropy.Message + + @default_id_list_threshold 32 + + @type item :: Message.item() + + @spec initial_message([item()], keyword()) :: binary() + def initial_message(items, opts \\ []) when is_list(opts) do + normalized_items = normalize_items(items) + + Message.encode([ + describe_range(normalized_items, :infinity, id_list_threshold(opts)) + ]) + end + + @spec answer([item()], binary(), keyword()) :: {:ok, binary()} | {:error, term()} + def answer(items, incoming_message, opts \\ []) + when is_binary(incoming_message) and is_list(opts) do + normalized_items = normalize_items(items) + threshold = id_list_threshold(opts) + + case Message.decode(incoming_message) do + {:ok, ranges} -> + response_ranges = + respond_to_ranges(normalized_items, ranges, Message.initial_lower_bound(), threshold) + + {:ok, Message.encode(response_ranges)} + + {:unsupported_version, _supported_version} -> + {:ok, Message.supported_version_message()} + + {:error, reason} -> + {:error, reason} + end + end + + defp respond_to_ranges(_items, [], _lower_bound, _threshold), do: [] + + defp respond_to_ranges(items, [range | rest], lower_bound, threshold) do + upper_bound = Map.fetch!(range, :upper_bound) + + items_in_range = + Enum.filter(items, fn item -> + Message.item_in_range?(item, lower_bound, upper_bound) + end) + + response = + case range.mode do + :skip -> + [%{upper_bound: upper_bound, mode: :skip, payload: nil}] + + :fingerprint -> + respond_to_fingerprint_range(items_in_range, upper_bound, range.payload, threshold) + + :id_list -> + respond_to_id_list_range(items_in_range, upper_bound, range.payload, threshold) + end + + response ++ respond_to_ranges(items, rest, upper_bound, threshold) + end + + defp respond_to_fingerprint_range(items, upper_bound, remote_fingerprint, threshold) do + if Message.fingerprint(items) == remote_fingerprint do + [%{upper_bound: upper_bound, mode: :skip, payload: nil}] + else + mismatch_response(items, upper_bound, threshold) + end + end + + defp respond_to_id_list_range(items, upper_bound, remote_ids, threshold) do + if Enum.map(items, & &1.id) == remote_ids do + [%{upper_bound: upper_bound, mode: :skip, payload: nil}] + else + mismatch_response(items, upper_bound, threshold) + end + end + + defp mismatch_response(items, upper_bound, threshold) do + if length(items) <= threshold do + [%{upper_bound: upper_bound, mode: :id_list, payload: Enum.map(items, & &1.id)}] + else + split_response(items, upper_bound, threshold) + end + end + + defp split_response(items, upper_bound, threshold) do + midpoint = div(length(items), 2) + left_items = Enum.take(items, midpoint) + right_items = Enum.drop(items, midpoint) + + boundary = + left_items + |> List.last() + |> then(&Message.split_bound(&1, hd(right_items))) + + [ + describe_range(left_items, boundary, threshold), + describe_range(right_items, upper_bound, threshold) + ] + end + + defp describe_range(items, upper_bound, threshold) do + if length(items) <= threshold do + %{upper_bound: upper_bound, mode: :id_list, payload: Enum.map(items, & &1.id)} + else + %{upper_bound: upper_bound, mode: :fingerprint, payload: Message.fingerprint(items)} + end + end + + defp normalize_items(items) do + items + |> Enum.map(&normalize_item/1) + |> Enum.sort(&(Message.compare_items(&1, &2) != :gt)) + end + + defp normalize_item(%{created_at: created_at, id: id}) + when is_integer(created_at) and created_at >= 0 and is_binary(id) and byte_size(id) == 32 do + %{created_at: created_at, id: id} + end + + defp normalize_item(item) do + raise ArgumentError, "invalid negentropy item: #{inspect(item)}" + end + + defp id_list_threshold(opts) do + case Keyword.get(opts, :id_list_threshold, @default_id_list_threshold) do + threshold when is_integer(threshold) and threshold > 0 -> threshold + _other -> @default_id_list_threshold + end + end +end diff --git a/lib/parrhesia/negentropy/message.ex b/lib/parrhesia/negentropy/message.ex new file mode 100644 index 0000000..5bfafa8 --- /dev/null +++ b/lib/parrhesia/negentropy/message.ex @@ -0,0 +1,349 @@ +defmodule Parrhesia.Negentropy.Message do + @moduledoc """ + NIP-77 negentropy message codec and helpers. + """ + + import Bitwise + + @protocol_version 0x61 + @id_size 32 + @fingerprint_size 16 + @u256_mod 1 <<< 256 + @zero_id <<0::size(256)>> + + @type item :: %{created_at: non_neg_integer(), id: binary()} + @type bound :: :infinity | {non_neg_integer(), binary()} + @type range :: + %{ + upper_bound: bound(), + mode: :skip | :fingerprint | :id_list, + payload: nil | binary() | [binary()] + } + + @spec protocol_version() :: byte() + def protocol_version, do: @protocol_version + + @spec supported_version_message() :: binary() + def supported_version_message, do: <<@protocol_version>> + + @spec decode(binary()) :: {:ok, [range()]} | {:unsupported_version, byte()} | {:error, term()} + def decode(<>) when version != @protocol_version, + do: {:unsupported_version, @protocol_version} + + def decode(<<@protocol_version, rest::binary>>) do + decode_ranges(rest, 0, initial_lower_bound(), []) + end + + def decode(_message), do: {:error, :invalid_message} + + @spec encode([range()]) :: binary() + def encode(ranges) when is_list(ranges) do + ranges + |> drop_trailing_skip_ranges() + |> Enum.reduce({[@protocol_version], 0}, fn range, {acc, previous_timestamp} -> + {encoded_range, next_timestamp} = encode_range(range, previous_timestamp) + {[acc, encoded_range], next_timestamp} + end) + |> elem(0) + |> IO.iodata_to_binary() + end + + @spec fingerprint([item()]) :: binary() + def fingerprint(items) when is_list(items) do + sum = + Enum.reduce(items, 0, fn %{id: id}, acc -> + <> = id + rem(acc + id_integer, @u256_mod) + end) + + payload = [<>, encode_varint(length(items))] + + payload + |> IO.iodata_to_binary() + |> then(&:crypto.hash(:sha256, &1)) + |> binary_part(0, @fingerprint_size) + end + + @spec compare_items(item(), item()) :: :lt | :eq | :gt + def compare_items(left, right) do + cond do + left.created_at < right.created_at -> :lt + left.created_at > right.created_at -> :gt + left.id < right.id -> :lt + left.id > right.id -> :gt + true -> :eq + end + end + + @spec compare_bound(bound(), bound()) :: :lt | :eq | :gt + def compare_bound(:infinity, :infinity), do: :eq + def compare_bound(:infinity, _other), do: :gt + def compare_bound(_other, :infinity), do: :lt + + def compare_bound({left_timestamp, left_id}, {right_timestamp, right_id}) do + cond do + left_timestamp < right_timestamp -> :lt + left_timestamp > right_timestamp -> :gt + left_id < right_id -> :lt + left_id > right_id -> :gt + true -> :eq + end + end + + @spec item_in_range?(item(), bound(), bound()) :: boolean() + def item_in_range?(item, lower_bound, upper_bound) do + compare_item_to_bound(item, lower_bound) != :lt and + compare_item_to_bound(item, upper_bound) == :lt + end + + @spec initial_lower_bound() :: bound() + def initial_lower_bound, do: {0, @zero_id} + + @spec zero_id() :: binary() + def zero_id, do: @zero_id + + @spec split_bound(item(), item()) :: bound() + def split_bound(previous_item, next_item) + when is_map(previous_item) and is_map(next_item) do + cond do + previous_item.created_at < next_item.created_at -> + {next_item.created_at, @zero_id} + + previous_item.created_at == next_item.created_at -> + prefix_length = shared_prefix_length(previous_item.id, next_item.id) + 1 + <> = next_item.id + {next_item.created_at, prefix <> :binary.copy(<<0>>, @id_size - prefix_length)} + + true -> + raise ArgumentError, "split_bound/2 requires previous_item <= next_item" + end + end + + defp decode_ranges(<<>>, _previous_timestamp, _lower_bound, ranges), + do: {:ok, Enum.reverse(ranges)} + + defp decode_ranges(binary, previous_timestamp, lower_bound, ranges) do + with {:ok, upper_bound, rest, next_timestamp} <- decode_bound(binary, previous_timestamp), + :ok <- validate_upper_bound(lower_bound, upper_bound), + {:ok, mode, payload, tail} <- decode_payload(rest) do + next_ranges = [%{upper_bound: upper_bound, mode: mode, payload: payload} | ranges] + + if upper_bound == :infinity and tail != <<>> do + {:error, :invalid_message} + else + decode_ranges(tail, next_timestamp, upper_bound, next_ranges) + end + end + end + + defp validate_upper_bound(lower_bound, upper_bound) do + if compare_bound(lower_bound, upper_bound) == :lt do + :ok + else + {:error, :invalid_message} + end + end + + defp decode_bound(binary, previous_timestamp) do + with {:ok, encoded_timestamp, rest} <- decode_varint(binary), + {:ok, length, tail} <- decode_varint(rest), + :ok <- validate_bound_prefix_length(length), + {:ok, prefix, remainder} <- decode_prefix(tail, length) do + decode_bound_value(encoded_timestamp, length, prefix, remainder, previous_timestamp) + end + end + + defp decode_payload(binary) do + with {:ok, mode_value, rest} <- decode_varint(binary) do + case mode_value do + 0 -> + {:ok, :skip, nil, rest} + + 1 -> + decode_fingerprint_payload(rest) + + 2 -> + decode_id_list_payload(rest) + + _other -> + {:error, :invalid_message} + end + end + end + + defp decode_varint(binary), do: decode_varint(binary, 0) + + defp decode_varint(<<>>, _acc), do: {:error, :invalid_message} + + defp decode_varint(<>, acc) do + value = acc * 128 + band(byte, 0x7F) + + if band(byte, 0x80) == 0 do + {:ok, value, rest} + else + decode_varint(rest, value) + end + end + + defp encode_range(range, previous_timestamp) do + {encoded_bound, next_timestamp} = encode_bound(range.upper_bound, previous_timestamp) + {mode, payload} = encode_payload(range) + {[encoded_bound, mode, payload], next_timestamp} + end + + defp encode_bound(:infinity, previous_timestamp), + do: {[encode_varint(0), encode_varint(0)], previous_timestamp} + + defp encode_bound({timestamp, id}, previous_timestamp) do + prefix_length = id_prefix_length(id) + <> = id + + { + [encode_varint(timestamp - previous_timestamp + 1), encode_varint(prefix_length), prefix], + timestamp + } + end + + defp encode_payload(%{mode: :skip}) do + {encode_varint(0), <<>>} + end + + defp encode_payload(%{mode: :fingerprint, payload: fingerprint}) + when is_binary(fingerprint) and byte_size(fingerprint) == @fingerprint_size do + {encode_varint(1), fingerprint} + end + + defp encode_payload(%{mode: :id_list, payload: ids}) when is_list(ids) do + encoded_ids = Enum.map(ids, fn id -> validate_id!(id) end) + {encode_varint(2), [encode_varint(length(encoded_ids)), encoded_ids]} + end + + defp encode_varint(value) when is_integer(value) and value >= 0 do + digits = collect_base128_digits(value, []) + last_index = length(digits) - 1 + + digits + |> Enum.with_index() + |> Enum.map(fn {digit, index} -> + if index == last_index do + digit + else + digit + 128 + end + end) + |> :erlang.list_to_binary() + end + + defp collect_base128_digits(value, acc) do + quotient = div(value, 128) + remainder = rem(value, 128) + + if quotient == 0 do + [remainder | acc] + else + collect_base128_digits(quotient, [remainder | acc]) + end + end + + defp unpack_ids(binary), do: unpack_ids(binary, []) + + defp unpack_ids(<<>>, acc), do: Enum.reverse(acc) + + defp unpack_ids(<>, acc), + do: unpack_ids(rest, [id | acc]) + + defp decode_prefix(binary, length) when byte_size(binary) >= length do + <> = binary + {:ok, prefix, rest} + end + + defp decode_prefix(_binary, _length), do: {:error, :invalid_message} + + defp decode_bound_value(0, 0, _prefix, remainder, previous_timestamp), + do: {:ok, :infinity, remainder, previous_timestamp} + + defp decode_bound_value(0, _length, _prefix, _remainder, _previous_timestamp), + do: {:error, :invalid_message} + + defp decode_bound_value(encoded_timestamp, length, prefix, remainder, previous_timestamp) do + timestamp = previous_timestamp + encoded_timestamp - 1 + id = prefix <> :binary.copy(<<0>>, @id_size - length) + {:ok, {timestamp, id}, remainder, timestamp} + end + + defp decode_fingerprint_payload(<>), + do: {:ok, :fingerprint, fingerprint, tail} + + defp decode_fingerprint_payload(_payload), do: {:error, :invalid_message} + + defp decode_id_list_payload(rest) do + with {:ok, count, tail} <- decode_varint(rest), + {:ok, ids, remainder} <- decode_id_list_bytes(tail, count) do + {:ok, :id_list, ids, remainder} + end + end + + defp decode_id_list_bytes(tail, count) do + expected_bytes = count * @id_size + + if byte_size(tail) >= expected_bytes do + <> = tail + {:ok, unpack_ids(ids), remainder} + else + {:error, :invalid_message} + end + end + + defp validate_bound_prefix_length(length) + when is_integer(length) and length >= 0 and length <= @id_size, + do: :ok + + defp validate_bound_prefix_length(_length), do: {:error, :invalid_message} + + defp id_prefix_length(id) do + id + |> validate_id!() + |> :binary.bin_to_list() + |> Enum.reverse() + |> Enum.drop_while(&(&1 == 0)) + |> length() + end + + defp shared_prefix_length(left_id, right_id) do + left_id = validate_id!(left_id) + right_id = validate_id!(right_id) + + left_id + |> :binary.bin_to_list() + |> Enum.zip(:binary.bin_to_list(right_id)) + |> Enum.reduce_while(0, fn + {left_byte, right_byte}, acc when left_byte == right_byte -> {:cont, acc + 1} + _pair, acc -> {:halt, acc} + end) + end + + defp drop_trailing_skip_ranges(ranges) do + ranges + |> Enum.reverse() + |> Enum.drop_while(fn range -> range.mode == :skip end) + |> Enum.reverse() + end + + defp compare_item_to_bound(_item, :infinity), do: :lt + + defp compare_item_to_bound(item, {timestamp, id}) do + cond do + item.created_at < timestamp -> :lt + item.created_at > timestamp -> :gt + item.id < id -> :lt + item.id > id -> :gt + true -> :eq + end + end + + defp validate_id!(id) when is_binary(id) and byte_size(id) == @id_size, do: id + + defp validate_id!(_id) do + raise ArgumentError, "negentropy ids must be 32-byte binaries" + end +end diff --git a/lib/parrhesia/negentropy/sessions.ex b/lib/parrhesia/negentropy/sessions.ex index 080f5b6..6b10f26 100644 --- a/lib/parrhesia/negentropy/sessions.ex +++ b/lib/parrhesia/negentropy/sessions.ex @@ -1,10 +1,13 @@ defmodule Parrhesia.Negentropy.Sessions do @moduledoc """ - In-memory NEG-* session tracking. + In-memory NIP-77 session tracking over bounded local event snapshots. """ use GenServer + alias Parrhesia.Negentropy.Engine + alias Parrhesia.Storage + @type session_key :: {pid(), String.t()} @default_max_payload_bytes 4096 @@ -12,6 +15,8 @@ defmodule Parrhesia.Negentropy.Sessions do @default_max_total_sessions 10_000 @default_max_idle_seconds 60 @default_sweep_interval_seconds 10 + @default_max_items_per_session 50_000 + @default_id_list_threshold 32 @sweep_idle_sessions :sweep_idle_sessions @spec start_link(keyword()) :: GenServer.on_start() @@ -20,16 +25,19 @@ defmodule Parrhesia.Negentropy.Sessions do GenServer.start_link(__MODULE__, opts, name: name) end - @spec open(GenServer.server(), pid(), String.t(), map()) :: {:ok, map()} | {:error, term()} - def open(server \\ __MODULE__, owner_pid, subscription_id, params) - when is_pid(owner_pid) and is_binary(subscription_id) and is_map(params) do - GenServer.call(server, {:open, owner_pid, subscription_id, params}) + @spec open(GenServer.server(), pid(), String.t(), map(), binary(), keyword()) :: + {:ok, binary()} | {:error, term()} + def open(server \\ __MODULE__, owner_pid, subscription_id, filter, message, opts \\ []) + when is_pid(owner_pid) and is_binary(subscription_id) and is_map(filter) and + is_binary(message) and is_list(opts) do + GenServer.call(server, {:open, owner_pid, subscription_id, filter, message, opts}) end - @spec message(GenServer.server(), pid(), String.t(), map()) :: {:ok, map()} | {:error, term()} - def message(server \\ __MODULE__, owner_pid, subscription_id, payload) - when is_pid(owner_pid) and is_binary(subscription_id) and is_map(payload) do - GenServer.call(server, {:message, owner_pid, subscription_id, payload}) + @spec message(GenServer.server(), pid(), String.t(), binary()) :: + {:ok, binary()} | {:error, term()} + def message(server \\ __MODULE__, owner_pid, subscription_id, message) + when is_pid(owner_pid) and is_binary(subscription_id) and is_binary(message) do + GenServer.call(server, {:message, owner_pid, subscription_id, message}) end @spec close(GenServer.server(), pid(), String.t()) :: :ok @@ -63,7 +71,17 @@ defmodule Parrhesia.Negentropy.Sessions do max_total_sessions: normalize_positive_integer(Keyword.get(opts, :max_total_sessions), max_total_sessions()), max_idle_ms: max_idle_ms, - sweep_interval_ms: sweep_interval_ms + sweep_interval_ms: sweep_interval_ms, + max_items_per_session: + normalize_positive_integer( + Keyword.get(opts, :max_items_per_session), + max_items_per_session() + ), + id_list_threshold: + normalize_positive_integer( + Keyword.get(opts, :id_list_threshold), + id_list_threshold() + ) } :ok = schedule_idle_sweep(sweep_interval_ms) @@ -72,16 +90,19 @@ defmodule Parrhesia.Negentropy.Sessions do end @impl true - def handle_call({:open, owner_pid, subscription_id, params}, _from, state) do + def handle_call({:open, owner_pid, subscription_id, filter, message, opts}, _from, state) do key = {owner_pid, subscription_id} - with :ok <- validate_payload_size(params, state.max_payload_bytes), - :ok <- enforce_session_limits(state, owner_pid, key) do + with :ok <- validate_payload_size(filter, message, state.max_payload_bytes), + :ok <- enforce_session_limits(state, owner_pid, key), + {:ok, refs} <- fetch_event_refs(filter, opts, state.max_items_per_session), + {:ok, response} <- + Engine.answer(refs, message, id_list_threshold: state.id_list_threshold) do now_ms = System.monotonic_time(:millisecond) session = %{ - cursor: 0, - params: params, + filter: filter, + refs: refs, opened_at: System.system_time(:second), last_active_at_ms: now_ms } @@ -91,14 +112,14 @@ defmodule Parrhesia.Negentropy.Sessions do |> ensure_monitor(owner_pid) |> put_in([:sessions, key], session) - {:reply, {:ok, %{"status" => "open", "cursor" => 0}}, state} + {:reply, {:ok, response}, state} else {:error, reason} -> {:reply, {:error, reason}, state} end end - def handle_call({:message, owner_pid, subscription_id, payload}, _from, state) do + def handle_call({:message, owner_pid, subscription_id, message}, _from, state) do key = {owner_pid, subscription_id} case Map.get(state.sessions, key) do @@ -106,20 +127,18 @@ defmodule Parrhesia.Negentropy.Sessions do {:reply, {:error, :unknown_session}, state} session -> - case validate_payload_size(payload, state.max_payload_bytes) do - :ok -> - cursor = session.cursor + 1 + with :ok <- validate_payload_size(session.filter, message, state.max_payload_bytes), + {:ok, response} <- + Engine.answer(session.refs, message, id_list_threshold: state.id_list_threshold) do + next_session = %{ + session + | last_active_at_ms: System.monotonic_time(:millisecond) + } - next_session = %{ - session - | cursor: cursor, - last_active_at_ms: System.monotonic_time(:millisecond) - } - - state = put_in(state, [:sessions, key], next_session) - - {:reply, {:ok, %{"status" => "ack", "cursor" => cursor}}, state} + state = put_in(state, [:sessions, key], next_session) + {:reply, {:ok, response}, state} + else {:error, reason} -> {:reply, {:error, reason}, state} end @@ -185,6 +204,21 @@ defmodule Parrhesia.Negentropy.Sessions do def handle_info(_message, state), do: {:noreply, state} + defp fetch_event_refs(filter, opts, max_items_per_session) do + query_opts = + opts + |> Keyword.take([:now, :requester_pubkeys]) + |> Keyword.put(:limit, max_items_per_session + 1) + + with {:ok, refs} <- Storage.events().query_event_refs(%{}, [filter], query_opts) do + if length(refs) > max_items_per_session do + {:error, :query_too_big} + else + {:ok, refs} + end + end + end + defp clear_monitors_without_sessions(state, owner_pids) do Enum.reduce(Map.keys(state.monitors), state, fn owner_pid, acc -> if MapSet.member?(owner_pids, owner_pid) do @@ -203,8 +237,8 @@ defmodule Parrhesia.Negentropy.Sessions do end) end - defp validate_payload_size(payload, max_payload_bytes) do - if :erlang.external_size(payload) <= max_payload_bytes do + defp validate_payload_size(filter, message, max_payload_bytes) do + if :erlang.external_size({filter, message}) <= max_payload_bytes do :ok else {:error, :payload_too_large} @@ -296,6 +330,18 @@ defmodule Parrhesia.Negentropy.Sessions do |> Keyword.get(:negentropy_session_sweep_interval_seconds, @default_sweep_interval_seconds) end + defp max_items_per_session do + :parrhesia + |> Application.get_env(:limits, []) + |> Keyword.get(:max_negentropy_items_per_session, @default_max_items_per_session) + end + + defp id_list_threshold do + :parrhesia + |> Application.get_env(:limits, []) + |> Keyword.get(:negentropy_id_list_threshold, @default_id_list_threshold) + end + defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0, do: value diff --git a/lib/parrhesia/protocol.ex b/lib/parrhesia/protocol.ex index aeb06c3..4e9efdf 100644 --- a/lib/parrhesia/protocol.ex +++ b/lib/parrhesia/protocol.ex @@ -14,8 +14,8 @@ defmodule Parrhesia.Protocol do | {:close, String.t()} | {:auth, event()} | {:count, String.t(), [filter()], map()} - | {:neg_open, String.t(), map()} - | {:neg_msg, String.t(), map()} + | {:neg_open, String.t(), filter(), binary()} + | {:neg_msg, String.t(), binary()} | {:neg_close, String.t()} @type relay_message :: @@ -26,7 +26,8 @@ defmodule Parrhesia.Protocol do | {:event, String.t(), event()} | {:auth, String.t()} | {:count, String.t(), map()} - | {:neg_msg, String.t(), map()} + | {:neg_msg, String.t(), String.t()} + | {:neg_err, String.t(), String.t()} @type decode_error :: :invalid_json @@ -122,21 +123,25 @@ defmodule Parrhesia.Protocol do defp decode_message(["AUTH", _invalid]), do: {:error, :invalid_auth} - defp decode_message(["NEG-OPEN", subscription_id, payload]) - when is_binary(subscription_id) and is_map(payload) do - if valid_subscription_id?(subscription_id) do - {:ok, {:neg_open, subscription_id, payload}} + defp decode_message(["NEG-OPEN", subscription_id, filter, initial_message]) + when is_binary(subscription_id) and is_map(filter) and is_binary(initial_message) do + with true <- valid_subscription_id?(subscription_id), + {:ok, decoded_message} <- decode_negentropy_hex(initial_message) do + {:ok, {:neg_open, subscription_id, filter, decoded_message}} else - {:error, :invalid_subscription_id} + false -> {:error, :invalid_subscription_id} + {:error, _reason} -> {:error, :invalid_negentropy} end end defp decode_message(["NEG-MSG", subscription_id, payload]) - when is_binary(subscription_id) and is_map(payload) do - if valid_subscription_id?(subscription_id) do - {:ok, {:neg_msg, subscription_id, payload}} + when is_binary(subscription_id) and is_binary(payload) do + with true <- valid_subscription_id?(subscription_id), + {:ok, decoded_payload} <- decode_negentropy_hex(payload) do + {:ok, {:neg_msg, subscription_id, decoded_payload}} else - {:error, :invalid_subscription_id} + false -> {:error, :invalid_subscription_id} + {:error, _reason} -> {:error, :invalid_negentropy} end end @@ -215,7 +220,19 @@ defmodule Parrhesia.Protocol do defp relay_frame({:neg_msg, subscription_id, payload}), do: ["NEG-MSG", subscription_id, payload] + defp relay_frame({:neg_err, subscription_id, reason}), + do: ["NEG-ERR", subscription_id, reason] + defp valid_subscription_id?(subscription_id) do subscription_id != "" and String.length(subscription_id) <= 64 end + + defp decode_negentropy_hex(payload) when is_binary(payload) and payload != "" do + case Base.decode16(payload, case: :mixed) do + {:ok, decoded} when decoded != <<>> -> {:ok, decoded} + _other -> {:error, :invalid_negentropy} + end + end + + defp decode_negentropy_hex(_payload), do: {:error, :invalid_negentropy} end diff --git a/lib/parrhesia/storage/adapters/memory/events.ex b/lib/parrhesia/storage/adapters/memory/events.ex index 5e9870e..6cf3ec6 100644 --- a/lib/parrhesia/storage/adapters/memory/events.ex +++ b/lib/parrhesia/storage/adapters/memory/events.ex @@ -55,6 +55,24 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do end end + @impl true + def query_event_refs(context, filters, opts) do + with {:ok, events} <- query(context, filters, opts) do + refs = + events + |> Enum.map(fn event -> + %{ + created_at: Map.fetch!(event, "created_at"), + id: Base.decode16!(Map.fetch!(event, "id"), case: :mixed) + } + end) + |> Enum.sort(&(compare_event_refs(&1, &2) != :gt)) + |> maybe_limit_event_refs(opts) + + {:ok, refs} + end + end + @impl true def count(context, filters, opts) do with {:ok, events} <- query(context, filters, opts) do @@ -189,4 +207,21 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do _tag -> false end) end + + defp compare_event_refs(left, right) do + cond do + left.created_at < right.created_at -> :lt + left.created_at > right.created_at -> :gt + left.id < right.id -> :lt + left.id > right.id -> :gt + true -> :eq + end + end + + defp maybe_limit_event_refs(refs, opts) do + case Keyword.get(opts, :limit) do + limit when is_integer(limit) and limit > 0 -> Enum.take(refs, limit) + _other -> refs + end + end end diff --git a/lib/parrhesia/storage/adapters/postgres/events.ex b/lib/parrhesia/storage/adapters/postgres/events.ex index 3b4622b..ce4a158 100644 --- a/lib/parrhesia/storage/adapters/postgres/events.ex +++ b/lib/parrhesia/storage/adapters/postgres/events.ex @@ -94,6 +94,31 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do def query(_context, _filters, _opts), do: {:error, :invalid_opts} + @impl true + def query_event_refs(_context, filters, opts) when is_list(opts) do + with :ok <- Filter.validate_filters(filters) do + now = Keyword.get(opts, :now, System.system_time(:second)) + + refs = + filters + |> event_ref_union_query_for_filters(now, opts) + |> subquery() + |> then(fn union_query -> + from(ref in union_query, + group_by: [ref.created_at, ref.id], + order_by: [asc: ref.created_at, asc: ref.id], + select: %{created_at: ref.created_at, id: ref.id} + ) + end) + |> maybe_limit_query(Keyword.get(opts, :limit)) + |> Repo.all() + + {:ok, refs} + end + end + + def query_event_refs(_context, _filters, _opts), do: {:error, :invalid_opts} + @impl true def count(_context, filters, opts) when is_list(opts) do with :ok <- Filter.validate_filters(filters) do @@ -665,6 +690,40 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do end) end + defp event_ref_query_for_filter(filter, now, opts) do + from(event in "events", + where: is_nil(event.deleted_at) and (is_nil(event.expires_at) or event.expires_at > ^now), + order_by: [asc: event.created_at, asc: event.id], + select: %{ + created_at: event.created_at, + id: event.id + } + ) + |> maybe_filter_ids(Map.get(filter, "ids")) + |> maybe_filter_authors(Map.get(filter, "authors")) + |> maybe_filter_kinds(Map.get(filter, "kinds")) + |> maybe_filter_since(Map.get(filter, "since")) + |> maybe_filter_until(Map.get(filter, "until")) + |> maybe_filter_search(Map.get(filter, "search")) + |> filter_by_tags(filter) + |> maybe_restrict_giftwrap_access(filter, opts) + |> maybe_limit_query(effective_filter_limit(filter, opts)) + end + + defp event_ref_union_query_for_filters([], now, _opts) do + from(event in "events", + where: event.created_at > ^now and event.created_at < ^now, + select: %{created_at: event.created_at, id: event.id} + ) + end + + defp event_ref_union_query_for_filters([first_filter | rest_filters], now, opts) do + Enum.reduce(rest_filters, event_ref_query_for_filter(first_filter, now, opts), fn filter, + acc -> + union_all(acc, ^event_ref_query_for_filter(filter, now, opts)) + end) + end + defp maybe_filter_ids(query, nil), do: query defp maybe_filter_ids(query, ids) do diff --git a/lib/parrhesia/storage/events.ex b/lib/parrhesia/storage/events.ex index be684c2..0b1e3bc 100644 --- a/lib/parrhesia/storage/events.ex +++ b/lib/parrhesia/storage/events.ex @@ -7,6 +7,7 @@ defmodule Parrhesia.Storage.Events do @type event_id :: binary() @type event :: map() @type filter :: map() + @type event_ref :: %{created_at: non_neg_integer(), id: binary()} @type query_opts :: keyword() @type count_result :: non_neg_integer() | %{optional(atom()) => term()} @type reason :: term() @@ -14,6 +15,8 @@ defmodule Parrhesia.Storage.Events do @callback put_event(context(), event()) :: {:ok, event()} | {:error, reason()} @callback get_event(context(), event_id()) :: {:ok, event() | nil} | {:error, reason()} @callback query(context(), [filter()], query_opts()) :: {:ok, [event()]} | {:error, reason()} + @callback query_event_refs(context(), [filter()], query_opts()) :: + {:ok, [event_ref()]} | {:error, reason()} @callback count(context(), [filter()], query_opts()) :: {:ok, count_result()} | {:error, reason()} @callback delete_by_request(context(), event()) :: {:ok, non_neg_integer()} | {:error, reason()} diff --git a/lib/parrhesia/test_support/expiration_stub_events.ex b/lib/parrhesia/test_support/expiration_stub_events.ex index b455635..68530cd 100644 --- a/lib/parrhesia/test_support/expiration_stub_events.ex +++ b/lib/parrhesia/test_support/expiration_stub_events.ex @@ -12,6 +12,9 @@ defmodule Parrhesia.TestSupport.ExpirationStubEvents do @impl true def query(_context, _filters, _opts), do: {:ok, []} + @impl true + def query_event_refs(_context, _filters, _opts), do: {:ok, []} + @impl true def count(_context, _filters, _opts), do: {:ok, 0} diff --git a/lib/parrhesia/test_support/failing_events.ex b/lib/parrhesia/test_support/failing_events.ex index 811b213..9a7a1a9 100644 --- a/lib/parrhesia/test_support/failing_events.ex +++ b/lib/parrhesia/test_support/failing_events.ex @@ -12,6 +12,9 @@ defmodule Parrhesia.TestSupport.FailingEvents do @impl true def query(_context, _filters, _opts), do: {:error, :db_down} + @impl true + def query_event_refs(_context, _filters, _opts), do: {:error, :db_down} + @impl true def count(_context, _filters, _opts), do: {:error, :db_down} diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index 6fbdf40..e187c03 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -160,11 +160,11 @@ defmodule Parrhesia.Web.Connection do defp handle_decoded_message({:auth, auth_event}, state), do: handle_auth(state, auth_event) - defp handle_decoded_message({:neg_open, subscription_id, payload}, state), - do: handle_neg_open(state, subscription_id, payload) + defp handle_decoded_message({:neg_open, subscription_id, filter, message}, state), + do: handle_neg_open(state, subscription_id, filter, message) - defp handle_decoded_message({:neg_msg, subscription_id, payload}, state), - do: handle_neg_msg(state, subscription_id, payload) + defp handle_decoded_message({:neg_msg, subscription_id, message}, state), + do: handle_neg_msg(state, subscription_id, message) defp handle_decoded_message({:neg_close, subscription_id}, state), do: handle_neg_close(state, subscription_id) @@ -447,34 +447,41 @@ defmodule Parrhesia.Web.Connection do end end - defp handle_neg_open(%__MODULE__{} = state, subscription_id, payload) do - case maybe_open_negentropy(state, subscription_id, payload) do - {:ok, message} -> - response = Protocol.encode_relay({:neg_msg, subscription_id, message}) - {:push, {:text, response}, state} + defp handle_neg_open(%__MODULE__{} = state, subscription_id, filter, message) do + with :ok <- Filter.validate_filters([filter]), + :ok <- EventPolicy.authorize_read([filter], state.authenticated_pubkeys), + {:ok, response_message} <- + maybe_open_negentropy(state, subscription_id, filter, message) do + response = + response_message + |> Base.encode16(case: :lower) + |> then(&Protocol.encode_relay({:neg_msg, subscription_id, &1})) + {:push, {:text, response}, state} + else {:error, reason} -> - response = Protocol.encode_relay({:closed, subscription_id, "error: #{inspect(reason)}"}) - {:push, {:text, response}, state} + negentropy_error_response(state, subscription_id, reason) end end - defp handle_neg_msg(%__MODULE__{} = state, subscription_id, payload) do - case maybe_negentropy_message(state, subscription_id, payload) do - {:ok, message} -> - response = Protocol.encode_relay({:neg_msg, subscription_id, message}) + defp handle_neg_msg(%__MODULE__{} = state, subscription_id, message) do + case maybe_negentropy_message(state, subscription_id, message) do + {:ok, response_message} -> + response = + response_message + |> Base.encode16(case: :lower) + |> then(&Protocol.encode_relay({:neg_msg, subscription_id, &1})) + {:push, {:text, response}, state} {:error, reason} -> - response = Protocol.encode_relay({:closed, subscription_id, "error: #{inspect(reason)}"}) - {:push, {:text, response}, state} + negentropy_error_response(state, subscription_id, reason) end end defp handle_neg_close(%__MODULE__{} = state, subscription_id) do :ok = maybe_close_negentropy(state, subscription_id) - response = Protocol.encode_relay({:neg_msg, subscription_id, %{"status" => "closed"}}) - {:push, {:text, response}, state} + {:ok, state} end defp maybe_process_group_event(event) do @@ -664,6 +671,91 @@ defmodule Parrhesia.Web.Connection do with_auth_challenge_frame(state, {:push, {:text, response}, state}) end + defp negentropy_error_response(state, subscription_id, reason) do + response = + Protocol.encode_relay({ + :neg_err, + subscription_id, + negentropy_error_message(reason) + }) + + if reason in [:auth_required, :restricted_giftwrap] do + with_auth_challenge_frame(state, {:push, {:text, response}, state}) + else + {:push, {:text, response}, state} + end + end + + defp negentropy_error_message(reason) + when reason in [ + :invalid_filters, + :empty_filters, + :too_many_filters, + :invalid_filter, + :invalid_filter_key, + :invalid_ids, + :invalid_authors, + :invalid_kinds, + :invalid_since, + :invalid_until, + :invalid_limit, + :invalid_search, + :invalid_tag_filter, + :auth_required, + :restricted_giftwrap, + :marmot_group_h_tag_required, + :marmot_group_h_values_exceeded, + :marmot_group_filter_window_too_wide + ], + do: negentropy_policy_or_filter_error_message(reason) + + defp negentropy_error_message(:negentropy_disabled), + do: "blocked: negentropy is disabled" + + defp negentropy_error_message(:negentropy_unavailable), + do: "blocked: negentropy is unavailable" + + defp negentropy_error_message(:payload_too_large), + do: "blocked: negentropy payload exceeds configured limit" + + defp negentropy_error_message(:session_limit_reached), + do: "blocked: maximum negentropy sessions reached" + + defp negentropy_error_message(:owner_session_limit_reached), + do: "blocked: maximum negentropy sessions per connection reached" + + defp negentropy_error_message(:query_too_big), + do: "blocked: negentropy query is too big" + + defp negentropy_error_message(:unknown_session), + do: "closed: negentropy subscription is not open" + + defp negentropy_error_message(:invalid_message), + do: "invalid: invalid negentropy message" + + defp negentropy_error_message(reason) when is_binary(reason), do: reason + defp negentropy_error_message(reason), do: "error: #{inspect(reason)}" + + defp negentropy_policy_or_filter_error_message(reason) + when reason in [ + :invalid_filters, + :empty_filters, + :too_many_filters, + :invalid_filter, + :invalid_filter_key, + :invalid_ids, + :invalid_authors, + :invalid_kinds, + :invalid_since, + :invalid_until, + :invalid_limit, + :invalid_search, + :invalid_tag_filter + ], + do: Filter.error_message(reason) + + defp negentropy_policy_or_filter_error_message(reason), do: EventPolicy.error_message(reason) + defp validate_auth_event(%__MODULE__{} = state, %{"kind" => 22_242} = auth_event) do tags = Map.get(auth_event, "tags", []) @@ -776,15 +868,31 @@ defmodule Parrhesia.Web.Connection do :exit, _reason -> :ok end - defp maybe_open_negentropy(%__MODULE__{negentropy_sessions: nil}, _subscription_id, _payload), - do: {:error, :negentropy_disabled} + defp maybe_open_negentropy( + %__MODULE__{negentropy_sessions: nil}, + _subscription_id, + _filter, + _message + ), + do: {:error, :negentropy_disabled} defp maybe_open_negentropy( - %__MODULE__{negentropy_sessions: negentropy_sessions}, + %__MODULE__{ + negentropy_sessions: negentropy_sessions, + authenticated_pubkeys: authenticated_pubkeys + }, subscription_id, - payload + filter, + message ) do - Sessions.open(negentropy_sessions, self(), subscription_id, payload) + Sessions.open( + negentropy_sessions, + self(), + subscription_id, + filter, + message, + requester_pubkeys: MapSet.to_list(authenticated_pubkeys) + ) catch :exit, _reason -> {:error, :negentropy_unavailable} end @@ -799,9 +907,9 @@ defmodule Parrhesia.Web.Connection do defp maybe_negentropy_message( %__MODULE__{negentropy_sessions: negentropy_sessions}, subscription_id, - payload + message ) do - Sessions.message(negentropy_sessions, self(), subscription_id, payload) + Sessions.message(negentropy_sessions, self(), subscription_id, message) catch :exit, _reason -> {:error, :negentropy_unavailable} end diff --git a/test/parrhesia/negentropy/engine_test.exs b/test/parrhesia/negentropy/engine_test.exs new file mode 100644 index 0000000..28d72f8 --- /dev/null +++ b/test/parrhesia/negentropy/engine_test.exs @@ -0,0 +1,42 @@ +defmodule Parrhesia.Negentropy.EngineTest do + use ExUnit.Case, async: true + + alias Parrhesia.Negentropy.Engine + alias Parrhesia.Negentropy.Message + + test "returns exact id list for small mismatched ranges" do + server_items = [ + %{created_at: 10, id: <<1::size(256)>>}, + %{created_at: 11, id: <<2::size(256)>>} + ] + + assert {:ok, response} = Engine.answer(server_items, Engine.initial_message([])) + + assert {:ok, [%{mode: :id_list, payload: ids, upper_bound: :infinity}]} = + Message.decode(response) + + assert ids == Enum.map(server_items, & &1.id) + end + + test "splits large mismatched fingerprint ranges" do + client_items = + Enum.map(1..4, fn idx -> + %{created_at: 100 + idx, id: <>} + end) + + server_items = + client_items ++ [%{created_at: 200, id: <<99::size(256)>>}] + + initial_message = Engine.initial_message(client_items, id_list_threshold: 1) + + assert {:ok, response} = Engine.answer(server_items, initial_message, id_list_threshold: 1) + assert {:ok, ranges} = Message.decode(response) + + assert Enum.all?(ranges, &(&1.mode in [:fingerprint, :id_list])) + assert length(ranges) >= 2 + end + + test "downgrades unsupported versions" do + assert {:ok, <<0x61>>} = Engine.answer([], <<0x62>>) + end +end diff --git a/test/parrhesia/negentropy/message_test.exs b/test/parrhesia/negentropy/message_test.exs new file mode 100644 index 0000000..7da5c1c --- /dev/null +++ b/test/parrhesia/negentropy/message_test.exs @@ -0,0 +1,28 @@ +defmodule Parrhesia.Negentropy.MessageTest do + use ExUnit.Case, async: true + + alias Parrhesia.Negentropy.Message + + test "encodes and decodes mixed range messages" do + first_id = <<1::size(256)>> + second_id = <<2::size(256)>> + + boundary = + Message.split_bound(%{created_at: 10, id: first_id}, %{created_at: 10, id: second_id}) + + ranges = [ + %{upper_bound: boundary, mode: :fingerprint, payload: <<0::size(128)>>}, + %{upper_bound: {11, Message.zero_id()}, mode: :id_list, payload: [second_id]}, + %{upper_bound: :infinity, mode: :skip, payload: nil} + ] + + assert {:ok, decoded_ranges} = ranges |> Message.encode() |> Message.decode() + + assert decoded_ranges == + Enum.reject(ranges, &(&1.mode == :skip and &1.upper_bound == :infinity)) + end + + test "rejects malformed bounds and payloads" do + assert {:error, :invalid_message} = Message.decode(<<0x61, 0x00, 0x01, 0x02>>) + end +end diff --git a/test/parrhesia/negentropy/sessions_test.exs b/test/parrhesia/negentropy/sessions_test.exs index a5f3089..f1b8542 100644 --- a/test/parrhesia/negentropy/sessions_test.exs +++ b/test/parrhesia/negentropy/sessions_test.exs @@ -1,19 +1,64 @@ defmodule Parrhesia.Negentropy.SessionsTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.Negentropy.Engine + alias Parrhesia.Negentropy.Message alias Parrhesia.Negentropy.Sessions + alias Parrhesia.Protocol.EventValidator + alias Parrhesia.Repo + alias Parrhesia.Storage.Adapters.Postgres.Events - test "opens, advances and closes sessions" do + setup_all do + if is_nil(Process.whereis(Repo)) do + start_supervised!(Repo) + end + + Sandbox.mode(Repo, :manual) + :ok + end + + setup do + :ok = Sandbox.checkout(Repo) + :ok + end + + test "opens, responds, advances and closes sessions" do server = start_supervised!({Sessions, name: nil}) + Sandbox.allow(Repo, self(), server) - assert {:ok, %{"status" => "open", "cursor" => 0}} = - Sessions.open(server, self(), "sub-neg", %{"cursor" => 0}) + first = + persist_event(%{ + "created_at" => 1_700_100_000, + "content" => "neg-1" + }) - assert {:ok, %{"status" => "ack", "cursor" => 1}} = - Sessions.message(server, self(), "sub-neg", %{"delta" => "abc"}) + second = + persist_event(%{ + "created_at" => 1_700_100_001, + "content" => "neg-2" + }) + + initial_message = Engine.initial_message([]) + + assert {:ok, response_message} = + Sessions.open(server, self(), "sub-neg", %{"kinds" => [1]}, initial_message) + + assert {:ok, [%{mode: :id_list, payload: ids, upper_bound: :infinity}]} = + Message.decode(response_message) + + assert ids == [ + Base.decode16!(first["id"], case: :mixed), + Base.decode16!(second["id"], case: :mixed) + ] + + {:ok, refs} = Events.query_event_refs(%{}, [%{"kinds" => [1]}], []) + matching_message = Engine.initial_message(refs) + + assert {:ok, <<0x61>>} = Sessions.message(server, self(), "sub-neg", matching_message) assert :ok = Sessions.close(server, self(), "sub-neg") - assert {:error, :unknown_session} = Sessions.message(server, self(), "sub-neg", %{}) + assert {:error, :unknown_session} = Sessions.message(server, self(), "sub-neg", <<0x61>>) end test "rejects oversized NEG payloads" do @@ -28,8 +73,16 @@ defmodule Parrhesia.Negentropy.SessionsTest do sweep_interval_seconds: 60} ) + Sandbox.allow(Repo, self(), server) + assert {:error, :payload_too_large} = - Sessions.open(server, self(), "sub-neg", %{"delta" => String.duplicate("a", 256)}) + Sessions.open( + server, + self(), + "sub-neg", + %{"kinds" => [1]}, + String.duplicate(<<0x61>>, 128) + ) end test "enforces per-owner session limits" do @@ -44,10 +97,60 @@ defmodule Parrhesia.Negentropy.SessionsTest do sweep_interval_seconds: 60} ) - assert {:ok, %{"status" => "open", "cursor" => 0}} = - Sessions.open(server, self(), "sub-1", %{}) + Sandbox.allow(Repo, self(), server) + + assert {:ok, _response} = + Sessions.open(server, self(), "sub-1", %{"kinds" => [1]}, Engine.initial_message([])) assert {:error, :owner_session_limit_reached} = - Sessions.open(server, self(), "sub-2", %{}) + Sessions.open(server, self(), "sub-2", %{"kinds" => [1]}, Engine.initial_message([])) + end + + test "blocks queries larger than the configured session snapshot limit" do + server = + start_supervised!( + {Sessions, + name: nil, + max_payload_bytes: 1024, + max_sessions_per_owner: 8, + max_total_sessions: 16, + max_idle_seconds: 60, + sweep_interval_seconds: 60, + max_items_per_session: 1} + ) + + Sandbox.allow(Repo, self(), server) + + persist_event(%{"created_at" => 1_700_200_000, "content" => "first"}) + persist_event(%{"created_at" => 1_700_200_001, "content" => "second"}) + + assert {:error, :query_too_big} = + Sessions.open( + server, + self(), + "sub-neg", + %{"kinds" => [1]}, + Engine.initial_message([]) + ) + end + + defp persist_event(overrides) do + event = build_event(overrides) + assert {:ok, persisted_event} = Events.put_event(%{}, event) + persisted_event + end + + defp build_event(overrides) do + base_event = %{ + "pubkey" => String.duplicate("1", 64), + "created_at" => System.system_time(:second), + "kind" => 1, + "tags" => [], + "content" => "negentropy-test", + "sig" => String.duplicate("2", 128) + } + + event = Map.merge(base_event, overrides) + Map.put(event, "id", EventValidator.compute_id(event)) end end diff --git a/test/parrhesia/protocol_test.exs b/test/parrhesia/protocol_test.exs index 72adbc6..324a479 100644 --- a/test/parrhesia/protocol_test.exs +++ b/test/parrhesia/protocol_test.exs @@ -41,11 +41,13 @@ defmodule Parrhesia.ProtocolTest do assert {:ok, {:auth, ^auth_event}} = Protocol.decode_client(JSON.encode!(["AUTH", auth_event])) - assert {:ok, {:neg_open, "sub-neg", %{"cursor" => 0}}} = - Protocol.decode_client(JSON.encode!(["NEG-OPEN", "sub-neg", %{"cursor" => 0}])) + assert {:ok, {:neg_open, "sub-neg", %{"kinds" => [1]}, <<0x61>>}} = + Protocol.decode_client( + JSON.encode!(["NEG-OPEN", "sub-neg", %{"kinds" => [1]}, "61"]) + ) - assert {:ok, {:neg_msg, "sub-neg", %{"delta" => "abc"}}} = - Protocol.decode_client(JSON.encode!(["NEG-MSG", "sub-neg", %{"delta" => "abc"}])) + assert {:ok, {:neg_msg, "sub-neg", <<0x61, 0x00>>}} = + Protocol.decode_client(JSON.encode!(["NEG-MSG", "sub-neg", "6100"])) assert {:ok, {:neg_close, "sub-neg"}} = Protocol.decode_client(JSON.encode!(["NEG-CLOSE", "sub-neg"])) @@ -90,6 +92,12 @@ defmodule Parrhesia.ProtocolTest do count_frame = Protocol.encode_relay({:count, "sub-1", %{"count" => 1}}) assert JSON.decode!(count_frame) == ["COUNT", "sub-1", %{"count" => 1}] + + neg_message_frame = Protocol.encode_relay({:neg_msg, "sub-neg", "61"}) + assert JSON.decode!(neg_message_frame) == ["NEG-MSG", "sub-neg", "61"] + + neg_error_frame = Protocol.encode_relay({:neg_err, "sub-neg", "closed: too slow"}) + assert JSON.decode!(neg_error_frame) == ["NEG-ERR", "sub-neg", "closed: too slow"] end defp valid_event do diff --git a/test/parrhesia/storage/adapters/memory/adapter_test.exs b/test/parrhesia/storage/adapters/memory/adapter_test.exs index 753243d..9c1a5df 100644 --- a/test/parrhesia/storage/adapters/memory/adapter_test.exs +++ b/test/parrhesia/storage/adapters/memory/adapter_test.exs @@ -8,12 +8,23 @@ defmodule Parrhesia.Storage.Adapters.Memory.AdapterTest do test "memory adapter supports basic behavior contract operations" do event_id = String.duplicate("a", 64) - event = %{"id" => event_id, "pubkey" => "pk", "kind" => 1, "tags" => [], "content" => "hello"} + + event = %{ + "id" => event_id, + "pubkey" => "pk", + "created_at" => 1_700_000_000, + "kind" => 1, + "tags" => [], + "content" => "hello" + } assert {:ok, _event} = Events.put_event(%{}, event) assert {:ok, [result]} = Events.query(%{}, [%{"ids" => [event_id]}], []) assert result["id"] == event_id + assert {:ok, [%{created_at: 1_700_000_000, id: <<_::size(256)>>}]} = + Events.query_event_refs(%{}, [%{"ids" => [event_id]}], []) + assert :ok = Moderation.ban_pubkey(%{}, "pk") assert {:ok, true} = Moderation.pubkey_banned?(%{}, "pk") diff --git a/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs b/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs index 3f61594..2cb4f6f 100644 --- a/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs +++ b/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs @@ -106,6 +106,37 @@ defmodule Parrhesia.Storage.Adapters.Postgres.EventsQueryCountTest do assert Enum.map(results, & &1["id"]) == [newest["id"], tie_winner_id] end + test "query_event_refs/3 returns sorted lightweight refs for negentropy" do + author = String.duplicate("9", 64) + + later = + persist_event(%{ + "pubkey" => author, + "created_at" => 1_700_000_510, + "kind" => 1, + "content" => "later" + }) + + earlier = + persist_event(%{ + "pubkey" => author, + "created_at" => 1_700_000_500, + "kind" => 1, + "content" => "earlier" + }) + + assert {:ok, refs} = + Events.query_event_refs(%{}, [%{"authors" => [author], "kinds" => [1]}], []) + + assert refs == [ + %{ + created_at: earlier["created_at"], + id: Base.decode16!(earlier["id"], case: :mixed) + }, + %{created_at: later["created_at"], id: Base.decode16!(later["id"], case: :mixed)} + ] + end + test "count/3 ORs filters, deduplicates matches and respects tag filters" do now = 1_700_001_000 target_pubkey = String.duplicate("f", 64) diff --git a/test/parrhesia/storage/behaviour_contracts_test.exs b/test/parrhesia/storage/behaviour_contracts_test.exs index 6b969ae..d48ac93 100644 --- a/test/parrhesia/storage/behaviour_contracts_test.exs +++ b/test/parrhesia/storage/behaviour_contracts_test.exs @@ -3,7 +3,16 @@ defmodule Parrhesia.Storage.BehaviourContractsTest do test "events behavior exposes expected callbacks" do assert callback_names(Parrhesia.Storage.Events) == - [:count, :delete_by_request, :get_event, :purge_expired, :put_event, :query, :vanish] + [ + :count, + :delete_by_request, + :get_event, + :purge_expired, + :put_event, + :query, + :query_event_refs, + :vanish + ] end test "moderation behavior exposes expected callbacks" do diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs index 65024a9..7eff83c 100644 --- a/test/parrhesia/web/connection_test.exs +++ b/test/parrhesia/web/connection_test.exs @@ -2,6 +2,8 @@ defmodule Parrhesia.Web.ConnectionTest do use ExUnit.Case, async: false alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.Negentropy.Engine + alias Parrhesia.Negentropy.Message alias Parrhesia.Protocol.EventValidator alias Parrhesia.Repo alias Parrhesia.Web.Connection @@ -435,23 +437,109 @@ defmodule Parrhesia.Web.ConnectionTest do ] end - test "NEG sessions open and close" do - state = connection_state() + test "NEG sessions open, return reconciliation payloads and close silently" do + negentropy_sessions = + start_supervised!( + {Parrhesia.Negentropy.Sessions, + name: nil, + max_payload_bytes: 1024, + max_sessions_per_owner: 8, + max_total_sessions: 16, + max_idle_seconds: 60, + sweep_interval_seconds: 60} + ) - open_payload = JSON.encode!(["NEG-OPEN", "neg-1", %{"cursor" => 0}]) + Sandbox.allow(Repo, self(), negentropy_sessions) + + state = connection_state(negentropy_sessions: negentropy_sessions) + + first = + valid_event(%{ + "created_at" => 1_700_300_000, + "content" => "neg-a" + }) + + second = + valid_event(%{ + "created_at" => 1_700_300_001, + "content" => "neg-b" + }) + + assert {:push, {:text, _response}, _next_state} = + Connection.handle_in({JSON.encode!(["EVENT", first]), [opcode: :text]}, state) + + assert {:push, {:text, _response}, _next_state} = + Connection.handle_in({JSON.encode!(["EVENT", second]), [opcode: :text]}, state) + + open_payload = + JSON.encode!([ + "NEG-OPEN", + "neg-1", + %{"kinds" => [1]}, + Base.encode16(Engine.initial_message([]), case: :lower) + ]) assert {:push, {:text, open_response}, _next_state} = Connection.handle_in({open_payload, [opcode: :text]}, state) - assert ["NEG-MSG", "neg-1", %{"status" => "open", "cursor" => 0}] = - JSON.decode!(open_response) + assert ["NEG-MSG", "neg-1", response_hex] = JSON.decode!(open_response) + + assert {:ok, [%{mode: :id_list, payload: ids, upper_bound: :infinity}]} = + response_hex |> Base.decode16!(case: :mixed) |> Message.decode() + + assert ids == [ + Base.decode16!(first["id"], case: :mixed), + Base.decode16!(second["id"], case: :mixed) + ] close_payload = JSON.encode!(["NEG-CLOSE", "neg-1"]) - assert {:push, {:text, close_response}, _next_state} = + assert {:ok, _next_state} = Connection.handle_in({close_payload, [opcode: :text]}, state) + end - assert JSON.decode!(close_response) == ["NEG-MSG", "neg-1", %{"status" => "closed"}] + test "NEG sessions return NEG-ERR for oversized snapshots" do + negentropy_sessions = + start_supervised!( + {Parrhesia.Negentropy.Sessions, + name: nil, + max_payload_bytes: 1024, + max_sessions_per_owner: 8, + max_total_sessions: 16, + max_idle_seconds: 60, + sweep_interval_seconds: 60, + max_items_per_session: 1} + ) + + Sandbox.allow(Repo, self(), negentropy_sessions) + + state = connection_state(negentropy_sessions: negentropy_sessions) + + first = valid_event(%{"created_at" => 1_700_301_000, "content" => "neg-big-a"}) + second = valid_event(%{"created_at" => 1_700_301_001, "content" => "neg-big-b"}) + + assert {:push, {:text, _response}, _next_state} = + Connection.handle_in({JSON.encode!(["EVENT", first]), [opcode: :text]}, state) + + assert {:push, {:text, _response}, _next_state} = + Connection.handle_in({JSON.encode!(["EVENT", second]), [opcode: :text]}, state) + + open_payload = + JSON.encode!([ + "NEG-OPEN", + "neg-oversized", + %{"kinds" => [1]}, + Base.encode16(Engine.initial_message([]), case: :lower) + ]) + + assert {:push, {:text, response}, _next_state} = + Connection.handle_in({open_payload, [opcode: :text]}, state) + + assert JSON.decode!(response) == [ + "NEG-ERR", + "neg-oversized", + "blocked: negentropy query is too big" + ] end test "CLOSE removes subscription and replies with CLOSED" do