diff --git a/config/config.exs b/config/config.exs index 7ea3353..fe9e944 100644 --- a/config/config.exs +++ b/config/config.exs @@ -10,7 +10,8 @@ config :parrhesia, private_key: nil ], sync: [ - path: nil + path: nil, + start_workers?: true ], limits: [ max_frame_bytes: 1_048_576, diff --git a/config/runtime.exs b/config/runtime.exs index b0aa688..462981e 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -456,7 +456,12 @@ if config_env() == :prod do private_key: string_env.("PARRHESIA_IDENTITY_PRIVATE_KEY", nil) ], sync: [ - path: string_env.("PARRHESIA_SYNC_PATH", nil) + path: string_env.("PARRHESIA_SYNC_PATH", nil), + start_workers?: + bool_env.( + "PARRHESIA_SYNC_START_WORKERS", + Keyword.get(Application.get_env(:parrhesia, :sync, []), :start_workers?, true) + ) ], moderation_cache_enabled: bool_env.("PARRHESIA_MODERATION_CACHE_ENABLED", moderation_cache_enabled_default), diff --git a/config/test.exs b/config/test.exs index 0c585cb..235eed7 100644 --- a/config/test.exs +++ b/config/test.exs @@ -20,7 +20,8 @@ config :parrhesia, private_key: nil ], sync: [ - path: Path.join(System.tmp_dir!(), "parrhesia_test_sync.json") + path: Path.join(System.tmp_dir!(), "parrhesia_test_sync.json"), + start_workers?: false ], features: [verify_event_signatures: false] diff --git a/lib/parrhesia/api/sync/manager.ex b/lib/parrhesia/api/sync/manager.ex index 48ba7f0..0b60957 100644 --- a/lib/parrhesia/api/sync/manager.ex +++ b/lib/parrhesia/api/sync/manager.ex @@ -5,6 +5,8 @@ defmodule Parrhesia.API.Sync.Manager do alias Parrhesia.API.Sync alias Parrhesia.Protocol.Filter + alias Parrhesia.Sync.Transport.WebSockexClient + alias Parrhesia.Sync.Worker require Logger @@ -19,61 +21,63 @@ defmodule Parrhesia.API.Sync.Manager do GenServer.start_link(__MODULE__, opts, name: name) end - def put_server(name, server) do - GenServer.call(name, {:put_server, server}) - end + def put_server(name, server), do: GenServer.call(name, {:put_server, server}) + def remove_server(name, 
server_id), do: GenServer.call(name, {:remove_server, server_id}) + def get_server(name, server_id), do: GenServer.call(name, {:get_server, server_id}) + def list_servers(name), do: GenServer.call(name, :list_servers) + def start_server(name, server_id), do: GenServer.call(name, {:start_server, server_id}) + def stop_server(name, server_id), do: GenServer.call(name, {:stop_server, server_id}) + def sync_now(name, server_id), do: GenServer.call(name, {:sync_now, server_id}) + def server_stats(name, server_id), do: GenServer.call(name, {:server_stats, server_id}) + def sync_stats(name), do: GenServer.call(name, :sync_stats) + def sync_health(name), do: GenServer.call(name, :sync_health) - def remove_server(name, server_id) do - GenServer.call(name, {:remove_server, server_id}) - end - - def get_server(name, server_id) do - GenServer.call(name, {:get_server, server_id}) - end - - def list_servers(name) do - GenServer.call(name, :list_servers) - end - - def start_server(name, server_id) do - GenServer.call(name, {:start_server, server_id}) - end - - def stop_server(name, server_id) do - GenServer.call(name, {:stop_server, server_id}) - end - - def sync_now(name, server_id) do - GenServer.call(name, {:sync_now, server_id}) - end - - def server_stats(name, server_id) do - GenServer.call(name, {:server_stats, server_id}) - end - - def sync_stats(name) do - GenServer.call(name, :sync_stats) - end - - def sync_health(name) do - GenServer.call(name, :sync_health) + def runtime_event(name, server_id, kind, attrs \\ %{}) do + GenServer.cast(name, {:runtime_event, server_id, kind, attrs}) end @impl true def init(opts) do path = Keyword.get(opts, :path, config_path() || Sync.default_path()) - {:ok, load_state(path)} + + state = + load_state(path) + |> Map.merge(%{ + start_workers?: Keyword.get(opts, :start_workers?, config_value(:start_workers?, true)), + worker_supervisor: Keyword.get(opts, :worker_supervisor, Parrhesia.Sync.WorkerSupervisor), + worker_registry: 
Keyword.get(opts, :worker_registry, Parrhesia.Sync.WorkerRegistry), + transport_module: Keyword.get(opts, :transport_module, WebSockexClient), + relay_info_opts: Keyword.get(opts, :relay_info_opts, []), + transport_opts: Keyword.get(opts, :transport_opts, []) + }) + + {:ok, state, {:continue, :bootstrap}} + end + + @impl true + def handle_continue(:bootstrap, state) do + next_state = + if state.start_workers? do + state.servers + |> Map.keys() + |> Enum.reduce(state, fn server_id, acc -> maybe_start_worker(acc, server_id) end) + else + state + end + + {:noreply, next_state} end @impl true def handle_call({:put_server, server}, _from, state) do case normalize_server(server) do {:ok, normalized_server} -> - updated_state = put_server_state(state, normalized_server) + updated_state = + state + |> put_server_state(normalized_server) + |> persist_and_reconcile!(normalized_server.id) - with :ok <- persist_state(updated_state) do - {:reply, {:ok, merged_server(updated_state, normalized_server.id)}, updated_state} - end + {:reply, {:ok, merged_server(updated_state, normalized_server.id)}, updated_state} {:error, reason} -> {:reply, {:error, reason}, state} @@ -82,14 +86,14 @@ defmodule Parrhesia.API.Sync.Manager do def handle_call({:remove_server, server_id}, _from, state) do if Map.has_key?(state.servers, server_id) do - updated_state = %{ + next_state = state - | servers: Map.delete(state.servers, server_id), - runtime: Map.delete(state.runtime, server_id) - } + |> stop_worker_if_running(server_id) + |> Map.update!(:servers, &Map.delete(&1, server_id)) + |> Map.update!(:runtime, &Map.delete(&1, server_id)) - with :ok <- persist_state(updated_state) do - {:reply, :ok, updated_state} + with :ok <- persist_state(next_state) do + {:reply, :ok, next_state} end else {:reply, {:error, :not_found}, state} @@ -114,32 +118,69 @@ defmodule Parrhesia.API.Sync.Manager do end def handle_call({:start_server, server_id}, _from, state) do - with {:ok, updated_state} <- 
update_runtime(state, server_id, &mark_running/1), - :ok <- persist_state(updated_state) do - {:reply, :ok, updated_state} - else - :error -> {:reply, {:error, :not_found}, state} - {:error, reason} -> {:reply, {:error, reason}, state} + case Map.fetch(state.runtime, server_id) do + {:ok, runtime} -> + next_state = + state + |> put_runtime(server_id, %{runtime | state: :running, last_error: nil}) + |> persist_and_reconcile!(server_id) + + {:reply, :ok, next_state} + + :error -> + {:reply, {:error, :not_found}, state} end end def handle_call({:stop_server, server_id}, _from, state) do - with {:ok, updated_state} <- update_runtime(state, server_id, &mark_stopped/1), - :ok <- persist_state(updated_state) do - {:reply, :ok, updated_state} - else - :error -> {:reply, {:error, :not_found}, state} - {:error, reason} -> {:reply, {:error, reason}, state} + case Map.fetch(state.runtime, server_id) do + {:ok, runtime} -> + next_runtime = + runtime + |> Map.put(:state, :stopped) + |> Map.put(:connected?, false) + |> Map.put(:last_disconnected_at, now()) + + next_state = + state + |> stop_worker_if_running(server_id) + |> put_runtime(server_id, next_runtime) + + with :ok <- persist_state(next_state) do + {:reply, :ok, next_state} + end + + :error -> + {:reply, {:error, :not_found}, state} end end def handle_call({:sync_now, server_id}, _from, state) do - with {:ok, updated_state} <- update_runtime(state, server_id, &record_sync_now/1), - :ok <- persist_state(updated_state) do - {:reply, :ok, updated_state} - else - :error -> {:reply, {:error, :not_found}, state} - {:error, reason} -> {:reply, {:error, reason}, state} + case {Map.has_key?(state.runtime, server_id), state.start_workers?, + lookup_worker(state, server_id)} do + {false, _start_workers?, _worker_pid} -> + {:reply, {:error, :not_found}, state} + + {true, true, worker_pid} when is_pid(worker_pid) -> + Worker.sync_now(worker_pid) + {:reply, :ok, state} + + {true, true, nil} -> + next_state = + state + |> 
put_in([:runtime, server_id, :state], :running) + |> persist_and_reconcile!(server_id) + + {:reply, :ok, next_state} + + {true, false, _worker_pid} -> + next_state = + apply_runtime_event(state, server_id, :sync_started, %{}) + |> apply_runtime_event(server_id, :sync_completed, %{}) + + with :ok <- persist_state(next_state) do + {:reply, :ok, next_state} + end end end @@ -150,19 +191,39 @@ defmodule Parrhesia.API.Sync.Manager do end end - def handle_call(:sync_stats, _from, state) do - {:reply, {:ok, aggregate_stats(state)}, state} + def handle_call(:sync_stats, _from, state), do: {:reply, {:ok, aggregate_stats(state)}, state} + def handle_call(:sync_health, _from, state), do: {:reply, {:ok, health_summary(state)}, state} + + @impl true + def handle_cast({:runtime_event, server_id, kind, attrs}, state) do + next_state = + state + |> apply_runtime_event(server_id, kind, attrs) + |> persist_state_if_known_server(server_id) + + {:noreply, next_state} end - def handle_call(:sync_health, _from, state) do - {:reply, {:ok, health_summary(state)}, state} + defp persist_state_if_known_server(state, server_id) do + if Map.has_key?(state.runtime, server_id) do + case persist_state(state) do + :ok -> + state + + {:error, reason} -> + Logger.warning("failed to persist sync runtime for #{server_id}: #{inspect(reason)}") + state + end + else + state + end end defp put_server_state(state, server) do runtime = case Map.get(state.runtime, server.id) do nil -> default_runtime(server) - existing_runtime -> maybe_align_runtime_with_server(existing_runtime, server) + existing_runtime -> existing_runtime end %{ @@ -172,72 +233,105 @@ defmodule Parrhesia.API.Sync.Manager do } end - defp maybe_align_runtime_with_server(runtime, %{enabled?: false}) do - runtime - |> Map.put(:state, :stopped) - |> Map.put(:connected?, false) - |> Map.put(:last_disconnected_at, now()) + defp put_runtime(state, server_id, runtime) do + %{state | runtime: Map.put(state.runtime, server_id, runtime)} end - defp 
maybe_align_runtime_with_server(runtime, _server), do: runtime - - defp default_runtime(server) do - %{ - server_id: server.id, - state: if(server.enabled?, do: :running, else: :stopped), - connected?: false, - last_connected_at: nil, - last_disconnected_at: nil, - last_sync_started_at: nil, - last_sync_completed_at: nil, - last_event_received_at: nil, - last_eose_at: nil, - reconnect_attempts: 0, - last_error: nil, - events_received: 0, - events_accepted: 0, - events_duplicate: 0, - events_rejected: 0, - query_runs: 0, - subscription_restarts: 0, - reconnects: 0, - last_remote_eose_at: nil - } + defp persist_and_reconcile!(state, server_id) do + :ok = persist_state(state) + reconcile_worker(state, server_id) end - defp mark_running(runtime) do - runtime - |> Map.put(:state, :running) - |> Map.put(:last_error, nil) - end + defp reconcile_worker(state, server_id) do + cond do + not state.start_workers? -> + state - defp mark_stopped(runtime) do - runtime - |> Map.put(:state, :stopped) - |> Map.put(:connected?, false) - |> Map.put(:last_disconnected_at, now()) - end + desired_running?(state, server_id) -> + state + |> stop_worker_if_running(server_id) + |> maybe_start_worker(server_id) - defp record_sync_now(runtime) do - sync_timestamp = now() - - runtime - |> Map.update!(:query_runs, &(&1 + 1)) - |> Map.put(:last_sync_started_at, sync_timestamp) - |> Map.put(:last_sync_completed_at, sync_timestamp) - end - - defp update_runtime(state, server_id, fun) do - case Map.fetch(state.runtime, server_id) do - {:ok, runtime} -> - updated_runtime = fun.(runtime) - {:ok, %{state | runtime: Map.put(state.runtime, server_id, updated_runtime)}} - - :error -> - :error + true -> + stop_worker_if_running(state, server_id) end end + defp maybe_start_worker(state, server_id) do + cond do + not state.start_workers? 
-> + state + + not desired_running?(state, server_id) -> + state + + lookup_worker(state, server_id) != nil -> + state + + true -> + server = Map.fetch!(state.servers, server_id) + runtime = Map.fetch!(state.runtime, server_id) + + child_spec = %{ + id: {:sync_worker, server_id}, + start: + {Worker, :start_link, + [ + [ + name: via_tuple(server_id, state.worker_registry), + server: server, + runtime: runtime, + manager: self(), + transport_module: state.transport_module, + relay_info_opts: state.relay_info_opts, + transport_opts: state.transport_opts + ] + ]}, + restart: :transient + } + + case DynamicSupervisor.start_child(state.worker_supervisor, child_spec) do + {:ok, _pid} -> + state + + {:error, {:already_started, _pid}} -> + state + + {:error, reason} -> + Logger.warning("failed to start sync worker #{server_id}: #{inspect(reason)}") + state + end + end + end + + defp stop_worker_if_running(state, server_id) do + if worker_pid = lookup_worker(state, server_id) do + _ = Worker.stop(worker_pid) + end + + state + end + + defp desired_running?(state, server_id) do + case Map.fetch(state.runtime, server_id) do + {:ok, runtime} -> runtime.state == :running + :error -> false + end + end + + defp lookup_worker(state, server_id) do + case Registry.lookup(state.worker_registry, server_id) do + [{pid, _value}] -> pid + [] -> nil + end + catch + :exit, _reason -> nil + end + + defp via_tuple(server_id, registry) do + {:via, Registry, {registry, server_id}} + end + defp merged_server(state, server_id) do state.servers |> Map.fetch!(server_id) @@ -259,7 +353,9 @@ defmodule Parrhesia.API.Sync.Manager do "last_sync_started_at" => runtime.last_sync_started_at, "last_sync_completed_at" => runtime.last_sync_completed_at, "last_remote_eose_at" => runtime.last_remote_eose_at, - "last_error" => runtime.last_error + "last_error" => runtime.last_error, + "cursor_created_at" => runtime.cursor_created_at, + "cursor_event_id" => runtime.cursor_event_id } end @@ -301,6 +397,129 @@ 
defmodule Parrhesia.API.Sync.Manager do } end + defp apply_runtime_event(state, server_id, kind, attrs) do + case Map.fetch(state.runtime, server_id) do + {:ok, runtime} -> + updated_runtime = update_runtime_for_event(runtime, kind, attrs) + put_runtime(state, server_id, updated_runtime) + + :error -> + state + end + end + + defp update_runtime_for_event(runtime, :connected, _attrs) do + runtime + |> Map.put(:state, :running) + |> Map.put(:connected?, true) + |> Map.put(:last_connected_at, now()) + |> Map.put(:last_error, nil) + end + + defp update_runtime_for_event(runtime, :disconnected, attrs) do + reason = format_reason(Map.get(attrs, :reason)) + + runtime + |> Map.put(:connected?, false) + |> Map.put(:last_disconnected_at, now()) + |> Map.update!(:reconnects, &(&1 + 1)) + |> Map.put(:last_error, reason) + end + + defp update_runtime_for_event(runtime, :error, attrs) do + Map.put(runtime, :last_error, format_reason(Map.get(attrs, :reason))) + end + + defp update_runtime_for_event(runtime, :sync_started, _attrs) do + runtime + |> Map.put(:last_sync_started_at, now()) + |> Map.update!(:query_runs, &(&1 + 1)) + end + + defp update_runtime_for_event(runtime, :sync_completed, _attrs) do + timestamp = now() + + runtime + |> Map.put(:last_sync_completed_at, timestamp) + |> Map.put(:last_eose_at, timestamp) + |> Map.put(:last_remote_eose_at, timestamp) + end + + defp update_runtime_for_event(runtime, :subscription_restart, _attrs) do + Map.update!(runtime, :subscription_restarts, &(&1 + 1)) + end + + defp update_runtime_for_event(runtime, :cursor_advanced, attrs) do + runtime + |> Map.put(:cursor_created_at, Map.get(attrs, :created_at)) + |> Map.put(:cursor_event_id, Map.get(attrs, :event_id)) + end + + defp update_runtime_for_event(runtime, :event_result, attrs) do + event = Map.get(attrs, :event, %{}) + result = Map.get(attrs, :result) + + runtime + |> Map.update!(:events_received, &(&1 + 1)) + |> Map.put(:last_event_received_at, now()) + |> 
increment_result_counter(result) + |> maybe_put_last_error(attrs) + |> maybe_advance_runtime_cursor(event, result) + end + + defp update_runtime_for_event(runtime, _kind, _attrs), do: runtime + + defp increment_result_counter(runtime, :accepted), + do: Map.update!(runtime, :events_accepted, &(&1 + 1)) + + defp increment_result_counter(runtime, :duplicate), + do: Map.update!(runtime, :events_duplicate, &(&1 + 1)) + + defp increment_result_counter(runtime, :rejected), + do: Map.update!(runtime, :events_rejected, &(&1 + 1)) + + defp increment_result_counter(runtime, _result), do: runtime + + defp maybe_put_last_error(runtime, %{reason: nil}), do: runtime + + defp maybe_put_last_error(runtime, attrs), + do: Map.put(runtime, :last_error, format_reason(attrs[:reason])) + + defp maybe_advance_runtime_cursor(runtime, event, result) + when result in [:accepted, :duplicate] do + created_at = Map.get(event, "created_at") + event_id = Map.get(event, "id") + + cond do + not is_integer(created_at) or not is_binary(event_id) -> + runtime + + is_nil(runtime.cursor_created_at) -> + runtime + |> Map.put(:cursor_created_at, created_at) + |> Map.put(:cursor_event_id, event_id) + + created_at > runtime.cursor_created_at -> + runtime + |> Map.put(:cursor_created_at, created_at) + |> Map.put(:cursor_event_id, event_id) + + created_at == runtime.cursor_created_at and event_id > runtime.cursor_event_id -> + runtime + |> Map.put(:cursor_created_at, created_at) + |> Map.put(:cursor_event_id, event_id) + + true -> + runtime + end + end + + defp maybe_advance_runtime_cursor(runtime, _event, _result), do: runtime + + defp format_reason(nil), do: nil + defp format_reason(reason) when is_binary(reason), do: reason + defp format_reason(reason), do: inspect(reason) + defp load_state(path) do case File.read(path) do {:ok, payload} -> @@ -361,29 +580,29 @@ defmodule Parrhesia.API.Sync.Manager do defp normalize_runtime(nil, server), do: default_runtime(server) defp normalize_runtime(runtime, server) 
when is_map(runtime) do - state = - runtime - |> Map.put(:server_id, server.id) - |> Map.put(:state, normalize_runtime_state(fetch_value(runtime, :state))) - |> Map.put(:connected?, fetch_boolean(runtime, :connected?) || false) - |> Map.put(:last_connected_at, fetch_string_or_nil(runtime, :last_connected_at)) - |> Map.put(:last_disconnected_at, fetch_string_or_nil(runtime, :last_disconnected_at)) - |> Map.put(:last_sync_started_at, fetch_string_or_nil(runtime, :last_sync_started_at)) - |> Map.put(:last_sync_completed_at, fetch_string_or_nil(runtime, :last_sync_completed_at)) - |> Map.put(:last_event_received_at, fetch_string_or_nil(runtime, :last_event_received_at)) - |> Map.put(:last_eose_at, fetch_string_or_nil(runtime, :last_eose_at)) - |> Map.put(:last_remote_eose_at, fetch_string_or_nil(runtime, :last_remote_eose_at)) - |> Map.put(:last_error, fetch_string_or_nil(runtime, :last_error)) - |> Map.put(:reconnect_attempts, fetch_non_neg_integer(runtime, :reconnect_attempts)) - |> Map.put(:events_received, fetch_non_neg_integer(runtime, :events_received)) - |> Map.put(:events_accepted, fetch_non_neg_integer(runtime, :events_accepted)) - |> Map.put(:events_duplicate, fetch_non_neg_integer(runtime, :events_duplicate)) - |> Map.put(:events_rejected, fetch_non_neg_integer(runtime, :events_rejected)) - |> Map.put(:query_runs, fetch_non_neg_integer(runtime, :query_runs)) - |> Map.put(:subscription_restarts, fetch_non_neg_integer(runtime, :subscription_restarts)) - |> Map.put(:reconnects, fetch_non_neg_integer(runtime, :reconnects)) - - maybe_align_runtime_with_server(state, server) + %{ + server_id: server.id, + state: normalize_runtime_state(fetch_value(runtime, :state)), + connected?: fetch_boolean(runtime, :connected?) 
|| false, + last_connected_at: fetch_string_or_nil(runtime, :last_connected_at), + last_disconnected_at: fetch_string_or_nil(runtime, :last_disconnected_at), + last_sync_started_at: fetch_string_or_nil(runtime, :last_sync_started_at), + last_sync_completed_at: fetch_string_or_nil(runtime, :last_sync_completed_at), + last_event_received_at: fetch_string_or_nil(runtime, :last_event_received_at), + last_eose_at: fetch_string_or_nil(runtime, :last_eose_at), + reconnect_attempts: fetch_non_neg_integer(runtime, :reconnect_attempts), + last_error: fetch_string_or_nil(runtime, :last_error), + events_received: fetch_non_neg_integer(runtime, :events_received), + events_accepted: fetch_non_neg_integer(runtime, :events_accepted), + events_duplicate: fetch_non_neg_integer(runtime, :events_duplicate), + events_rejected: fetch_non_neg_integer(runtime, :events_rejected), + query_runs: fetch_non_neg_integer(runtime, :query_runs), + subscription_restarts: fetch_non_neg_integer(runtime, :subscription_restarts), + reconnects: fetch_non_neg_integer(runtime, :reconnects), + last_remote_eose_at: fetch_string_or_nil(runtime, :last_remote_eose_at), + cursor_created_at: fetch_optional_integer(runtime, :cursor_created_at), + cursor_event_id: fetch_string_or_nil(runtime, :cursor_event_id) + } end defp normalize_runtime(_runtime, server), do: default_runtime(server) @@ -404,7 +623,7 @@ defmodule Parrhesia.API.Sync.Manager do defp encode_state(state) do %{ - "version" => 1, + "version" => 2, "servers" => Map.new(state.servers, fn {server_id, server} -> {server_id, encode_server(server)} end), "runtime" => @@ -457,7 +676,9 @@ defmodule Parrhesia.API.Sync.Manager do "query_runs" => runtime.query_runs, "subscription_restarts" => runtime.subscription_restarts, "reconnects" => runtime.reconnects, - "last_remote_eose_at" => runtime.last_remote_eose_at + "last_remote_eose_at" => runtime.last_remote_eose_at, + "cursor_created_at" => runtime.cursor_created_at, + "cursor_event_id" => 
runtime.cursor_event_id } end @@ -465,9 +686,35 @@ defmodule Parrhesia.API.Sync.Manager do %{path: path, servers: %{}, runtime: %{}} end + defp default_runtime(server) do + %{ + server_id: server.id, + state: if(server.enabled?, do: :running, else: :stopped), + connected?: false, + last_connected_at: nil, + last_disconnected_at: nil, + last_sync_started_at: nil, + last_sync_completed_at: nil, + last_event_received_at: nil, + last_eose_at: nil, + reconnect_attempts: 0, + last_error: nil, + events_received: 0, + events_accepted: 0, + events_duplicate: 0, + events_rejected: 0, + query_runs: 0, + subscription_restarts: 0, + reconnects: 0, + last_remote_eose_at: nil, + cursor_created_at: nil, + cursor_event_id: nil + } + end + defp normalize_server(server) when is_map(server) do with {:ok, id} <- normalize_non_empty_string(fetch_value(server, :id), :invalid_server_id), - {:ok, {url, host}} <- normalize_url(fetch_value(server, :url)), + {:ok, {url, host, scheme}} <- normalize_url(fetch_value(server, :url)), {:ok, enabled?} <- normalize_boolean(fetch_value(server, :enabled?), true), {:ok, auth_pubkey} <- normalize_pubkey(fetch_value(server, :auth_pubkey)), {:ok, filters} <- normalize_filters(fetch_value(server, :filters)), @@ -475,7 +722,7 @@ defmodule Parrhesia.API.Sync.Manager do {:ok, overlap_window_seconds} <- normalize_overlap_window(fetch_value(server, :overlap_window_seconds)), {:ok, auth} <- normalize_auth(fetch_value(server, :auth)), - {:ok, tls} <- normalize_tls(fetch_value(server, :tls), host), + {:ok, tls} <- normalize_tls(fetch_value(server, :tls), host, scheme), {:ok, metadata} <- normalize_metadata(fetch_value(server, :metadata)) do {:ok, %{ @@ -499,7 +746,7 @@ defmodule Parrhesia.API.Sync.Manager do uri = URI.parse(url) if uri.scheme in ["ws", "wss"] and is_binary(uri.host) and uri.host != "" do - {:ok, {URI.to_string(uri), uri.host}} + {:ok, {URI.to_string(uri), uri.host, uri.scheme}} else {:error, :invalid_url} end @@ -556,27 +803,37 @@ defmodule 
Parrhesia.API.Sync.Manager do defp normalize_auth_type("nip42"), do: {:ok, :nip42} defp normalize_auth_type(_type), do: {:error, :invalid_auth_type} - defp normalize_tls(tls, host) when is_map(tls) do + defp normalize_tls(tls, host, scheme) when is_map(tls) do with {:ok, mode} <- normalize_tls_mode(fetch_value(tls, :mode)), + :ok <- validate_tls_mode_against_scheme(mode, scheme), {:ok, hostname} <- normalize_hostname(fetch_value(tls, :hostname) || host), - {:ok, pins} <- normalize_tls_pins(fetch_value(tls, :pins)) do + {:ok, pins} <- normalize_tls_pins(mode, fetch_value(tls, :pins)) do {:ok, %{mode: mode, hostname: hostname, pins: pins}} end end - defp normalize_tls(_tls, _host), do: {:error, :invalid_tls} + defp normalize_tls(_tls, _host, _scheme), do: {:error, :invalid_tls} defp normalize_tls_mode(nil), do: {:ok, @default_tls_mode} defp normalize_tls_mode(:required), do: {:ok, :required} defp normalize_tls_mode("required"), do: {:ok, :required} + defp normalize_tls_mode(:disabled), do: {:ok, :disabled} + defp normalize_tls_mode("disabled"), do: {:ok, :disabled} defp normalize_tls_mode(_mode), do: {:error, :invalid_tls_mode} + defp validate_tls_mode_against_scheme(:required, "wss"), do: :ok + defp validate_tls_mode_against_scheme(:required, _scheme), do: {:error, :invalid_url} + defp validate_tls_mode_against_scheme(:disabled, _scheme), do: :ok + defp normalize_hostname(hostname) when is_binary(hostname) and hostname != "", do: {:ok, hostname} defp normalize_hostname(_hostname), do: {:error, :invalid_tls_hostname} - defp normalize_tls_pins(pins) when is_list(pins) and pins != [] do + defp normalize_tls_pins(:disabled, nil), do: {:ok, []} + defp normalize_tls_pins(:disabled, pins) when is_list(pins), do: {:ok, []} + + defp normalize_tls_pins(:required, pins) when is_list(pins) and pins != [] do Enum.reduce_while(pins, {:ok, []}, fn pin, {:ok, acc} -> case normalize_tls_pin(pin) do {:ok, normalized_pin} -> {:cont, {:ok, [normalized_pin | acc]}} @@ -589,7 +846,7 @@ 
defmodule Parrhesia.API.Sync.Manager do end end - defp normalize_tls_pins(_pins), do: {:error, :invalid_tls_pins} + defp normalize_tls_pins(:required, _pins), do: {:error, :invalid_tls_pins} defp normalize_tls_pin(pin) when is_map(pin) do with {:ok, type} <- normalize_tls_pin_type(fetch_value(pin, :type)), @@ -639,6 +896,13 @@ defmodule Parrhesia.API.Sync.Manager do end end + defp fetch_optional_integer(map, key) do + case fetch_value(map, key) do + value when is_integer(value) and value >= 0 -> value + _other -> nil + end + end + defp fetch_boolean(map, key) do case fetch_value(map, key) do value when is_boolean(value) -> value @@ -658,9 +922,13 @@ defmodule Parrhesia.API.Sync.Manager do end defp config_path do + config_value(:path) + end + + defp config_value(key, default \\ nil) do :parrhesia |> Application.get_env(:sync, []) - |> Keyword.get(:path) + |> Keyword.get(key, default) end defp now do diff --git a/lib/parrhesia/sync/relay_info_client.ex b/lib/parrhesia/sync/relay_info_client.ex new file mode 100644 index 0000000..4c1484e --- /dev/null +++ b/lib/parrhesia/sync/relay_info_client.ex @@ -0,0 +1,60 @@ +defmodule Parrhesia.Sync.RelayInfoClient do + @moduledoc false + + alias Parrhesia.Sync.TLS + + @spec verify_remote_identity(map(), keyword()) :: :ok | {:error, term()} + def verify_remote_identity(server, opts \\ []) do + request_fun = Keyword.get(opts, :request_fun, &default_request/2) + + with {:ok, response} <- request_fun.(relay_info_url(server.url), request_opts(server)), + {:ok, pubkey} <- extract_pubkey(response) do + if pubkey == server.auth_pubkey do + :ok + else + {:error, :remote_identity_mismatch} + end + end + end + + defp default_request(url, opts) do + case Req.get( + url: url, + headers: [{"accept", "application/nostr+json"}], + decode_body: false, + connect_options: opts + ) do + {:ok, response} -> {:ok, response} + {:error, reason} -> {:error, reason} + end + end + + defp extract_pubkey(%Req.Response{status: 200, body: body}) when 
is_binary(body) do + with {:ok, payload} <- JSON.decode(body), + pubkey when is_binary(pubkey) and pubkey != "" <- Map.get(payload, "pubkey") do + {:ok, String.downcase(pubkey)} + else + nil -> {:error, :missing_remote_identity} + {:error, reason} -> {:error, reason} + _other -> {:error, :missing_remote_identity} + end + end + + defp extract_pubkey(%Req.Response{status: status}), + do: {:error, {:relay_info_request_failed, status}} + + defp extract_pubkey(_response), do: {:error, :invalid_relay_info} + + defp request_opts(%{tls: %{mode: :disabled}}), do: [] + defp request_opts(%{tls: tls}), do: TLS.req_connect_options(tls) + + defp relay_info_url(relay_url) do + relay_url + |> URI.parse() + |> Map.update!(:scheme, fn + "wss" -> "https" + "ws" -> "http" + end) + |> URI.to_string() + end +end diff --git a/lib/parrhesia/sync/supervisor.ex b/lib/parrhesia/sync/supervisor.ex index 5dc83fd..265ef86 100644 --- a/lib/parrhesia/sync/supervisor.ex +++ b/lib/parrhesia/sync/supervisor.ex @@ -6,15 +6,38 @@ defmodule Parrhesia.Sync.Supervisor do use Supervisor def start_link(init_arg \\ []) do - Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) + name = Keyword.get(init_arg, :name, __MODULE__) + Supervisor.start_link(__MODULE__, init_arg, name: name) end @impl true - def init(_init_arg) do + def init(init_arg) do + worker_registry = Keyword.get(init_arg, :worker_registry, Parrhesia.Sync.WorkerRegistry) + worker_supervisor = Keyword.get(init_arg, :worker_supervisor, Parrhesia.Sync.WorkerSupervisor) + manager_name = Keyword.get(init_arg, :manager, Parrhesia.API.Sync.Manager) + children = [ - {Parrhesia.API.Sync.Manager, []} + {Registry, keys: :unique, name: worker_registry}, + {DynamicSupervisor, strategy: :one_for_one, name: worker_supervisor}, + {Parrhesia.API.Sync.Manager, + manager_opts(init_arg, manager_name, worker_registry, worker_supervisor)} ] Supervisor.init(children, strategy: :one_for_one) end + + defp manager_opts(init_arg, manager_name, worker_registry, 
defmodule Parrhesia.Sync.TLS do
  @moduledoc false

  require Record

  Record.defrecordp(
    :otp_certificate,
    Record.extract(:OTPCertificate, from_lib: "public_key/include/OTP-PUB-KEY.hrl")
  )

  Record.defrecordp(
    :otp_tbs_certificate,
    Record.extract(:OTPTBSCertificate, from_lib: "public_key/include/OTP-PUB-KEY.hrl")
  )

  @type tls_config :: %{
          mode: :required | :disabled,
          hostname: String.t(),
          pins: [%{type: :spki_sha256, value: String.t()}]
        }

  # WebSockex connection options: skip verification entirely when disabled,
  # otherwise verify + pin via the :ssl transport options below.
  @spec websocket_options(tls_config()) :: keyword()
  def websocket_options(%{mode: :disabled}), do: [insecure: true]
  def websocket_options(%{mode: :required} = tls), do: [ssl_options: transport_opts(tls)]

  # Req connect options for the relay-info HTTP probe.
  @spec req_connect_options(tls_config()) :: keyword()
  def req_connect_options(%{mode: :disabled}), do: []
  def req_connect_options(%{mode: :required} = tls), do: [transport_opts: transport_opts(tls)]

  # :ssl options enforcing peer verification, SNI/hostname checking, and
  # SPKI-SHA256 certificate pinning against the configured pin set.
  def transport_opts(%{hostname: hostname, pins: pins}) do
    [
      verify: :verify_peer,
      cacerts: system_cacerts(),
      server_name_indication: String.to_charlist(hostname),
      customize_hostname_check: [
        match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
      ],
      verify_fun:
        {&verify_certificate/3, %{pins: MapSet.new(pins, & &1.value), matched?: false}}
    ]
  end

  # BUG FIX: :ssl invokes the verify fun with the *decoded*
  # #'OTPCertificate'{} record, not a DER binary, so the previous
  # `when is_binary(cert)` pin-matching clause never fired — matched? stayed
  # false and every pinned connection failed with :pin_mismatch at
  # :valid_peer. We now match the record form (keeping a DER fallback) and
  # check pins for every cert in the chain; the handshake succeeds iff at
  # least one chain certificate's SPKI hash is in the pin set.
  defp verify_certificate(_cert, {:bad_cert, reason}, _state), do: {:fail, reason}

  defp verify_certificate(_cert, {:extension, _extension}, state), do: {:unknown, state}

  defp verify_certificate(cert, :valid, state), do: {:valid, note_pin_match(state, cert)}

  defp verify_certificate(cert, :valid_peer, state) do
    state = note_pin_match(state, cert)

    if state.matched? do
      {:valid, state}
    else
      {:fail, :pin_mismatch}
    end
  end

  # Records whether this certificate's SPKI pin is in the configured set; a
  # certificate we cannot hash simply does not match (the peer check above
  # then fails closed with :pin_mismatch unless another chain cert matched).
  defp note_pin_match(state, cert) do
    if MapSet.member?(state.pins, spki_pin(cert)) do
      %{state | matched?: true}
    else
      state
    end
  rescue
    _error -> state
  end

  defp spki_pin(cert_der) when is_binary(cert_der) do
    cert_der
    |> :public_key.pkix_decode_cert(:otp)
    |> spki_pin()
  end

  defp spki_pin(otp_certificate(tbsCertificate: tbs)) do
    tbs
    |> otp_tbs_certificate(:subjectPublicKeyInfo)
    |> encode_spki()
    |> then(&:crypto.hash(:sha256, &1))
    |> Base.encode64()
  end

  defp encode_spki(spki) do
    # The :otp decode path yields an OTP-flavoured SubjectPublicKeyInfo;
    # pkix_encode re-encodes it back to DER for hashing.
    # NOTE(review): confirm the :OTPSubjectPublicKeyInfo type name against the
    # OTP release in use.
    :public_key.pkix_encode(:OTPSubjectPublicKeyInfo, spki, :otp)
  end

  defp system_cacerts do
    # :public_key.cacerts_get/0 exists from OTP 25; fall back to an empty
    # trust store on older releases (verification will then fail closed).
    if function_exported?(:public_key, :cacerts_get, 0) do
      :public_key.cacerts_get()
    else
      []
    end
  end
end

defmodule Parrhesia.Sync.Transport do
  @moduledoc false

  @callback connect(pid(), map(), keyword()) :: {:ok, pid()} | {:error, term()}
  @callback send_json(pid(), term()) :: :ok | {:error, term()}
  @callback close(pid()) :: :ok
end
send_json(pid, payload) do + WebSockex.cast(pid, {:send_json, payload}) + end + + @impl true + def close(pid) do + WebSockex.cast(pid, :close) + :ok + end + + @impl true + def handle_connect(conn, state) do + send(state.owner, {:sync_transport, self(), :connected, %{resp_headers: conn.resp_headers}}) + {:ok, state} + end + + @impl true + def handle_frame({:text, payload}, state) do + message = + case JSON.decode(payload) do + {:ok, frame} -> frame + {:error, reason} -> {:decode_error, reason, payload} + end + + send(state.owner, {:sync_transport, self(), :frame, message}) + {:ok, state} + end + + def handle_frame(frame, state) do + send(state.owner, {:sync_transport, self(), :frame, frame}) + {:ok, state} + end + + @impl true + def handle_cast({:send_json, payload}, state) do + {:reply, {:text, JSON.encode!(payload)}, state} + end + + def handle_cast(:close, state) do + {:close, state} + end + + @impl true + def handle_disconnect(status, state) do + send(state.owner, {:sync_transport, self(), :disconnected, status}) + {:ok, state} + end +end diff --git a/lib/parrhesia/sync/worker.ex b/lib/parrhesia/sync/worker.ex new file mode 100644 index 0000000..56d3f54 --- /dev/null +++ b/lib/parrhesia/sync/worker.ex @@ -0,0 +1,367 @@ +defmodule Parrhesia.Sync.Worker do + @moduledoc false + + use GenServer + + alias Parrhesia.API.Events + alias Parrhesia.API.Identity + alias Parrhesia.API.RequestContext + alias Parrhesia.API.Sync.Manager + alias Parrhesia.Sync.RelayInfoClient + alias Parrhesia.Sync.Transport.WebSockexClient + + @initial_backoff_ms 1_000 + @max_backoff_ms 30_000 + @auth_kind 22_242 + + defstruct server: nil, + manager: nil, + transport_module: WebSockexClient, + transport_pid: nil, + phase: :idle, + current_subscription_id: nil, + backoff_ms: @initial_backoff_ms, + authenticated?: false, + auth_event_id: nil, + resubscribe_after_auth?: false, + cursor_created_at: nil, + cursor_event_id: nil, + relay_info_opts: [], + transport_opts: [] + + @type t :: 
%__MODULE__{} + + def child_spec(opts) do + server = Keyword.fetch!(opts, :server) + + %{ + id: {:sync_worker, server.id}, + start: {__MODULE__, :start_link, [opts]}, + restart: :transient + } + end + + def start_link(opts) do + name = Keyword.get(opts, :name) + GenServer.start_link(__MODULE__, opts, name: name) + end + + def sync_now(worker), do: GenServer.cast(worker, :sync_now) + def stop(worker), do: GenServer.stop(worker, :normal) + + @impl true + def init(opts) do + server = Keyword.fetch!(opts, :server) + runtime = Keyword.get(opts, :runtime, %{}) + + state = %__MODULE__{ + server: server, + manager: Keyword.fetch!(opts, :manager), + transport_module: Keyword.get(opts, :transport_module, WebSockexClient), + cursor_created_at: Map.get(runtime, :cursor_created_at), + cursor_event_id: Map.get(runtime, :cursor_event_id), + relay_info_opts: Keyword.get(opts, :relay_info_opts, []), + transport_opts: Keyword.get(opts, :transport_opts, []) + } + + send(self(), :connect) + {:ok, state} + end + + @impl true + def handle_cast(:sync_now, state) do + Manager.runtime_event(state.manager, state.server.id, :subscription_restart) + + next_state = + state + |> close_subscription() + |> issue_subscription() + + {:noreply, next_state} + end + + @impl true + def handle_info(:connect, %__MODULE__{transport_pid: nil} = state) do + case RelayInfoClient.verify_remote_identity(state.server, state.relay_info_opts) do + :ok -> + connect_transport(state) + + {:error, reason} -> + Manager.runtime_event(state.manager, state.server.id, :disconnected, %{reason: reason}) + {:noreply, schedule_reconnect(state)} + end + end + + def handle_info(:connect, state), do: {:noreply, state} + + def handle_info({:sync_transport, transport_pid, :connected, _info}, state) do + Manager.runtime_event(state.manager, state.server.id, :connected, %{}) + + next_state = + state + |> Map.put(:transport_pid, transport_pid) + |> Map.put(:backoff_ms, @initial_backoff_ms) + |> Map.put(:authenticated?, false) + |> 
Map.put(:auth_event_id, nil) + |> Map.put(:resubscribe_after_auth?, false) + |> issue_subscription() + + {:noreply, next_state} + end + + def handle_info({:sync_transport, _transport_pid, :frame, frame}, state) do + {:noreply, handle_transport_frame(state, frame)} + end + + def handle_info({:sync_transport, _transport_pid, :disconnected, status}, state) do + Manager.runtime_event(state.manager, state.server.id, :disconnected, %{reason: status.reason}) + + next_state = + state + |> Map.put(:transport_pid, nil) + |> Map.put(:phase, :idle) + |> Map.put(:authenticated?, false) + |> Map.put(:auth_event_id, nil) + |> Map.put(:resubscribe_after_auth?, false) + |> Map.put(:current_subscription_id, nil) + |> schedule_reconnect() + + {:noreply, next_state} + end + + def handle_info(_message, state), do: {:noreply, state} + + defp connect_transport(state) do + case state.transport_module.connect(self(), state.server, state.transport_opts) do + {:ok, transport_pid} -> + {:noreply, %{state | transport_pid: transport_pid, phase: :connecting}} + + {:error, reason} -> + Manager.runtime_event(state.manager, state.server.id, :disconnected, %{reason: reason}) + {:noreply, schedule_reconnect(state)} + end + end + + defp handle_transport_frame(state, ["AUTH", challenge]) when is_binary(challenge) do + case send_auth_event(state, challenge) do + {:ok, auth_event_id} -> + %{state | auth_event_id: auth_event_id, phase: :authenticating} + + {:error, reason} -> + Manager.runtime_event(state.manager, state.server.id, :error, %{reason: reason}) + state + end + end + + defp handle_transport_frame(state, ["OK", event_id, true, _message]) + when event_id == state.auth_event_id do + next_state = %{state | authenticated?: true, auth_event_id: nil} + + if next_state.resubscribe_after_auth? 
do + next_state + |> Map.put(:resubscribe_after_auth?, false) + |> issue_subscription() + else + next_state + end + end + + defp handle_transport_frame(state, ["OK", event_id, false, message]) + when event_id == state.auth_event_id do + Manager.runtime_event(state.manager, state.server.id, :error, %{reason: message}) + schedule_reconnect(%{state | auth_event_id: nil, authenticated?: false}) + end + + defp handle_transport_frame(state, ["EVENT", subscription_id, event]) + when subscription_id == state.current_subscription_id and is_map(event) do + handle_remote_event(state, event) + end + + defp handle_transport_frame(state, ["EOSE", subscription_id]) + when subscription_id == state.current_subscription_id do + Manager.runtime_event(state.manager, state.server.id, :sync_completed, %{}) + %{state | phase: :streaming} + end + + defp handle_transport_frame(state, ["CLOSED", subscription_id, message]) + when subscription_id == state.current_subscription_id do + auth_required? = is_binary(message) and String.contains?(String.downcase(message), "auth") + + next_state = + state + |> Map.put(:current_subscription_id, nil) + |> Map.put(:phase, :idle) + + if auth_required? and not state.authenticated? 
do + %{next_state | resubscribe_after_auth?: true} + else + Manager.runtime_event(state.manager, state.server.id, :error, %{reason: message}) + schedule_reconnect(next_state) + end + end + + defp handle_transport_frame(state, {:decode_error, reason, _payload}) do + Manager.runtime_event(state.manager, state.server.id, :error, %{reason: reason}) + state + end + + defp handle_transport_frame(state, _frame), do: state + + defp issue_subscription(%__MODULE__{transport_pid: nil} = state), do: state + + defp issue_subscription(state) do + subscription_id = subscription_id(state.server.id) + filters = sync_filters(state) + + :ok = + state.transport_module.send_json(state.transport_pid, ["REQ", subscription_id | filters]) + + Manager.runtime_event(state.manager, state.server.id, :sync_started, %{}) + + %{ + state + | current_subscription_id: subscription_id, + phase: :catchup + } + end + + defp close_subscription(%__MODULE__{transport_pid: nil} = state), do: state + defp close_subscription(%__MODULE__{current_subscription_id: nil} = state), do: state + + defp close_subscription(state) do + :ok = + state.transport_module.send_json(state.transport_pid, [ + "CLOSE", + state.current_subscription_id + ]) + + %{state | current_subscription_id: nil} + end + + defp send_auth_event(state, challenge) do + event = %{ + "created_at" => System.system_time(:second), + "kind" => @auth_kind, + "tags" => [["challenge", challenge], ["relay", state.server.url]], + "content" => "" + } + + with {:ok, signed_event} <- Identity.sign_event(event) do + :ok = state.transport_module.send_json(state.transport_pid, ["AUTH", signed_event]) + {:ok, signed_event["id"]} + end + end + + defp handle_remote_event(state, event) do + context = request_context(state) + + case Events.publish(event, context: context) do + {:ok, %{accepted: true}} -> + Manager.runtime_event(state.manager, state.server.id, :event_result, %{ + result: :accepted, + event: event + }) + + advance_cursor(state, event) + + {:ok, 
%{accepted: false, reason: :duplicate_event}} -> + Manager.runtime_event(state.manager, state.server.id, :event_result, %{ + result: :duplicate, + event: event + }) + + advance_cursor(state, event) + + {:ok, %{accepted: false, reason: reason}} -> + Manager.runtime_event(state.manager, state.server.id, :event_result, %{ + result: :rejected, + event: event, + reason: reason + }) + + state + + {:error, reason} -> + Manager.runtime_event(state.manager, state.server.id, :event_result, %{ + result: :rejected, + event: event, + reason: reason + }) + + state + end + end + + defp request_context(state) do + %RequestContext{ + authenticated_pubkeys: MapSet.new([state.server.auth_pubkey]), + caller: :sync, + subscription_id: state.current_subscription_id, + peer_id: state.server.id, + metadata: %{ + sync_server_id: state.server.id, + remote_url: state.server.url + } + } + end + + defp advance_cursor(state, event) do + created_at = Map.get(event, "created_at") + event_id = Map.get(event, "id") + + if newer_cursor?(state.cursor_created_at, state.cursor_event_id, created_at, event_id) do + Manager.runtime_event(state.manager, state.server.id, :cursor_advanced, %{ + created_at: created_at, + event_id: event_id + }) + + %{state | cursor_created_at: created_at, cursor_event_id: event_id} + else + state + end + end + + defp newer_cursor?(nil, _cursor_event_id, created_at, event_id), + do: is_integer(created_at) and is_binary(event_id) + + defp newer_cursor?(cursor_created_at, cursor_event_id, created_at, event_id) do + cond do + not is_integer(created_at) or not is_binary(event_id) -> + false + + created_at > cursor_created_at -> + true + + created_at == cursor_created_at and is_binary(cursor_event_id) and + event_id > cursor_event_id -> + true + + true -> + false + end + end + + defp sync_filters(state) do + Enum.map(state.server.filters, fn filter -> + case since_value(state, filter) do + nil -> filter + since -> Map.put(filter, "since", since) + end + end) + end + + defp 
since_value(%__MODULE__{cursor_created_at: nil}, _filter), do: nil + + defp since_value(state, _filter) do + max(state.cursor_created_at - state.server.overlap_window_seconds, 0) + end + + defp schedule_reconnect(state) do + Process.send_after(self(), :connect, state.backoff_ms) + %{state | backoff_ms: min(state.backoff_ms * 2, @max_backoff_ms)} + end + + defp subscription_id(server_id) do + "sync-#{server_id}-#{System.unique_integer([:positive, :monotonic])}" + end +end diff --git a/lib/parrhesia/test_support/sync_fake_relay/plug.ex b/lib/parrhesia/test_support/sync_fake_relay/plug.ex new file mode 100644 index 0000000..f70c78a --- /dev/null +++ b/lib/parrhesia/test_support/sync_fake_relay/plug.ex @@ -0,0 +1,49 @@ +defmodule Parrhesia.TestSupport.SyncFakeRelay.Plug do + @moduledoc false + + import Plug.Conn + + alias Parrhesia.TestSupport.SyncFakeRelay.Server + + def init(opts), do: opts + + def call(conn, opts) do + server = Keyword.fetch!(opts, :server) + + cond do + conn.request_path == "/relay" and wants_nip11?(conn) -> + send_json(conn, 200, Server.document(server)) + + conn.request_path == "/relay" -> + conn + |> WebSockAdapter.upgrade( + Parrhesia.TestSupport.SyncFakeRelay.Socket, + %{server: server, relay_url: relay_url(conn)}, + timeout: 60_000 + ) + |> halt() + + true -> + send_resp(conn, 404, "not found") + end + end + + defp wants_nip11?(conn) do + conn + |> get_req_header("accept") + |> Enum.any?(&String.contains?(&1, "application/nostr+json")) + end + + defp send_json(conn, status, body) do + encoded = JSON.encode!(body) + + conn + |> put_resp_content_type("application/nostr+json") + |> send_resp(status, encoded) + end + + defp relay_url(conn) do + scheme = if conn.scheme == :https, do: "wss", else: "ws" + "#{scheme}://#{conn.host}:#{conn.port}#{conn.request_path}" + end +end diff --git a/lib/parrhesia/test_support/sync_fake_relay/server.ex b/lib/parrhesia/test_support/sync_fake_relay/server.ex new file mode 100644 index 0000000..f9c45af --- 
/dev/null +++ b/lib/parrhesia/test_support/sync_fake_relay/server.ex @@ -0,0 +1,65 @@ +defmodule Parrhesia.TestSupport.SyncFakeRelay.Server do + @moduledoc false + + use Agent + + def start_link(opts) do + name = Keyword.fetch!(opts, :name) + + initial_state = %{ + pubkey: Keyword.fetch!(opts, :pubkey), + expected_client_pubkey: Keyword.fetch!(opts, :expected_client_pubkey), + initial_events: Keyword.get(opts, :initial_events, []), + subscribers: %{} + } + + Agent.start_link(fn -> initial_state end, name: name) + end + + def document(server) do + Agent.get(server, fn state -> + %{ + "name" => "Sync Fake Relay", + "description" => "test relay", + "pubkey" => state.pubkey, + "supported_nips" => [1, 11, 42] + } + end) + end + + def initial_events(server) do + Agent.get(server, & &1.initial_events) + end + + def expected_client_pubkey(server) do + Agent.get(server, & &1.expected_client_pubkey) + end + + def register_subscription(server, pid, subscription_id) do + Agent.update(server, fn state -> + put_in(state, [:subscribers, {pid, subscription_id}], true) + end) + end + + def unregister_subscription(server, pid, subscription_id) do + Agent.update(server, fn state -> + update_in(state.subscribers, &Map.delete(&1, {pid, subscription_id})) + end) + end + + def publish_live_event(server, event) do + subscribers = + Agent.get_and_update(server, fn state -> + { + Map.keys(state.subscribers), + %{state | initial_events: state.initial_events ++ [event]} + } + end) + + Enum.each(subscribers, fn {pid, subscription_id} -> + send(pid, {:sync_fake_relay_event, subscription_id, event}) + end) + + :ok + end +end diff --git a/lib/parrhesia/test_support/sync_fake_relay/socket.ex b/lib/parrhesia/test_support/sync_fake_relay/socket.ex new file mode 100644 index 0000000..072c065 --- /dev/null +++ b/lib/parrhesia/test_support/sync_fake_relay/socket.ex @@ -0,0 +1,118 @@ +defmodule Parrhesia.TestSupport.SyncFakeRelay.Socket do + @moduledoc false + + @behaviour WebSock + + alias 
Parrhesia.TestSupport.SyncFakeRelay.Server + + def init(state), do: {:ok, Map.put(state, :authenticated?, false)} + + def handle_in({payload, [opcode: :text]}, state) do + case JSON.decode(payload) do + {:ok, ["REQ", subscription_id | _filters]} -> + maybe_authorize_req(state, subscription_id) + + {:ok, ["AUTH", auth_event]} when is_map(auth_event) -> + handle_auth(auth_event, state) + + {:ok, ["CLOSE", subscription_id]} -> + Server.unregister_subscription(state.server, self(), subscription_id) + + {:push, {:text, JSON.encode!(["CLOSED", subscription_id, "error: subscription closed"])}, + state} + + _other -> + {:ok, state} + end + end + + def handle_in(_frame, state), do: {:ok, state} + + def handle_info({:sync_fake_relay_event, subscription_id, event}, state) do + {:push, {:text, JSON.encode!(["EVENT", subscription_id, event])}, state} + end + + def handle_info(_message, state), do: {:ok, state} + + def terminate(_reason, state) do + Enum.each(Map.get(state, :subscriptions, []), fn subscription_id -> + Server.unregister_subscription(state.server, self(), subscription_id) + end) + + :ok + end + + defp maybe_authorize_req(%{authenticated?: true} = state, subscription_id) do + Server.register_subscription(state.server, self(), subscription_id) + + frames = + Server.initial_events(state.server) + |> Enum.map(fn event -> {:text, JSON.encode!(["EVENT", subscription_id, event])} end) + |> Kernel.++([{:text, JSON.encode!(["EOSE", subscription_id])}]) + + next_state = + state + |> Map.update(:subscriptions, [subscription_id], &[subscription_id | &1]) + + {:push, frames, next_state} + end + + defp maybe_authorize_req(state, subscription_id) do + challenge = Base.encode16(:crypto.strong_rand_bytes(12), case: :lower) + + next_state = + state + |> Map.put(:challenge, challenge) + |> Map.put(:pending_subscription_id, subscription_id) + + {:push, + [ + {:text, JSON.encode!(["AUTH", challenge])}, + {:text, + JSON.encode!(["CLOSED", subscription_id, "auth-required: sync access 
requires AUTH"])} + ], next_state} + end + + defp handle_auth(auth_event, state) do + challenge_ok? = has_tag?(auth_event, "challenge", state.challenge) + relay_ok? = has_tag?(auth_event, "relay", state.relay_url) + pubkey_ok? = Map.get(auth_event, "pubkey") == Server.expected_client_pubkey(state.server) + + if challenge_ok? and relay_ok? and pubkey_ok? do + accepted_state = %{state | authenticated?: true} + ok_frame = ["OK", Map.get(auth_event, "id"), true, "ok: auth accepted"] + + if subscription_id = Map.get(accepted_state, :pending_subscription_id) do + next_state = + accepted_state + |> Map.delete(:pending_subscription_id) + |> Map.update(:subscriptions, [subscription_id], &[subscription_id | &1]) + + Server.register_subscription(state.server, self(), subscription_id) + + {:push, + [{:text, JSON.encode!(ok_frame)} | auth_success_frames(accepted_state, subscription_id)], + next_state} + else + {:push, {:text, JSON.encode!(ok_frame)}, accepted_state} + end + else + {:push, + {:text, JSON.encode!(["OK", Map.get(auth_event, "id"), false, "invalid: auth rejected"])}, + state} + end + end + + defp auth_success_frames(state, subscription_id) do + Server.initial_events(state.server) + |> Enum.map(fn event -> {:text, JSON.encode!(["EVENT", subscription_id, event])} end) + |> Kernel.++([{:text, JSON.encode!(["EOSE", subscription_id])}]) + end + + defp has_tag?(event, name, expected_value) do + Enum.any?(Map.get(event, "tags", []), fn + [^name, ^expected_value | _rest] -> true + _other -> false + end) + end +end diff --git a/lib/parrhesia/web/relay_info.ex b/lib/parrhesia/web/relay_info.ex index 3f38938..79d2f82 100644 --- a/lib/parrhesia/web/relay_info.ex +++ b/lib/parrhesia/web/relay_info.ex @@ -3,12 +3,14 @@ defmodule Parrhesia.Web.RelayInfo do NIP-11 relay information document. 
""" + alias Parrhesia.API.Identity + @spec document() :: map() def document do %{ "name" => "Parrhesia", "description" => "Nostr/Marmot relay", - "pubkey" => nil, + "pubkey" => relay_pubkey(), "supported_nips" => supported_nips(), "software" => "https://git.teralink.net/self/parrhesia", "version" => Application.spec(:parrhesia, :vsn) |> to_string(), @@ -44,4 +46,11 @@ defmodule Parrhesia.Web.RelayInfo do |> Application.get_env(:features, []) |> Keyword.get(:nip_77_negentropy, true) end + + defp relay_pubkey do + case Identity.get() do + {:ok, %{pubkey: pubkey}} -> pubkey + {:error, _reason} -> nil + end + end end diff --git a/mix.exs b/mix.exs index 1533ded..cdb92c3 100644 --- a/mix.exs +++ b/mix.exs @@ -44,7 +44,7 @@ defmodule Parrhesia.MixProject do # Test tooling {:stream_data, "~> 1.0", only: :test}, - {:websockex, "~> 0.4", only: :test}, + {:websockex, "~> 0.4"}, # Project tooling {:credo, "~> 1.7", only: [:dev, :test], runtime: false}, diff --git a/test/parrhesia/api/sync_test.exs b/test/parrhesia/api/sync_test.exs index c6b3978..ccde79f 100644 --- a/test/parrhesia/api/sync_test.exs +++ b/test/parrhesia/api/sync_test.exs @@ -128,7 +128,7 @@ defmodule Parrhesia.API.SyncTest do defp start_sync_manager do path = unique_sync_path() manager = {:global, {:sync_manager, System.unique_integer([:positive, :monotonic])}} - pid = start_supervised!({Manager, name: manager, path: path}) + pid = start_supervised!({Manager, name: manager, path: path, start_workers?: false}) {manager, path, pid} end diff --git a/test/parrhesia/sync/worker_test.exs b/test/parrhesia/sync/worker_test.exs new file mode 100644 index 0000000..022f7c8 --- /dev/null +++ b/test/parrhesia/sync/worker_test.exs @@ -0,0 +1,260 @@ +defmodule Parrhesia.Sync.WorkerTest do + use ExUnit.Case, async: false + + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.API.ACL + alias Parrhesia.API.Events + alias Parrhesia.API.Identity + alias Parrhesia.API.RequestContext + alias Parrhesia.API.Sync + alias 
Parrhesia.Protocol.EventValidator + alias Parrhesia.Repo + alias Parrhesia.Sync.Supervisor + alias Parrhesia.TestSupport.SyncFakeRelay.Plug + alias Parrhesia.TestSupport.SyncFakeRelay.Server + + setup do + :ok = Sandbox.checkout(Repo) + Sandbox.mode(Repo, {:shared, self()}) + + on_exit(fn -> + Sandbox.mode(Repo, :manual) + end) + + :ok + end + + test "req_stream worker verifies remote identity, authenticates, syncs catch-up, streams live, and sync_now reruns catch-up" do + {:ok, %{pubkey: local_pubkey}} = Identity.ensure() + remote_pubkey = String.duplicate("b", 64) + initial_event = valid_sync_event("initial-sync", 1_762_000_000) + live_event = valid_sync_event("live-sync", 1_762_000_100) + + relay_server = + start_supervised!( + {Server, + name: unique_name("FakeRelayServer"), + pubkey: remote_pubkey, + expected_client_pubkey: local_pubkey, + initial_events: [initial_event]} + ) + + port = free_port() + + start_supervised!( + {Bandit, plug: {Plug, server: relay_server}, ip: {127, 0, 0, 1}, port: port} + ) + + relay_url = "ws://127.0.0.1:#{port}/relay" + wait_for_relay(relay_url, remote_pubkey) + + assert :ok = + ACL.grant(%{ + principal_type: :pubkey, + principal: remote_pubkey, + capability: :sync_write, + match: %{"kinds" => [5000], "#r" => ["tribes.accounts.user"]} + }) + + assert :ok = + ACL.grant(%{ + principal_type: :pubkey, + principal: remote_pubkey, + capability: :sync_read, + match: %{"kinds" => [5000], "#r" => ["tribes.accounts.user"]} + }) + + {manager_name, _supervisor_name} = start_sync_runtime() + + assert {:ok, _server} = + Sync.put_server( + %{ + "id" => "fake-relay", + "url" => relay_url, + "enabled?" 
=> true, + "auth_pubkey" => remote_pubkey, + "filters" => [%{"kinds" => [5000], "#r" => ["tribes.accounts.user"]}], + "tls" => %{"mode" => "disabled", "pins" => []} + }, + manager: manager_name + ) + + assert_event_synced(initial_event, remote_pubkey) + + assert :ok = Server.publish_live_event(relay_server, live_event) + assert_event_synced(live_event, remote_pubkey) + + assert {:ok, stats_before_sync_now} = Sync.sync_stats(manager: manager_name) + assert stats_before_sync_now["events_accepted"] >= 2 + + assert :ok = Sync.sync_now("fake-relay", manager: manager_name) + + assert_eventually(fn -> + case Sync.sync_stats(manager: manager_name) do + {:ok, stats} -> stats["query_runs"] >= 2 and stats["subscription_restarts"] >= 1 + _other -> false + end + end) + + assert {:ok, health} = Sync.sync_health(manager: manager_name) + assert health["status"] == "ok" + assert health["servers_connected"] == 1 + end + + test "worker marks remote identity mismatches as failing health" do + {:ok, %{pubkey: local_pubkey}} = Identity.ensure() + + relay_server = + start_supervised!( + {Server, + name: unique_name("MismatchRelayServer"), + pubkey: String.duplicate("d", 64), + expected_client_pubkey: local_pubkey, + initial_events: []} + ) + + port = free_port() + + start_supervised!( + {Bandit, plug: {Plug, server: relay_server}, ip: {127, 0, 0, 1}, port: port} + ) + + relay_url = "ws://127.0.0.1:#{port}/relay" + wait_for_relay(relay_url, String.duplicate("d", 64)) + + {manager_name, _supervisor_name} = start_sync_runtime() + + assert {:ok, _server} = + Sync.put_server( + %{ + "id" => "mismatch-relay", + "url" => relay_url, + "enabled?" 
=> true, + "auth_pubkey" => String.duplicate("e", 64), + "filters" => [%{"kinds" => [5000], "#r" => ["tribes.accounts.user"]}], + "tls" => %{"mode" => "disabled", "pins" => []} + }, + manager: manager_name + ) + + assert_eventually(fn -> + case Sync.sync_health(manager: manager_name) do + {:ok, %{"status" => "degraded", "servers_failing" => servers}} -> + Enum.any?( + servers, + &(&1["id"] == "mismatch-relay" and &1["reason"] == ":remote_identity_mismatch") + ) + + _other -> + false + end + end) + end + + defp start_sync_runtime do + manager_name = unique_name("SyncManager") + worker_registry = unique_name("SyncRegistry") + worker_supervisor = unique_name("SyncWorkerSupervisor") + supervisor_name = unique_name("SyncSupervisor") + + start_supervised!( + {Supervisor, + name: supervisor_name, + manager: manager_name, + worker_registry: worker_registry, + worker_supervisor: worker_supervisor, + path: unique_sync_path(), + start_workers?: true} + ) + + {manager_name, supervisor_name} + end + + defp assert_event_synced(event, remote_pubkey) do + assert_eventually(fn -> + case Events.query( + [%{"ids" => [event["id"]]}], + context: %RequestContext{ + authenticated_pubkeys: MapSet.new([remote_pubkey]) + } + ) do + {:ok, [stored_event]} -> stored_event["id"] == event["id"] + _other -> false + end + end) + end + + defp wait_for_relay(relay_url, expected_pubkey) do + info_url = + relay_url + |> String.replace_prefix("ws://", "http://") + |> String.replace_prefix("wss://", "https://") + + assert_eventually(fn -> + with {:ok, %{status: 200, body: body}} <- + Req.get( + url: info_url, + headers: [{"accept", "application/nostr+json"}], + decode_body: false + ), + {:ok, %{"pubkey" => ^expected_pubkey}} <- JSON.decode(body) do + true + else + _other -> false + end + end) + end + + defp valid_sync_event(content, created_at) do + base_event = %{ + "pubkey" => String.duplicate("f", 64), + "created_at" => created_at, + "kind" => 5000, + "tags" => [["r", "tribes.accounts.user"]], + 
"content" => content, + "sig" => String.duplicate("0", 128) + } + + Map.put(base_event, "id", EventValidator.compute_id(base_event)) + end + + defp free_port do + {:ok, socket} = :gen_tcp.listen(0, [:binary, active: false, packet: :raw, reuseaddr: true]) + {:ok, port} = :inet.port(socket) + :ok = :gen_tcp.close(socket) + port + end + + defp unique_name(prefix) do + :"#{prefix}_#{System.unique_integer([:positive, :monotonic])}" + end + + defp unique_sync_path do + path = + Path.join( + System.tmp_dir!(), + "parrhesia_sync_runtime_#{System.unique_integer([:positive, :monotonic])}.json" + ) + + on_exit(fn -> + _ = File.rm(path) + end) + + path + end + + defp assert_eventually(fun, attempts \\ 50) + + defp assert_eventually(_fun, 0), do: flunk("condition was not met in time") + + defp assert_eventually(fun, attempts) do + if fun.() do + :ok + else + receive do + after + 50 -> assert_eventually(fun, attempts - 1) + end + end + end +end