Implement remaining Postgres storage adapters and contracts

This commit is contained in:
2026-03-13 20:46:50 +01:00
parent 693786615f
commit 336b192492
7 changed files with 905 additions and 49 deletions

View File

@@ -21,10 +21,10 @@ Implementation checklist for Parrhesia relay.
## Phase 2 — storage boundary + postgres adapter
- [x] Define `Parrhesia.Storage.*` behaviors (events/moderation/groups/admin)
- [x] Implement Postgres adapter modules behind behaviors
- [x] Create migrations for events, tags, moderation, membership
- [x] Implement replaceable/addressable semantics at storage layer
- [x] Add adapter contract test suite
## Phase 3 — fanout + performance primitives

View File

@@ -1,19 +1,175 @@
defmodule Parrhesia.Storage.Adapters.Postgres.Admin do
  @moduledoc """
  PostgreSQL-backed implementation for `Parrhesia.Storage.Admin`.

  Persists management (NIP-86) audit-log entries into the
  `management_audit_logs` table and reads them back with optional
  method/actor filtering. No management methods are wired up yet, so
  `execute/3` always reports the requested method as unsupported.
  """

  import Ecto.Query

  alias Parrhesia.Repo

  @behaviour Parrhesia.Storage.Admin

  # Bounds for the `:limit` option accepted by `list_audit_logs/2`.
  @default_limit 100
  @max_limit 1_000

  @impl true
  # No management methods are implemented yet; surface the method name so
  # callers can report a meaningful NIP-86 error.
  def execute(_context, method, _params) do
    {:error, {:unsupported_method, normalize_method_name(method)}}
  end

  @impl true
  # Validates and inserts one audit-log row. The entry map may use atom or
  # string keys; `method` is required, `actor_pubkey`/`params`/`result` are
  # optional. Returns `:ok`, or `{:error, reason}` on validation/insert failure.
  def append_audit_log(_context, audit_entry) when is_map(audit_entry) do
    with {:ok, method} <- fetch_required_method(audit_entry),
         {:ok, actor_pubkey} <- fetch_optional_pubkey(audit_entry),
         {:ok, params} <- fetch_optional_map(audit_entry, :params),
         {:ok, result} <- fetch_optional_map(audit_entry, :result, true) do
      now = DateTime.utc_now() |> DateTime.truncate(:microsecond)

      {inserted, _result} =
        Repo.insert_all("management_audit_logs", [
          audit_log_row(method, actor_pubkey, params, result, now)
        ])

      if inserted == 1 do
        :ok
      else
        {:error, :audit_log_insert_failed}
      end
    end
  end

  def append_audit_log(_context, _audit_entry), do: {:error, :invalid_audit_entry}

  @impl true
  # Lists audit logs, newest first, honoring optional `:limit`, `:method`,
  # and `:actor_pubkey` filters. Returns `{:ok, [map]}`.
  def list_audit_logs(_context, opts) when is_list(opts) do
    limit = normalize_limit(Keyword.get(opts, :limit, @default_limit))

    query =
      from(log in "management_audit_logs",
        order_by: [desc: log.inserted_at, desc: log.id],
        limit: ^limit,
        select: %{
          id: log.id,
          actor_pubkey: log.actor_pubkey,
          method: log.method,
          params: log.params,
          result: log.result,
          inserted_at: log.inserted_at
        }
      )
      |> maybe_filter_method(Keyword.get(opts, :method))
      |> maybe_filter_actor_pubkey(Keyword.get(opts, :actor_pubkey))

    logs =
      query
      |> Repo.all()
      |> Enum.map(&to_audit_log_map/1)

    {:ok, logs}
  end

  def list_audit_logs(_context, _opts), do: {:error, :invalid_opts}

  # `method` must be present and a non-empty string.
  defp fetch_required_method(audit_entry) do
    audit_entry
    |> fetch_value(:method)
    |> normalize_non_empty_string(:invalid_method)
  end

  # `actor_pubkey` is optional; when present it must be a 32-byte binary or
  # a 64-char hex string.
  defp fetch_optional_pubkey(audit_entry) do
    case fetch_value(audit_entry, :actor_pubkey) do
      nil -> {:ok, nil}
      value -> normalize_pubkey(value)
    end
  end

  # Missing maps default to `%{}` unless `allow_nil` is set (used for
  # `:result`, where absence is stored as NULL).
  defp fetch_optional_map(audit_entry, key, allow_nil \\ false) do
    case fetch_value(audit_entry, key) do
      nil when allow_nil -> {:ok, nil}
      nil -> {:ok, %{}}
      value when is_map(value) -> {:ok, value}
      _value -> {:error, invalid_key_reason(key)}
    end
  end

  # Accepts either atom-keyed or string-keyed entry maps.
  defp fetch_value(map, key) when is_map(map) do
    Map.get(map, key) || Map.get(map, Atom.to_string(key))
  end

  defp normalize_method_name(method) when is_atom(method), do: Atom.to_string(method)
  defp normalize_method_name(method) when is_binary(method), do: method
  defp normalize_method_name(method), do: inspect(method)

  defp normalize_non_empty_string(value, _reason) when is_binary(value) and value != "",
    do: {:ok, value}

  defp normalize_non_empty_string(_value, reason), do: {:error, reason}

  # Pubkeys are stored as raw 32-byte binaries; hex input is decoded.
  defp normalize_pubkey(value) when is_binary(value) and byte_size(value) == 32, do: {:ok, value}

  defp normalize_pubkey(value) when is_binary(value) and byte_size(value) == 64 do
    case Base.decode16(value, case: :mixed) do
      {:ok, pubkey} -> {:ok, pubkey}
      :error -> {:error, :invalid_actor_pubkey}
    end
  end

  defp normalize_pubkey(_value), do: {:error, :invalid_actor_pubkey}

  defp invalid_key_reason(:params), do: :invalid_params
  defp invalid_key_reason(:result), do: :invalid_result

  defp audit_log_row(method, actor_pubkey, params, result, inserted_at) do
    %{
      method: method,
      actor_pubkey: actor_pubkey,
      params: params,
      result: result,
      inserted_at: inserted_at
    }
  end

  # Clamp to [1, @max_limit]; anything non-positive or non-integer falls back
  # to the default.
  defp normalize_limit(limit) when is_integer(limit) and limit > 0 do
    min(limit, @max_limit)
  end

  defp normalize_limit(_limit), do: @default_limit

  defp maybe_filter_method(query, nil), do: query

  defp maybe_filter_method(query, method) when is_atom(method) do
    maybe_filter_method(query, Atom.to_string(method))
  end

  defp maybe_filter_method(query, method) when is_binary(method) and method != "" do
    where(query, [log], log.method == ^method)
  end

  defp maybe_filter_method(query, _method), do: query

  defp maybe_filter_actor_pubkey(query, nil), do: query

  defp maybe_filter_actor_pubkey(query, actor_pubkey) do
    case normalize_pubkey(actor_pubkey) do
      {:ok, normalized_actor_pubkey} ->
        where(query, [log], log.actor_pubkey == ^normalized_actor_pubkey)

      {:error, _reason} ->
        # An unparseable filter matches nothing rather than erroring.
        where(query, [log], false)
    end
  end

  # Rows come back with raw-binary pubkeys; expose them as lowercase hex.
  defp to_audit_log_map(log) do
    %{
      id: log.id,
      actor_pubkey: encode_optional_hex(log.actor_pubkey),
      method: log.method,
      params: log.params,
      result: log.result,
      inserted_at: log.inserted_at
    }
  end

  defp encode_optional_hex(nil), do: nil
  defp encode_optional_hex(value), do: Base.encode16(value, case: :lower)
end

View File

@@ -255,12 +255,14 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
end
defp upsert_state_tables!(normalized_event, now) do
:ok = maybe_upsert_replaceable_state(normalized_event, now)
:ok = maybe_upsert_addressable_state(normalized_event, now)
deleted_at = DateTime.to_unix(now, :second)
:ok = maybe_upsert_replaceable_state(normalized_event, now, deleted_at)
:ok = maybe_upsert_addressable_state(normalized_event, now, deleted_at)
:ok
end
defp maybe_upsert_replaceable_state(normalized_event, now) do
defp maybe_upsert_replaceable_state(normalized_event, now, deleted_at) do
if replaceable_kind?(normalized_event.kind) do
lookup_query =
from(state in "replaceable_event_state",
@@ -283,6 +285,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
replaceable_state_row(normalized_event, now),
normalized_event,
now,
deleted_at,
:replaceable_state_update_failed
)
else
@@ -290,7 +293,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
end
end
defp maybe_upsert_addressable_state(normalized_event, now) do
defp maybe_upsert_addressable_state(normalized_event, now, deleted_at) do
if addressable_kind?(normalized_event.kind) do
lookup_query =
from(state in "addressable_event_state",
@@ -316,6 +319,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
addressable_state_row(normalized_event, now),
normalized_event,
now,
deleted_at,
:addressable_state_update_failed
)
else
@@ -330,24 +334,95 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
insert_row,
normalized_event,
now,
deleted_at,
failure_reason
) do
case Repo.one(lookup_query) do
nil ->
{inserted, _result} = Repo.insert_all(table_name, [insert_row], on_conflict: :nothing)
if inserted <= 1 do
:ok
else
Repo.rollback(failure_reason)
end
insert_state_or_resolve_race(
table_name,
lookup_query,
update_query,
insert_row,
normalized_event,
now,
deleted_at,
failure_reason
)
current_state ->
maybe_update_state(update_query, normalized_event, current_state, now, failure_reason)
maybe_update_state(
update_query,
normalized_event,
current_state,
now,
deleted_at,
failure_reason
)
end
end
defp maybe_update_state(update_query, normalized_event, current_state, now, failure_reason) do
# Attempts the initial insert into the replaceable/addressable state table.
# `on_conflict: :nothing` means a concurrent writer may win the insert race;
# in that case (0 rows inserted) we re-read the current row and fall back to
# the compare-and-update path instead of failing.
defp insert_state_or_resolve_race(
       table_name,
       lookup_query,
       update_query,
       insert_row,
       normalized_event,
       now,
       deleted_at,
       failure_reason
     ) do
  case Repo.insert_all(table_name, [insert_row], on_conflict: :nothing) do
    {1, _result} ->
      :ok

    {0, _result} ->
      # Lost the race: another transaction inserted the state row first.
      resolve_state_race(
        lookup_query,
        update_query,
        normalized_event,
        now,
        deleted_at,
        failure_reason
      )

    {_inserted, _result} ->
      # More than one row affected for a single-row insert — abort the
      # surrounding transaction.
      Repo.rollback(failure_reason)
  end
end
# After losing the insert race, re-reads the state row a concurrent writer
# created and runs the normal winner-comparison update against it. A nil
# lookup here is unexpected (the conflicting row should exist) and aborts
# the surrounding transaction.
defp resolve_state_race(
       lookup_query,
       update_query,
       normalized_event,
       now,
       deleted_at,
       failure_reason
     ) do
  case Repo.one(lookup_query) do
    nil ->
      Repo.rollback(failure_reason)

    current_state ->
      maybe_update_state(
        update_query,
        normalized_event,
        current_state,
        now,
        deleted_at,
        failure_reason
      )
  end
end
defp maybe_update_state(
update_query,
normalized_event,
current_state,
now,
deleted_at,
failure_reason
) do
if candidate_wins_state?(normalized_event, current_state) do
{updated, _result} =
Repo.update_all(update_query,
@@ -359,12 +434,36 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
)
if updated == 1 do
:ok
retire_event!(
current_state.event_created_at,
current_state.event_id,
deleted_at,
failure_reason
)
else
Repo.rollback(failure_reason)
end
else
retire_event!(normalized_event.created_at, normalized_event.id, deleted_at, failure_reason)
end
end
# Soft-deletes the losing event row (the superseded replaceable/addressable
# version) by stamping `deleted_at`. Matching on `is_nil(event.deleted_at)`
# makes this idempotent, so 0 updated rows (already retired, or row absent)
# is treated as success; any other count aborts the transaction.
defp retire_event!(event_created_at, event_id, deleted_at, failure_reason) do
  {updated, _result} =
    Repo.update_all(
      from(event in "events",
        where:
          event.created_at == ^event_created_at and
            event.id == ^event_id and
            is_nil(event.deleted_at)
      ),
      set: [deleted_at: deleted_at]
    )

  if updated in [0, 1] do
    :ok
  else
    Repo.rollback(failure_reason)
  end
end

View File

@@ -1,31 +1,311 @@
defmodule Parrhesia.Storage.Adapters.Postgres.Groups do
  @moduledoc """
  PostgreSQL-backed implementation for `Parrhesia.Storage.Groups`.

  Stores group membership and role assignments in the
  `group_memberships` and `group_roles` tables, auto-creating the parent
  row in `relay_groups` on first write. Pubkeys are accepted as 32-byte
  binaries or 64-char hex and always returned as lowercase hex.
  """

  import Ecto.Query

  alias Parrhesia.Repo

  @behaviour Parrhesia.Storage.Groups

  @impl true
  # Upserts a membership (unique per group_id/pubkey). The membership map may
  # use atom or string keys; `metadata` defaults to `%{}`. Returns
  # `{:ok, membership_map}` or `{:error, reason}`.
  def put_membership(_context, membership) when is_map(membership) do
    with {:ok, group_id} <- fetch_required_string(membership, :group_id),
         {:ok, pubkey} <- fetch_required_pubkey(membership),
         {:ok, role} <- fetch_required_string(membership, :role),
         {:ok, metadata} <- fetch_map(membership, :metadata) do
      now = DateTime.utc_now() |> DateTime.truncate(:microsecond)

      Repo.transaction(fn ->
        ensure_group_exists!(group_id, now)
        upsert_group_membership!(group_id, pubkey, role, metadata, now)
        to_membership_map(group_id, pubkey, role, metadata)
      end)
      |> unwrap_transaction_result()
    end
  end

  def put_membership(_context, _membership), do: {:error, :invalid_membership}

  @impl true
  # Fetches one membership; `{:ok, nil}` when absent.
  def get_membership(_context, group_id, pubkey) do
    with {:ok, normalized_group_id} <- normalize_group_id(group_id),
         {:ok, normalized_pubkey} <- normalize_pubkey(pubkey) do
      query =
        from(membership in "group_memberships",
          where:
            membership.group_id == ^normalized_group_id and
              membership.pubkey == ^normalized_pubkey,
          select: %{
            group_id: membership.group_id,
            pubkey: membership.pubkey,
            role: membership.role,
            metadata: membership.metadata
          },
          limit: 1
        )

      case Repo.one(query) do
        nil ->
          {:ok, nil}

        membership ->
          {:ok,
           to_membership_map(
             membership.group_id,
             membership.pubkey,
             membership.role,
             membership.metadata
           )}
      end
    end
  end

  @impl true
  # Idempotent delete: returns `:ok` whether or not the row existed.
  def delete_membership(_context, group_id, pubkey) do
    with {:ok, normalized_group_id} <- normalize_group_id(group_id),
         {:ok, normalized_pubkey} <- normalize_pubkey(pubkey) do
      query =
        from(membership in "group_memberships",
          where:
            membership.group_id == ^normalized_group_id and
              membership.pubkey == ^normalized_pubkey
        )

      {_deleted, _result} = Repo.delete_all(query)
      :ok
    end
  end

  @impl true
  # Lists all memberships for a group, ordered by role then pubkey.
  def list_memberships(_context, group_id) do
    with {:ok, normalized_group_id} <- normalize_group_id(group_id) do
      query =
        from(membership in "group_memberships",
          where: membership.group_id == ^normalized_group_id,
          order_by: [asc: membership.role, asc: membership.pubkey],
          select: %{
            group_id: membership.group_id,
            pubkey: membership.pubkey,
            role: membership.role,
            metadata: membership.metadata
          }
        )

      memberships =
        query
        |> Repo.all()
        |> Enum.map(fn membership ->
          to_membership_map(
            membership.group_id,
            membership.pubkey,
            membership.role,
            membership.metadata
          )
        end)

      {:ok, memberships}
    end
  end

  @impl true
  # Upserts a role grant (unique per group_id/pubkey/role); only metadata is
  # refreshed on conflict.
  def put_role(_context, role) when is_map(role) do
    with {:ok, group_id} <- fetch_required_string(role, :group_id),
         {:ok, pubkey} <- fetch_required_pubkey(role),
         {:ok, role_name} <- fetch_required_string(role, :role),
         {:ok, metadata} <- fetch_map(role, :metadata) do
      now = DateTime.utc_now() |> DateTime.truncate(:microsecond)

      Repo.transaction(fn ->
        ensure_group_exists!(group_id, now)
        upsert_group_role!(group_id, pubkey, role_name, metadata, now)
        to_role_map(group_id, pubkey, role_name, metadata)
      end)
      |> unwrap_transaction_result()
    end
  end

  def put_role(_context, _role), do: {:error, :invalid_role}

  @impl true
  # Idempotent delete of a single role grant.
  def delete_role(_context, group_id, pubkey, role_name) do
    with {:ok, normalized_group_id} <- normalize_group_id(group_id),
         {:ok, normalized_pubkey} <- normalize_pubkey(pubkey),
         {:ok, normalized_role_name} <- normalize_role(role_name) do
      query =
        from(role in "group_roles",
          where:
            role.group_id == ^normalized_group_id and
              role.pubkey == ^normalized_pubkey and
              role.role == ^normalized_role_name
        )

      {_deleted, _result} = Repo.delete_all(query)
      :ok
    end
  end

  @impl true
  # Lists all role grants for a pubkey within a group, ordered by role name.
  def list_roles(_context, group_id, pubkey) do
    with {:ok, normalized_group_id} <- normalize_group_id(group_id),
         {:ok, normalized_pubkey} <- normalize_pubkey(pubkey) do
      query =
        from(role in "group_roles",
          where: role.group_id == ^normalized_group_id and role.pubkey == ^normalized_pubkey,
          order_by: [asc: role.role],
          select: %{
            group_id: role.group_id,
            pubkey: role.pubkey,
            role: role.role,
            metadata: role.metadata
          }
        )

      roles =
        query
        |> Repo.all()
        |> Enum.map(fn role ->
          to_role_map(role.group_id, role.pubkey, role.role, role.metadata)
        end)

      {:ok, roles}
    end
  end

  # Creates the parent group row if missing; `on_conflict: :nothing` makes
  # this safe under concurrency. Must run inside a transaction (may rollback).
  defp ensure_group_exists!(group_id, now) do
    {inserted, _result} =
      Repo.insert_all(
        "relay_groups",
        [
          %{
            group_id: group_id,
            metadata: %{},
            inserted_at: now,
            updated_at: now
          }
        ],
        on_conflict: :nothing,
        conflict_target: [:group_id]
      )

    ensure_single_upsert_row!(inserted, :group_upsert_failed)
  end

  # Insert-or-update keyed on (group_id, pubkey); role and metadata are
  # replaced on conflict.
  defp upsert_group_membership!(group_id, pubkey, role, metadata, now) do
    {inserted, _result} =
      Repo.insert_all(
        "group_memberships",
        [
          %{
            group_id: group_id,
            pubkey: pubkey,
            role: role,
            metadata: metadata,
            inserted_at: now,
            updated_at: now
          }
        ],
        on_conflict: [set: [role: role, metadata: metadata, updated_at: now]],
        conflict_target: [:group_id, :pubkey]
      )

    ensure_single_upsert_row!(inserted, :membership_upsert_failed)
  end

  # Insert-or-update keyed on (group_id, pubkey, role); only metadata is
  # replaced on conflict.
  defp upsert_group_role!(group_id, pubkey, role_name, metadata, now) do
    {inserted, _result} =
      Repo.insert_all(
        "group_roles",
        [
          %{
            group_id: group_id,
            pubkey: pubkey,
            role: role_name,
            metadata: metadata,
            inserted_at: now,
            updated_at: now
          }
        ],
        on_conflict: [set: [metadata: metadata, updated_at: now]],
        conflict_target: [:group_id, :pubkey, :role]
      )

    ensure_single_upsert_row!(inserted, :role_upsert_failed)
  end

  # A single-row upsert legitimately reports 0 (conflict skipped) or 1 rows;
  # anything else aborts the transaction.
  defp ensure_single_upsert_row!(inserted, _failure_reason) when inserted <= 1, do: :ok

  defp ensure_single_upsert_row!(_inserted, failure_reason) do
    Repo.rollback(failure_reason)
  end

  defp unwrap_transaction_result({:ok, result}), do: {:ok, result}
  defp unwrap_transaction_result({:error, reason}), do: {:error, reason}

  defp fetch_required_string(map, key) do
    map
    |> fetch_value(key)
    |> normalize_non_empty_string(invalid_key_reason(key))
  end

  defp fetch_required_pubkey(map) do
    map
    |> fetch_value(:pubkey)
    |> normalize_pubkey()
  end

  # `metadata` may be omitted (defaults to `%{}`) but must be a map if given.
  defp fetch_map(map, key) do
    case fetch_value(map, key) do
      nil -> {:ok, %{}}
      value when is_map(value) -> {:ok, value}
      _value -> {:error, invalid_key_reason(key)}
    end
  end

  # Accepts either atom-keyed or string-keyed input maps.
  defp fetch_value(map, key) when is_map(map) do
    Map.get(map, key) || Map.get(map, Atom.to_string(key))
  end

  defp normalize_group_id(group_id), do: normalize_non_empty_string(group_id, :invalid_group_id)
  defp normalize_role(role_name), do: normalize_non_empty_string(role_name, :invalid_role)

  defp normalize_non_empty_string(value, _reason) when is_binary(value) and value != "",
    do: {:ok, value}

  defp normalize_non_empty_string(_value, reason), do: {:error, reason}

  # Pubkeys are stored as raw 32-byte binaries; hex input is decoded.
  defp normalize_pubkey(value) when is_binary(value) and byte_size(value) == 32, do: {:ok, value}

  defp normalize_pubkey(value) when is_binary(value) and byte_size(value) == 64 do
    case Base.decode16(value, case: :mixed) do
      {:ok, pubkey} -> {:ok, pubkey}
      :error -> {:error, :invalid_pubkey}
    end
  end

  defp normalize_pubkey(_value), do: {:error, :invalid_pubkey}

  defp invalid_key_reason(:group_id), do: :invalid_group_id
  defp invalid_key_reason(:pubkey), do: :invalid_pubkey
  defp invalid_key_reason(:role), do: :invalid_role
  defp invalid_key_reason(:metadata), do: :invalid_metadata

  # Public return shape: pubkey re-encoded as lowercase hex.
  defp to_membership_map(group_id, pubkey, role, metadata) do
    %{
      group_id: group_id,
      pubkey: Base.encode16(pubkey, case: :lower),
      role: role,
      metadata: metadata
    }
  end

  defp to_role_map(group_id, pubkey, role_name, metadata) do
    %{
      group_id: group_id,
      pubkey: Base.encode16(pubkey, case: :lower),
      role: role_name,
      metadata: metadata
    }
  end
end

View File

@@ -1,46 +1,172 @@
defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do
  @moduledoc """
  PostgreSQL-backed implementation for `Parrhesia.Storage.Moderation`.

  Tracks moderation state as simple presence tables: `banned_pubkeys`,
  `allowed_pubkeys`, `banned_events`, and `blocked_ips`. Pubkeys and
  event ids are accepted as 32-byte binaries or 64-char hex; IPs are
  accepted as `:inet` tuples or strings and stored as `Postgrex.INET`.
  All writes are idempotent.
  """

  import Ecto.Query

  alias Parrhesia.Repo

  @behaviour Parrhesia.Storage.Moderation

  @impl true
  def ban_pubkey(_context, pubkey) do
    with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do
      upsert_presence_table("banned_pubkeys", :pubkey, normalized_pubkey)
    end
  end

  @impl true
  def unban_pubkey(_context, pubkey) do
    with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do
      delete_from_table("banned_pubkeys", :pubkey, normalized_pubkey)
    end
  end

  @impl true
  def pubkey_banned?(_context, pubkey) do
    with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do
      {:ok, exists_in_table?("banned_pubkeys", :pubkey, normalized_pubkey)}
    end
  end

  @impl true
  def allow_pubkey(_context, pubkey) do
    with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do
      upsert_presence_table("allowed_pubkeys", :pubkey, normalized_pubkey)
    end
  end

  @impl true
  def disallow_pubkey(_context, pubkey) do
    with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do
      delete_from_table("allowed_pubkeys", :pubkey, normalized_pubkey)
    end
  end

  @impl true
  def pubkey_allowed?(_context, pubkey) do
    with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do
      {:ok, exists_in_table?("allowed_pubkeys", :pubkey, normalized_pubkey)}
    end
  end

  @impl true
  def ban_event(_context, event_id) do
    with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id) do
      upsert_presence_table("banned_events", :event_id, normalized_event_id)
    end
  end

  @impl true
  def unban_event(_context, event_id) do
    with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id) do
      delete_from_table("banned_events", :event_id, normalized_event_id)
    end
  end

  @impl true
  def event_banned?(_context, event_id) do
    with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id) do
      {:ok, exists_in_table?("banned_events", :event_id, normalized_event_id)}
    end
  end

  @impl true
  def block_ip(_context, ip_address) do
    with {:ok, normalized_ip} <- normalize_ip(ip_address) do
      upsert_presence_table("blocked_ips", :ip, normalized_ip)
    end
  end

  @impl true
  def unblock_ip(_context, ip_address) do
    with {:ok, normalized_ip} <- normalize_ip(ip_address) do
      delete_from_table("blocked_ips", :ip, normalized_ip)
    end
  end

  @impl true
  def ip_blocked?(_context, ip_address) do
    with {:ok, normalized_ip} <- normalize_ip(ip_address) do
      {:ok, exists_in_table?("blocked_ips", :ip, normalized_ip)}
    end
  end

  # Inserts a presence row keyed on `field`; `on_conflict: :nothing` makes
  # repeated bans/blocks idempotent (0 or 1 rows inserted is success).
  defp upsert_presence_table(table, field, value) do
    now = DateTime.utc_now() |> DateTime.truncate(:microsecond)

    {inserted, _result} =
      Repo.insert_all(
        table,
        [
          %{
            field => value,
            inserted_at: now
          }
        ],
        on_conflict: :nothing,
        conflict_target: [field]
      )

    if inserted <= 1 do
      :ok
    else
      {:error, :insert_failed}
    end
  end

  # Idempotent: returns `:ok` whether or not the row existed.
  defp delete_from_table(table, field, value) do
    query = from(record in table, where: field(record, ^field) == ^value)
    {_deleted, _result} = Repo.delete_all(query)
    :ok
  end

  defp exists_in_table?(table, field, value) do
    query =
      from(record in table,
        where: field(record, ^field) == ^value,
        select: 1,
        limit: 1
      )

    Repo.one(query) == 1
  end

  # Accepts either the raw binary of `expected_bytes` bytes or its hex form
  # (twice as many characters, any case).
  defp normalize_hex_or_binary(value, expected_bytes, _reason)
       when is_binary(value) and byte_size(value) == expected_bytes,
       do: {:ok, value}

  defp normalize_hex_or_binary(value, expected_bytes, reason) when is_binary(value) do
    if byte_size(value) == expected_bytes * 2 do
      case Base.decode16(value, case: :mixed) do
        {:ok, decoded} -> {:ok, decoded}
        :error -> {:error, reason}
      end
    else
      {:error, reason}
    end
  end

  defp normalize_hex_or_binary(_value, _expected_bytes, reason), do: {:error, reason}

  # IPv4/IPv6 `:inet`-style tuples pass through; strings are parsed first.
  defp normalize_ip({_, _, _, _} = ip_tuple), do: {:ok, to_inet(ip_tuple)}
  defp normalize_ip({_, _, _, _, _, _, _, _} = ip_tuple), do: {:ok, to_inet(ip_tuple)}

  defp normalize_ip(ip_address) when is_binary(ip_address) do
    ip_address
    |> String.to_charlist()
    |> :inet.parse_address()
    |> case do
      {:ok, normalized_ip} -> {:ok, to_inet(normalized_ip)}
      {:error, _reason} -> {:error, :invalid_ip_address}
    end
  end

  defp normalize_ip(_ip_address), do: {:error, :invalid_ip_address}

  # Host addresses: /32 for IPv4, /128 for IPv6.
  defp to_inet({_, _, _, _} = ip_tuple), do: %Postgrex.INET{address: ip_tuple, netmask: 32}

  defp to_inet({_, _, _, _, _, _, _, _} = ip_tuple),
    do: %Postgrex.INET{address: ip_tuple, netmask: 128}
end

View File

@@ -0,0 +1,130 @@
defmodule Parrhesia.Storage.Adapters.Postgres.AdapterContractTest do
  # Contract suite exercising the Postgres-backed storage adapters
  # (Moderation, Groups, Admin) against a real database via the Ecto SQL
  # sandbox. `async: false` because the tests share the Repo started in
  # `setup_all` and manage sandbox ownership manually.
  use ExUnit.Case, async: false

  alias Ecto.Adapters.SQL.Sandbox
  alias Parrhesia.Repo
  alias Parrhesia.Storage.Adapters.Postgres.Admin
  alias Parrhesia.Storage.Adapters.Postgres.Groups
  alias Parrhesia.Storage.Adapters.Postgres.Moderation

  setup_all do
    # Start the Repo once for the module and require explicit per-test
    # connection checkout.
    start_supervised!(Repo)
    Sandbox.mode(Repo, :manual)
    :ok
  end

  setup do
    # Each test owns its own sandboxed connection; changes roll back when
    # the test process exits.
    :ok = Sandbox.checkout(Repo)
  end

  test "moderation adapter persists pubkey/event/ip block state" do
    # 64-char hex identifiers (the adapters decode these to 32-byte binaries).
    pubkey = String.duplicate("a", 64)
    event_id = String.duplicate("b", 64)

    # Ban / unban round-trip; the double ban checks idempotency.
    assert {:ok, false} = Moderation.pubkey_banned?(%{}, pubkey)
    assert :ok = Moderation.ban_pubkey(%{}, pubkey)
    assert :ok = Moderation.ban_pubkey(%{}, pubkey)
    assert {:ok, true} = Moderation.pubkey_banned?(%{}, pubkey)
    assert :ok = Moderation.unban_pubkey(%{}, pubkey)
    assert {:ok, false} = Moderation.pubkey_banned?(%{}, pubkey)

    # Allow-list round-trip.
    assert {:ok, false} = Moderation.pubkey_allowed?(%{}, pubkey)
    assert :ok = Moderation.allow_pubkey(%{}, pubkey)
    assert {:ok, true} = Moderation.pubkey_allowed?(%{}, pubkey)
    assert :ok = Moderation.disallow_pubkey(%{}, pubkey)
    assert {:ok, false} = Moderation.pubkey_allowed?(%{}, pubkey)

    # Event ban round-trip.
    assert {:ok, false} = Moderation.event_banned?(%{}, event_id)
    assert :ok = Moderation.ban_event(%{}, event_id)
    assert {:ok, true} = Moderation.event_banned?(%{}, event_id)
    assert :ok = Moderation.unban_event(%{}, event_id)
    assert {:ok, false} = Moderation.event_banned?(%{}, event_id)

    # IP block round-trip (string form is parsed by the adapter).
    assert {:ok, false} = Moderation.ip_blocked?(%{}, "127.0.0.1")
    assert :ok = Moderation.block_ip(%{}, "127.0.0.1")
    assert {:ok, true} = Moderation.ip_blocked?(%{}, "127.0.0.1")
    assert :ok = Moderation.unblock_ip(%{}, "127.0.0.1")
    assert {:ok, false} = Moderation.ip_blocked?(%{}, "127.0.0.1")
  end

  test "groups adapter upserts and lists memberships and roles" do
    group_id = "group-alpha"
    member_pubkey = String.duplicate("c", 64)

    # Initial insert with atom-keyed input.
    assert {:ok, membership} =
             Groups.put_membership(%{}, %{
               group_id: group_id,
               pubkey: member_pubkey,
               role: "member",
               metadata: %{"joined_via" => "invite"}
             })

    # Returned pubkey is re-encoded as lowercase hex, matching the input.
    assert membership.group_id == group_id
    assert membership.pubkey == member_pubkey
    assert membership.role == "member"

    assert {:ok, fetched_membership} = Groups.get_membership(%{}, group_id, member_pubkey)
    assert fetched_membership.metadata == %{"joined_via" => "invite"}

    # Second put with string keys upserts (replaces role/metadata) rather
    # than creating a duplicate row.
    assert {:ok, updated_membership} =
             Groups.put_membership(%{}, %{
               "group_id" => group_id,
               "pubkey" => member_pubkey,
               "role" => "admin",
               "metadata" => %{"joined_via" => "promoted"}
             })

    assert updated_membership.role == "admin"

    assert {:ok, memberships} = Groups.list_memberships(%{}, group_id)
    assert Enum.map(memberships, &{&1.pubkey, &1.role}) == [{member_pubkey, "admin"}]

    # Role grants are independent of the membership role field.
    assert {:ok, role} =
             Groups.put_role(%{}, %{
               group_id: group_id,
               pubkey: member_pubkey,
               role: "moderator",
               metadata: %{"scope" => "global"}
             })

    assert role.role == "moderator"

    assert {:ok, roles} = Groups.list_roles(%{}, group_id, member_pubkey)
    assert Enum.map(roles, & &1.role) == ["moderator"]

    assert :ok = Groups.delete_role(%{}, group_id, member_pubkey, "moderator")
    assert {:ok, []} = Groups.list_roles(%{}, group_id, member_pubkey)

    assert :ok = Groups.delete_membership(%{}, group_id, member_pubkey)
    assert {:ok, nil} = Groups.get_membership(%{}, group_id, member_pubkey)
  end

  test "admin adapter appends and filters audit logs" do
    actor_pubkey = String.duplicate("d", 64)

    # One entry with an actor, one anonymous entry (actor_pubkey omitted).
    assert :ok =
             Admin.append_audit_log(%{}, %{
               method: "ban_pubkey",
               actor_pubkey: actor_pubkey,
               params: %{"pubkey" => String.duplicate("e", 64)},
               result: %{"ok" => true}
             })

    assert :ok =
             Admin.append_audit_log(%{}, %{
               method: "stats",
               params: %{"window" => "24h"}
             })

    assert {:ok, logs} = Admin.list_audit_logs(%{}, limit: 10)
    assert length(logs) == 2

    # Filtering by actor and by method (atom methods are normalized).
    assert {:ok, actor_logs} = Admin.list_audit_logs(%{}, actor_pubkey: actor_pubkey)
    assert Enum.map(actor_logs, & &1.method) == ["ban_pubkey"]

    assert {:ok, stats_logs} = Admin.list_audit_logs(%{}, method: :stats)
    assert Enum.map(stats_logs, & &1.method) == ["stats"]

    # execute/3 has no supported methods yet.
    assert {:error, {:unsupported_method, "status"}} = Admin.execute(%{}, :status, %{})
  end
end

View File

@@ -152,6 +152,71 @@ defmodule Parrhesia.Storage.Adapters.Postgres.EventsQueryCountTest do
assert {:ok, 2} = Events.count(%{}, filters, now: now)
end
test "replaceable events expose only the current winner" do
author = String.duplicate("a", 64)
older =
persist_event(%{
"pubkey" => author,
"created_at" => 1_700_000_300,
"kind" => 0,
"content" => "profile-v1"
})
newer =
persist_event(%{
"pubkey" => author,
"created_at" => 1_700_000_301,
"kind" => 0,
"content" => "profile-v2"
})
assert {:ok, [result]} = Events.query(%{}, [%{"authors" => [author], "kinds" => [0]}], [])
assert result["id"] == newer["id"]
assert {:ok, nil} = Events.get_event(%{}, older["id"])
assert {:ok, persisted_newer} = Events.get_event(%{}, newer["id"])
assert persisted_newer["id"] == newer["id"]
assert {:ok, 1} = Events.count(%{}, [%{"ids" => [older["id"], newer["id"]]}], [])
end
test "addressable events tie-break by lexical id for identical timestamps" do
author = String.duplicate("b", 64)
first =
persist_event(%{
"pubkey" => author,
"created_at" => 1_700_000_400,
"kind" => 30_023,
"tags" => [["d", "topic"]],
"content" => "version-a"
})
second =
persist_event(%{
"pubkey" => author,
"created_at" => 1_700_000_400,
"kind" => 30_023,
"tags" => [["d", "topic"]],
"content" => "version-b"
})
winner_id = Enum.min([first["id"], second["id"]])
loser_id = Enum.max([first["id"], second["id"]])
assert {:ok, [result]} =
Events.query(
%{},
[%{"authors" => [author], "kinds" => [30_023], "#d" => ["topic"]}],
[]
)
assert result["id"] == winner_id
assert {:ok, nil} = Events.get_event(%{}, loser_id)
assert {:ok, 1} = Events.count(%{}, [%{"ids" => [first["id"], second["id"]]}], [])
end
defp persist_event(overrides) do
event = build_event(overrides)
assert {:ok, _persisted} = Events.put_event(%{}, event)