diff --git a/lib/parrhesia/api/admin.ex b/lib/parrhesia/api/admin.ex index 0ab670d..259bf0a 100644 --- a/lib/parrhesia/api/admin.ex +++ b/lib/parrhesia/api/admin.ex @@ -9,6 +9,7 @@ defmodule Parrhesia.API.Admin do alias Parrhesia.Storage alias Parrhesia.Web.Endpoint + @supported_admin_methods ~w(health list_audit_logs stats) @supported_acl_methods ~w(acl_grant acl_revoke acl_list) @supported_identity_methods ~w(identity_ensure identity_get identity_import identity_rotate) @supported_listener_methods ~w(listener_reload) @@ -98,6 +99,7 @@ defmodule Parrhesia.API.Admin do end (storage_supported ++ + @supported_admin_methods ++ @supported_acl_methods ++ @supported_identity_methods ++ @supported_listener_methods ++ @supported_sync_methods) |> Enum.uniq() @@ -114,6 +116,13 @@ defmodule Parrhesia.API.Admin do Identity.import(params) end + defp admin_stats(_params, opts), do: stats(opts) + defp admin_health(_params, opts), do: health(opts) + + defp admin_list_audit_logs(params, _opts) do + list_audit_logs(audit_log_opts(params)) + end + defp listener_reload(params) do case normalize_listener_id(fetch_value(params, :id)) do :all -> @@ -185,6 +194,9 @@ defmodule Parrhesia.API.Admin do defp sync_stats(_params, opts), do: Sync.sync_stats(opts) defp sync_health(_params, opts), do: Sync.sync_health(opts) + defp execute_builtin("stats", params, opts), do: admin_stats(params, opts) + defp execute_builtin("health", params, opts), do: admin_health(params, opts) + defp execute_builtin("list_audit_logs", params, opts), do: admin_list_audit_logs(params, opts) defp execute_builtin("acl_grant", params, _opts), do: acl_grant(params) defp execute_builtin("acl_revoke", params, _opts), do: acl_revoke(params) defp execute_builtin("acl_list", params, _opts), do: acl_list(params) @@ -220,6 +232,13 @@ defmodule Parrhesia.API.Admin do defp overall_health_status(%{"status" => "degraded"}), do: "degraded" defp overall_health_status(_sync_health), do: "ok" + defp audit_log_opts(params) do + [] + |> maybe_put_opt(:limit, fetch_value(params, :limit)) + |> maybe_put_opt(:method, fetch_value(params, :method)) + |> maybe_put_opt(:actor_pubkey, fetch_value(params, :actor_pubkey)) + end + defp maybe_put_opt(opts, _key, nil), do: opts defp maybe_put_opt(opts, key, value), do: Keyword.put(opts, key, value) diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index b982e45..0e814fa 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -7,6 +7,7 @@ defmodule Parrhesia.Web.Connection do alias Parrhesia.API.Events alias Parrhesia.API.RequestContext + alias Parrhesia.API.Stream alias Parrhesia.Auth.Challenges alias Parrhesia.Negentropy.Sessions alias Parrhesia.Policy.ConnectionPolicy @@ -70,6 +71,7 @@ defmodule Parrhesia.Web.Connection do @type overflow_strategy :: :close | :drop_oldest | :drop_newest @type subscription :: %{ + ref: reference(), filters: [map()], eose_sent?: boolean() } @@ -181,6 +183,7 @@ defmodule Parrhesia.Web.Connection do defp handle_decoded_message({:close, subscription_id}, state) do next_state = state + |> maybe_unsubscribe_stream_subscription(subscription_id) |> drop_subscription(subscription_id) |> drop_queued_subscription_events(subscription_id) @@ -193,6 +196,50 @@ defmodule Parrhesia.Web.Connection do end @impl true + def handle_info( + {:parrhesia, :event, ref, subscription_id, event}, + %__MODULE__{} = state + ) + when is_reference(ref) and is_binary(subscription_id) and is_map(event) do + if current_subscription_ref?(state, subscription_id, ref) do + handle_fanout_events(state, [{subscription_id, event}]) + else + {:ok, state} + end + end + + def handle_info( + {:parrhesia, :eose, ref, subscription_id}, + %__MODULE__{} = state + ) + when is_reference(ref) and is_binary(subscription_id) do + if current_subscription_ref?(state, subscription_id, ref) and + not subscription_eose_sent?(state, subscription_id) do + response = Protocol.encode_relay({:eose, subscription_id}) + {:push, {:text, response}, mark_subscription_eose_sent(state, subscription_id)} + else + {:ok, state} + end + end + + def handle_info( + {:parrhesia, :closed, ref, subscription_id, reason}, + %__MODULE__{} = state + ) + when is_reference(ref) and is_binary(subscription_id) do + if current_subscription_ref?(state, subscription_id, ref) do + next_state = + state + |> drop_subscription(subscription_id) + |> drop_queued_subscription_events(subscription_id) + + response = Protocol.encode_relay({:closed, subscription_id, stream_closed_reason(reason)}) + {:push, {:text, response}, next_state} + else + {:ok, state} + end + end + def handle_info({:fanout_event, subscription_id, event}, %__MODULE__{} = state) when is_binary(subscription_id) and is_map(event) do handle_fanout_events(state, [{subscription_id, event}]) @@ -219,6 +266,7 @@ defmodule Parrhesia.Web.Connection do @impl true def terminate(_reason, %__MODULE__{} = state) do + :ok = maybe_unsubscribe_all_stream_subscriptions(state) :ok = maybe_remove_index_owner(state) :ok = maybe_clear_auth_challenge(state) :ok @@ -282,20 +330,18 @@ defmodule Parrhesia.Web.Connection do state.authenticated_pubkeys, request_context(state, subscription_id) ), - {:ok, next_state} <- upsert_subscription(state, subscription_id, filters), - :ok <- maybe_upsert_index_subscription(next_state, subscription_id, filters), - {:ok, events} <- - Events.query(filters, - context: request_context(next_state, subscription_id), - validate_filters?: false, - authorize_read?: false + :ok <- ensure_subscription_capacity(state, subscription_id), + {:ok, ref} <- + Stream.subscribe(self(), subscription_id, filters, + context: request_context(state, subscription_id) ) do - frames = - Enum.map(events, fn event -> - {:text, Protocol.encode_relay({:event, subscription_id, event})} - end) ++ [{:text, Protocol.encode_relay({:eose, subscription_id})}] + next_state = + state + |> maybe_unsubscribe_stream_subscription(subscription_id) + |> put_subscription(subscription_id, %{ref: ref, filters: filters, eose_sent?: false}) - {:push, frames, next_state} + {frames, finalized_state} = drain_initial_stream_frames(next_state, ref, subscription_id) + {:push, frames, finalized_state} else {:error, :auth_required} -> restricted_close(state, subscription_id, EventPolicy.error_message(:auth_required)) @@ -1049,15 +1095,13 @@ defmodule Parrhesia.Web.Connection do end end - defp upsert_subscription(%__MODULE__{} = state, subscription_id, filters) do - subscription = %{filters: filters, eose_sent?: true} - + defp ensure_subscription_capacity(%__MODULE__{} = state, subscription_id) do cond do Map.has_key?(state.subscriptions, subscription_id) -> - {:ok, put_subscription(state, subscription_id, subscription)} + :ok map_size(state.subscriptions) < state.max_subscriptions_per_connection -> - {:ok, put_subscription(state, subscription_id, subscription)} + :ok true -> {:error, :subscription_limit_reached} @@ -1101,26 +1145,6 @@ defmodule Parrhesia.Web.Connection do next_state end - defp maybe_upsert_index_subscription( - %__MODULE__{subscription_index: nil}, - _subscription_id, - _filters - ), - do: :ok - - defp maybe_upsert_index_subscription( - %__MODULE__{subscription_index: subscription_index}, - subscription_id, - filters - ) do - case Index.upsert(subscription_index, self(), subscription_id, filters) do - :ok -> :ok - {:error, _reason} -> :ok - end - catch - :exit, _reason -> :ok - end - defp maybe_remove_index_subscription( %__MODULE__{subscription_index: nil}, _subscription_id @@ -1146,6 +1170,81 @@ defmodule Parrhesia.Web.Connection do :exit, _reason -> :ok end + defp maybe_unsubscribe_stream_subscription(%__MODULE__{} = state, subscription_id) do + case Map.get(state.subscriptions, subscription_id) do + %{ref: ref} when is_reference(ref) -> Stream.unsubscribe(ref) + _other -> :ok + end + + state + end + + defp maybe_unsubscribe_all_stream_subscriptions(%__MODULE__{} = state) do + Enum.each(state.subscriptions, fn {_subscription_id, subscription} -> + case Map.get(subscription, :ref) do + ref when is_reference(ref) -> Stream.unsubscribe(ref) + _other -> :ok + end + end) + + :ok + end + + defp current_subscription_ref?(%__MODULE__{} = state, subscription_id, ref) do + case Map.get(state.subscriptions, subscription_id) do + %{ref: ^ref} -> true + _other -> false + end + end + + defp subscription_eose_sent?(%__MODULE__{} = state, subscription_id) do + case Map.get(state.subscriptions, subscription_id) do + %{eose_sent?: true} -> true + _other -> false + end + end + + defp mark_subscription_eose_sent(%__MODULE__{} = state, subscription_id) do + case Map.get(state.subscriptions, subscription_id) do + nil -> + state + + subscription -> + put_subscription(state, subscription_id, Map.put(subscription, :eose_sent?, true)) + end + end + + defp drain_initial_stream_frames(%__MODULE__{} = state, ref, subscription_id) do + do_drain_initial_stream_frames(state, ref, subscription_id, []) + end + + defp do_drain_initial_stream_frames(state, ref, subscription_id, acc) do + receive do + {:parrhesia, :event, ^ref, ^subscription_id, event} -> + frame = {:text, Protocol.encode_relay({:event, subscription_id, event})} + do_drain_initial_stream_frames(state, ref, subscription_id, [frame | acc]) + + {:parrhesia, :eose, ^ref, ^subscription_id} -> + next_state = mark_subscription_eose_sent(state, subscription_id) + frame = {:text, Protocol.encode_relay({:eose, subscription_id})} + {Enum.reverse([frame | acc]), next_state} + + {:parrhesia, :closed, ^ref, ^subscription_id, reason} -> + next_state = drop_subscription(state, subscription_id) + + frame = + {:text, Protocol.encode_relay({:closed, subscription_id, stream_closed_reason(reason)})} + + {Enum.reverse([frame | acc]), next_state} + after + 0 -> + {Enum.reverse(acc), state} + end + end + + defp stream_closed_reason(reason) when is_binary(reason), do: reason + defp stream_closed_reason(reason), do: inspect(reason) + defp subscription_index(opts) when is_list(opts) do opts |> Keyword.get(:subscription_index, Index) diff --git a/lib/parrhesia/web/management.ex b/lib/parrhesia/web/management.ex index ad9e9bb..c637376 100644 --- a/lib/parrhesia/web/management.ex +++ b/lib/parrhesia/web/management.ex @@ -19,7 +19,7 @@ defmodule Parrhesia.Web.Management do with {:ok, auth_context} <- maybe_validate_nip98(auth_required?, authorization, method, full_url), {:ok, payload} <- parse_payload(conn.body_params), - {:ok, result} <- execute_method(payload), + {:ok, result} <- execute_method(payload, opts), :ok <- append_audit_log(auth_context, payload, result) do send_json(conn, 200, %{"ok" => true, "result" => result}) else @@ -69,8 +69,8 @@ defmodule Parrhesia.Web.Management do defp parse_payload(_payload), do: {:error, :invalid_payload} - defp execute_method(payload) do - Admin.execute(payload.method, payload.params) + defp execute_method(payload, opts) do + Admin.execute(payload.method, payload.params, opts) end defp append_audit_log(auth_context, payload, result) do diff --git a/test/parrhesia/api/sync_test.exs b/test/parrhesia/api/sync_test.exs index ccde79f..3b4eacf 100644 --- a/test/parrhesia/api/sync_test.exs +++ b/test/parrhesia/api/sync_test.exs @@ -120,9 +120,16 @@ defmodule Parrhesia.API.SyncTest do assert sync_stats["sync"]["servers_total"] == 1 assert sync_stats["sync"]["query_runs"] == 1 + assert {:ok, execute_stats} = Admin.execute("stats", %{}, manager: manager) + assert execute_stats["sync"]["servers_total"] == 1 + assert {:ok, health} = Admin.health(manager: manager) assert health["status"] == "ok" assert health["sync"]["servers_total"] == 1 + + assert {:ok, execute_health} = Admin.execute("health", %{}, manager: manager) + assert execute_health["status"] == "ok" + assert execute_health["sync"]["servers_total"] == 1 end defp start_sync_manager do diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs index 99265bc..4c21ffc 100644 --- a/test/parrhesia/web/connection_test.exs +++ b/test/parrhesia/web/connection_test.exs @@ -3,6 +3,8 @@ defmodule Parrhesia.Web.ConnectionTest do alias Ecto.Adapters.SQL.Sandbox alias Parrhesia.API.ACL + alias Parrhesia.API.Events + alias Parrhesia.API.RequestContext alias Parrhesia.Negentropy.Engine alias Parrhesia.Negentropy.Message alias Parrhesia.Protocol.EventValidator @@ -11,6 +13,7 @@ defmodule Parrhesia.Web.ConnectionTest do setup do :ok = Sandbox.checkout(Repo) + ensure_stream_runtime_started() :ok end @@ -738,16 +741,47 @@ defmodule Parrhesia.Web.ConnectionTest do test "CLOSE removes subscription and replies with CLOSED" do state = subscribed_connection_state([]) + subscription = state.subscriptions["sub-1"] + [{stream_pid, _value}] = Registry.lookup(Parrhesia.API.Stream.Registry, subscription.ref) + monitor_ref = Process.monitor(stream_pid) close_payload = JSON.encode!(["CLOSE", "sub-1"]) assert {:push, {:text, response}, next_state} = Connection.handle_in({close_payload, [opcode: :text]}, state) + assert_receive {:DOWN, ^monitor_ref, :process, ^stream_pid, :normal} refute Map.has_key?(next_state.subscriptions, "sub-1") assert JSON.decode!(response) == ["CLOSED", "sub-1", "error: subscription closed"] end + test "REQ live delivery is bridged through API.Stream" do + state = subscribed_connection_state([]) + subscription = state.subscriptions["sub-1"] + subscription_ref = subscription.ref + event = valid_event(%{"content" => "stream-live"}) |> recalculate_event_id() + + assert {:ok, %{accepted: true}} = Events.publish(event, context: %RequestContext{}) + + assert_receive {:parrhesia, :event, ^subscription_ref, "sub-1", received_event} + assert received_event["id"] == event["id"] + + assert {:ok, queued_state} = + Connection.handle_info( + {:parrhesia, :event, subscription_ref, "sub-1", received_event}, + state + ) + + assert queued_state.outbound_queue_size == 1 + assert_receive :drain_outbound_queue + + assert {:push, [{:text, payload}], drained_state} = + Connection.handle_info(:drain_outbound_queue, queued_state) + + assert drained_state.outbound_queue_size == 0 + assert JSON.decode!(payload) == ["EVENT", "sub-1", received_event] + end + test "fanout_event enqueues and drains matching events" do state = subscribed_connection_state([]) event = live_event("event-1", 1) @@ -841,6 +875,26 @@ defmodule Parrhesia.Web.ConnectionTest do state end + defp ensure_stream_runtime_started do + if is_nil(Process.whereis(Parrhesia.Subscriptions.Supervisor)) do + start_supervised!({Parrhesia.Subscriptions.Supervisor, []}) + end + + if is_nil(Process.whereis(Parrhesia.Subscriptions.Index)) do + start_supervised!({Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index}) + end + + if is_nil(Process.whereis(Parrhesia.API.Stream.Registry)) do + start_supervised!({Registry, keys: :unique, name: Parrhesia.API.Stream.Registry}) + end + + if is_nil(Process.whereis(Parrhesia.API.Stream.Supervisor)) do + start_supervised!( + {DynamicSupervisor, strategy: :one_for_one, name: Parrhesia.API.Stream.Supervisor} + ) + end + end + defp listener(overrides) do base = %{ id: :test, diff --git a/test/parrhesia/web/router_test.exs b/test/parrhesia/web/router_test.exs index 1564799..5b8fcaf 100644 --- a/test/parrhesia/web/router_test.exs +++ b/test/parrhesia/web/router_test.exs @@ -5,6 +5,7 @@ defmodule Parrhesia.Web.RouterTest do import Plug.Test alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.API.Sync alias Parrhesia.Protocol.EventValidator alias Parrhesia.Repo alias Parrhesia.Web.Listener @@ -334,6 +335,84 @@ defmodule Parrhesia.Web.RouterTest do assert byte_size(pubkey) == 64 end + test "POST /management stats and health include sync summary" do + management_url = "http://www.example.com/management" + auth_event = nip98_event("POST", management_url) + authorization = "Nostr " <> Base.encode64(JSON.encode!(auth_event)) + initial_total = Sync.sync_stats() |> elem(1) |> Map.fetch!("servers_total") + server_id = "router-sync-#{System.unique_integer([:positive, :monotonic])}" + + assert {:ok, _server} = + Sync.put_server(%{ + "id" => server_id, + "url" => "wss://relay-a.example/relay", + "enabled?" => false, + "auth_pubkey" => String.duplicate("a", 64), + "filters" => [%{"kinds" => [5000], "#r" => ["tribes.accounts.user"]}], + "tls" => %{ + "pins" => [ + %{ + "type" => "spki_sha256", + "value" => "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=" + } + ] + } + }) + + on_exit(fn -> + _ = Sync.remove_server(server_id) + end) + + stats_conn = + conn( + :post, + "/management", + JSON.encode!(%{ + "method" => "stats", + "params" => %{} + }) + ) + |> put_req_header("content-type", "application/json") + |> put_req_header("authorization", authorization) + |> Router.call([]) + + assert stats_conn.status == 200 + + assert %{ + "ok" => true, + "result" => %{ + "sync" => %{"servers_total" => servers_total} + } + } = JSON.decode!(stats_conn.resp_body) + + assert servers_total == initial_total + 1 + + health_conn = + conn( + :post, + "/management", + JSON.encode!(%{ + "method" => "health", + "params" => %{} + }) + ) + |> put_req_header("content-type", "application/json") + |> put_req_header("authorization", authorization) + |> Router.call([]) + + assert health_conn.status == 200 + + assert %{ + "ok" => true, + "result" => %{ + "status" => status, + "sync" => %{"servers_total" => ^servers_total} + } + } = JSON.decode!(health_conn.resp_body) + + assert status in ["ok", "degraded"] + end + test "POST /management returns not found when admin feature is disabled on the listener" do conn = conn(:post, "/management", JSON.encode!(%{"method" => "ping", "params" => %{}}))