Add sync control plane API

This commit is contained in:
2026-03-16 21:23:39 +01:00
parent 769177a63e
commit 9be3b6ca52
10 changed files with 1142 additions and 14 deletions

View File

@@ -9,6 +9,9 @@ config :parrhesia,
path: nil,
private_key: nil
],
sync: [
path: nil
],
limits: [
max_frame_bytes: 1_048_576,
max_event_bytes: 262_144,

View File

@@ -455,6 +455,9 @@ if config_env() == :prod do
path: string_env.("PARRHESIA_IDENTITY_PATH", nil),
private_key: string_env.("PARRHESIA_IDENTITY_PRIVATE_KEY", nil)
],
sync: [
path: string_env.("PARRHESIA_SYNC_PATH", nil)
],
moderation_cache_enabled:
bool_env.("PARRHESIA_MODERATION_CACHE_ENABLED", moderation_cache_enabled_default),
enable_expiration_worker:

View File

@@ -19,6 +19,9 @@ config :parrhesia,
path: Path.join(System.tmp_dir!(), "parrhesia_test_identity.json"),
private_key: nil
],
sync: [
path: Path.join(System.tmp_dir!(), "parrhesia_test_sync.json")
],
features: [verify_event_signatures: false]
pg_host = System.get_env("PGHOST")

View File

@@ -5,18 +5,31 @@ defmodule Parrhesia.API.Admin do
alias Parrhesia.API.ACL
alias Parrhesia.API.Identity
alias Parrhesia.API.Sync
alias Parrhesia.Storage
@supported_acl_methods ~w(acl_grant acl_revoke acl_list)
@supported_identity_methods ~w(identity_ensure identity_get identity_import identity_rotate)
@supported_sync_methods ~w(
sync_get_server
sync_health
sync_list_servers
sync_put_server
sync_remove_server
sync_server_stats
sync_start_server
sync_stats
sync_stop_server
sync_sync_now
)
@spec execute(String.t() | atom(), map(), keyword()) :: {:ok, map()} | {:error, term()}
def execute(method, params, opts \\ [])
def execute(method, params, _opts) when is_map(params) do
def execute(method, params, opts) when is_map(params) do
method_name = normalize_method_name(method)
case execute_builtin(method_name, params) do
case execute_builtin(method_name, params, opts) do
{:continue, other_method} -> Storage.admin().execute(%{}, other_method, params)
result -> result
end
@@ -26,10 +39,23 @@ defmodule Parrhesia.API.Admin do
do: {:error, {:unsupported_method, normalize_method_name(method)}}
@spec stats(keyword()) :: {:ok, map()} | {:error, term()}
def stats(_opts \\ []), do: Storage.admin().execute(%{}, :stats, %{})
def stats(opts \\ []) do
with {:ok, relay_stats} <- relay_stats(),
{:ok, sync_stats} <- Sync.sync_stats(opts) do
{:ok, Map.put(relay_stats, "sync", sync_stats)}
end
end
@spec health(keyword()) :: {:ok, map()} | {:error, term()}
def health(_opts \\ []), do: {:ok, %{"status" => "ok"}}
def health(opts \\ []) do
with {:ok, sync_health} <- Sync.sync_health(opts) do
{:ok,
%{
"status" => overall_health_status(sync_health),
"sync" => sync_health
}}
end
end
@spec list_audit_logs(keyword()) :: {:ok, [map()]} | {:error, term()}
def list_audit_logs(opts \\ []) do
@@ -69,7 +95,8 @@ defmodule Parrhesia.API.Admin do
_other -> []
end
(storage_supported ++ @supported_acl_methods ++ @supported_identity_methods)
(storage_supported ++
@supported_acl_methods ++ @supported_identity_methods ++ @supported_sync_methods)
|> Enum.uniq()
|> Enum.sort()
end
@@ -84,22 +111,105 @@ defmodule Parrhesia.API.Admin do
Identity.import(params)
end
defp execute_builtin("acl_grant", params), do: acl_grant(params)
defp execute_builtin("acl_revoke", params), do: acl_revoke(params)
defp execute_builtin("acl_list", params), do: acl_list(params)
defp execute_builtin("identity_get", params), do: identity_get(params)
defp execute_builtin("identity_ensure", params), do: identity_ensure(params)
defp execute_builtin("identity_import", params), do: identity_import(params)
defp execute_builtin("identity_rotate", params), do: identity_rotate(params)
defp sync_put_server(params, opts), do: Sync.put_server(params, opts)
defp execute_builtin("supportedmethods", _params),
defp sync_remove_server(params, opts) do
with {:ok, server_id} <- fetch_required_string(params, :id),
:ok <- Sync.remove_server(server_id, opts) do
{:ok, %{"ok" => true}}
end
end
defp sync_get_server(params, opts) do
with {:ok, server_id} <- fetch_required_string(params, :id),
{:ok, server} <- Sync.get_server(server_id, opts) do
{:ok, server}
else
:error -> {:error, :not_found}
other -> other
end
end
defp sync_list_servers(_params, opts), do: Sync.list_servers(opts)
defp sync_start_server(params, opts) do
with {:ok, server_id} <- fetch_required_string(params, :id),
:ok <- Sync.start_server(server_id, opts) do
{:ok, %{"ok" => true}}
end
end
defp sync_stop_server(params, opts) do
with {:ok, server_id} <- fetch_required_string(params, :id),
:ok <- Sync.stop_server(server_id, opts) do
{:ok, %{"ok" => true}}
end
end
defp sync_sync_now(params, opts) do
with {:ok, server_id} <- fetch_required_string(params, :id),
:ok <- Sync.sync_now(server_id, opts) do
{:ok, %{"ok" => true}}
end
end
defp sync_server_stats(params, opts) do
with {:ok, server_id} <- fetch_required_string(params, :id),
{:ok, stats} <- Sync.server_stats(server_id, opts) do
{:ok, stats}
else
:error -> {:error, :not_found}
other -> other
end
end
defp sync_stats(_params, opts), do: Sync.sync_stats(opts)
defp sync_health(_params, opts), do: Sync.sync_health(opts)
defp execute_builtin("acl_grant", params, _opts), do: acl_grant(params)
defp execute_builtin("acl_revoke", params, _opts), do: acl_revoke(params)
defp execute_builtin("acl_list", params, _opts), do: acl_list(params)
defp execute_builtin("identity_get", params, _opts), do: identity_get(params)
defp execute_builtin("identity_ensure", params, _opts), do: identity_ensure(params)
defp execute_builtin("identity_import", params, _opts), do: identity_import(params)
defp execute_builtin("identity_rotate", params, _opts), do: identity_rotate(params)
defp execute_builtin("sync_put_server", params, opts), do: sync_put_server(params, opts)
defp execute_builtin("sync_remove_server", params, opts), do: sync_remove_server(params, opts)
defp execute_builtin("sync_get_server", params, opts), do: sync_get_server(params, opts)
defp execute_builtin("sync_list_servers", params, opts), do: sync_list_servers(params, opts)
defp execute_builtin("sync_start_server", params, opts), do: sync_start_server(params, opts)
defp execute_builtin("sync_stop_server", params, opts), do: sync_stop_server(params, opts)
defp execute_builtin("sync_sync_now", params, opts), do: sync_sync_now(params, opts)
defp execute_builtin("sync_server_stats", params, opts), do: sync_server_stats(params, opts)
defp execute_builtin("sync_stats", params, opts), do: sync_stats(params, opts)
defp execute_builtin("sync_health", params, opts), do: sync_health(params, opts)
defp execute_builtin("supportedmethods", _params, _opts),
do: {:ok, %{"methods" => supported_methods()}}
defp execute_builtin(other_method, _params), do: {:continue, other_method}
defp execute_builtin(other_method, _params, _opts), do: {:continue, other_method}
defp relay_stats do
case Storage.admin().execute(%{}, :stats, %{}) do
{:ok, stats} when is_map(stats) -> {:ok, stats}
{:error, {:unsupported_method, _method}} -> {:ok, %{}}
other -> other
end
end
defp overall_health_status(%{"status" => "degraded"}), do: "degraded"
defp overall_health_status(_sync_health), do: "ok"
defp maybe_put_opt(opts, _key, nil), do: opts
defp maybe_put_opt(opts, key, value), do: Keyword.put(opts, key, value)
defp fetch_required_string(map, key) do
case fetch_value(map, key) do
value when is_binary(value) and value != "" -> {:ok, value}
_other -> {:error, {:missing_param, Atom.to_string(key)}}
end
end
defp fetch_value(map, key), do: Map.get(map, key) || Map.get(map, Atom.to_string(key))
defp normalize_method_name(method) when is_atom(method), do: Atom.to_string(method)

103
lib/parrhesia/api/sync.ex Normal file
View File

@@ -0,0 +1,103 @@
defmodule Parrhesia.API.Sync do
@moduledoc """
Sync server control-plane API.
"""
alias Parrhesia.API.Sync.Manager
@type server :: map()
@spec put_server(map(), keyword()) :: {:ok, server()} | {:error, term()}
def put_server(server, opts \\ [])
def put_server(server, opts) when is_map(server) and is_list(opts) do
Manager.put_server(manager_name(opts), server)
end
def put_server(_server, _opts), do: {:error, :invalid_server}
@spec remove_server(String.t(), keyword()) :: :ok | {:error, term()}
def remove_server(server_id, opts \\ [])
def remove_server(server_id, opts) when is_binary(server_id) and is_list(opts) do
Manager.remove_server(manager_name(opts), server_id)
end
def remove_server(_server_id, _opts), do: {:error, :invalid_server_id}
@spec get_server(String.t(), keyword()) :: {:ok, server()} | :error | {:error, term()}
def get_server(server_id, opts \\ [])
def get_server(server_id, opts) when is_binary(server_id) and is_list(opts) do
Manager.get_server(manager_name(opts), server_id)
end
def get_server(_server_id, _opts), do: {:error, :invalid_server_id}
@spec list_servers(keyword()) :: {:ok, [server()]} | {:error, term()}
def list_servers(opts \\ []) when is_list(opts) do
Manager.list_servers(manager_name(opts))
end
@spec start_server(String.t(), keyword()) :: :ok | {:error, term()}
def start_server(server_id, opts \\ [])
def start_server(server_id, opts) when is_binary(server_id) and is_list(opts) do
Manager.start_server(manager_name(opts), server_id)
end
def start_server(_server_id, _opts), do: {:error, :invalid_server_id}
@spec stop_server(String.t(), keyword()) :: :ok | {:error, term()}
def stop_server(server_id, opts \\ [])
def stop_server(server_id, opts) when is_binary(server_id) and is_list(opts) do
Manager.stop_server(manager_name(opts), server_id)
end
def stop_server(_server_id, _opts), do: {:error, :invalid_server_id}
@spec sync_now(String.t(), keyword()) :: :ok | {:error, term()}
def sync_now(server_id, opts \\ [])
def sync_now(server_id, opts) when is_binary(server_id) and is_list(opts) do
Manager.sync_now(manager_name(opts), server_id)
end
def sync_now(_server_id, _opts), do: {:error, :invalid_server_id}
@spec server_stats(String.t(), keyword()) :: {:ok, map()} | :error | {:error, term()}
def server_stats(server_id, opts \\ [])
def server_stats(server_id, opts) when is_binary(server_id) and is_list(opts) do
Manager.server_stats(manager_name(opts), server_id)
end
def server_stats(_server_id, _opts), do: {:error, :invalid_server_id}
@spec sync_stats(keyword()) :: {:ok, map()} | {:error, term()}
def sync_stats(opts \\ []) when is_list(opts) do
Manager.sync_stats(manager_name(opts))
end
@spec sync_health(keyword()) :: {:ok, map()} | {:error, term()}
def sync_health(opts \\ []) when is_list(opts) do
Manager.sync_health(manager_name(opts))
end
def default_path do
Path.join([default_data_dir(), "sync_servers.json"])
end
defp manager_name(opts) do
opts[:manager] || opts[:name] || Manager
end
defp default_data_dir do
base_dir =
System.get_env("XDG_DATA_HOME") ||
Path.join(System.user_home!(), ".local/share")
Path.join(base_dir, "parrhesia")
end
end

View File

@@ -0,0 +1,671 @@
defmodule Parrhesia.API.Sync.Manager do
@moduledoc false
use GenServer
alias Parrhesia.API.Sync
alias Parrhesia.Protocol.Filter
require Logger
@default_overlap_window_seconds 300
@default_mode :req_stream
@default_auth_type :nip42
@default_tls_mode :required
@hex64 ~r/\A[0-9a-f]{64}\z/
def start_link(opts \\ []) do
name = Keyword.get(opts, :name, __MODULE__)
GenServer.start_link(__MODULE__, opts, name: name)
end
def put_server(name, server) do
GenServer.call(name, {:put_server, server})
end
def remove_server(name, server_id) do
GenServer.call(name, {:remove_server, server_id})
end
def get_server(name, server_id) do
GenServer.call(name, {:get_server, server_id})
end
def list_servers(name) do
GenServer.call(name, :list_servers)
end
def start_server(name, server_id) do
GenServer.call(name, {:start_server, server_id})
end
def stop_server(name, server_id) do
GenServer.call(name, {:stop_server, server_id})
end
def sync_now(name, server_id) do
GenServer.call(name, {:sync_now, server_id})
end
def server_stats(name, server_id) do
GenServer.call(name, {:server_stats, server_id})
end
def sync_stats(name) do
GenServer.call(name, :sync_stats)
end
def sync_health(name) do
GenServer.call(name, :sync_health)
end
@impl true
def init(opts) do
path = Keyword.get(opts, :path, config_path() || Sync.default_path())
{:ok, load_state(path)}
end
@impl true
def handle_call({:put_server, server}, _from, state) do
case normalize_server(server) do
{:ok, normalized_server} ->
updated_state = put_server_state(state, normalized_server)
with :ok <- persist_state(updated_state) do
{:reply, {:ok, merged_server(updated_state, normalized_server.id)}, updated_state}
end
{:error, reason} ->
{:reply, {:error, reason}, state}
end
end
def handle_call({:remove_server, server_id}, _from, state) do
if Map.has_key?(state.servers, server_id) do
updated_state = %{
state
| servers: Map.delete(state.servers, server_id),
runtime: Map.delete(state.runtime, server_id)
}
with :ok <- persist_state(updated_state) do
{:reply, :ok, updated_state}
end
else
{:reply, {:error, :not_found}, state}
end
end
def handle_call({:get_server, server_id}, _from, state) do
case Map.fetch(state.servers, server_id) do
{:ok, _server} -> {:reply, {:ok, merged_server(state, server_id)}, state}
:error -> {:reply, :error, state}
end
end
def handle_call(:list_servers, _from, state) do
servers =
state.servers
|> Map.keys()
|> Enum.sort()
|> Enum.map(&merged_server(state, &1))
{:reply, {:ok, servers}, state}
end
def handle_call({:start_server, server_id}, _from, state) do
with {:ok, updated_state} <- update_runtime(state, server_id, &mark_running/1),
:ok <- persist_state(updated_state) do
{:reply, :ok, updated_state}
else
:error -> {:reply, {:error, :not_found}, state}
{:error, reason} -> {:reply, {:error, reason}, state}
end
end
def handle_call({:stop_server, server_id}, _from, state) do
with {:ok, updated_state} <- update_runtime(state, server_id, &mark_stopped/1),
:ok <- persist_state(updated_state) do
{:reply, :ok, updated_state}
else
:error -> {:reply, {:error, :not_found}, state}
{:error, reason} -> {:reply, {:error, reason}, state}
end
end
def handle_call({:sync_now, server_id}, _from, state) do
with {:ok, updated_state} <- update_runtime(state, server_id, &record_sync_now/1),
:ok <- persist_state(updated_state) do
{:reply, :ok, updated_state}
else
:error -> {:reply, {:error, :not_found}, state}
{:error, reason} -> {:reply, {:error, reason}, state}
end
end
def handle_call({:server_stats, server_id}, _from, state) do
case Map.fetch(state.runtime, server_id) do
{:ok, runtime} -> {:reply, {:ok, runtime_stats(runtime)}, state}
:error -> {:reply, :error, state}
end
end
def handle_call(:sync_stats, _from, state) do
{:reply, {:ok, aggregate_stats(state)}, state}
end
def handle_call(:sync_health, _from, state) do
{:reply, {:ok, health_summary(state)}, state}
end
defp put_server_state(state, server) do
runtime =
case Map.get(state.runtime, server.id) do
nil -> default_runtime(server)
existing_runtime -> maybe_align_runtime_with_server(existing_runtime, server)
end
%{
state
| servers: Map.put(state.servers, server.id, server),
runtime: Map.put(state.runtime, server.id, runtime)
}
end
defp maybe_align_runtime_with_server(runtime, %{enabled?: false}) do
runtime
|> Map.put(:state, :stopped)
|> Map.put(:connected?, false)
|> Map.put(:last_disconnected_at, now())
end
defp maybe_align_runtime_with_server(runtime, _server), do: runtime
defp default_runtime(server) do
%{
server_id: server.id,
state: if(server.enabled?, do: :running, else: :stopped),
connected?: false,
last_connected_at: nil,
last_disconnected_at: nil,
last_sync_started_at: nil,
last_sync_completed_at: nil,
last_event_received_at: nil,
last_eose_at: nil,
reconnect_attempts: 0,
last_error: nil,
events_received: 0,
events_accepted: 0,
events_duplicate: 0,
events_rejected: 0,
query_runs: 0,
subscription_restarts: 0,
reconnects: 0,
last_remote_eose_at: nil
}
end
defp mark_running(runtime) do
runtime
|> Map.put(:state, :running)
|> Map.put(:last_error, nil)
end
defp mark_stopped(runtime) do
runtime
|> Map.put(:state, :stopped)
|> Map.put(:connected?, false)
|> Map.put(:last_disconnected_at, now())
end
defp record_sync_now(runtime) do
sync_timestamp = now()
runtime
|> Map.update!(:query_runs, &(&1 + 1))
|> Map.put(:last_sync_started_at, sync_timestamp)
|> Map.put(:last_sync_completed_at, sync_timestamp)
end
defp update_runtime(state, server_id, fun) do
case Map.fetch(state.runtime, server_id) do
{:ok, runtime} ->
updated_runtime = fun.(runtime)
{:ok, %{state | runtime: Map.put(state.runtime, server_id, updated_runtime)}}
:error ->
:error
end
end
defp merged_server(state, server_id) do
state.servers
|> Map.fetch!(server_id)
|> Map.put(:runtime, Map.fetch!(state.runtime, server_id))
end
defp runtime_stats(runtime) do
%{
"server_id" => runtime.server_id,
"state" => Atom.to_string(runtime.state),
"connected" => runtime.connected?,
"events_received" => runtime.events_received,
"events_accepted" => runtime.events_accepted,
"events_duplicate" => runtime.events_duplicate,
"events_rejected" => runtime.events_rejected,
"query_runs" => runtime.query_runs,
"subscription_restarts" => runtime.subscription_restarts,
"reconnects" => runtime.reconnects,
"last_sync_started_at" => runtime.last_sync_started_at,
"last_sync_completed_at" => runtime.last_sync_completed_at,
"last_remote_eose_at" => runtime.last_remote_eose_at,
"last_error" => runtime.last_error
}
end
defp aggregate_stats(state) do
runtimes = Map.values(state.runtime)
%{
"servers_total" => map_size(state.servers),
"servers_enabled" => Enum.count(state.servers, fn {_id, server} -> server.enabled? end),
"servers_running" => Enum.count(runtimes, &(&1.state == :running)),
"servers_connected" => Enum.count(runtimes, & &1.connected?),
"events_received" => Enum.reduce(runtimes, 0, &(&1.events_received + &2)),
"events_accepted" => Enum.reduce(runtimes, 0, &(&1.events_accepted + &2)),
"events_duplicate" => Enum.reduce(runtimes, 0, &(&1.events_duplicate + &2)),
"events_rejected" => Enum.reduce(runtimes, 0, &(&1.events_rejected + &2)),
"query_runs" => Enum.reduce(runtimes, 0, &(&1.query_runs + &2)),
"subscription_restarts" => Enum.reduce(runtimes, 0, &(&1.subscription_restarts + &2)),
"reconnects" => Enum.reduce(runtimes, 0, &(&1.reconnects + &2))
}
end
defp health_summary(state) do
failing_servers =
state.runtime
|> Enum.flat_map(fn {server_id, runtime} ->
if is_binary(runtime.last_error) and runtime.last_error != "" do
[%{"id" => server_id, "reason" => runtime.last_error}]
else
[]
end
end)
%{
"status" => if(failing_servers == [], do: "ok", else: "degraded"),
"servers_total" => map_size(state.servers),
"servers_connected" =>
Enum.count(state.runtime, fn {_id, runtime} -> runtime.connected? end),
"servers_failing" => failing_servers
}
end
defp load_state(path) do
case File.read(path) do
{:ok, payload} ->
case decode_persisted_state(payload, path) do
{:ok, state} ->
state
{:error, reason} ->
Logger.warning("failed to load sync state from #{path}: #{inspect(reason)}")
empty_state(path)
end
{:error, :enoent} ->
empty_state(path)
{:error, reason} ->
Logger.warning("failed to read sync state from #{path}: #{inspect(reason)}")
empty_state(path)
end
end
defp decode_persisted_state(payload, path) do
with {:ok, decoded} <- JSON.decode(payload),
{:ok, servers} <- decode_servers(Map.get(decoded, "servers", %{})),
{:ok, runtime} <- decode_runtime(Map.get(decoded, "runtime", %{}), servers) do
{:ok, %{path: path, servers: servers, runtime: runtime}}
end
end
defp decode_servers(servers) when is_map(servers) do
Enum.reduce_while(servers, {:ok, %{}}, fn {_id, server_payload}, {:ok, acc} ->
case normalize_server(server_payload) do
{:ok, server} -> {:cont, {:ok, Map.put(acc, server.id, server)}}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end
defp decode_servers(_servers), do: {:error, :invalid_servers_state}
defp decode_runtime(runtime_payload, servers)
when is_map(runtime_payload) and is_map(servers) do
runtime =
Enum.reduce(servers, %{}, fn {server_id, server}, acc ->
decoded_runtime =
runtime_payload
|> Map.get(server_id)
|> normalize_runtime(server)
Map.put(acc, server_id, decoded_runtime)
end)
{:ok, runtime}
end
defp decode_runtime(_runtime_payload, _servers), do: {:error, :invalid_runtime_state}
defp normalize_runtime(nil, server), do: default_runtime(server)
defp normalize_runtime(runtime, server) when is_map(runtime) do
state =
runtime
|> Map.put(:server_id, server.id)
|> Map.put(:state, normalize_runtime_state(fetch_value(runtime, :state)))
|> Map.put(:connected?, fetch_boolean(runtime, :connected?) || false)
|> Map.put(:last_connected_at, fetch_string_or_nil(runtime, :last_connected_at))
|> Map.put(:last_disconnected_at, fetch_string_or_nil(runtime, :last_disconnected_at))
|> Map.put(:last_sync_started_at, fetch_string_or_nil(runtime, :last_sync_started_at))
|> Map.put(:last_sync_completed_at, fetch_string_or_nil(runtime, :last_sync_completed_at))
|> Map.put(:last_event_received_at, fetch_string_or_nil(runtime, :last_event_received_at))
|> Map.put(:last_eose_at, fetch_string_or_nil(runtime, :last_eose_at))
|> Map.put(:last_remote_eose_at, fetch_string_or_nil(runtime, :last_remote_eose_at))
|> Map.put(:last_error, fetch_string_or_nil(runtime, :last_error))
|> Map.put(:reconnect_attempts, fetch_non_neg_integer(runtime, :reconnect_attempts))
|> Map.put(:events_received, fetch_non_neg_integer(runtime, :events_received))
|> Map.put(:events_accepted, fetch_non_neg_integer(runtime, :events_accepted))
|> Map.put(:events_duplicate, fetch_non_neg_integer(runtime, :events_duplicate))
|> Map.put(:events_rejected, fetch_non_neg_integer(runtime, :events_rejected))
|> Map.put(:query_runs, fetch_non_neg_integer(runtime, :query_runs))
|> Map.put(:subscription_restarts, fetch_non_neg_integer(runtime, :subscription_restarts))
|> Map.put(:reconnects, fetch_non_neg_integer(runtime, :reconnects))
maybe_align_runtime_with_server(state, server)
end
defp normalize_runtime(_runtime, server), do: default_runtime(server)
defp persist_state(%{path: path} = state) do
temp_path = path <> ".tmp"
with :ok <- File.mkdir_p(Path.dirname(path)),
:ok <- File.write(temp_path, JSON.encode!(encode_state(state))),
:ok <- File.rename(temp_path, path) do
:ok
else
{:error, reason} ->
_ = File.rm(temp_path)
{:error, reason}
end
end
defp encode_state(state) do
%{
"version" => 1,
"servers" =>
Map.new(state.servers, fn {server_id, server} -> {server_id, encode_server(server)} end),
"runtime" =>
Map.new(state.runtime, fn {server_id, runtime} -> {server_id, encode_runtime(runtime)} end)
}
end
defp encode_server(server) do
%{
"id" => server.id,
"url" => server.url,
"enabled?" => server.enabled?,
"auth_pubkey" => server.auth_pubkey,
"filters" => server.filters,
"mode" => Atom.to_string(server.mode),
"overlap_window_seconds" => server.overlap_window_seconds,
"auth" => %{"type" => Atom.to_string(server.auth.type)},
"tls" => %{
"mode" => Atom.to_string(server.tls.mode),
"hostname" => server.tls.hostname,
"pins" =>
Enum.map(server.tls.pins, fn pin ->
%{
"type" => Atom.to_string(pin.type),
"value" => pin.value
}
end)
},
"metadata" => server.metadata
}
end
defp encode_runtime(runtime) do
%{
"server_id" => runtime.server_id,
"state" => Atom.to_string(runtime.state),
"connected?" => runtime.connected?,
"last_connected_at" => runtime.last_connected_at,
"last_disconnected_at" => runtime.last_disconnected_at,
"last_sync_started_at" => runtime.last_sync_started_at,
"last_sync_completed_at" => runtime.last_sync_completed_at,
"last_event_received_at" => runtime.last_event_received_at,
"last_eose_at" => runtime.last_eose_at,
"reconnect_attempts" => runtime.reconnect_attempts,
"last_error" => runtime.last_error,
"events_received" => runtime.events_received,
"events_accepted" => runtime.events_accepted,
"events_duplicate" => runtime.events_duplicate,
"events_rejected" => runtime.events_rejected,
"query_runs" => runtime.query_runs,
"subscription_restarts" => runtime.subscription_restarts,
"reconnects" => runtime.reconnects,
"last_remote_eose_at" => runtime.last_remote_eose_at
}
end
defp empty_state(path) do
%{path: path, servers: %{}, runtime: %{}}
end
defp normalize_server(server) when is_map(server) do
with {:ok, id} <- normalize_non_empty_string(fetch_value(server, :id), :invalid_server_id),
{:ok, {url, host}} <- normalize_url(fetch_value(server, :url)),
{:ok, enabled?} <- normalize_boolean(fetch_value(server, :enabled?), true),
{:ok, auth_pubkey} <- normalize_pubkey(fetch_value(server, :auth_pubkey)),
{:ok, filters} <- normalize_filters(fetch_value(server, :filters)),
{:ok, mode} <- normalize_mode(fetch_value(server, :mode)),
{:ok, overlap_window_seconds} <-
normalize_overlap_window(fetch_value(server, :overlap_window_seconds)),
{:ok, auth} <- normalize_auth(fetch_value(server, :auth)),
{:ok, tls} <- normalize_tls(fetch_value(server, :tls), host),
{:ok, metadata} <- normalize_metadata(fetch_value(server, :metadata)) do
{:ok,
%{
id: id,
url: url,
enabled?: enabled?,
auth_pubkey: auth_pubkey,
filters: filters,
mode: mode,
overlap_window_seconds: overlap_window_seconds,
auth: auth,
tls: tls,
metadata: metadata
}}
end
end
defp normalize_server(_server), do: {:error, :invalid_server}
defp normalize_url(url) when is_binary(url) and url != "" do
uri = URI.parse(url)
if uri.scheme in ["ws", "wss"] and is_binary(uri.host) and uri.host != "" do
{:ok, {URI.to_string(uri), uri.host}}
else
{:error, :invalid_url}
end
end
defp normalize_url(_url), do: {:error, :invalid_url}
defp normalize_pubkey(pubkey) when is_binary(pubkey) do
normalized = String.downcase(pubkey)
if String.match?(normalized, @hex64) do
{:ok, normalized}
else
{:error, :invalid_auth_pubkey}
end
end
defp normalize_pubkey(_pubkey), do: {:error, :invalid_auth_pubkey}
defp normalize_filters(filters) when is_list(filters) do
normalized_filters = Enum.map(filters, &normalize_filter_map/1)
with :ok <- Filter.validate_filters(normalized_filters) do
{:ok, normalized_filters}
end
end
defp normalize_filters(_filters), do: {:error, :invalid_filters}
defp normalize_mode(nil), do: {:ok, @default_mode}
defp normalize_mode(:req_stream), do: {:ok, :req_stream}
defp normalize_mode("req_stream"), do: {:ok, :req_stream}
defp normalize_mode(_mode), do: {:error, :invalid_mode}
defp normalize_overlap_window(nil), do: {:ok, @default_overlap_window_seconds}
defp normalize_overlap_window(seconds) when is_integer(seconds) and seconds >= 0,
do: {:ok, seconds}
defp normalize_overlap_window(_seconds), do: {:error, :invalid_overlap_window_seconds}
defp normalize_auth(nil), do: {:ok, %{type: @default_auth_type}}
defp normalize_auth(auth) when is_map(auth) do
with {:ok, type} <- normalize_auth_type(fetch_value(auth, :type)) do
{:ok, %{type: type}}
end
end
defp normalize_auth(_auth), do: {:error, :invalid_auth}
defp normalize_auth_type(nil), do: {:ok, @default_auth_type}
defp normalize_auth_type(:nip42), do: {:ok, :nip42}
defp normalize_auth_type("nip42"), do: {:ok, :nip42}
defp normalize_auth_type(_type), do: {:error, :invalid_auth_type}
defp normalize_tls(tls, host) when is_map(tls) do
with {:ok, mode} <- normalize_tls_mode(fetch_value(tls, :mode)),
{:ok, hostname} <- normalize_hostname(fetch_value(tls, :hostname) || host),
{:ok, pins} <- normalize_tls_pins(fetch_value(tls, :pins)) do
{:ok, %{mode: mode, hostname: hostname, pins: pins}}
end
end
defp normalize_tls(_tls, _host), do: {:error, :invalid_tls}
defp normalize_tls_mode(nil), do: {:ok, @default_tls_mode}
defp normalize_tls_mode(:required), do: {:ok, :required}
defp normalize_tls_mode("required"), do: {:ok, :required}
defp normalize_tls_mode(_mode), do: {:error, :invalid_tls_mode}
defp normalize_hostname(hostname) when is_binary(hostname) and hostname != "",
do: {:ok, hostname}
defp normalize_hostname(_hostname), do: {:error, :invalid_tls_hostname}
defp normalize_tls_pins(pins) when is_list(pins) and pins != [] do
Enum.reduce_while(pins, {:ok, []}, fn pin, {:ok, acc} ->
case normalize_tls_pin(pin) do
{:ok, normalized_pin} -> {:cont, {:ok, [normalized_pin | acc]}}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
|> case do
{:ok, normalized_pins} -> {:ok, Enum.reverse(normalized_pins)}
error -> error
end
end
defp normalize_tls_pins(_pins), do: {:error, :invalid_tls_pins}
defp normalize_tls_pin(pin) when is_map(pin) do
with {:ok, type} <- normalize_tls_pin_type(fetch_value(pin, :type)),
{:ok, value} <- normalize_non_empty_string(fetch_value(pin, :value), :invalid_tls_pin) do
{:ok, %{type: type, value: value}}
end
end
defp normalize_tls_pin(_pin), do: {:error, :invalid_tls_pin}
defp normalize_tls_pin_type(:spki_sha256), do: {:ok, :spki_sha256}
defp normalize_tls_pin_type("spki_sha256"), do: {:ok, :spki_sha256}
defp normalize_tls_pin_type(_type), do: {:error, :invalid_tls_pin}
defp normalize_metadata(nil), do: {:ok, %{}}
defp normalize_metadata(metadata) when is_map(metadata), do: {:ok, metadata}
defp normalize_metadata(_metadata), do: {:error, :invalid_metadata}
defp normalize_boolean(nil, default), do: {:ok, default}
defp normalize_boolean(value, _default) when is_boolean(value), do: {:ok, value}
defp normalize_boolean(_value, _default), do: {:error, :invalid_enabled_flag}
defp normalize_non_empty_string(value, _reason) when is_binary(value) and value != "",
do: {:ok, value}
defp normalize_non_empty_string(_value, reason), do: {:error, reason}
defp normalize_filter_map(filter) when is_map(filter) do
Map.new(filter, fn
{key, value} when is_atom(key) -> {Atom.to_string(key), value}
{key, value} -> {key, value}
end)
end
defp normalize_filter_map(filter), do: filter
defp normalize_runtime_state("running"), do: :running
defp normalize_runtime_state(:running), do: :running
defp normalize_runtime_state("stopped"), do: :stopped
defp normalize_runtime_state(:stopped), do: :stopped
defp normalize_runtime_state(_state), do: :stopped
defp fetch_non_neg_integer(map, key) do
case fetch_value(map, key) do
value when is_integer(value) and value >= 0 -> value
_other -> 0
end
end
defp fetch_boolean(map, key) do
case fetch_value(map, key) do
value when is_boolean(value) -> value
_other -> nil
end
end
defp fetch_string_or_nil(map, key) do
case fetch_value(map, key) do
value when is_binary(value) and value != "" -> value
_other -> nil
end
end
defp fetch_value(map, key) when is_map(map) do
Map.get(map, key) || Map.get(map, Atom.to_string(key))
end
defp config_path do
:parrhesia
|> Application.get_env(:sync, [])
|> Keyword.get(:path)
end
defp now do
DateTime.utc_now()
|> DateTime.truncate(:second)
|> DateTime.to_iso8601()
end
end

View File

@@ -11,6 +11,7 @@ defmodule Parrhesia.Application do
Parrhesia.Storage.Supervisor,
Parrhesia.Subscriptions.Supervisor,
Parrhesia.Auth.Supervisor,
Parrhesia.Sync.Supervisor,
Parrhesia.Policy.Supervisor,
Parrhesia.Web.Endpoint,
Parrhesia.Web.MetricsEndpoint,

View File

@@ -0,0 +1,20 @@
defmodule Parrhesia.Sync.Supervisor do
@moduledoc """
Supervision entrypoint for sync control-plane processes.
"""
use Supervisor
def start_link(init_arg \\ []) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_init_arg) do
children = [
{Parrhesia.API.Sync.Manager, []}
]
Supervisor.init(children, strategy: :one_for_one)
end
end

View File

@@ -0,0 +1,212 @@
defmodule Parrhesia.API.SyncTest do
use ExUnit.Case, async: false
alias Ecto.Adapters.SQL.Sandbox
alias Parrhesia.API.Admin
alias Parrhesia.API.Sync
alias Parrhesia.API.Sync.Manager
alias Parrhesia.Repo
setup do
:ok = Sandbox.checkout(Repo)
:ok
end
test "put_server stores normalized config and persists it across restart" do
{manager, path, pid} = start_sync_manager()
assert {:ok, stored_server} = Sync.put_server(valid_server(), manager: manager)
assert stored_server.id == "tribes-primary"
assert stored_server.mode == :req_stream
assert stored_server.auth.type == :nip42
assert stored_server.tls.mode == :required
assert stored_server.tls.hostname == "relay-a.example"
assert stored_server.runtime.state == :running
assert File.exists?(path)
assert {:ok, fetched_server} = Sync.get_server("tribes-primary", manager: manager)
assert fetched_server == stored_server
assert {:ok, [listed_server]} = Sync.list_servers(manager: manager)
assert listed_server.id == "tribes-primary"
monitor_ref = Process.monitor(pid)
assert :ok = GenServer.stop(pid)
assert_receive {:DOWN, ^monitor_ref, :process, ^pid, :normal}
assert {:ok, persisted_server} = wait_for_server(manager, "tribes-primary")
assert persisted_server.id == "tribes-primary"
assert persisted_server.tls.hostname == "relay-a.example"
assert persisted_server.runtime.state == :running
end
test "start_server stop_server and sync_now update runtime stats" do
{manager, _path, _pid} = start_sync_manager()
disabled_server = valid_server(%{"id" => "tribes-disabled", "enabled?" => false})
assert {:ok, stored_server} = Sync.put_server(disabled_server, manager: manager)
assert stored_server.runtime.state == :stopped
assert :ok = Sync.start_server("tribes-disabled", manager: manager)
assert {:ok, started_server} = Sync.get_server("tribes-disabled", manager: manager)
assert started_server.runtime.state == :running
assert :ok = Sync.sync_now("tribes-disabled", manager: manager)
assert {:ok, stats} = Sync.server_stats("tribes-disabled", manager: manager)
assert stats["server_id"] == "tribes-disabled"
assert stats["state"] == "running"
assert stats["query_runs"] == 1
assert is_binary(stats["last_sync_started_at"])
assert is_binary(stats["last_sync_completed_at"])
assert :ok = Sync.stop_server("tribes-disabled", manager: manager)
assert {:ok, stopped_server} = Sync.get_server("tribes-disabled", manager: manager)
assert stopped_server.runtime.state == :stopped
assert is_binary(stopped_server.runtime.last_disconnected_at)
assert {:ok, sync_stats} = Sync.sync_stats(manager: manager)
assert sync_stats["servers_total"] == 1
assert sync_stats["servers_enabled"] == 0
assert sync_stats["servers_running"] == 0
assert sync_stats["query_runs"] == 1
assert {:ok, sync_health} = Sync.sync_health(manager: manager)
assert sync_health == %{
"status" => "ok",
"servers_total" => 1,
"servers_connected" => 0,
"servers_failing" => []
}
end
test "put_server rejects invalid sync server shapes" do
{manager, _path, _pid} = start_sync_manager()
assert {:error, :invalid_url} =
Sync.put_server(Map.put(valid_server(), "url", "https://relay-a.example"),
manager: manager
)
assert {:error, :empty_filters} =
Sync.put_server(Map.put(valid_server(), "filters", []), manager: manager)
assert {:error, :invalid_tls_pins} =
Sync.put_server(
put_in(valid_server()["tls"]["pins"], []),
manager: manager
)
end
test "admin executes sync methods against an injected sync manager" do
{manager, _path, _pid} = start_sync_manager()
assert {:ok, created_server} =
Admin.execute("sync_put_server", valid_server(%{"id" => "tribes-admin"}),
manager: manager
)
assert created_server.id == "tribes-admin"
assert {:ok, listed_servers} = Admin.execute("sync_list_servers", %{}, manager: manager)
assert Enum.any?(listed_servers, &(&1.id == "tribes-admin"))
assert {:ok, %{"ok" => true}} =
Admin.execute("sync_sync_now", %{"id" => "tribes-admin"}, manager: manager)
assert {:ok, sync_stats} = Admin.stats(manager: manager)
assert sync_stats["sync"]["servers_total"] == 1
assert sync_stats["sync"]["query_runs"] == 1
assert {:ok, health} = Admin.health(manager: manager)
assert health["status"] == "ok"
assert health["sync"]["servers_total"] == 1
end
defp start_sync_manager do
path = unique_sync_path()
manager = {:global, {:sync_manager, System.unique_integer([:positive, :monotonic])}}
pid = start_supervised!({Manager, name: manager, path: path})
{manager, path, pid}
end
defp valid_server(overrides \\ %{}) do
Map.merge(
%{
"id" => "tribes-primary",
"url" => "wss://relay-a.example/relay",
"enabled?" => true,
"auth_pubkey" => String.duplicate("a", 64),
"filters" => [
%{
"kinds" => [5000],
"#r" => ["tribes.accounts.user"]
}
],
"tls" => %{
"pins" => [
%{
"type" => "spki_sha256",
"value" => "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
}
]
},
"metadata" => %{"cluster" => "primary"}
},
overrides
)
end
defp unique_sync_path do
path =
Path.join(
System.tmp_dir!(),
"parrhesia_sync_#{System.unique_integer([:positive, :monotonic])}.json"
)
on_exit(fn ->
_ = File.rm(path)
end)
path
end
defp wait_for_server(manager, server_id, attempts \\ 10)
defp wait_for_server(_manager, _server_id, 0), do: :error
defp wait_for_server(manager, server_id, attempts) do
result =
try do
Sync.get_server(server_id, manager: manager)
catch
:exit, _reason -> {:error, :noproc}
end
case result do
{:ok, server} ->
{:ok, server}
:error ->
receive do
after
10 -> wait_for_server(manager, server_id, attempts - 1)
end
{:error, :noproc} ->
receive do
after
10 -> wait_for_server(manager, server_id, attempts - 1)
end
{:error, {:noproc, _details}} ->
receive do
after
10 -> wait_for_server(manager, server_id, attempts - 1)
end
end
end
end

View File

@@ -8,6 +8,7 @@ defmodule Parrhesia.ApplicationTest do
assert is_pid(Process.whereis(Parrhesia.Storage.Supervisor))
assert is_pid(Process.whereis(Parrhesia.Subscriptions.Supervisor))
assert is_pid(Process.whereis(Parrhesia.Auth.Supervisor))
assert is_pid(Process.whereis(Parrhesia.Sync.Supervisor))
assert is_pid(Process.whereis(Parrhesia.Policy.Supervisor))
assert is_pid(Process.whereis(Parrhesia.Web.Endpoint))
assert is_pid(Process.whereis(Parrhesia.Web.MetricsEndpoint))
@@ -20,6 +21,7 @@ defmodule Parrhesia.ApplicationTest do
assert is_pid(Process.whereis(Parrhesia.Auth.Challenges))
assert is_pid(Process.whereis(Parrhesia.API.Identity.Manager))
assert is_pid(Process.whereis(Parrhesia.API.Sync.Manager))
if negentropy_enabled?() do
assert is_pid(Process.whereis(Parrhesia.Negentropy.Sessions))