From 336b19249273897e2bab8a66ed19bf37cb9d93b3 Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Fri, 13 Mar 2026 20:46:50 +0100 Subject: [PATCH] Implement remaining Postgres storage adapters and contracts --- PROGRESS.md | 6 +- .../storage/adapters/postgres/admin.ex | 170 +++++++++- .../storage/adapters/postgres/events.ex | 127 +++++++- .../storage/adapters/postgres/groups.ex | 300 +++++++++++++++++- .../storage/adapters/postgres/moderation.ex | 156 ++++++++- .../postgres/adapter_contract_test.exs | 130 ++++++++ .../postgres/events_query_count_test.exs | 65 ++++ 7 files changed, 905 insertions(+), 49 deletions(-) create mode 100644 test/parrhesia/storage/adapters/postgres/adapter_contract_test.exs diff --git a/PROGRESS.md b/PROGRESS.md index e8c3428..b75fa1c 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -21,10 +21,10 @@ Implementation checklist for Parrhesia relay. ## Phase 2 — storage boundary + postgres adapter - [x] Define `Parrhesia.Storage.*` behaviors (events/moderation/groups/admin) -- [ ] Implement Postgres adapter modules behind behaviors +- [x] Implement Postgres adapter modules behind behaviors - [x] Create migrations for events, tags, moderation, membership -- [ ] Implement replaceable/addressable semantics at storage layer -- [ ] Add adapter contract test suite +- [x] Implement replaceable/addressable semantics at storage layer +- [x] Add adapter contract test suite ## Phase 3 — fanout + performance primitives diff --git a/lib/parrhesia/storage/adapters/postgres/admin.ex b/lib/parrhesia/storage/adapters/postgres/admin.ex index 751c302..38a1265 100644 --- a/lib/parrhesia/storage/adapters/postgres/admin.ex +++ b/lib/parrhesia/storage/adapters/postgres/admin.ex @@ -1,19 +1,175 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Admin do @moduledoc """ PostgreSQL-backed implementation for `Parrhesia.Storage.Admin`. - - Implementation is intentionally staged; callbacks currently return - `{:error, :not_implemented}` until NIP-86 management storage lands. """ + import Ecto.Query + + alias Parrhesia.Repo + @behaviour Parrhesia.Storage.Admin - @impl true - def execute(_context, _method, _params), do: {:error, :not_implemented} + @default_limit 100 + @max_limit 1_000 @impl true - def append_audit_log(_context, _entry), do: {:error, :not_implemented} + def execute(_context, method, _params) do + {:error, {:unsupported_method, normalize_method_name(method)}} + end @impl true - def list_audit_logs(_context, _opts), do: {:error, :not_implemented} + def append_audit_log(_context, audit_entry) when is_map(audit_entry) do + with {:ok, method} <- fetch_required_method(audit_entry), + {:ok, actor_pubkey} <- fetch_optional_pubkey(audit_entry), + {:ok, params} <- fetch_optional_map(audit_entry, :params), + {:ok, result} <- fetch_optional_map(audit_entry, :result, true) do + now = DateTime.utc_now() |> DateTime.truncate(:microsecond) + + {inserted, _result} = + Repo.insert_all("management_audit_logs", [ + audit_log_row(method, actor_pubkey, params, result, now) + ]) + + if inserted == 1 do + :ok + else + {:error, :audit_log_insert_failed} + end + end + end + + def append_audit_log(_context, _audit_entry), do: {:error, :invalid_audit_entry} + + @impl true + def list_audit_logs(_context, opts) when is_list(opts) do + limit = normalize_limit(Keyword.get(opts, :limit, @default_limit)) + + query = + from(log in "management_audit_logs", + order_by: [desc: log.inserted_at, desc: log.id], + limit: ^limit, + select: %{ + id: log.id, + actor_pubkey: log.actor_pubkey, + method: log.method, + params: log.params, + result: log.result, + inserted_at: log.inserted_at + } + ) + |> maybe_filter_method(Keyword.get(opts, :method)) + |> maybe_filter_actor_pubkey(Keyword.get(opts, :actor_pubkey)) + + logs = + query + |> Repo.all() + |> Enum.map(&to_audit_log_map/1) + + {:ok, logs} + end + + def list_audit_logs(_context, _opts), do: {:error, :invalid_opts} + + defp fetch_required_method(audit_entry) do + audit_entry + |> fetch_value(:method) + |> normalize_non_empty_string(:invalid_method) + end + + defp fetch_optional_pubkey(audit_entry) do + case fetch_value(audit_entry, :actor_pubkey) do + nil -> {:ok, nil} + value -> normalize_pubkey(value) + end + end + + defp fetch_optional_map(audit_entry, key, allow_nil \\ false) do + case fetch_value(audit_entry, key) do + nil when allow_nil -> {:ok, nil} + nil -> {:ok, %{}} + value when is_map(value) -> {:ok, value} + _value -> {:error, invalid_key_reason(key)} + end + end + + defp fetch_value(map, key) when is_map(map) do + Map.get(map, key) || Map.get(map, Atom.to_string(key)) + end + + defp normalize_method_name(method) when is_atom(method), do: Atom.to_string(method) + defp normalize_method_name(method) when is_binary(method), do: method + defp normalize_method_name(method), do: inspect(method) + + defp normalize_non_empty_string(value, _reason) when is_binary(value) and value != "", + do: {:ok, value} + + defp normalize_non_empty_string(_value, reason), do: {:error, reason} + + defp normalize_pubkey(value) when is_binary(value) and byte_size(value) == 32, do: {:ok, value} + + defp normalize_pubkey(value) when is_binary(value) and byte_size(value) == 64 do + case Base.decode16(value, case: :mixed) do + {:ok, pubkey} -> {:ok, pubkey} + :error -> {:error, :invalid_actor_pubkey} + end + end + + defp normalize_pubkey(_value), do: {:error, :invalid_actor_pubkey} + + defp invalid_key_reason(:params), do: :invalid_params + defp invalid_key_reason(:result), do: :invalid_result + + defp audit_log_row(method, actor_pubkey, params, result, inserted_at) do + %{ + method: method, + actor_pubkey: actor_pubkey, + params: params, + result: result, + inserted_at: inserted_at + } + end + + defp normalize_limit(limit) when is_integer(limit) and limit > 0 do + min(limit, @max_limit) + end + + defp normalize_limit(_limit), do: @default_limit + + defp maybe_filter_method(query, nil), do: query + + defp maybe_filter_method(query, method) when is_atom(method) do + maybe_filter_method(query, Atom.to_string(method)) + end + + defp maybe_filter_method(query, method) when is_binary(method) and method != "" do + where(query, [log], log.method == ^method) + end + + defp maybe_filter_method(query, _method), do: query + + defp maybe_filter_actor_pubkey(query, nil), do: query + + defp maybe_filter_actor_pubkey(query, actor_pubkey) do + case normalize_pubkey(actor_pubkey) do + {:ok, normalized_actor_pubkey} -> + where(query, [log], log.actor_pubkey == ^normalized_actor_pubkey) + + {:error, _reason} -> + where(query, [log], false) + end + end + + defp to_audit_log_map(log) do + %{ + id: log.id, + actor_pubkey: encode_optional_hex(log.actor_pubkey), + method: log.method, + params: log.params, + result: log.result, + inserted_at: log.inserted_at + } + end + + defp encode_optional_hex(nil), do: nil + defp encode_optional_hex(value), do: Base.encode16(value, case: :lower) end diff --git a/lib/parrhesia/storage/adapters/postgres/events.ex b/lib/parrhesia/storage/adapters/postgres/events.ex index 5467ceb..a72ee03 100644 --- a/lib/parrhesia/storage/adapters/postgres/events.ex +++ b/lib/parrhesia/storage/adapters/postgres/events.ex @@ -255,12 +255,14 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do end defp upsert_state_tables!(normalized_event, now) do - :ok = maybe_upsert_replaceable_state(normalized_event, now) - :ok = maybe_upsert_addressable_state(normalized_event, now) + deleted_at = DateTime.to_unix(now, :second) + + :ok = maybe_upsert_replaceable_state(normalized_event, now, deleted_at) + :ok = maybe_upsert_addressable_state(normalized_event, now, deleted_at) :ok end - defp maybe_upsert_replaceable_state(normalized_event, now) do + defp maybe_upsert_replaceable_state(normalized_event, now, deleted_at) do if replaceable_kind?(normalized_event.kind) do lookup_query = from(state in "replaceable_event_state", @@ -283,6 +285,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do replaceable_state_row(normalized_event, now), normalized_event, now, + deleted_at, :replaceable_state_update_failed ) else @@ -290,7 +293,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do end end - defp maybe_upsert_addressable_state(normalized_event, now) do + defp maybe_upsert_addressable_state(normalized_event, now, deleted_at) do if addressable_kind?(normalized_event.kind) do lookup_query = from(state in "addressable_event_state", @@ -316,6 +319,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do addressable_state_row(normalized_event, now), normalized_event, now, + deleted_at, :addressable_state_update_failed ) else @@ -330,24 +334,95 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do insert_row, normalized_event, now, + deleted_at, failure_reason ) do case Repo.one(lookup_query) do nil -> - {inserted, _result} = Repo.insert_all(table_name, [insert_row], on_conflict: :nothing) - - if inserted <= 1 do - :ok - else - Repo.rollback(failure_reason) - end + insert_state_or_resolve_race( + table_name, + lookup_query, + update_query, + insert_row, + normalized_event, + now, + deleted_at, + failure_reason + ) current_state -> - maybe_update_state(update_query, normalized_event, current_state, now, failure_reason) + maybe_update_state( + update_query, + normalized_event, + current_state, + now, + deleted_at, + failure_reason + ) end end - defp maybe_update_state(update_query, normalized_event, current_state, now, failure_reason) do + defp insert_state_or_resolve_race( + table_name, + lookup_query, + update_query, + insert_row, + normalized_event, + now, + deleted_at, + failure_reason + ) do + case Repo.insert_all(table_name, [insert_row], on_conflict: :nothing) do + {1, _result} -> + :ok + + {0, _result} -> + resolve_state_race( + lookup_query, + update_query, + normalized_event, + now, + deleted_at, + failure_reason + ) + + {_inserted, _result} -> + Repo.rollback(failure_reason) + end + end + + defp resolve_state_race( + lookup_query, + update_query, + normalized_event, + now, + deleted_at, + failure_reason + ) do + case Repo.one(lookup_query) do + nil -> + Repo.rollback(failure_reason) + + current_state -> + maybe_update_state( + update_query, + normalized_event, + current_state, + now, + deleted_at, + failure_reason + ) + end + end + + defp maybe_update_state( + update_query, + normalized_event, + current_state, + now, + deleted_at, + failure_reason + ) do if candidate_wins_state?(normalized_event, current_state) do {updated, _result} = Repo.update_all(update_query, @@ -359,12 +434,36 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do ) if updated == 1 do - :ok + retire_event!( + current_state.event_created_at, + current_state.event_id, + deleted_at, + failure_reason + ) else Repo.rollback(failure_reason) end else + retire_event!(normalized_event.created_at, normalized_event.id, deleted_at, failure_reason) + end + end + + defp retire_event!(event_created_at, event_id, deleted_at, failure_reason) do + {updated, _result} = + Repo.update_all( + from(event in "events", + where: + event.created_at == ^event_created_at and + event.id == ^event_id and + is_nil(event.deleted_at) + ), + set: [deleted_at: deleted_at] + ) + + if updated in [0, 1] do :ok + else + Repo.rollback(failure_reason) end end diff --git a/lib/parrhesia/storage/adapters/postgres/groups.ex b/lib/parrhesia/storage/adapters/postgres/groups.ex index 28b5fea..5e56ec3 100644 --- a/lib/parrhesia/storage/adapters/postgres/groups.ex +++ b/lib/parrhesia/storage/adapters/postgres/groups.ex @@ -1,31 +1,311 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Groups do @moduledoc """ PostgreSQL-backed implementation for `Parrhesia.Storage.Groups`. - - Implementation is intentionally staged; callbacks currently return - `{:error, :not_implemented}` until group/membership schema lands. """ + import Ecto.Query + + alias Parrhesia.Repo + @behaviour Parrhesia.Storage.Groups @impl true - def put_membership(_context, _membership), do: {:error, :not_implemented} + def put_membership(_context, membership) when is_map(membership) do + with {:ok, group_id} <- fetch_required_string(membership, :group_id), + {:ok, pubkey} <- fetch_required_pubkey(membership), + {:ok, role} <- fetch_required_string(membership, :role), + {:ok, metadata} <- fetch_map(membership, :metadata) do + now = DateTime.utc_now() |> DateTime.truncate(:microsecond) + + Repo.transaction(fn -> + ensure_group_exists!(group_id, now) + upsert_group_membership!(group_id, pubkey, role, metadata, now) + to_membership_map(group_id, pubkey, role, metadata) + end) + |> unwrap_transaction_result() + end + end + + def put_membership(_context, _membership), do: {:error, :invalid_membership} @impl true - def get_membership(_context, _group_id, _pubkey), do: {:error, :not_implemented} + def get_membership(_context, group_id, pubkey) do + with {:ok, normalized_group_id} <- normalize_group_id(group_id), + {:ok, normalized_pubkey} <- normalize_pubkey(pubkey) do + query = + from(membership in "group_memberships", + where: + membership.group_id == ^normalized_group_id and + membership.pubkey == ^normalized_pubkey, + select: %{ + group_id: membership.group_id, + pubkey: membership.pubkey, + role: membership.role, + metadata: membership.metadata + }, + limit: 1 + ) + + case Repo.one(query) do + nil -> + {:ok, nil} + + membership -> + {:ok, + to_membership_map( + membership.group_id, + membership.pubkey, + membership.role, + membership.metadata + )} + end + end + end @impl true - def delete_membership(_context, _group_id, _pubkey), do: {:error, :not_implemented} + def delete_membership(_context, group_id, pubkey) do + with {:ok, normalized_group_id} <- normalize_group_id(group_id), + {:ok, normalized_pubkey} <- normalize_pubkey(pubkey) do + query = + from(membership in "group_memberships", + where: + membership.group_id == ^normalized_group_id and + membership.pubkey == ^normalized_pubkey + ) + + {_deleted, _result} = Repo.delete_all(query) + :ok + end + end @impl true - def list_memberships(_context, _group_id), do: {:error, :not_implemented} + def list_memberships(_context, group_id) do + with {:ok, normalized_group_id} <- normalize_group_id(group_id) do + query = + from(membership in "group_memberships", + where: membership.group_id == ^normalized_group_id, + order_by: [asc: membership.role, asc: membership.pubkey], + select: %{ + group_id: membership.group_id, + pubkey: membership.pubkey, + role: membership.role, + metadata: membership.metadata + } + ) + + memberships = + query + |> Repo.all() + |> Enum.map(fn membership -> + to_membership_map( + membership.group_id, + membership.pubkey, + membership.role, + membership.metadata + ) + end) + + {:ok, memberships} + end + end @impl true - def put_role(_context, _role), do: {:error, :not_implemented} + def put_role(_context, role) when is_map(role) do + with {:ok, group_id} <- fetch_required_string(role, :group_id), + {:ok, pubkey} <- fetch_required_pubkey(role), + {:ok, role_name} <- fetch_required_string(role, :role), + {:ok, metadata} <- fetch_map(role, :metadata) do + now = DateTime.utc_now() |> DateTime.truncate(:microsecond) + + Repo.transaction(fn -> + ensure_group_exists!(group_id, now) + upsert_group_role!(group_id, pubkey, role_name, metadata, now) + to_role_map(group_id, pubkey, role_name, metadata) + end) + |> unwrap_transaction_result() + end + end + + def put_role(_context, _role), do: {:error, :invalid_role} @impl true - def delete_role(_context, _group_id, _pubkey, _role), do: {:error, :not_implemented} + def delete_role(_context, group_id, pubkey, role_name) do + with {:ok, normalized_group_id} <- normalize_group_id(group_id), + {:ok, normalized_pubkey} <- normalize_pubkey(pubkey), + {:ok, normalized_role_name} <- normalize_role(role_name) do + query = + from(role in "group_roles", + where: + role.group_id == ^normalized_group_id and + role.pubkey == ^normalized_pubkey and + role.role == ^normalized_role_name + ) + + {_deleted, _result} = Repo.delete_all(query) + :ok + end + end @impl true - def list_roles(_context, _group_id, _pubkey), do: {:error, :not_implemented} + def list_roles(_context, group_id, pubkey) do + with {:ok, normalized_group_id} <- normalize_group_id(group_id), + {:ok, normalized_pubkey} <- normalize_pubkey(pubkey) do + query = + from(role in "group_roles", + where: role.group_id == ^normalized_group_id and role.pubkey == ^normalized_pubkey, + order_by: [asc: role.role], + select: %{ + group_id: role.group_id, + pubkey: role.pubkey, + role: role.role, + metadata: role.metadata + } + ) + + roles = + query + |> Repo.all() + |> Enum.map(fn role -> + to_role_map(role.group_id, role.pubkey, role.role, role.metadata) + end) + + {:ok, roles} + end + end + + defp ensure_group_exists!(group_id, now) do + {inserted, _result} = + Repo.insert_all( + "relay_groups", + [ + %{ + group_id: group_id, + metadata: %{}, + inserted_at: now, + updated_at: now + } + ], + on_conflict: :nothing, + conflict_target: [:group_id] + ) + + ensure_single_upsert_row!(inserted, :group_upsert_failed) + end + + defp upsert_group_membership!(group_id, pubkey, role, metadata, now) do + {inserted, _result} = + Repo.insert_all( + "group_memberships", + [ + %{ + group_id: group_id, + pubkey: pubkey, + role: role, + metadata: metadata, + inserted_at: now, + updated_at: now + } + ], + on_conflict: [set: [role: role, metadata: metadata, updated_at: now]], + conflict_target: [:group_id, :pubkey] + ) + + ensure_single_upsert_row!(inserted, :membership_upsert_failed) + end + + defp upsert_group_role!(group_id, pubkey, role_name, metadata, now) do + {inserted, _result} = + Repo.insert_all( + "group_roles", + [ + %{ + group_id: group_id, + pubkey: pubkey, + role: role_name, + metadata: metadata, + inserted_at: now, + updated_at: now + } + ], + on_conflict: [set: [metadata: metadata, updated_at: now]], + conflict_target: [:group_id, :pubkey, :role] + ) + + ensure_single_upsert_row!(inserted, :role_upsert_failed) + end + + defp ensure_single_upsert_row!(inserted, _failure_reason) when inserted <= 1, do: :ok + + defp ensure_single_upsert_row!(_inserted, failure_reason) do + Repo.rollback(failure_reason) + end + + defp unwrap_transaction_result({:ok, result}), do: {:ok, result} + defp unwrap_transaction_result({:error, reason}), do: {:error, reason} + + defp fetch_required_string(map, key) do + map + |> fetch_value(key) + |> normalize_non_empty_string(invalid_key_reason(key)) + end + + defp fetch_required_pubkey(map) do + map + |> fetch_value(:pubkey) + |> normalize_pubkey() + end + + defp fetch_map(map, key) do + case fetch_value(map, key) do + nil -> {:ok, %{}} + value when is_map(value) -> {:ok, value} + _value -> {:error, invalid_key_reason(key)} + end + end + + defp fetch_value(map, key) when is_map(map) do + Map.get(map, key) || Map.get(map, Atom.to_string(key)) + end + + defp normalize_group_id(group_id), do: normalize_non_empty_string(group_id, :invalid_group_id) + defp normalize_role(role_name), do: normalize_non_empty_string(role_name, :invalid_role) + + defp normalize_non_empty_string(value, _reason) when is_binary(value) and value != "", + do: {:ok, value} + + defp normalize_non_empty_string(_value, reason), do: {:error, reason} + + defp normalize_pubkey(value) when is_binary(value) and byte_size(value) == 32, do: {:ok, value} + + defp normalize_pubkey(value) when is_binary(value) and byte_size(value) == 64 do + case Base.decode16(value, case: :mixed) do + {:ok, pubkey} -> {:ok, pubkey} + :error -> {:error, :invalid_pubkey} + end + end + + defp normalize_pubkey(_value), do: {:error, :invalid_pubkey} + + defp invalid_key_reason(:group_id), do: :invalid_group_id + defp invalid_key_reason(:pubkey), do: :invalid_pubkey + defp invalid_key_reason(:role), do: :invalid_role + defp invalid_key_reason(:metadata), do: :invalid_metadata + + defp to_membership_map(group_id, pubkey, role, metadata) do + %{ + group_id: group_id, + pubkey: Base.encode16(pubkey, case: :lower), + role: role, + metadata: metadata + } + end + + defp to_role_map(group_id, pubkey, role_name, metadata) do + %{ + group_id: group_id, + pubkey: Base.encode16(pubkey, case: :lower), + role: role_name, + metadata: metadata + } + end end diff --git a/lib/parrhesia/storage/adapters/postgres/moderation.ex b/lib/parrhesia/storage/adapters/postgres/moderation.ex index 3e8ee8e..4f29c58 100644 --- a/lib/parrhesia/storage/adapters/postgres/moderation.ex +++ b/lib/parrhesia/storage/adapters/postgres/moderation.ex @@ -1,46 +1,172 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do @moduledoc """ PostgreSQL-backed implementation for `Parrhesia.Storage.Moderation`. - - Implementation is intentionally staged; callbacks currently return - `{:error, :not_implemented}` until table design and policy paths land. """ + import Ecto.Query + + alias Parrhesia.Repo + @behaviour Parrhesia.Storage.Moderation @impl true - def ban_pubkey(_context, _pubkey), do: {:error, :not_implemented} + def ban_pubkey(_context, pubkey) do + with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do + upsert_presence_table("banned_pubkeys", :pubkey, normalized_pubkey) + end + end @impl true - def unban_pubkey(_context, _pubkey), do: {:error, :not_implemented} + def unban_pubkey(_context, pubkey) do + with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do + delete_from_table("banned_pubkeys", :pubkey, normalized_pubkey) + end + end @impl true - def pubkey_banned?(_context, _pubkey), do: {:error, :not_implemented} + def pubkey_banned?(_context, pubkey) do + with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do + {:ok, exists_in_table?("banned_pubkeys", :pubkey, normalized_pubkey)} + end + end @impl true - def allow_pubkey(_context, _pubkey), do: {:error, :not_implemented} + def allow_pubkey(_context, pubkey) do + with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do + upsert_presence_table("allowed_pubkeys", :pubkey, normalized_pubkey) + end + end @impl true - def disallow_pubkey(_context, _pubkey), do: {:error, :not_implemented} + def disallow_pubkey(_context, pubkey) do + with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do + delete_from_table("allowed_pubkeys", :pubkey, normalized_pubkey) + end + end @impl true - def pubkey_allowed?(_context, _pubkey), do: {:error, :not_implemented} + def pubkey_allowed?(_context, pubkey) do + with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do + {:ok, exists_in_table?("allowed_pubkeys", :pubkey, normalized_pubkey)} + end + end @impl true - def ban_event(_context, _event_id), do: {:error, :not_implemented} + def ban_event(_context, event_id) do + with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id) do + upsert_presence_table("banned_events", :event_id, normalized_event_id) + end + end @impl true - def unban_event(_context, _event_id), do: {:error, :not_implemented} + def unban_event(_context, event_id) do + with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id) do + delete_from_table("banned_events", :event_id, normalized_event_id) + end + end @impl true - def event_banned?(_context, _event_id), do: {:error, :not_implemented} + def event_banned?(_context, event_id) do + with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id) do + {:ok, exists_in_table?("banned_events", :event_id, normalized_event_id)} + end + end @impl true - def block_ip(_context, _ip_address), do: {:error, :not_implemented} + def block_ip(_context, ip_address) do + with {:ok, normalized_ip} <- normalize_ip(ip_address) do + upsert_presence_table("blocked_ips", :ip, normalized_ip) + end + end @impl true - def unblock_ip(_context, _ip_address), do: {:error, :not_implemented} + def unblock_ip(_context, ip_address) do + with {:ok, normalized_ip} <- normalize_ip(ip_address) do + delete_from_table("blocked_ips", :ip, normalized_ip) + end + end @impl true - def ip_blocked?(_context, _ip_address), do: {:error, :not_implemented} + def ip_blocked?(_context, ip_address) do + with {:ok, normalized_ip} <- normalize_ip(ip_address) do + {:ok, exists_in_table?("blocked_ips", :ip, normalized_ip)} + end + end + + defp upsert_presence_table(table, field, value) do + now = DateTime.utc_now() |> DateTime.truncate(:microsecond) + + {inserted, _result} = + Repo.insert_all( + table, + [ + %{ + field => value, + inserted_at: now + } + ], + on_conflict: :nothing, + conflict_target: [field] + ) + + if inserted <= 1 do + :ok + else + {:error, :insert_failed} + end + end + + defp delete_from_table(table, field, value) do + query = from(record in table, where: field(record, ^field) == ^value) + {_deleted, _result} = Repo.delete_all(query) + :ok + end + + defp exists_in_table?(table, field, value) do + query = + from(record in table, + where: field(record, ^field) == ^value, + select: 1, + limit: 1 + ) + + Repo.one(query) == 1 + end + + defp normalize_hex_or_binary(value, expected_bytes, _reason) + when is_binary(value) and byte_size(value) == expected_bytes, + do: {:ok, value} + + defp normalize_hex_or_binary(value, expected_bytes, reason) when is_binary(value) do + if byte_size(value) == expected_bytes * 2 do + case Base.decode16(value, case: :mixed) do + {:ok, decoded} -> {:ok, decoded} + :error -> {:error, reason} + end + else + {:error, reason} + end + end + + defp normalize_hex_or_binary(_value, _expected_bytes, reason), do: {:error, reason} + + defp normalize_ip({_, _, _, _} = ip_tuple), do: {:ok, to_inet(ip_tuple)} + defp normalize_ip({_, _, _, _, _, _, _, _} = ip_tuple), do: {:ok, to_inet(ip_tuple)} + + defp normalize_ip(ip_address) when is_binary(ip_address) do + ip_address + |> String.to_charlist() + |> :inet.parse_address() + |> case do + {:ok, normalized_ip} -> {:ok, to_inet(normalized_ip)} + {:error, _reason} -> {:error, :invalid_ip_address} + end + end + + defp normalize_ip(_ip_address), do: {:error, :invalid_ip_address} + + defp to_inet({_, _, _, _} = ip_tuple), do: %Postgrex.INET{address: ip_tuple, netmask: 32} + + defp to_inet({_, _, _, _, _, _, _, _} = ip_tuple), + do: %Postgrex.INET{address: ip_tuple, netmask: 128} end diff --git a/test/parrhesia/storage/adapters/postgres/adapter_contract_test.exs b/test/parrhesia/storage/adapters/postgres/adapter_contract_test.exs new file mode 100644 index 0000000..4f873bb --- /dev/null +++ b/test/parrhesia/storage/adapters/postgres/adapter_contract_test.exs @@ -0,0 +1,130 @@ +defmodule Parrhesia.Storage.Adapters.Postgres.AdapterContractTest do + use ExUnit.Case, async: false + + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.Repo + alias Parrhesia.Storage.Adapters.Postgres.Admin + alias Parrhesia.Storage.Adapters.Postgres.Groups + alias Parrhesia.Storage.Adapters.Postgres.Moderation + + setup_all do + start_supervised!(Repo) + Sandbox.mode(Repo, :manual) + :ok + end + + setup do + :ok = Sandbox.checkout(Repo) + end + + test "moderation adapter persists pubkey/event/ip block state" do + pubkey = String.duplicate("a", 64) + event_id = String.duplicate("b", 64) + + assert {:ok, false} = Moderation.pubkey_banned?(%{}, pubkey) + assert :ok = Moderation.ban_pubkey(%{}, pubkey) + assert :ok = Moderation.ban_pubkey(%{}, pubkey) + assert {:ok, true} = Moderation.pubkey_banned?(%{}, pubkey) + assert :ok = Moderation.unban_pubkey(%{}, pubkey) + assert {:ok, false} = Moderation.pubkey_banned?(%{}, pubkey) + + assert {:ok, false} = Moderation.pubkey_allowed?(%{}, pubkey) + assert :ok = Moderation.allow_pubkey(%{}, pubkey) + assert {:ok, true} = Moderation.pubkey_allowed?(%{}, pubkey) + assert :ok = Moderation.disallow_pubkey(%{}, pubkey) + assert {:ok, false} = Moderation.pubkey_allowed?(%{}, pubkey) + + assert {:ok, false} = Moderation.event_banned?(%{}, event_id) + assert :ok = Moderation.ban_event(%{}, event_id) + assert {:ok, true} = Moderation.event_banned?(%{}, event_id) + assert :ok = Moderation.unban_event(%{}, event_id) + assert {:ok, false} = Moderation.event_banned?(%{}, event_id) + + assert {:ok, false} = Moderation.ip_blocked?(%{}, "127.0.0.1") + assert :ok = Moderation.block_ip(%{}, "127.0.0.1") + assert {:ok, true} = Moderation.ip_blocked?(%{}, "127.0.0.1") + assert :ok = Moderation.unblock_ip(%{}, "127.0.0.1") + assert {:ok, false} = Moderation.ip_blocked?(%{}, "127.0.0.1") + end + + test "groups adapter upserts and lists memberships and roles" do + group_id = "group-alpha" + member_pubkey = String.duplicate("c", 64) + + assert {:ok, membership} = + Groups.put_membership(%{}, %{ + group_id: group_id, + pubkey: member_pubkey, + role: "member", + metadata: %{"joined_via" => "invite"} + }) + + assert membership.group_id == group_id + assert membership.pubkey == member_pubkey + assert membership.role == "member" + + assert {:ok, fetched_membership} = Groups.get_membership(%{}, group_id, member_pubkey) + assert fetched_membership.metadata == %{"joined_via" => "invite"} + + assert {:ok, updated_membership} = + Groups.put_membership(%{}, %{ + "group_id" => group_id, + "pubkey" => member_pubkey, + "role" => "admin", + "metadata" => %{"joined_via" => "promoted"} + }) + + assert updated_membership.role == "admin" + + assert {:ok, memberships} = Groups.list_memberships(%{}, group_id) + assert Enum.map(memberships, &{&1.pubkey, &1.role}) == [{member_pubkey, "admin"}] + + assert {:ok, role} = + Groups.put_role(%{}, %{ + group_id: group_id, + pubkey: member_pubkey, + role: "moderator", + metadata: %{"scope" => "global"} + }) + + assert role.role == "moderator" + + assert {:ok, roles} = Groups.list_roles(%{}, group_id, member_pubkey) + assert Enum.map(roles, & &1.role) == ["moderator"] + + assert :ok = Groups.delete_role(%{}, group_id, member_pubkey, "moderator") + assert {:ok, []} = Groups.list_roles(%{}, group_id, member_pubkey) + + assert :ok = Groups.delete_membership(%{}, group_id, member_pubkey) + assert {:ok, nil} = Groups.get_membership(%{}, group_id, member_pubkey) + end + + test "admin adapter appends and filters audit logs" do + actor_pubkey = String.duplicate("d", 64) + + assert :ok = + Admin.append_audit_log(%{}, %{ + method: "ban_pubkey", + actor_pubkey: actor_pubkey, + params: %{"pubkey" => String.duplicate("e", 64)}, + result: %{"ok" => true} + }) + + assert :ok = + Admin.append_audit_log(%{}, %{ + method: "stats", + params: %{"window" => "24h"} + }) + + assert {:ok, logs} = Admin.list_audit_logs(%{}, limit: 10) + assert length(logs) == 2 + + assert {:ok, actor_logs} = Admin.list_audit_logs(%{}, actor_pubkey: actor_pubkey) + assert Enum.map(actor_logs, & &1.method) == ["ban_pubkey"] + + assert {:ok, stats_logs} = Admin.list_audit_logs(%{}, method: :stats) + assert Enum.map(stats_logs, & &1.method) == ["stats"] + + assert {:error, {:unsupported_method, "status"}} = Admin.execute(%{}, :status, %{}) + end +end diff --git a/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs b/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs index 29f350b..03e6c79 100644 --- a/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs +++ b/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs @@ -152,6 +152,71 @@ defmodule Parrhesia.Storage.Adapters.Postgres.EventsQueryCountTest do assert {:ok, 2} = Events.count(%{}, filters, now: now) end + test "replaceable events expose only the current winner" do + author = String.duplicate("a", 64) + + older = + persist_event(%{ + "pubkey" => author, + "created_at" => 1_700_000_300, + "kind" => 0, + "content" => "profile-v1" + }) + + newer = + persist_event(%{ + "pubkey" => author, + "created_at" => 1_700_000_301, + "kind" => 0, + "content" => "profile-v2" + }) + + assert {:ok, [result]} = Events.query(%{}, [%{"authors" => [author], "kinds" => [0]}], []) + assert result["id"] == newer["id"] + + assert {:ok, nil} = Events.get_event(%{}, older["id"]) + assert {:ok, persisted_newer} = Events.get_event(%{}, newer["id"]) + assert persisted_newer["id"] == newer["id"] + + assert {:ok, 1} = Events.count(%{}, [%{"ids" => [older["id"], newer["id"]]}], []) + end + + test "addressable events tie-break by lexical id for identical timestamps" do + author = String.duplicate("b", 64) + + first = + persist_event(%{ + "pubkey" => author, + "created_at" => 1_700_000_400, + "kind" => 30_023, + "tags" => [["d", "topic"]], + "content" => "version-a" + }) + + second = + persist_event(%{ + "pubkey" => author, + "created_at" => 1_700_000_400, + "kind" => 30_023, + "tags" => [["d", "topic"]], + "content" => "version-b" + }) + + winner_id = Enum.min([first["id"], second["id"]]) + loser_id = Enum.max([first["id"], second["id"]]) + + assert {:ok, [result]} = + Events.query( + %{}, + [%{"authors" => [author], "kinds" => [30_023], "#d" => ["topic"]}], + [] + ) + + assert result["id"] == winner_id + assert {:ok, nil} = Events.get_event(%{}, loser_id) + assert {:ok, 1} = Events.count(%{}, [%{"ids" => [first["id"], second["id"]]}], []) + end + defp persist_event(overrides) do event = build_event(overrides) assert {:ok, _persisted} = Events.put_event(%{}, event)