Extract API events and stream layers
Some checks failed
CI / Test (OTP 27.2 / Elixir 1.18.2) (push) Failing after 1s
CI / Test (OTP 28.4 / Elixir 1.19.4 + Marmot E2E) (push) Failing after 1s

This commit is contained in:
2026-03-16 20:21:58 +01:00
parent 5d4d181d00
commit d119d21d99
8 changed files with 879 additions and 276 deletions

373
lib/parrhesia/api/events.ex Normal file
View File

@@ -0,0 +1,373 @@
defmodule Parrhesia.API.Events do
@moduledoc """
Canonical event publish, query, and count API.
"""
alias Parrhesia.API.Events.PublishResult
alias Parrhesia.API.RequestContext
alias Parrhesia.Fanout.MultiNode
alias Parrhesia.Groups.Flow
alias Parrhesia.Policy.EventPolicy
alias Parrhesia.Protocol
alias Parrhesia.Protocol.Filter
alias Parrhesia.Storage
alias Parrhesia.Subscriptions.Index
alias Parrhesia.Telemetry
@default_max_event_bytes 262_144
@marmot_kinds MapSet.new([
443,
444,
445,
1059,
10_050,
10_051,
446,
447,
448,
449
])
@spec publish(map(), keyword()) :: {:ok, PublishResult.t()} | {:error, term()}
def publish(event, opts \\ [])
def publish(event, opts) when is_map(event) and is_list(opts) do
started_at = System.monotonic_time()
event_id = Map.get(event, "id", "")
with {:ok, context} <- fetch_context(opts),
:ok <- validate_event_payload_size(event, max_event_bytes(opts)),
:ok <- Protocol.validate_event(event),
:ok <- EventPolicy.authorize_write(event, context.authenticated_pubkeys, context),
:ok <- maybe_process_group_event(event),
{:ok, _stored, message} <- persist_event(event) do
Telemetry.emit(
[:parrhesia, :ingest, :stop],
%{duration: System.monotonic_time() - started_at},
telemetry_metadata_for_event(event)
)
fanout_event(event)
maybe_publish_multi_node(event)
{:ok,
%PublishResult{
event_id: event_id,
accepted: true,
message: message,
reason: nil
}}
else
{:error, :invalid_context} = error ->
error
{:error, reason} ->
{:ok,
%PublishResult{
event_id: event_id,
accepted: false,
message: error_message_for_publish_failure(reason),
reason: reason
}}
end
end
def publish(_event, _opts), do: {:error, :invalid_event}
@spec query([map()], keyword()) :: {:ok, [map()]} | {:error, term()}
def query(filters, opts \\ [])
def query(filters, opts) when is_list(filters) and is_list(opts) do
started_at = System.monotonic_time()
with {:ok, context} <- fetch_context(opts),
:ok <- maybe_validate_filters(filters, opts),
:ok <- maybe_authorize_read(filters, context, opts),
{:ok, events} <- Storage.events().query(%{}, filters, storage_query_opts(context, opts)) do
Telemetry.emit(
[:parrhesia, :query, :stop],
%{duration: System.monotonic_time() - started_at},
telemetry_metadata_for_filters(filters)
)
{:ok, events}
end
end
def query(_filters, _opts), do: {:error, :invalid_filters}
@spec count([map()], keyword()) :: {:ok, non_neg_integer() | map()} | {:error, term()}
def count(filters, opts \\ [])
def count(filters, opts) when is_list(filters) and is_list(opts) do
started_at = System.monotonic_time()
with {:ok, context} <- fetch_context(opts),
:ok <- maybe_validate_filters(filters, opts),
:ok <- maybe_authorize_read(filters, context, opts),
{:ok, count} <-
Storage.events().count(%{}, filters, requester_pubkeys: requester_pubkeys(context)),
{:ok, result} <- maybe_build_count_result(filters, count, Keyword.get(opts, :options)) do
Telemetry.emit(
[:parrhesia, :query, :stop],
%{duration: System.monotonic_time() - started_at},
telemetry_metadata_for_filters(filters)
)
{:ok, result}
end
end
def count(_filters, _opts), do: {:error, :invalid_filters}
defp maybe_validate_filters(filters, opts) do
if Keyword.get(opts, :validate_filters?, true) do
Filter.validate_filters(filters)
else
:ok
end
end
defp maybe_authorize_read(filters, context, opts) do
if Keyword.get(opts, :authorize_read?, true) do
EventPolicy.authorize_read(filters, context.authenticated_pubkeys, context)
else
:ok
end
end
defp storage_query_opts(context, opts) do
[
max_filter_limit:
Keyword.get(opts, :max_filter_limit, Parrhesia.Config.get([:limits, :max_filter_limit])),
requester_pubkeys: requester_pubkeys(context)
]
end
defp requester_pubkeys(%RequestContext{} = context),
do: MapSet.to_list(context.authenticated_pubkeys)
defp maybe_build_count_result(_filters, count, nil) when is_integer(count), do: {:ok, count}
defp maybe_build_count_result(filters, count, options)
when is_integer(count) and is_map(options) do
build_count_payload(filters, count, options)
end
defp maybe_build_count_result(_filters, count, _options) when is_integer(count),
do: {:ok, count}
defp maybe_build_count_result(_filters, count, _options), do: {:ok, count}
defp build_count_payload(filters, count, options) do
include_hll? =
Map.get(options, "hll", false) and Parrhesia.Config.get([:features, :nip_45_count], true)
payload = %{"count" => count, "approximate" => false}
payload =
if include_hll? do
Map.put(payload, "hll", generate_hll_payload(filters, count))
else
payload
end
{:ok, payload}
end
defp generate_hll_payload(filters, count) do
filters
|> JSON.encode!()
|> then(&"#{&1}:#{count}")
|> then(&:crypto.hash(:sha256, &1))
|> Base.encode64()
end
defp maybe_process_group_event(event) do
if Flow.group_related_kind?(Map.get(event, "kind")) do
Flow.handle_event(event)
else
:ok
end
end
defp persist_event(event) do
kind = Map.get(event, "kind")
cond do
kind in [5, 62] -> persist_control_event(kind, event)
ephemeral_kind?(kind) -> persist_ephemeral_event()
true -> persist_regular_event(event)
end
end
defp persist_control_event(5, event) do
with {:ok, deleted_count} <- Storage.events().delete_by_request(%{}, event) do
{:ok, deleted_count, "ok: deletion request processed"}
end
end
defp persist_control_event(62, event) do
with {:ok, deleted_count} <- Storage.events().vanish(%{}, event) do
{:ok, deleted_count, "ok: vanish request processed"}
end
end
defp persist_ephemeral_event do
if accept_ephemeral_events?() do
{:ok, :ephemeral, "ok: ephemeral event accepted"}
else
{:error, :ephemeral_events_disabled}
end
end
defp persist_regular_event(event) do
case Storage.events().put_event(%{}, event) do
{:ok, persisted_event} -> {:ok, persisted_event, "ok: event stored"}
{:error, :duplicate_event} -> {:error, :duplicate_event}
{:error, reason} -> {:error, reason}
end
end
defp fanout_event(event) do
case Index.candidate_subscription_keys(event) do
candidates when is_list(candidates) ->
Enum.each(candidates, fn {owner_pid, subscription_id} ->
send(owner_pid, {:fanout_event, subscription_id, event})
end)
_other ->
:ok
end
catch
:exit, _reason -> :ok
end
defp maybe_publish_multi_node(event) do
MultiNode.publish(event)
:ok
catch
:exit, _reason -> :ok
end
defp telemetry_metadata_for_event(event) do
%{traffic_class: traffic_class_for_event(event)}
end
defp telemetry_metadata_for_filters(filters) do
%{traffic_class: traffic_class_for_filters(filters)}
end
defp traffic_class_for_filters(filters) do
if Enum.any?(filters, &marmot_filter?/1) do
:marmot
else
:generic
end
end
defp marmot_filter?(filter) when is_map(filter) do
has_marmot_kind? =
case Map.get(filter, "kinds") do
kinds when is_list(kinds) -> Enum.any?(kinds, &MapSet.member?(@marmot_kinds, &1))
_other -> false
end
has_marmot_kind? or Map.has_key?(filter, "#h") or Map.has_key?(filter, "#i")
end
defp marmot_filter?(_filter), do: false
defp traffic_class_for_event(event) when is_map(event) do
if MapSet.member?(@marmot_kinds, Map.get(event, "kind")) do
:marmot
else
:generic
end
end
defp traffic_class_for_event(_event), do: :generic
defp fetch_context(opts) do
case Keyword.get(opts, :context) do
%RequestContext{} = context -> {:ok, context}
_other -> {:error, :invalid_context}
end
end
defp error_message_for_publish_failure(:duplicate_event),
do: "duplicate: event already stored"
defp error_message_for_publish_failure(:event_too_large),
do: "invalid: event exceeds max event size"
defp error_message_for_publish_failure(:ephemeral_events_disabled),
do: "blocked: ephemeral events are disabled"
defp error_message_for_publish_failure(reason)
when reason in [
:auth_required,
:pubkey_not_allowed,
:restricted_giftwrap,
:sync_write_not_allowed,
:protected_event_requires_auth,
:protected_event_pubkey_mismatch,
:pow_below_minimum,
:pubkey_banned,
:event_banned,
:media_metadata_tags_exceeded,
:media_metadata_tag_value_too_large,
:media_metadata_url_too_long,
:media_metadata_invalid_url,
:media_metadata_invalid_hash,
:media_metadata_invalid_mime,
:media_metadata_mime_not_allowed,
:media_metadata_unsupported_version,
:push_notification_relay_tags_exceeded,
:push_notification_payload_too_large,
:push_notification_replay_window_exceeded,
:push_notification_missing_expiration,
:push_notification_expiration_too_far,
:push_notification_server_recipients_exceeded
],
do: EventPolicy.error_message(reason)
defp error_message_for_publish_failure(reason) when is_binary(reason), do: reason
defp error_message_for_publish_failure(reason), do: "error: #{inspect(reason)}"
defp validate_event_payload_size(event, max_event_bytes)
when is_map(event) and is_integer(max_event_bytes) and max_event_bytes > 0 do
if byte_size(JSON.encode!(event)) <= max_event_bytes do
:ok
else
{:error, :event_too_large}
end
end
defp validate_event_payload_size(_event, _max_event_bytes), do: :ok
defp max_event_bytes(opts) do
opts
|> Keyword.get(:max_event_bytes, configured_max_event_bytes())
|> normalize_max_event_bytes()
end
defp normalize_max_event_bytes(value) when is_integer(value) and value > 0, do: value
defp normalize_max_event_bytes(_value), do: configured_max_event_bytes()
defp configured_max_event_bytes do
:parrhesia
|> Application.get_env(:limits, [])
|> Keyword.get(:max_event_bytes, @default_max_event_bytes)
end
defp ephemeral_kind?(kind) when is_integer(kind), do: kind >= 20_000 and kind < 30_000
defp ephemeral_kind?(_kind), do: false
defp accept_ephemeral_events? do
:parrhesia
|> Application.get_env(:policies, [])
|> Keyword.get(:accept_ephemeral_events, true)
end
end

View File

@@ -0,0 +1,14 @@
defmodule Parrhesia.API.Events.PublishResult do
@moduledoc """
Result shape for event publish attempts.
"""
defstruct [:event_id, :accepted, :message, :reason]
@type t :: %__MODULE__{
event_id: String.t(),
accepted: boolean(),
message: String.t(),
reason: term()
}
end

View File

@@ -0,0 +1,97 @@
defmodule Parrhesia.API.Stream do
@moduledoc """
In-process subscription API with relay-equivalent catch-up and live fanout semantics.
"""
alias Parrhesia.API.Events
alias Parrhesia.API.RequestContext
alias Parrhesia.API.Stream.Subscription
alias Parrhesia.Policy.EventPolicy
alias Parrhesia.Protocol.Filter
@spec subscribe(pid(), String.t(), [map()], keyword()) :: {:ok, reference()} | {:error, term()}
def subscribe(subscriber, subscription_id, filters, opts \\ [])
def subscribe(subscriber, subscription_id, filters, opts)
when is_pid(subscriber) and is_binary(subscription_id) and is_list(filters) and
is_list(opts) do
with {:ok, context} <- fetch_context(opts),
:ok <- Filter.validate_filters(filters),
:ok <-
EventPolicy.authorize_read(
filters,
context.authenticated_pubkeys,
stream_context(context, subscription_id)
) do
ref = make_ref()
case DynamicSupervisor.start_child(
Parrhesia.API.Stream.Supervisor,
{Subscription,
ref: ref, subscriber: subscriber, subscription_id: subscription_id, filters: filters}
) do
{:ok, pid} ->
finalize_subscription(pid, ref, filters, stream_context(context, subscription_id))
{:error, reason} ->
{:error, reason}
end
end
end
def subscribe(_subscriber, _subscription_id, _filters, _opts),
do: {:error, :invalid_subscription}
@spec unsubscribe(reference()) :: :ok
def unsubscribe(ref) when is_reference(ref) do
case Registry.lookup(Parrhesia.API.Stream.Registry, ref) do
[{pid, _value}] ->
try do
:ok = GenServer.stop(pid, :normal)
catch
:exit, _reason -> :ok
end
:ok
[] ->
:ok
end
end
def unsubscribe(_ref), do: :ok
defp fetch_context(opts) do
case Keyword.get(opts, :context) do
%RequestContext{} = context -> {:ok, context}
_other -> {:error, :invalid_context}
end
end
defp finalize_subscription(pid, ref, filters, context) do
with {:ok, initial_events} <-
Events.query(filters,
context: context,
validate_filters?: false,
authorize_read?: false
),
:ok <- Subscription.deliver_initial(pid, initial_events) do
{:ok, ref}
else
{:error, reason} ->
_ = safe_stop_subscription(pid)
{:error, reason}
end
end
defp safe_stop_subscription(pid) do
GenServer.stop(pid, :shutdown)
:ok
catch
:exit, _reason -> :ok
end
defp stream_context(%RequestContext{} = context, subscription_id) do
%RequestContext{context | subscription_id: subscription_id}
end
end

View File

@@ -0,0 +1,178 @@
defmodule Parrhesia.API.Stream.Subscription do
@moduledoc false
use GenServer
alias Parrhesia.Protocol.Filter
alias Parrhesia.Subscriptions.Index
defstruct [
:ref,
:subscriber,
:subscriber_monitor_ref,
:subscription_id,
:filters,
ready?: false,
buffered_events: []
]
@type t :: %__MODULE__{
ref: reference(),
subscriber: pid(),
subscriber_monitor_ref: reference(),
subscription_id: String.t(),
filters: [map()],
ready?: boolean(),
buffered_events: [map()]
}
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts) when is_list(opts) do
ref = Keyword.fetch!(opts, :ref)
GenServer.start_link(__MODULE__, opts, name: via_tuple(ref))
end
@spec deliver_initial(GenServer.server(), [map()]) :: :ok | {:error, term()}
def deliver_initial(server, initial_events) when is_list(initial_events) do
GenServer.call(server, {:deliver_initial, initial_events})
end
@impl true
def init(opts) do
with {:ok, subscriber} <- fetch_subscriber(opts),
{:ok, subscription_id} <- fetch_subscription_id(opts),
{:ok, filters} <- fetch_filters(opts),
:ok <-
maybe_upsert_index_subscription(subscription_index(opts), subscription_id, filters) do
monitor_ref = Process.monitor(subscriber)
state = %__MODULE__{
ref: Keyword.fetch!(opts, :ref),
subscriber: subscriber,
subscriber_monitor_ref: monitor_ref,
subscription_id: subscription_id,
filters: filters,
ready?: false,
buffered_events: []
}
{:ok, state}
else
{:error, reason} -> {:stop, reason}
end
end
@impl true
def handle_call({:deliver_initial, initial_events}, _from, %__MODULE__{} = state) do
send_initial_events(state, initial_events)
Enum.each(Enum.reverse(state.buffered_events), fn event ->
send(state.subscriber, {:parrhesia, :event, state.ref, state.subscription_id, event})
end)
{:reply, :ok, %__MODULE__{state | ready?: true, buffered_events: []}}
end
@impl true
def handle_info({:fanout_event, subscription_id, event}, %__MODULE__{} = state)
when is_binary(subscription_id) and is_map(event) do
handle_fanout_event(state, subscription_id, event)
end
def handle_info({:DOWN, monitor_ref, :process, subscriber, _reason}, %__MODULE__{} = state)
when monitor_ref == state.subscriber_monitor_ref and subscriber == state.subscriber do
{:stop, :normal, state}
end
def handle_info(_message, %__MODULE__{} = state), do: {:noreply, state}
@impl true
def terminate(reason, %__MODULE__{} = state) do
:ok = maybe_remove_index_subscription(state.subscription_id)
if reason not in [:normal, :shutdown] do
send(state.subscriber, {:parrhesia, :closed, state.ref, state.subscription_id, reason})
end
:ok
end
defp send_initial_events(state, events) do
Enum.each(events, fn event ->
send(state.subscriber, {:parrhesia, :event, state.ref, state.subscription_id, event})
end)
send(state.subscriber, {:parrhesia, :eose, state.ref, state.subscription_id})
end
defp via_tuple(ref), do: {:via, Registry, {Parrhesia.API.Stream.Registry, ref}}
defp fetch_subscriber(opts) do
case Keyword.get(opts, :subscriber) do
subscriber when is_pid(subscriber) -> {:ok, subscriber}
_other -> {:error, :invalid_subscriber}
end
end
defp fetch_subscription_id(opts) do
case Keyword.get(opts, :subscription_id) do
subscription_id when is_binary(subscription_id) -> {:ok, subscription_id}
_other -> {:error, :invalid_subscription_id}
end
end
defp fetch_filters(opts) do
case Keyword.get(opts, :filters) do
filters when is_list(filters) -> {:ok, filters}
_other -> {:error, :invalid_filters}
end
end
defp subscription_index(opts) do
case Keyword.get(opts, :subscription_index, Index) do
subscription_index when is_pid(subscription_index) or is_atom(subscription_index) ->
subscription_index
_other ->
nil
end
end
defp maybe_upsert_index_subscription(nil, _subscription_id, _filters),
do: {:error, :subscription_index_unavailable}
defp maybe_upsert_index_subscription(subscription_index, subscription_id, filters) do
case Index.upsert(subscription_index, self(), subscription_id, filters) do
:ok -> :ok
{:error, reason} -> {:error, reason}
end
catch
:exit, _reason -> {:error, :subscription_index_unavailable}
end
defp maybe_remove_index_subscription(subscription_id) do
:ok = Index.remove(Index, self(), subscription_id)
:ok
catch
:exit, _reason -> :ok
end
defp handle_fanout_event(%__MODULE__{} = state, subscription_id, event) do
cond do
subscription_id != state.subscription_id ->
{:noreply, state}
not Filter.matches_any?(event, state.filters) ->
{:noreply, state}
state.ready? ->
send(state.subscriber, {:parrhesia, :event, state.ref, state.subscription_id, event})
{:noreply, state}
true ->
buffered_events = [event | state.buffered_events]
{:noreply, %__MODULE__{state | buffered_events: buffered_events}}
end
end
end

View File

@@ -13,7 +13,9 @@ defmodule Parrhesia.Subscriptions.Supervisor do
def init(_init_arg) do
children =
[
{Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index}
{Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index},
{Registry, keys: :unique, name: Parrhesia.API.Stream.Registry},
{DynamicSupervisor, strategy: :one_for_one, name: Parrhesia.API.Stream.Supervisor}
] ++
negentropy_children() ++ [{Parrhesia.Fanout.MultiNode, name: Parrhesia.Fanout.MultiNode}]

View File

@@ -5,16 +5,14 @@ defmodule Parrhesia.Web.Connection do
@behaviour WebSock
alias Parrhesia.API.Events
alias Parrhesia.API.RequestContext
alias Parrhesia.Auth.Challenges
alias Parrhesia.Fanout.MultiNode
alias Parrhesia.Groups.Flow
alias Parrhesia.Negentropy.Sessions
alias Parrhesia.Policy.ConnectionPolicy
alias Parrhesia.Policy.EventPolicy
alias Parrhesia.Protocol
alias Parrhesia.Protocol.Filter
alias Parrhesia.Storage
alias Parrhesia.Subscriptions.Index
alias Parrhesia.Telemetry
@@ -28,7 +26,6 @@ defmodule Parrhesia.Web.Connection do
@default_event_ingest_window_seconds 1
@default_auth_max_age_seconds 600
@drain_outbound_queue :drain_outbound_queue
@post_ack_ingest :post_ack_ingest
@outbound_queue_pressure_threshold 0.75
@marmot_kinds MapSet.new([
@@ -199,12 +196,6 @@ defmodule Parrhesia.Web.Connection do
handle_fanout_events(state, fanout_events)
end
def handle_info({@post_ack_ingest, event}, %__MODULE__{} = state) when is_map(event) do
fanout_event(event)
maybe_publish_multi_node(event)
{:ok, state}
end
def handle_info(@drain_outbound_queue, %__MODULE__{} = state) do
{frames, next_state} = drain_outbound_frames(state)
@@ -227,59 +218,39 @@ defmodule Parrhesia.Web.Connection do
end
defp handle_event_ingest(%__MODULE__{} = state, event) do
started_at = System.monotonic_time()
event_id = Map.get(event, "id", "")
case maybe_allow_event_ingest(state) do
{:ok, next_state} ->
result =
with :ok <- validate_event_payload_size(event, next_state.max_event_bytes),
:ok <- Protocol.validate_event(event),
:ok <-
EventPolicy.authorize_write(
event,
next_state.authenticated_pubkeys,
request_context(next_state)
),
:ok <- maybe_process_group_event(event),
{:ok, _result, message} <- persist_event(event) do
{:ok, message}
end
handle_event_ingest_result(result, next_state, event, event_id, started_at)
publish_event_response(next_state, event)
{:error, reason} ->
ingest_error_response(state, event_id, reason)
end
end
defp handle_event_ingest_result(
{:ok, message},
%__MODULE__{} = state,
event,
event_id,
started_at
) do
Telemetry.emit(
[:parrhesia, :ingest, :stop],
%{duration: System.monotonic_time() - started_at},
telemetry_metadata_for_event(event)
)
defp publish_event_response(%__MODULE__{} = state, event) do
case Events.publish(
event,
context: request_context(state),
max_event_bytes: state.max_event_bytes
) do
{:ok, %{event_id: event_id, accepted: accepted, message: message, reason: reason}} ->
response = Protocol.encode_relay({:ok, event_id, accepted, message})
maybe_with_auth_challenge(state, reason, response)
send(self(), {@post_ack_ingest, event})
response = Protocol.encode_relay({:ok, event_id, true, message})
{:push, {:text, response}, state}
{:error, reason} ->
ingest_error_response(state, Map.get(event, "id", ""), reason)
end
end
defp handle_event_ingest_result(
{:error, reason},
%__MODULE__{} = state,
_event,
event_id,
_started_at
),
do: ingest_error_response(state, event_id, reason)
defp maybe_with_auth_challenge(state, reason, response) do
if reason in [:auth_required, :protected_event_requires_auth] do
with_auth_challenge_frame(state, {:push, {:text, response}, state})
else
{:push, {:text, response}, state}
end
end
defp ingest_error_response(%__MODULE__{} = state, event_id, reason) do
message = error_message_for_ingest_failure(reason)
@@ -293,8 +264,6 @@ defmodule Parrhesia.Web.Connection do
end
defp handle_req(%__MODULE__{} = state, subscription_id, filters) do
started_at = System.monotonic_time()
with :ok <- Filter.validate_filters(filters),
:ok <-
EventPolicy.authorize_read(
@@ -304,13 +273,12 @@ defmodule Parrhesia.Web.Connection do
),
{:ok, next_state} <- upsert_subscription(state, subscription_id, filters),
:ok <- maybe_upsert_index_subscription(next_state, subscription_id, filters),
{:ok, events} <- query_initial_events(filters, state.authenticated_pubkeys) do
Telemetry.emit(
[:parrhesia, :query, :stop],
%{duration: System.monotonic_time() - started_at},
telemetry_metadata_for_filters(filters)
)
{:ok, events} <-
Events.query(filters,
context: request_context(next_state, subscription_id),
validate_filters?: false,
authorize_read?: false
) do
frames =
Enum.map(events, fn event ->
{:text, Protocol.encode_relay({:event, subscription_id, event})}
@@ -396,77 +364,37 @@ defmodule Parrhesia.Web.Connection do
end
defp handle_count(%__MODULE__{} = state, subscription_id, filters, options) do
started_at = System.monotonic_time()
with :ok <- Filter.validate_filters(filters),
:ok <-
EventPolicy.authorize_read(
filters,
state.authenticated_pubkeys,
request_context(state, subscription_id)
),
{:ok, count} <- count_events(filters, state.authenticated_pubkeys),
{:ok, payload} <- build_count_payload(filters, count, options) do
Telemetry.emit(
[:parrhesia, :query, :stop],
%{duration: System.monotonic_time() - started_at},
telemetry_metadata_for_filters(filters)
)
response = Protocol.encode_relay({:count, subscription_id, payload})
{:push, {:text, response}, state}
else
{:error, :auth_required} ->
restricted_count_notice(state, subscription_id, EventPolicy.error_message(:auth_required))
{:error, :pubkey_not_allowed} ->
restricted_count_notice(
state,
subscription_id,
EventPolicy.error_message(:pubkey_not_allowed)
)
{:error, :restricted_giftwrap} ->
restricted_count_notice(
state,
subscription_id,
EventPolicy.error_message(:restricted_giftwrap)
)
{:error, :sync_read_not_allowed} ->
restricted_count_notice(
state,
subscription_id,
EventPolicy.error_message(:sync_read_not_allowed)
)
{:error, :marmot_group_h_tag_required} ->
restricted_count_notice(
state,
subscription_id,
EventPolicy.error_message(:marmot_group_h_tag_required)
)
{:error, :marmot_group_h_values_exceeded} ->
restricted_count_notice(
state,
subscription_id,
EventPolicy.error_message(:marmot_group_h_values_exceeded)
)
{:error, :marmot_group_filter_window_too_wide} ->
restricted_count_notice(
state,
subscription_id,
EventPolicy.error_message(:marmot_group_filter_window_too_wide)
)
case Events.count(filters,
context: request_context(state, subscription_id),
options: options
) do
{:ok, payload} ->
response = Protocol.encode_relay({:count, subscription_id, payload})
{:push, {:text, response}, state}
{:error, reason} ->
response = Protocol.encode_relay({:closed, subscription_id, inspect(reason)})
{:push, {:text, response}, state}
handle_count_error(state, subscription_id, reason)
end
end
defp handle_count_error(state, subscription_id, reason)
when reason in [
:auth_required,
:pubkey_not_allowed,
:restricted_giftwrap,
:sync_read_not_allowed,
:marmot_group_h_tag_required,
:marmot_group_h_values_exceeded,
:marmot_group_filter_window_too_wide
] do
restricted_count_notice(state, subscription_id, EventPolicy.error_message(reason))
end
defp handle_count_error(state, subscription_id, reason) do
response = Protocol.encode_relay({:closed, subscription_id, inspect(reason)})
{:push, {:text, response}, state}
end
defp handle_auth(%__MODULE__{} = state, auth_event) do
event_id = Map.get(auth_event, "id", "")
@@ -534,52 +462,6 @@ defmodule Parrhesia.Web.Connection do
{:ok, state}
end
defp maybe_process_group_event(event) do
if Flow.group_related_kind?(Map.get(event, "kind")) do
Flow.handle_event(event)
else
:ok
end
end
defp persist_event(event) do
kind = Map.get(event, "kind")
cond do
kind in [5, 62] -> persist_control_event(kind, event)
ephemeral_kind?(kind) -> persist_ephemeral_event()
true -> persist_regular_event(event)
end
end
defp persist_control_event(5, event) do
with {:ok, deleted_count} <- Storage.events().delete_by_request(%{}, event) do
{:ok, deleted_count, "ok: deletion request processed"}
end
end
defp persist_control_event(62, event) do
with {:ok, deleted_count} <- Storage.events().vanish(%{}, event) do
{:ok, deleted_count, "ok: vanish request processed"}
end
end
defp persist_ephemeral_event do
if accept_ephemeral_events?() do
{:ok, :ephemeral, "ok: ephemeral event accepted"}
else
{:error, :ephemeral_events_disabled}
end
end
defp persist_regular_event(event) do
case Storage.events().put_event(%{}, event) do
{:ok, persisted_event} -> {:ok, persisted_event, "ok: event stored"}
{:error, :duplicate_event} -> {:error, :duplicate_event}
{:error, reason} -> {:error, reason}
end
end
defp error_message_for_ingest_failure(:duplicate_event),
do: "duplicate: event already stored"
@@ -623,49 +505,6 @@ defmodule Parrhesia.Web.Connection do
defp error_message_for_ingest_failure(reason) when is_binary(reason), do: reason
defp error_message_for_ingest_failure(reason), do: "error: #{inspect(reason)}"
defp query_initial_events(filters, authenticated_pubkeys) do
Storage.events().query(%{}, filters,
max_filter_limit: Parrhesia.Config.get([:limits, :max_filter_limit]),
requester_pubkeys: MapSet.to_list(authenticated_pubkeys)
)
end
defp count_events(filters, authenticated_pubkeys) do
Storage.events().count(%{}, filters, requester_pubkeys: MapSet.to_list(authenticated_pubkeys))
end
defp build_count_payload(filters, count, options) when is_integer(count) and is_map(options) do
include_hll? =
Map.get(options, "hll", false) and Parrhesia.Config.get([:features, :nip_45_count], true)
payload = %{"count" => count, "approximate" => false}
payload =
if include_hll? do
Map.put(payload, "hll", generate_hll_payload(filters, count))
else
payload
end
{:ok, payload}
end
defp generate_hll_payload(filters, count) do
filters
|> JSON.encode!()
|> then(&"#{&1}:#{count}")
|> then(&:crypto.hash(:sha256, &1))
|> Base.encode64()
end
defp telemetry_metadata_for_event(event) do
%{traffic_class: traffic_class_for_event(event)}
end
defp telemetry_metadata_for_filters(filters) do
%{traffic_class: traffic_class_for_filters(filters)}
end
defp telemetry_metadata_for_fanout_events(fanout_events) do
traffic_class =
if Enum.any?(fanout_events, fn
@@ -683,26 +522,6 @@ defmodule Parrhesia.Web.Connection do
%{traffic_class: traffic_class}
end
defp traffic_class_for_filters(filters) do
if Enum.any?(filters, &marmot_filter?/1) do
:marmot
else
:generic
end
end
defp marmot_filter?(filter) when is_map(filter) do
has_marmot_kind? =
case Map.get(filter, "kinds") do
kinds when is_list(kinds) -> Enum.any?(kinds, &MapSet.member?(@marmot_kinds, &1))
_other -> false
end
has_marmot_kind? or Map.has_key?(filter, "#h") or Map.has_key?(filter, "#i")
end
defp marmot_filter?(_filter), do: false
defp traffic_class_for_event(event) when is_map(event) do
if MapSet.member?(@marmot_kinds, Map.get(event, "kind")) do
:marmot
@@ -981,27 +800,6 @@ defmodule Parrhesia.Web.Connection do
:exit, _reason -> :ok
end
defp fanout_event(event) do
case Index.candidate_subscription_keys(event) do
candidates when is_list(candidates) ->
Enum.each(candidates, fn {owner_pid, subscription_id} ->
send(owner_pid, {:fanout_event, subscription_id, event})
end)
_other ->
:ok
end
catch
:exit, _reason -> :ok
end
defp maybe_publish_multi_node(event) do
MultiNode.publish(event)
:ok
catch
:exit, _reason -> :ok
end
defp handle_fanout_events(%__MODULE__{} = state, fanout_events) do
started_at = System.monotonic_time()
telemetry_metadata = telemetry_metadata_for_fanout_events(fanout_events)
@@ -1656,24 +1454,4 @@ defmodule Parrhesia.Web.Connection do
{:error, :event_rate_limited}
end
end
defp validate_event_payload_size(event, max_event_bytes)
when is_map(event) and is_integer(max_event_bytes) and max_event_bytes > 0 do
if byte_size(JSON.encode!(event)) <= max_event_bytes do
:ok
else
{:error, :event_too_large}
end
end
defp validate_event_payload_size(_event, _max_event_bytes), do: :ok
defp ephemeral_kind?(kind) when is_integer(kind), do: kind >= 20_000 and kind < 30_000
defp ephemeral_kind?(_kind), do: false
defp accept_ephemeral_events? do
:parrhesia
|> Application.get_env(:policies, [])
|> Keyword.get(:accept_ephemeral_events, true)
end
end

View File

@@ -0,0 +1,81 @@
defmodule Parrhesia.API.EventsTest do
use ExUnit.Case, async: false
alias Ecto.Adapters.SQL.Sandbox
alias Parrhesia.API.Events
alias Parrhesia.API.RequestContext
alias Parrhesia.Protocol.EventValidator
alias Parrhesia.Repo
setup do
:ok = Sandbox.checkout(Repo)
:ok
end
test "publish stores valid events through the shared API" do
event = valid_event()
assert {:ok, result} = Events.publish(event, context: %RequestContext{})
assert result.accepted
assert result.event_id == event["id"]
assert result.message == "ok: event stored"
assert result.reason == nil
assert {:ok, stored_event} = Parrhesia.Storage.events().get_event(%{}, event["id"])
assert stored_event["id"] == event["id"]
end
test "publish returns duplicate results without raising transport errors" do
event = valid_event()
assert {:ok, first_result} = Events.publish(event, context: %RequestContext{})
assert first_result.accepted
assert {:ok, second_result} = Events.publish(event, context: %RequestContext{})
refute second_result.accepted
assert second_result.reason == :duplicate_event
assert second_result.message == "duplicate: event already stored"
end
test "query and count preserve read semantics through the shared API" do
now = System.system_time(:second)
first = valid_event(%{"content" => "first", "created_at" => now})
second = valid_event(%{"content" => "second", "created_at" => now + 1})
assert {:ok, %{accepted: true}} = Events.publish(first, context: %RequestContext{})
assert {:ok, %{accepted: true}} = Events.publish(second, context: %RequestContext{})
assert {:ok, events} =
Events.query([%{"kinds" => [1]}], context: %RequestContext{})
assert Enum.map(events, & &1["id"]) == [second["id"], first["id"]]
assert {:ok, 2} =
Events.count([%{"kinds" => [1]}], context: %RequestContext{})
assert {:ok, %{"count" => 2, "approximate" => false}} =
Events.count([%{"kinds" => [1]}],
context: %RequestContext{},
options: %{}
)
end
defp valid_event(overrides \\ %{}) do
base_event = %{
"pubkey" => String.duplicate("1", 64),
"created_at" => System.system_time(:second),
"kind" => 1,
"tags" => [],
"content" => "hello",
"sig" => String.duplicate("3", 128)
}
base_event
|> Map.merge(overrides)
|> recalculate_event_id()
end
defp recalculate_event_id(event) do
Map.put(event, "id", EventValidator.compute_id(event))
end
end

View File

@@ -0,0 +1,80 @@
defmodule Parrhesia.API.StreamTest do
use ExUnit.Case, async: false
alias Ecto.Adapters.SQL.Sandbox
alias Parrhesia.API.Events
alias Parrhesia.API.RequestContext
alias Parrhesia.API.Stream
alias Parrhesia.Protocol.EventValidator
alias Parrhesia.Repo
setup do
:ok = Sandbox.checkout(Repo)
:ok
end
test "subscribe streams catch-up events followed by eose" do
event = valid_event()
context = %RequestContext{}
assert {:ok, %{accepted: true}} = Events.publish(event, context: context)
assert {:ok, ref} = Stream.subscribe(self(), "sub-1", [%{"kinds" => [1]}], context: context)
assert_receive {:parrhesia, :event, ^ref, "sub-1", received_event}
assert received_event["id"] == event["id"]
assert_receive {:parrhesia, :eose, ^ref, "sub-1"}
assert :ok = Stream.unsubscribe(ref)
end
test "subscribe receives live fanout events after eose" do
context = %RequestContext{}
event = valid_event()
assert {:ok, ref} =
Stream.subscribe(self(), "sub-live", [%{"kinds" => [1]}], context: context)
assert_receive {:parrhesia, :eose, ^ref, "sub-live"}, 1_000
assert {:ok, %{accepted: true}} = Events.publish(event, context: context)
assert_receive {:parrhesia, :event, ^ref, "sub-live", received_event}, 1_000
assert received_event["id"] == event["id"]
assert :ok = Stream.unsubscribe(ref)
end
test "unsubscribe stops the subscription bridge" do
context = %RequestContext{}
assert {:ok, ref} =
Stream.subscribe(self(), "sub-stop", [%{"kinds" => [1]}], context: context)
assert_receive {:parrhesia, :eose, ^ref, "sub-stop"}
[{stream_pid, _value}] = Registry.lookup(Parrhesia.API.Stream.Registry, ref)
_ = :sys.get_state(stream_pid)
monitor_ref = Process.monitor(stream_pid)
assert :ok = Stream.unsubscribe(ref)
assert_receive {:DOWN, ^monitor_ref, :process, ^stream_pid, reason}
assert reason in [:normal, :noproc]
end
defp valid_event(overrides \\ %{}) do
base_event = %{
"pubkey" => String.duplicate("1", 64),
"created_at" => System.system_time(:second),
"kind" => 1,
"tags" => [],
"content" => "hello",
"sig" => String.duplicate("3", 128)
}
base_event
|> Map.merge(overrides)
|> recalculate_event_id()
end
defp recalculate_event_id(event) do
Map.put(event, "id", EventValidator.compute_id(event))
end
end