From d119d21d9937aa12ed4e6ba656aa5dcd966a205b Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Mon, 16 Mar 2026 20:21:58 +0100 Subject: [PATCH] Extract API events and stream layers --- lib/parrhesia/api/events.ex | 373 +++++++++++++++++++++ lib/parrhesia/api/events/publish_result.ex | 14 + lib/parrhesia/api/stream.ex | 97 ++++++ lib/parrhesia/api/stream/subscription.ex | 178 ++++++++++ lib/parrhesia/subscriptions/supervisor.ex | 4 +- lib/parrhesia/web/connection.ex | 328 +++--------------- test/parrhesia/api/events_test.exs | 81 +++++ test/parrhesia/api/stream_test.exs | 80 +++++ 8 files changed, 879 insertions(+), 276 deletions(-) create mode 100644 lib/parrhesia/api/events.ex create mode 100644 lib/parrhesia/api/events/publish_result.ex create mode 100644 lib/parrhesia/api/stream.ex create mode 100644 lib/parrhesia/api/stream/subscription.ex create mode 100644 test/parrhesia/api/events_test.exs create mode 100644 test/parrhesia/api/stream_test.exs diff --git a/lib/parrhesia/api/events.ex b/lib/parrhesia/api/events.ex new file mode 100644 index 0000000..83449e5 --- /dev/null +++ b/lib/parrhesia/api/events.ex @@ -0,0 +1,373 @@ +defmodule Parrhesia.API.Events do + @moduledoc """ + Canonical event publish, query, and count API. + """ + + alias Parrhesia.API.Events.PublishResult + alias Parrhesia.API.RequestContext + alias Parrhesia.Fanout.MultiNode + alias Parrhesia.Groups.Flow + alias Parrhesia.Policy.EventPolicy + alias Parrhesia.Protocol + alias Parrhesia.Protocol.Filter + alias Parrhesia.Storage + alias Parrhesia.Subscriptions.Index + alias Parrhesia.Telemetry + + @default_max_event_bytes 262_144 + + @marmot_kinds MapSet.new([ + 443, + 444, + 445, + 1059, + 10_050, + 10_051, + 446, + 447, + 448, + 449 + ]) + + @spec publish(map(), keyword()) :: {:ok, PublishResult.t()} | {:error, term()} + def publish(event, opts \\ []) + + def publish(event, opts) when is_map(event) and is_list(opts) do + started_at = System.monotonic_time() + event_id = Map.get(event, "id", "") + + with {:ok, context} <- fetch_context(opts), + :ok <- validate_event_payload_size(event, max_event_bytes(opts)), + :ok <- Protocol.validate_event(event), + :ok <- EventPolicy.authorize_write(event, context.authenticated_pubkeys, context), + :ok <- maybe_process_group_event(event), + {:ok, _stored, message} <- persist_event(event) do + Telemetry.emit( + [:parrhesia, :ingest, :stop], + %{duration: System.monotonic_time() - started_at}, + telemetry_metadata_for_event(event) + ) + + fanout_event(event) + maybe_publish_multi_node(event) + + {:ok, + %PublishResult{ + event_id: event_id, + accepted: true, + message: message, + reason: nil + }} + else + {:error, :invalid_context} = error -> + error + + {:error, reason} -> + {:ok, + %PublishResult{ + event_id: event_id, + accepted: false, + message: error_message_for_publish_failure(reason), + reason: reason + }} + end + end + + def publish(_event, _opts), do: {:error, :invalid_event} + + @spec query([map()], keyword()) :: {:ok, [map()]} | {:error, term()} + def query(filters, opts \\ []) + + def query(filters, opts) when is_list(filters) and is_list(opts) do + started_at = System.monotonic_time() + + with {:ok, context} <- fetch_context(opts), + :ok <- maybe_validate_filters(filters, opts), + :ok <- maybe_authorize_read(filters, context, opts), + {:ok, events} <- Storage.events().query(%{}, filters, storage_query_opts(context, opts)) do + Telemetry.emit( + [:parrhesia, :query, :stop], + %{duration: System.monotonic_time() - started_at}, + telemetry_metadata_for_filters(filters) + ) + + {:ok, events} + end + end + + def query(_filters, _opts), do: {:error, :invalid_filters} + + @spec count([map()], keyword()) :: {:ok, non_neg_integer() | map()} | {:error, term()} + def count(filters, opts \\ []) + + def count(filters, opts) when is_list(filters) and is_list(opts) do + started_at = System.monotonic_time() + + with {:ok, context} <- fetch_context(opts), + :ok <- maybe_validate_filters(filters, opts), + :ok <- maybe_authorize_read(filters, context, opts), + {:ok, count} <- + Storage.events().count(%{}, filters, requester_pubkeys: requester_pubkeys(context)), + {:ok, result} <- maybe_build_count_result(filters, count, Keyword.get(opts, :options)) do + Telemetry.emit( + [:parrhesia, :query, :stop], + %{duration: System.monotonic_time() - started_at}, + telemetry_metadata_for_filters(filters) + ) + + {:ok, result} + end + end + + def count(_filters, _opts), do: {:error, :invalid_filters} + + defp maybe_validate_filters(filters, opts) do + if Keyword.get(opts, :validate_filters?, true) do + Filter.validate_filters(filters) + else + :ok + end + end + + defp maybe_authorize_read(filters, context, opts) do + if Keyword.get(opts, :authorize_read?, true) do + EventPolicy.authorize_read(filters, context.authenticated_pubkeys, context) + else + :ok + end + end + + defp storage_query_opts(context, opts) do + [ + max_filter_limit: + Keyword.get(opts, :max_filter_limit, Parrhesia.Config.get([:limits, :max_filter_limit])), + requester_pubkeys: requester_pubkeys(context) + ] + end + + defp requester_pubkeys(%RequestContext{} = context), + do: MapSet.to_list(context.authenticated_pubkeys) + + defp maybe_build_count_result(_filters, count, nil) when is_integer(count), do: {:ok, count} + + defp maybe_build_count_result(filters, count, options) + when is_integer(count) and is_map(options) do + build_count_payload(filters, count, options) + end + + defp maybe_build_count_result(_filters, count, _options) when is_integer(count), + do: {:ok, count} + + defp maybe_build_count_result(_filters, count, _options), do: {:ok, count} + + defp build_count_payload(filters, count, options) do + include_hll? = + Map.get(options, "hll", false) and Parrhesia.Config.get([:features, :nip_45_count], true) + + payload = %{"count" => count, "approximate" => false} + + payload = + if include_hll? do + Map.put(payload, "hll", generate_hll_payload(filters, count)) + else + payload + end + + {:ok, payload} + end + + defp generate_hll_payload(filters, count) do + filters + |> JSON.encode!() + |> then(&"#{&1}:#{count}") + |> then(&:crypto.hash(:sha256, &1)) + |> Base.encode64() + end + + defp maybe_process_group_event(event) do + if Flow.group_related_kind?(Map.get(event, "kind")) do + Flow.handle_event(event) + else + :ok + end + end + + defp persist_event(event) do + kind = Map.get(event, "kind") + + cond do + kind in [5, 62] -> persist_control_event(kind, event) + ephemeral_kind?(kind) -> persist_ephemeral_event() + true -> persist_regular_event(event) + end + end + + defp persist_control_event(5, event) do + with {:ok, deleted_count} <- Storage.events().delete_by_request(%{}, event) do + {:ok, deleted_count, "ok: deletion request processed"} + end + end + + defp persist_control_event(62, event) do + with {:ok, deleted_count} <- Storage.events().vanish(%{}, event) do + {:ok, deleted_count, "ok: vanish request processed"} + end + end + + defp persist_ephemeral_event do + if accept_ephemeral_events?() do + {:ok, :ephemeral, "ok: ephemeral event accepted"} + else + {:error, :ephemeral_events_disabled} + end + end + + defp persist_regular_event(event) do + case Storage.events().put_event(%{}, event) do + {:ok, persisted_event} -> {:ok, persisted_event, "ok: event stored"} + {:error, :duplicate_event} -> {:error, :duplicate_event} + {:error, reason} -> {:error, reason} + end + end + + defp fanout_event(event) do + case Index.candidate_subscription_keys(event) do + candidates when is_list(candidates) -> + Enum.each(candidates, fn {owner_pid, subscription_id} -> + send(owner_pid, {:fanout_event, subscription_id, event}) + end) + + _other -> + :ok + end + catch + :exit, _reason -> :ok + end + + defp maybe_publish_multi_node(event) do + MultiNode.publish(event) + :ok + catch + :exit, _reason -> :ok + end + + defp telemetry_metadata_for_event(event) do + %{traffic_class: traffic_class_for_event(event)} + end + + defp telemetry_metadata_for_filters(filters) do + %{traffic_class: traffic_class_for_filters(filters)} + end + + defp traffic_class_for_filters(filters) do + if Enum.any?(filters, &marmot_filter?/1) do + :marmot + else + :generic + end + end + + defp marmot_filter?(filter) when is_map(filter) do + has_marmot_kind? = + case Map.get(filter, "kinds") do + kinds when is_list(kinds) -> Enum.any?(kinds, &MapSet.member?(@marmot_kinds, &1)) + _other -> false + end + + has_marmot_kind? or Map.has_key?(filter, "#h") or Map.has_key?(filter, "#i") + end + + defp marmot_filter?(_filter), do: false + + defp traffic_class_for_event(event) when is_map(event) do + if MapSet.member?(@marmot_kinds, Map.get(event, "kind")) do + :marmot + else + :generic + end + end + + defp traffic_class_for_event(_event), do: :generic + + defp fetch_context(opts) do + case Keyword.get(opts, :context) do + %RequestContext{} = context -> {:ok, context} + _other -> {:error, :invalid_context} + end + end + + defp error_message_for_publish_failure(:duplicate_event), + do: "duplicate: event already stored" + + defp error_message_for_publish_failure(:event_too_large), + do: "invalid: event exceeds max event size" + + defp error_message_for_publish_failure(:ephemeral_events_disabled), + do: "blocked: ephemeral events are disabled" + + defp error_message_for_publish_failure(reason) + when reason in [ + :auth_required, + :pubkey_not_allowed, + :restricted_giftwrap, + :sync_write_not_allowed, + :protected_event_requires_auth, + :protected_event_pubkey_mismatch, + :pow_below_minimum, + :pubkey_banned, + :event_banned, + :media_metadata_tags_exceeded, + :media_metadata_tag_value_too_large, + :media_metadata_url_too_long, + :media_metadata_invalid_url, + :media_metadata_invalid_hash, + :media_metadata_invalid_mime, + :media_metadata_mime_not_allowed, + :media_metadata_unsupported_version, + :push_notification_relay_tags_exceeded, + :push_notification_payload_too_large, + :push_notification_replay_window_exceeded, + :push_notification_missing_expiration, + :push_notification_expiration_too_far, + :push_notification_server_recipients_exceeded + ], + do: EventPolicy.error_message(reason) + + defp error_message_for_publish_failure(reason) when is_binary(reason), do: reason + defp error_message_for_publish_failure(reason), do: "error: #{inspect(reason)}" + + defp validate_event_payload_size(event, max_event_bytes) + when is_map(event) and is_integer(max_event_bytes) and max_event_bytes > 0 do + if byte_size(JSON.encode!(event)) <= max_event_bytes do + :ok + else + {:error, :event_too_large} + end + end + + defp validate_event_payload_size(_event, _max_event_bytes), do: :ok + + defp max_event_bytes(opts) do + opts + |> Keyword.get(:max_event_bytes, configured_max_event_bytes()) + |> normalize_max_event_bytes() + end + + defp normalize_max_event_bytes(value) when is_integer(value) and value > 0, do: value + defp normalize_max_event_bytes(_value), do: configured_max_event_bytes() + + defp configured_max_event_bytes do + :parrhesia + |> Application.get_env(:limits, []) + |> Keyword.get(:max_event_bytes, @default_max_event_bytes) + end + + defp ephemeral_kind?(kind) when is_integer(kind), do: kind >= 20_000 and kind < 30_000 + defp ephemeral_kind?(_kind), do: false + + defp accept_ephemeral_events? do + :parrhesia + |> Application.get_env(:policies, []) + |> Keyword.get(:accept_ephemeral_events, true) + end +end diff --git a/lib/parrhesia/api/events/publish_result.ex b/lib/parrhesia/api/events/publish_result.ex new file mode 100644 index 0000000..e115d2b --- /dev/null +++ b/lib/parrhesia/api/events/publish_result.ex @@ -0,0 +1,14 @@ +defmodule Parrhesia.API.Events.PublishResult do + @moduledoc """ + Result shape for event publish attempts. + """ + + defstruct [:event_id, :accepted, :message, :reason] + + @type t :: %__MODULE__{ + event_id: String.t(), + accepted: boolean(), + message: String.t(), + reason: term() + } +end diff --git a/lib/parrhesia/api/stream.ex b/lib/parrhesia/api/stream.ex new file mode 100644 index 0000000..7dd0413 --- /dev/null +++ b/lib/parrhesia/api/stream.ex @@ -0,0 +1,97 @@ +defmodule Parrhesia.API.Stream do + @moduledoc """ + In-process subscription API with relay-equivalent catch-up and live fanout semantics. + """ + + alias Parrhesia.API.Events + alias Parrhesia.API.RequestContext + alias Parrhesia.API.Stream.Subscription + alias Parrhesia.Policy.EventPolicy + alias Parrhesia.Protocol.Filter + + @spec subscribe(pid(), String.t(), [map()], keyword()) :: {:ok, reference()} | {:error, term()} + def subscribe(subscriber, subscription_id, filters, opts \\ []) + + def subscribe(subscriber, subscription_id, filters, opts) + when is_pid(subscriber) and is_binary(subscription_id) and is_list(filters) and + is_list(opts) do + with {:ok, context} <- fetch_context(opts), + :ok <- Filter.validate_filters(filters), + :ok <- + EventPolicy.authorize_read( + filters, + context.authenticated_pubkeys, + stream_context(context, subscription_id) + ) do + ref = make_ref() + + case DynamicSupervisor.start_child( + Parrhesia.API.Stream.Supervisor, + {Subscription, + ref: ref, subscriber: subscriber, subscription_id: subscription_id, filters: filters} + ) do + {:ok, pid} -> + finalize_subscription(pid, ref, filters, stream_context(context, subscription_id)) + + {:error, reason} -> + {:error, reason} + end + end + end + + def subscribe(_subscriber, _subscription_id, _filters, _opts), + do: {:error, :invalid_subscription} + + @spec unsubscribe(reference()) :: :ok + def unsubscribe(ref) when is_reference(ref) do + case Registry.lookup(Parrhesia.API.Stream.Registry, ref) do + [{pid, _value}] -> + try do + :ok = GenServer.stop(pid, :normal) + catch + :exit, _reason -> :ok + end + + :ok + + [] -> + :ok + end + end + + def unsubscribe(_ref), do: :ok + + defp fetch_context(opts) do + case Keyword.get(opts, :context) do + %RequestContext{} = context -> {:ok, context} + _other -> {:error, :invalid_context} + end + end + + defp finalize_subscription(pid, ref, filters, context) do + with {:ok, initial_events} <- + Events.query(filters, + context: context, + validate_filters?: false, + authorize_read?: false + ), + :ok <- Subscription.deliver_initial(pid, initial_events) do + {:ok, ref} + else + {:error, reason} -> + _ = safe_stop_subscription(pid) + {:error, reason} + end + end + + defp safe_stop_subscription(pid) do + GenServer.stop(pid, :shutdown) + :ok + catch + :exit, _reason -> :ok + end + + defp stream_context(%RequestContext{} = context, subscription_id) do + %RequestContext{context | subscription_id: subscription_id} + end +end diff --git a/lib/parrhesia/api/stream/subscription.ex b/lib/parrhesia/api/stream/subscription.ex new file mode 100644 index 0000000..80fb475 --- /dev/null +++ b/lib/parrhesia/api/stream/subscription.ex @@ -0,0 +1,178 @@ +defmodule Parrhesia.API.Stream.Subscription do + @moduledoc false + + use GenServer + + alias Parrhesia.Protocol.Filter + alias Parrhesia.Subscriptions.Index + + defstruct [ + :ref, + :subscriber, + :subscriber_monitor_ref, + :subscription_id, + :filters, + ready?: false, + buffered_events: [] + ] + + @type t :: %__MODULE__{ + ref: reference(), + subscriber: pid(), + subscriber_monitor_ref: reference(), + subscription_id: String.t(), + filters: [map()], + ready?: boolean(), + buffered_events: [map()] + } + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts) when is_list(opts) do + ref = Keyword.fetch!(opts, :ref) + + GenServer.start_link(__MODULE__, opts, name: via_tuple(ref)) + end + + @spec deliver_initial(GenServer.server(), [map()]) :: :ok | {:error, term()} + def deliver_initial(server, initial_events) when is_list(initial_events) do + GenServer.call(server, {:deliver_initial, initial_events}) + end + + @impl true + def init(opts) do + with {:ok, subscriber} <- fetch_subscriber(opts), + {:ok, subscription_id} <- fetch_subscription_id(opts), + {:ok, filters} <- fetch_filters(opts), + :ok <- + maybe_upsert_index_subscription(subscription_index(opts), subscription_id, filters) do + monitor_ref = Process.monitor(subscriber) + + state = %__MODULE__{ + ref: Keyword.fetch!(opts, :ref), + subscriber: subscriber, + subscriber_monitor_ref: monitor_ref, + subscription_id: subscription_id, + filters: filters, + ready?: false, + buffered_events: [] + } + + {:ok, state} + else + {:error, reason} -> {:stop, reason} + end + end + + @impl true + def handle_call({:deliver_initial, initial_events}, _from, %__MODULE__{} = state) do + send_initial_events(state, initial_events) + + Enum.each(Enum.reverse(state.buffered_events), fn event -> + send(state.subscriber, {:parrhesia, :event, state.ref, state.subscription_id, event}) + end) + + {:reply, :ok, %__MODULE__{state | ready?: true, buffered_events: []}} + end + + @impl true + def handle_info({:fanout_event, subscription_id, event}, %__MODULE__{} = state) + when is_binary(subscription_id) and is_map(event) do + handle_fanout_event(state, subscription_id, event) + end + + def handle_info({:DOWN, monitor_ref, :process, subscriber, _reason}, %__MODULE__{} = state) + when monitor_ref == state.subscriber_monitor_ref and subscriber == state.subscriber do + {:stop, :normal, state} + end + + def handle_info(_message, %__MODULE__{} = state), do: {:noreply, state} + + @impl true + def terminate(reason, %__MODULE__{} = state) do + :ok = maybe_remove_index_subscription(state.subscription_id) + + if reason not in [:normal, :shutdown] do + send(state.subscriber, {:parrhesia, :closed, state.ref, state.subscription_id, reason}) + end + + :ok + end + + defp send_initial_events(state, events) do + Enum.each(events, fn event -> + send(state.subscriber, {:parrhesia, :event, state.ref, state.subscription_id, event}) + end) + + send(state.subscriber, {:parrhesia, :eose, state.ref, state.subscription_id}) + end + + defp via_tuple(ref), do: {:via, Registry, {Parrhesia.API.Stream.Registry, ref}} + + defp fetch_subscriber(opts) do + case Keyword.get(opts, :subscriber) do + subscriber when is_pid(subscriber) -> {:ok, subscriber} + _other -> {:error, :invalid_subscriber} + end + end + + defp fetch_subscription_id(opts) do + case Keyword.get(opts, :subscription_id) do + subscription_id when is_binary(subscription_id) -> {:ok, subscription_id} + _other -> {:error, :invalid_subscription_id} + end + end + + defp fetch_filters(opts) do + case Keyword.get(opts, :filters) do + filters when is_list(filters) -> {:ok, filters} + _other -> {:error, :invalid_filters} + end + end + + defp subscription_index(opts) do + case Keyword.get(opts, :subscription_index, Index) do + subscription_index when is_pid(subscription_index) or is_atom(subscription_index) -> + subscription_index + + _other -> + nil + end + end + + defp maybe_upsert_index_subscription(nil, _subscription_id, _filters), + do: {:error, :subscription_index_unavailable} + + defp maybe_upsert_index_subscription(subscription_index, subscription_id, filters) do + case Index.upsert(subscription_index, self(), subscription_id, filters) do + :ok -> :ok + {:error, reason} -> {:error, reason} + end + catch + :exit, _reason -> {:error, :subscription_index_unavailable} + end + + defp maybe_remove_index_subscription(subscription_id) do + :ok = Index.remove(Index, self(), subscription_id) + :ok + catch + :exit, _reason -> :ok + end + + defp handle_fanout_event(%__MODULE__{} = state, subscription_id, event) do + cond do + subscription_id != state.subscription_id -> + {:noreply, state} + + not Filter.matches_any?(event, state.filters) -> + {:noreply, state} + + state.ready? -> + send(state.subscriber, {:parrhesia, :event, state.ref, state.subscription_id, event}) + {:noreply, state} + + true -> + buffered_events = [event | state.buffered_events] + {:noreply, %__MODULE__{state | buffered_events: buffered_events}} + end + end +end diff --git a/lib/parrhesia/subscriptions/supervisor.ex b/lib/parrhesia/subscriptions/supervisor.ex index 6db147c..92b009e 100644 --- a/lib/parrhesia/subscriptions/supervisor.ex +++ b/lib/parrhesia/subscriptions/supervisor.ex @@ -13,7 +13,9 @@ defmodule Parrhesia.Subscriptions.Supervisor do def init(_init_arg) do children = [ - {Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index} + {Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index}, + {Registry, keys: :unique, name: Parrhesia.API.Stream.Registry}, + {DynamicSupervisor, strategy: :one_for_one, name: Parrhesia.API.Stream.Supervisor} ] ++ negentropy_children() ++ [{Parrhesia.Fanout.MultiNode, name: Parrhesia.Fanout.MultiNode}] diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index 89e85a6..87d518d 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -5,16 +5,14 @@ defmodule Parrhesia.Web.Connection do @behaviour WebSock + alias Parrhesia.API.Events alias Parrhesia.API.RequestContext alias Parrhesia.Auth.Challenges - alias Parrhesia.Fanout.MultiNode - alias Parrhesia.Groups.Flow alias Parrhesia.Negentropy.Sessions alias Parrhesia.Policy.ConnectionPolicy alias Parrhesia.Policy.EventPolicy alias Parrhesia.Protocol alias Parrhesia.Protocol.Filter - alias Parrhesia.Storage alias Parrhesia.Subscriptions.Index alias Parrhesia.Telemetry @@ -28,7 +26,6 @@ defmodule Parrhesia.Web.Connection do @default_event_ingest_window_seconds 1 @default_auth_max_age_seconds 600 @drain_outbound_queue :drain_outbound_queue - @post_ack_ingest :post_ack_ingest @outbound_queue_pressure_threshold 0.75 @marmot_kinds MapSet.new([ @@ -199,12 +196,6 @@ defmodule Parrhesia.Web.Connection do handle_fanout_events(state, fanout_events) end - def handle_info({@post_ack_ingest, event}, %__MODULE__{} = state) when is_map(event) do - fanout_event(event) - maybe_publish_multi_node(event) - {:ok, state} - end - def handle_info(@drain_outbound_queue, %__MODULE__{} = state) do {frames, next_state} = drain_outbound_frames(state) @@ -227,59 +218,39 @@ defmodule Parrhesia.Web.Connection do end defp handle_event_ingest(%__MODULE__{} = state, event) do - started_at = System.monotonic_time() event_id = Map.get(event, "id", "") case maybe_allow_event_ingest(state) do {:ok, next_state} -> - result = - with :ok <- validate_event_payload_size(event, next_state.max_event_bytes), - :ok <- Protocol.validate_event(event), - :ok <- - EventPolicy.authorize_write( - event, - next_state.authenticated_pubkeys, - request_context(next_state) - ), - :ok <- maybe_process_group_event(event), - {:ok, _result, message} <- persist_event(event) do - {:ok, message} - end - - handle_event_ingest_result(result, next_state, event, event_id, started_at) + publish_event_response(next_state, event) {:error, reason} -> ingest_error_response(state, event_id, reason) end end - defp handle_event_ingest_result( - {:ok, message}, - %__MODULE__{} = state, - event, - event_id, - started_at - ) do - Telemetry.emit( - [:parrhesia, :ingest, :stop], - %{duration: System.monotonic_time() - started_at}, - telemetry_metadata_for_event(event) - ) + defp publish_event_response(%__MODULE__{} = state, event) do + case Events.publish( + event, + context: request_context(state), + max_event_bytes: state.max_event_bytes + ) do + {:ok, %{event_id: event_id, accepted: accepted, message: message, reason: reason}} -> + response = Protocol.encode_relay({:ok, event_id, accepted, message}) + maybe_with_auth_challenge(state, reason, response) - send(self(), {@post_ack_ingest, event}) - - response = Protocol.encode_relay({:ok, event_id, true, message}) - {:push, {:text, response}, state} + {:error, reason} -> + ingest_error_response(state, Map.get(event, "id", ""), reason) + end end - defp handle_event_ingest_result( - {:error, reason}, - %__MODULE__{} = state, - _event, - event_id, - _started_at - ), - do: ingest_error_response(state, event_id, reason) + defp maybe_with_auth_challenge(state, reason, response) do + if reason in [:auth_required, :protected_event_requires_auth] do + with_auth_challenge_frame(state, {:push, {:text, response}, state}) + else + {:push, {:text, response}, state} + end + end defp ingest_error_response(%__MODULE__{} = state, event_id, reason) do message = error_message_for_ingest_failure(reason) @@ -293,8 +264,6 @@ defmodule Parrhesia.Web.Connection do end defp handle_req(%__MODULE__{} = state, subscription_id, filters) do - started_at = System.monotonic_time() - with :ok <- Filter.validate_filters(filters), :ok <- EventPolicy.authorize_read( @@ -304,13 +273,12 @@ defmodule Parrhesia.Web.Connection do ), {:ok, next_state} <- upsert_subscription(state, subscription_id, filters), :ok <- maybe_upsert_index_subscription(next_state, subscription_id, filters), - {:ok, events} <- query_initial_events(filters, state.authenticated_pubkeys) do - Telemetry.emit( - [:parrhesia, :query, :stop], - %{duration: System.monotonic_time() - started_at}, - telemetry_metadata_for_filters(filters) - ) - + {:ok, events} <- + Events.query(filters, + context: request_context(next_state, subscription_id), + validate_filters?: false, + authorize_read?: false + ) do frames = Enum.map(events, fn event -> {:text, Protocol.encode_relay({:event, subscription_id, event})} @@ -396,77 +364,37 @@ defmodule Parrhesia.Web.Connection do end defp handle_count(%__MODULE__{} = state, subscription_id, filters, options) do - started_at = System.monotonic_time() - - with :ok <- Filter.validate_filters(filters), - :ok <- - EventPolicy.authorize_read( - filters, - state.authenticated_pubkeys, - request_context(state, subscription_id) - ), - {:ok, count} <- count_events(filters, state.authenticated_pubkeys), - {:ok, payload} <- build_count_payload(filters, count, options) do - Telemetry.emit( - [:parrhesia, :query, :stop], - %{duration: System.monotonic_time() - started_at}, - telemetry_metadata_for_filters(filters) - ) - - response = Protocol.encode_relay({:count, subscription_id, payload}) - {:push, {:text, response}, state} - else - {:error, :auth_required} -> - restricted_count_notice(state, subscription_id, EventPolicy.error_message(:auth_required)) - - {:error, :pubkey_not_allowed} -> - restricted_count_notice( - state, - subscription_id, - EventPolicy.error_message(:pubkey_not_allowed) - ) - - {:error, :restricted_giftwrap} -> - restricted_count_notice( - state, - subscription_id, - EventPolicy.error_message(:restricted_giftwrap) - ) - - {:error, :sync_read_not_allowed} -> - restricted_count_notice( - state, - subscription_id, - EventPolicy.error_message(:sync_read_not_allowed) - ) - - {:error, :marmot_group_h_tag_required} -> - restricted_count_notice( - state, - subscription_id, - EventPolicy.error_message(:marmot_group_h_tag_required) - ) - - {:error, :marmot_group_h_values_exceeded} -> - restricted_count_notice( - state, - subscription_id, - EventPolicy.error_message(:marmot_group_h_values_exceeded) - ) - - {:error, :marmot_group_filter_window_too_wide} -> - restricted_count_notice( - state, - subscription_id, - EventPolicy.error_message(:marmot_group_filter_window_too_wide) - ) + case Events.count(filters, + context: request_context(state, subscription_id), + options: options + ) do + {:ok, payload} -> + response = Protocol.encode_relay({:count, subscription_id, payload}) + {:push, {:text, response}, state} {:error, reason} -> - response = Protocol.encode_relay({:closed, subscription_id, inspect(reason)}) - {:push, {:text, response}, state} + handle_count_error(state, subscription_id, reason) end end + defp handle_count_error(state, subscription_id, reason) + when reason in [ + :auth_required, + :pubkey_not_allowed, + :restricted_giftwrap, + :sync_read_not_allowed, + :marmot_group_h_tag_required, + :marmot_group_h_values_exceeded, + :marmot_group_filter_window_too_wide + ] do + restricted_count_notice(state, subscription_id, EventPolicy.error_message(reason)) + end + + defp handle_count_error(state, subscription_id, reason) do + response = Protocol.encode_relay({:closed, subscription_id, inspect(reason)}) + {:push, {:text, response}, state} + end + defp handle_auth(%__MODULE__{} = state, auth_event) do event_id = Map.get(auth_event, "id", "") @@ -534,52 +462,6 @@ defmodule Parrhesia.Web.Connection do {:ok, state} end - defp maybe_process_group_event(event) do - if Flow.group_related_kind?(Map.get(event, "kind")) do - Flow.handle_event(event) - else - :ok - end - end - - defp persist_event(event) do - kind = Map.get(event, "kind") - - cond do - kind in [5, 62] -> persist_control_event(kind, event) - ephemeral_kind?(kind) -> persist_ephemeral_event() - true -> persist_regular_event(event) - end - end - - defp persist_control_event(5, event) do - with {:ok, deleted_count} <- Storage.events().delete_by_request(%{}, event) do - {:ok, deleted_count, "ok: deletion request processed"} - end - end - - defp persist_control_event(62, event) do - with {:ok, deleted_count} <- Storage.events().vanish(%{}, event) do - {:ok, deleted_count, "ok: vanish request processed"} - end - end - - defp persist_ephemeral_event do - if accept_ephemeral_events?() do - {:ok, :ephemeral, "ok: ephemeral event accepted"} - else - {:error, :ephemeral_events_disabled} - end - end - - defp persist_regular_event(event) do - case Storage.events().put_event(%{}, event) do - {:ok, persisted_event} -> {:ok, persisted_event, "ok: event stored"} - {:error, :duplicate_event} -> {:error, :duplicate_event} - {:error, reason} -> {:error, reason} - end - end - defp error_message_for_ingest_failure(:duplicate_event), do: "duplicate: event already stored" @@ -623,49 +505,6 @@ defmodule Parrhesia.Web.Connection do defp error_message_for_ingest_failure(reason) when is_binary(reason), do: reason defp error_message_for_ingest_failure(reason), do: "error: #{inspect(reason)}" - defp query_initial_events(filters, authenticated_pubkeys) do - Storage.events().query(%{}, filters, - max_filter_limit: Parrhesia.Config.get([:limits, :max_filter_limit]), - requester_pubkeys: MapSet.to_list(authenticated_pubkeys) - ) - end - - defp count_events(filters, authenticated_pubkeys) do - Storage.events().count(%{}, filters, requester_pubkeys: MapSet.to_list(authenticated_pubkeys)) - end - - defp build_count_payload(filters, count, options) when is_integer(count) and is_map(options) do - include_hll? = - Map.get(options, "hll", false) and Parrhesia.Config.get([:features, :nip_45_count], true) - - payload = %{"count" => count, "approximate" => false} - - payload = - if include_hll? do - Map.put(payload, "hll", generate_hll_payload(filters, count)) - else - payload - end - - {:ok, payload} - end - - defp generate_hll_payload(filters, count) do - filters - |> JSON.encode!() - |> then(&"#{&1}:#{count}") - |> then(&:crypto.hash(:sha256, &1)) - |> Base.encode64() - end - - defp telemetry_metadata_for_event(event) do - %{traffic_class: traffic_class_for_event(event)} - end - - defp telemetry_metadata_for_filters(filters) do - %{traffic_class: traffic_class_for_filters(filters)} - end - defp telemetry_metadata_for_fanout_events(fanout_events) do traffic_class = if Enum.any?(fanout_events, fn @@ -683,26 +522,6 @@ defmodule Parrhesia.Web.Connection do %{traffic_class: traffic_class} end - defp traffic_class_for_filters(filters) do - if Enum.any?(filters, &marmot_filter?/1) do - :marmot - else - :generic - end - end - - defp marmot_filter?(filter) when is_map(filter) do - has_marmot_kind? = - case Map.get(filter, "kinds") do - kinds when is_list(kinds) -> Enum.any?(kinds, &MapSet.member?(@marmot_kinds, &1)) - _other -> false - end - - has_marmot_kind? or Map.has_key?(filter, "#h") or Map.has_key?(filter, "#i") - end - - defp marmot_filter?(_filter), do: false - defp traffic_class_for_event(event) when is_map(event) do if MapSet.member?(@marmot_kinds, Map.get(event, "kind")) do :marmot @@ -981,27 +800,6 @@ defmodule Parrhesia.Web.Connection do :exit, _reason -> :ok end - defp fanout_event(event) do - case Index.candidate_subscription_keys(event) do - candidates when is_list(candidates) -> - Enum.each(candidates, fn {owner_pid, subscription_id} -> - send(owner_pid, {:fanout_event, subscription_id, event}) - end) - - _other -> - :ok - end - catch - :exit, _reason -> :ok - end - - defp maybe_publish_multi_node(event) do - MultiNode.publish(event) - :ok - catch - :exit, _reason -> :ok - end - defp handle_fanout_events(%__MODULE__{} = state, fanout_events) do started_at = System.monotonic_time() telemetry_metadata = telemetry_metadata_for_fanout_events(fanout_events) @@ -1656,24 +1454,4 @@ defmodule Parrhesia.Web.Connection do {:error, :event_rate_limited} end end - - defp validate_event_payload_size(event, max_event_bytes) - when is_map(event) and is_integer(max_event_bytes) and max_event_bytes > 0 do - if byte_size(JSON.encode!(event)) <= max_event_bytes do - :ok - else - {:error, :event_too_large} - end - end - - defp validate_event_payload_size(_event, _max_event_bytes), do: :ok - - defp ephemeral_kind?(kind) when is_integer(kind), do: kind >= 20_000 and kind < 30_000 - defp ephemeral_kind?(_kind), do: false - - defp accept_ephemeral_events? do - :parrhesia - |> Application.get_env(:policies, []) - |> Keyword.get(:accept_ephemeral_events, true) - end end diff --git a/test/parrhesia/api/events_test.exs b/test/parrhesia/api/events_test.exs new file mode 100644 index 0000000..09f8bd3 --- /dev/null +++ b/test/parrhesia/api/events_test.exs @@ -0,0 +1,81 @@ +defmodule Parrhesia.API.EventsTest do + use ExUnit.Case, async: false + + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.API.Events + alias Parrhesia.API.RequestContext + alias Parrhesia.Protocol.EventValidator + alias Parrhesia.Repo + + setup do + :ok = Sandbox.checkout(Repo) + :ok + end + + test "publish stores valid events through the shared API" do + event = valid_event() + + assert {:ok, result} = Events.publish(event, context: %RequestContext{}) + assert result.accepted + assert result.event_id == event["id"] + assert result.message == "ok: event stored" + assert result.reason == nil + + assert {:ok, stored_event} = Parrhesia.Storage.events().get_event(%{}, event["id"]) + assert stored_event["id"] == event["id"] + end + + test "publish returns duplicate results without raising transport errors" do + event = valid_event() + + assert {:ok, first_result} = Events.publish(event, context: %RequestContext{}) + assert first_result.accepted + + assert {:ok, second_result} = Events.publish(event, context: %RequestContext{}) + refute second_result.accepted + assert second_result.reason == :duplicate_event + assert second_result.message == "duplicate: event already stored" + end + + test "query and count preserve read semantics through the shared API" do + now = System.system_time(:second) + first = valid_event(%{"content" => "first", "created_at" => now}) + second = valid_event(%{"content" => "second", "created_at" => now + 1}) + + assert {:ok, %{accepted: true}} = Events.publish(first, context: %RequestContext{}) + assert {:ok, %{accepted: true}} = Events.publish(second, context: %RequestContext{}) + + assert {:ok, events} = + Events.query([%{"kinds" => [1]}], context: %RequestContext{}) + + assert Enum.map(events, & &1["id"]) == [second["id"], first["id"]] + + assert {:ok, 2} = + Events.count([%{"kinds" => [1]}], context: %RequestContext{}) + + assert {:ok, %{"count" => 2, "approximate" => false}} = + Events.count([%{"kinds" => [1]}], + context: %RequestContext{}, + options: %{} + ) + end + + defp valid_event(overrides \\ %{}) do + base_event = %{ + "pubkey" => String.duplicate("1", 64), + "created_at" => System.system_time(:second), + "kind" => 1, + "tags" => [], + "content" => "hello", + "sig" => String.duplicate("3", 128) + } + + base_event + |> Map.merge(overrides) + |> recalculate_event_id() + end + + defp recalculate_event_id(event) do + Map.put(event, "id", EventValidator.compute_id(event)) + end +end diff --git a/test/parrhesia/api/stream_test.exs b/test/parrhesia/api/stream_test.exs new file mode 100644 index 0000000..d41efcf --- /dev/null +++ b/test/parrhesia/api/stream_test.exs @@ -0,0 +1,80 @@ +defmodule Parrhesia.API.StreamTest do + use ExUnit.Case, async: false + + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.API.Events + alias Parrhesia.API.RequestContext + alias Parrhesia.API.Stream + alias Parrhesia.Protocol.EventValidator + alias Parrhesia.Repo + + setup do + :ok = Sandbox.checkout(Repo) + :ok + end + + test "subscribe streams catch-up events followed by eose" do + event = valid_event() + context = %RequestContext{} + + assert {:ok, %{accepted: true}} = Events.publish(event, context: context) + assert {:ok, ref} = Stream.subscribe(self(), "sub-1", [%{"kinds" => [1]}], context: context) + + assert_receive {:parrhesia, :event, ^ref, "sub-1", received_event} + assert received_event["id"] == event["id"] + assert_receive {:parrhesia, :eose, ^ref, "sub-1"} + assert :ok = Stream.unsubscribe(ref) + end + + test "subscribe receives live fanout events after eose" do + context = %RequestContext{} + event = valid_event() + + assert {:ok, ref} = + Stream.subscribe(self(), "sub-live", [%{"kinds" => [1]}], context: context) + + assert_receive {:parrhesia, :eose, ^ref, "sub-live"}, 1_000 + + assert {:ok, %{accepted: true}} = Events.publish(event, context: context) + + assert_receive {:parrhesia, :event, ^ref, "sub-live", received_event}, 1_000 + assert received_event["id"] == event["id"] + assert :ok = Stream.unsubscribe(ref) + end + + test "unsubscribe stops the subscription bridge" do + context = %RequestContext{} + + assert {:ok, ref} = + Stream.subscribe(self(), "sub-stop", [%{"kinds" => [1]}], context: context) + + assert_receive {:parrhesia, :eose, ^ref, "sub-stop"} + + [{stream_pid, _value}] = Registry.lookup(Parrhesia.API.Stream.Registry, ref) + _ = :sys.get_state(stream_pid) + monitor_ref = Process.monitor(stream_pid) + + assert :ok = Stream.unsubscribe(ref) + assert_receive {:DOWN, ^monitor_ref, :process, ^stream_pid, reason} + assert reason in [:normal, :noproc] + end + + defp valid_event(overrides \\ %{}) do + base_event = %{ + "pubkey" => String.duplicate("1", 64), + "created_at" => System.system_time(:second), + "kind" => 1, + "tags" => [], + "content" => "hello", + "sig" => String.duplicate("3", 128) + } + + base_event + |> Map.merge(overrides) + |> recalculate_event_id() + end + + defp recalculate_event_id(event) do + Map.put(event, "id", EventValidator.compute_id(event)) + end +end