diff --git a/PROGRESS.md b/PROGRESS.md index 8b67389..e8c3428 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -22,7 +22,7 @@ Implementation checklist for Parrhesia relay. - [x] Define `Parrhesia.Storage.*` behaviors (events/moderation/groups/admin) - [ ] Implement Postgres adapter modules behind behaviors -- [ ] Create migrations for events, tags, moderation, membership +- [x] Create migrations for events, tags, moderation, membership - [ ] Implement replaceable/addressable semantics at storage layer - [ ] Add adapter contract test suite diff --git a/lib/parrhesia/repo.ex b/lib/parrhesia/repo.ex new file mode 100644 index 0000000..a871f8b --- /dev/null +++ b/lib/parrhesia/repo.ex @@ -0,0 +1,12 @@ +defmodule Parrhesia.Repo do + @moduledoc """ + PostgreSQL repository for storage adapter persistence. + + Note: the repo is not yet started by the supervision tree while the + storage adapter is in staged implementation. + """ + + use Ecto.Repo, + otp_app: :parrhesia, + adapter: Ecto.Adapters.Postgres +end diff --git a/lib/parrhesia/storage/adapters/postgres/events.ex b/lib/parrhesia/storage/adapters/postgres/events.ex index afc09ee..bef0b4a 100644 --- a/lib/parrhesia/storage/adapters/postgres/events.ex +++ b/lib/parrhesia/storage/adapters/postgres/events.ex @@ -1,18 +1,80 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do @moduledoc """ PostgreSQL-backed implementation for `Parrhesia.Storage.Events`. - - Implementation is intentionally staged; callbacks currently return - `{:error, :not_implemented}` until migrations and query paths land. """ + import Ecto.Query + + alias Parrhesia.Repo + @behaviour Parrhesia.Storage.Events - @impl true - def put_event(_context, _event), do: {:error, :not_implemented} + @type normalized_event :: %{ + id: binary(), + pubkey: binary(), + created_at: non_neg_integer(), + kind: non_neg_integer(), + content: String.t(), + sig: binary(), + d_tag: String.t(), + expires_at: non_neg_integer() | nil, + tags: [list(String.t())] + } @impl true - def get_event(_context, _event_id), do: {:error, :not_implemented} + def put_event(_context, event) do + with {:ok, normalized} <- normalize_event(event) do + now = DateTime.utc_now() |> DateTime.truncate(:microsecond) + + Repo.transaction(fn -> + insert_event_id!(normalized, now) + insert_event!(normalized, now) + insert_tags!(normalized, now) + upsert_state_tables!(normalized, now) + + event + end) + |> case do + {:ok, persisted_event} -> {:ok, persisted_event} + {:error, :duplicate_event} -> {:error, :duplicate_event} + {:error, reason} -> {:error, reason} + end + end + end + + @impl true + def get_event(_context, event_id) do + with {:ok, decoded_event_id} <- decode_hex(event_id, 32, :invalid_event_id) do + event_query = + from(event in "events", + where: event.id == ^decoded_event_id and is_nil(event.deleted_at), + order_by: [desc: event.created_at], + limit: 1, + select: %{ + id: event.id, + pubkey: event.pubkey, + created_at: event.created_at, + kind: event.kind, + content: event.content, + sig: event.sig + } + ) + + case Repo.one(event_query) do + nil -> + {:ok, nil} + + persisted_event -> + tags = load_tags([{persisted_event.created_at, persisted_event.id}]) + + {:ok, + to_nostr_event( + persisted_event, + Map.get(tags, {persisted_event.created_at, persisted_event.id}, []) + )} + end + end + end @impl true def query(_context, _filters, _opts), do: {:error, :not_implemented} @@ -27,5 +89,377 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do def vanish(_context, _event), do: {:error, :not_implemented} @impl true - def purge_expired(_opts), do: {:error, :not_implemented} + def purge_expired(opts) when is_list(opts) do + now = Keyword.get(opts, :now, System.system_time(:second)) + + query = + from(event in "events", + where: not is_nil(event.expires_at) and event.expires_at <= ^now + ) + + {count, _result} = Repo.delete_all(query) + {:ok, count} + end + + @doc false + @spec normalize_event(map()) :: {:ok, normalized_event()} | {:error, atom()} + def normalize_event(event) when is_map(event) do + with {:ok, id} <- decode_hex(Map.get(event, "id"), 32, :invalid_event_id), + {:ok, pubkey} <- decode_hex(Map.get(event, "pubkey"), 32, :invalid_pubkey), + {:ok, sig} <- decode_hex(Map.get(event, "sig"), 64, :invalid_sig), + {:ok, created_at} <- + validate_non_negative_integer(Map.get(event, "created_at"), :invalid_created_at), + {:ok, kind} <- validate_non_negative_integer(Map.get(event, "kind"), :invalid_kind), + {:ok, content} <- validate_binary(Map.get(event, "content"), :invalid_content), + {:ok, tags} <- validate_tags(Map.get(event, "tags")) do + {:ok, + %{ + id: id, + pubkey: pubkey, + created_at: created_at, + kind: kind, + content: content, + sig: sig, + d_tag: extract_d_tag(tags), + expires_at: extract_expiration(tags), + tags: tags + }} + end + end + + def normalize_event(_event), do: {:error, :invalid_event} + + @doc false + @spec candidate_wins_state?(normalized_event(), map()) :: boolean() + def candidate_wins_state?(candidate, current) do + cond do + candidate.created_at > current.event_created_at -> true + candidate.created_at < current.event_created_at -> false + true -> candidate.id < current.event_id + end + end + + defp insert_event_id!(normalized_event, now) do + {inserted, _result} = + Repo.insert_all( + "event_ids", + [ + %{ + id: normalized_event.id, + created_at: normalized_event.created_at, + inserted_at: now + } + ], + on_conflict: :nothing, + conflict_target: [:id] + ) + + if inserted == 1 do + :ok + else + Repo.rollback(:duplicate_event) + end + end + + defp insert_event!(normalized_event, now) do + {inserted, _result} = + Repo.insert_all("events", [event_row(normalized_event, now)]) + + if inserted == 1 do + :ok + else + Repo.rollback(:event_insert_failed) + end + end + + defp insert_tags!(normalized_event, now) do + tag_rows = + normalized_event.tags + |> Enum.with_index() + |> Enum.flat_map(fn {tag, idx} -> + case tag do + [name, value | _rest] when is_binary(name) and is_binary(value) -> + [ + %{ + event_created_at: normalized_event.created_at, + event_id: normalized_event.id, + name: name, + value: value, + idx: idx, + inserted_at: now + } + ] + + _other -> + [] + end + end) + + if tag_rows == [] do + :ok + else + {inserted, _result} = Repo.insert_all("event_tags", tag_rows) + + if inserted == length(tag_rows) do + :ok + else + Repo.rollback(:tag_insert_failed) + end + end + end + + defp upsert_state_tables!(normalized_event, now) do + :ok = maybe_upsert_replaceable_state(normalized_event, now) + :ok = maybe_upsert_addressable_state(normalized_event, now) + :ok + end + + defp maybe_upsert_replaceable_state(normalized_event, now) do + if replaceable_kind?(normalized_event.kind) do + lookup_query = + from(state in "replaceable_event_state", + where: + state.pubkey == ^normalized_event.pubkey and state.kind == ^normalized_event.kind, + select: %{event_created_at: state.event_created_at, event_id: state.event_id} + ) + + update_query = + from(state in "replaceable_event_state", + where: + state.pubkey == ^normalized_event.pubkey and + state.kind == ^normalized_event.kind + ) + + upsert_state_table( + "replaceable_event_state", + lookup_query, + update_query, + replaceable_state_row(normalized_event, now), + normalized_event, + now, + :replaceable_state_update_failed + ) + else + :ok + end + end + + defp maybe_upsert_addressable_state(normalized_event, now) do + if addressable_kind?(normalized_event.kind) do + lookup_query = + from(state in "addressable_event_state", + where: + state.pubkey == ^normalized_event.pubkey and + state.kind == ^normalized_event.kind and + state.d_tag == ^normalized_event.d_tag, + select: %{event_created_at: state.event_created_at, event_id: state.event_id} + ) + + update_query = + from(state in "addressable_event_state", + where: + state.pubkey == ^normalized_event.pubkey and + state.kind == ^normalized_event.kind and + state.d_tag == ^normalized_event.d_tag + ) + + upsert_state_table( + "addressable_event_state", + lookup_query, + update_query, + addressable_state_row(normalized_event, now), + normalized_event, + now, + :addressable_state_update_failed + ) + else + :ok + end + end + + defp upsert_state_table( + table_name, + lookup_query, + update_query, + insert_row, + normalized_event, + now, + failure_reason + ) do + case Repo.one(lookup_query) do + nil -> + {inserted, _result} = Repo.insert_all(table_name, [insert_row], on_conflict: :nothing) + + if inserted <= 1 do + :ok + else + Repo.rollback(failure_reason) + end + + current_state -> + maybe_update_state(update_query, normalized_event, current_state, now, failure_reason) + end + end + + defp maybe_update_state(update_query, normalized_event, current_state, now, failure_reason) do + if candidate_wins_state?(normalized_event, current_state) do + {updated, _result} = + Repo.update_all(update_query, + set: [ + event_created_at: normalized_event.created_at, + event_id: normalized_event.id, + updated_at: now + ] + ) + + if updated == 1 do + :ok + else + Repo.rollback(failure_reason) + end + else + :ok + end + end + + defp replaceable_kind?(kind), do: kind in [0, 3] or (kind >= 10_000 and kind < 20_000) + + defp addressable_kind?(kind), do: kind >= 30_000 and kind < 40_000 + + defp replaceable_state_row(normalized_event, now) do + %{ + pubkey: normalized_event.pubkey, + kind: normalized_event.kind, + event_created_at: normalized_event.created_at, + event_id: normalized_event.id, + inserted_at: now, + updated_at: now + } + end + + defp addressable_state_row(normalized_event, now) do + %{ + pubkey: normalized_event.pubkey, + kind: normalized_event.kind, + d_tag: normalized_event.d_tag, + event_created_at: normalized_event.created_at, + event_id: normalized_event.id, + inserted_at: now, + updated_at: now + } + end + + defp event_row(normalized_event, now) do + %{ + id: normalized_event.id, + pubkey: normalized_event.pubkey, + created_at: normalized_event.created_at, + kind: normalized_event.kind, + content: normalized_event.content, + sig: normalized_event.sig, + d_tag: normalized_event.d_tag, + expires_at: normalized_event.expires_at, + inserted_at: now, + updated_at: now + } + end + + defp load_tags(event_keys) when is_list(event_keys) do + created_at_values = Enum.map(event_keys, fn {created_at, _event_id} -> created_at end) + event_id_values = Enum.map(event_keys, fn {_created_at, event_id} -> event_id end) + + query = + from(tag in "event_tags", + where: tag.event_created_at in ^created_at_values and tag.event_id in ^event_id_values, + order_by: [asc: tag.idx], + select: %{ + event_created_at: tag.event_created_at, + event_id: tag.event_id, + name: tag.name, + value: tag.value + } + ) + + query + |> Repo.all() + |> Enum.group_by( + fn tag -> {tag.event_created_at, tag.event_id} end, + fn tag -> [tag.name, tag.value] end + ) + end + + defp to_nostr_event(persisted_event, tags) do + %{ + "id" => Base.encode16(persisted_event.id, case: :lower), + "pubkey" => Base.encode16(persisted_event.pubkey, case: :lower), + "created_at" => persisted_event.created_at, + "kind" => persisted_event.kind, + "tags" => tags, + "content" => persisted_event.content, + "sig" => Base.encode16(persisted_event.sig, case: :lower) + } + end + + defp decode_hex(value, bytes, reason) when is_binary(value) do + if byte_size(value) == bytes * 2 do + case Base.decode16(value, case: :mixed) do + {:ok, decoded} -> {:ok, decoded} + :error -> {:error, reason} + end + else + {:error, reason} + end + end + + defp decode_hex(_value, _bytes, reason), do: {:error, reason} + + defp validate_non_negative_integer(value, _reason) + when is_integer(value) and value >= 0, + do: {:ok, value} + + defp validate_non_negative_integer(_value, reason), do: {:error, reason} + + defp validate_binary(value, _reason) when is_binary(value), do: {:ok, value} + defp validate_binary(_value, reason), do: {:error, reason} + + defp validate_tags(tags) when is_list(tags) do + if Enum.all?(tags, &valid_tag?/1) do + {:ok, tags} + else + {:error, :invalid_tags} + end + end + + defp validate_tags(_tags), do: {:error, :invalid_tags} + + defp valid_tag?(tag) when is_list(tag) do + tag != [] and Enum.all?(tag, &is_binary/1) + end + + defp valid_tag?(_tag), do: false + + defp extract_d_tag(tags) do + tags + |> Enum.find_value("", fn + ["d", value | _rest] when is_binary(value) -> value + _tag -> nil + end) + end + + defp extract_expiration(tags) do + tags + |> Enum.find_value(fn + ["expiration", unix_seconds | _rest] -> parse_unix_seconds(unix_seconds) + _tag -> nil + end) + end + + defp parse_unix_seconds(unix_seconds) when is_binary(unix_seconds) do + case Integer.parse(unix_seconds) do + {parsed, ""} when parsed >= 0 -> parsed + _other -> nil + end + end + + defp parse_unix_seconds(_unix_seconds), do: nil end diff --git a/priv/repo/migrations/20260313192433_create_relay_storage.exs b/priv/repo/migrations/20260313192433_create_relay_storage.exs new file mode 100644 index 0000000..6c4fb19 --- /dev/null +++ b/priv/repo/migrations/20260313192433_create_relay_storage.exs @@ -0,0 +1,159 @@ +defmodule Parrhesia.Repo.Migrations.CreateRelayStorage do + use Ecto.Migration + + def up do + create table(:event_ids, primary_key: false) do + add(:id, :binary, primary_key: true) + add(:created_at, :bigint, null: false) + timestamps(updated_at: false, type: :utc_datetime_usec) + end + + create table(:events, primary_key: false, options: "PARTITION BY RANGE (created_at)") do + add(:created_at, :bigint, primary_key: true) + add(:id, :binary, primary_key: true) + add(:pubkey, :binary, null: false) + add(:kind, :integer, null: false) + add(:content, :text, null: false) + add(:sig, :binary, null: false) + add(:d_tag, :text) + add(:deleted_at, :bigint) + add(:expires_at, :bigint) + timestamps(type: :utc_datetime_usec) + end + + execute("CREATE TABLE events_default PARTITION OF events DEFAULT") + + execute("CREATE INDEX events_kind_created_at_idx ON events (kind, created_at DESC)") + execute("CREATE INDEX events_pubkey_created_at_idx ON events (pubkey, created_at DESC)") + execute("CREATE INDEX events_created_at_idx ON events (created_at DESC)") + create(index(:events, [:id])) + create(index(:events, [:expires_at], where: "expires_at IS NOT NULL")) + create(index(:events, [:deleted_at], where: "deleted_at IS NOT NULL")) + + create table(:event_tags, primary_key: false) do + add(:event_created_at, :bigint, null: false) + add(:event_id, :binary, null: false) + add(:name, :string, null: false) + add(:value, :string, null: false) + add(:idx, :integer, null: false) + timestamps(updated_at: false, type: :utc_datetime_usec) + end + + execute(""" + ALTER TABLE event_tags + ADD CONSTRAINT event_tags_event_fk + FOREIGN KEY (event_created_at, event_id) + REFERENCES events (created_at, id) + ON DELETE CASCADE + """) + + create(unique_index(:event_tags, [:event_created_at, :event_id, :idx])) + + execute( + "CREATE INDEX event_tags_name_value_created_at_idx ON event_tags (name, value, event_created_at DESC)" + ) + + create(index(:event_tags, [:event_id])) + + create table(:replaceable_event_state, primary_key: false) do + add(:pubkey, :binary, primary_key: true) + add(:kind, :integer, primary_key: true) + add(:event_created_at, :bigint, null: false) + add(:event_id, :binary, null: false) + timestamps(type: :utc_datetime_usec) + end + + create table(:addressable_event_state, primary_key: false) do + add(:pubkey, :binary, primary_key: true) + add(:kind, :integer, primary_key: true) + add(:d_tag, :text, primary_key: true) + add(:event_created_at, :bigint, null: false) + add(:event_id, :binary, null: false) + timestamps(type: :utc_datetime_usec) + end + + create table(:banned_pubkeys, primary_key: false) do + add(:pubkey, :binary, primary_key: true) + timestamps(updated_at: false, type: :utc_datetime_usec) + end + + create table(:allowed_pubkeys, primary_key: false) do + add(:pubkey, :binary, primary_key: true) + timestamps(updated_at: false, type: :utc_datetime_usec) + end + + create table(:banned_events, primary_key: false) do + add(:event_id, :binary, primary_key: true) + timestamps(updated_at: false, type: :utc_datetime_usec) + end + + create table(:blocked_ips, primary_key: false) do + add(:ip, :inet, primary_key: true) + timestamps(updated_at: false, type: :utc_datetime_usec) + end + + create table(:relay_groups, primary_key: false) do + add(:group_id, :text, primary_key: true) + add(:metadata, :map, null: false, default: %{}) + timestamps(type: :utc_datetime_usec) + end + + create table(:group_memberships, primary_key: false) do + add( + :group_id, + references(:relay_groups, column: :group_id, type: :text, on_delete: :delete_all), + primary_key: true + ) + + add(:pubkey, :binary, primary_key: true) + add(:role, :string, null: false) + add(:metadata, :map, null: false, default: %{}) + timestamps(type: :utc_datetime_usec) + end + + create(index(:group_memberships, [:pubkey])) + + create table(:group_roles, primary_key: false) do + add( + :group_id, + references(:relay_groups, column: :group_id, type: :text, on_delete: :delete_all), + primary_key: true + ) + + add(:pubkey, :binary, primary_key: true) + add(:role, :string, primary_key: true) + add(:metadata, :map, null: false, default: %{}) + timestamps(type: :utc_datetime_usec) + end + + create table(:management_audit_logs) do + add(:actor_pubkey, :binary) + add(:method, :string, null: false) + add(:params, :map, null: false, default: %{}) + add(:result, :map) + timestamps(updated_at: false, type: :utc_datetime_usec) + end + + create(index(:management_audit_logs, [:actor_pubkey, :inserted_at])) + create(index(:management_audit_logs, [:method, :inserted_at])) + end + + def down do + drop(table(:management_audit_logs)) + drop(table(:group_roles)) + drop(table(:group_memberships)) + drop(table(:relay_groups)) + drop(table(:blocked_ips)) + drop(table(:banned_events)) + drop(table(:allowed_pubkeys)) + drop(table(:banned_pubkeys)) + drop(table(:addressable_event_state)) + drop(table(:replaceable_event_state)) + drop(table(:event_tags)) + + execute("DROP TABLE events_default") + drop(table(:events)) + + drop(table(:event_ids)) + end +end diff --git a/priv/repo/seeds.exs b/priv/repo/seeds.exs new file mode 100644 index 0000000..8ee8d78 --- /dev/null +++ b/priv/repo/seeds.exs @@ -0,0 +1,3 @@ +# Seed data for Parrhesia. +# +# Intentionally empty for now. diff --git a/test/parrhesia/storage/adapters/postgres/events_test.exs b/test/parrhesia/storage/adapters/postgres/events_test.exs new file mode 100644 index 0000000..b58fe27 --- /dev/null +++ b/test/parrhesia/storage/adapters/postgres/events_test.exs @@ -0,0 +1,52 @@ +defmodule Parrhesia.Storage.Adapters.Postgres.EventsTest do + use ExUnit.Case, async: true + + alias Parrhesia.Protocol.EventValidator + alias Parrhesia.Storage.Adapters.Postgres.Events + + test "normalize_event/1 decodes hex fields and extracts d_tag/expiration" do + pubkey = String.duplicate("a", 64) + + event = %{ + "pubkey" => pubkey, + "created_at" => 1_700_000_000, + "kind" => 30_023, + "tags" => [["d", "profile"], ["expiration", "1700001000"], ["p", String.duplicate("b", 64)]], + "content" => "hello", + "sig" => String.duplicate("c", 128) + } + + id = EventValidator.compute_id(event) + event = Map.put(event, "id", id) + + assert {:ok, normalized} = Events.normalize_event(event) + assert normalized.created_at == 1_700_000_000 + assert normalized.kind == 30_023 + assert normalized.d_tag == "profile" + assert normalized.expires_at == 1_700_001_000 + assert normalized.id == Base.decode16!(id, case: :lower) + assert normalized.pubkey == Base.decode16!(pubkey, case: :lower) + end + + test "candidate_wins_state?/2 uses created_at then lexical id tie-break" do + assert Events.candidate_wins_state?( + %{created_at: 11, id: <<2>>}, + %{event_created_at: 10, event_id: <<255>>} + ) + + refute Events.candidate_wins_state?( + %{created_at: 9, id: <<0>>}, + %{event_created_at: 10, event_id: <<255>>} + ) + + assert Events.candidate_wins_state?( + %{created_at: 10, id: <<1>>}, + %{event_created_at: 10, event_id: <<2>>} + ) + + refute Events.candidate_wins_state?( + %{created_at: 10, id: <<3>>}, + %{event_created_at: 10, event_id: <<2>>} + ) + end +end