storage: add initial postgres event persistence and schema migration

This commit is contained in:
2026-03-13 20:29:58 +01:00
parent 7ec588805b
commit cd1adf94f0
6 changed files with 668 additions and 8 deletions

View File

@@ -22,7 +22,7 @@ Implementation checklist for Parrhesia relay.
- [x] Define `Parrhesia.Storage.*` behaviors (events/moderation/groups/admin)
- [ ] Implement Postgres adapter modules behind behaviors
- [ ] Create migrations for events, tags, moderation, membership
- [x] Create migrations for events, tags, moderation, membership
- [ ] Implement replaceable/addressable semantics at storage layer
- [ ] Add adapter contract test suite

12
lib/parrhesia/repo.ex Normal file
View File

@@ -0,0 +1,12 @@
defmodule Parrhesia.Repo do
  @moduledoc """
  PostgreSQL repository for storage adapter persistence.

  Runtime connection configuration is read from the `:parrhesia` OTP app
  environment (standard `Ecto.Repo` `otp_app` lookup).

  Note: the repo is not yet started by the supervision tree while the
  storage adapter is in staged implementation.
  """

  use Ecto.Repo,
    otp_app: :parrhesia,
    adapter: Ecto.Adapters.Postgres
end

View File

@@ -1,18 +1,80 @@
defmodule Parrhesia.Storage.Adapters.Postgres.Events do
@moduledoc """
PostgreSQL-backed implementation for `Parrhesia.Storage.Events`.
Implementation is intentionally staged; callbacks currently return
`{:error, :not_implemented}` until migrations and query paths land.
"""
import Ecto.Query
alias Parrhesia.Repo
@behaviour Parrhesia.Storage.Events
@impl true
def put_event(_context, _event), do: {:error, :not_implemented}
@type normalized_event :: %{
id: binary(),
pubkey: binary(),
created_at: non_neg_integer(),
kind: non_neg_integer(),
content: String.t(),
sig: binary(),
d_tag: String.t(),
expires_at: non_neg_integer() | nil,
tags: [list(String.t())]
}
@impl true
def get_event(_context, _event_id), do: {:error, :not_implemented}
# Persist one event atomically: reserve its id in the dedupe table, write
# the event row and its tag rows, then refresh replaceable/addressable
# state. Any failed step rolls the whole transaction back, so the caller
# sees either `{:ok, event}` or `{:error, reason}` (including
# `:duplicate_event` from the dedupe step).
def put_event(_context, event) do
  case normalize_event(event) do
    {:error, reason} ->
      {:error, reason}

    {:ok, normalized} ->
      written_at = DateTime.utc_now() |> DateTime.truncate(:microsecond)

      Repo.transaction(fn ->
        insert_event_id!(normalized, written_at)
        insert_event!(normalized, written_at)
        insert_tags!(normalized, written_at)
        upsert_state_tables!(normalized, written_at)
        event
      end)
  end
end
@impl true
# Fetch a single event by hex id, skipping soft-deleted rows, and
# re-attach its tags. Returns `{:ok, nil}` on a miss and
# `{:error, :invalid_event_id}` for malformed ids.
def get_event(_context, event_id) do
  case decode_hex(event_id, 32, :invalid_event_id) do
    {:error, reason} ->
      {:error, reason}

    {:ok, raw_id} ->
      newest_match =
        from(row in "events",
          where: row.id == ^raw_id and is_nil(row.deleted_at),
          order_by: [desc: row.created_at],
          limit: 1,
          select: %{
            id: row.id,
            pubkey: row.pubkey,
            created_at: row.created_at,
            kind: row.kind,
            content: row.content,
            sig: row.sig
          }
        )

      case Repo.one(newest_match) do
        nil ->
          {:ok, nil}

        row ->
          key = {row.created_at, row.id}
          tags = Map.get(load_tags([key]), key, [])
          {:ok, to_nostr_event(row, tags)}
      end
  end
end
@impl true
# Filter-based (REQ-style) querying is still staged; this callback returns
# :not_implemented until the query path lands.
def query(_context, _filters, _opts), do: {:error, :not_implemented}
@@ -27,5 +89,377 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
def vanish(_context, _event), do: {:error, :not_implemented}
@impl true
def purge_expired(_opts), do: {:error, :not_implemented}
# Delete every event whose expiration timestamp has passed. The `:now`
# option (unix seconds) is injectable for tests and defaults to the
# current wall clock. Returns `{:ok, count_deleted}`.
def purge_expired(opts) when is_list(opts) do
  cutoff = Keyword.get(opts, :now, System.system_time(:second))

  expired =
    from(row in "events",
      where: not is_nil(row.expires_at) and row.expires_at <= ^cutoff
    )

  {purged, _returning} = Repo.delete_all(expired)

  # NOTE(review): event_tags rows go away via the FK cascade, but rows in
  # event_ids and the replaceable/addressable state tables that point at
  # purged events are left behind — confirm that is intentional.
  {:ok, purged}
end
@doc false
# Decode the wire-format event (hex-string id/pubkey/sig, string-keyed
# map) into a binary-field map ready for persistence, extracting the `d`
# tag identifier and the expiration timestamp along the way. Each failed
# check short-circuits with a field-specific error atom; non-map input
# yields `{:error, :invalid_event}`.
@spec normalize_event(map()) :: {:ok, normalized_event()} | {:error, atom()}
def normalize_event(event) when is_map(event) do
  with {:ok, raw_id} <- decode_hex(event["id"], 32, :invalid_event_id),
       {:ok, raw_pubkey} <- decode_hex(event["pubkey"], 32, :invalid_pubkey),
       {:ok, raw_sig} <- decode_hex(event["sig"], 64, :invalid_sig),
       {:ok, created_at} <-
         validate_non_negative_integer(event["created_at"], :invalid_created_at),
       {:ok, kind} <- validate_non_negative_integer(event["kind"], :invalid_kind),
       {:ok, content} <- validate_binary(event["content"], :invalid_content),
       {:ok, tags} <- validate_tags(event["tags"]) do
    normalized = %{
      id: raw_id,
      pubkey: raw_pubkey,
      created_at: created_at,
      kind: kind,
      content: content,
      sig: raw_sig,
      d_tag: extract_d_tag(tags),
      expires_at: extract_expiration(tags),
      tags: tags
    }

    {:ok, normalized}
  end
end

def normalize_event(_event), do: {:error, :invalid_event}
@doc false
# Decide whether `candidate` replaces the stored state row: the newer
# created_at wins, and on an exact timestamp tie the lexically smaller
# event id is kept.
@spec candidate_wins_state?(normalized_event(), map()) :: boolean()
def candidate_wins_state?(candidate, current) do
  stored_at = current.event_created_at

  case candidate.created_at do
    newer when newer > stored_at -> true
    older when older < stored_at -> false
    _same -> candidate.id < current.event_id
  end
end
# Reserve the event id in the relay-wide dedupe table. A conflict means
# the id was already stored, so the transaction is rolled back with
# :duplicate_event.
defp insert_event_id!(normalized_event, now) do
  dedupe_row = %{
    id: normalized_event.id,
    created_at: normalized_event.created_at,
    inserted_at: now
  }

  case Repo.insert_all("event_ids", [dedupe_row], on_conflict: :nothing, conflict_target: [:id]) do
    {1, _returning} -> :ok
    {_other, _returning} -> Repo.rollback(:duplicate_event)
  end
end

# Write the canonical event row; anything other than exactly one inserted
# row aborts the transaction.
defp insert_event!(normalized_event, now) do
  case Repo.insert_all("events", [event_row(normalized_event, now)]) do
    {1, _returning} -> :ok
    {_other, _returning} -> Repo.rollback(:event_insert_failed)
  end
end
# Persist one row per well-formed tag (a string name plus at least one
# string value). Tags that don't match that shape are skipped, but their
# original position is preserved in `idx` for ordered read-back.
#
# NOTE(review): only the first two tag elements are stored, so any
# trailing elements are not recoverable from event_tags — confirm that
# read paths rebuild tags from somewhere else or that this is accepted.
defp insert_tags!(normalized_event, now) do
  tag_rows =
    for {[name, value | _rest], position} <- Enum.with_index(normalized_event.tags),
        is_binary(name) and is_binary(value) do
      %{
        event_created_at: normalized_event.created_at,
        event_id: normalized_event.id,
        name: name,
        value: value,
        idx: position,
        inserted_at: now
      }
    end

  case tag_rows do
    [] ->
      :ok

    _some ->
      {inserted, _returning} = Repo.insert_all("event_tags", tag_rows)

      if inserted == length(tag_rows), do: :ok, else: Repo.rollback(:tag_insert_failed)
  end
end

# Refresh whichever state tables the event's kind calls for; each step
# either returns :ok or rolls the enclosing transaction back.
defp upsert_state_tables!(normalized_event, now) do
  [&maybe_upsert_replaceable_state/2, &maybe_upsert_addressable_state/2]
  |> Enum.each(fn upsert -> :ok = upsert.(normalized_event, now) end)
end
# For replaceable kinds, keep a single pointer row per {pubkey, kind}
# naming the winning event; non-replaceable kinds are a no-op.
defp maybe_upsert_replaceable_state(normalized_event, now) do
  if not replaceable_kind?(normalized_event.kind) do
    :ok
  else
    current_query =
      from(state in "replaceable_event_state",
        where:
          state.pubkey == ^normalized_event.pubkey and state.kind == ^normalized_event.kind,
        select: %{event_created_at: state.event_created_at, event_id: state.event_id}
      )

    scope_query =
      from(state in "replaceable_event_state",
        where:
          state.pubkey == ^normalized_event.pubkey and
            state.kind == ^normalized_event.kind
      )

    upsert_state_table(
      "replaceable_event_state",
      current_query,
      scope_query,
      replaceable_state_row(normalized_event, now),
      normalized_event,
      now,
      :replaceable_state_update_failed
    )
  end
end

# Same as above for addressable kinds, scoped by {pubkey, kind, d_tag}.
defp maybe_upsert_addressable_state(normalized_event, now) do
  if not addressable_kind?(normalized_event.kind) do
    :ok
  else
    current_query =
      from(state in "addressable_event_state",
        where:
          state.pubkey == ^normalized_event.pubkey and
            state.kind == ^normalized_event.kind and
            state.d_tag == ^normalized_event.d_tag,
        select: %{event_created_at: state.event_created_at, event_id: state.event_id}
      )

    scope_query =
      from(state in "addressable_event_state",
        where:
          state.pubkey == ^normalized_event.pubkey and
            state.kind == ^normalized_event.kind and
            state.d_tag == ^normalized_event.d_tag
      )

    upsert_state_table(
      "addressable_event_state",
      current_query,
      scope_query,
      addressable_state_row(normalized_event, now),
      normalized_event,
      now,
      :addressable_state_update_failed
    )
  end
end
# Ensure the state table points at the winning event for its scope.
#
# When no row exists we insert with `on_conflict: :nothing`. The original
# accepted any insert count `<= 1`, which (a) made the rollback branch
# unreachable — a one-row insert_all only ever reports 0 or 1 — and
# (b) silently treated a lost insert race (0 rows) as success even though
# the incoming event might beat the row that won the race. We now re-read
# on conflict and route through the conditional update so the true winner
# is kept; if the re-read still finds nothing, the transaction aborts
# with `failure_reason`.
defp upsert_state_table(
       table_name,
       lookup_query,
       update_query,
       insert_row,
       normalized_event,
       now,
       failure_reason
     ) do
  case Repo.one(lookup_query) do
    nil ->
      case Repo.insert_all(table_name, [insert_row], on_conflict: :nothing) do
        {1, _returning} ->
          :ok

        {0, _returning} ->
          # Lost an insert race; resolve against the row that won it.
          case Repo.one(lookup_query) do
            nil ->
              Repo.rollback(failure_reason)

            raced_state ->
              maybe_update_state(update_query, normalized_event, raced_state, now, failure_reason)
          end
      end

    current_state ->
      maybe_update_state(update_query, normalized_event, current_state, now, failure_reason)
  end
end
# Overwrite the state row only when the incoming event beats the stored
# winner (see candidate_wins_state?/2); otherwise leave it untouched.
# A failed single-row update aborts the transaction.
defp maybe_update_state(update_query, normalized_event, current_state, now, failure_reason) do
  if candidate_wins_state?(normalized_event, current_state) do
    changes = [
      event_created_at: normalized_event.created_at,
      event_id: normalized_event.id,
      updated_at: now
    ]

    case Repo.update_all(update_query, set: changes) do
      {1, _returning} -> :ok
      {_other, _returning} -> Repo.rollback(failure_reason)
    end
  else
    :ok
  end
end
# Kind classification: kinds 0, 3, and 10000-19999 are replaceable;
# kinds 30000-39999 are addressable (parameterized by the `d` tag).
defp replaceable_kind?(kind) when kind in [0, 3], do: true
defp replaceable_kind?(kind), do: kind in 10_000..19_999
defp addressable_kind?(kind), do: kind in 30_000..39_999
# Pointer row for the per-{pubkey, kind} replaceable-state table.
defp replaceable_state_row(normalized_event, now) do
  normalized_event
  |> Map.take([:pubkey, :kind])
  |> Map.merge(%{
    event_created_at: normalized_event.created_at,
    event_id: normalized_event.id,
    inserted_at: now,
    updated_at: now
  })
end

# Pointer row for the per-{pubkey, kind, d_tag} addressable-state table.
defp addressable_state_row(normalized_event, now) do
  normalized_event
  |> Map.take([:pubkey, :kind, :d_tag])
  |> Map.merge(%{
    event_created_at: normalized_event.created_at,
    event_id: normalized_event.id,
    inserted_at: now,
    updated_at: now
  })
end

# Canonical row for the partitioned events table (drops the in-memory
# :tags field, which lives in event_tags instead).
defp event_row(normalized_event, now) do
  normalized_event
  |> Map.take([:id, :pubkey, :created_at, :kind, :content, :sig, :d_tag, :expires_at])
  |> Map.put(:inserted_at, now)
  |> Map.put(:updated_at, now)
end
# Fetch [name, value] tag pairs for the given {created_at, event_id}
# keys, grouped per key in original tag order (idx ascending).
defp load_tags(event_keys) when is_list(event_keys) do
  {created_ats, event_ids} = Enum.unzip(event_keys)

  tag_query =
    from(tag in "event_tags",
      where: tag.event_created_at in ^created_ats and tag.event_id in ^event_ids,
      order_by: [asc: tag.idx],
      select: %{
        event_created_at: tag.event_created_at,
        event_id: tag.event_id,
        name: tag.name,
        value: tag.value
      }
    )

  # NOTE(review): the two independent IN-lists can over-fetch tags whose
  # key pair crosses the requested combinations; grouping by exact key
  # keeps lookups correct, but verify the extra rows are acceptable.
  tag_query
  |> Repo.all()
  |> Enum.group_by(&{&1.event_created_at, &1.event_id}, &[&1.name, &1.value])
end
# Re-encode a persisted row's raw binaries back into the wire format
# (lowercase hex strings, string-keyed map).
defp to_nostr_event(persisted_event, tags) do
  hex = &Base.encode16(&1, case: :lower)

  %{
    "id" => hex.(persisted_event.id),
    "pubkey" => hex.(persisted_event.pubkey),
    "created_at" => persisted_event.created_at,
    "kind" => persisted_event.kind,
    "tags" => tags,
    "content" => persisted_event.content,
    "sig" => hex.(persisted_event.sig)
  }
end
# Decode exactly `expected_bytes` worth of hex into a raw binary, tagging
# any failure (wrong type, wrong length, bad digits) with `reason`.
# Mixed-case hex input is accepted.
defp decode_hex(value, expected_bytes, reason) when is_binary(value) do
  with true <- byte_size(value) == expected_bytes * 2,
       {:ok, decoded} <- Base.decode16(value, case: :mixed) do
    {:ok, decoded}
  else
    _failure -> {:error, reason}
  end
end

defp decode_hex(_value, _expected_bytes, reason), do: {:error, reason}

defp validate_non_negative_integer(value, _reason) when is_integer(value) and value >= 0,
  do: {:ok, value}

defp validate_non_negative_integer(_value, reason), do: {:error, reason}

defp validate_binary(value, _reason) when is_binary(value), do: {:ok, value}
defp validate_binary(_value, reason), do: {:error, reason}

# A tag list is valid when every tag is a non-empty list of strings.
defp validate_tags(tags) when is_list(tags) do
  case Enum.reject(tags, &valid_tag?/1) do
    [] -> {:ok, tags}
    _malformed -> {:error, :invalid_tags}
  end
end

defp validate_tags(_tags), do: {:error, :invalid_tags}

defp valid_tag?([_ | _] = tag), do: Enum.all?(tag, &is_binary/1)
defp valid_tag?(_tag), do: false

# First ["d", value, ...] tag wins; defaults to "" so events without an
# explicit d tag share the empty identifier.
defp extract_d_tag(tags) do
  Enum.find_value(tags, "", fn
    ["d", value | _rest] when is_binary(value) -> value
    _tag -> nil
  end)
end

# First ["expiration", seconds, ...] tag that parses as a non-negative
# integer, or nil when absent/unparseable.
defp extract_expiration(tags) do
  Enum.find_value(tags, fn
    ["expiration", raw_seconds | _rest] -> parse_unix_seconds(raw_seconds)
    _tag -> nil
  end)
end

defp parse_unix_seconds(raw) when is_binary(raw) do
  with {seconds, ""} <- Integer.parse(raw),
       true <- seconds >= 0 do
    seconds
  else
    _failure -> nil
  end
end

defp parse_unix_seconds(_raw), do: nil
end

View File

@@ -0,0 +1,159 @@
defmodule Parrhesia.Repo.Migrations.CreateRelayStorage do
  @moduledoc false
  use Ecto.Migration

  def up do
    # Relay-wide event-id dedupe table: ids stay unique even though the
    # events table below is partitioned by created_at.
    create table(:event_ids, primary_key: false) do
      add(:id, :binary, primary_key: true)
      add(:created_at, :bigint, null: false)
      timestamps(updated_at: false, type: :utc_datetime_usec)
    end

    # Events are range-partitioned on created_at; the composite PK
    # (created_at, id) includes the partition key, as Postgres requires.
    create table(:events, primary_key: false, options: "PARTITION BY RANGE (created_at)") do
      add(:created_at, :bigint, primary_key: true)
      add(:id, :binary, primary_key: true)
      add(:pubkey, :binary, null: false)
      add(:kind, :integer, null: false)
      add(:content, :text, null: false)
      add(:sig, :binary, null: false)
      add(:d_tag, :text)
      # deleted_at enables soft-deletion (read paths filter on IS NULL);
      # expires_at drives purge_expired/1.
      add(:deleted_at, :bigint)
      add(:expires_at, :bigint)
      timestamps(type: :utc_datetime_usec)
    end

    # Catch-all partition so inserts never fail for lack of a partition.
    execute("CREATE TABLE events_default PARTITION OF events DEFAULT")
    # NOTE(review): indexes on a partitioned parent require PostgreSQL >= 11;
    # confirm the minimum supported server version.
    execute("CREATE INDEX events_kind_created_at_idx ON events (kind, created_at DESC)")
    execute("CREATE INDEX events_pubkey_created_at_idx ON events (pubkey, created_at DESC)")
    execute("CREATE INDEX events_created_at_idx ON events (created_at DESC)")
    create(index(:events, [:id]))
    # Partial indexes keep the common case (no expiry, not deleted) cheap.
    create(index(:events, [:expires_at], where: "expires_at IS NOT NULL"))
    create(index(:events, [:deleted_at], where: "deleted_at IS NOT NULL"))

    # One row per (event, tag position); only the tag name and first value
    # are stored.
    # NOTE(review): :string maps to varchar(255) — Nostr tag values can
    # exceed that; consider :text. Also, tag elements past the second are
    # not stored, so read-back truncates longer tags — confirm intended.
    create table(:event_tags, primary_key: false) do
      add(:event_created_at, :bigint, null: false)
      add(:event_id, :binary, null: false)
      add(:name, :string, null: false)
      add(:value, :string, null: false)
      add(:idx, :integer, null: false)
      timestamps(updated_at: false, type: :utc_datetime_usec)
    end

    # Composite FK onto the partitioned parent so tag rows cascade away
    # with their event.
    # NOTE(review): foreign keys referencing a partitioned table require
    # PostgreSQL >= 12.
    execute("""
    ALTER TABLE event_tags
    ADD CONSTRAINT event_tags_event_fk
    FOREIGN KEY (event_created_at, event_id)
    REFERENCES events (created_at, id)
    ON DELETE CASCADE
    """)

    create(unique_index(:event_tags, [:event_created_at, :event_id, :idx]))
    execute(
      "CREATE INDEX event_tags_name_value_created_at_idx ON event_tags (name, value, event_created_at DESC)"
    )
    create(index(:event_tags, [:event_id]))

    # Pointer tables naming the current winner for replaceable
    # ({pubkey, kind}) and addressable ({pubkey, kind, d_tag}) events.
    create table(:replaceable_event_state, primary_key: false) do
      add(:pubkey, :binary, primary_key: true)
      add(:kind, :integer, primary_key: true)
      add(:event_created_at, :bigint, null: false)
      add(:event_id, :binary, null: false)
      timestamps(type: :utc_datetime_usec)
    end

    create table(:addressable_event_state, primary_key: false) do
      add(:pubkey, :binary, primary_key: true)
      add(:kind, :integer, primary_key: true)
      add(:d_tag, :text, primary_key: true)
      add(:event_created_at, :bigint, null: false)
      add(:event_id, :binary, null: false)
      timestamps(type: :utc_datetime_usec)
    end

    # Moderation lists: simple keyed presence tables.
    create table(:banned_pubkeys, primary_key: false) do
      add(:pubkey, :binary, primary_key: true)
      timestamps(updated_at: false, type: :utc_datetime_usec)
    end

    create table(:allowed_pubkeys, primary_key: false) do
      add(:pubkey, :binary, primary_key: true)
      timestamps(updated_at: false, type: :utc_datetime_usec)
    end

    create table(:banned_events, primary_key: false) do
      add(:event_id, :binary, primary_key: true)
      timestamps(updated_at: false, type: :utc_datetime_usec)
    end

    create table(:blocked_ips, primary_key: false) do
      add(:ip, :inet, primary_key: true)
      timestamps(updated_at: false, type: :utc_datetime_usec)
    end

    # Group tables: memberships carry a single role column; group_roles
    # additionally allows multiple roles per member (role in the PK).
    create table(:relay_groups, primary_key: false) do
      add(:group_id, :text, primary_key: true)
      add(:metadata, :map, null: false, default: %{})
      timestamps(type: :utc_datetime_usec)
    end

    create table(:group_memberships, primary_key: false) do
      add(
        :group_id,
        references(:relay_groups, column: :group_id, type: :text, on_delete: :delete_all),
        primary_key: true
      )
      add(:pubkey, :binary, primary_key: true)
      add(:role, :string, null: false)
      add(:metadata, :map, null: false, default: %{})
      timestamps(type: :utc_datetime_usec)
    end

    create(index(:group_memberships, [:pubkey]))

    create table(:group_roles, primary_key: false) do
      add(
        :group_id,
        references(:relay_groups, column: :group_id, type: :text, on_delete: :delete_all),
        primary_key: true
      )
      add(:pubkey, :binary, primary_key: true)
      add(:role, :string, primary_key: true)
      add(:metadata, :map, null: false, default: %{})
      timestamps(type: :utc_datetime_usec)
    end

    # Append-only audit trail for management-API calls.
    create table(:management_audit_logs) do
      add(:actor_pubkey, :binary)
      add(:method, :string, null: false)
      add(:params, :map, null: false, default: %{})
      add(:result, :map)
      timestamps(updated_at: false, type: :utc_datetime_usec)
    end

    create(index(:management_audit_logs, [:actor_pubkey, :inserted_at]))
    create(index(:management_audit_logs, [:method, :inserted_at]))
  end

  # Reverse of up/0: drop in dependency order (children before parents,
  # the default partition before the partitioned parent).
  def down do
    drop(table(:management_audit_logs))
    drop(table(:group_roles))
    drop(table(:group_memberships))
    drop(table(:relay_groups))
    drop(table(:blocked_ips))
    drop(table(:banned_events))
    drop(table(:allowed_pubkeys))
    drop(table(:banned_pubkeys))
    drop(table(:addressable_event_state))
    drop(table(:replaceable_event_state))
    drop(table(:event_tags))
    execute("DROP TABLE events_default")
    drop(table(:events))
    drop(table(:event_ids))
  end
end

3
priv/repo/seeds.exs Normal file
View File

@@ -0,0 +1,3 @@
# Seed data for Parrhesia.
#
# Intentionally empty for now.

View File

@@ -0,0 +1,52 @@
defmodule Parrhesia.Storage.Adapters.Postgres.EventsTest do
  use ExUnit.Case, async: true

  alias Parrhesia.Protocol.EventValidator
  alias Parrhesia.Storage.Adapters.Postgres.Events

  test "normalize_event/1 decodes hex fields and extracts d_tag/expiration" do
    author_hex = String.duplicate("a", 64)

    base_event = %{
      "pubkey" => author_hex,
      "created_at" => 1_700_000_000,
      "kind" => 30_023,
      "tags" => [
        ["d", "profile"],
        ["expiration", "1700001000"],
        ["p", String.duplicate("b", 64)]
      ],
      "content" => "hello",
      "sig" => String.duplicate("c", 128)
    }

    event_id = EventValidator.compute_id(base_event)
    event = Map.put(base_event, "id", event_id)

    assert {:ok, normalized} = Events.normalize_event(event)

    assert normalized.created_at == 1_700_000_000
    assert normalized.kind == 30_023
    assert normalized.d_tag == "profile"
    assert normalized.expires_at == 1_700_001_000
    assert normalized.id == Base.decode16!(event_id, case: :lower)
    assert normalized.pubkey == Base.decode16!(author_hex, case: :lower)
  end

  test "candidate_wins_state?/2 uses created_at then lexical id tie-break" do
    stored = %{event_created_at: 10, event_id: <<255>>}

    # A strictly newer event always wins; an older one never does.
    assert Events.candidate_wins_state?(%{created_at: 11, id: <<2>>}, stored)
    refute Events.candidate_wins_state?(%{created_at: 9, id: <<0>>}, stored)

    # On a created_at tie, the lexically smaller id wins.
    tied = %{event_created_at: 10, event_id: <<2>>}
    assert Events.candidate_wins_state?(%{created_at: 10, id: <<1>>}, tied)
    refute Events.candidate_wins_state?(%{created_at: 10, id: <<3>>}, tied)
  end
end