Align websocket and admin APIs with shared surfaces

This commit is contained in:
2026-03-17 01:08:41 +01:00
parent e8fd6c7328
commit 02f2584757
6 changed files with 298 additions and 40 deletions

View File

@@ -9,6 +9,7 @@ defmodule Parrhesia.API.Admin do
alias Parrhesia.Storage alias Parrhesia.Storage
alias Parrhesia.Web.Endpoint alias Parrhesia.Web.Endpoint
@supported_admin_methods ~w(health list_audit_logs stats)
@supported_acl_methods ~w(acl_grant acl_revoke acl_list) @supported_acl_methods ~w(acl_grant acl_revoke acl_list)
@supported_identity_methods ~w(identity_ensure identity_get identity_import identity_rotate) @supported_identity_methods ~w(identity_ensure identity_get identity_import identity_rotate)
@supported_listener_methods ~w(listener_reload) @supported_listener_methods ~w(listener_reload)
@@ -98,6 +99,7 @@ defmodule Parrhesia.API.Admin do
end end
(storage_supported ++ (storage_supported ++
@supported_admin_methods ++
@supported_acl_methods ++ @supported_acl_methods ++
@supported_identity_methods ++ @supported_listener_methods ++ @supported_sync_methods) @supported_identity_methods ++ @supported_listener_methods ++ @supported_sync_methods)
|> Enum.uniq() |> Enum.uniq()
@@ -114,6 +116,13 @@ defmodule Parrhesia.API.Admin do
Identity.import(params) Identity.import(params)
end end
defp admin_stats(_params, opts), do: stats(opts)
defp admin_health(_params, opts), do: health(opts)
defp admin_list_audit_logs(params, _opts) do
list_audit_logs(audit_log_opts(params))
end
defp listener_reload(params) do defp listener_reload(params) do
case normalize_listener_id(fetch_value(params, :id)) do case normalize_listener_id(fetch_value(params, :id)) do
:all -> :all ->
@@ -185,6 +194,9 @@ defmodule Parrhesia.API.Admin do
defp sync_stats(_params, opts), do: Sync.sync_stats(opts) defp sync_stats(_params, opts), do: Sync.sync_stats(opts)
defp sync_health(_params, opts), do: Sync.sync_health(opts) defp sync_health(_params, opts), do: Sync.sync_health(opts)
defp execute_builtin("stats", params, opts), do: admin_stats(params, opts)
defp execute_builtin("health", params, opts), do: admin_health(params, opts)
defp execute_builtin("list_audit_logs", params, opts), do: admin_list_audit_logs(params, opts)
defp execute_builtin("acl_grant", params, _opts), do: acl_grant(params) defp execute_builtin("acl_grant", params, _opts), do: acl_grant(params)
defp execute_builtin("acl_revoke", params, _opts), do: acl_revoke(params) defp execute_builtin("acl_revoke", params, _opts), do: acl_revoke(params)
defp execute_builtin("acl_list", params, _opts), do: acl_list(params) defp execute_builtin("acl_list", params, _opts), do: acl_list(params)
@@ -220,6 +232,13 @@ defmodule Parrhesia.API.Admin do
defp overall_health_status(%{"status" => "degraded"}), do: "degraded" defp overall_health_status(%{"status" => "degraded"}), do: "degraded"
defp overall_health_status(_sync_health), do: "ok" defp overall_health_status(_sync_health), do: "ok"
defp audit_log_opts(params) do
[]
|> maybe_put_opt(:limit, fetch_value(params, :limit))
|> maybe_put_opt(:method, fetch_value(params, :method))
|> maybe_put_opt(:actor_pubkey, fetch_value(params, :actor_pubkey))
end
defp maybe_put_opt(opts, _key, nil), do: opts defp maybe_put_opt(opts, _key, nil), do: opts
defp maybe_put_opt(opts, key, value), do: Keyword.put(opts, key, value) defp maybe_put_opt(opts, key, value), do: Keyword.put(opts, key, value)

View File

@@ -7,6 +7,7 @@ defmodule Parrhesia.Web.Connection do
alias Parrhesia.API.Events alias Parrhesia.API.Events
alias Parrhesia.API.RequestContext alias Parrhesia.API.RequestContext
alias Parrhesia.API.Stream
alias Parrhesia.Auth.Challenges alias Parrhesia.Auth.Challenges
alias Parrhesia.Negentropy.Sessions alias Parrhesia.Negentropy.Sessions
alias Parrhesia.Policy.ConnectionPolicy alias Parrhesia.Policy.ConnectionPolicy
@@ -70,6 +71,7 @@ defmodule Parrhesia.Web.Connection do
@type overflow_strategy :: :close | :drop_oldest | :drop_newest @type overflow_strategy :: :close | :drop_oldest | :drop_newest
@type subscription :: %{ @type subscription :: %{
ref: reference(),
filters: [map()], filters: [map()],
eose_sent?: boolean() eose_sent?: boolean()
} }
@@ -181,6 +183,7 @@ defmodule Parrhesia.Web.Connection do
defp handle_decoded_message({:close, subscription_id}, state) do defp handle_decoded_message({:close, subscription_id}, state) do
next_state = next_state =
state state
|> maybe_unsubscribe_stream_subscription(subscription_id)
|> drop_subscription(subscription_id) |> drop_subscription(subscription_id)
|> drop_queued_subscription_events(subscription_id) |> drop_queued_subscription_events(subscription_id)
@@ -193,6 +196,50 @@ defmodule Parrhesia.Web.Connection do
end end
@impl true @impl true
def handle_info(
{:parrhesia, :event, ref, subscription_id, event},
%__MODULE__{} = state
)
when is_reference(ref) and is_binary(subscription_id) and is_map(event) do
if current_subscription_ref?(state, subscription_id, ref) do
handle_fanout_events(state, [{subscription_id, event}])
else
{:ok, state}
end
end
def handle_info(
{:parrhesia, :eose, ref, subscription_id},
%__MODULE__{} = state
)
when is_reference(ref) and is_binary(subscription_id) do
if current_subscription_ref?(state, subscription_id, ref) and
not subscription_eose_sent?(state, subscription_id) do
response = Protocol.encode_relay({:eose, subscription_id})
{:push, {:text, response}, mark_subscription_eose_sent(state, subscription_id)}
else
{:ok, state}
end
end
def handle_info(
{:parrhesia, :closed, ref, subscription_id, reason},
%__MODULE__{} = state
)
when is_reference(ref) and is_binary(subscription_id) do
if current_subscription_ref?(state, subscription_id, ref) do
next_state =
state
|> drop_subscription(subscription_id)
|> drop_queued_subscription_events(subscription_id)
response = Protocol.encode_relay({:closed, subscription_id, stream_closed_reason(reason)})
{:push, {:text, response}, next_state}
else
{:ok, state}
end
end
def handle_info({:fanout_event, subscription_id, event}, %__MODULE__{} = state) def handle_info({:fanout_event, subscription_id, event}, %__MODULE__{} = state)
when is_binary(subscription_id) and is_map(event) do when is_binary(subscription_id) and is_map(event) do
handle_fanout_events(state, [{subscription_id, event}]) handle_fanout_events(state, [{subscription_id, event}])
@@ -219,6 +266,7 @@ defmodule Parrhesia.Web.Connection do
@impl true @impl true
def terminate(_reason, %__MODULE__{} = state) do def terminate(_reason, %__MODULE__{} = state) do
:ok = maybe_unsubscribe_all_stream_subscriptions(state)
:ok = maybe_remove_index_owner(state) :ok = maybe_remove_index_owner(state)
:ok = maybe_clear_auth_challenge(state) :ok = maybe_clear_auth_challenge(state)
:ok :ok
@@ -282,20 +330,18 @@ defmodule Parrhesia.Web.Connection do
state.authenticated_pubkeys, state.authenticated_pubkeys,
request_context(state, subscription_id) request_context(state, subscription_id)
), ),
{:ok, next_state} <- upsert_subscription(state, subscription_id, filters), :ok <- ensure_subscription_capacity(state, subscription_id),
:ok <- maybe_upsert_index_subscription(next_state, subscription_id, filters), {:ok, ref} <-
{:ok, events} <- Stream.subscribe(self(), subscription_id, filters,
Events.query(filters, context: request_context(state, subscription_id)
context: request_context(next_state, subscription_id),
validate_filters?: false,
authorize_read?: false
) do ) do
frames = next_state =
Enum.map(events, fn event -> state
{:text, Protocol.encode_relay({:event, subscription_id, event})} |> maybe_unsubscribe_stream_subscription(subscription_id)
end) ++ [{:text, Protocol.encode_relay({:eose, subscription_id})}] |> put_subscription(subscription_id, %{ref: ref, filters: filters, eose_sent?: false})
{:push, frames, next_state} {frames, finalized_state} = drain_initial_stream_frames(next_state, ref, subscription_id)
{:push, frames, finalized_state}
else else
{:error, :auth_required} -> {:error, :auth_required} ->
restricted_close(state, subscription_id, EventPolicy.error_message(:auth_required)) restricted_close(state, subscription_id, EventPolicy.error_message(:auth_required))
@@ -1049,15 +1095,13 @@ defmodule Parrhesia.Web.Connection do
end end
end end
defp upsert_subscription(%__MODULE__{} = state, subscription_id, filters) do defp ensure_subscription_capacity(%__MODULE__{} = state, subscription_id) do
subscription = %{filters: filters, eose_sent?: true}
cond do cond do
Map.has_key?(state.subscriptions, subscription_id) -> Map.has_key?(state.subscriptions, subscription_id) ->
{:ok, put_subscription(state, subscription_id, subscription)} :ok
map_size(state.subscriptions) < state.max_subscriptions_per_connection -> map_size(state.subscriptions) < state.max_subscriptions_per_connection ->
{:ok, put_subscription(state, subscription_id, subscription)} :ok
true -> true ->
{:error, :subscription_limit_reached} {:error, :subscription_limit_reached}
@@ -1101,26 +1145,6 @@ defmodule Parrhesia.Web.Connection do
next_state next_state
end end
defp maybe_upsert_index_subscription(
%__MODULE__{subscription_index: nil},
_subscription_id,
_filters
),
do: :ok
defp maybe_upsert_index_subscription(
%__MODULE__{subscription_index: subscription_index},
subscription_id,
filters
) do
case Index.upsert(subscription_index, self(), subscription_id, filters) do
:ok -> :ok
{:error, _reason} -> :ok
end
catch
:exit, _reason -> :ok
end
defp maybe_remove_index_subscription( defp maybe_remove_index_subscription(
%__MODULE__{subscription_index: nil}, %__MODULE__{subscription_index: nil},
_subscription_id _subscription_id
@@ -1146,6 +1170,81 @@ defmodule Parrhesia.Web.Connection do
:exit, _reason -> :ok :exit, _reason -> :ok
end end
defp maybe_unsubscribe_stream_subscription(%__MODULE__{} = state, subscription_id) do
case Map.get(state.subscriptions, subscription_id) do
%{ref: ref} when is_reference(ref) -> Stream.unsubscribe(ref)
_other -> :ok
end
state
end
defp maybe_unsubscribe_all_stream_subscriptions(%__MODULE__{} = state) do
Enum.each(state.subscriptions, fn {_subscription_id, subscription} ->
case Map.get(subscription, :ref) do
ref when is_reference(ref) -> Stream.unsubscribe(ref)
_other -> :ok
end
end)
:ok
end
defp current_subscription_ref?(%__MODULE__{} = state, subscription_id, ref) do
case Map.get(state.subscriptions, subscription_id) do
%{ref: ^ref} -> true
_other -> false
end
end
defp subscription_eose_sent?(%__MODULE__{} = state, subscription_id) do
case Map.get(state.subscriptions, subscription_id) do
%{eose_sent?: true} -> true
_other -> false
end
end
defp mark_subscription_eose_sent(%__MODULE__{} = state, subscription_id) do
case Map.get(state.subscriptions, subscription_id) do
nil ->
state
subscription ->
put_subscription(state, subscription_id, Map.put(subscription, :eose_sent?, true))
end
end
defp drain_initial_stream_frames(%__MODULE__{} = state, ref, subscription_id) do
do_drain_initial_stream_frames(state, ref, subscription_id, [])
end
defp do_drain_initial_stream_frames(state, ref, subscription_id, acc) do
receive do
{:parrhesia, :event, ^ref, ^subscription_id, event} ->
frame = {:text, Protocol.encode_relay({:event, subscription_id, event})}
do_drain_initial_stream_frames(state, ref, subscription_id, [frame | acc])
{:parrhesia, :eose, ^ref, ^subscription_id} ->
next_state = mark_subscription_eose_sent(state, subscription_id)
frame = {:text, Protocol.encode_relay({:eose, subscription_id})}
{Enum.reverse([frame | acc]), next_state}
{:parrhesia, :closed, ^ref, ^subscription_id, reason} ->
next_state = drop_subscription(state, subscription_id)
frame =
{:text, Protocol.encode_relay({:closed, subscription_id, stream_closed_reason(reason)})}
{Enum.reverse([frame | acc]), next_state}
after
0 ->
{Enum.reverse(acc), state}
end
end
defp stream_closed_reason(reason) when is_binary(reason), do: reason
defp stream_closed_reason(reason), do: inspect(reason)
defp subscription_index(opts) when is_list(opts) do defp subscription_index(opts) when is_list(opts) do
opts opts
|> Keyword.get(:subscription_index, Index) |> Keyword.get(:subscription_index, Index)

View File

@@ -19,7 +19,7 @@ defmodule Parrhesia.Web.Management do
with {:ok, auth_context} <- with {:ok, auth_context} <-
maybe_validate_nip98(auth_required?, authorization, method, full_url), maybe_validate_nip98(auth_required?, authorization, method, full_url),
{:ok, payload} <- parse_payload(conn.body_params), {:ok, payload} <- parse_payload(conn.body_params),
{:ok, result} <- execute_method(payload), {:ok, result} <- execute_method(payload, opts),
:ok <- append_audit_log(auth_context, payload, result) do :ok <- append_audit_log(auth_context, payload, result) do
send_json(conn, 200, %{"ok" => true, "result" => result}) send_json(conn, 200, %{"ok" => true, "result" => result})
else else
@@ -69,8 +69,8 @@ defmodule Parrhesia.Web.Management do
defp parse_payload(_payload), do: {:error, :invalid_payload} defp parse_payload(_payload), do: {:error, :invalid_payload}
defp execute_method(payload) do defp execute_method(payload, opts) do
Admin.execute(payload.method, payload.params) Admin.execute(payload.method, payload.params, opts)
end end
defp append_audit_log(auth_context, payload, result) do defp append_audit_log(auth_context, payload, result) do

View File

@@ -120,9 +120,16 @@ defmodule Parrhesia.API.SyncTest do
assert sync_stats["sync"]["servers_total"] == 1 assert sync_stats["sync"]["servers_total"] == 1
assert sync_stats["sync"]["query_runs"] == 1 assert sync_stats["sync"]["query_runs"] == 1
assert {:ok, execute_stats} = Admin.execute("stats", %{}, manager: manager)
assert execute_stats["sync"]["servers_total"] == 1
assert {:ok, health} = Admin.health(manager: manager) assert {:ok, health} = Admin.health(manager: manager)
assert health["status"] == "ok" assert health["status"] == "ok"
assert health["sync"]["servers_total"] == 1 assert health["sync"]["servers_total"] == 1
assert {:ok, execute_health} = Admin.execute("health", %{}, manager: manager)
assert execute_health["status"] == "ok"
assert execute_health["sync"]["servers_total"] == 1
end end
defp start_sync_manager do defp start_sync_manager do

View File

@@ -3,6 +3,8 @@ defmodule Parrhesia.Web.ConnectionTest do
alias Ecto.Adapters.SQL.Sandbox alias Ecto.Adapters.SQL.Sandbox
alias Parrhesia.API.ACL alias Parrhesia.API.ACL
alias Parrhesia.API.Events
alias Parrhesia.API.RequestContext
alias Parrhesia.Negentropy.Engine alias Parrhesia.Negentropy.Engine
alias Parrhesia.Negentropy.Message alias Parrhesia.Negentropy.Message
alias Parrhesia.Protocol.EventValidator alias Parrhesia.Protocol.EventValidator
@@ -11,6 +13,7 @@ defmodule Parrhesia.Web.ConnectionTest do
setup do setup do
:ok = Sandbox.checkout(Repo) :ok = Sandbox.checkout(Repo)
ensure_stream_runtime_started()
:ok :ok
end end
@@ -738,16 +741,47 @@ defmodule Parrhesia.Web.ConnectionTest do
test "CLOSE removes subscription and replies with CLOSED" do test "CLOSE removes subscription and replies with CLOSED" do
state = subscribed_connection_state([]) state = subscribed_connection_state([])
subscription = state.subscriptions["sub-1"]
[{stream_pid, _value}] = Registry.lookup(Parrhesia.API.Stream.Registry, subscription.ref)
monitor_ref = Process.monitor(stream_pid)
close_payload = JSON.encode!(["CLOSE", "sub-1"]) close_payload = JSON.encode!(["CLOSE", "sub-1"])
assert {:push, {:text, response}, next_state} = assert {:push, {:text, response}, next_state} =
Connection.handle_in({close_payload, [opcode: :text]}, state) Connection.handle_in({close_payload, [opcode: :text]}, state)
assert_receive {:DOWN, ^monitor_ref, :process, ^stream_pid, :normal}
refute Map.has_key?(next_state.subscriptions, "sub-1") refute Map.has_key?(next_state.subscriptions, "sub-1")
assert JSON.decode!(response) == ["CLOSED", "sub-1", "error: subscription closed"] assert JSON.decode!(response) == ["CLOSED", "sub-1", "error: subscription closed"]
end end
test "REQ live delivery is bridged through API.Stream" do
state = subscribed_connection_state([])
subscription = state.subscriptions["sub-1"]
subscription_ref = subscription.ref
event = valid_event(%{"content" => "stream-live"}) |> recalculate_event_id()
assert {:ok, %{accepted: true}} = Events.publish(event, context: %RequestContext{})
assert_receive {:parrhesia, :event, ^subscription_ref, "sub-1", received_event}
assert received_event["id"] == event["id"]
assert {:ok, queued_state} =
Connection.handle_info(
{:parrhesia, :event, subscription_ref, "sub-1", received_event},
state
)
assert queued_state.outbound_queue_size == 1
assert_receive :drain_outbound_queue
assert {:push, [{:text, payload}], drained_state} =
Connection.handle_info(:drain_outbound_queue, queued_state)
assert drained_state.outbound_queue_size == 0
assert JSON.decode!(payload) == ["EVENT", "sub-1", received_event]
end
test "fanout_event enqueues and drains matching events" do test "fanout_event enqueues and drains matching events" do
state = subscribed_connection_state([]) state = subscribed_connection_state([])
event = live_event("event-1", 1) event = live_event("event-1", 1)
@@ -841,6 +875,26 @@ defmodule Parrhesia.Web.ConnectionTest do
state state
end end
defp ensure_stream_runtime_started do
if is_nil(Process.whereis(Parrhesia.Subscriptions.Supervisor)) do
start_supervised!({Parrhesia.Subscriptions.Supervisor, []})
end
if is_nil(Process.whereis(Parrhesia.Subscriptions.Index)) do
start_supervised!({Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index})
end
if is_nil(Process.whereis(Parrhesia.API.Stream.Registry)) do
start_supervised!({Registry, keys: :unique, name: Parrhesia.API.Stream.Registry})
end
if is_nil(Process.whereis(Parrhesia.API.Stream.Supervisor)) do
start_supervised!(
{DynamicSupervisor, strategy: :one_for_one, name: Parrhesia.API.Stream.Supervisor}
)
end
end
defp listener(overrides) do defp listener(overrides) do
base = %{ base = %{
id: :test, id: :test,

View File

@@ -5,6 +5,7 @@ defmodule Parrhesia.Web.RouterTest do
import Plug.Test import Plug.Test
alias Ecto.Adapters.SQL.Sandbox alias Ecto.Adapters.SQL.Sandbox
alias Parrhesia.API.Sync
alias Parrhesia.Protocol.EventValidator alias Parrhesia.Protocol.EventValidator
alias Parrhesia.Repo alias Parrhesia.Repo
alias Parrhesia.Web.Listener alias Parrhesia.Web.Listener
@@ -334,6 +335,84 @@ defmodule Parrhesia.Web.RouterTest do
assert byte_size(pubkey) == 64 assert byte_size(pubkey) == 64
end end
test "POST /management stats and health include sync summary" do
management_url = "http://www.example.com/management"
auth_event = nip98_event("POST", management_url)
authorization = "Nostr " <> Base.encode64(JSON.encode!(auth_event))
initial_total = Sync.sync_stats() |> elem(1) |> Map.fetch!("servers_total")
server_id = "router-sync-#{System.unique_integer([:positive, :monotonic])}"
assert {:ok, _server} =
Sync.put_server(%{
"id" => server_id,
"url" => "wss://relay-a.example/relay",
"enabled?" => false,
"auth_pubkey" => String.duplicate("a", 64),
"filters" => [%{"kinds" => [5000], "#r" => ["tribes.accounts.user"]}],
"tls" => %{
"pins" => [
%{
"type" => "spki_sha256",
"value" => "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
}
]
}
})
on_exit(fn ->
_ = Sync.remove_server(server_id)
end)
stats_conn =
conn(
:post,
"/management",
JSON.encode!(%{
"method" => "stats",
"params" => %{}
})
)
|> put_req_header("content-type", "application/json")
|> put_req_header("authorization", authorization)
|> Router.call([])
assert stats_conn.status == 200
assert %{
"ok" => true,
"result" => %{
"sync" => %{"servers_total" => servers_total}
}
} = JSON.decode!(stats_conn.resp_body)
assert servers_total == initial_total + 1
health_conn =
conn(
:post,
"/management",
JSON.encode!(%{
"method" => "health",
"params" => %{}
})
)
|> put_req_header("content-type", "application/json")
|> put_req_header("authorization", authorization)
|> Router.call([])
assert health_conn.status == 200
assert %{
"ok" => true,
"result" => %{
"status" => status,
"sync" => %{"servers_total" => ^servers_total}
}
} = JSON.decode!(health_conn.resp_body)
assert status in ["ok", "degraded"]
end
test "POST /management returns not found when admin feature is disabled on the listener" do test "POST /management returns not found when admin feature is disabled on the listener" do
conn = conn =
conn(:post, "/management", JSON.encode!(%{"method" => "ping", "params" => %{}})) conn(:post, "/management", JSON.encode!(%{"method" => "ping", "params" => %{}}))