diff --git a/config/config.exs b/config/config.exs index 4a4b546..7ea3353 100644 --- a/config/config.exs +++ b/config/config.exs @@ -9,6 +9,9 @@ config :parrhesia, path: nil, private_key: nil ], + sync: [ + path: nil + ], limits: [ max_frame_bytes: 1_048_576, max_event_bytes: 262_144, diff --git a/config/runtime.exs b/config/runtime.exs index e843311..b0aa688 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -455,6 +455,9 @@ if config_env() == :prod do path: string_env.("PARRHESIA_IDENTITY_PATH", nil), private_key: string_env.("PARRHESIA_IDENTITY_PRIVATE_KEY", nil) ], + sync: [ + path: string_env.("PARRHESIA_SYNC_PATH", nil) + ], moderation_cache_enabled: bool_env.("PARRHESIA_MODERATION_CACHE_ENABLED", moderation_cache_enabled_default), enable_expiration_worker: diff --git a/config/test.exs b/config/test.exs index 03f17d8..0c585cb 100644 --- a/config/test.exs +++ b/config/test.exs @@ -19,6 +19,9 @@ config :parrhesia, path: Path.join(System.tmp_dir!(), "parrhesia_test_identity.json"), private_key: nil ], + sync: [ + path: Path.join(System.tmp_dir!(), "parrhesia_test_sync.json") + ], features: [verify_event_signatures: false] pg_host = System.get_env("PGHOST") diff --git a/lib/parrhesia/api/admin.ex b/lib/parrhesia/api/admin.ex index 66ac9ad..cafcc2a 100644 --- a/lib/parrhesia/api/admin.ex +++ b/lib/parrhesia/api/admin.ex @@ -5,18 +5,31 @@ defmodule Parrhesia.API.Admin do alias Parrhesia.API.ACL alias Parrhesia.API.Identity + alias Parrhesia.API.Sync alias Parrhesia.Storage @supported_acl_methods ~w(acl_grant acl_revoke acl_list) @supported_identity_methods ~w(identity_ensure identity_get identity_import identity_rotate) + @supported_sync_methods ~w( + sync_get_server + sync_health + sync_list_servers + sync_put_server + sync_remove_server + sync_server_stats + sync_start_server + sync_stats + sync_stop_server + sync_sync_now + ) @spec execute(String.t() | atom(), map(), keyword()) :: {:ok, map()} | {:error, term()} def execute(method, params, opts \\ []) - 
def execute(method, params, _opts) when is_map(params) do + def execute(method, params, opts) when is_map(params) do method_name = normalize_method_name(method) - case execute_builtin(method_name, params) do + case execute_builtin(method_name, params, opts) do {:continue, other_method} -> Storage.admin().execute(%{}, other_method, params) result -> result end @@ -26,10 +39,23 @@ defmodule Parrhesia.API.Admin do do: {:error, {:unsupported_method, normalize_method_name(method)}} @spec stats(keyword()) :: {:ok, map()} | {:error, term()} - def stats(_opts \\ []), do: Storage.admin().execute(%{}, :stats, %{}) + def stats(opts \\ []) do + with {:ok, relay_stats} <- relay_stats(), + {:ok, sync_stats} <- Sync.sync_stats(opts) do + {:ok, Map.put(relay_stats, "sync", sync_stats)} + end + end @spec health(keyword()) :: {:ok, map()} | {:error, term()} - def health(_opts \\ []), do: {:ok, %{"status" => "ok"}} + def health(opts \\ []) do + with {:ok, sync_health} <- Sync.sync_health(opts) do + {:ok, + %{ + "status" => overall_health_status(sync_health), + "sync" => sync_health + }} + end + end @spec list_audit_logs(keyword()) :: {:ok, [map()]} | {:error, term()} def list_audit_logs(opts \\ []) do @@ -69,7 +95,8 @@ defmodule Parrhesia.API.Admin do _other -> [] end - (storage_supported ++ @supported_acl_methods ++ @supported_identity_methods) + (storage_supported ++ + @supported_acl_methods ++ @supported_identity_methods ++ @supported_sync_methods) |> Enum.uniq() |> Enum.sort() end @@ -84,22 +111,105 @@ defmodule Parrhesia.API.Admin do Identity.import(params) end - defp execute_builtin("acl_grant", params), do: acl_grant(params) - defp execute_builtin("acl_revoke", params), do: acl_revoke(params) - defp execute_builtin("acl_list", params), do: acl_list(params) - defp execute_builtin("identity_get", params), do: identity_get(params) - defp execute_builtin("identity_ensure", params), do: identity_ensure(params) - defp execute_builtin("identity_import", params), do: 
identity_import(params) - defp execute_builtin("identity_rotate", params), do: identity_rotate(params) + defp sync_put_server(params, opts), do: Sync.put_server(params, opts) - defp execute_builtin("supportedmethods", _params), + defp sync_remove_server(params, opts) do + with {:ok, server_id} <- fetch_required_string(params, :id), + :ok <- Sync.remove_server(server_id, opts) do + {:ok, %{"ok" => true}} + end + end + + defp sync_get_server(params, opts) do + with {:ok, server_id} <- fetch_required_string(params, :id), + {:ok, server} <- Sync.get_server(server_id, opts) do + {:ok, server} + else + :error -> {:error, :not_found} + other -> other + end + end + + defp sync_list_servers(_params, opts), do: Sync.list_servers(opts) + + defp sync_start_server(params, opts) do + with {:ok, server_id} <- fetch_required_string(params, :id), + :ok <- Sync.start_server(server_id, opts) do + {:ok, %{"ok" => true}} + end + end + + defp sync_stop_server(params, opts) do + with {:ok, server_id} <- fetch_required_string(params, :id), + :ok <- Sync.stop_server(server_id, opts) do + {:ok, %{"ok" => true}} + end + end + + defp sync_sync_now(params, opts) do + with {:ok, server_id} <- fetch_required_string(params, :id), + :ok <- Sync.sync_now(server_id, opts) do + {:ok, %{"ok" => true}} + end + end + + defp sync_server_stats(params, opts) do + with {:ok, server_id} <- fetch_required_string(params, :id), + {:ok, stats} <- Sync.server_stats(server_id, opts) do + {:ok, stats} + else + :error -> {:error, :not_found} + other -> other + end + end + + defp sync_stats(_params, opts), do: Sync.sync_stats(opts) + defp sync_health(_params, opts), do: Sync.sync_health(opts) + + defp execute_builtin("acl_grant", params, _opts), do: acl_grant(params) + defp execute_builtin("acl_revoke", params, _opts), do: acl_revoke(params) + defp execute_builtin("acl_list", params, _opts), do: acl_list(params) + defp execute_builtin("identity_get", params, _opts), do: identity_get(params) + defp 
execute_builtin("identity_ensure", params, _opts), do: identity_ensure(params) + defp execute_builtin("identity_import", params, _opts), do: identity_import(params) + defp execute_builtin("identity_rotate", params, _opts), do: identity_rotate(params) + defp execute_builtin("sync_put_server", params, opts), do: sync_put_server(params, opts) + defp execute_builtin("sync_remove_server", params, opts), do: sync_remove_server(params, opts) + defp execute_builtin("sync_get_server", params, opts), do: sync_get_server(params, opts) + defp execute_builtin("sync_list_servers", params, opts), do: sync_list_servers(params, opts) + defp execute_builtin("sync_start_server", params, opts), do: sync_start_server(params, opts) + defp execute_builtin("sync_stop_server", params, opts), do: sync_stop_server(params, opts) + defp execute_builtin("sync_sync_now", params, opts), do: sync_sync_now(params, opts) + defp execute_builtin("sync_server_stats", params, opts), do: sync_server_stats(params, opts) + defp execute_builtin("sync_stats", params, opts), do: sync_stats(params, opts) + defp execute_builtin("sync_health", params, opts), do: sync_health(params, opts) + + defp execute_builtin("supportedmethods", _params, _opts), do: {:ok, %{"methods" => supported_methods()}} - defp execute_builtin(other_method, _params), do: {:continue, other_method} + defp execute_builtin(other_method, _params, _opts), do: {:continue, other_method} + + defp relay_stats do + case Storage.admin().execute(%{}, :stats, %{}) do + {:ok, stats} when is_map(stats) -> {:ok, stats} + {:error, {:unsupported_method, _method}} -> {:ok, %{}} + other -> other + end + end + + defp overall_health_status(%{"status" => "degraded"}), do: "degraded" + defp overall_health_status(_sync_health), do: "ok" defp maybe_put_opt(opts, _key, nil), do: opts defp maybe_put_opt(opts, key, value), do: Keyword.put(opts, key, value) + defp fetch_required_string(map, key) do + case fetch_value(map, key) do + value when is_binary(value) and value 
!= "" -> {:ok, value} + _other -> {:error, {:missing_param, Atom.to_string(key)}} + end + end + defp fetch_value(map, key), do: Map.get(map, key) || Map.get(map, Atom.to_string(key)) defp normalize_method_name(method) when is_atom(method), do: Atom.to_string(method) diff --git a/lib/parrhesia/api/sync.ex b/lib/parrhesia/api/sync.ex new file mode 100644 index 0000000..3820a95 --- /dev/null +++ b/lib/parrhesia/api/sync.ex @@ -0,0 +1,103 @@ +defmodule Parrhesia.API.Sync do + @moduledoc """ + Sync server control-plane API. + """ + + alias Parrhesia.API.Sync.Manager + + @type server :: map() + + @spec put_server(map(), keyword()) :: {:ok, server()} | {:error, term()} + def put_server(server, opts \\ []) + + def put_server(server, opts) when is_map(server) and is_list(opts) do + Manager.put_server(manager_name(opts), server) + end + + def put_server(_server, _opts), do: {:error, :invalid_server} + + @spec remove_server(String.t(), keyword()) :: :ok | {:error, term()} + def remove_server(server_id, opts \\ []) + + def remove_server(server_id, opts) when is_binary(server_id) and is_list(opts) do + Manager.remove_server(manager_name(opts), server_id) + end + + def remove_server(_server_id, _opts), do: {:error, :invalid_server_id} + + @spec get_server(String.t(), keyword()) :: {:ok, server()} | :error | {:error, term()} + def get_server(server_id, opts \\ []) + + def get_server(server_id, opts) when is_binary(server_id) and is_list(opts) do + Manager.get_server(manager_name(opts), server_id) + end + + def get_server(_server_id, _opts), do: {:error, :invalid_server_id} + + @spec list_servers(keyword()) :: {:ok, [server()]} | {:error, term()} + def list_servers(opts \\ []) when is_list(opts) do + Manager.list_servers(manager_name(opts)) + end + + @spec start_server(String.t(), keyword()) :: :ok | {:error, term()} + def start_server(server_id, opts \\ []) + + def start_server(server_id, opts) when is_binary(server_id) and is_list(opts) do + 
Manager.start_server(manager_name(opts), server_id) + end + + def start_server(_server_id, _opts), do: {:error, :invalid_server_id} + + @spec stop_server(String.t(), keyword()) :: :ok | {:error, term()} + def stop_server(server_id, opts \\ []) + + def stop_server(server_id, opts) when is_binary(server_id) and is_list(opts) do + Manager.stop_server(manager_name(opts), server_id) + end + + def stop_server(_server_id, _opts), do: {:error, :invalid_server_id} + + @spec sync_now(String.t(), keyword()) :: :ok | {:error, term()} + def sync_now(server_id, opts \\ []) + + def sync_now(server_id, opts) when is_binary(server_id) and is_list(opts) do + Manager.sync_now(manager_name(opts), server_id) + end + + def sync_now(_server_id, _opts), do: {:error, :invalid_server_id} + + @spec server_stats(String.t(), keyword()) :: {:ok, map()} | :error | {:error, term()} + def server_stats(server_id, opts \\ []) + + def server_stats(server_id, opts) when is_binary(server_id) and is_list(opts) do + Manager.server_stats(manager_name(opts), server_id) + end + + def server_stats(_server_id, _opts), do: {:error, :invalid_server_id} + + @spec sync_stats(keyword()) :: {:ok, map()} | {:error, term()} + def sync_stats(opts \\ []) when is_list(opts) do + Manager.sync_stats(manager_name(opts)) + end + + @spec sync_health(keyword()) :: {:ok, map()} | {:error, term()} + def sync_health(opts \\ []) when is_list(opts) do + Manager.sync_health(manager_name(opts)) + end + + def default_path do + Path.join([default_data_dir(), "sync_servers.json"]) + end + + defp manager_name(opts) do + opts[:manager] || opts[:name] || Manager + end + + defp default_data_dir do + base_dir = + System.get_env("XDG_DATA_HOME") || + Path.join(System.user_home!(), ".local/share") + + Path.join(base_dir, "parrhesia") + end +end diff --git a/lib/parrhesia/api/sync/manager.ex b/lib/parrhesia/api/sync/manager.ex new file mode 100644 index 0000000..48ba7f0 --- /dev/null +++ b/lib/parrhesia/api/sync/manager.ex @@ -0,0 +1,671 @@ 
+defmodule Parrhesia.API.Sync.Manager do
+  @moduledoc false
+
+  use GenServer
+
+  alias Parrhesia.API.Sync
+  alias Parrhesia.Protocol.Filter
+
+  require Logger
+
+  @default_overlap_window_seconds 300
+  @default_mode :req_stream
+  @default_auth_type :nip42
+  @default_tls_mode :required
+  # 64 lowercase hex characters — assumes a hex-encoded pubkey; TODO confirm key scheme.
+  @hex64 ~r/\A[0-9a-f]{64}\z/
+
+  ## Client API — thin wrappers. `name` is passed explicitly so tests can run
+  ## isolated manager instances (see Parrhesia.API.Sync.manager_name/1).
+
+  def start_link(opts \\ []) do
+    name = Keyword.get(opts, :name, __MODULE__)
+    GenServer.start_link(__MODULE__, opts, name: name)
+  end
+
+  def put_server(name, server) do
+    GenServer.call(name, {:put_server, server})
+  end
+
+  def remove_server(name, server_id) do
+    GenServer.call(name, {:remove_server, server_id})
+  end
+
+  def get_server(name, server_id) do
+    GenServer.call(name, {:get_server, server_id})
+  end
+
+  def list_servers(name) do
+    GenServer.call(name, :list_servers)
+  end
+
+  def start_server(name, server_id) do
+    GenServer.call(name, {:start_server, server_id})
+  end
+
+  def stop_server(name, server_id) do
+    GenServer.call(name, {:stop_server, server_id})
+  end
+
+  def sync_now(name, server_id) do
+    GenServer.call(name, {:sync_now, server_id})
+  end
+
+  def server_stats(name, server_id) do
+    GenServer.call(name, {:server_stats, server_id})
+  end
+
+  def sync_stats(name) do
+    GenServer.call(name, :sync_stats)
+  end
+
+  def sync_health(name) do
+    GenServer.call(name, :sync_health)
+  end
+
+  ## Server callbacks
+
+  @impl true
+  def init(opts) do
+    # Path precedence: explicit opt > app config (:parrhesia, :sync, :path) > XDG default.
+    path = Keyword.get(opts, :path, config_path() || Sync.default_path())
+    {:ok, load_state(path)}
+  end
+
+  @impl true
+  def handle_call({:put_server, server}, _from, state) do
+    case normalize_server(server) do
+      {:ok, normalized_server} ->
+        updated_state = put_server_state(state, normalized_server)
+
+        # Reply with the persistence error and keep the old state; a bare
+        # `with` without `else` here would let `{:error, reason}` escape as
+        # the handle_call return value and crash the server with
+        # {:bad_return_value, ...}. Mirrors the :start_server/:stop_server
+        # clauses, which already handle persist_state/1 failures.
+        case persist_state(updated_state) do
+          :ok ->
+            {:reply, {:ok, merged_server(updated_state, normalized_server.id)}, updated_state}
+
+          {:error, reason} ->
+            {:reply, {:error, reason}, state}
+        end
+
+      {:error, reason} ->
+        {:reply, {:error, reason}, state}
+    end
+  end
+
+  def handle_call({:remove_server, server_id}, _from, state) do
+    if Map.has_key?(state.servers, server_id) do
+      updated_state = %{
+        state
+        | servers: Map.delete(state.servers, server_id),
+          runtime: Map.delete(state.runtime, server_id)
+      }
+
+      # Same persistence-failure handling as the :put_server clause above.
+      case persist_state(updated_state) do
+        :ok -> {:reply, :ok, updated_state}
+        {:error, reason} -> {:reply, {:error, reason}, state}
+      end
+    else
+      {:reply, {:error, :not_found}, state}
+    end
+  end
+
+  def handle_call({:get_server, server_id}, _from, state) do
+    case Map.fetch(state.servers, server_id) do
+      {:ok, _server} -> {:reply, {:ok, merged_server(state, server_id)}, state}
+      :error -> {:reply, :error, state}
+    end
+  end
+
+  def handle_call(:list_servers, _from, state) do
+    servers =
+      state.servers
+      |> Map.keys()
+      |> Enum.sort()
+      |> Enum.map(&merged_server(state, &1))
+
+    {:reply, {:ok, servers}, state}
+  end
+
+  def handle_call({:start_server, server_id}, _from, state) do
+    with {:ok, updated_state} <- update_runtime(state, server_id, &mark_running/1),
+         :ok <- persist_state(updated_state) do
+      {:reply, :ok, updated_state}
+    else
+      :error -> {:reply, {:error, :not_found}, state}
+      {:error, reason} -> {:reply, {:error, reason}, state}
+    end
+  end
+
+  def handle_call({:stop_server, server_id}, _from, state) do
+    with {:ok, updated_state} <- update_runtime(state, server_id, &mark_stopped/1),
+         :ok <- persist_state(updated_state) do
+      {:reply, :ok, updated_state}
+    else
+      :error -> {:reply, {:error, :not_found}, state}
+      {:error, reason} -> {:reply, {:error, reason}, state}
+    end
+  end
+
+  def handle_call({:sync_now, server_id}, _from, state) do
+    with {:ok, updated_state} <- update_runtime(state, server_id, &record_sync_now/1),
+         :ok <- persist_state(updated_state) do
+      {:reply, :ok, updated_state}
+    else
+      :error -> {:reply, {:error, :not_found}, state}
+      {:error, reason} -> {:reply, {:error, reason}, state}
+    end
+  end
+
+  def handle_call({:server_stats, server_id}, _from, state) do
+    case Map.fetch(state.runtime, server_id) do
+      {:ok, runtime} -> {:reply, {:ok, runtime_stats(runtime)}, state}
+      :error -> {:reply, :error, state}
+    end
+  end
+
+  def handle_call(:sync_stats, _from,
state) do + {:reply, {:ok, aggregate_stats(state)}, state} + end + + def handle_call(:sync_health, _from, state) do + {:reply, {:ok, health_summary(state)}, state} + end + + defp put_server_state(state, server) do + runtime = + case Map.get(state.runtime, server.id) do + nil -> default_runtime(server) + existing_runtime -> maybe_align_runtime_with_server(existing_runtime, server) + end + + %{ + state + | servers: Map.put(state.servers, server.id, server), + runtime: Map.put(state.runtime, server.id, runtime) + } + end + + defp maybe_align_runtime_with_server(runtime, %{enabled?: false}) do + runtime + |> Map.put(:state, :stopped) + |> Map.put(:connected?, false) + |> Map.put(:last_disconnected_at, now()) + end + + defp maybe_align_runtime_with_server(runtime, _server), do: runtime + + defp default_runtime(server) do + %{ + server_id: server.id, + state: if(server.enabled?, do: :running, else: :stopped), + connected?: false, + last_connected_at: nil, + last_disconnected_at: nil, + last_sync_started_at: nil, + last_sync_completed_at: nil, + last_event_received_at: nil, + last_eose_at: nil, + reconnect_attempts: 0, + last_error: nil, + events_received: 0, + events_accepted: 0, + events_duplicate: 0, + events_rejected: 0, + query_runs: 0, + subscription_restarts: 0, + reconnects: 0, + last_remote_eose_at: nil + } + end + + defp mark_running(runtime) do + runtime + |> Map.put(:state, :running) + |> Map.put(:last_error, nil) + end + + defp mark_stopped(runtime) do + runtime + |> Map.put(:state, :stopped) + |> Map.put(:connected?, false) + |> Map.put(:last_disconnected_at, now()) + end + + defp record_sync_now(runtime) do + sync_timestamp = now() + + runtime + |> Map.update!(:query_runs, &(&1 + 1)) + |> Map.put(:last_sync_started_at, sync_timestamp) + |> Map.put(:last_sync_completed_at, sync_timestamp) + end + + defp update_runtime(state, server_id, fun) do + case Map.fetch(state.runtime, server_id) do + {:ok, runtime} -> + updated_runtime = fun.(runtime) + {:ok, %{state 
| runtime: Map.put(state.runtime, server_id, updated_runtime)}} + + :error -> + :error + end + end + + defp merged_server(state, server_id) do + state.servers + |> Map.fetch!(server_id) + |> Map.put(:runtime, Map.fetch!(state.runtime, server_id)) + end + + defp runtime_stats(runtime) do + %{ + "server_id" => runtime.server_id, + "state" => Atom.to_string(runtime.state), + "connected" => runtime.connected?, + "events_received" => runtime.events_received, + "events_accepted" => runtime.events_accepted, + "events_duplicate" => runtime.events_duplicate, + "events_rejected" => runtime.events_rejected, + "query_runs" => runtime.query_runs, + "subscription_restarts" => runtime.subscription_restarts, + "reconnects" => runtime.reconnects, + "last_sync_started_at" => runtime.last_sync_started_at, + "last_sync_completed_at" => runtime.last_sync_completed_at, + "last_remote_eose_at" => runtime.last_remote_eose_at, + "last_error" => runtime.last_error + } + end + + defp aggregate_stats(state) do + runtimes = Map.values(state.runtime) + + %{ + "servers_total" => map_size(state.servers), + "servers_enabled" => Enum.count(state.servers, fn {_id, server} -> server.enabled? 
end), + "servers_running" => Enum.count(runtimes, &(&1.state == :running)), + "servers_connected" => Enum.count(runtimes, & &1.connected?), + "events_received" => Enum.reduce(runtimes, 0, &(&1.events_received + &2)), + "events_accepted" => Enum.reduce(runtimes, 0, &(&1.events_accepted + &2)), + "events_duplicate" => Enum.reduce(runtimes, 0, &(&1.events_duplicate + &2)), + "events_rejected" => Enum.reduce(runtimes, 0, &(&1.events_rejected + &2)), + "query_runs" => Enum.reduce(runtimes, 0, &(&1.query_runs + &2)), + "subscription_restarts" => Enum.reduce(runtimes, 0, &(&1.subscription_restarts + &2)), + "reconnects" => Enum.reduce(runtimes, 0, &(&1.reconnects + &2)) + } + end + + defp health_summary(state) do + failing_servers = + state.runtime + |> Enum.flat_map(fn {server_id, runtime} -> + if is_binary(runtime.last_error) and runtime.last_error != "" do + [%{"id" => server_id, "reason" => runtime.last_error}] + else + [] + end + end) + + %{ + "status" => if(failing_servers == [], do: "ok", else: "degraded"), + "servers_total" => map_size(state.servers), + "servers_connected" => + Enum.count(state.runtime, fn {_id, runtime} -> runtime.connected? 
end), + "servers_failing" => failing_servers + } + end + + defp load_state(path) do + case File.read(path) do + {:ok, payload} -> + case decode_persisted_state(payload, path) do + {:ok, state} -> + state + + {:error, reason} -> + Logger.warning("failed to load sync state from #{path}: #{inspect(reason)}") + empty_state(path) + end + + {:error, :enoent} -> + empty_state(path) + + {:error, reason} -> + Logger.warning("failed to read sync state from #{path}: #{inspect(reason)}") + empty_state(path) + end + end + + defp decode_persisted_state(payload, path) do + with {:ok, decoded} <- JSON.decode(payload), + {:ok, servers} <- decode_servers(Map.get(decoded, "servers", %{})), + {:ok, runtime} <- decode_runtime(Map.get(decoded, "runtime", %{}), servers) do + {:ok, %{path: path, servers: servers, runtime: runtime}} + end + end + + defp decode_servers(servers) when is_map(servers) do + Enum.reduce_while(servers, {:ok, %{}}, fn {_id, server_payload}, {:ok, acc} -> + case normalize_server(server_payload) do + {:ok, server} -> {:cont, {:ok, Map.put(acc, server.id, server)}} + {:error, reason} -> {:halt, {:error, reason}} + end + end) + end + + defp decode_servers(_servers), do: {:error, :invalid_servers_state} + + defp decode_runtime(runtime_payload, servers) + when is_map(runtime_payload) and is_map(servers) do + runtime = + Enum.reduce(servers, %{}, fn {server_id, server}, acc -> + decoded_runtime = + runtime_payload + |> Map.get(server_id) + |> normalize_runtime(server) + + Map.put(acc, server_id, decoded_runtime) + end) + + {:ok, runtime} + end + + defp decode_runtime(_runtime_payload, _servers), do: {:error, :invalid_runtime_state} + + defp normalize_runtime(nil, server), do: default_runtime(server) + + defp normalize_runtime(runtime, server) when is_map(runtime) do + state = + runtime + |> Map.put(:server_id, server.id) + |> Map.put(:state, normalize_runtime_state(fetch_value(runtime, :state))) + |> Map.put(:connected?, fetch_boolean(runtime, :connected?) 
|| false) + |> Map.put(:last_connected_at, fetch_string_or_nil(runtime, :last_connected_at)) + |> Map.put(:last_disconnected_at, fetch_string_or_nil(runtime, :last_disconnected_at)) + |> Map.put(:last_sync_started_at, fetch_string_or_nil(runtime, :last_sync_started_at)) + |> Map.put(:last_sync_completed_at, fetch_string_or_nil(runtime, :last_sync_completed_at)) + |> Map.put(:last_event_received_at, fetch_string_or_nil(runtime, :last_event_received_at)) + |> Map.put(:last_eose_at, fetch_string_or_nil(runtime, :last_eose_at)) + |> Map.put(:last_remote_eose_at, fetch_string_or_nil(runtime, :last_remote_eose_at)) + |> Map.put(:last_error, fetch_string_or_nil(runtime, :last_error)) + |> Map.put(:reconnect_attempts, fetch_non_neg_integer(runtime, :reconnect_attempts)) + |> Map.put(:events_received, fetch_non_neg_integer(runtime, :events_received)) + |> Map.put(:events_accepted, fetch_non_neg_integer(runtime, :events_accepted)) + |> Map.put(:events_duplicate, fetch_non_neg_integer(runtime, :events_duplicate)) + |> Map.put(:events_rejected, fetch_non_neg_integer(runtime, :events_rejected)) + |> Map.put(:query_runs, fetch_non_neg_integer(runtime, :query_runs)) + |> Map.put(:subscription_restarts, fetch_non_neg_integer(runtime, :subscription_restarts)) + |> Map.put(:reconnects, fetch_non_neg_integer(runtime, :reconnects)) + + maybe_align_runtime_with_server(state, server) + end + + defp normalize_runtime(_runtime, server), do: default_runtime(server) + + defp persist_state(%{path: path} = state) do + temp_path = path <> ".tmp" + + with :ok <- File.mkdir_p(Path.dirname(path)), + :ok <- File.write(temp_path, JSON.encode!(encode_state(state))), + :ok <- File.rename(temp_path, path) do + :ok + else + {:error, reason} -> + _ = File.rm(temp_path) + {:error, reason} + end + end + + defp encode_state(state) do + %{ + "version" => 1, + "servers" => + Map.new(state.servers, fn {server_id, server} -> {server_id, encode_server(server)} end), + "runtime" => + Map.new(state.runtime, fn 
{server_id, runtime} -> {server_id, encode_runtime(runtime)} end) + } + end + + defp encode_server(server) do + %{ + "id" => server.id, + "url" => server.url, + "enabled?" => server.enabled?, + "auth_pubkey" => server.auth_pubkey, + "filters" => server.filters, + "mode" => Atom.to_string(server.mode), + "overlap_window_seconds" => server.overlap_window_seconds, + "auth" => %{"type" => Atom.to_string(server.auth.type)}, + "tls" => %{ + "mode" => Atom.to_string(server.tls.mode), + "hostname" => server.tls.hostname, + "pins" => + Enum.map(server.tls.pins, fn pin -> + %{ + "type" => Atom.to_string(pin.type), + "value" => pin.value + } + end) + }, + "metadata" => server.metadata + } + end + + defp encode_runtime(runtime) do + %{ + "server_id" => runtime.server_id, + "state" => Atom.to_string(runtime.state), + "connected?" => runtime.connected?, + "last_connected_at" => runtime.last_connected_at, + "last_disconnected_at" => runtime.last_disconnected_at, + "last_sync_started_at" => runtime.last_sync_started_at, + "last_sync_completed_at" => runtime.last_sync_completed_at, + "last_event_received_at" => runtime.last_event_received_at, + "last_eose_at" => runtime.last_eose_at, + "reconnect_attempts" => runtime.reconnect_attempts, + "last_error" => runtime.last_error, + "events_received" => runtime.events_received, + "events_accepted" => runtime.events_accepted, + "events_duplicate" => runtime.events_duplicate, + "events_rejected" => runtime.events_rejected, + "query_runs" => runtime.query_runs, + "subscription_restarts" => runtime.subscription_restarts, + "reconnects" => runtime.reconnects, + "last_remote_eose_at" => runtime.last_remote_eose_at + } + end + + defp empty_state(path) do + %{path: path, servers: %{}, runtime: %{}} + end + + defp normalize_server(server) when is_map(server) do + with {:ok, id} <- normalize_non_empty_string(fetch_value(server, :id), :invalid_server_id), + {:ok, {url, host}} <- normalize_url(fetch_value(server, :url)), + {:ok, enabled?} <- 
normalize_boolean(fetch_value(server, :enabled?), true), + {:ok, auth_pubkey} <- normalize_pubkey(fetch_value(server, :auth_pubkey)), + {:ok, filters} <- normalize_filters(fetch_value(server, :filters)), + {:ok, mode} <- normalize_mode(fetch_value(server, :mode)), + {:ok, overlap_window_seconds} <- + normalize_overlap_window(fetch_value(server, :overlap_window_seconds)), + {:ok, auth} <- normalize_auth(fetch_value(server, :auth)), + {:ok, tls} <- normalize_tls(fetch_value(server, :tls), host), + {:ok, metadata} <- normalize_metadata(fetch_value(server, :metadata)) do + {:ok, + %{ + id: id, + url: url, + enabled?: enabled?, + auth_pubkey: auth_pubkey, + filters: filters, + mode: mode, + overlap_window_seconds: overlap_window_seconds, + auth: auth, + tls: tls, + metadata: metadata + }} + end + end + + defp normalize_server(_server), do: {:error, :invalid_server} + + defp normalize_url(url) when is_binary(url) and url != "" do + uri = URI.parse(url) + + if uri.scheme in ["ws", "wss"] and is_binary(uri.host) and uri.host != "" do + {:ok, {URI.to_string(uri), uri.host}} + else + {:error, :invalid_url} + end + end + + defp normalize_url(_url), do: {:error, :invalid_url} + + defp normalize_pubkey(pubkey) when is_binary(pubkey) do + normalized = String.downcase(pubkey) + + if String.match?(normalized, @hex64) do + {:ok, normalized} + else + {:error, :invalid_auth_pubkey} + end + end + + defp normalize_pubkey(_pubkey), do: {:error, :invalid_auth_pubkey} + + defp normalize_filters(filters) when is_list(filters) do + normalized_filters = Enum.map(filters, &normalize_filter_map/1) + + with :ok <- Filter.validate_filters(normalized_filters) do + {:ok, normalized_filters} + end + end + + defp normalize_filters(_filters), do: {:error, :invalid_filters} + + defp normalize_mode(nil), do: {:ok, @default_mode} + defp normalize_mode(:req_stream), do: {:ok, :req_stream} + defp normalize_mode("req_stream"), do: {:ok, :req_stream} + defp normalize_mode(_mode), do: {:error, :invalid_mode} 
+
+  defp normalize_overlap_window(nil), do: {:ok, @default_overlap_window_seconds}
+
+  defp normalize_overlap_window(seconds) when is_integer(seconds) and seconds >= 0,
+    do: {:ok, seconds}
+
+  defp normalize_overlap_window(_seconds), do: {:error, :invalid_overlap_window_seconds}
+
+  defp normalize_auth(nil), do: {:ok, %{type: @default_auth_type}}
+
+  defp normalize_auth(auth) when is_map(auth) do
+    with {:ok, type} <- normalize_auth_type(fetch_value(auth, :type)) do
+      {:ok, %{type: type}}
+    end
+  end
+
+  defp normalize_auth(_auth), do: {:error, :invalid_auth}
+
+  defp normalize_auth_type(nil), do: {:ok, @default_auth_type}
+  defp normalize_auth_type(:nip42), do: {:ok, :nip42}
+  defp normalize_auth_type("nip42"), do: {:ok, :nip42}
+  defp normalize_auth_type(_type), do: {:error, :invalid_auth_type}
+
+  defp normalize_tls(tls, host) when is_map(tls) do
+    with {:ok, mode} <- normalize_tls_mode(fetch_value(tls, :mode)),
+         {:ok, hostname} <- normalize_hostname(fetch_value(tls, :hostname) || host),
+         {:ok, pins} <- normalize_tls_pins(fetch_value(tls, :pins)) do
+      {:ok, %{mode: mode, hostname: hostname, pins: pins}}
+    end
+  end
+
+  defp normalize_tls(_tls, _host), do: {:error, :invalid_tls}
+
+  defp normalize_tls_mode(nil), do: {:ok, @default_tls_mode}
+  defp normalize_tls_mode(:required), do: {:ok, :required}
+  defp normalize_tls_mode("required"), do: {:ok, :required}
+  defp normalize_tls_mode(_mode), do: {:error, :invalid_tls_mode}
+
+  defp normalize_hostname(hostname) when is_binary(hostname) and hostname != "",
+    do: {:ok, hostname}
+
+  defp normalize_hostname(_hostname), do: {:error, :invalid_tls_hostname}
+
+  defp normalize_tls_pins(pins) when is_list(pins) and pins != [] do
+    Enum.reduce_while(pins, {:ok, []}, fn pin, {:ok, acc} ->
+      case normalize_tls_pin(pin) do
+        {:ok, normalized_pin} -> {:cont, {:ok, [normalized_pin | acc]}}
+        {:error, reason} -> {:halt, {:error, reason}}
+      end
+    end)
+    |> case do
+      {:ok, normalized_pins} -> {:ok, Enum.reverse(normalized_pins)}
+      error -> error
+    end
+  end
+
+  defp normalize_tls_pins(_pins), do: {:error, :invalid_tls_pins}
+
+  defp normalize_tls_pin(pin) when is_map(pin) do
+    with {:ok, type} <- normalize_tls_pin_type(fetch_value(pin, :type)),
+         {:ok, value} <- normalize_non_empty_string(fetch_value(pin, :value), :invalid_tls_pin) do
+      {:ok, %{type: type, value: value}}
+    end
+  end
+
+  defp normalize_tls_pin(_pin), do: {:error, :invalid_tls_pin}
+
+  defp normalize_tls_pin_type(:spki_sha256), do: {:ok, :spki_sha256}
+  defp normalize_tls_pin_type("spki_sha256"), do: {:ok, :spki_sha256}
+  defp normalize_tls_pin_type(_type), do: {:error, :invalid_tls_pin}
+
+  defp normalize_metadata(nil), do: {:ok, %{}}
+  defp normalize_metadata(metadata) when is_map(metadata), do: {:ok, metadata}
+  defp normalize_metadata(_metadata), do: {:error, :invalid_metadata}
+
+  defp normalize_boolean(nil, default), do: {:ok, default}
+  defp normalize_boolean(value, _default) when is_boolean(value), do: {:ok, value}
+  defp normalize_boolean(_value, _default), do: {:error, :invalid_enabled_flag}
+
+  defp normalize_non_empty_string(value, _reason) when is_binary(value) and value != "",
+    do: {:ok, value}
+
+  defp normalize_non_empty_string(_value, reason), do: {:error, reason}
+
+  defp normalize_filter_map(filter) when is_map(filter) do
+    Map.new(filter, fn
+      {key, value} when is_atom(key) -> {Atom.to_string(key), value}
+      {key, value} -> {key, value}
+    end)
+  end
+
+  defp normalize_filter_map(filter), do: filter
+
+  defp normalize_runtime_state("running"), do: :running
+  defp normalize_runtime_state(:running), do: :running
+  defp normalize_runtime_state("stopped"), do: :stopped
+  defp normalize_runtime_state(:stopped), do: :stopped
+  defp normalize_runtime_state(_state), do: :stopped
+
+  defp fetch_non_neg_integer(map, key) do
+    case fetch_value(map, key) do
+      value when is_integer(value) and value >= 0 -> value
+      _other -> 0
+    end
+  end
+
+  defp fetch_boolean(map, key) do
+    case fetch_value(map, key) do
+      value when is_boolean(value) -> value
+      _other -> nil
+    end
+  end
+
+  defp fetch_string_or_nil(map, key) do
+    case fetch_value(map, key) do
+      value when is_binary(value) and value != "" -> value
+      _other -> nil
+    end
+  end
+
+  defp fetch_value(map, key) when is_map(map) do
+    if Map.has_key?(map, key), do: Map.get(map, key), else: Map.get(map, Atom.to_string(key))
+  end
+
+  defp config_path do
+    :parrhesia
+    |> Application.get_env(:sync, [])
+    |> Keyword.get(:path)
+  end
+
+  defp now do
+    DateTime.utc_now()
+    |> DateTime.truncate(:second)
+    |> DateTime.to_iso8601()
+  end
+end
diff --git a/lib/parrhesia/application.ex b/lib/parrhesia/application.ex
index 6528164..1ed720b 100644
--- a/lib/parrhesia/application.ex
+++ b/lib/parrhesia/application.ex
@@ -11,6 +11,7 @@ defmodule Parrhesia.Application do
       Parrhesia.Storage.Supervisor,
       Parrhesia.Subscriptions.Supervisor,
       Parrhesia.Auth.Supervisor,
+      Parrhesia.Sync.Supervisor,
       Parrhesia.Policy.Supervisor,
       Parrhesia.Web.Endpoint,
       Parrhesia.Web.MetricsEndpoint,
diff --git a/lib/parrhesia/sync/supervisor.ex b/lib/parrhesia/sync/supervisor.ex
new file mode 100644
index 0000000..5dc83fd
--- /dev/null
+++ b/lib/parrhesia/sync/supervisor.ex
@@ -0,0 +1,20 @@
+defmodule Parrhesia.Sync.Supervisor do
+  @moduledoc """
+  Supervision entrypoint for sync control-plane processes.
+  """
+
+  use Supervisor
+
+  def start_link(init_arg \\ []) do
+    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
+  end
+
+  @impl true
+  def init(_init_arg) do
+    children = [
+      {Parrhesia.API.Sync.Manager, []}
+    ]
+
+    Supervisor.init(children, strategy: :one_for_one)
+  end
+end
diff --git a/test/parrhesia/api/sync_test.exs b/test/parrhesia/api/sync_test.exs
new file mode 100644
index 0000000..c6b3978
--- /dev/null
+++ b/test/parrhesia/api/sync_test.exs
@@ -0,0 +1,212 @@
+defmodule Parrhesia.API.SyncTest do
+  use ExUnit.Case, async: false
+
+  alias Ecto.Adapters.SQL.Sandbox
+  alias Parrhesia.API.Admin
+  alias Parrhesia.API.Sync
+  alias Parrhesia.API.Sync.Manager
+  alias Parrhesia.Repo
+
+  setup do
+    :ok = Sandbox.checkout(Repo)
+    :ok
+  end
+
+  test "put_server stores normalized config and persists it across restart" do
+    {manager, path, pid} = start_sync_manager()
+
+    assert {:ok, stored_server} = Sync.put_server(valid_server(), manager: manager)
+    assert stored_server.id == "tribes-primary"
+    assert stored_server.mode == :req_stream
+    assert stored_server.auth.type == :nip42
+    assert stored_server.tls.mode == :required
+    assert stored_server.tls.hostname == "relay-a.example"
+    assert stored_server.runtime.state == :running
+    assert File.exists?(path)
+
+    assert {:ok, fetched_server} = Sync.get_server("tribes-primary", manager: manager)
+    assert fetched_server == stored_server
+
+    assert {:ok, [listed_server]} = Sync.list_servers(manager: manager)
+    assert listed_server.id == "tribes-primary"
+
+    monitor_ref = Process.monitor(pid)
+    assert :ok = GenServer.stop(pid)
+    assert_receive {:DOWN, ^monitor_ref, :process, ^pid, :normal}
+    assert {:ok, persisted_server} = wait_for_server(manager, "tribes-primary")
+    assert persisted_server.id == "tribes-primary"
+    assert persisted_server.tls.hostname == "relay-a.example"
+    assert persisted_server.runtime.state == :running
+  end
+
+  test "start_server stop_server and sync_now update runtime stats" do
+    {manager, _path, _pid} = start_sync_manager()
+
+    disabled_server = valid_server(%{"id" => "tribes-disabled", "enabled?" => false})
+    assert {:ok, stored_server} = Sync.put_server(disabled_server, manager: manager)
+    assert stored_server.runtime.state == :stopped
+
+    assert :ok = Sync.start_server("tribes-disabled", manager: manager)
+
+    assert {:ok, started_server} = Sync.get_server("tribes-disabled", manager: manager)
+    assert started_server.runtime.state == :running
+
+    assert :ok = Sync.sync_now("tribes-disabled", manager: manager)
+
+    assert {:ok, stats} = Sync.server_stats("tribes-disabled", manager: manager)
+    assert stats["server_id"] == "tribes-disabled"
+    assert stats["state"] == "running"
+    assert stats["query_runs"] == 1
+    assert is_binary(stats["last_sync_started_at"])
+    assert is_binary(stats["last_sync_completed_at"])
+
+    assert :ok = Sync.stop_server("tribes-disabled", manager: manager)
+
+    assert {:ok, stopped_server} = Sync.get_server("tribes-disabled", manager: manager)
+    assert stopped_server.runtime.state == :stopped
+    assert is_binary(stopped_server.runtime.last_disconnected_at)
+
+    assert {:ok, sync_stats} = Sync.sync_stats(manager: manager)
+    assert sync_stats["servers_total"] == 1
+    assert sync_stats["servers_enabled"] == 0
+    assert sync_stats["servers_running"] == 0
+    assert sync_stats["query_runs"] == 1
+
+    assert {:ok, sync_health} = Sync.sync_health(manager: manager)
+
+    assert sync_health == %{
+             "status" => "ok",
+             "servers_total" => 1,
+             "servers_connected" => 0,
+             "servers_failing" => []
+           }
+  end
+
+  test "put_server rejects invalid sync server shapes" do
+    {manager, _path, _pid} = start_sync_manager()
+
+    assert {:error, :invalid_url} =
+             Sync.put_server(Map.put(valid_server(), "url", "https://relay-a.example"),
+               manager: manager
+             )
+
+    assert {:error, :empty_filters} =
+             Sync.put_server(Map.put(valid_server(), "filters", []), manager: manager)
+
+    assert {:error, :invalid_tls_pins} =
+             Sync.put_server(
+               put_in(valid_server()["tls"]["pins"], []),
+               manager: manager
+             )
+  end
+
+  test "admin executes sync methods against an injected sync manager" do
+    {manager, _path, _pid} = start_sync_manager()
+
+    assert {:ok, created_server} =
+             Admin.execute("sync_put_server", valid_server(%{"id" => "tribes-admin"}),
+               manager: manager
+             )
+
+    assert created_server.id == "tribes-admin"
+
+    assert {:ok, listed_servers} = Admin.execute("sync_list_servers", %{}, manager: manager)
+    assert Enum.any?(listed_servers, &(&1.id == "tribes-admin"))
+
+    assert {:ok, %{"ok" => true}} =
+             Admin.execute("sync_sync_now", %{"id" => "tribes-admin"}, manager: manager)
+
+    assert {:ok, sync_stats} = Admin.stats(manager: manager)
+    assert sync_stats["sync"]["servers_total"] == 1
+    assert sync_stats["sync"]["query_runs"] == 1
+
+    assert {:ok, health} = Admin.health(manager: manager)
+    assert health["status"] == "ok"
+    assert health["sync"]["servers_total"] == 1
+  end
+
+  defp start_sync_manager do
+    path = unique_sync_path()
+    manager = {:global, {:sync_manager, System.unique_integer([:positive, :monotonic])}}
+    pid = start_supervised!({Manager, name: manager, path: path})
+
+    {manager, path, pid}
+  end
+
+  defp valid_server(overrides \\ %{}) do
+    Map.merge(
+      %{
+        "id" => "tribes-primary",
+        "url" => "wss://relay-a.example/relay",
+        "enabled?" => true,
+        "auth_pubkey" => String.duplicate("a", 64),
+        "filters" => [
+          %{
+            "kinds" => [5000],
+            "#r" => ["tribes.accounts.user"]
+          }
+        ],
+        "tls" => %{
+          "pins" => [
+            %{
+              "type" => "spki_sha256",
+              "value" => "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
+            }
+          ]
+        },
+        "metadata" => %{"cluster" => "primary"}
+      },
+      overrides
+    )
+  end
+
+  defp unique_sync_path do
+    path =
+      Path.join(
+        System.tmp_dir!(),
+        "parrhesia_sync_#{System.unique_integer([:positive, :monotonic])}.json"
+      )
+
+    on_exit(fn ->
+      _ = File.rm(path)
+    end)
+
+    path
+  end
+
+  defp wait_for_server(manager, server_id, attempts \\ 10)
+
+  defp wait_for_server(_manager, _server_id, 0), do: :error
+
+  defp wait_for_server(manager, server_id, attempts) do
+    result =
+      try do
+        Sync.get_server(server_id, manager: manager)
+      catch
+        :exit, _reason -> {:error, :noproc}
+      end
+
+    case result do
+      {:ok, server} ->
+        {:ok, server}
+
+      :error ->
+        receive do
+        after
+          10 -> wait_for_server(manager, server_id, attempts - 1)
+        end
+
+      {:error, :noproc} ->
+        receive do
+        after
+          10 -> wait_for_server(manager, server_id, attempts - 1)
+        end
+
+      {:error, {:noproc, _details}} ->
+        receive do
+        after
+          10 -> wait_for_server(manager, server_id, attempts - 1)
+        end
+    end
+  end
+end
diff --git a/test/parrhesia/application_test.exs b/test/parrhesia/application_test.exs
index 1dd2580..bb77ee4 100644
--- a/test/parrhesia/application_test.exs
+++ b/test/parrhesia/application_test.exs
@@ -8,6 +8,7 @@ defmodule Parrhesia.ApplicationTest do
     assert is_pid(Process.whereis(Parrhesia.Storage.Supervisor))
     assert is_pid(Process.whereis(Parrhesia.Subscriptions.Supervisor))
     assert is_pid(Process.whereis(Parrhesia.Auth.Supervisor))
+    assert is_pid(Process.whereis(Parrhesia.Sync.Supervisor))
     assert is_pid(Process.whereis(Parrhesia.Policy.Supervisor))
     assert is_pid(Process.whereis(Parrhesia.Web.Endpoint))
     assert is_pid(Process.whereis(Parrhesia.Web.MetricsEndpoint))
@@ -20,6 +21,7 @@
     assert is_pid(Process.whereis(Parrhesia.Auth.Challenges))
     assert is_pid(Process.whereis(Parrhesia.API.Identity.Manager))
+    assert is_pid(Process.whereis(Parrhesia.API.Sync.Manager))

    if negentropy_enabled?() do
      assert is_pid(Process.whereis(Parrhesia.Negentropy.Sessions))