feat: NIP-77 negentropy sync

This commit is contained in:
2026-03-16 16:00:15 +01:00
parent 4c2c93deb3
commit 39dbc069a7
22 changed files with 1194 additions and 101 deletions

View File

@@ -21,6 +21,8 @@ config :parrhesia,
max_negentropy_payload_bytes: 4096,
max_negentropy_sessions_per_connection: 8,
max_negentropy_total_sessions: 10_000,
max_negentropy_items_per_session: 50_000,
negentropy_id_list_threshold: 32,
negentropy_session_idle_timeout_seconds: 60,
negentropy_session_sweep_interval_seconds: 10
],

View File

@@ -210,6 +210,16 @@ if config_env() == :prod do
"PARRHESIA_LIMITS_MAX_NEGENTROPY_TOTAL_SESSIONS",
Keyword.get(limits_defaults, :max_negentropy_total_sessions, 10_000)
),
max_negentropy_items_per_session:
int_env.(
"PARRHESIA_LIMITS_MAX_NEGENTROPY_ITEMS_PER_SESSION",
Keyword.get(limits_defaults, :max_negentropy_items_per_session, 50_000)
),
negentropy_id_list_threshold:
int_env.(
"PARRHESIA_LIMITS_NEGENTROPY_ID_LIST_THRESHOLD",
Keyword.get(limits_defaults, :negentropy_id_list_threshold, 32)
),
negentropy_session_idle_timeout_seconds:
int_env.(
"PARRHESIA_LIMITS_NEGENTROPY_SESSION_IDLE_TIMEOUT_SECONDS",

View File

@@ -131,14 +131,16 @@ This is enough for Tribes and keeps the first version simple.
### NIP-77
NIP-77 is **not required** for the first sync implementation.
Parrhesia now has a real reusable relay-side NIP-77 engine:
Reason:
- proper `NEG-OPEN` / `NEG-MSG` / `NEG-CLOSE` / `NEG-ERR` framing,
- a reusable negentropy codec and reconciliation engine,
- bounded local `(created_at, id)` snapshot enumeration for matching filters,
- connection/session integration with policy checks and resource limits.
- Parrhesia currently only has `NEG-*` session tracking, not real negentropy reconciliation.
- The current Tribes sync profile already assumes catch-up plus live replay, not negentropy.
That means NIP-77 can be used for bandwidth-efficient catch-up between trusted nodes.
NIP-77 should be treated as a later optimization for bandwidth-efficient reconciliation once Parrhesia has a real reusable implementation.
The first sync worker implementation may still default to ordinary NIP-01 catch-up plus live replay, because that path is operationally simpler and already matches the current Tribes sync profile. `:negentropy` can now be introduced as an optimization mode rather than a future prerequisite.
---

View File

@@ -359,8 +359,8 @@ Initial mode should be `:req_stream`:
Future optimization:
- `:negentropy` may be added when real NIP-77 reconciliation exists.
- It is not required for the first implementation.
- `:negentropy` may be added as an optimization mode on top of the simpler `:req_stream` baseline.
- Parrhesia now has a reusable NIP-77 engine, but a sync worker does not need to depend on it for the first implementation.
---
@@ -372,7 +372,7 @@ Future optimization:
- `REQ` -> `Parrhesia.API.Stream.subscribe/4`
- `COUNT` -> `Parrhesia.API.Events.count/2`
- `AUTH` stays connection-specific, but validation helpers may move to `API.Auth`
- `NEG-*` remains transport-specific until Parrhesia has a real reusable NIP-77 engine
- `NEG-*` maps to the reusable NIP-77 engine and remains exposed through the websocket transport boundary
### HTTP management

View File

@@ -0,0 +1,136 @@
defmodule Parrhesia.Negentropy.Engine do
  @moduledoc """
  Relay/client-agnostic negentropy reconciliation engine.

  Works on lists of `%{created_at: timestamp, id: <<_::256>>}` items and
  exchanges binary NIP-77 messages through `Parrhesia.Negentropy.Message`.
  Ranges whose fingerprint or id list matches the remote side are answered
  with `:skip`; mismatching ranges are answered with an explicit id list
  when small enough, otherwise split in half and described again so the
  peer can narrow the difference on the next round.
  """

  alias Parrhesia.Negentropy.Message

  # Ranges holding at most this many items are described with an explicit
  # id list instead of a fingerprint.
  @default_id_list_threshold 32

  @type item :: Message.item()

  @doc """
  Builds the opening message of a sync round: a single range covering all
  local `items` up to `:infinity`.

  Accepts `:id_list_threshold` in `opts`. Invalid items raise
  `ArgumentError` (see `normalize_items/1`).
  """
  @spec initial_message([item()], keyword()) :: binary()
  def initial_message(items, opts \\ []) when is_list(opts) do
    normalized_items = normalize_items(items)

    Message.encode([
      describe_range(normalized_items, :infinity, id_list_threshold(opts))
    ])
  end

  @doc """
  Answers a peer's `incoming_message` from the local `items`.

  Returns `{:ok, response_binary}` — including the one-byte
  supported-version reply when the peer used an unknown protocol
  version — or `{:error, reason}` when the message cannot be decoded.
  """
  @spec answer([item()], binary(), keyword()) :: {:ok, binary()} | {:error, term()}
  def answer(items, incoming_message, opts \\ [])
      when is_binary(incoming_message) and is_list(opts) do
    normalized_items = normalize_items(items)
    threshold = id_list_threshold(opts)

    case Message.decode(incoming_message) do
      {:ok, ranges} ->
        response_ranges =
          respond_to_ranges(normalized_items, ranges, Message.initial_lower_bound(), threshold)

        {:ok, Message.encode(response_ranges)}

      {:unsupported_version, _supported_version} ->
        # Reply with the single version byte we do support.
        {:ok, Message.supported_version_message()}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Walks the peer's ranges in order. Each range's lower bound is the
  # previous range's upper bound, starting from the initial lower bound.
  defp respond_to_ranges(_items, [], _lower_bound, _threshold), do: []

  defp respond_to_ranges(items, [range | rest], lower_bound, threshold) do
    upper_bound = Map.fetch!(range, :upper_bound)

    items_in_range =
      Enum.filter(items, fn item ->
        Message.item_in_range?(item, lower_bound, upper_bound)
      end)

    response =
      case range.mode do
        :skip ->
          # The peer is not asking about this range; acknowledge and move on.
          [%{upper_bound: upper_bound, mode: :skip, payload: nil}]

        :fingerprint ->
          respond_to_fingerprint_range(items_in_range, upper_bound, range.payload, threshold)

        :id_list ->
          respond_to_id_list_range(items_in_range, upper_bound, range.payload, threshold)
      end

    response ++ respond_to_ranges(items, rest, upper_bound, threshold)
  end

  # Matching fingerprints mean both sides hold the same items: skip.
  defp respond_to_fingerprint_range(items, upper_bound, remote_fingerprint, threshold) do
    if Message.fingerprint(items) == remote_fingerprint do
      [%{upper_bound: upper_bound, mode: :skip, payload: nil}]
    else
      mismatch_response(items, upper_bound, threshold)
    end
  end

  # Items are kept sorted, so an exact id-list match is plain list equality.
  defp respond_to_id_list_range(items, upper_bound, remote_ids, threshold) do
    if Enum.map(items, & &1.id) == remote_ids do
      [%{upper_bound: upper_bound, mode: :skip, payload: nil}]
    else
      mismatch_response(items, upper_bound, threshold)
    end
  end

  # Small mismatching ranges are answered with the full id list; larger
  # ones are split so the peer can bisect the difference.
  defp mismatch_response(items, upper_bound, threshold) do
    if length(items) <= threshold do
      [%{upper_bound: upper_bound, mode: :id_list, payload: Enum.map(items, & &1.id)}]
    else
      split_response(items, upper_bound, threshold)
    end
  end

  # Splits the range at its midpoint; the boundary bound sits between the
  # last left item and the first right item (see Message.split_bound/2).
  defp split_response(items, upper_bound, threshold) do
    midpoint = div(length(items), 2)
    left_items = Enum.take(items, midpoint)
    right_items = Enum.drop(items, midpoint)

    boundary =
      left_items
      |> List.last()
      |> then(&Message.split_bound(&1, hd(right_items)))

    [
      describe_range(left_items, boundary, threshold),
      describe_range(right_items, upper_bound, threshold)
    ]
  end

  # Describes a range either by explicit ids (small) or by fingerprint.
  defp describe_range(items, upper_bound, threshold) do
    if length(items) <= threshold do
      %{upper_bound: upper_bound, mode: :id_list, payload: Enum.map(items, & &1.id)}
    else
      %{upper_bound: upper_bound, mode: :fingerprint, payload: Message.fingerprint(items)}
    end
  end

  # Validates every item and sorts ascending by (created_at, id) — the
  # canonical negentropy order all range logic above relies on.
  defp normalize_items(items) do
    items
    |> Enum.map(&normalize_item/1)
    |> Enum.sort(&(Message.compare_items(&1, &2) != :gt))
  end

  defp normalize_item(%{created_at: created_at, id: id})
       when is_integer(created_at) and created_at >= 0 and is_binary(id) and byte_size(id) == 32 do
    %{created_at: created_at, id: id}
  end

  defp normalize_item(item) do
    raise ArgumentError, "invalid negentropy item: #{inspect(item)}"
  end

  # Positive-integer override from opts; anything else falls back to the
  # module default.
  defp id_list_threshold(opts) do
    case Keyword.get(opts, :id_list_threshold, @default_id_list_threshold) do
      threshold when is_integer(threshold) and threshold > 0 -> threshold
      _other -> @default_id_list_threshold
    end
  end
end

View File

@@ -0,0 +1,349 @@
defmodule Parrhesia.Negentropy.Message do
  @moduledoc """
  NIP-77 negentropy message codec and helpers.

  A message is one protocol-version byte (`0x61`) followed by consecutive
  ranges. Each range encodes an upper bound — a varint timestamp delta
  (offset by one, with zero reserved for `:infinity`) plus a
  trailing-zero-trimmed id prefix — and a payload in one of three modes:
  skip (0), fingerprint (1) or id list (2). Varints are big-endian
  base-128 with the high bit marking continuation bytes.
  """

  import Bitwise

  @protocol_version 0x61
  # Event ids are raw 32-byte binaries.
  @id_size 32
  # Fingerprints are SHA-256 digests truncated to 16 bytes.
  @fingerprint_size 16
  # Id sums used by fingerprints wrap modulo 2^256.
  @u256_mod 1 <<< 256
  @zero_id <<0::size(256)>>

  @type item :: %{created_at: non_neg_integer(), id: binary()}
  @type bound :: :infinity | {non_neg_integer(), binary()}
  @type range ::
          %{
            upper_bound: bound(),
            mode: :skip | :fingerprint | :id_list,
            payload: nil | binary() | [binary()]
          }

  @spec protocol_version() :: byte()
  def protocol_version, do: @protocol_version

  @doc """
  One-byte message advertising the protocol version this codec speaks;
  used as the reply to a peer that opened with an unsupported version.
  """
  @spec supported_version_message() :: binary()
  def supported_version_message, do: <<@protocol_version>>

  @doc """
  Decodes a binary message into a list of ranges.

  Returns `{:unsupported_version, version}` when the leading version byte
  is unknown — note the tuple carries the version we *do* support, not
  the peer's byte.
  """
  @spec decode(binary()) :: {:ok, [range()]} | {:unsupported_version, byte()} | {:error, term()}
  def decode(<<version, _rest::binary>>) when version != @protocol_version,
    do: {:unsupported_version, @protocol_version}

  def decode(<<@protocol_version, rest::binary>>) do
    decode_ranges(rest, 0, initial_lower_bound(), [])
  end

  def decode(_message), do: {:error, :invalid_message}

  @doc """
  Encodes ranges into a binary message.

  Trailing `:skip` ranges are dropped before encoding, and bound
  timestamps are delta-encoded against the previous bound.
  """
  @spec encode([range()]) :: binary()
  def encode(ranges) when is_list(ranges) do
    ranges
    |> drop_trailing_skip_ranges()
    |> Enum.reduce({[@protocol_version], 0}, fn range, {acc, previous_timestamp} ->
      {encoded_range, next_timestamp} = encode_range(range, previous_timestamp)
      {[acc, encoded_range], next_timestamp}
    end)
    |> elem(0)
    |> IO.iodata_to_binary()
  end

  @doc """
  Range fingerprint: ids summed as little-endian 256-bit integers modulo
  2^256, serialized little-endian, followed by the varint item count, then
  hashed with SHA-256 and truncated to 16 bytes.
  """
  @spec fingerprint([item()]) :: binary()
  def fingerprint(items) when is_list(items) do
    sum =
      Enum.reduce(items, 0, fn %{id: id}, acc ->
        <<id_integer::unsigned-little-size(256)>> = id
        rem(acc + id_integer, @u256_mod)
      end)

    payload = [<<sum::unsigned-little-size(256)>>, encode_varint(length(items))]

    payload
    |> IO.iodata_to_binary()
    |> then(&:crypto.hash(:sha256, &1))
    |> binary_part(0, @fingerprint_size)
  end

  @doc """
  Orders items by `(created_at, id)`; ids compare bytewise.
  """
  @spec compare_items(item(), item()) :: :lt | :eq | :gt
  def compare_items(left, right) do
    cond do
      left.created_at < right.created_at -> :lt
      left.created_at > right.created_at -> :gt
      left.id < right.id -> :lt
      left.id > right.id -> :gt
      true -> :eq
    end
  end

  @doc """
  Orders bounds; `:infinity` sorts after every concrete bound.
  """
  @spec compare_bound(bound(), bound()) :: :lt | :eq | :gt
  def compare_bound(:infinity, :infinity), do: :eq
  def compare_bound(:infinity, _other), do: :gt
  def compare_bound(_other, :infinity), do: :lt

  def compare_bound({left_timestamp, left_id}, {right_timestamp, right_id}) do
    cond do
      left_timestamp < right_timestamp -> :lt
      left_timestamp > right_timestamp -> :gt
      left_id < right_id -> :lt
      left_id > right_id -> :gt
      true -> :eq
    end
  end

  @doc """
  True when `lower_bound <= item < upper_bound` (lower bound inclusive,
  upper bound exclusive).
  """
  @spec item_in_range?(item(), bound(), bound()) :: boolean()
  def item_in_range?(item, lower_bound, upper_bound) do
    compare_item_to_bound(item, lower_bound) != :lt and
      compare_item_to_bound(item, upper_bound) == :lt
  end

  @spec initial_lower_bound() :: bound()
  def initial_lower_bound, do: {0, @zero_id}

  @spec zero_id() :: binary()
  def zero_id, do: @zero_id

  @doc """
  Smallest cheaply-encodable bound strictly greater than `previous_item`
  and not greater than `next_item`, used when splitting a range between
  two adjacent sorted items.

  When timestamps match, the bound id keeps the shared id prefix plus one
  disambiguating byte of `next_item.id`, zero-padded to 32 bytes.
  NOTE(review): assumes the two items are distinct — equal ids at the same
  timestamp would make the prefix exceed 32 bytes and raise a MatchError.
  """
  @spec split_bound(item(), item()) :: bound()
  def split_bound(previous_item, next_item)
      when is_map(previous_item) and is_map(next_item) do
    cond do
      previous_item.created_at < next_item.created_at ->
        {next_item.created_at, @zero_id}

      previous_item.created_at == next_item.created_at ->
        prefix_length = shared_prefix_length(previous_item.id, next_item.id) + 1
        <<prefix::binary-size(prefix_length), _rest::binary>> = next_item.id
        {next_item.created_at, prefix <> :binary.copy(<<0>>, @id_size - prefix_length)}

      true ->
        raise ArgumentError, "split_bound/2 requires previous_item <= next_item"
    end
  end

  # Recursively decodes ranges, threading the previous timestamp (for
  # delta decoding) and the previous upper bound (bounds must strictly
  # increase).
  defp decode_ranges(<<>>, _previous_timestamp, _lower_bound, ranges),
    do: {:ok, Enum.reverse(ranges)}

  defp decode_ranges(binary, previous_timestamp, lower_bound, ranges) do
    with {:ok, upper_bound, rest, next_timestamp} <- decode_bound(binary, previous_timestamp),
         :ok <- validate_upper_bound(lower_bound, upper_bound),
         {:ok, mode, payload, tail} <- decode_payload(rest) do
      next_ranges = [%{upper_bound: upper_bound, mode: mode, payload: payload} | ranges]

      # An :infinity bound must be the final range of a message.
      if upper_bound == :infinity and tail != <<>> do
        {:error, :invalid_message}
      else
        decode_ranges(tail, next_timestamp, upper_bound, next_ranges)
      end
    end
  end

  defp validate_upper_bound(lower_bound, upper_bound) do
    if compare_bound(lower_bound, upper_bound) == :lt do
      :ok
    else
      {:error, :invalid_message}
    end
  end

  # Bound wire format: varint timestamp delta, varint prefix length, then
  # that many id bytes.
  defp decode_bound(binary, previous_timestamp) do
    with {:ok, encoded_timestamp, rest} <- decode_varint(binary),
         {:ok, length, tail} <- decode_varint(rest),
         :ok <- validate_bound_prefix_length(length),
         {:ok, prefix, remainder} <- decode_prefix(tail, length) do
      decode_bound_value(encoded_timestamp, length, prefix, remainder, previous_timestamp)
    end
  end

  # Payload modes: 0 = skip, 1 = fingerprint, 2 = id list.
  defp decode_payload(binary) do
    with {:ok, mode_value, rest} <- decode_varint(binary) do
      case mode_value do
        0 ->
          {:ok, :skip, nil, rest}

        1 ->
          decode_fingerprint_payload(rest)

        2 ->
          decode_id_list_payload(rest)

        _other ->
          {:error, :invalid_message}
      end
    end
  end

  # Big-endian base-128 varint; a set high bit means more bytes follow.
  defp decode_varint(binary), do: decode_varint(binary, 0)

  defp decode_varint(<<>>, _acc), do: {:error, :invalid_message}

  defp decode_varint(<<byte, rest::binary>>, acc) do
    value = acc * 128 + band(byte, 0x7F)

    if band(byte, 0x80) == 0 do
      {:ok, value, rest}
    else
      decode_varint(rest, value)
    end
  end

  defp encode_range(range, previous_timestamp) do
    {encoded_bound, next_timestamp} = encode_bound(range.upper_bound, previous_timestamp)
    {mode, payload} = encode_payload(range)
    {[encoded_bound, mode, payload], next_timestamp}
  end

  # :infinity is encoded as delta 0 with an empty prefix and does not
  # advance the timestamp used for delta encoding.
  defp encode_bound(:infinity, previous_timestamp),
    do: {[encode_varint(0), encode_varint(0)], previous_timestamp}

  defp encode_bound({timestamp, id}, previous_timestamp) do
    prefix_length = id_prefix_length(id)
    <<prefix::binary-size(prefix_length), _rest::binary>> = id

    # The delta is offset by one so that zero stays reserved for :infinity.
    {
      [encode_varint(timestamp - previous_timestamp + 1), encode_varint(prefix_length), prefix],
      timestamp
    }
  end

  defp encode_payload(%{mode: :skip}) do
    {encode_varint(0), <<>>}
  end

  defp encode_payload(%{mode: :fingerprint, payload: fingerprint})
       when is_binary(fingerprint) and byte_size(fingerprint) == @fingerprint_size do
    {encode_varint(1), fingerprint}
  end

  defp encode_payload(%{mode: :id_list, payload: ids}) when is_list(ids) do
    encoded_ids = Enum.map(ids, fn id -> validate_id!(id) end)
    {encode_varint(2), [encode_varint(length(encoded_ids)), encoded_ids]}
  end

  # Emits base-128 digits most-significant first, setting the high bit on
  # every byte except the last.
  defp encode_varint(value) when is_integer(value) and value >= 0 do
    digits = collect_base128_digits(value, [])
    last_index = length(digits) - 1

    digits
    |> Enum.with_index()
    |> Enum.map(fn {digit, index} ->
      if index == last_index do
        digit
      else
        digit + 128
      end
    end)
    |> :erlang.list_to_binary()
  end

  defp collect_base128_digits(value, acc) do
    quotient = div(value, 128)
    remainder = rem(value, 128)

    if quotient == 0 do
      [remainder | acc]
    else
      collect_base128_digits(quotient, [remainder | acc])
    end
  end

  # Splits a flat binary into consecutive 32-byte ids.
  defp unpack_ids(binary), do: unpack_ids(binary, [])

  defp unpack_ids(<<>>, acc), do: Enum.reverse(acc)

  defp unpack_ids(<<id::binary-size(@id_size), rest::binary>>, acc),
    do: unpack_ids(rest, [id | acc])

  defp decode_prefix(binary, length) when byte_size(binary) >= length do
    <<prefix::binary-size(length), rest::binary>> = binary
    {:ok, prefix, rest}
  end

  defp decode_prefix(_binary, _length), do: {:error, :invalid_message}

  # Delta 0 with an empty prefix is the :infinity bound; delta 0 with a
  # non-empty prefix is malformed.
  defp decode_bound_value(0, 0, _prefix, remainder, previous_timestamp),
    do: {:ok, :infinity, remainder, previous_timestamp}

  defp decode_bound_value(0, _length, _prefix, _remainder, _previous_timestamp),
    do: {:error, :invalid_message}

  defp decode_bound_value(encoded_timestamp, length, prefix, remainder, previous_timestamp) do
    # Undo the +1 delta offset; re-pad the trimmed id prefix with zeros.
    timestamp = previous_timestamp + encoded_timestamp - 1
    id = prefix <> :binary.copy(<<0>>, @id_size - length)
    {:ok, {timestamp, id}, remainder, timestamp}
  end

  defp decode_fingerprint_payload(<<fingerprint::binary-size(@fingerprint_size), tail::binary>>),
    do: {:ok, :fingerprint, fingerprint, tail}

  defp decode_fingerprint_payload(_payload), do: {:error, :invalid_message}

  defp decode_id_list_payload(rest) do
    with {:ok, count, tail} <- decode_varint(rest),
         {:ok, ids, remainder} <- decode_id_list_bytes(tail, count) do
      {:ok, :id_list, ids, remainder}
    end
  end

  # Length-checks before matching so a huge declared count just fails.
  defp decode_id_list_bytes(tail, count) do
    expected_bytes = count * @id_size

    if byte_size(tail) >= expected_bytes do
      <<ids::binary-size(expected_bytes), remainder::binary>> = tail
      {:ok, unpack_ids(ids), remainder}
    else
      {:error, :invalid_message}
    end
  end

  defp validate_bound_prefix_length(length)
       when is_integer(length) and length >= 0 and length <= @id_size,
       do: :ok

  defp validate_bound_prefix_length(_length), do: {:error, :invalid_message}

  # Minimal prefix length after trimming trailing zero bytes from the id;
  # the decoder re-pads with zeros (see decode_bound_value/5).
  defp id_prefix_length(id) do
    id
    |> validate_id!()
    |> :binary.bin_to_list()
    |> Enum.reverse()
    |> Enum.drop_while(&(&1 == 0))
    |> length()
  end

  # Number of leading bytes shared by two ids.
  defp shared_prefix_length(left_id, right_id) do
    left_id = validate_id!(left_id)
    right_id = validate_id!(right_id)

    left_id
    |> :binary.bin_to_list()
    |> Enum.zip(:binary.bin_to_list(right_id))
    |> Enum.reduce_while(0, fn
      {left_byte, right_byte}, acc when left_byte == right_byte -> {:cont, acc + 1}
      _pair, acc -> {:halt, acc}
    end)
  end

  # Trailing :skip ranges carry no information, so they are stripped and
  # the tail of the keyspace is simply left unmentioned.
  defp drop_trailing_skip_ranges(ranges) do
    ranges
    |> Enum.reverse()
    |> Enum.drop_while(fn range -> range.mode == :skip end)
    |> Enum.reverse()
  end

  defp compare_item_to_bound(_item, :infinity), do: :lt

  defp compare_item_to_bound(item, {timestamp, id}) do
    cond do
      item.created_at < timestamp -> :lt
      item.created_at > timestamp -> :gt
      item.id < id -> :lt
      item.id > id -> :gt
      true -> :eq
    end
  end

  defp validate_id!(id) when is_binary(id) and byte_size(id) == @id_size, do: id

  defp validate_id!(_id) do
    raise ArgumentError, "negentropy ids must be 32-byte binaries"
  end
end

View File

@@ -1,10 +1,13 @@
defmodule Parrhesia.Negentropy.Sessions do
@moduledoc """
In-memory NEG-* session tracking.
In-memory NIP-77 session tracking over bounded local event snapshots.
"""
use GenServer
alias Parrhesia.Negentropy.Engine
alias Parrhesia.Storage
@type session_key :: {pid(), String.t()}
@default_max_payload_bytes 4096
@@ -12,6 +15,8 @@ defmodule Parrhesia.Negentropy.Sessions do
@default_max_total_sessions 10_000
@default_max_idle_seconds 60
@default_sweep_interval_seconds 10
@default_max_items_per_session 50_000
@default_id_list_threshold 32
@sweep_idle_sessions :sweep_idle_sessions
@spec start_link(keyword()) :: GenServer.on_start()
@@ -20,16 +25,19 @@ defmodule Parrhesia.Negentropy.Sessions do
GenServer.start_link(__MODULE__, opts, name: name)
end
@spec open(GenServer.server(), pid(), String.t(), map()) :: {:ok, map()} | {:error, term()}
def open(server \\ __MODULE__, owner_pid, subscription_id, params)
when is_pid(owner_pid) and is_binary(subscription_id) and is_map(params) do
GenServer.call(server, {:open, owner_pid, subscription_id, params})
@spec open(GenServer.server(), pid(), String.t(), map(), binary(), keyword()) ::
{:ok, binary()} | {:error, term()}
def open(server \\ __MODULE__, owner_pid, subscription_id, filter, message, opts \\ [])
when is_pid(owner_pid) and is_binary(subscription_id) and is_map(filter) and
is_binary(message) and is_list(opts) do
GenServer.call(server, {:open, owner_pid, subscription_id, filter, message, opts})
end
@spec message(GenServer.server(), pid(), String.t(), map()) :: {:ok, map()} | {:error, term()}
def message(server \\ __MODULE__, owner_pid, subscription_id, payload)
when is_pid(owner_pid) and is_binary(subscription_id) and is_map(payload) do
GenServer.call(server, {:message, owner_pid, subscription_id, payload})
@spec message(GenServer.server(), pid(), String.t(), binary()) ::
{:ok, binary()} | {:error, term()}
def message(server \\ __MODULE__, owner_pid, subscription_id, message)
when is_pid(owner_pid) and is_binary(subscription_id) and is_binary(message) do
GenServer.call(server, {:message, owner_pid, subscription_id, message})
end
@spec close(GenServer.server(), pid(), String.t()) :: :ok
@@ -63,7 +71,17 @@ defmodule Parrhesia.Negentropy.Sessions do
max_total_sessions:
normalize_positive_integer(Keyword.get(opts, :max_total_sessions), max_total_sessions()),
max_idle_ms: max_idle_ms,
sweep_interval_ms: sweep_interval_ms
sweep_interval_ms: sweep_interval_ms,
max_items_per_session:
normalize_positive_integer(
Keyword.get(opts, :max_items_per_session),
max_items_per_session()
),
id_list_threshold:
normalize_positive_integer(
Keyword.get(opts, :id_list_threshold),
id_list_threshold()
)
}
:ok = schedule_idle_sweep(sweep_interval_ms)
@@ -72,16 +90,19 @@ defmodule Parrhesia.Negentropy.Sessions do
end
@impl true
def handle_call({:open, owner_pid, subscription_id, params}, _from, state) do
def handle_call({:open, owner_pid, subscription_id, filter, message, opts}, _from, state) do
key = {owner_pid, subscription_id}
with :ok <- validate_payload_size(params, state.max_payload_bytes),
:ok <- enforce_session_limits(state, owner_pid, key) do
with :ok <- validate_payload_size(filter, message, state.max_payload_bytes),
:ok <- enforce_session_limits(state, owner_pid, key),
{:ok, refs} <- fetch_event_refs(filter, opts, state.max_items_per_session),
{:ok, response} <-
Engine.answer(refs, message, id_list_threshold: state.id_list_threshold) do
now_ms = System.monotonic_time(:millisecond)
session = %{
cursor: 0,
params: params,
filter: filter,
refs: refs,
opened_at: System.system_time(:second),
last_active_at_ms: now_ms
}
@@ -91,14 +112,14 @@ defmodule Parrhesia.Negentropy.Sessions do
|> ensure_monitor(owner_pid)
|> put_in([:sessions, key], session)
{:reply, {:ok, %{"status" => "open", "cursor" => 0}}, state}
{:reply, {:ok, response}, state}
else
{:error, reason} ->
{:reply, {:error, reason}, state}
end
end
def handle_call({:message, owner_pid, subscription_id, payload}, _from, state) do
def handle_call({:message, owner_pid, subscription_id, message}, _from, state) do
key = {owner_pid, subscription_id}
case Map.get(state.sessions, key) do
@@ -106,20 +127,18 @@ defmodule Parrhesia.Negentropy.Sessions do
{:reply, {:error, :unknown_session}, state}
session ->
case validate_payload_size(payload, state.max_payload_bytes) do
:ok ->
cursor = session.cursor + 1
with :ok <- validate_payload_size(session.filter, message, state.max_payload_bytes),
{:ok, response} <-
Engine.answer(session.refs, message, id_list_threshold: state.id_list_threshold) do
next_session = %{
session
| last_active_at_ms: System.monotonic_time(:millisecond)
}
next_session = %{
session
| cursor: cursor,
last_active_at_ms: System.monotonic_time(:millisecond)
}
state = put_in(state, [:sessions, key], next_session)
{:reply, {:ok, %{"status" => "ack", "cursor" => cursor}}, state}
state = put_in(state, [:sessions, key], next_session)
{:reply, {:ok, response}, state}
else
{:error, reason} ->
{:reply, {:error, reason}, state}
end
@@ -185,6 +204,21 @@ defmodule Parrhesia.Negentropy.Sessions do
def handle_info(_message, state), do: {:noreply, state}
defp fetch_event_refs(filter, opts, max_items_per_session) do
query_opts =
opts
|> Keyword.take([:now, :requester_pubkeys])
|> Keyword.put(:limit, max_items_per_session + 1)
with {:ok, refs} <- Storage.events().query_event_refs(%{}, [filter], query_opts) do
if length(refs) > max_items_per_session do
{:error, :query_too_big}
else
{:ok, refs}
end
end
end
defp clear_monitors_without_sessions(state, owner_pids) do
Enum.reduce(Map.keys(state.monitors), state, fn owner_pid, acc ->
if MapSet.member?(owner_pids, owner_pid) do
@@ -203,8 +237,8 @@ defmodule Parrhesia.Negentropy.Sessions do
end)
end
defp validate_payload_size(payload, max_payload_bytes) do
if :erlang.external_size(payload) <= max_payload_bytes do
defp validate_payload_size(filter, message, max_payload_bytes) do
if :erlang.external_size({filter, message}) <= max_payload_bytes do
:ok
else
{:error, :payload_too_large}
@@ -296,6 +330,18 @@ defmodule Parrhesia.Negentropy.Sessions do
|> Keyword.get(:negentropy_session_sweep_interval_seconds, @default_sweep_interval_seconds)
end
defp max_items_per_session do
:parrhesia
|> Application.get_env(:limits, [])
|> Keyword.get(:max_negentropy_items_per_session, @default_max_items_per_session)
end
defp id_list_threshold do
:parrhesia
|> Application.get_env(:limits, [])
|> Keyword.get(:negentropy_id_list_threshold, @default_id_list_threshold)
end
defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0,
do: value

View File

@@ -14,8 +14,8 @@ defmodule Parrhesia.Protocol do
| {:close, String.t()}
| {:auth, event()}
| {:count, String.t(), [filter()], map()}
| {:neg_open, String.t(), map()}
| {:neg_msg, String.t(), map()}
| {:neg_open, String.t(), filter(), binary()}
| {:neg_msg, String.t(), binary()}
| {:neg_close, String.t()}
@type relay_message ::
@@ -26,7 +26,8 @@ defmodule Parrhesia.Protocol do
| {:event, String.t(), event()}
| {:auth, String.t()}
| {:count, String.t(), map()}
| {:neg_msg, String.t(), map()}
| {:neg_msg, String.t(), String.t()}
| {:neg_err, String.t(), String.t()}
@type decode_error ::
:invalid_json
@@ -122,21 +123,25 @@ defmodule Parrhesia.Protocol do
defp decode_message(["AUTH", _invalid]), do: {:error, :invalid_auth}
defp decode_message(["NEG-OPEN", subscription_id, payload])
when is_binary(subscription_id) and is_map(payload) do
if valid_subscription_id?(subscription_id) do
{:ok, {:neg_open, subscription_id, payload}}
defp decode_message(["NEG-OPEN", subscription_id, filter, initial_message])
when is_binary(subscription_id) and is_map(filter) and is_binary(initial_message) do
with true <- valid_subscription_id?(subscription_id),
{:ok, decoded_message} <- decode_negentropy_hex(initial_message) do
{:ok, {:neg_open, subscription_id, filter, decoded_message}}
else
{:error, :invalid_subscription_id}
false -> {:error, :invalid_subscription_id}
{:error, _reason} -> {:error, :invalid_negentropy}
end
end
defp decode_message(["NEG-MSG", subscription_id, payload])
when is_binary(subscription_id) and is_map(payload) do
if valid_subscription_id?(subscription_id) do
{:ok, {:neg_msg, subscription_id, payload}}
when is_binary(subscription_id) and is_binary(payload) do
with true <- valid_subscription_id?(subscription_id),
{:ok, decoded_payload} <- decode_negentropy_hex(payload) do
{:ok, {:neg_msg, subscription_id, decoded_payload}}
else
{:error, :invalid_subscription_id}
false -> {:error, :invalid_subscription_id}
{:error, _reason} -> {:error, :invalid_negentropy}
end
end
@@ -215,7 +220,19 @@ defmodule Parrhesia.Protocol do
defp relay_frame({:neg_msg, subscription_id, payload}),
do: ["NEG-MSG", subscription_id, payload]
defp relay_frame({:neg_err, subscription_id, reason}),
do: ["NEG-ERR", subscription_id, reason]
# A subscription id is valid when it is non-empty and at most 64
# characters long (grapheme count).
defp valid_subscription_id?(subscription_id) do
  String.length(subscription_id) in 1..64
end
# Decodes a hex-encoded negentropy payload (either case accepted).
# Rejects empty input, non-hex input, and input that decodes to an empty
# binary, all with the same :invalid_negentropy error.
defp decode_negentropy_hex(payload) when is_binary(payload) and payload != "" do
  with {:ok, decoded} when decoded != <<>> <- Base.decode16(payload, case: :mixed) do
    {:ok, decoded}
  else
    _other -> {:error, :invalid_negentropy}
  end
end

defp decode_negentropy_hex(_payload), do: {:error, :invalid_negentropy}
end

View File

@@ -55,6 +55,24 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do
end
end
@impl true
def query_event_refs(context, filters, opts) do
  # Reuses the full event query, then projects each event down to the
  # minimal (created_at, id) reference shape used by negentropy sync.
  with {:ok, events} <- query(context, filters, opts) do
    refs =
      events
      |> Enum.map(fn event ->
        %{
          created_at: Map.fetch!(event, "created_at"),
          # Stored ids are hex strings; negentropy works on raw 32-byte ids.
          id: Base.decode16!(Map.fetch!(event, "id"), case: :mixed)
        }
      end)
      # Canonical negentropy order: ascending (created_at, id).
      |> Enum.sort(&(compare_event_refs(&1, &2) != :gt))
      |> maybe_limit_event_refs(opts)
    {:ok, refs}
  end
end
@impl true
def count(context, filters, opts) do
with {:ok, events} <- query(context, filters, opts) do
@@ -189,4 +207,21 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do
_tag -> false
end)
end
# Orders event refs ascending by (created_at, id); returns :lt | :eq | :gt.
# Tuple comparison is element-wise, so this matches comparing created_at
# first and the binary id second.
defp compare_event_refs(left, right) do
  left_key = {left.created_at, left.id}
  right_key = {right.created_at, right.id}

  cond do
    left_key < right_key -> :lt
    left_key > right_key -> :gt
    true -> :eq
  end
end
# Applies the optional :limit from opts; anything other than a positive
# integer leaves the list untouched.
defp maybe_limit_event_refs(refs, opts) do
  limit = Keyword.get(opts, :limit)

  if is_integer(limit) and limit > 0 do
    Enum.take(refs, limit)
  else
    refs
  end
end
end

View File

@@ -94,6 +94,31 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
def query(_context, _filters, _opts), do: {:error, :invalid_opts}
@impl true
def query_event_refs(_context, filters, opts) when is_list(opts) do
with :ok <- Filter.validate_filters(filters) do
now = Keyword.get(opts, :now, System.system_time(:second))
refs =
filters
|> event_ref_union_query_for_filters(now, opts)
|> subquery()
|> then(fn union_query ->
from(ref in union_query,
group_by: [ref.created_at, ref.id],
order_by: [asc: ref.created_at, asc: ref.id],
select: %{created_at: ref.created_at, id: ref.id}
)
end)
|> maybe_limit_query(Keyword.get(opts, :limit))
|> Repo.all()
{:ok, refs}
end
end
def query_event_refs(_context, _filters, _opts), do: {:error, :invalid_opts}
@impl true
def count(_context, filters, opts) when is_list(opts) do
with :ok <- Filter.validate_filters(filters) do
@@ -665,6 +690,40 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
end)
end
# Builds the per-filter Ecto query that selects only (created_at, id)
# pairs, excluding soft-deleted and already-expired events, and applies
# the same narrowing (ids/authors/kinds/since/until/search/tags),
# gift-wrap access restriction, and per-filter limit as regular queries.
defp event_ref_query_for_filter(filter, now, opts) do
  from(event in "events",
    where: is_nil(event.deleted_at) and (is_nil(event.expires_at) or event.expires_at > ^now),
    order_by: [asc: event.created_at, asc: event.id],
    select: %{
      created_at: event.created_at,
      id: event.id
    }
  )
  |> maybe_filter_ids(Map.get(filter, "ids"))
  |> maybe_filter_authors(Map.get(filter, "authors"))
  |> maybe_filter_kinds(Map.get(filter, "kinds"))
  |> maybe_filter_since(Map.get(filter, "since"))
  |> maybe_filter_until(Map.get(filter, "until"))
  |> maybe_filter_search(Map.get(filter, "search"))
  |> filter_by_tags(filter)
  |> maybe_restrict_giftwrap_access(filter, opts)
  |> maybe_limit_query(effective_filter_limit(filter, opts))
end
# With no filters there is nothing to match: emit a query whose where
# clause (created_at > now AND created_at < now) can never be true, so
# the union keeps the right row shape but returns no rows.
defp event_ref_union_query_for_filters([], now, _opts) do
  from(event in "events",
    where: event.created_at > ^now and event.created_at < ^now,
    select: %{created_at: event.created_at, id: event.id}
  )
end

# Folds the remaining filters into a UNION ALL over the first filter's
# query; duplicates are collapsed later by the caller's GROUP BY.
defp event_ref_union_query_for_filters([first_filter | rest_filters], now, opts) do
  Enum.reduce(rest_filters, event_ref_query_for_filter(first_filter, now, opts), fn filter,
                                                                                    acc ->
    union_all(acc, ^event_ref_query_for_filter(filter, now, opts))
  end)
end
defp maybe_filter_ids(query, nil), do: query
defp maybe_filter_ids(query, ids) do

View File

@@ -7,6 +7,7 @@ defmodule Parrhesia.Storage.Events do
@type event_id :: binary()
@type event :: map()
@type filter :: map()
@type event_ref :: %{created_at: non_neg_integer(), id: binary()}
@type query_opts :: keyword()
@type count_result :: non_neg_integer() | %{optional(atom()) => term()}
@type reason :: term()
@@ -14,6 +15,8 @@ defmodule Parrhesia.Storage.Events do
@callback put_event(context(), event()) :: {:ok, event()} | {:error, reason()}
@callback get_event(context(), event_id()) :: {:ok, event() | nil} | {:error, reason()}
@callback query(context(), [filter()], query_opts()) :: {:ok, [event()]} | {:error, reason()}
@callback query_event_refs(context(), [filter()], query_opts()) ::
{:ok, [event_ref()]} | {:error, reason()}
@callback count(context(), [filter()], query_opts()) ::
{:ok, count_result()} | {:error, reason()}
@callback delete_by_request(context(), event()) :: {:ok, non_neg_integer()} | {:error, reason()}

View File

@@ -12,6 +12,9 @@ defmodule Parrhesia.TestSupport.ExpirationStubEvents do
@impl true
def query(_context, _filters, _opts), do: {:ok, []}
@impl true
def query_event_refs(_context, _filters, _opts), do: {:ok, []}
@impl true
def count(_context, _filters, _opts), do: {:ok, 0}

View File

@@ -12,6 +12,9 @@ defmodule Parrhesia.TestSupport.FailingEvents do
@impl true
def query(_context, _filters, _opts), do: {:error, :db_down}
@impl true
def query_event_refs(_context, _filters, _opts), do: {:error, :db_down}
@impl true
def count(_context, _filters, _opts), do: {:error, :db_down}

View File

@@ -160,11 +160,11 @@ defmodule Parrhesia.Web.Connection do
defp handle_decoded_message({:auth, auth_event}, state), do: handle_auth(state, auth_event)
defp handle_decoded_message({:neg_open, subscription_id, payload}, state),
do: handle_neg_open(state, subscription_id, payload)
defp handle_decoded_message({:neg_open, subscription_id, filter, message}, state),
do: handle_neg_open(state, subscription_id, filter, message)
defp handle_decoded_message({:neg_msg, subscription_id, payload}, state),
do: handle_neg_msg(state, subscription_id, payload)
defp handle_decoded_message({:neg_msg, subscription_id, message}, state),
do: handle_neg_msg(state, subscription_id, message)
defp handle_decoded_message({:neg_close, subscription_id}, state),
do: handle_neg_close(state, subscription_id)
@@ -447,34 +447,41 @@ defmodule Parrhesia.Web.Connection do
end
end
defp handle_neg_open(%__MODULE__{} = state, subscription_id, payload) do
case maybe_open_negentropy(state, subscription_id, payload) do
{:ok, message} ->
response = Protocol.encode_relay({:neg_msg, subscription_id, message})
{:push, {:text, response}, state}
defp handle_neg_open(%__MODULE__{} = state, subscription_id, filter, message) do
with :ok <- Filter.validate_filters([filter]),
:ok <- EventPolicy.authorize_read([filter], state.authenticated_pubkeys),
{:ok, response_message} <-
maybe_open_negentropy(state, subscription_id, filter, message) do
response =
response_message
|> Base.encode16(case: :lower)
|> then(&Protocol.encode_relay({:neg_msg, subscription_id, &1}))
{:push, {:text, response}, state}
else
{:error, reason} ->
response = Protocol.encode_relay({:closed, subscription_id, "error: #{inspect(reason)}"})
{:push, {:text, response}, state}
negentropy_error_response(state, subscription_id, reason)
end
end
defp handle_neg_msg(%__MODULE__{} = state, subscription_id, payload) do
case maybe_negentropy_message(state, subscription_id, payload) do
{:ok, message} ->
response = Protocol.encode_relay({:neg_msg, subscription_id, message})
defp handle_neg_msg(%__MODULE__{} = state, subscription_id, message) do
case maybe_negentropy_message(state, subscription_id, message) do
{:ok, response_message} ->
response =
response_message
|> Base.encode16(case: :lower)
|> then(&Protocol.encode_relay({:neg_msg, subscription_id, &1}))
{:push, {:text, response}, state}
{:error, reason} ->
response = Protocol.encode_relay({:closed, subscription_id, "error: #{inspect(reason)}"})
{:push, {:text, response}, state}
negentropy_error_response(state, subscription_id, reason)
end
end
defp handle_neg_close(%__MODULE__{} = state, subscription_id) do
:ok = maybe_close_negentropy(state, subscription_id)
response = Protocol.encode_relay({:neg_msg, subscription_id, %{"status" => "closed"}})
{:push, {:text, response}, state}
{:ok, state}
end
defp maybe_process_group_event(event) do
@@ -664,6 +671,91 @@ defmodule Parrhesia.Web.Connection do
with_auth_challenge_frame(state, {:push, {:text, response}, state})
end
# Encodes `reason` into a NEG-ERR frame for `subscription_id` and returns the
# websocket push tuple carrying it.
defp negentropy_error_response(state, subscription_id, reason) do
response =
Protocol.encode_relay({
:neg_err,
subscription_id,
negentropy_error_message(reason)
})
# Auth-related rejections may succeed once the client authenticates, so the
# error frame is accompanied by an AUTH challenge frame.
if reason in [:auth_required, :restricted_giftwrap] do
with_auth_challenge_frame(state, {:push, {:text, response}, state})
else
{:push, {:text, response}, state}
end
end
# Maps a rejection reason to the human-readable NEG-ERR message text.
# Filter-validation and read-policy reasons are delegated to
# negentropy_policy_or_filter_error_message/1; negentropy-specific reasons
# get the dedicated messages below. Clause order matters: this guard must be
# matched before the catch-all clauses at the bottom.
defp negentropy_error_message(reason)
when reason in [
:invalid_filters,
:empty_filters,
:too_many_filters,
:invalid_filter,
:invalid_filter_key,
:invalid_ids,
:invalid_authors,
:invalid_kinds,
:invalid_since,
:invalid_until,
:invalid_limit,
:invalid_search,
:invalid_tag_filter,
:auth_required,
:restricted_giftwrap,
:marmot_group_h_tag_required,
:marmot_group_h_values_exceeded,
:marmot_group_filter_window_too_wide
],
do: negentropy_policy_or_filter_error_message(reason)
# Negentropy-specific rejections; message prefixes ("blocked:", "closed:",
# "invalid:") follow the machine-readable prefix style used elsewhere here.
defp negentropy_error_message(:negentropy_disabled),
do: "blocked: negentropy is disabled"
defp negentropy_error_message(:negentropy_unavailable),
do: "blocked: negentropy is unavailable"
defp negentropy_error_message(:payload_too_large),
do: "blocked: negentropy payload exceeds configured limit"
defp negentropy_error_message(:session_limit_reached),
do: "blocked: maximum negentropy sessions reached"
defp negentropy_error_message(:owner_session_limit_reached),
do: "blocked: maximum negentropy sessions per connection reached"
defp negentropy_error_message(:query_too_big),
do: "blocked: negentropy query is too big"
defp negentropy_error_message(:unknown_session),
do: "closed: negentropy subscription is not open"
defp negentropy_error_message(:invalid_message),
do: "invalid: invalid negentropy message"
# A binary reason is assumed to already be a complete message; pass through.
defp negentropy_error_message(reason) when is_binary(reason), do: reason
# Fallback for unexpected terms; inspect/1 keeps the frame printable.
defp negentropy_error_message(reason), do: "error: #{inspect(reason)}"
# Routes filter-shaped reasons to Filter.error_message/1 and every other
# reason (read-policy rejections) to EventPolicy.error_message/1.
defp negentropy_policy_or_filter_error_message(reason)
when reason in [
:invalid_filters,
:empty_filters,
:too_many_filters,
:invalid_filter,
:invalid_filter_key,
:invalid_ids,
:invalid_authors,
:invalid_kinds,
:invalid_since,
:invalid_until,
:invalid_limit,
:invalid_search,
:invalid_tag_filter
],
do: Filter.error_message(reason)
defp negentropy_policy_or_filter_error_message(reason), do: EventPolicy.error_message(reason)
defp validate_auth_event(%__MODULE__{} = state, %{"kind" => 22_242} = auth_event) do
tags = Map.get(auth_event, "tags", [])
@@ -776,15 +868,31 @@ defmodule Parrhesia.Web.Connection do
:exit, _reason -> :ok
end
defp maybe_open_negentropy(%__MODULE__{negentropy_sessions: nil}, _subscription_id, _payload),
do: {:error, :negentropy_disabled}
defp maybe_open_negentropy(
%__MODULE__{negentropy_sessions: nil},
_subscription_id,
_filter,
_message
),
do: {:error, :negentropy_disabled}
defp maybe_open_negentropy(
%__MODULE__{negentropy_sessions: negentropy_sessions},
%__MODULE__{
negentropy_sessions: negentropy_sessions,
authenticated_pubkeys: authenticated_pubkeys
},
subscription_id,
payload
filter,
message
) do
Sessions.open(negentropy_sessions, self(), subscription_id, payload)
Sessions.open(
negentropy_sessions,
self(),
subscription_id,
filter,
message,
requester_pubkeys: MapSet.to_list(authenticated_pubkeys)
)
catch
:exit, _reason -> {:error, :negentropy_unavailable}
end
@@ -799,9 +907,9 @@ defmodule Parrhesia.Web.Connection do
defp maybe_negentropy_message(
%__MODULE__{negentropy_sessions: negentropy_sessions},
subscription_id,
payload
message
) do
Sessions.message(negentropy_sessions, self(), subscription_id, payload)
Sessions.message(negentropy_sessions, self(), subscription_id, message)
catch
:exit, _reason -> {:error, :negentropy_unavailable}
end

View File

@@ -0,0 +1,42 @@
defmodule Parrhesia.Negentropy.EngineTest do
use ExUnit.Case, async: true
alias Parrhesia.Negentropy.Engine
alias Parrhesia.Negentropy.Message
# A small mismatched range is answered with the literal event ids
# (mode :id_list) instead of a fingerprint that would force more round trips.
test "returns exact id list for small mismatched ranges" do
server_items = [
%{created_at: 10, id: <<1::size(256)>>},
%{created_at: 11, id: <<2::size(256)>>}
]
assert {:ok, response} = Engine.answer(server_items, Engine.initial_message([]))
assert {:ok, [%{mode: :id_list, payload: ids, upper_bound: :infinity}]} =
Message.decode(response)
assert ids == Enum.map(server_items, & &1.id)
end
# With id_list_threshold: 1, a mismatched range must be subdivided into
# multiple fingerprint/id-list sub-ranges rather than answered as one range.
test "splits large mismatched fingerprint ranges" do
client_items =
Enum.map(1..4, fn idx ->
%{created_at: 100 + idx, id: <<idx::size(256)>>}
end)
server_items =
client_items ++ [%{created_at: 200, id: <<99::size(256)>>}]
initial_message = Engine.initial_message(client_items, id_list_threshold: 1)
assert {:ok, response} = Engine.answer(server_items, initial_message, id_list_threshold: 1)
assert {:ok, ranges} = Message.decode(response)
assert Enum.all?(ranges, &(&1.mode in [:fingerprint, :id_list]))
assert length(ranges) >= 2
end
# 0x62 is treated as an unsupported leading version byte; the engine replies
# with the bare supported byte <<0x61>> so the peer can downgrade.
test "downgrades unsupported versions" do
assert {:ok, <<0x61>>} = Engine.answer([], <<0x62>>)
end
end

View File

@@ -0,0 +1,28 @@
defmodule Parrhesia.Negentropy.MessageTest do
use ExUnit.Case, async: true
alias Parrhesia.Negentropy.Message
# Round-trips a message mixing fingerprint, id-list and skip ranges. The
# trailing :skip range up to :infinity carries no information, so decode is
# expected to drop it (hence the Enum.reject on the expectation side).
test "encodes and decodes mixed range messages" do
first_id = <<1::size(256)>>
second_id = <<2::size(256)>>
boundary =
Message.split_bound(%{created_at: 10, id: first_id}, %{created_at: 10, id: second_id})
ranges = [
%{upper_bound: boundary, mode: :fingerprint, payload: <<0::size(128)>>},
%{upper_bound: {11, Message.zero_id()}, mode: :id_list, payload: [second_id]},
%{upper_bound: :infinity, mode: :skip, payload: nil}
]
assert {:ok, decoded_ranges} = ranges |> Message.encode() |> Message.decode()
assert decoded_ranges ==
Enum.reject(ranges, &(&1.mode == :skip and &1.upper_bound == :infinity))
end
# Garbage bytes after the version byte must fail closed with :invalid_message.
test "rejects malformed bounds and payloads" do
assert {:error, :invalid_message} = Message.decode(<<0x61, 0x00, 0x01, 0x02>>)
end
end

View File

@@ -1,19 +1,64 @@
defmodule Parrhesia.Negentropy.SessionsTest do
use ExUnit.Case, async: true
use ExUnit.Case, async: false
alias Ecto.Adapters.SQL.Sandbox
alias Parrhesia.Negentropy.Engine
alias Parrhesia.Negentropy.Message
alias Parrhesia.Negentropy.Sessions
alias Parrhesia.Protocol.EventValidator
alias Parrhesia.Repo
alias Parrhesia.Storage.Adapters.Postgres.Events
test "opens, advances and closes sessions" do
# Ensure the Repo is running (it may not be started in this test env) and put
# the SQL sandbox into manual mode so each test checks out its own connection.
setup_all do
if is_nil(Process.whereis(Repo)) do
start_supervised!(Repo)
end
Sandbox.mode(Repo, :manual)
:ok
end
# Per-test sandbox checkout; session processes are allowed explicitly via
# Sandbox.allow/3 inside each test.
setup do
:ok = Sandbox.checkout(Repo)
:ok
end
test "opens, responds, advances and closes sessions" do
server = start_supervised!({Sessions, name: nil})
Sandbox.allow(Repo, self(), server)
assert {:ok, %{"status" => "open", "cursor" => 0}} =
Sessions.open(server, self(), "sub-neg", %{"cursor" => 0})
first =
persist_event(%{
"created_at" => 1_700_100_000,
"content" => "neg-1"
})
assert {:ok, %{"status" => "ack", "cursor" => 1}} =
Sessions.message(server, self(), "sub-neg", %{"delta" => "abc"})
second =
persist_event(%{
"created_at" => 1_700_100_001,
"content" => "neg-2"
})
initial_message = Engine.initial_message([])
assert {:ok, response_message} =
Sessions.open(server, self(), "sub-neg", %{"kinds" => [1]}, initial_message)
assert {:ok, [%{mode: :id_list, payload: ids, upper_bound: :infinity}]} =
Message.decode(response_message)
assert ids == [
Base.decode16!(first["id"], case: :mixed),
Base.decode16!(second["id"], case: :mixed)
]
{:ok, refs} = Events.query_event_refs(%{}, [%{"kinds" => [1]}], [])
matching_message = Engine.initial_message(refs)
assert {:ok, <<0x61>>} = Sessions.message(server, self(), "sub-neg", matching_message)
assert :ok = Sessions.close(server, self(), "sub-neg")
assert {:error, :unknown_session} = Sessions.message(server, self(), "sub-neg", %{})
assert {:error, :unknown_session} = Sessions.message(server, self(), "sub-neg", <<0x61>>)
end
test "rejects oversized NEG payloads" do
@@ -28,8 +73,16 @@ defmodule Parrhesia.Negentropy.SessionsTest do
sweep_interval_seconds: 60}
)
Sandbox.allow(Repo, self(), server)
assert {:error, :payload_too_large} =
Sessions.open(server, self(), "sub-neg", %{"delta" => String.duplicate("a", 256)})
Sessions.open(
server,
self(),
"sub-neg",
%{"kinds" => [1]},
String.duplicate(<<0x61>>, 128)
)
end
test "enforces per-owner session limits" do
@@ -44,10 +97,60 @@ defmodule Parrhesia.Negentropy.SessionsTest do
sweep_interval_seconds: 60}
)
assert {:ok, %{"status" => "open", "cursor" => 0}} =
Sessions.open(server, self(), "sub-1", %{})
Sandbox.allow(Repo, self(), server)
assert {:ok, _response} =
Sessions.open(server, self(), "sub-1", %{"kinds" => [1]}, Engine.initial_message([]))
assert {:error, :owner_session_limit_reached} =
Sessions.open(server, self(), "sub-2", %{})
Sessions.open(server, self(), "sub-2", %{"kinds" => [1]}, Engine.initial_message([]))
end
# With max_items_per_session: 1 and two stored matching events, opening a
# session must refuse to build the snapshot and report :query_too_big.
test "blocks queries larger than the configured session snapshot limit" do
server =
start_supervised!(
{Sessions,
name: nil,
max_payload_bytes: 1024,
max_sessions_per_owner: 8,
max_total_sessions: 16,
max_idle_seconds: 60,
sweep_interval_seconds: 60,
max_items_per_session: 1}
)
# Grant the Sessions process access to this test's sandboxed DB connection.
Sandbox.allow(Repo, self(), server)
persist_event(%{"created_at" => 1_700_200_000, "content" => "first"})
persist_event(%{"created_at" => 1_700_200_001, "content" => "second"})
assert {:error, :query_too_big} =
Sessions.open(
server,
self(),
"sub-neg",
%{"kinds" => [1]},
Engine.initial_message([])
)
end
# Builds an event from `overrides` and stores it through the Postgres events
# adapter, asserting the write succeeds. Returns the persisted event map.
defp persist_event(overrides) do
assert {:ok, stored_event} = Events.put_event(%{}, build_event(overrides))
stored_event
end
# Constructs a complete kind-1 test event: default fields merged with
# `overrides`, then the canonical event id computed over the merged map.
defp build_event(overrides) do
defaults = %{
"pubkey" => String.duplicate("1", 64),
"created_at" => System.system_time(:second),
"kind" => 1,
"tags" => [],
"content" => "negentropy-test",
"sig" => String.duplicate("2", 128)
}

defaults
|> Map.merge(overrides)
|> then(&Map.put(&1, "id", EventValidator.compute_id(&1)))
end
end

View File

@@ -41,11 +41,13 @@ defmodule Parrhesia.ProtocolTest do
assert {:ok, {:auth, ^auth_event}} =
Protocol.decode_client(JSON.encode!(["AUTH", auth_event]))
assert {:ok, {:neg_open, "sub-neg", %{"cursor" => 0}}} =
Protocol.decode_client(JSON.encode!(["NEG-OPEN", "sub-neg", %{"cursor" => 0}]))
assert {:ok, {:neg_open, "sub-neg", %{"kinds" => [1]}, <<0x61>>}} =
Protocol.decode_client(
JSON.encode!(["NEG-OPEN", "sub-neg", %{"kinds" => [1]}, "61"])
)
assert {:ok, {:neg_msg, "sub-neg", %{"delta" => "abc"}}} =
Protocol.decode_client(JSON.encode!(["NEG-MSG", "sub-neg", %{"delta" => "abc"}]))
assert {:ok, {:neg_msg, "sub-neg", <<0x61, 0x00>>}} =
Protocol.decode_client(JSON.encode!(["NEG-MSG", "sub-neg", "6100"]))
assert {:ok, {:neg_close, "sub-neg"}} =
Protocol.decode_client(JSON.encode!(["NEG-CLOSE", "sub-neg"]))
@@ -90,6 +92,12 @@ defmodule Parrhesia.ProtocolTest do
count_frame = Protocol.encode_relay({:count, "sub-1", %{"count" => 1}})
assert JSON.decode!(count_frame) == ["COUNT", "sub-1", %{"count" => 1}]
neg_message_frame = Protocol.encode_relay({:neg_msg, "sub-neg", "61"})
assert JSON.decode!(neg_message_frame) == ["NEG-MSG", "sub-neg", "61"]
neg_error_frame = Protocol.encode_relay({:neg_err, "sub-neg", "closed: too slow"})
assert JSON.decode!(neg_error_frame) == ["NEG-ERR", "sub-neg", "closed: too slow"]
end
defp valid_event do

View File

@@ -8,12 +8,23 @@ defmodule Parrhesia.Storage.Adapters.Memory.AdapterTest do
test "memory adapter supports basic behavior contract operations" do
event_id = String.duplicate("a", 64)
event = %{"id" => event_id, "pubkey" => "pk", "kind" => 1, "tags" => [], "content" => "hello"}
event = %{
"id" => event_id,
"pubkey" => "pk",
"created_at" => 1_700_000_000,
"kind" => 1,
"tags" => [],
"content" => "hello"
}
assert {:ok, _event} = Events.put_event(%{}, event)
assert {:ok, [result]} = Events.query(%{}, [%{"ids" => [event_id]}], [])
assert result["id"] == event_id
assert {:ok, [%{created_at: 1_700_000_000, id: <<_::size(256)>>}]} =
Events.query_event_refs(%{}, [%{"ids" => [event_id]}], [])
assert :ok = Moderation.ban_pubkey(%{}, "pk")
assert {:ok, true} = Moderation.pubkey_banned?(%{}, "pk")

View File

@@ -106,6 +106,37 @@ defmodule Parrhesia.Storage.Adapters.Postgres.EventsQueryCountTest do
assert Enum.map(results, & &1["id"]) == [newest["id"], tie_winner_id]
end
# Refs come back as %{created_at:, id:} maps with raw binary ids, sorted by
# created_at ascending even though the later event was inserted first.
test "query_event_refs/3 returns sorted lightweight refs for negentropy" do
author = String.duplicate("9", 64)
# Insert out of order to prove sorting is done by the query, not insertion.
later =
persist_event(%{
"pubkey" => author,
"created_at" => 1_700_000_510,
"kind" => 1,
"content" => "later"
})
earlier =
persist_event(%{
"pubkey" => author,
"created_at" => 1_700_000_500,
"kind" => 1,
"content" => "earlier"
})
assert {:ok, refs} =
Events.query_event_refs(%{}, [%{"authors" => [author], "kinds" => [1]}], [])
assert refs == [
%{
created_at: earlier["created_at"],
id: Base.decode16!(earlier["id"], case: :mixed)
},
%{created_at: later["created_at"], id: Base.decode16!(later["id"], case: :mixed)}
]
end
test "count/3 ORs filters, deduplicates matches and respects tag filters" do
now = 1_700_001_000
target_pubkey = String.duplicate("f", 64)

View File

@@ -3,7 +3,16 @@ defmodule Parrhesia.Storage.BehaviourContractsTest do
test "events behavior exposes expected callbacks" do
assert callback_names(Parrhesia.Storage.Events) ==
[:count, :delete_by_request, :get_event, :purge_expired, :put_event, :query, :vanish]
[
:count,
:delete_by_request,
:get_event,
:purge_expired,
:put_event,
:query,
:query_event_refs,
:vanish
]
end
test "moderation behavior exposes expected callbacks" do

View File

@@ -2,6 +2,8 @@ defmodule Parrhesia.Web.ConnectionTest do
use ExUnit.Case, async: false
alias Ecto.Adapters.SQL.Sandbox
alias Parrhesia.Negentropy.Engine
alias Parrhesia.Negentropy.Message
alias Parrhesia.Protocol.EventValidator
alias Parrhesia.Repo
alias Parrhesia.Web.Connection
@@ -435,23 +437,109 @@ defmodule Parrhesia.Web.ConnectionTest do
]
end
test "NEG sessions open and close" do
state = connection_state()
test "NEG sessions open, return reconciliation payloads and close silently" do
negentropy_sessions =
start_supervised!(
{Parrhesia.Negentropy.Sessions,
name: nil,
max_payload_bytes: 1024,
max_sessions_per_owner: 8,
max_total_sessions: 16,
max_idle_seconds: 60,
sweep_interval_seconds: 60}
)
open_payload = JSON.encode!(["NEG-OPEN", "neg-1", %{"cursor" => 0}])
Sandbox.allow(Repo, self(), negentropy_sessions)
state = connection_state(negentropy_sessions: negentropy_sessions)
first =
valid_event(%{
"created_at" => 1_700_300_000,
"content" => "neg-a"
})
second =
valid_event(%{
"created_at" => 1_700_300_001,
"content" => "neg-b"
})
assert {:push, {:text, _response}, _next_state} =
Connection.handle_in({JSON.encode!(["EVENT", first]), [opcode: :text]}, state)
assert {:push, {:text, _response}, _next_state} =
Connection.handle_in({JSON.encode!(["EVENT", second]), [opcode: :text]}, state)
open_payload =
JSON.encode!([
"NEG-OPEN",
"neg-1",
%{"kinds" => [1]},
Base.encode16(Engine.initial_message([]), case: :lower)
])
assert {:push, {:text, open_response}, _next_state} =
Connection.handle_in({open_payload, [opcode: :text]}, state)
assert ["NEG-MSG", "neg-1", %{"status" => "open", "cursor" => 0}] =
JSON.decode!(open_response)
assert ["NEG-MSG", "neg-1", response_hex] = JSON.decode!(open_response)
assert {:ok, [%{mode: :id_list, payload: ids, upper_bound: :infinity}]} =
response_hex |> Base.decode16!(case: :mixed) |> Message.decode()
assert ids == [
Base.decode16!(first["id"], case: :mixed),
Base.decode16!(second["id"], case: :mixed)
]
close_payload = JSON.encode!(["NEG-CLOSE", "neg-1"])
assert {:push, {:text, close_response}, _next_state} =
assert {:ok, _next_state} =
Connection.handle_in({close_payload, [opcode: :text]}, state)
end
assert JSON.decode!(close_response) == ["NEG-MSG", "neg-1", %{"status" => "closed"}]
# End-to-end over the websocket handler: with max_items_per_session: 1 and
# two stored matching events, NEG-OPEN must be answered with a NEG-ERR frame
# carrying the "query is too big" message rather than a NEG-MSG.
test "NEG sessions return NEG-ERR for oversized snapshots" do
negentropy_sessions =
start_supervised!(
{Parrhesia.Negentropy.Sessions,
name: nil,
max_payload_bytes: 1024,
max_sessions_per_owner: 8,
max_total_sessions: 16,
max_idle_seconds: 60,
sweep_interval_seconds: 60,
max_items_per_session: 1}
)
# Grant the session server access to this test's sandboxed DB connection.
Sandbox.allow(Repo, self(), negentropy_sessions)
state = connection_state(negentropy_sessions: negentropy_sessions)
first = valid_event(%{"created_at" => 1_700_301_000, "content" => "neg-big-a"})
second = valid_event(%{"created_at" => 1_700_301_001, "content" => "neg-big-b"})
assert {:push, {:text, _response}, _next_state} =
Connection.handle_in({JSON.encode!(["EVENT", first]), [opcode: :text]}, state)
assert {:push, {:text, _response}, _next_state} =
Connection.handle_in({JSON.encode!(["EVENT", second]), [opcode: :text]}, state)
# NEG-OPEN payloads are hex-encoded negentropy messages on the wire.
open_payload =
JSON.encode!([
"NEG-OPEN",
"neg-oversized",
%{"kinds" => [1]},
Base.encode16(Engine.initial_message([]), case: :lower)
])
assert {:push, {:text, response}, _next_state} =
Connection.handle_in({open_payload, [opcode: :text]}, state)
assert JSON.decode!(response) == [
"NEG-ERR",
"neg-oversized",
"blocked: negentropy query is too big"
]
end
test "CLOSE removes subscription and replies with CLOSED" do