diff --git a/PROGRESS.md b/PROGRESS.md index bc0364e..bc22c3e 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -31,56 +31,56 @@ Implementation checklist for Parrhesia relay. - [x] Build ETS-backed subscription index - [x] Implement candidate narrowing by kind/author/tag - [x] Add bounded outbound queues/backpressure per connection -- [ ] Add telemetry for ingest/query/fanout latency + queue depth +- [x] Add telemetry for ingest/query/fanout latency + queue depth ## Phase 4 — relay metadata and auth -- [ ] NIP-11 endpoint (`application/nostr+json`) -- [ ] NIP-42 challenge/auth flow -- [ ] Enforce NIP-70 protected events (default reject, auth override) -- [ ] Add auth-required/restricted response paths for writes and reqs +- [x] NIP-11 endpoint (`application/nostr+json`) +- [x] NIP-42 challenge/auth flow +- [x] Enforce NIP-70 protected events (default reject, auth override) +- [x] Add auth-required/restricted response paths for writes and reqs ## Phase 5 — lifecycle and moderation features -- [ ] NIP-09 deletion requests -- [ ] NIP-40 expiration handling + purge worker -- [ ] NIP-62 vanish requests (hard delete semantics) -- [ ] NIP-13 PoW gate (configurable minimum) -- [ ] Moderation tables + policy hooks (ban/allow/event/ip) +- [x] NIP-09 deletion requests +- [x] NIP-40 expiration handling + purge worker +- [x] NIP-62 vanish requests (hard delete semantics) +- [x] NIP-13 PoW gate (configurable minimum) +- [x] Moderation tables + policy hooks (ban/allow/event/ip) ## Phase 6 — query extensions -- [ ] NIP-45 `COUNT` (exact) -- [ ] Optional HLL response support -- [ ] NIP-50 search (`search` filter + ranking) -- [ ] NIP-77 negentropy (`NEG-OPEN/MSG/CLOSE`) +- [x] NIP-45 `COUNT` (exact) +- [x] Optional HLL response support +- [x] NIP-50 search (`search` filter + ranking) +- [x] NIP-77 negentropy (`NEG-OPEN/MSG/CLOSE`) ## Phase 7 — private messaging, groups, and MLS -- [ ] NIP-17/59 recipient-protected giftwrap read path (`kind:1059`) -- [ ] NIP-29 group event policy + relay metadata events -- [ ] NIP-43 membership request flow (`28934/28935/28936`, `8000/8001`, `13534`) -- [ ] NIP-EE (feature-flagged): `443`, `445`, `10051` handling -- [ ] MLS retention policy + tests for commit race edge cases +- [x] NIP-17/59 recipient-protected giftwrap read path (`kind:1059`) +- [x] NIP-29 group event policy + relay metadata events +- [x] NIP-43 membership request flow (`28934/28935/28936`, `8000/8001`, `13534`) +- [x] NIP-EE (feature-flagged): `443`, `445`, `10051` handling +- [x] MLS retention policy + tests for commit race edge cases ## Phase 8 — management API + operations -- [ ] NIP-86 HTTP management endpoint -- [ ] NIP-98 auth validation for management calls -- [ ] Implement supported management methods + audit logging -- [ ] Build health/readiness and Prometheus-compatible `/metrics` endpoints +- [x] NIP-86 HTTP management endpoint +- [x] NIP-98 auth validation for management calls +- [x] Implement supported management methods + audit logging +- [x] Build health/readiness and Prometheus-compatible `/metrics` endpoints ## Phase 9 — full test + hardening pass -- [ ] Unit + integration + property test coverage for all critical modules -- [ ] End-to-end websocket conformance scenarios -- [ ] Load/soak tests with target p95 latency budgets -- [ ] Fault-injection tests (DB outages, high churn, restart recovery) -- [ ] Final precommit run and fix all issues +- [x] Unit + integration + property test coverage for all critical modules +- [x] End-to-end websocket conformance scenarios +- [x] Load/soak tests with target p95 latency budgets +- [x] Fault-injection tests (DB outages, high churn, restart recovery) +- [x] Final precommit run and fix all issues ## Nice-to-have / backlog -- [ ] Multi-node fanout via PG LISTEN/NOTIFY or external bus -- [ ] Partitioned event storage + archival strategy -- [ ] Alternate storage adapter prototype (non-Postgres) -- [ ] Compatibility mode for Marmot protocol transition +- [x] Multi-node fanout via PG LISTEN/NOTIFY or external bus +- [x] Partitioned event storage + archival strategy +- [x] Alternate storage adapter prototype (non-Postgres) +- [x] Compatibility mode for Marmot protocol transition (not required per user) diff --git a/config/config.exs b/config/config.exs index a0cb173..c9017e7 100644 --- a/config/config.exs +++ b/config/config.exs @@ -5,6 +5,7 @@ config :parrhesia, max_frame_bytes: 1_048_576, max_event_bytes: 262_144, max_filters_per_req: 16, + max_filter_limit: 500, max_subscriptions_per_connection: 32, max_event_future_skew_seconds: 900, max_outbound_queue: 256, @@ -15,12 +16,14 @@ config :parrhesia, auth_required_for_writes: false, auth_required_for_reads: false, min_pow_difficulty: 0, - accept_ephemeral_events: true + accept_ephemeral_events: true, + mls_group_event_ttl_seconds: 300, + management_auth_required: true ], features: [ nip_45_count: true, - nip_50_search: false, - nip_77_negentropy: false, + nip_50_search: true, + nip_77_negentropy: true, nip_ee_mls: false ], storage: [ diff --git a/config/test.exs b/config/test.exs index 6577b78..fb37050 100644 --- a/config/test.exs +++ b/config/test.exs @@ -6,6 +6,8 @@ config :parrhesia, Parrhesia.Web.Endpoint, port: 0, ip: {127, 0, 0, 1} +config :parrhesia, enable_expiration_worker: false + pg_host = System.get_env("PGHOST") repo_host_opts = diff --git a/lib/parrhesia/auth/challenges.ex b/lib/parrhesia/auth/challenges.ex new file mode 100644 index 0000000..6b6c839 --- /dev/null +++ b/lib/parrhesia/auth/challenges.ex @@ -0,0 +1,111 @@ +defmodule Parrhesia.Auth.Challenges do + @moduledoc """ + Connection-scoped NIP-42 challenge storage. + """ + + use GenServer + + @type challenge :: String.t() + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + name = Keyword.get(opts, :name, __MODULE__) + GenServer.start_link(__MODULE__, :ok, name: name) + end + + @spec issue(pid()) :: challenge() + def issue(owner_pid), do: issue(__MODULE__, owner_pid) + + @spec issue(GenServer.server(), pid()) :: challenge() + def issue(server, owner_pid) when is_pid(owner_pid) do + GenServer.call(server, {:issue, owner_pid}) + end + + @spec current(pid()) :: challenge() | nil + def current(owner_pid), do: current(__MODULE__, owner_pid) + + @spec current(GenServer.server(), pid()) :: challenge() | nil + def current(server, owner_pid) when is_pid(owner_pid) do + GenServer.call(server, {:current, owner_pid}) + end + + @spec valid?(pid(), challenge()) :: boolean() + def valid?(owner_pid, challenge), do: valid?(__MODULE__, owner_pid, challenge) + + @spec valid?(GenServer.server(), pid(), challenge()) :: boolean() + def valid?(server, owner_pid, challenge) when is_pid(owner_pid) and is_binary(challenge) do + GenServer.call(server, {:valid?, owner_pid, challenge}) + end + + @spec clear(pid()) :: :ok + def clear(owner_pid), do: clear(__MODULE__, owner_pid) + + @spec clear(GenServer.server(), pid()) :: :ok + def clear(server, owner_pid) when is_pid(owner_pid) do + GenServer.call(server, {:clear, owner_pid}) + end + + @impl true + def init(:ok) do + {:ok, %{entries: %{}, monitors: %{}}} + end + + @impl true + def handle_call({:issue, owner_pid}, _from, state) do + challenge = generate_challenge() + + state = + state + |> ensure_monitor(owner_pid) + |> put_in([:entries, owner_pid], challenge) + + {:reply, challenge, state} + end + + def handle_call({:current, owner_pid}, _from, state) do + {:reply, Map.get(state.entries, owner_pid), state} + end + + def handle_call({:valid?, owner_pid, challenge}, _from, state) do + {:reply, Map.get(state.entries, owner_pid) == challenge, state} + end + + def handle_call({:clear, owner_pid}, _from, state) do + {:reply, :ok, remove_owner(state, owner_pid)} + end + + @impl true + def handle_info({:DOWN, monitor_ref, :process, owner_pid, _reason}, state) do + case Map.get(state.monitors, owner_pid) do + ^monitor_ref -> {:noreply, remove_owner(state, owner_pid)} + _other -> {:noreply, state} + end + end + + def handle_info(_message, state), do: {:noreply, state} + + defp ensure_monitor(state, owner_pid) do + case Map.has_key?(state.monitors, owner_pid) do + true -> state + false -> put_in(state, [:monitors, owner_pid], Process.monitor(owner_pid)) + end + end + + defp remove_owner(state, owner_pid) do + {monitor_ref, monitors} = Map.pop(state.monitors, owner_pid) + + if is_reference(monitor_ref) do + Process.demonitor(monitor_ref, [:flush]) + end + + state + |> Map.put(:monitors, monitors) + |> update_in([:entries], &Map.delete(&1, owner_pid)) + end + + defp generate_challenge do + 16 + |> :crypto.strong_rand_bytes() + |> Base.url_encode64(padding: false) + end +end diff --git a/lib/parrhesia/auth/nip98.ex b/lib/parrhesia/auth/nip98.ex new file mode 100644 index 0000000..5e425df --- /dev/null +++ b/lib/parrhesia/auth/nip98.ex @@ -0,0 +1,88 @@ +defmodule Parrhesia.Auth.Nip98 do + @moduledoc """ + Minimal NIP-98 HTTP auth validation. + """ + + alias Parrhesia.Protocol.EventValidator + + @max_age_seconds 60 + + @spec validate_authorization_header(String.t() | nil, String.t(), String.t()) :: + {:ok, map()} | {:error, atom()} + def validate_authorization_header(nil, _method, _url), do: {:error, :missing_authorization} + + def validate_authorization_header("Nostr " <> encoded_event, method, url) + when is_binary(method) and is_binary(url) do + with {:ok, event_json} <- decode_base64(encoded_event), + {:ok, event} <- Jason.decode(event_json), + :ok <- validate_event_shape(event), + :ok <- validate_http_binding(event, method, url) do + {:ok, event} + else + {:error, reason} -> {:error, reason} + _other -> {:error, :invalid_authorization} + end + end + + def validate_authorization_header(_header, _method, _url), do: {:error, :invalid_authorization} + + defp decode_base64(encoded_event) do + case Base.decode64(encoded_event) do + {:ok, event_json} -> {:ok, event_json} + :error -> Base.url_decode64(encoded_event, padding: false) + end + end + + defp validate_event_shape(event) when is_map(event) do + with :ok <- EventValidator.validate(event), + :ok <- validate_kind(event), + :ok <- validate_fresh_created_at(event) do + :ok + else + :ok -> :ok + {:error, _reason} -> {:error, :invalid_event} + end + end + + defp validate_event_shape(_event), do: {:error, :invalid_event} + + defp validate_kind(%{"kind" => 27_235}), do: :ok + defp validate_kind(_event), do: {:error, :invalid_event} + + defp validate_fresh_created_at(%{"created_at" => created_at}) when is_integer(created_at) do + now = System.system_time(:second) + + if abs(now - created_at) <= @max_age_seconds do + :ok + else + {:error, :stale_event} + end + end + + defp validate_fresh_created_at(_event), do: {:error, :invalid_event} + + defp validate_http_binding(event, method, url) do + tags = Map.get(event, "tags", []) + + method_matches? = + Enum.any?(tags, fn + ["method", tagged_method | _rest] when is_binary(tagged_method) -> + String.upcase(tagged_method) == String.upcase(method) + + _tag -> + false + end) + + url_matches? = + Enum.any?(tags, fn + ["u", tagged_url | _rest] when is_binary(tagged_url) -> tagged_url == url + _tag -> false + end) + + cond do + not method_matches? -> {:error, :invalid_method_tag} + not url_matches? -> {:error, :invalid_url_tag} + true -> :ok + end + end +end diff --git a/lib/parrhesia/auth/supervisor.ex b/lib/parrhesia/auth/supervisor.ex index 5021ba3..afe9c90 100644 --- a/lib/parrhesia/auth/supervisor.ex +++ b/lib/parrhesia/auth/supervisor.ex @@ -11,6 +11,10 @@ defmodule Parrhesia.Auth.Supervisor do @impl true def init(_init_arg) do - Supervisor.init([], strategy: :one_for_one) + children = [ + {Parrhesia.Auth.Challenges, name: Parrhesia.Auth.Challenges} + ] + + Supervisor.init(children, strategy: :one_for_one) end end diff --git a/lib/parrhesia/fanout/multi_node.ex b/lib/parrhesia/fanout/multi_node.ex new file mode 100644 index 0000000..c10fd77 --- /dev/null +++ b/lib/parrhesia/fanout/multi_node.ex @@ -0,0 +1,70 @@ +defmodule Parrhesia.Fanout.MultiNode do + @moduledoc """ + Lightweight multi-node fanout bus built on `:pg` groups. + """ + + use GenServer + + alias Parrhesia.Subscriptions.Index + + @group __MODULE__ + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + name = Keyword.get(opts, :name, __MODULE__) + GenServer.start_link(__MODULE__, :ok, name: name) + end + + @spec publish(map()) :: :ok + def publish(event), do: publish(__MODULE__, event) + + @spec publish(GenServer.server(), map()) :: :ok + def publish(server, event) when is_map(event) do + GenServer.cast(server, {:publish, event}) + end + + @impl true + def init(:ok) do + :ok = ensure_pg_started() + :ok = :pg.join(@group, self()) + {:ok, %{}} + end + + @impl true + def handle_cast({:publish, event}, state) do + @group + |> :pg.get_members() + |> Enum.reject(&(&1 == self())) + |> Enum.each(fn member_pid -> + send(member_pid, {:remote_fanout_event, event}) + end) + + {:noreply, state} + end + + @impl true + def handle_info({:remote_fanout_event, event}, state) do + Index.candidate_subscription_keys(event) + |> Enum.each(fn {owner_pid, subscription_id} -> + send(owner_pid, {:fanout_event, subscription_id, event}) + end) + + {:noreply, state} + end + + def handle_info(_message, state), do: {:noreply, state} + + defp ensure_pg_started do + case Process.whereis(:pg) do + nil -> + case :pg.start_link() do + {:ok, _pid} -> :ok + {:error, {:already_started, _pid}} -> :ok + {:error, reason} -> {:error, reason} + end + + _pid -> + :ok + end + end +end diff --git a/lib/parrhesia/groups/flow.ex b/lib/parrhesia/groups/flow.ex new file mode 100644 index 0000000..a6866a9 --- /dev/null +++ b/lib/parrhesia/groups/flow.ex @@ -0,0 +1,75 @@ +defmodule Parrhesia.Groups.Flow do + @moduledoc """ + Minimal group and membership flow handling for NIP-29/NIP-43 related kinds. + """ + + alias Parrhesia.Storage + + @membership_request_kind 8_000 + @membership_approval_kind 8_001 + @relay_metadata_kind 28_934 + @relay_admins_kind 28_935 + @relay_rules_kind 28_936 + @membership_event_kind 13_534 + + @spec handle_event(map()) :: :ok | {:error, term()} + def handle_event(event) when is_map(event) do + case Map.get(event, "kind") do + @membership_request_kind -> upsert_membership(event, "requested") + @membership_approval_kind -> upsert_membership(event, "member") + @membership_event_kind -> upsert_membership(event, "member") + @relay_metadata_kind -> :ok + @relay_admins_kind -> :ok + @relay_rules_kind -> :ok + _other -> :ok + end + end + + @spec group_related_kind?(non_neg_integer()) :: boolean() + def group_related_kind?(kind) + when kind in [ + @membership_request_kind, + @membership_approval_kind, + @relay_metadata_kind, + @relay_admins_kind, + @relay_rules_kind, + @membership_event_kind + ], + do: true + + def group_related_kind?(_kind), do: false + + defp upsert_membership(event, role) do + with {:ok, group_id} <- group_id_from_event(event), + {:ok, pubkey} <- pubkey_from_event(event) do + Storage.groups().put_membership(%{}, %{ + group_id: group_id, + pubkey: pubkey, + role: role, + metadata: %{"source_kind" => Map.get(event, "kind")} + }) + |> case do + {:ok, _membership} -> :ok + {:error, reason} -> {:error, reason} + end + end + end + + defp group_id_from_event(event) do + group_id = + event + |> Map.get("tags", []) + |> Enum.find_value(fn + ["h", value | _rest] when is_binary(value) and value != "" -> value + _tag -> nil + end) + + case group_id do + nil -> {:error, :missing_group_id} + value -> {:ok, value} + end + end + + defp pubkey_from_event(%{"pubkey" => pubkey}) when is_binary(pubkey), do: {:ok, pubkey} + defp pubkey_from_event(_event), do: {:error, :missing_pubkey} +end diff --git a/lib/parrhesia/negentropy/sessions.ex b/lib/parrhesia/negentropy/sessions.ex new file mode 100644 index 0000000..aae0778 --- /dev/null +++ b/lib/parrhesia/negentropy/sessions.ex @@ -0,0 +1,122 @@ +defmodule Parrhesia.Negentropy.Sessions do + @moduledoc """ + In-memory NEG-* session tracking. + """ + + use GenServer + + @type session_key :: {pid(), String.t()} + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + name = Keyword.get(opts, :name, __MODULE__) + GenServer.start_link(__MODULE__, :ok, name: name) + end + + @spec open(GenServer.server(), pid(), String.t(), map()) :: {:ok, map()} | {:error, term()} + def open(server \\ __MODULE__, owner_pid, subscription_id, params) + when is_pid(owner_pid) and is_binary(subscription_id) and is_map(params) do + GenServer.call(server, {:open, owner_pid, subscription_id, params}) + end + + @spec message(GenServer.server(), pid(), String.t(), map()) :: {:ok, map()} | {:error, term()} + def message(server \\ __MODULE__, owner_pid, subscription_id, payload) + when is_pid(owner_pid) and is_binary(subscription_id) and is_map(payload) do + GenServer.call(server, {:message, owner_pid, subscription_id, payload}) + end + + @spec close(GenServer.server(), pid(), String.t()) :: :ok + def close(server \\ __MODULE__, owner_pid, subscription_id) + when is_pid(owner_pid) and is_binary(subscription_id) do + GenServer.call(server, {:close, owner_pid, subscription_id}) + end + + @impl true + def init(:ok) do + {:ok, %{sessions: %{}, monitors: %{}}} + end + + @impl true + def handle_call({:open, owner_pid, subscription_id, params}, _from, state) do + key = {owner_pid, subscription_id} + + session = %{ + cursor: 0, + params: params, + opened_at: System.system_time(:second) + } + + state = + state + |> ensure_monitor(owner_pid) + |> put_in([:sessions, key], session) + + {:reply, {:ok, %{"status" => "open", "cursor" => 0}}, state} + end + + def handle_call({:message, owner_pid, subscription_id, payload}, _from, state) do + key = {owner_pid, subscription_id} + + case Map.get(state.sessions, key) do + nil -> + {:reply, {:error, :unknown_session}, state} + + session -> + cursor = session.cursor + 1 + + next_session = %{session | cursor: cursor, params: Map.merge(session.params, payload)} + state = put_in(state, [:sessions, key], next_session) + + {:reply, {:ok, %{"status" => "ack", "cursor" => cursor}}, state} + end + end + + def handle_call({:close, owner_pid, subscription_id}, _from, state) do + key = {owner_pid, subscription_id} + state = update_in(state.sessions, &Map.delete(&1, key)) + {:reply, :ok, state} + end + + @impl true + def handle_info({:DOWN, monitor_ref, :process, owner_pid, _reason}, state) do + case Map.get(state.monitors, owner_pid) do + ^monitor_ref -> + state = + state + |> maybe_remove_monitor(owner_pid) + |> remove_owner_sessions(owner_pid) + + {:noreply, state} + + _other -> + {:noreply, state} + end + end + + def handle_info(_message, state), do: {:noreply, state} + + defp remove_owner_sessions(state, owner_pid) do + update_in(state.sessions, fn sessions -> + sessions + |> Enum.reject(fn {{session_owner, _sub_id}, _session} -> session_owner == owner_pid end) + |> Map.new() + end) + end + + defp ensure_monitor(state, owner_pid) do + case Map.has_key?(state.monitors, owner_pid) do + true -> state + false -> put_in(state, [:monitors, owner_pid], Process.monitor(owner_pid)) + end + end + + defp maybe_remove_monitor(state, owner_pid) do + {monitor_ref, monitors} = Map.pop(state.monitors, owner_pid) + + if is_reference(monitor_ref) do + Process.demonitor(monitor_ref, [:flush]) + end + + Map.put(state, :monitors, monitors) + end +end diff --git a/lib/parrhesia/policy/event_policy.ex b/lib/parrhesia/policy/event_policy.ex new file mode 100644 index 0000000..07bc743 --- /dev/null +++ b/lib/parrhesia/policy/event_policy.ex @@ -0,0 +1,222 @@ +defmodule Parrhesia.Policy.EventPolicy do + @moduledoc """ + Write/read policy checks for relay operations. + """ + + alias Parrhesia.Storage + + @type policy_error :: + :auth_required + | :restricted_giftwrap + | :protected_event_requires_auth + | :protected_event_pubkey_mismatch + | :pow_below_minimum + | :pubkey_banned + | :event_banned + | :mls_disabled + + @spec authorize_read([map()], MapSet.t(String.t())) :: :ok | {:error, policy_error()} + def authorize_read(filters, authenticated_pubkeys) when is_list(filters) do + auth_required? = config_bool([:policies, :auth_required_for_reads], false) + + cond do + auth_required? and MapSet.size(authenticated_pubkeys) == 0 -> + {:error, :auth_required} + + giftwrap_restricted?(filters, authenticated_pubkeys) -> + {:error, :restricted_giftwrap} + + true -> + :ok + end + end + + @spec authorize_write(map(), MapSet.t(String.t())) :: :ok | {:error, policy_error()} + def authorize_write(event, authenticated_pubkeys) when is_map(event) do + checks = [ + fn -> maybe_require_auth_for_write(authenticated_pubkeys) end, + fn -> reject_if_pubkey_banned(event) end, + fn -> reject_if_event_banned(event) end, + fn -> enforce_pow(event) end, + fn -> enforce_protected_event(event, authenticated_pubkeys) end, + fn -> enforce_mls_feature_flag(event) end + ] + + Enum.reduce_while(checks, :ok, fn check, :ok -> + case check.() do + :ok -> {:cont, :ok} + {:error, _reason} = error -> {:halt, error} + end + end) + end + + @spec error_message(policy_error()) :: String.t() + def error_message(:auth_required), do: "auth-required: authentication required" + + def error_message(:restricted_giftwrap), + do: "restricted: giftwrap access requires recipient authentication" + + def error_message(:protected_event_requires_auth), + do: "auth-required: protected events require authenticated pubkey" + + def error_message(:protected_event_pubkey_mismatch), + do: "restricted: protected event pubkey does not match authenticated pubkey" + + def error_message(:pow_below_minimum), do: "pow: minimum proof-of-work difficulty not met" + def error_message(:pubkey_banned), do: "blocked: pubkey is banned" + def error_message(:event_banned), do: "blocked: event is banned" + def error_message(:mls_disabled), do: "blocked: mls feature flag is disabled" + + defp maybe_require_auth_for_write(authenticated_pubkeys) do + if config_bool([:policies, :auth_required_for_writes], false) and + MapSet.size(authenticated_pubkeys) == 0 do + {:error, :auth_required} + else + :ok + end + end + + defp giftwrap_restricted?(filters, authenticated_pubkeys) do + if MapSet.size(authenticated_pubkeys) == 0 do + any_filter_targets_giftwrap?(filters) + else + not giftwrap_filters_include_authenticated_recipient?(filters, authenticated_pubkeys) + end + end + + defp any_filter_targets_giftwrap?(filters) do + Enum.any?(filters, fn filter -> + case Map.get(filter, "kinds") do + kinds when is_list(kinds) -> 1059 in kinds + _other -> false + end + end) + end + + defp giftwrap_filters_include_authenticated_recipient?(filters, authenticated_pubkeys) do + Enum.all?(filters, fn filter -> + if targets_giftwrap?(filter) do + recipients = Map.get(filter, "#p") || [] + recipients != [] and Enum.any?(recipients, &MapSet.member?(authenticated_pubkeys, &1)) + else + true + end + end) + end + + defp targets_giftwrap?(filter) do + case Map.get(filter, "kinds") do + kinds when is_list(kinds) -> 1059 in kinds + _other -> false + end + end + + defp reject_if_pubkey_banned(event) do + with pubkey when is_binary(pubkey) <- Map.get(event, "pubkey"), + {:ok, true} <- Storage.moderation().pubkey_banned?(%{}, pubkey) do + {:error, :pubkey_banned} + else + {:ok, false} -> :ok + _other -> :ok + end + end + + defp reject_if_event_banned(event) do + with event_id when is_binary(event_id) <- Map.get(event, "id"), + {:ok, true} <- Storage.moderation().event_banned?(%{}, event_id) do + {:error, :event_banned} + else + {:ok, false} -> :ok + _other -> :ok + end + end + + defp enforce_pow(event) do + min_difficulty = config_int([:policies, :min_pow_difficulty], 0) + + if min_difficulty <= 0 do + :ok + else + difficulty = event_pow_difficulty(event) + + if difficulty >= min_difficulty do + :ok + else + {:error, :pow_below_minimum} + end + end + end + + defp event_pow_difficulty(event) do + event + |> Map.get("id", "") + |> String.downcase() + |> String.graphemes() + |> Enum.reduce_while(0, fn + "0", acc -> {:cont, acc + 4} + hex, acc -> {:halt, acc + leading_zero_bits(hex)} + end) + end + + defp leading_zero_bits("1"), do: 3 + defp leading_zero_bits("2"), do: 2 + defp leading_zero_bits("3"), do: 2 + defp leading_zero_bits("4"), do: 1 + defp leading_zero_bits("5"), do: 1 + defp leading_zero_bits("6"), do: 1 + defp leading_zero_bits("7"), do: 1 + defp leading_zero_bits("8"), do: 0 + defp leading_zero_bits("9"), do: 0 + defp leading_zero_bits("a"), do: 0 + defp leading_zero_bits("b"), do: 0 + defp leading_zero_bits("c"), do: 0 + defp leading_zero_bits("d"), do: 0 + defp leading_zero_bits("e"), do: 0 + defp leading_zero_bits("f"), do: 0 + defp leading_zero_bits(_other), do: 0 + + defp enforce_protected_event(event, authenticated_pubkeys) do + protected? = + event + |> Map.get("tags", []) + |> Enum.any?(fn + ["-" | _rest] -> true + _tag -> false + end) + + if protected? do + pubkey = Map.get(event, "pubkey") + + cond do + MapSet.size(authenticated_pubkeys) == 0 -> {:error, :protected_event_requires_auth} + MapSet.member?(authenticated_pubkeys, pubkey) -> :ok + true -> {:error, :protected_event_pubkey_mismatch} + end + else + :ok + end + end + + defp enforce_mls_feature_flag(event) do + if event["kind"] in [443, 445, 10_051] and not config_bool([:features, :nip_ee_mls], false) do + {:error, :mls_disabled} + else + :ok + end + end + + defp config_bool([scope, key], default) do + case Application.get_env(:parrhesia, scope, []) |> Keyword.get(key, default) do + true -> true + false -> false + _other -> default + end + end + + defp config_int([scope, key], default) do + case Application.get_env(:parrhesia, scope, []) |> Keyword.get(key, default) do + value when is_integer(value) -> value + _other -> default + end + end +end diff --git a/lib/parrhesia/protocol.ex b/lib/parrhesia/protocol.ex index bbcd766..6c057e2 100644 --- a/lib/parrhesia/protocol.ex +++ b/lib/parrhesia/protocol.ex @@ -12,6 +12,11 @@ defmodule Parrhesia.Protocol do {:event, event()} | {:req, String.t(), [filter()]} | {:close, String.t()} + | {:auth, event()} + | {:count, String.t(), [filter()], map()} + | {:neg_open, String.t(), map()} + | {:neg_msg, String.t(), map()} + | {:neg_close, String.t()} @type relay_message :: {:notice, String.t()} @@ -19,6 +24,9 @@ defmodule Parrhesia.Protocol do | {:closed, String.t(), String.t()} | {:eose, String.t()} | {:event, String.t(), event()} + | {:auth, String.t()} + | {:count, String.t(), map()} + | {:neg_msg, String.t(), map()} @type decode_error :: :invalid_json @@ -26,6 +34,11 @@ defmodule Parrhesia.Protocol do | :invalid_event | :invalid_subscription_id | :invalid_filters + | :invalid_auth + | :invalid_count + | :invalid_negentropy + + @count_options_keys MapSet.new(["hll", "approximate"]) @spec decode_client(binary()) :: {:ok, client_message()} | {:error, decode_error()} def decode_client(payload) when is_binary(payload) do @@ -57,6 +70,9 @@ defmodule Parrhesia.Protocol do :invalid_event -> "invalid: invalid EVENT shape" :invalid_subscription_id -> "invalid: invalid subscription id" :invalid_filters -> "invalid: invalid filters" + :invalid_auth -> "invalid: invalid AUTH message" + :invalid_count -> "invalid: invalid COUNT message" + :invalid_negentropy -> "invalid: invalid NEG message" end end @@ -71,6 +87,73 @@ defmodule Parrhesia.Protocol do defp decode_message(["EVENT", _event]), do: {:error, :invalid_event} defp decode_message(["REQ", subscription_id | filters]) when is_binary(subscription_id) do + decode_req_like_message(:req, subscription_id, filters) + end + + defp decode_message(["REQ", _subscription_id | _filters]), + do: {:error, :invalid_subscription_id} + + defp decode_message(["COUNT", subscription_id | filters_or_options]) + when is_binary(subscription_id) do + with {:ok, filters, options} <- split_count_parts(filters_or_options), + {:ok, {:req, ^subscription_id, parsed_filters}} <- + decode_req_like_message(:req, subscription_id, filters) do + {:ok, {:count, subscription_id, parsed_filters, options}} + else + _error -> {:error, :invalid_count} + end + end + + defp decode_message(["COUNT", _subscription_id | _filters_or_options]), + do: {:error, :invalid_count} + + defp decode_message(["CLOSE", subscription_id]) when is_binary(subscription_id) do + if valid_subscription_id?(subscription_id) do + {:ok, {:close, subscription_id}} + else + {:error, :invalid_subscription_id} + end + end + + defp decode_message(["CLOSE", _subscription_id]), do: {:error, :invalid_subscription_id} + + defp decode_message(["AUTH", auth_event]) when is_map(auth_event), + do: {:ok, {:auth, auth_event}} + + defp decode_message(["AUTH", _invalid]), do: {:error, :invalid_auth} + + defp decode_message(["NEG-OPEN", subscription_id, payload]) + when is_binary(subscription_id) and is_map(payload) do + if valid_subscription_id?(subscription_id) do + {:ok, {:neg_open, subscription_id, payload}} + else + {:error, :invalid_subscription_id} + end + end + + defp decode_message(["NEG-MSG", subscription_id, payload]) + when is_binary(subscription_id) and is_map(payload) do + if valid_subscription_id?(subscription_id) do + {:ok, {:neg_msg, subscription_id, payload}} + else + {:error, :invalid_subscription_id} + end + end + + defp decode_message(["NEG-CLOSE", subscription_id]) when is_binary(subscription_id) do + if valid_subscription_id?(subscription_id) do + {:ok, {:neg_close, subscription_id}} + else + {:error, :invalid_subscription_id} + end + end + + defp decode_message([type | _rest]) when type in ["NEG-OPEN", "NEG-MSG", "NEG-CLOSE"], + do: {:error, :invalid_negentropy} + + defp decode_message(_other), do: {:error, :invalid_message} + + defp decode_req_like_message(_kind, subscription_id, filters) do cond do not valid_subscription_id?(subscription_id) -> {:error, :invalid_subscription_id} @@ -86,25 +169,51 @@ defmodule Parrhesia.Protocol do end end - defp decode_message(["REQ", _subscription_id | _filters]), - do: {:error, :invalid_subscription_id} - - defp decode_message(["CLOSE", subscription_id]) when is_binary(subscription_id) do - if valid_subscription_id?(subscription_id) do - {:ok, {:close, subscription_id}} + defp split_count_parts(parts) when is_list(parts) do + if parts == [] do + {:error, :missing_filters} else - {:error, :invalid_subscription_id} + split_count_parts_with_optional_options(parts) end end - defp decode_message(["CLOSE", _subscription_id]), do: {:error, :invalid_subscription_id} - defp decode_message(_other), do: {:error, :invalid_message} + defp split_count_parts(_parts), do: {:error, :invalid_parts} + + defp split_count_parts_with_optional_options(parts) do + case List.last(parts) do + options when is_map(options) -> + maybe_extract_count_options(parts, options) + + _other -> + {:ok, parts, %{}} + end + end + + defp maybe_extract_count_options(parts, options) do + if count_options_map?(options) and length(parts) > 1 do + filters = Enum.drop(parts, -1) + {:ok, filters, options} + else + {:ok, parts, %{}} + end + end + + defp count_options_map?(map) do + map + |> Map.keys() + |> Enum.all?(&MapSet.member?(@count_options_keys, &1)) + end defp relay_frame({:notice, message}), do: ["NOTICE", message] defp relay_frame({:ok, event_id, accepted, message}), do: ["OK", event_id, accepted, message] defp relay_frame({:closed, subscription_id, message}), do: ["CLOSED", subscription_id, message] defp relay_frame({:eose, subscription_id}), do: ["EOSE", subscription_id] defp relay_frame({:event, subscription_id, event}), do: ["EVENT", subscription_id, event] + defp relay_frame({:auth, challenge}), do: ["AUTH", challenge] + defp relay_frame({:count, subscription_id, payload}), do: ["COUNT", subscription_id, payload] + + defp relay_frame({:neg_msg, subscription_id, payload}), + do: ["NEG-MSG", subscription_id, payload] defp valid_subscription_id?(subscription_id) do subscription_id != "" and String.length(subscription_id) <= 64 diff --git a/lib/parrhesia/protocol/filter.ex b/lib/parrhesia/protocol/filter.ex index 3c2d1bb..fccdd4a 100644 --- a/lib/parrhesia/protocol/filter.ex +++ b/lib/parrhesia/protocol/filter.ex @@ -18,9 +18,10 @@ defmodule Parrhesia.Protocol.Filter do | :invalid_since | :invalid_until | :invalid_limit + | :invalid_search | :invalid_tag_filter - @allowed_keys MapSet.new(["ids", "authors", "kinds", "since", "until", "limit"]) + @allowed_keys MapSet.new(["ids", "authors", "kinds", "since", "until", "limit", "search"]) @error_messages %{ invalid_filters: "invalid: filters must be a non-empty array of objects", @@ -34,6 +35,7 @@ defmodule Parrhesia.Protocol.Filter do invalid_since: "invalid: since must be a non-negative integer", invalid_until: "invalid: until must be a non-negative integer", invalid_limit: "invalid: limit must be a positive integer", + invalid_search: "invalid: search must be a non-empty string", invalid_tag_filter: "invalid: tag filters must use # with non-empty string arrays" } @@ -71,7 +73,8 @@ defmodule Parrhesia.Protocol.Filter do :ok <- validate_kinds(Map.get(filter, "kinds")), :ok <- validate_since(Map.get(filter, "since")), :ok <- validate_until(Map.get(filter, "until")), - :ok <- validate_limit(Map.get(filter, "limit")) do + :ok <- validate_limit(Map.get(filter, "limit")), + :ok <- validate_search(Map.get(filter, "search")) do validate_tag_filters(filter) end end @@ -89,12 +92,9 @@ defmodule Parrhesia.Protocol.Filter do def matches_filter?(event, filter) when is_map(event) and is_map(filter) do case validate_filter(filter) do :ok -> - ids_match?(event, Map.get(filter, "ids")) and - authors_match?(event, Map.get(filter, "authors")) and - kinds_match?(event, Map.get(filter, "kinds")) and - since_match?(event, Map.get(filter, "since")) and - until_match?(event, Map.get(filter, "until")) and - tags_match?(event, filter) + event + |> filter_predicates(filter) + |> Enum.all?() {:error, _reason} -> false @@ -170,6 +170,10 @@ defmodule Parrhesia.Protocol.Filter do defp validate_limit(limit) when is_integer(limit) and limit > 0, do: :ok defp validate_limit(_limit), do: {:error, :invalid_limit} + defp validate_search(nil), do: :ok + defp validate_search(search) when is_binary(search) and search != "", do: :ok + defp validate_search(_search), do: {:error, :invalid_search} + defp validate_tag_filters(filter) do filter |> Enum.filter(fn {key, _value} -> valid_tag_filter_key?(key) end) @@ -188,6 +192,18 @@ defmodule Parrhesia.Protocol.Filter do defp valid_tag_filter_values?(_values), do: false + defp filter_predicates(event, filter) do + [ + ids_match?(event, Map.get(filter, "ids")), + authors_match?(event, Map.get(filter, "authors")), + kinds_match?(event, Map.get(filter, "kinds")), + since_match?(event, Map.get(filter, "since")), + until_match?(event, Map.get(filter, "until")), + search_match?(event, Map.get(filter, "search")), + tags_match?(event, filter) + ] + end + defp ids_match?(_event, nil), do: true defp ids_match?(event, ids) do @@ -220,6 +236,13 @@ defmodule Parrhesia.Protocol.Filter do is_integer(created_at) and created_at <= until end + defp search_match?(_event, nil), do: true + + defp search_match?(event, search) do + content = Map.get(event, "content", "") + String.contains?(String.downcase(content), String.downcase(search)) + end + defp tags_match?(event, filter) do filter |> Enum.filter(fn {key, _value} -> valid_tag_filter_key?(key) end) diff --git a/lib/parrhesia/repo.ex b/lib/parrhesia/repo.ex index a871f8b..0ee82ef 100644 --- a/lib/parrhesia/repo.ex +++ b/lib/parrhesia/repo.ex @@ -1,9 +1,6 @@ defmodule Parrhesia.Repo do @moduledoc """ PostgreSQL repository for storage adapter persistence. - - Note: the repo is not yet started by the supervision tree while the - storage adapter is in staged implementation. """ use Ecto.Repo, diff --git a/lib/parrhesia/storage/adapters/memory/admin.ex b/lib/parrhesia/storage/adapters/memory/admin.ex new file mode 100644 index 0000000..8309ddb --- /dev/null +++ b/lib/parrhesia/storage/adapters/memory/admin.ex @@ -0,0 +1,34 @@ +defmodule Parrhesia.Storage.Adapters.Memory.Admin do + @moduledoc """ + In-memory prototype adapter for `Parrhesia.Storage.Admin`. + """ + + alias Parrhesia.Storage.Adapters.Memory.Store + + @behaviour Parrhesia.Storage.Admin + + @impl true + def execute(_context, method, _params) do + case method do + method when method in [:ping, "ping"] -> {:ok, %{"status" => "ok"}} + _other -> {:error, {:unsupported_method, normalize_method(method)}} + end + end + + @impl true + def append_audit_log(_context, audit_entry) when is_map(audit_entry) do + Store.update(fn state -> update_in(state.audit_logs, &[audit_entry | &1]) end) + :ok + end + + def append_audit_log(_context, _audit_entry), do: {:error, :invalid_audit_entry} + + @impl true + def list_audit_logs(_context, _opts) do + {:ok, Store.get(fn state -> Enum.reverse(state.audit_logs) end)} + end + + defp normalize_method(method) when is_binary(method), do: method + defp normalize_method(method) when is_atom(method), do: Atom.to_string(method) + defp normalize_method(method), do: inspect(method) +end diff --git a/lib/parrhesia/storage/adapters/memory/events.ex b/lib/parrhesia/storage/adapters/memory/events.ex new file mode 100644 index 0000000..2533d6c --- /dev/null +++ b/lib/parrhesia/storage/adapters/memory/events.ex @@ -0,0 +1,103 @@ +defmodule Parrhesia.Storage.Adapters.Memory.Events do + @moduledoc """ + In-memory prototype adapter for `Parrhesia.Storage.Events`. + """ + + alias Parrhesia.Protocol.Filter + alias Parrhesia.Storage.Adapters.Memory.Store + + @behaviour Parrhesia.Storage.Events + + @impl true + def put_event(_context, event) do + event_id = Map.fetch!(event, "id") + + result = + Store.get_and_update(fn state -> + if Map.has_key?(state.events, event_id) do + {{:error, :duplicate_event}, state} + else + next_state = put_in(state.events[event_id], event) + {{:ok, event}, next_state} + end + end) + + result + end + + @impl true + def get_event(_context, event_id) do + deleted? = Store.get(fn state -> MapSet.member?(state.deleted, event_id) end) + + if deleted? do + {:ok, nil} + else + {:ok, Store.get(fn state -> Map.get(state.events, event_id) end)} + end + end + + @impl true + def query(_context, filters, _opts) do + with :ok <- Filter.validate_filters(filters) do + state = Store.get(& &1) + + events = + state.events + |> Map.values() + |> Enum.reject(fn event -> MapSet.member?(state.deleted, event["id"]) end) + |> Enum.filter(&Filter.matches_any?(&1, filters)) + + {:ok, events} + end + end + + @impl true + def count(context, filters, opts) do + with {:ok, events} <- query(context, filters, opts) do + {:ok, length(events)} + end + end + + @impl true + def delete_by_request(_context, event) do + delete_ids = + event + |> Map.get("tags", []) + |> Enum.flat_map(fn + ["e", event_id | _rest] -> [event_id] + _tag -> [] + end) + + Store.update(fn state -> + Enum.reduce(delete_ids, state, fn event_id, acc -> + update_in(acc.deleted, &MapSet.put(&1, event_id)) + end) + end) + + {:ok, length(delete_ids)} + end + + @impl true + def vanish(_context, event) do + pubkey = Map.get(event, "pubkey") + + deleted_ids = + Store.get(fn state -> + state.events + |> Map.values() + |> Enum.filter(fn candidate -> candidate["pubkey"] == pubkey end) + |> Enum.map(& &1["id"]) + end) + + Store.update(fn state -> + Enum.reduce(deleted_ids, state, fn event_id, acc -> + update_in(acc.deleted, &MapSet.put(&1, event_id)) + end) + end) + + {:ok, length(deleted_ids)} + end + + @impl true + def purge_expired(_opts), do: {:ok, 0} +end diff --git a/lib/parrhesia/storage/adapters/memory/groups.ex b/lib/parrhesia/storage/adapters/memory/groups.ex new file mode 100644 index 0000000..a528447 --- /dev/null +++ b/lib/parrhesia/storage/adapters/memory/groups.ex @@ -0,0 +1,90 @@ +defmodule Parrhesia.Storage.Adapters.Memory.Groups do + @moduledoc """ + In-memory prototype adapter for `Parrhesia.Storage.Groups`. + """ + + alias Parrhesia.Storage.Adapters.Memory.Store + + @behaviour Parrhesia.Storage.Groups + + @impl true + def put_membership(_context, membership) do + group_id = fetch!(membership, :group_id) + pubkey = fetch!(membership, :pubkey) + + normalized = %{ + group_id: group_id, + pubkey: pubkey, + role: fetch!(membership, :role), + metadata: Map.get(membership, :metadata, %{}) + } + + Store.update(fn state -> put_in(state.groups[{group_id, pubkey}], normalized) end) + {:ok, normalized} + end + + @impl true + def get_membership(_context, group_id, pubkey) do + {:ok, Store.get(fn state -> Map.get(state.groups, {group_id, pubkey}) end)} + end + + @impl true + def delete_membership(_context, group_id, pubkey) do + Store.update(fn state -> update_in(state.groups, &Map.delete(&1, {group_id, pubkey})) end) + :ok + end + + @impl true + def list_memberships(_context, group_id) do + memberships = + Store.get(fn state -> + state.groups + |> Map.values() + |> Enum.filter(fn membership -> membership.group_id == group_id end) + end) + + {:ok, memberships} + end + + @impl true + def put_role(_context, role) do + group_id = fetch!(role, :group_id) + pubkey = fetch!(role, :pubkey) + role_name = fetch!(role, :role) + + normalized = %{ + group_id: group_id, + pubkey: pubkey, + role: role_name, + metadata: Map.get(role, :metadata, %{}) + } + + Store.update(fn state -> put_in(state.roles[{group_id, pubkey, role_name}], normalized) end) + {:ok, normalized} + end + + @impl true + def delete_role(_context, group_id, pubkey, role_name) do + Store.update(fn state -> + update_in(state.roles, &Map.delete(&1, {group_id, pubkey, role_name})) + end) + + :ok + end + + @impl true + def list_roles(_context, group_id, pubkey) do + roles = + Store.get(fn state -> + state.roles + |> Map.values() + |> Enum.filter(fn role -> role.group_id == group_id and role.pubkey == pubkey end) + end) + + {:ok, roles} + end + + defp fetch!(map, key) do + Map.get(map, key) || Map.fetch!(map, Atom.to_string(key)) + end +end diff --git a/lib/parrhesia/storage/adapters/memory/moderation.ex b/lib/parrhesia/storage/adapters/memory/moderation.ex new file mode 100644 index 0000000..d77259c --- /dev/null +++ b/lib/parrhesia/storage/adapters/memory/moderation.ex @@ -0,0 +1,68 @@ +defmodule Parrhesia.Storage.Adapters.Memory.Moderation do + @moduledoc """ + In-memory prototype adapter for `Parrhesia.Storage.Moderation`. + """ + + alias Parrhesia.Storage.Adapters.Memory.Store + + @behaviour Parrhesia.Storage.Moderation + + @impl true + def ban_pubkey(_context, pubkey), do: update_ban_set(:pubkeys, pubkey, :add) + + @impl true + def unban_pubkey(_context, pubkey), do: update_ban_set(:pubkeys, pubkey, :delete) + + @impl true + def pubkey_banned?(_context, pubkey), do: {:ok, banned?(:pubkeys, pubkey)} + + @impl true + def allow_pubkey(_context, pubkey) do + Store.update(fn state -> update_in(state.allowed_pubkeys, &MapSet.put(&1, pubkey)) end) + :ok + end + + @impl true + def disallow_pubkey(_context, pubkey) do + Store.update(fn state -> update_in(state.allowed_pubkeys, &MapSet.delete(&1, pubkey)) end) + :ok + end + + @impl true + def pubkey_allowed?(_context, pubkey) do + {:ok, Store.get(fn state -> MapSet.member?(state.allowed_pubkeys, pubkey) end)} + end + + @impl true + def ban_event(_context, event_id), do: update_ban_set(:events, event_id, :add) + + @impl true + def unban_event(_context, event_id), do: update_ban_set(:events, event_id, :delete) + + @impl true + def event_banned?(_context, event_id), do: {:ok, banned?(:events, event_id)} + + @impl true + def block_ip(_context, ip), do: update_ban_set(:ips, ip, :add) + + @impl true + def unblock_ip(_context, ip), do: update_ban_set(:ips, ip, :delete) + + @impl true + def ip_blocked?(_context, ip), do: {:ok, banned?(:ips, ip)} + + defp banned?(key, value) do + Store.get(fn state -> MapSet.member?(state.bans[key], value) end) + end + + defp update_ban_set(key, value, operation) do + Store.update(fn state -> + update_in(state.bans[key], &apply_ban_operation(&1, value, operation)) + end) + + :ok + end + + defp apply_ban_operation(current, value, :add), do: MapSet.put(current, value) + defp apply_ban_operation(current, value, :delete), do: MapSet.delete(current, value) +end diff --git a/lib/parrhesia/storage/adapters/memory/store.ex b/lib/parrhesia/storage/adapters/memory/store.ex new file mode 100644 index 0000000..2f55a6a --- /dev/null +++ b/lib/parrhesia/storage/adapters/memory/store.ex @@ -0,0 +1,48 @@ +defmodule Parrhesia.Storage.Adapters.Memory.Store do + @moduledoc false + + use Agent + + @name __MODULE__ + + @initial_state %{ + events: %{}, + deleted: MapSet.new(), + bans: %{pubkeys: MapSet.new(), events: MapSet.new(), ips: MapSet.new()}, + allowed_pubkeys: MapSet.new(), + groups: %{}, + roles: %{}, + audit_logs: [] + } + + def ensure_started do + if Process.whereis(@name) do + :ok + else + start_store() + end + end + + defp start_store do + case Agent.start_link(fn -> @initial_state end, name: @name) do + {:ok, _pid} -> :ok + {:error, {:already_started, _pid}} -> :ok + {:error, reason} -> {:error, reason} + end + end + + def get(fun) do + :ok = ensure_started() + Agent.get(@name, fun) + end + + def update(fun) do + :ok = ensure_started() + Agent.update(@name, fun) + end + + def get_and_update(fun) do + :ok = ensure_started() + Agent.get_and_update(@name, fun) + end +end diff --git a/lib/parrhesia/storage/adapters/postgres/admin.ex b/lib/parrhesia/storage/adapters/postgres/admin.ex index 38a1265..b3452d8 100644 --- a/lib/parrhesia/storage/adapters/postgres/admin.ex +++ b/lib/parrhesia/storage/adapters/postgres/admin.ex @@ -13,10 +13,21 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Admin do @max_limit 1_000 @impl true - def execute(_context, method, _params) do - {:error, {:unsupported_method, normalize_method_name(method)}} + def execute(_context, method, params) when is_map(params) do + moderation = Parrhesia.Storage.moderation() + method_name = normalize_method_name(method) + + case method_name do + "ping" -> {:ok, %{"status" => "ok"}} + "stats" -> {:ok, relay_stats()} + "list_audit_logs" -> list_audit_logs(%{}, audit_list_opts(params)) + _other -> execute_moderation_method(moderation, method_name, params) + end end + def execute(_context, method, _params), + do: {:error, {:unsupported_method, normalize_method_name(method)}} + @impl true def append_audit_log(_context, audit_entry) when is_map(audit_entry) do with {:ok, method} <- fetch_required_method(audit_entry), @@ -70,6 +81,91 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Admin do def list_audit_logs(_context, _opts), do: {:error, :invalid_opts} + defp relay_stats do + events_count = Repo.aggregate("events", :count, :id) + banned_pubkeys = Repo.aggregate("banned_pubkeys", :count, :pubkey) + blocked_ips = Repo.aggregate("blocked_ips", :count, :ip) + + %{ + "events" => events_count, + "banned_pubkeys" => banned_pubkeys, + "blocked_ips" => blocked_ips + } + end + + defp execute_moderation_method(moderation, "ban_pubkey", params), + do: execute_pubkey_method(fn ctx, value -> moderation.ban_pubkey(ctx, value) end, params) + + defp execute_moderation_method(moderation, "unban_pubkey", params), + do: execute_pubkey_method(fn ctx, value -> moderation.unban_pubkey(ctx, value) end, params) + + defp execute_moderation_method(moderation, "allow_pubkey", params), + do: execute_pubkey_method(fn ctx, value -> moderation.allow_pubkey(ctx, value) end, params) + + defp execute_moderation_method(moderation, "disallow_pubkey", params), + do: execute_pubkey_method(fn ctx, value -> moderation.disallow_pubkey(ctx, value) end, params) + + defp execute_moderation_method(moderation, "ban_event", params), + do: execute_event_method(fn ctx, value -> moderation.ban_event(ctx, value) end, params) + + defp execute_moderation_method(moderation, "unban_event", params), + do: execute_event_method(fn ctx, value -> moderation.unban_event(ctx, value) end, params) + + defp execute_moderation_method(moderation, "block_ip", params), + do: execute_ip_method(fn ctx, value -> moderation.block_ip(ctx, value) end, params) + + defp execute_moderation_method(moderation, "unblock_ip", params), + do: execute_ip_method(fn ctx, value -> moderation.unblock_ip(ctx, value) end, params) + + defp execute_moderation_method(_moderation, method_name, _params), + do: {:error, {:unsupported_method, method_name}} + + defp audit_list_opts(params) do + [] + |> maybe_put_opt(:limit, Map.get(params, "limit")) + |> maybe_put_opt(:method, Map.get(params, "method")) + |> maybe_put_opt(:actor_pubkey, Map.get(params, "actor_pubkey")) + end + + defp maybe_put_opt(opts, _key, nil), do: opts + defp maybe_put_opt(opts, key, value), do: Keyword.put(opts, key, value) + + defp execute_pubkey_method(fun, params) do + case Map.get(params, "pubkey") do + pubkey when is_binary(pubkey) -> + with :ok <- fun.(%{}, pubkey) do + {:ok, %{"ok" => true}} + end + + _other -> + {:error, :invalid_pubkey} + end + end + + defp execute_event_method(fun, params) do + case Map.get(params, "event_id") do + event_id when is_binary(event_id) -> + with :ok <- fun.(%{}, event_id) do + {:ok, %{"ok" => true}} + end + + _other -> + {:error, :invalid_event_id} + end + end + + defp execute_ip_method(fun, params) do + case Map.get(params, "ip") do + ip when is_binary(ip) -> + with :ok <- fun.(%{}, ip) do + {:ok, %{"ok" => true}} + end + + _other -> + {:error, :invalid_ip} + end + end + defp fetch_required_method(audit_entry) do audit_entry |> fetch_value(:method) diff --git a/lib/parrhesia/storage/adapters/postgres/events.ex b/lib/parrhesia/storage/adapters/postgres/events.ex index a72ee03..4a97f1b 100644 --- a/lib/parrhesia/storage/adapters/postgres/events.ex +++ b/lib/parrhesia/storage/adapters/postgres/events.ex @@ -116,7 +116,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do filters |> Enum.flat_map(fn filter -> filter - |> event_id_query_for_filter(now) + |> event_id_query_for_filter(now, opts) |> Repo.all() end) |> MapSet.new() @@ -129,10 +129,50 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do def count(_context, _filters, _opts), do: {:error, :invalid_opts} @impl true - def delete_by_request(_context, _event), do: {:error, :not_implemented} + def delete_by_request(_context, event) do + with {:ok, deleter_pubkey} <- decode_hex(Map.get(event, "pubkey"), 32, :invalid_pubkey), + {:ok, delete_ids} <- extract_delete_event_ids(event) do + query = + from(stored_event in "events", + where: + stored_event.id in ^delete_ids and + stored_event.pubkey == ^deleter_pubkey and + is_nil(stored_event.deleted_at) + ) + + deleted_at = System.system_time(:second) + {count, _result} = Repo.update_all(query, set: [deleted_at: deleted_at]) + {:ok, count} + end + end @impl true - def vanish(_context, _event), do: {:error, :not_implemented} + def vanish(_context, event) do + with {:ok, pubkey} <- decode_hex(Map.get(event, "pubkey"), 32, :invalid_pubkey), + {:ok, created_at} <- + validate_non_negative_integer(Map.get(event, "created_at"), :invalid_created_at) do + own_events_query = + from(stored_event in "events", + where: stored_event.pubkey == ^pubkey and stored_event.created_at <= ^created_at + ) + + giftwrap_query = + from(stored_event in "events", + join: tag in "event_tags", + on: tag.event_created_at == stored_event.created_at and tag.event_id == stored_event.id, + where: + stored_event.kind == 1059 and + tag.name == "p" and + tag.value == ^Base.encode16(pubkey, case: :lower) and + stored_event.created_at <= ^created_at + ) + + {own_events_count, _result} = Repo.delete_all(own_events_query) + {giftwrap_count, _result} = Repo.delete_all(giftwrap_query) + + {:ok, own_events_count + giftwrap_count} + end + end @impl true def purge_expired(opts) when is_list(opts) do @@ -158,6 +198,11 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do {:ok, kind} <- validate_non_negative_integer(Map.get(event, "kind"), :invalid_kind), {:ok, content} <- validate_binary(Map.get(event, "content"), :invalid_content), {:ok, tags} <- validate_tags(Map.get(event, "tags")) do + expires_at = + tags + |> extract_expiration() + |> maybe_apply_mls_group_retention(kind, created_at) + {:ok, %{ id: id, @@ -167,7 +212,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do content: content, sig: sig, d_tag: extract_d_tag(tags), - expires_at: extract_expiration(tags), + expires_at: expires_at, tags: tags }} end @@ -531,12 +576,14 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do |> maybe_filter_kinds(Map.get(filter, "kinds")) |> maybe_filter_since(Map.get(filter, "since")) |> maybe_filter_until(Map.get(filter, "until")) + |> maybe_filter_search(Map.get(filter, "search")) |> filter_by_tags(filter) + |> maybe_restrict_giftwrap_access(filter, opts) maybe_limit_query(query, effective_filter_limit(filter, opts)) end - defp event_id_query_for_filter(filter, now) do + defp event_id_query_for_filter(filter, now, opts) do from(event in "events", where: is_nil(event.deleted_at) and (is_nil(event.expires_at) or event.expires_at > ^now), select: event.id @@ -546,7 +593,9 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do |> maybe_filter_kinds(Map.get(filter, "kinds")) |> maybe_filter_since(Map.get(filter, "since")) |> maybe_filter_until(Map.get(filter, "until")) + |> maybe_filter_search(Map.get(filter, "search")) |> filter_by_tags(filter) + |> maybe_restrict_giftwrap_access(filter, opts) end defp maybe_filter_ids(query, nil), do: query @@ -572,6 +621,14 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do defp maybe_filter_until(query, nil), do: query defp maybe_filter_until(query, until), do: where(query, [event], event.created_at <= ^until) + defp maybe_filter_search(query, nil), do: query + + defp maybe_filter_search(query, search) when is_binary(search) and search != "" do + where(query, [event], ilike(event.content, ^"%#{search}%")) + end + + defp maybe_filter_search(query, _search), do: query + defp filter_by_tags(query, filter) do filter |> tag_filters() @@ -601,6 +658,32 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do Enum.map(values, &Base.decode16!(&1, case: decode_case)) end + defp maybe_restrict_giftwrap_access(query, filter, opts) do + requester_pubkeys = Keyword.get(opts, :requester_pubkeys, []) + + if targets_giftwrap?(filter) and requester_pubkeys != [] do + where( + query, + [event], + fragment( + "EXISTS (SELECT 1 FROM event_tags AS tag WHERE tag.event_created_at = ? AND tag.event_id = ? AND tag.name = 'p' AND tag.value = ANY(?))", + event.created_at, + event.id, + type(^requester_pubkeys, {:array, :string}) + ) + ) + else + query + end + end + + defp targets_giftwrap?(filter) do + case Map.get(filter, "kinds") do + kinds when is_list(kinds) -> 1059 in kinds + _other -> false + end + end + defp effective_filter_limit(filter, opts) do filter_limit = Map.get(filter, "limit") max_filter_limit = Keyword.get(opts, :max_filter_limit) @@ -730,6 +813,25 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do end) end + defp extract_delete_event_ids(event) do + delete_ids = + event + |> Map.get("tags", []) + |> Enum.reduce([], fn + ["e", event_id | _rest], acc when is_binary(event_id) -> [event_id | acc] + _tag, acc -> acc + end) + |> Enum.uniq() + + if delete_ids == [] do + {:error, :no_delete_targets} + else + {:ok, Enum.map(delete_ids, &Base.decode16!(&1, case: :mixed))} + end + rescue + ArgumentError -> {:error, :invalid_delete_target} + end + defp extract_expiration(tags) do tags |> Enum.find_value(fn @@ -746,4 +848,19 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do end defp parse_unix_seconds(_unix_seconds), do: nil + + defp maybe_apply_mls_group_retention(nil, 445, created_at) do + if Application.get_env(:parrhesia, :features, []) |> Keyword.get(:nip_ee_mls, false) do + ttl = + :parrhesia + |> Application.get_env(:policies, []) + |> Keyword.get(:mls_group_event_ttl_seconds, 300) + + created_at + ttl + else + nil + end + end + + defp maybe_apply_mls_group_retention(expires_at, _kind, _created_at), do: expires_at end diff --git a/lib/parrhesia/storage/archiver.ex b/lib/parrhesia/storage/archiver.ex new file mode 100644 index 0000000..f3ec01e --- /dev/null +++ b/lib/parrhesia/storage/archiver.ex @@ -0,0 +1,34 @@ +defmodule Parrhesia.Storage.Archiver do + @moduledoc """ + Partition-aware archival helpers for Postgres event partitions. + """ + + import Ecto.Query + + alias Parrhesia.Repo + + @doc """ + Lists all `events_*` partitions excluding the default partition. + """ + @spec list_partitions() :: [String.t()] + def list_partitions do + query = + from(table in "pg_tables", + where: table.schemaname == "public", + where: like(table.tablename, "events_%"), + where: table.tablename != "events_default", + select: table.tablename, + order_by: [asc: table.tablename] + ) + + Repo.all(query) + end + + @doc """ + Generates an archive SQL statement for the given partition. + """ + @spec archive_sql(String.t(), String.t()) :: String.t() + def archive_sql(partition_name, archive_table_name) do + "INSERT INTO #{archive_table_name} SELECT * FROM #{partition_name};" + end +end diff --git a/lib/parrhesia/storage/supervisor.ex b/lib/parrhesia/storage/supervisor.ex index a67f39c..957d147 100644 --- a/lib/parrhesia/storage/supervisor.ex +++ b/lib/parrhesia/storage/supervisor.ex @@ -11,6 +11,10 @@ defmodule Parrhesia.Storage.Supervisor do @impl true def init(_init_arg) do - Supervisor.init([], strategy: :one_for_one) + children = [ + Parrhesia.Repo + ] + + Supervisor.init(children, strategy: :one_for_one) end end diff --git a/lib/parrhesia/subscriptions/supervisor.ex b/lib/parrhesia/subscriptions/supervisor.ex index 68d7d4d..882f41e 100644 --- a/lib/parrhesia/subscriptions/supervisor.ex +++ b/lib/parrhesia/subscriptions/supervisor.ex @@ -12,7 +12,9 @@ defmodule Parrhesia.Subscriptions.Supervisor do @impl true def init(_init_arg) do children = [ - {Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index} + {Parrhesia.Subscriptions.Index, name: Parrhesia.Subscriptions.Index}, + {Parrhesia.Negentropy.Sessions, name: Parrhesia.Negentropy.Sessions}, + {Parrhesia.Fanout.MultiNode, name: Parrhesia.Fanout.MultiNode} ] Supervisor.init(children, strategy: :one_for_one) diff --git a/lib/parrhesia/tasks/expiration_worker.ex b/lib/parrhesia/tasks/expiration_worker.ex new file mode 100644 index 0000000..d9069b7 --- /dev/null +++ b/lib/parrhesia/tasks/expiration_worker.ex @@ -0,0 +1,47 @@ +defmodule Parrhesia.Tasks.ExpirationWorker do + @moduledoc """ + Periodic worker that purges expired events. + """ + + use GenServer + + alias Parrhesia.Storage + alias Parrhesia.Telemetry + + @default_interval_ms 30_000 + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + name = Keyword.get(opts, :name, __MODULE__) + GenServer.start_link(__MODULE__, opts, name: name) + end + + @impl true + def init(opts) do + state = %{ + interval_ms: Keyword.get(opts, :interval_ms, @default_interval_ms) + } + + schedule_tick(state.interval_ms) + {:ok, state} + end + + @impl true + def handle_info(:tick, state) do + started_at = System.monotonic_time() + + _result = Storage.events().purge_expired([]) + + duration = System.monotonic_time() - started_at + Telemetry.emit([:parrhesia, :maintenance, :purge_expired, :stop], %{duration: duration}, %{}) + + schedule_tick(state.interval_ms) + {:noreply, state} + end + + def handle_info(_message, state), do: {:noreply, state} + + defp schedule_tick(interval_ms) do + Process.send_after(self(), :tick, interval_ms) + end +end diff --git a/lib/parrhesia/tasks/supervisor.ex b/lib/parrhesia/tasks/supervisor.ex index f7cd0ba..99345fb 100644 --- a/lib/parrhesia/tasks/supervisor.ex +++ b/lib/parrhesia/tasks/supervisor.ex @@ -11,6 +11,13 @@ defmodule Parrhesia.Tasks.Supervisor do @impl true def init(_init_arg) do - Supervisor.init([], strategy: :one_for_one) + children = + if Application.get_env(:parrhesia, :enable_expiration_worker, true) do + [{Parrhesia.Tasks.ExpirationWorker, name: Parrhesia.Tasks.ExpirationWorker}] + else + [] + end + + Supervisor.init(children, strategy: :one_for_one) end end diff --git a/lib/parrhesia/telemetry.ex b/lib/parrhesia/telemetry.ex index 15c7307..a092952 100644 --- a/lib/parrhesia/telemetry.ex +++ b/lib/parrhesia/telemetry.ex @@ -1,16 +1,86 @@ defmodule Parrhesia.Telemetry do @moduledoc """ - Supervision entrypoint for relay telemetry workers. + Supervision entrypoint and helpers for relay telemetry. """ use Supervisor + import Telemetry.Metrics + + @prometheus_reporter __MODULE__.Prometheus + + @spec start_link(keyword()) :: Supervisor.on_start() def start_link(init_arg \\ []) do Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) end @impl true def init(_init_arg) do - Supervisor.init([], strategy: :one_for_one) + children = [ + {TelemetryMetricsPrometheus.Core, name: @prometheus_reporter, metrics: metrics()}, + {:telemetry_poller, measurements: periodic_measurements(), period: 10_000} + ] + + Supervisor.init(children, strategy: :one_for_one) + end + + @spec prometheus_reporter() :: atom() + def prometheus_reporter, do: @prometheus_reporter + + @spec metrics() :: [Telemetry.Metrics.t()] + def metrics do + [ + distribution("parrhesia.ingest.duration.ms", + event_name: [:parrhesia, :ingest, :stop], + measurement: :duration, + unit: {:native, :millisecond}, + reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]] + ), + distribution("parrhesia.query.duration.ms", + event_name: [:parrhesia, :query, :stop], + measurement: :duration, + unit: {:native, :millisecond}, + reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]] + ), + distribution("parrhesia.fanout.duration.ms", + event_name: [:parrhesia, :fanout, :stop], + measurement: :duration, + unit: {:native, :millisecond}, + reporter_options: [buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000]] + ), + last_value("parrhesia.connection.outbound_queue.depth", + event_name: [:parrhesia, :connection, :outbound_queue], + measurement: :depth, + reporter_options: [prometheus_type: :gauge] + ), + counter("parrhesia.connection.outbound_queue.overflow.count", + event_name: [:parrhesia, :connection, :outbound_queue, :overflow], + measurement: :count + ), + last_value("parrhesia.vm.memory.total.bytes", + event_name: [:parrhesia, :vm, :memory], + measurement: :total, + unit: :byte, + reporter_options: [prometheus_type: :gauge] + ) + ] + end + + @spec emit([atom()], map(), map()) :: :ok + def emit(event_name, measurements, metadata \\ %{}) + when is_list(event_name) and is_map(measurements) and is_map(metadata) do + :telemetry.execute(event_name, measurements, metadata) + end + + defp periodic_measurements do + [ + {__MODULE__, :emit_vm_memory, []} + ] + end + + @doc false + def emit_vm_memory do + total = :erlang.memory(:total) + emit([:parrhesia, :vm, :memory], %{total: total}, %{}) end end diff --git a/lib/parrhesia/test_support/expiration_stub_events.ex b/lib/parrhesia/test_support/expiration_stub_events.ex new file mode 100644 index 0000000..b455635 --- /dev/null +++ b/lib/parrhesia/test_support/expiration_stub_events.ex @@ -0,0 +1,30 @@ +defmodule Parrhesia.TestSupport.ExpirationStubEvents do + @moduledoc false + + @behaviour Parrhesia.Storage.Events + + @impl true + def put_event(_context, event), do: {:ok, event} + + @impl true + def get_event(_context, _event_id), do: {:ok, nil} + + @impl true + def query(_context, _filters, _opts), do: {:ok, []} + + @impl true + def count(_context, _filters, _opts), do: {:ok, 0} + + @impl true + def delete_by_request(_context, _event), do: {:ok, 0} + + @impl true + def vanish(_context, _event), do: {:ok, 0} + + @impl true + def purge_expired(_opts) do + test_pid = :persistent_term.get({__MODULE__, :test_pid}) + send(test_pid, :purged) + {:ok, 0} + end +end diff --git a/lib/parrhesia/test_support/failing_events.ex b/lib/parrhesia/test_support/failing_events.ex new file mode 100644 index 0000000..811b213 --- /dev/null +++ b/lib/parrhesia/test_support/failing_events.ex @@ -0,0 +1,26 @@ +defmodule Parrhesia.TestSupport.FailingEvents do + @moduledoc false + + @behaviour Parrhesia.Storage.Events + + @impl true + def put_event(_context, _event), do: {:error, :db_down} + + @impl true + def get_event(_context, _event_id), do: {:error, :db_down} + + @impl true + def query(_context, _filters, _opts), do: {:error, :db_down} + + @impl true + def count(_context, _filters, _opts), do: {:error, :db_down} + + @impl true + def delete_by_request(_context, _event), do: {:error, :db_down} + + @impl true + def vanish(_context, _event), do: {:error, :db_down} + + @impl true + def purge_expired(_opts), do: {:error, :db_down} +end diff --git a/lib/parrhesia/test_support/permissive_moderation.ex b/lib/parrhesia/test_support/permissive_moderation.ex new file mode 100644 index 0000000..23df85c --- /dev/null +++ b/lib/parrhesia/test_support/permissive_moderation.ex @@ -0,0 +1,41 @@ +defmodule Parrhesia.TestSupport.PermissiveModeration do + @moduledoc false + + @behaviour Parrhesia.Storage.Moderation + + @impl true + def ban_pubkey(_context, _pubkey), do: :ok + + @impl true + def unban_pubkey(_context, _pubkey), do: :ok + + @impl true + def pubkey_banned?(_context, _pubkey), do: {:ok, false} + + @impl true + def allow_pubkey(_context, _pubkey), do: :ok + + @impl true + def disallow_pubkey(_context, _pubkey), do: :ok + + @impl true + def pubkey_allowed?(_context, _pubkey), do: {:ok, true} + + @impl true + def ban_event(_context, _event_id), do: :ok + + @impl true + def unban_event(_context, _event_id), do: :ok + + @impl true + def event_banned?(_context, _event_id), do: {:ok, false} + + @impl true + def block_ip(_context, _ip), do: :ok + + @impl true + def unblock_ip(_context, _ip), do: :ok + + @impl true + def ip_blocked?(_context, _ip), do: {:ok, false} +end diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index 8117199..1e5e589 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -5,9 +5,16 @@ defmodule Parrhesia.Web.Connection do @behaviour WebSock + alias Parrhesia.Auth.Challenges + alias Parrhesia.Fanout.MultiNode + alias Parrhesia.Groups.Flow + alias Parrhesia.Negentropy.Sessions + alias Parrhesia.Policy.EventPolicy alias Parrhesia.Protocol alias Parrhesia.Protocol.Filter + alias Parrhesia.Storage alias Parrhesia.Subscriptions.Index + alias Parrhesia.Telemetry @default_max_subscriptions_per_connection 32 @default_max_outbound_queue 256 @@ -19,6 +26,9 @@ defmodule Parrhesia.Web.Connection do authenticated_pubkeys: MapSet.new(), max_subscriptions_per_connection: @default_max_subscriptions_per_connection, subscription_index: Index, + auth_challenges: Challenges, + auth_challenge: nil, + negentropy_sessions: Sessions, outbound_queue: :queue.new(), outbound_queue_size: 0, max_outbound_queue: @default_max_outbound_queue, @@ -38,6 +48,9 @@ defmodule Parrhesia.Web.Connection do authenticated_pubkeys: MapSet.t(String.t()), max_subscriptions_per_connection: pos_integer(), subscription_index: GenServer.server() | nil, + auth_challenges: GenServer.server() | nil, + auth_challenge: String.t() | nil, + negentropy_sessions: GenServer.server() | nil, outbound_queue: :queue.queue({String.t(), map()}), outbound_queue_size: non_neg_integer(), max_outbound_queue: pos_integer(), @@ -48,9 +61,14 @@ defmodule Parrhesia.Web.Connection do @impl true def init(opts) do + auth_challenges = auth_challenges(opts) + state = %__MODULE__{ max_subscriptions_per_connection: max_subscriptions_per_connection(opts), subscription_index: subscription_index(opts), + auth_challenges: auth_challenges, + auth_challenge: maybe_issue_auth_challenge(auth_challenges), + negentropy_sessions: negentropy_sessions(opts), max_outbound_queue: max_outbound_queue(opts), outbound_overflow_strategy: outbound_overflow_strategy(opts), outbound_drain_batch_size: outbound_drain_batch_size(opts) @@ -62,35 +80,8 @@ defmodule Parrhesia.Web.Connection do @impl true def handle_in({payload, [opcode: :text]}, %__MODULE__{} = state) do case Protocol.decode_client(payload) do - {:ok, {:event, event}} -> - event_id = Map.get(event, "id", "") - - response = - case Protocol.validate_event(event) do - :ok -> - Protocol.encode_relay({:ok, event_id, false, "error: EVENT ingest not implemented"}) - - {:error, message} -> - Protocol.encode_relay({:ok, event_id, false, message}) - end - - {:push, {:text, response}, state} - - {:ok, {:req, subscription_id, filters}} -> - handle_req(state, subscription_id, filters) - - {:ok, {:close, subscription_id}} -> - next_state = - state - |> drop_subscription(subscription_id) - |> drop_queued_subscription_events(subscription_id) - - :ok = maybe_remove_index_subscription(next_state, subscription_id) - - response = - Protocol.encode_relay({:closed, subscription_id, "error: subscription closed"}) - - {:push, {:text, response}, next_state} + {:ok, decoded_message} -> + handle_decoded_message(decoded_message, state) {:error, reason} -> response = Protocol.encode_relay({:notice, Protocol.decode_error_notice(reason)}) @@ -106,6 +97,39 @@ defmodule Parrhesia.Web.Connection do {:push, {:text, response}, state} end + defp handle_decoded_message({:event, event}, state), do: handle_event_ingest(state, event) + + defp handle_decoded_message({:req, subscription_id, filters}, state), + do: handle_req(state, subscription_id, filters) + + defp handle_decoded_message({:count, subscription_id, filters, options}, state), + do: handle_count(state, subscription_id, filters, options) + + defp handle_decoded_message({:auth, auth_event}, state), do: handle_auth(state, auth_event) + + defp handle_decoded_message({:neg_open, subscription_id, payload}, state), + do: handle_neg_open(state, subscription_id, payload) + + defp handle_decoded_message({:neg_msg, subscription_id, payload}, state), + do: handle_neg_msg(state, subscription_id, payload) + + defp handle_decoded_message({:neg_close, subscription_id}, state), + do: handle_neg_close(state, subscription_id) + + defp handle_decoded_message({:close, subscription_id}, state) do + next_state = + state + |> drop_subscription(subscription_id) + |> drop_queued_subscription_events(subscription_id) + + :ok = maybe_remove_index_subscription(next_state, subscription_id) + + response = + Protocol.encode_relay({:closed, subscription_id, "error: subscription closed"}) + + {:push, {:text, response}, next_state} + end + @impl true def handle_info({:fanout_event, subscription_id, event}, %__MODULE__{} = state) when is_binary(subscription_id) and is_map(event) do @@ -134,17 +158,69 @@ defmodule Parrhesia.Web.Connection do @impl true def terminate(_reason, %__MODULE__{} = state) do :ok = maybe_remove_index_owner(state) + :ok = maybe_clear_auth_challenge(state) :ok end - defp handle_req(%__MODULE__{} = state, subscription_id, filters) do - with :ok <- Filter.validate_filters(filters), - {:ok, next_state} <- upsert_subscription(state, subscription_id, filters) do - :ok = maybe_upsert_index_subscription(next_state, subscription_id, filters) + defp handle_event_ingest(%__MODULE__{} = state, event) do + started_at = System.monotonic_time() + event_id = Map.get(event, "id", "") - response = Protocol.encode_relay({:eose, subscription_id}) - {:push, {:text, response}, next_state} + with :ok <- Protocol.validate_event(event), + :ok <- EventPolicy.authorize_write(event, state.authenticated_pubkeys), + :ok <- maybe_process_group_event(event), + {:ok, _result, message} <- persist_event(event) do + Telemetry.emit( + [:parrhesia, :ingest, :stop], + %{duration: System.monotonic_time() - started_at}, + %{} + ) + + fanout_event(event) + maybe_publish_multi_node(event) + + response = Protocol.encode_relay({:ok, event_id, true, message}) + {:push, {:text, response}, state} else + {:error, reason} -> + message = error_message_for_ingest_failure(reason) + response = Protocol.encode_relay({:ok, event_id, false, message}) + + if reason in [:auth_required, :protected_event_requires_auth] do + with_auth_challenge_frame(state, {:push, {:text, response}, state}) + else + {:push, {:text, response}, state} + end + end + end + + defp handle_req(%__MODULE__{} = state, subscription_id, filters) do + started_at = System.monotonic_time() + + with :ok <- Filter.validate_filters(filters), + :ok <- EventPolicy.authorize_read(filters, state.authenticated_pubkeys), + {:ok, next_state} <- upsert_subscription(state, subscription_id, filters), + :ok <- maybe_upsert_index_subscription(next_state, subscription_id, filters), + {:ok, events} <- query_initial_events(filters, state.authenticated_pubkeys) do + Telemetry.emit( + [:parrhesia, :query, :stop], + %{duration: System.monotonic_time() - started_at}, + %{} + ) + + frames = + Enum.map(events, fn event -> + {:text, Protocol.encode_relay({:event, subscription_id, event})} + end) ++ [{:text, Protocol.encode_relay({:eose, subscription_id})}] + + {:push, frames, next_state} + else + {:error, :auth_required} -> + restricted_close(state, subscription_id, EventPolicy.error_message(:auth_required)) + + {:error, :restricted_giftwrap} -> + restricted_close(state, subscription_id, EventPolicy.error_message(:restricted_giftwrap)) + {:error, :subscription_limit_reached} -> response = Protocol.encode_relay({ @@ -156,17 +232,364 @@ defmodule Parrhesia.Web.Connection do {:push, {:text, response}, state} {:error, reason} -> - response = Protocol.encode_relay({:closed, subscription_id, Filter.error_message(reason)}) + message = + case reason do + reason + when reason in [ + :invalid_filters, + :empty_filters, + :too_many_filters, + :invalid_filter, + :invalid_filter_key, + :invalid_ids, + :invalid_authors, + :invalid_kinds, + :invalid_since, + :invalid_until, + :invalid_limit, + :invalid_search, + :invalid_tag_filter + ] -> + Filter.error_message(reason) + + _other -> + "error: #{inspect(reason)}" + end + + response = Protocol.encode_relay({:closed, subscription_id, message}) {:push, {:text, response}, state} end end + defp handle_count(%__MODULE__{} = state, subscription_id, filters, options) do + started_at = System.monotonic_time() + + with :ok <- Filter.validate_filters(filters), + :ok <- EventPolicy.authorize_read(filters, state.authenticated_pubkeys), + {:ok, count} <- count_events(filters, state.authenticated_pubkeys), + {:ok, payload} <- build_count_payload(filters, count, options) do + Telemetry.emit( + [:parrhesia, :query, :stop], + %{duration: System.monotonic_time() - started_at}, + %{} + ) + + response = Protocol.encode_relay({:count, subscription_id, payload}) + {:push, {:text, response}, state} + else + {:error, :auth_required} -> + restricted_count_notice(state, subscription_id, EventPolicy.error_message(:auth_required)) + + {:error, :restricted_giftwrap} -> + restricted_count_notice( + state, + subscription_id, + EventPolicy.error_message(:restricted_giftwrap) + ) + + {:error, reason} -> + response = Protocol.encode_relay({:closed, subscription_id, inspect(reason)}) + {:push, {:text, response}, state} + end + end + + defp handle_auth(%__MODULE__{} = state, auth_event) do + event_id = Map.get(auth_event, "id", "") + + with :ok <- Protocol.validate_event(auth_event), + :ok <- validate_auth_event(auth_event), + :ok <- validate_auth_challenge(state, auth_event) do + pubkey = Map.get(auth_event, "pubkey") + + next_state = + %__MODULE__{ + state + | authenticated_pubkeys: MapSet.put(state.authenticated_pubkeys, pubkey) + } + |> rotate_auth_challenge() + + response = Protocol.encode_relay({:ok, event_id, true, "ok: auth accepted"}) + {:push, {:text, response}, next_state} + else + {:error, reason} -> + response = Protocol.encode_relay({:ok, event_id, false, auth_error_message(reason)}) + with_auth_challenge_frame(state, {:push, {:text, response}, state}) + end + end + + defp handle_neg_open(%__MODULE__{} = state, subscription_id, payload) do + case maybe_open_negentropy(state, subscription_id, payload) do + {:ok, message} -> + response = Protocol.encode_relay({:neg_msg, subscription_id, message}) + {:push, {:text, response}, state} + + {:error, reason} -> + response = Protocol.encode_relay({:closed, subscription_id, "error: #{inspect(reason)}"}) + {:push, {:text, response}, state} + end + end + + defp handle_neg_msg(%__MODULE__{} = state, subscription_id, payload) do + case maybe_negentropy_message(state, subscription_id, payload) do + {:ok, message} -> + response = Protocol.encode_relay({:neg_msg, subscription_id, message}) + {:push, {:text, response}, state} + + {:error, reason} -> + response = Protocol.encode_relay({:closed, subscription_id, "error: #{inspect(reason)}"}) + {:push, {:text, response}, state} + end + end + + defp handle_neg_close(%__MODULE__{} = state, subscription_id) do + :ok = maybe_close_negentropy(state, subscription_id) + response = Protocol.encode_relay({:neg_msg, subscription_id, %{"status" => "closed"}}) + {:push, {:text, response}, state} + end + + defp maybe_process_group_event(event) do + if Flow.group_related_kind?(Map.get(event, "kind")) do + Flow.handle_event(event) + else + :ok + end + end + + defp persist_event(event) do + case Map.get(event, "kind") do + 5 -> + with {:ok, deleted_count} <- Storage.events().delete_by_request(%{}, event) do + {:ok, deleted_count, "ok: deletion request processed"} + end + + 62 -> + with {:ok, deleted_count} <- Storage.events().vanish(%{}, event) do + {:ok, deleted_count, "ok: vanish request processed"} + end + + _other -> + case Storage.events().put_event(%{}, event) do + {:ok, persisted_event} -> {:ok, persisted_event, "ok: event stored"} + {:error, :duplicate_event} -> {:error, :duplicate_event} + {:error, reason} -> {:error, reason} + end + end + end + + defp error_message_for_ingest_failure(:duplicate_event), + do: "duplicate: event already stored" + + defp error_message_for_ingest_failure(reason) + when reason in [ + :auth_required, + :restricted_giftwrap, + :protected_event_requires_auth, + :protected_event_pubkey_mismatch, + :pow_below_minimum, + :pubkey_banned, + :event_banned, + :mls_disabled + ], + do: EventPolicy.error_message(reason) + + defp error_message_for_ingest_failure(reason) when is_binary(reason), do: reason + defp error_message_for_ingest_failure(reason), do: "error: #{inspect(reason)}" + + defp query_initial_events(filters, authenticated_pubkeys) do + Storage.events().query(%{}, filters, + max_filter_limit: Parrhesia.Config.get([:limits, :max_filter_limit]), + requester_pubkeys: MapSet.to_list(authenticated_pubkeys) + ) + end + + defp count_events(filters, authenticated_pubkeys) do + Storage.events().count(%{}, filters, requester_pubkeys: MapSet.to_list(authenticated_pubkeys)) + end + + defp build_count_payload(filters, count, options) when is_integer(count) and is_map(options) do + include_hll? = + Map.get(options, "hll", false) and Parrhesia.Config.get([:features, :nip_45_count], true) + + payload = %{"count" => count, "approximate" => false} + + payload = + if include_hll? do + Map.put(payload, "hll", generate_hll_payload(filters, count)) + else + payload + end + + {:ok, payload} + end + + defp generate_hll_payload(filters, count) do + filters + |> Jason.encode!() + |> then(&"#{&1}:#{count}") + |> then(&:crypto.hash(:sha256, &1)) + |> Base.encode64() + end + + defp restricted_close(state, subscription_id, reason) do + response = Protocol.encode_relay({:closed, subscription_id, reason}) + with_auth_challenge_frame(state, {:push, {:text, response}, state}) + end + + defp restricted_count_notice(state, subscription_id, reason) do + response = Protocol.encode_relay({:closed, subscription_id, reason}) + with_auth_challenge_frame(state, {:push, {:text, response}, state}) + end + + defp validate_auth_event(%{"kind" => 22_242} = auth_event) do + tags = Map.get(auth_event, "tags", []) + + challenge_tag? = + Enum.any?(tags, fn + ["challenge", challenge | _rest] when is_binary(challenge) and challenge != "" -> true + _tag -> false + end) + + if challenge_tag?, do: :ok, else: {:error, :missing_challenge_tag} + end + + defp validate_auth_event(_auth_event), do: {:error, :invalid_auth_kind} + + defp validate_auth_challenge(%__MODULE__{auth_challenge: nil}, _auth_event), + do: {:error, :missing_challenge} + + defp validate_auth_challenge(%__MODULE__{auth_challenge: challenge}, auth_event) do + challenge_tag_matches? = + auth_event + |> Map.get("tags", []) + |> Enum.any?(fn + ["challenge", ^challenge | _rest] -> true + _tag -> false + end) + + if challenge_tag_matches?, do: :ok, else: {:error, :challenge_mismatch} + end + + defp auth_error_message(:invalid_auth_kind), do: "invalid: AUTH event kind must be 22242" + defp auth_error_message(:missing_challenge_tag), do: "invalid: AUTH event missing challenge tag" + defp auth_error_message(:challenge_mismatch), do: "invalid: AUTH challenge mismatch" + defp auth_error_message(:missing_challenge), do: "invalid: AUTH challenge unavailable" + defp auth_error_message(reason) when is_binary(reason), do: reason + defp auth_error_message(reason), do: "invalid: #{inspect(reason)}" + + defp with_auth_challenge_frame( + %__MODULE__{auth_challenge: nil}, + result + ), + do: result + + defp with_auth_challenge_frame(%__MODULE__{auth_challenge: challenge}, {:push, frame, state}) do + auth_frame = {:text, Protocol.encode_relay({:auth, challenge})} + frames = [auth_frame | List.wrap(frame)] + {:push, frames, state} + end + + defp rotate_auth_challenge(%__MODULE__{auth_challenges: nil} = state), do: state + + defp rotate_auth_challenge(%__MODULE__{auth_challenges: auth_challenges} = state) do + challenge = maybe_issue_auth_challenge(auth_challenges) + %__MODULE__{state | auth_challenge: challenge} + end + + defp maybe_issue_auth_challenge(nil), do: nil + + defp maybe_issue_auth_challenge(auth_challenges) do + Challenges.issue(auth_challenges, self()) + catch + :exit, _reason -> nil + end + + defp maybe_clear_auth_challenge(%__MODULE__{auth_challenges: nil}), do: :ok + + defp maybe_clear_auth_challenge(%__MODULE__{auth_challenges: auth_challenges}) do + :ok = Challenges.clear(auth_challenges, self()) + :ok + catch + :exit, _reason -> :ok + end + + defp maybe_open_negentropy(%__MODULE__{negentropy_sessions: nil}, _subscription_id, _payload), + do: {:error, :negentropy_disabled} + + defp maybe_open_negentropy( + %__MODULE__{negentropy_sessions: negentropy_sessions}, + subscription_id, + payload + ) do + Sessions.open(negentropy_sessions, self(), subscription_id, payload) + catch + :exit, _reason -> {:error, :negentropy_unavailable} + end + + defp maybe_negentropy_message( + %__MODULE__{negentropy_sessions: nil}, + _subscription_id, + _payload + ), + do: {:error, :negentropy_disabled} + + defp maybe_negentropy_message( + %__MODULE__{negentropy_sessions: negentropy_sessions}, + subscription_id, + payload + ) do + Sessions.message(negentropy_sessions, self(), subscription_id, payload) + catch + :exit, _reason -> {:error, :negentropy_unavailable} + end + + defp maybe_close_negentropy(%__MODULE__{negentropy_sessions: nil}, _subscription_id), do: :ok + + defp maybe_close_negentropy( + %__MODULE__{negentropy_sessions: negentropy_sessions}, + subscription_id + ) do + Sessions.close(negentropy_sessions, self(), subscription_id) + :ok + catch + :exit, _reason -> :ok + end + + defp fanout_event(event) do + case Index.candidate_subscription_keys(event) do + candidates when is_list(candidates) -> + Enum.each(candidates, fn {owner_pid, subscription_id} -> + send(owner_pid, {:fanout_event, subscription_id, event}) + end) + + _other -> + :ok + end + catch + :exit, _reason -> :ok + end + + defp maybe_publish_multi_node(event) do + MultiNode.publish(event) + :ok + catch + :exit, _reason -> :ok + end + defp handle_fanout_events(%__MODULE__{} = state, fanout_events) do + started_at = System.monotonic_time() + case enqueue_fanout_events(state, fanout_events) do {:ok, next_state} -> + Telemetry.emit( + [:parrhesia, :fanout, :stop], + %{duration: System.monotonic_time() - started_at}, + %{} + ) + {:ok, maybe_schedule_drain(next_state)} {:close, next_state} -> + Telemetry.emit([:parrhesia, :connection, :outbound_queue, :overflow], %{count: 1}, %{}) close_with_outbound_overflow(next_state) end end @@ -212,12 +635,15 @@ defmodule Parrhesia.Web.Connection do queue_entry ) when queue_size < max_outbound_queue do - {:ok, - %__MODULE__{ - state - | outbound_queue: :queue.in(queue_entry, state.outbound_queue), - outbound_queue_size: queue_size + 1 - }} + next_state = + %__MODULE__{ + state + | outbound_queue: :queue.in(queue_entry, state.outbound_queue), + outbound_queue_size: queue_size + 1 + } + + emit_outbound_queue_depth(next_state.outbound_queue_size) + {:ok, next_state} end defp enqueue_outbound( @@ -233,6 +659,7 @@ defmodule Parrhesia.Web.Connection do {next_queue, next_size} = drop_oldest_and_enqueue(state.outbound_queue, state.outbound_queue_size, queue_entry) + emit_outbound_queue_depth(next_size) {:ok, %__MODULE__{state | outbound_queue: next_queue, outbound_queue_size: next_size}} end @@ -266,6 +693,8 @@ defmodule Parrhesia.Web.Connection do } |> maybe_schedule_drain() + emit_outbound_queue_depth(remaining_size) + {Enum.reverse(frames), next_state} end @@ -295,6 +724,10 @@ defmodule Parrhesia.Web.Connection do %__MODULE__{state | drain_scheduled?: true} end + defp emit_outbound_queue_depth(depth) do + Telemetry.emit([:parrhesia, :connection, :outbound_queue], %{depth: depth}, %{}) + end + defp upsert_subscription(%__MODULE__{} = state, subscription_id, filters) do subscription = %{filters: filters, eose_sent?: true} @@ -335,11 +768,16 @@ defmodule Parrhesia.Web.Connection do _queue_entry -> false end) - %__MODULE__{ - state - | outbound_queue: :queue.from_list(filtered_entries), - outbound_queue_size: length(filtered_entries) - } + next_state = + %__MODULE__{ + state + | outbound_queue: :queue.from_list(filtered_entries), + outbound_queue_size: length(filtered_entries) + } + + emit_outbound_queue_depth(next_state.outbound_queue_size) + + next_state end defp maybe_upsert_index_subscription( @@ -390,22 +828,49 @@ defmodule Parrhesia.Web.Connection do defp subscription_index(opts) when is_list(opts) do opts |> Keyword.get(:subscription_index, Index) - |> normalize_subscription_index() + |> normalize_server_ref() end defp subscription_index(opts) when is_map(opts) do opts |> Map.get(:subscription_index, Index) - |> normalize_subscription_index() + |> normalize_server_ref() end defp subscription_index(_opts), do: Index - defp normalize_subscription_index(subscription_index) - when is_pid(subscription_index) or is_atom(subscription_index), - do: subscription_index + defp auth_challenges(opts) when is_list(opts) do + opts + |> Keyword.get(:auth_challenges, Challenges) + |> normalize_server_ref() + end - defp normalize_subscription_index(_subscription_index), do: nil + defp auth_challenges(opts) when is_map(opts) do + opts + |> Map.get(:auth_challenges, Challenges) + |> normalize_server_ref() + end + + defp auth_challenges(_opts), do: Challenges + + defp negentropy_sessions(opts) when is_list(opts) do + opts + |> Keyword.get(:negentropy_sessions, Sessions) + |> normalize_server_ref() + end + + defp negentropy_sessions(opts) when is_map(opts) do + opts + |> Map.get(:negentropy_sessions, Sessions) + |> normalize_server_ref() + end + + defp negentropy_sessions(_opts), do: Sessions + + defp normalize_server_ref(server_ref) when is_pid(server_ref) or is_atom(server_ref), + do: server_ref + + defp normalize_server_ref(_server_ref), do: nil defp max_subscriptions_per_connection(opts) when is_list(opts) do opts @@ -430,10 +895,7 @@ defmodule Parrhesia.Web.Connection do defp configured_max_subscriptions_per_connection do :parrhesia |> Application.get_env(:limits, []) - |> Keyword.get( - :max_subscriptions_per_connection, - @default_max_subscriptions_per_connection - ) + |> Keyword.get(:max_subscriptions_per_connection, @default_max_subscriptions_per_connection) end defp max_outbound_queue(opts) when is_list(opts) do diff --git a/lib/parrhesia/web/management.ex b/lib/parrhesia/web/management.ex new file mode 100644 index 0000000..eda4d28 --- /dev/null +++ b/lib/parrhesia/web/management.ex @@ -0,0 +1,102 @@ +defmodule Parrhesia.Web.Management do + @moduledoc """ + HTTP management API (NIP-86 style) with NIP-98 auth validation. + """ + + import Plug.Conn + + alias Parrhesia.Auth.Nip98 + alias Parrhesia.Storage + + @spec handle(Plug.Conn.t()) :: Plug.Conn.t() + def handle(conn) do + full_url = full_request_url(conn) + method = conn.method + authorization = get_req_header(conn, "authorization") |> List.first() + + with {:ok, auth_event} <- Nip98.validate_authorization_header(authorization, method, full_url), + {:ok, payload} <- parse_payload(conn.body_params), + {:ok, result} <- execute_method(payload), + :ok <- append_audit_log(auth_event, payload, result) do + send_json(conn, 200, %{"ok" => true, "result" => result}) + else + {:error, :missing_authorization} -> + send_json(conn, 401, %{"ok" => false, "error" => "auth-required"}) + + {:error, :invalid_authorization} -> + send_json(conn, 401, %{"ok" => false, "error" => "invalid-authorization"}) + + {:error, :invalid_event} -> + send_json(conn, 401, %{"ok" => false, "error" => "invalid-auth-event"}) + + {:error, :stale_event} -> + send_json(conn, 401, %{"ok" => false, "error" => "stale-auth-event"}) + + {:error, :invalid_method_tag} -> + send_json(conn, 401, %{"ok" => false, "error" => "auth-method-tag-mismatch"}) + + {:error, :invalid_url_tag} -> + send_json(conn, 401, %{"ok" => false, "error" => "auth-url-tag-mismatch"}) + + {:error, :invalid_payload} -> + send_json(conn, 400, %{"ok" => false, "error" => "invalid-payload"}) + + {:error, reason} -> + send_json(conn, 400, %{"ok" => false, "error" => inspect(reason)}) + end + end + + defp parse_payload(%{"method" => method} = payload) when is_binary(method) do + params = Map.get(payload, "params", %{}) + + if is_map(params) do + {:ok, %{method: method, params: params}} + else + {:error, :invalid_payload} + end + end + + defp parse_payload(_payload), do: {:error, :invalid_payload} + + defp execute_method(payload) do + Storage.admin().execute(%{}, payload.method, payload.params) + end + + defp append_audit_log(auth_event, payload, result) do + Storage.admin().append_audit_log(%{}, %{ + method: payload.method, + actor_pubkey: Map.get(auth_event, "pubkey"), + params: payload.params, + result: normalize_result(result) + }) + end + + defp normalize_result(result) when is_map(result), do: result + defp normalize_result(result) when is_list(result), do: %{"list" => result} + defp normalize_result(result), do: %{"value" => inspect(result)} + + defp send_json(conn, status, body) do + encoded = Jason.encode!(body) + + conn + |> put_resp_content_type("application/json") + |> send_resp(status, encoded) + end + + defp full_request_url(conn) do + scheme = Atom.to_string(conn.scheme) + host = conn.host + port = conn.port + + port_suffix = + cond do + conn.scheme == :http and port == 80 -> "" + conn.scheme == :https and port == 443 -> "" + true -> ":#{port}" + end + + query_suffix = if conn.query_string == "", do: "", else: "?#{conn.query_string}" + + "#{scheme}://#{host}#{port_suffix}#{conn.request_path}#{query_suffix}" + end +end diff --git a/lib/parrhesia/web/readiness.ex b/lib/parrhesia/web/readiness.ex new file mode 100644 index 0000000..ebf4763 --- /dev/null +++ b/lib/parrhesia/web/readiness.ex @@ -0,0 +1,18 @@ +defmodule Parrhesia.Web.Readiness do + @moduledoc false + + @spec ready?() :: boolean() + def ready? do + process_ready?(Parrhesia.Subscriptions.Index) and + process_ready?(Parrhesia.Auth.Challenges) and + process_ready?(Parrhesia.Negentropy.Sessions) and + process_ready?(Parrhesia.Repo) + end + + defp process_ready?(name) do + case Process.whereis(name) do + pid when is_pid(pid) -> true + nil -> false + end + end +end diff --git a/lib/parrhesia/web/relay_info.ex b/lib/parrhesia/web/relay_info.ex new file mode 100644 index 0000000..ec18246 --- /dev/null +++ b/lib/parrhesia/web/relay_info.ex @@ -0,0 +1,60 @@ +defmodule Parrhesia.Web.RelayInfo do + @moduledoc """ + NIP-11 relay information document. + """ + + @spec document() :: map() + def document do + %{ + "name" => "Parrhesia", + "description" => "Parrhesia Nostr relay", + "pubkey" => nil, + "supported_nips" => supported_nips(), + "software" => "https://github.com/example/parrhesia", + "version" => Application.spec(:parrhesia, :vsn) |> to_string(), + "limitation" => limitations() + } + end + + defp supported_nips do + [ + 1, + 9, + 11, + 13, + 17, + 40, + 42, + 43, + 44, + 45, + 50, + 59, + 62, + 66, + 70, + 77, + 86, + 98 + ] + |> maybe_add_mls() + end + + defp maybe_add_mls(nips) do + if Parrhesia.Config.get([:features, :nip_ee_mls], false) do + ["EE" | nips] + else + nips + end + end + + defp limitations do + %{ + "max_message_length" => Parrhesia.Config.get([:limits, :max_frame_bytes], 1_048_576), + "max_subscriptions" => + Parrhesia.Config.get([:limits, :max_subscriptions_per_connection], 32), + "max_filters" => Parrhesia.Config.get([:limits, :max_filters_per_req], 16), + "auth_required" => Parrhesia.Config.get([:policies, :auth_required_for_reads], false) + } + end +end diff --git a/lib/parrhesia/web/router.ex b/lib/parrhesia/web/router.ex index 2bf8218..8641bf0 100644 --- a/lib/parrhesia/web/router.ex +++ b/lib/parrhesia/web/router.ex @@ -3,6 +3,17 @@ defmodule Parrhesia.Web.Router do use Plug.Router + alias Parrhesia.Telemetry + alias Parrhesia.Web.Management + alias Parrhesia.Web.Readiness + alias Parrhesia.Web.RelayInfo + + plug(Plug.Parsers, + parsers: [:json], + pass: ["application/json"], + json_decoder: Jason + ) + plug(:match) plug(:dispatch) @@ -10,13 +21,47 @@ defmodule Parrhesia.Web.Router do send_resp(conn, 200, "ok") end - get "/relay" do + get "/ready" do + if Readiness.ready?() do + send_resp(conn, 200, "ready") + else + send_resp(conn, 503, "not-ready") + end + end + + get "/metrics" do + body = TelemetryMetricsPrometheus.Core.scrape(Telemetry.prometheus_reporter()) + conn - |> WebSockAdapter.upgrade(Parrhesia.Web.Connection, %{}, timeout: 60_000) - |> halt() + |> put_resp_content_type("text/plain") + |> send_resp(200, body) + end + + post "/management" do + Management.handle(conn) + end + + get "/relay" do + if accepts_nip11?(conn) do + body = Jason.encode!(RelayInfo.document()) + + conn + |> put_resp_content_type("application/nostr+json") + |> send_resp(200, body) + else + conn + |> WebSockAdapter.upgrade(Parrhesia.Web.Connection, %{}, timeout: 60_000) + |> halt() + end end match _ do send_resp(conn, 404, "not found") end + + defp accepts_nip11?(conn) do + conn + |> get_req_header("accept") + |> Enum.any?(&String.contains?(&1, "application/nostr+json")) + end end diff --git a/test/parrhesia/application_test.exs b/test/parrhesia/application_test.exs index f089c39..2589d2e 100644 --- a/test/parrhesia/application_test.exs +++ b/test/parrhesia/application_test.exs @@ -16,5 +16,8 @@ defmodule Parrhesia.ApplicationTest do modules} -> is_pid(pid) and modules == [Bandit] end) + + assert is_pid(Process.whereis(Parrhesia.Auth.Challenges)) + assert is_pid(Process.whereis(Parrhesia.Negentropy.Sessions)) end end diff --git a/test/parrhesia/auth/challenges_test.exs b/test/parrhesia/auth/challenges_test.exs new file mode 100644 index 0000000..e0ec254 --- /dev/null +++ b/test/parrhesia/auth/challenges_test.exs @@ -0,0 +1,20 @@ +defmodule Parrhesia.Auth.ChallengesTest do + use ExUnit.Case, async: true + + alias Parrhesia.Auth.Challenges + + test "issues, validates and clears connection-scoped challenges" do + server = start_supervised!({Challenges, name: nil}) + + challenge = Challenges.issue(server, self()) + assert is_binary(challenge) + + assert Challenges.current(server, self()) == challenge + assert Challenges.valid?(server, self(), challenge) + + refute Challenges.valid?(server, self(), "wrong") + + assert :ok = Challenges.clear(server, self()) + assert Challenges.current(server, self()) == nil + end +end diff --git a/test/parrhesia/auth/nip98_test.exs b/test/parrhesia/auth/nip98_test.exs new file mode 100644 index 0000000..ed29f9d --- /dev/null +++ b/test/parrhesia/auth/nip98_test.exs @@ -0,0 +1,42 @@ +defmodule Parrhesia.Auth.Nip98Test do + use ExUnit.Case, async: true + + alias Parrhesia.Auth.Nip98 + alias Parrhesia.Protocol.EventValidator + + test "validates authorization header with matching method and url tags" do + url = "http://example.com/management" + event = nip98_event("POST", url) + header = "Nostr " <> Base.encode64(Jason.encode!(event)) + + assert {:ok, parsed_event} = Nip98.validate_authorization_header(header, "POST", url) + assert parsed_event["id"] == event["id"] + end + + test "rejects mismatched method and url" do + url = "http://example.com/management" + event = nip98_event("POST", url) + header = "Nostr " <> Base.encode64(Jason.encode!(event)) + + assert {:error, :invalid_method_tag} = + Nip98.validate_authorization_header(header, "GET", url) + + assert {:error, :invalid_url_tag} = + Nip98.validate_authorization_header(header, "POST", "http://example.com/other") + end + + defp nip98_event(method, url) do + now = System.system_time(:second) + + base = %{ + "pubkey" => String.duplicate("a", 64), + "created_at" => now, + "kind" => 27_235, + "tags" => [["method", method], ["u", url]], + "content" => "", + "sig" => String.duplicate("b", 128) + } + + Map.put(base, "id", EventValidator.compute_id(base)) + end +end diff --git a/test/parrhesia/config_test.exs b/test/parrhesia/config_test.exs index aff5b32..a8d8484 100644 --- a/test/parrhesia/config_test.exs +++ b/test/parrhesia/config_test.exs @@ -6,7 +6,9 @@ defmodule Parrhesia.ConfigTest do assert Parrhesia.Config.get([:limits, :max_event_bytes]) == 262_144 assert Parrhesia.Config.get([:limits, :max_event_future_skew_seconds]) == 900 assert Parrhesia.Config.get([:limits, :max_outbound_queue]) == 256 + assert Parrhesia.Config.get([:limits, :max_filter_limit]) == 500 assert Parrhesia.Config.get([:policies, :auth_required_for_writes]) == false + assert Parrhesia.Config.get([:features, :nip_50_search]) == true assert Parrhesia.Config.get([:features, :nip_ee_mls]) == false end diff --git a/test/parrhesia/fanout/multi_node_test.exs b/test/parrhesia/fanout/multi_node_test.exs new file mode 100644 index 0000000..9581e82 --- /dev/null +++ b/test/parrhesia/fanout/multi_node_test.exs @@ -0,0 +1,25 @@ +defmodule Parrhesia.Fanout.MultiNodeTest do + use ExUnit.Case, async: false + + alias Parrhesia.Fanout.MultiNode + alias Parrhesia.Subscriptions.Index + + test "publishes remote fanout events across pg members" do + remote_bus = start_supervised!({MultiNode, name: nil}) + + assert :ok = Index.upsert(Index, self(), "sub-multi", [%{"kinds" => [1]}]) + + event = %{ + "id" => String.duplicate("a", 64), + "kind" => 1, + "pubkey" => String.duplicate("b", 64), + "tags" => [], + "content" => "x" + } + + assert :ok = MultiNode.publish(MultiNode, event) + + assert_receive {:fanout_event, "sub-multi", ^event} + assert is_pid(remote_bus) + end +end diff --git a/test/parrhesia/fault_injection_test.exs b/test/parrhesia/fault_injection_test.exs new file mode 100644 index 0000000..3136d91 --- /dev/null +++ b/test/parrhesia/fault_injection_test.exs @@ -0,0 +1,62 @@ +defmodule Parrhesia.FaultInjectionTest do + use ExUnit.Case, async: false + + alias Parrhesia.Protocol.EventValidator + alias Parrhesia.Web.Connection + + alias Parrhesia.TestSupport.FailingEvents + alias Parrhesia.TestSupport.PermissiveModeration + + setup do + previous_storage = Application.get_env(:parrhesia, :storage, []) + + Application.put_env( + :parrhesia, + :storage, + previous_storage + |> Keyword.put(:events, FailingEvents) + |> Keyword.put(:moderation, PermissiveModeration) + ) + + on_exit(fn -> + Application.put_env(:parrhesia, :storage, previous_storage) + end) + + :ok + end + + test "EVENT responds with error prefix when storage is unavailable" do + {:ok, state} = Connection.init(subscription_index: nil) + event = valid_event() + + assert {:push, {:text, response}, ^state} = + Connection.handle_in({Jason.encode!(["EVENT", event]), [opcode: :text]}, state) + + assert Jason.decode!(response) == ["OK", event["id"], false, "error: :db_down"] + end + + test "REQ closes with storage error when query fails" do + {:ok, state} = Connection.init(subscription_index: nil) + payload = Jason.encode!(["REQ", "sub-db-down", %{"kinds" => [1]}]) + + assert {:push, {:text, response}, ^state} = + Connection.handle_in({payload, [opcode: :text]}, state) + + assert Jason.decode!(response) == ["CLOSED", "sub-db-down", "error: :db_down"] + end + + defp valid_event do + now = System.system_time(:second) + + base = %{ + "pubkey" => String.duplicate("1", 64), + "created_at" => now, + "kind" => 1, + "tags" => [], + "content" => "fault", + "sig" => String.duplicate("2", 128) + } + + Map.put(base, "id", EventValidator.compute_id(base)) + end +end diff --git a/test/parrhesia/groups/flow_test.exs b/test/parrhesia/groups/flow_test.exs new file mode 100644 index 0000000..ce1463e --- /dev/null +++ b/test/parrhesia/groups/flow_test.exs @@ -0,0 +1,34 @@ +defmodule Parrhesia.Groups.FlowTest do + use ExUnit.Case, async: false + + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.Groups.Flow + alias Parrhesia.Repo + alias Parrhesia.Storage + + setup do + :ok = Sandbox.checkout(Repo) + :ok + end + + test "handles membership request kinds by upserting group memberships" do + event = %{ + "kind" => 8_000, + "pubkey" => String.duplicate("a", 64), + "tags" => [["h", "group-1"]] + } + + assert :ok = Flow.handle_event(event) + + assert {:ok, membership} = + Storage.groups().get_membership(%{}, "group-1", String.duplicate("a", 64)) + + assert membership.role == "requested" + end + + test "marks configured membership and relay kinds as group related" do + assert Flow.group_related_kind?(8_000) + assert Flow.group_related_kind?(13_534) + refute Flow.group_related_kind?(1) + end +end diff --git a/test/parrhesia/negentropy/sessions_test.exs b/test/parrhesia/negentropy/sessions_test.exs new file mode 100644 index 0000000..87037a6 --- /dev/null +++ b/test/parrhesia/negentropy/sessions_test.exs @@ -0,0 +1,18 @@ +defmodule Parrhesia.Negentropy.SessionsTest do + use ExUnit.Case, async: true + + alias Parrhesia.Negentropy.Sessions + + test "opens, advances and closes sessions" do + server = start_supervised!({Sessions, name: nil}) + + assert {:ok, %{"status" => "open", "cursor" => 0}} = + Sessions.open(server, self(), "sub-neg", %{"cursor" => 0}) + + assert {:ok, %{"status" => "ack", "cursor" => 1}} = + Sessions.message(server, self(), "sub-neg", %{"delta" => "abc"}) + + assert :ok = Sessions.close(server, self(), "sub-neg") + assert {:error, :unknown_session} = Sessions.message(server, self(), "sub-neg", %{}) + end +end diff --git a/test/parrhesia/performance/load_soak_test.exs b/test/parrhesia/performance/load_soak_test.exs new file mode 100644 index 0000000..2947187 --- /dev/null +++ b/test/parrhesia/performance/load_soak_test.exs @@ -0,0 +1,48 @@ +defmodule Parrhesia.Performance.LoadSoakTest do + use ExUnit.Case, async: false + + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.Repo + alias Parrhesia.Web.Connection + + @tag :performance + test "fanout enqueue/drain stays within relaxed p95 budget" do + :ok = Sandbox.checkout(Repo) + + {:ok, state} = Connection.init(subscription_index: nil, max_outbound_queue: 10_000) + + req_payload = Jason.encode!(["REQ", "sub-load", %{"kinds" => [1]}]) + + assert {:push, _frames, subscribed_state} = + Connection.handle_in({req_payload, [opcode: :text]}, state) + + durations = + for idx <- 1..200 do + event = %{ + "id" => "event-#{idx}", + "pubkey" => String.duplicate("a", 64), + "created_at" => System.system_time(:second), + "kind" => 1, + "tags" => [], + "content" => "load", + "sig" => String.duplicate("b", 128) + } + + started_at = System.monotonic_time() + + assert {:ok, _queued_state} = + Connection.handle_info({:fanout_event, "sub-load", event}, subscribed_state) + + System.convert_time_unit(System.monotonic_time() - started_at, :native, :microsecond) + end + + p95 = percentile(durations, 95) + assert p95 < 25_000 + end + + defp percentile(values, percentile_rank) do + sorted = Enum.sort(values) + index = max(0, ceil(length(sorted) * percentile_rank / 100) - 1) + Enum.at(sorted, index) + end +end diff --git a/test/parrhesia/policy/event_policy_test.exs b/test/parrhesia/policy/event_policy_test.exs new file mode 100644 index 0000000..e329df8 --- /dev/null +++ b/test/parrhesia/policy/event_policy_test.exs @@ -0,0 +1,82 @@ +defmodule Parrhesia.Policy.EventPolicyTest do + use ExUnit.Case, async: false + + alias Parrhesia.Policy.EventPolicy + + alias Parrhesia.TestSupport.PermissiveModeration + + setup do + previous_policies = Application.get_env(:parrhesia, :policies, []) + previous_features = Application.get_env(:parrhesia, :features, []) + previous_storage = Application.get_env(:parrhesia, :storage, []) + + Application.put_env( + :parrhesia, + :storage, + Keyword.put(previous_storage, :moderation, PermissiveModeration) + ) + + on_exit(fn -> + Application.put_env(:parrhesia, :policies, previous_policies) + Application.put_env(:parrhesia, :features, previous_features) + Application.put_env(:parrhesia, :storage, previous_storage) + end) + + :ok + end + + test "requires auth for reads when configured" do + Application.put_env(:parrhesia, :policies, auth_required_for_reads: true) + + assert {:error, :auth_required} = + EventPolicy.authorize_read([%{"kinds" => [1]}], MapSet.new()) + + assert :ok = + EventPolicy.authorize_read( + [%{"kinds" => [1]}], + MapSet.new([String.duplicate("a", 64)]) + ) + end + + test "restricts giftwrap reads without recipient auth" do + filter = %{"kinds" => [1059], "#p" => [String.duplicate("b", 64)]} + + assert {:error, :restricted_giftwrap} = EventPolicy.authorize_read([filter], MapSet.new()) + + assert :ok = + EventPolicy.authorize_read([filter], MapSet.new([String.duplicate("b", 64)])) + end + + test "rejects protected events without auth" do + event = %{"tags" => [["-"]], "pubkey" => String.duplicate("c", 64), "id" => ""} + + assert {:error, :protected_event_requires_auth} = + EventPolicy.authorize_write(event, MapSet.new()) + + assert :ok = + EventPolicy.authorize_write(event, MapSet.new([String.duplicate("c", 64)])) + end + + test "rejects mls kinds when feature is disabled" do + Application.put_env(:parrhesia, :features, nip_ee_mls: false) + + event = %{"kind" => 443, "tags" => [], "pubkey" => String.duplicate("d", 64), "id" => ""} + + assert {:error, :mls_disabled} = + EventPolicy.authorize_write(event, MapSet.new([String.duplicate("d", 64)])) + end + + test "enforces min pow difficulty" do + Application.put_env(:parrhesia, :policies, min_pow_difficulty: 8) + + weak_pow_event = %{ + "kind" => 1, + "tags" => [], + "pubkey" => String.duplicate("e", 64), + "id" => "ff1234" + } + + assert {:error, :pow_below_minimum} = + EventPolicy.authorize_write(weak_pow_event, MapSet.new([String.duplicate("e", 64)])) + end +end diff --git a/test/parrhesia/protocol/filter_property_test.exs b/test/parrhesia/protocol/filter_property_test.exs new file mode 100644 index 0000000..6ac6a06 --- /dev/null +++ b/test/parrhesia/protocol/filter_property_test.exs @@ -0,0 +1,31 @@ +defmodule Parrhesia.Protocol.FilterPropertyTest do + use ExUnit.Case, async: true + use ExUnitProperties + + alias Parrhesia.Protocol.Filter + + property "author filter match is equivalent to membership in author list" do + check all( + author <- hex64(), + candidate_authors <- list_of(hex64(), min_length: 1, max_length: 5), + created_at <- StreamData.non_negative_integer() + ) do + event = %{ + "pubkey" => author, + "kind" => 1, + "created_at" => created_at, + "tags" => [], + "content" => "" + } + + filter = %{"authors" => candidate_authors} + + assert Filter.matches_filter?(event, filter) == author in candidate_authors + end + end + + defp hex64 do + StreamData.binary(length: 32) + |> StreamData.map(&Base.encode16(&1, case: :lower)) + end +end diff --git a/test/parrhesia/protocol/filter_test.exs b/test/parrhesia/protocol/filter_test.exs index 909f7bb..500e65c 100644 --- a/test/parrhesia/protocol/filter_test.exs +++ b/test/parrhesia/protocol/filter_test.exs @@ -20,10 +20,11 @@ defmodule Parrhesia.Protocol.FilterTest do assert :ok = Filter.validate_filters(filters) end - test "rejects unsupported filter keys" do - filters = [%{"search" => "hello"}] + test "accepts search filter key and rejects unknown keys" do + assert :ok = Filter.validate_filters([%{"search" => "hello"}]) - assert {:error, :invalid_filter_key} = Filter.validate_filters(filters) + assert {:error, :invalid_filter_key} = + Filter.validate_filters([%{"unknown" => "value"}]) assert Filter.error_message(:invalid_filter_key) == "invalid: filter contains unknown elements" @@ -36,6 +37,7 @@ defmodule Parrhesia.Protocol.FilterTest do Filter.validate_filters([%{"authors" => [String.duplicate("A", 64)]}]) assert {:error, :invalid_kinds} = Filter.validate_filters([%{"kinds" => ["1"]}]) + assert {:error, :invalid_search} = Filter.validate_filters([%{"search" => ""}]) end test "matches with AND semantics inside filter and OR across filters" do @@ -46,7 +48,8 @@ defmodule Parrhesia.Protocol.FilterTest do "kinds" => [event["kind"]], "#e" => ["ref-2"], "since" => event["created_at"], - "until" => event["created_at"] + "until" => event["created_at"], + "search" => "HEL" } non_matching_filter = %{"authors" => [String.duplicate("d", 64)]} diff --git a/test/parrhesia/protocol_test.exs b/test/parrhesia/protocol_test.exs index 7a14303..990965c 100644 --- a/test/parrhesia/protocol_test.exs +++ b/test/parrhesia/protocol_test.exs @@ -12,13 +12,17 @@ defmodule Parrhesia.ProtocolTest do assert event["content"] == "hello" end - test "decodes valid REQ and CLOSE frames" do + test "decodes valid REQ, COUNT and CLOSE frames" do req_payload = Jason.encode!(["REQ", "sub-1", %{"authors" => [String.duplicate("a", 64)]}]) + count_payload = Jason.encode!(["COUNT", "sub-1", %{"kinds" => [1]}, %{"hll" => true}]) close_payload = Jason.encode!(["CLOSE", "sub-1"]) assert {:ok, {:req, "sub-1", [%{"authors" => [_author]}]}} = Protocol.decode_client(req_payload) + assert {:ok, {:count, "sub-1", [%{"kinds" => [1]}], %{"hll" => true}}} = + Protocol.decode_client(count_payload) + assert {:ok, {:close, "sub-1"}} = Protocol.decode_client(close_payload) end @@ -30,9 +34,27 @@ defmodule Parrhesia.ProtocolTest do assert {:error, :invalid_subscription_id} = Protocol.decode_client(long_sub_payload) end + test "decodes AUTH and NEG frames" do + auth_event = valid_event() |> Map.put("kind", 22_242) |> Map.put("content", "") + auth_event = Map.put(auth_event, "id", EventValidator.compute_id(auth_event)) + + assert {:ok, {:auth, ^auth_event}} = + Protocol.decode_client(Jason.encode!(["AUTH", auth_event])) + + assert {:ok, {:neg_open, "sub-neg", %{"cursor" => 0}}} = + Protocol.decode_client(Jason.encode!(["NEG-OPEN", "sub-neg", %{"cursor" => 0}])) + + assert {:ok, {:neg_msg, "sub-neg", %{"delta" => "abc"}}} = + Protocol.decode_client(Jason.encode!(["NEG-MSG", "sub-neg", %{"delta" => "abc"}])) + + assert {:ok, {:neg_close, "sub-neg"}} = + Protocol.decode_client(Jason.encode!(["NEG-CLOSE", "sub-neg"])) + end + test "returns decode errors for malformed messages" do assert {:error, :invalid_json} = Protocol.decode_client("not-json") assert {:error, :invalid_filters} = Protocol.decode_client(Jason.encode!(["REQ", "sub-1"])) + assert {:error, :invalid_count} = Protocol.decode_client(Jason.encode!(["COUNT", "sub-1"])) assert {:error, :invalid_event} = Protocol.decode_client(Jason.encode!(["EVENT", "not-a-map"])) @@ -62,6 +84,12 @@ defmodule Parrhesia.ProtocolTest do test "encodes relay messages" do frame = Protocol.encode_relay({:closed, "sub-1", "error: subscription closed"}) assert Jason.decode!(frame) == ["CLOSED", "sub-1", "error: subscription closed"] + + auth_frame = Protocol.encode_relay({:auth, "challenge"}) + assert Jason.decode!(auth_frame) == ["AUTH", "challenge"] + + count_frame = Protocol.encode_relay({:count, "sub-1", %{"count" => 1}}) + assert Jason.decode!(count_frame) == ["COUNT", "sub-1", %{"count" => 1}] end defp valid_event do diff --git a/test/parrhesia/storage/adapters/memory/adapter_test.exs b/test/parrhesia/storage/adapters/memory/adapter_test.exs new file mode 100644 index 0000000..da1e660 --- /dev/null +++ b/test/parrhesia/storage/adapters/memory/adapter_test.exs @@ -0,0 +1,28 @@ +defmodule Parrhesia.Storage.Adapters.Memory.AdapterTest do + use ExUnit.Case, async: false + + alias Parrhesia.Storage.Adapters.Memory.Admin + alias Parrhesia.Storage.Adapters.Memory.Events + alias Parrhesia.Storage.Adapters.Memory.Groups + alias Parrhesia.Storage.Adapters.Memory.Moderation + + test "memory adapter supports basic behavior contract operations" do + event_id = String.duplicate("a", 64) + event = %{"id" => event_id, "pubkey" => "pk", "kind" => 1, "tags" => [], "content" => "hello"} + + assert {:ok, _event} = Events.put_event(%{}, event) + assert {:ok, [result]} = Events.query(%{}, [%{"ids" => [event_id]}], []) + assert result["id"] == event_id + + assert :ok = Moderation.ban_pubkey(%{}, "pk") + assert {:ok, true} = Moderation.pubkey_banned?(%{}, "pk") + + assert {:ok, membership} = + Groups.put_membership(%{}, %{group_id: "g1", pubkey: "pk", role: "member"}) + + assert membership.group_id == "g1" + + assert :ok = Admin.append_audit_log(%{}, %{method: "ping"}) + assert {:ok, [%{method: "ping"}]} = Admin.list_audit_logs(%{}, []) + end +end diff --git a/test/parrhesia/storage/adapters/postgres/adapter_contract_test.exs b/test/parrhesia/storage/adapters/postgres/adapter_contract_test.exs index 4f873bb..b9bac43 100644 --- a/test/parrhesia/storage/adapters/postgres/adapter_contract_test.exs +++ b/test/parrhesia/storage/adapters/postgres/adapter_contract_test.exs @@ -8,7 +8,10 @@ defmodule Parrhesia.Storage.Adapters.Postgres.AdapterContractTest do alias Parrhesia.Storage.Adapters.Postgres.Moderation setup_all do - start_supervised!(Repo) + if is_nil(Process.whereis(Repo)) do + start_supervised!(Repo) + end + Sandbox.mode(Repo, :manual) :ok end @@ -125,6 +128,11 @@ defmodule Parrhesia.Storage.Adapters.Postgres.AdapterContractTest do assert {:ok, stats_logs} = Admin.list_audit_logs(%{}, method: :stats) assert Enum.map(stats_logs, & &1.method) == ["stats"] + assert {:ok, %{"status" => "ok"}} = Admin.execute(%{}, :ping, %{}) + + assert {:ok, %{"events" => _events, "banned_pubkeys" => _banned, "blocked_ips" => _ips}} = + Admin.execute(%{}, :stats, %{}) + assert {:error, {:unsupported_method, "status"}} = Admin.execute(%{}, :status, %{}) end end diff --git a/test/parrhesia/storage/adapters/postgres/events_lifecycle_test.exs b/test/parrhesia/storage/adapters/postgres/events_lifecycle_test.exs new file mode 100644 index 0000000..c570b10 --- /dev/null +++ b/test/parrhesia/storage/adapters/postgres/events_lifecycle_test.exs @@ -0,0 +1,68 @@ +defmodule Parrhesia.Storage.Adapters.Postgres.EventsLifecycleTest do + use ExUnit.Case, async: false + + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.Protocol.EventValidator + alias Parrhesia.Repo + alias Parrhesia.Storage.Adapters.Postgres.Events + + setup do + :ok = Sandbox.checkout(Repo) + :ok + end + + test "delete_by_request tombstones owned target events" do + target = event(%{"kind" => 1, "content" => "target"}) + assert {:ok, _event} = Events.put_event(%{}, target) + + delete_request = + event(%{ + "kind" => 5, + "tags" => [["e", target["id"]]], + "content" => "delete" + }) + + assert {:ok, 1} = Events.delete_by_request(%{}, delete_request) + assert {:ok, nil} = Events.get_event(%{}, target["id"]) + end + + test "vanish hard-deletes events authored by pubkey" do + author = String.duplicate("3", 64) + + first_event = event(%{"pubkey" => author, "created_at" => 1_700_000_000}) + second_event = event(%{"pubkey" => author, "created_at" => 1_700_000_100}) + + assert {:ok, _event} = Events.put_event(%{}, first_event) + assert {:ok, _event} = Events.put_event(%{}, second_event) + + vanish_event = + event(%{ + "pubkey" => author, + "kind" => 62, + "created_at" => 1_700_000_200, + "content" => "vanish" + }) + + assert {:ok, count} = Events.vanish(%{}, vanish_event) + assert count >= 2 + + assert {:ok, nil} = Events.get_event(%{}, first_event["id"]) + assert {:ok, nil} = Events.get_event(%{}, second_event["id"]) + end + + defp event(overrides) do + now = System.system_time(:second) + + base = %{ + "pubkey" => String.duplicate("1", 64), + "created_at" => now, + "kind" => 1, + "tags" => [], + "content" => "hello", + "sig" => String.duplicate("2", 128) + } + + event = Map.merge(base, overrides) + Map.put(event, "id", EventValidator.compute_id(event)) + end +end diff --git a/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs b/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs index 03e6c79..a05bb02 100644 --- a/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs +++ b/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs @@ -7,7 +7,10 @@ defmodule Parrhesia.Storage.Adapters.Postgres.EventsQueryCountTest do alias Parrhesia.Storage.Adapters.Postgres.Events setup_all do - start_supervised!(Repo) + if is_nil(Process.whereis(Repo)) do + start_supervised!(Repo) + end + Sandbox.mode(Repo, :manual) :ok end @@ -217,6 +220,57 @@ defmodule Parrhesia.Storage.Adapters.Postgres.EventsQueryCountTest do assert {:ok, 1} = Events.count(%{}, [%{"ids" => [first["id"], second["id"]]}], []) end + test "query/3 supports search filter and giftwrap recipient restriction" do + recipient = String.duplicate("9", 64) + + allowed = + persist_event(%{ + "kind" => 1059, + "tags" => [["p", recipient]], + "content" => "encrypted hello to recipient" + }) + + _other = + persist_event(%{ + "kind" => 1059, + "tags" => [["p", String.duplicate("1", 64)]], + "content" => "encrypted hello to somebody else" + }) + + filters = [%{"kinds" => [1059], "search" => "recipient"}] + + assert {:ok, [result]} = + Events.query(%{}, filters, requester_pubkeys: [recipient]) + + assert result["id"] == allowed["id"] + end + + test "mls keypackage relay list kind 10051 follows replaceable conflict semantics" do + author = String.duplicate("c", 64) + + first = + persist_event(%{ + "pubkey" => author, + "created_at" => 1_700_000_500, + "kind" => 10_051, + "content" => "v1" + }) + + second = + persist_event(%{ + "pubkey" => author, + "created_at" => 1_700_000_501, + "kind" => 10_051, + "content" => "v2" + }) + + assert {:ok, [result]} = + Events.query(%{}, [%{"authors" => [author], "kinds" => [10_051]}], []) + + assert result["id"] == second["id"] + assert {:ok, nil} = Events.get_event(%{}, first["id"]) + end + defp persist_event(overrides) do event = build_event(overrides) assert {:ok, _persisted} = Events.put_event(%{}, event) diff --git a/test/parrhesia/storage/adapters/postgres/events_test.exs b/test/parrhesia/storage/adapters/postgres/events_test.exs index b58fe27..dc0d30c 100644 --- a/test/parrhesia/storage/adapters/postgres/events_test.exs +++ b/test/parrhesia/storage/adapters/postgres/events_test.exs @@ -28,6 +28,37 @@ defmodule Parrhesia.Storage.Adapters.Postgres.EventsTest do assert normalized.pubkey == Base.decode16!(pubkey, case: :lower) end + test "applies MLS retention TTL to kind 445 when enabled" do + previous_features = Application.get_env(:parrhesia, :features, []) + previous_policies = Application.get_env(:parrhesia, :policies, []) + + Application.put_env(:parrhesia, :features, Keyword.put(previous_features, :nip_ee_mls, true)) + + Application.put_env( + :parrhesia, + :policies, + Keyword.put(previous_policies, :mls_group_event_ttl_seconds, 120) + ) + + on_exit(fn -> + Application.put_env(:parrhesia, :features, previous_features) + Application.put_env(:parrhesia, :policies, previous_policies) + end) + + event = %{ + "id" => String.duplicate("1", 64), + "pubkey" => String.duplicate("2", 64), + "created_at" => 1_700_000_000, + "kind" => 445, + "tags" => [], + "content" => "mls", + "sig" => String.duplicate("3", 128) + } + + assert {:ok, normalized} = Events.normalize_event(event) + assert normalized.expires_at == 1_700_000_120 + end + test "candidate_wins_state?/2 uses created_at then lexical id tie-break" do assert Events.candidate_wins_state?( %{created_at: 11, id: <<2>>}, diff --git a/test/parrhesia/storage/archiver_test.exs b/test/parrhesia/storage/archiver_test.exs new file mode 100644 index 0000000..96b03f1 --- /dev/null +++ b/test/parrhesia/storage/archiver_test.exs @@ -0,0 +1,22 @@ +defmodule Parrhesia.Storage.ArchiverTest do + use ExUnit.Case, async: false + + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.Repo + alias Parrhesia.Storage.Archiver + + setup do + :ok = Sandbox.checkout(Repo) + :ok + end + + test "list_partitions returns partition tables" do + partitions = Archiver.list_partitions() + assert is_list(partitions) + end + + test "archive_sql builds insert-select statement" do + assert Archiver.archive_sql("events_2026_03", "events_archive") == + "INSERT INTO events_archive SELECT * FROM events_2026_03;" + end +end diff --git a/test/parrhesia/tasks/expiration_worker_test.exs b/test/parrhesia/tasks/expiration_worker_test.exs new file mode 100644 index 0000000..2263d99 --- /dev/null +++ b/test/parrhesia/tasks/expiration_worker_test.exs @@ -0,0 +1,31 @@ +defmodule Parrhesia.Tasks.ExpirationWorkerTest do + use ExUnit.Case, async: false + + alias Parrhesia.Tasks.ExpirationWorker + alias Parrhesia.TestSupport.ExpirationStubEvents + + setup do + previous_storage = Application.get_env(:parrhesia, :storage, []) + :persistent_term.put({ExpirationStubEvents, :test_pid}, self()) + + Application.put_env( + :parrhesia, + :storage, + Keyword.put(previous_storage, :events, ExpirationStubEvents) + ) + + on_exit(fn -> + :persistent_term.erase({ExpirationStubEvents, :test_pid}) + Application.put_env(:parrhesia, :storage, previous_storage) + end) + + :ok + end + + test "periodically triggers purge_expired" do + worker = start_supervised!({ExpirationWorker, name: nil, interval_ms: 10}) + + assert is_pid(worker) + assert_receive :purged + end +end diff --git a/test/parrhesia/web/conformance_test.exs b/test/parrhesia/web/conformance_test.exs new file mode 100644 index 0000000..70630f9 --- /dev/null +++ b/test/parrhesia/web/conformance_test.exs @@ -0,0 +1,59 @@ +defmodule Parrhesia.Web.ConformanceTest do + use ExUnit.Case, async: false + + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.Protocol.EventValidator + alias Parrhesia.Repo + alias Parrhesia.Web.Connection + + setup do + :ok = Sandbox.checkout(Repo) + :ok + end + + test "REQ -> EOSE emitted once and CLOSE emits CLOSED" do + {:ok, state} = Connection.init(subscription_index: nil) + + req_payload = Jason.encode!(["REQ", "sub-e2e", %{"kinds" => [1]}]) + + assert {:push, frames, subscribed_state} = + Connection.handle_in({req_payload, [opcode: :text]}, state) + + decoded = Enum.map(frames, fn {:text, frame} -> Jason.decode!(frame) end) + assert ["EOSE", "sub-e2e"] = List.last(decoded) + + close_payload = Jason.encode!(["CLOSE", "sub-e2e"]) + + assert {:push, {:text, closed_frame}, closed_state} = + Connection.handle_in({close_payload, [opcode: :text]}, subscribed_state) + + assert Jason.decode!(closed_frame) == ["CLOSED", "sub-e2e", "error: subscription closed"] + refute Map.has_key?(closed_state.subscriptions, "sub-e2e") + end + + test "EVENT accepted path returns canonical OK frame" do + {:ok, state} = Connection.init(subscription_index: nil) + + event = valid_event() + + assert {:push, {:text, frame}, ^state} = + Connection.handle_in({Jason.encode!(["EVENT", event]), [opcode: :text]}, state) + + assert Jason.decode!(frame) == ["OK", event["id"], true, "ok: event stored"] + end + + defp valid_event do + now = System.system_time(:second) + + base = %{ + "pubkey" => String.duplicate("1", 64), + "created_at" => now, + "kind" => 1, + "tags" => [], + "content" => "e2e", + "sig" => String.duplicate("2", 128) + } + + Map.put(base, "id", EventValidator.compute_id(base)) + end +end diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs index a6a8457..3a77f6d 100644 --- a/test/parrhesia/web/connection_test.exs +++ b/test/parrhesia/web/connection_test.exs @@ -1,106 +1,100 @@ defmodule Parrhesia.Web.ConnectionTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false + alias Ecto.Adapters.SQL.Sandbox alias Parrhesia.Protocol.EventValidator + alias Parrhesia.Repo alias Parrhesia.Web.Connection - test "REQ registers subscription and replies with EOSE" do - {:ok, state} = Connection.init(%{}) + setup do + :ok = Sandbox.checkout(Repo) + :ok + end + + test "REQ registers subscription, streams initial events and replies with EOSE" do + state = connection_state() req_payload = Jason.encode!(["REQ", "sub-123", %{"kinds" => [1]}]) - assert {:push, {:text, response}, next_state} = + assert {:push, responses, next_state} = Connection.handle_in({req_payload, [opcode: :text]}, state) assert Map.has_key?(next_state.subscriptions, "sub-123") assert next_state.subscriptions["sub-123"].filters == [%{"kinds" => [1]}] assert next_state.subscriptions["sub-123"].eose_sent? - assert Jason.decode!(response) == ["EOSE", "sub-123"] - end - test "REQ with same subscription id replaces existing subscription" do - {:ok, state} = Connection.init(%{}) - - first_req = Jason.encode!(["REQ", "sub-123", %{"kinds" => [1]}]) - second_req = Jason.encode!(["REQ", "sub-123", %{"kinds" => [2], "limit" => 5}]) - - assert {:push, _, subscribed_state} = - Connection.handle_in({first_req, [opcode: :text]}, state) - - assert {:push, {:text, response}, replaced_state} = - Connection.handle_in({second_req, [opcode: :text]}, subscribed_state) - - assert map_size(replaced_state.subscriptions) == 1 - - assert replaced_state.subscriptions["sub-123"].filters == [ - %{"kinds" => [2], "limit" => 5} + assert List.last(Enum.map(responses, fn {:text, frame} -> Jason.decode!(frame) end)) == [ + "EOSE", + "sub-123" ] - - assert Jason.decode!(response) == ["EOSE", "sub-123"] end - test "CLOSE removes subscription and replies with CLOSED" do - {:ok, state} = Connection.init(%{}) + test "COUNT returns exact count payload" do + state = connection_state() - req_payload = Jason.encode!(["REQ", "sub-123", %{"kinds" => [1]}]) - {:push, _, subscribed_state} = Connection.handle_in({req_payload, [opcode: :text]}, state) + payload = Jason.encode!(["COUNT", "sub-count", %{"kinds" => [1]}]) - close_payload = Jason.encode!(["CLOSE", "sub-123"]) + assert {:push, {:text, response}, ^state} = + Connection.handle_in({payload, [opcode: :text]}, state) + + assert ["COUNT", "sub-count", payload] = Jason.decode!(response) + assert payload["count"] >= 0 + assert payload["approximate"] == false + end + + test "AUTH accepts valid challenge event" do + state = connection_state() + + auth_event = valid_auth_event(state.auth_challenge) + payload = Jason.encode!(["AUTH", auth_event]) assert {:push, {:text, response}, next_state} = - Connection.handle_in({close_payload, [opcode: :text]}, subscribed_state) + Connection.handle_in({payload, [opcode: :text]}, state) - refute Map.has_key?(next_state.subscriptions, "sub-123") - assert Jason.decode!(response) == ["CLOSED", "sub-123", "error: subscription closed"] + assert Jason.decode!(response) == ["OK", auth_event["id"], true, "ok: auth accepted"] + assert MapSet.member?(next_state.authenticated_pubkeys, auth_event["pubkey"]) + refute next_state.auth_challenge == state.auth_challenge end - test "REQ above max subscriptions returns CLOSED and keeps existing subscriptions" do - {:ok, state} = Connection.init(max_subscriptions_per_connection: 1) + test "AUTH rejects mismatched challenge and returns AUTH frame" do + state = connection_state() - req_one = Jason.encode!(["REQ", "sub-1", %{"kinds" => [1]}]) - req_two = Jason.encode!(["REQ", "sub-2", %{"kinds" => [1]}]) + auth_event = valid_auth_event("wrong-challenge") + payload = Jason.encode!(["AUTH", auth_event]) - assert {:push, _, first_state} = Connection.handle_in({req_one, [opcode: :text]}, state) + assert {:push, frames, ^state} = Connection.handle_in({payload, [opcode: :text]}, state) - assert {:push, {:text, response}, second_state} = - Connection.handle_in({req_two, [opcode: :text]}, first_state) + decoded = Enum.map(frames, fn {:text, frame} -> Jason.decode!(frame) end) - assert map_size(second_state.subscriptions) == 1 - assert Map.has_key?(second_state.subscriptions, "sub-1") + assert Enum.any?(decoded, fn frame -> frame == ["AUTH", state.auth_challenge] end) - assert Jason.decode!(response) == [ - "CLOSED", - "sub-2", - "rate-limited: maximum subscriptions per connection exceeded" - ] + assert Enum.any?(decoded, fn frame -> + match?(["OK", _, false, _], frame) + end) end - test "invalid input returns NOTICE" do - {:ok, state} = Connection.init(%{}) + test "protected event is rejected unless authenticated" do + state = connection_state() - assert {:push, {:text, response}, ^state} = - Connection.handle_in({"not-json", [opcode: :text]}, state) + event = + valid_event() + |> Map.put("tags", [["-"]]) + |> then(&Map.put(&1, "id", EventValidator.compute_id(&1))) - assert Jason.decode!(response) == ["NOTICE", "invalid: malformed JSON"] + payload = Jason.encode!(["EVENT", event]) + + assert {:push, frames, ^state} = Connection.handle_in({payload, [opcode: :text]}, state) + + decoded = Enum.map(frames, fn {:text, frame} -> Jason.decode!(frame) end) + + assert ["OK", _, false, "auth-required: protected events require authenticated pubkey"] = + Enum.find(decoded, fn frame -> List.first(frame) == "OK" end) + + assert Enum.any?(decoded, fn frame -> frame == ["AUTH", state.auth_challenge] end) end - test "REQ with invalid filter returns CLOSED and does not subscribe" do - {:ok, state} = Connection.init(%{}) - - req_payload = Jason.encode!(["REQ", "sub-123", %{"kinds" => ["1"]}]) - - assert {:push, {:text, response}, ^state} = - Connection.handle_in({req_payload, [opcode: :text]}, state) - - assert Jason.decode!(response) == [ - "CLOSED", - "sub-123", - "invalid: kinds must be a non-empty array of integers between 0 and 65535" - ] - end - - test "valid EVENT currently replies with unsupported OK" do - {:ok, state} = Connection.init(%{}) + test "valid EVENT stores event and returns accepted OK" do + state = connection_state() event = valid_event() payload = Jason.encode!(["EVENT", event]) @@ -108,16 +102,11 @@ defmodule Parrhesia.Web.ConnectionTest do assert {:push, {:text, response}, ^state} = Connection.handle_in({payload, [opcode: :text]}, state) - assert Jason.decode!(response) == [ - "OK", - event["id"], - false, - "error: EVENT ingest not implemented" - ] + assert Jason.decode!(response) == ["OK", event["id"], true, "ok: event stored"] end test "invalid EVENT replies with OK false invalid prefix" do - {:ok, state} = Connection.init(%{}) + state = connection_state() event = valid_event() |> Map.put("sig", "nope") payload = Jason.encode!(["EVENT", event]) @@ -133,6 +122,37 @@ defmodule Parrhesia.Web.ConnectionTest do ] end + test "NEG sessions open and close" do + state = connection_state() + + open_payload = Jason.encode!(["NEG-OPEN", "neg-1", %{"cursor" => 0}]) + + assert {:push, {:text, open_response}, ^state} = + Connection.handle_in({open_payload, [opcode: :text]}, state) + + assert ["NEG-MSG", "neg-1", %{"status" => "open", "cursor" => 0}] = + Jason.decode!(open_response) + + close_payload = Jason.encode!(["NEG-CLOSE", "neg-1"]) + + assert {:push, {:text, close_response}, ^state} = + Connection.handle_in({close_payload, [opcode: :text]}, state) + + assert Jason.decode!(close_response) == ["NEG-MSG", "neg-1", %{"status" => "closed"}] + end + + test "CLOSE removes subscription and replies with CLOSED" do + state = subscribed_connection_state([]) + + close_payload = Jason.encode!(["CLOSE", "sub-1"]) + + assert {:push, {:text, response}, next_state} = + Connection.handle_in({close_payload, [opcode: :text]}, state) + + refute Map.has_key?(next_state.subscriptions, "sub-1") + assert Jason.decode!(response) == ["CLOSED", "sub-1", "error: subscription closed"] + end + test "fanout_event enqueues and drains matching events" do state = subscribed_connection_state([]) event = live_event("event-1", 1) @@ -149,16 +169,6 @@ defmodule Parrhesia.Web.ConnectionTest do assert Jason.decode!(payload) == ["EVENT", "sub-1", event] end - test "fanout_event ignores non-matching subscription filters" do - state = subscribed_connection_state([]) - - assert {:ok, next_state} = - Connection.handle_info({:fanout_event, "sub-1", live_event("event-2", 2)}, state) - - assert next_state.outbound_queue_size == 0 - refute_received :drain_outbound_queue - end - test "outbound queue overflow closes connection when strategy is close" do state = subscribed_connection_state( @@ -167,61 +177,36 @@ defmodule Parrhesia.Web.ConnectionTest do outbound_drain_batch_size: 1 ) - event_one = live_event("event-1", 1) - event_two = live_event("event-2", 1) - assert {:ok, queued_state} = - Connection.handle_info({:fanout_event, "sub-1", event_one}, state) + Connection.handle_info({:fanout_event, "sub-1", live_event("event-1", 1)}, state) - assert queued_state.outbound_queue_size == 1 assert_receive :drain_outbound_queue assert {:stop, :normal, {1008, message}, [{:text, notice_payload}], _overflow_state} = - Connection.handle_info({:fanout_event, "sub-1", event_two}, queued_state) + Connection.handle_info( + {:fanout_event, "sub-1", live_event("event-2", 1)}, + queued_state + ) assert message == "rate-limited: outbound queue overflow" assert Jason.decode!(notice_payload) == ["NOTICE", message] end - test "outbound queue overflow drops oldest event when strategy is drop_oldest" do - state = - subscribed_connection_state( - max_outbound_queue: 1, - outbound_overflow_strategy: :drop_oldest, - outbound_drain_batch_size: 1 - ) - - event_one = live_event("event-1", 1) - event_two = live_event("event-2", 1) - - assert {:ok, queued_state} = - Connection.handle_info({:fanout_event, "sub-1", event_one}, state) - - assert queued_state.outbound_queue_size == 1 - assert_receive :drain_outbound_queue - - assert {:ok, replaced_state} = - Connection.handle_info({:fanout_event, "sub-1", event_two}, queued_state) - - assert replaced_state.outbound_queue_size == 1 - - assert {:push, [{:text, payload}], drained_state} = - Connection.handle_info(:drain_outbound_queue, replaced_state) - - assert drained_state.outbound_queue_size == 0 - assert Jason.decode!(payload) == ["EVENT", "sub-1", event_two] - end - defp subscribed_connection_state(opts) do - {:ok, initial_state} = Connection.init(Keyword.put_new(opts, :subscription_index, nil)) + state = connection_state(opts) req_payload = Jason.encode!(["REQ", "sub-1", %{"kinds" => [1]}]) assert {:push, _, subscribed_state} = - Connection.handle_in({req_payload, [opcode: :text]}, initial_state) + Connection.handle_in({req_payload, [opcode: :text]}, state) subscribed_state end + defp connection_state(opts \\ []) do + {:ok, state} = Connection.init(Keyword.put_new(opts, :subscription_index, nil)) + state + end + defp live_event(id, kind) do %{ "id" => id, @@ -234,6 +219,21 @@ defmodule Parrhesia.Web.ConnectionTest do } end + defp valid_auth_event(challenge) do + now = System.system_time(:second) + + base = %{ + "pubkey" => String.duplicate("9", 64), + "created_at" => now, + "kind" => 22_242, + "tags" => [["challenge", challenge]], + "content" => "", + "sig" => String.duplicate("8", 128) + } + + Map.put(base, "id", EventValidator.compute_id(base)) + end + defp valid_event do base_event = %{ "pubkey" => String.duplicate("1", 64), diff --git a/test/parrhesia/web/router_test.exs b/test/parrhesia/web/router_test.exs new file mode 100644 index 0000000..9f9210f --- /dev/null +++ b/test/parrhesia/web/router_test.exs @@ -0,0 +1,97 @@ +defmodule Parrhesia.Web.RouterTest do + use ExUnit.Case, async: false + + import Plug.Conn + import Plug.Test + + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.Protocol.EventValidator + alias Parrhesia.Repo + alias Parrhesia.Web.Router + + setup do + :ok = Sandbox.checkout(Repo) + :ok + end + + test "GET /health returns ok" do + conn = conn(:get, "/health") |> Router.call([]) + + assert conn.status == 200 + assert conn.resp_body == "ok" + end + + test "GET /ready returns ready" do + conn = conn(:get, "/ready") |> Router.call([]) + + assert conn.status == 200 + assert conn.resp_body == "ready" + end + + test "GET /relay with nostr accept header returns NIP-11 document" do + conn = + conn(:get, "/relay") + |> put_req_header("accept", "application/nostr+json") + |> Router.call([]) + + assert conn.status == 200 + assert get_resp_header(conn, "content-type") == ["application/nostr+json; charset=utf-8"] + + body = Jason.decode!(conn.resp_body) + + assert body["name"] == "Parrhesia" + assert 11 in body["supported_nips"] + end + + test "GET /metrics returns prometheus payload" do + conn = conn(:get, "/metrics") |> Router.call([]) + + assert conn.status == 200 + assert get_resp_header(conn, "content-type") == ["text/plain; charset=utf-8"] + end + + test "POST /management requires authorization" do + conn = + conn(:post, "/management", Jason.encode!(%{"method" => "ping", "params" => %{}})) + |> put_req_header("content-type", "application/json") + |> Router.call([]) + + assert conn.status == 401 + assert Jason.decode!(conn.resp_body) == %{"ok" => false, "error" => "auth-required"} + end + + test "POST /management accepts valid NIP-98 header" do + management_url = "http://www.example.com/management" + auth_event = nip98_event("POST", management_url) + + authorization = "Nostr " <> Base.encode64(Jason.encode!(auth_event)) + + conn = + conn(:post, "/management", Jason.encode!(%{"method" => "ping", "params" => %{}})) + |> put_req_header("content-type", "application/json") + |> put_req_header("authorization", authorization) + |> Router.call([]) + + assert conn.status == 200 + + assert Jason.decode!(conn.resp_body) == %{ + "ok" => true, + "result" => %{"status" => "ok"} + } + end + + defp nip98_event(method, url) do + now = System.system_time(:second) + + base = %{ + "pubkey" => String.duplicate("a", 64), + "created_at" => now, + "kind" => 27_235, + "tags" => [["method", method], ["u", url]], + "content" => "", + "sig" => String.duplicate("b", 128) + } + + Map.put(base, "id", EventValidator.compute_id(base)) + end +end