diff --git a/config/config.exs b/config/config.exs index 7ca71a3..c9d0891 100644 --- a/config/config.exs +++ b/config/config.exs @@ -29,4 +29,6 @@ config :parrhesia, config :parrhesia, Parrhesia.Web.Endpoint, port: 4000 +config :parrhesia, ecto_repos: [Parrhesia.Repo] + import_config "#{config_env()}.exs" diff --git a/config/dev.exs b/config/dev.exs index fd08519..b2afdef 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -1,3 +1,25 @@ import Config config :logger, :console, format: "[$level] $message\n" + +pg_host = System.get_env("PGHOST") + +repo_host_opts = + if is_binary(pg_host) and String.starts_with?(pg_host, "/") do + [socket_dir: pg_host] + else + [ + hostname: pg_host || "localhost", + port: String.to_integer(System.get_env("PGPORT") || "5432") + ] + end + +config :parrhesia, + Parrhesia.Repo, + [ + username: System.get_env("PGUSER") || System.get_env("USER") || "agent", + password: System.get_env("PGPASSWORD"), + database: System.get_env("PGDATABASE") || "parrhesia_dev", + show_sensitive_data_on_connection_error: true, + pool_size: 10 + ] ++ repo_host_opts diff --git a/config/prod.exs b/config/prod.exs index becde76..d4763a8 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -1 +1,9 @@ import Config + +database_url = + System.get_env("DATABASE_URL") || + raise "environment variable DATABASE_URL is missing. Example: ecto://USER:PASS@HOST/DATABASE" + +config :parrhesia, Parrhesia.Repo, + url: database_url, + pool_size: String.to_integer(System.get_env("POOL_SIZE") || "10") diff --git a/config/test.exs b/config/test.exs index f533db2..6577b78 100644 --- a/config/test.exs +++ b/config/test.exs @@ -5,3 +5,25 @@ config :logger, level: :warning config :parrhesia, Parrhesia.Web.Endpoint, port: 0, ip: {127, 0, 0, 1} + +pg_host = System.get_env("PGHOST") + +repo_host_opts = + if is_binary(pg_host) and String.starts_with?(pg_host, "/") do + [socket_dir: pg_host] + else + [ + hostname: pg_host || "localhost", + port: String.to_integer(System.get_env("PGPORT") || "5432") + ] + end + +config :parrhesia, + Parrhesia.Repo, + [ + username: System.get_env("PGUSER") || System.get_env("USER") || "agent", + password: System.get_env("PGPASSWORD"), + database: System.get_env("PGDATABASE") || "parrhesia_test", + pool: Ecto.Adapters.SQL.Sandbox, + pool_size: 10 + ] ++ repo_host_opts diff --git a/lib/parrhesia/storage/adapters/postgres/events.ex b/lib/parrhesia/storage/adapters/postgres/events.ex index bef0b4a..5467ceb 100644 --- a/lib/parrhesia/storage/adapters/postgres/events.ex +++ b/lib/parrhesia/storage/adapters/postgres/events.ex @@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do import Ecto.Query + alias Parrhesia.Protocol.Filter alias Parrhesia.Repo @behaviour Parrhesia.Storage.Events @@ -77,10 +78,55 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do end @impl true - def query(_context, _filters, _opts), do: {:error, :not_implemented} + def query(_context, filters, opts) when is_list(opts) do + with :ok <- Filter.validate_filters(filters) do + now = Keyword.get(opts, :now, System.system_time(:second)) + + persisted_events = + filters + |> Enum.flat_map(fn filter -> + filter + |> event_query_for_filter(now, opts) + |> Repo.all() + end) + |> deduplicate_events() + |> sort_persisted_events() + |> maybe_apply_query_limit(opts) + + event_keys = Enum.map(persisted_events, fn event -> {event.created_at, event.id} end) + tags_by_event = load_tags(event_keys) + + nostr_events = + Enum.map(persisted_events, fn event -> + to_nostr_event(event, Map.get(tags_by_event, {event.created_at, event.id}, [])) + end) + + {:ok, nostr_events} + end + end + + def query(_context, _filters, _opts), do: {:error, :invalid_opts} @impl true - def count(_context, _filters, _opts), do: {:error, :not_implemented} + def count(_context, filters, opts) when is_list(opts) do + with :ok <- Filter.validate_filters(filters) do + now = Keyword.get(opts, :now, System.system_time(:second)) + + total_count = + filters + |> Enum.flat_map(fn filter -> + filter + |> event_id_query_for_filter(now) + |> Repo.all() + end) + |> MapSet.new() + |> MapSet.size() + + {:ok, total_count} + end + end + + def count(_context, _filters, _opts), do: {:error, :invalid_opts} @impl true def delete_by_request(_context, _event), do: {:error, :not_implemented} @@ -364,6 +410,145 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do } end + defp event_query_for_filter(filter, now, opts) do + base_query = + from(event in "events", + where: is_nil(event.deleted_at) and (is_nil(event.expires_at) or event.expires_at > ^now), + order_by: [desc: event.created_at, asc: event.id], + select: %{ + id: event.id, + pubkey: event.pubkey, + created_at: event.created_at, + kind: event.kind, + content: event.content, + sig: event.sig + } + ) + + query = + base_query + |> maybe_filter_ids(Map.get(filter, "ids")) + |> maybe_filter_authors(Map.get(filter, "authors")) + |> maybe_filter_kinds(Map.get(filter, "kinds")) + |> maybe_filter_since(Map.get(filter, "since")) + |> maybe_filter_until(Map.get(filter, "until")) + |> filter_by_tags(filter) + + maybe_limit_query(query, effective_filter_limit(filter, opts)) + end + + defp event_id_query_for_filter(filter, now) do + from(event in "events", + where: is_nil(event.deleted_at) and (is_nil(event.expires_at) or event.expires_at > ^now), + select: event.id + ) + |> maybe_filter_ids(Map.get(filter, "ids")) + |> maybe_filter_authors(Map.get(filter, "authors")) + |> maybe_filter_kinds(Map.get(filter, "kinds")) + |> maybe_filter_since(Map.get(filter, "since")) + |> maybe_filter_until(Map.get(filter, "until")) + |> filter_by_tags(filter) + end + + defp maybe_filter_ids(query, nil), do: query + + defp maybe_filter_ids(query, ids) do + decoded_ids = decode_hex_list(ids, :lower) + where(query, [event], event.id in ^decoded_ids) + end + + defp maybe_filter_authors(query, nil), do: query + + defp maybe_filter_authors(query, authors) do + decoded_authors = decode_hex_list(authors, :lower) + where(query, [event], event.pubkey in ^decoded_authors) + end + + defp maybe_filter_kinds(query, nil), do: query + defp maybe_filter_kinds(query, kinds), do: where(query, [event], event.kind in ^kinds) + + defp maybe_filter_since(query, nil), do: query + defp maybe_filter_since(query, since), do: where(query, [event], event.created_at >= ^since) + + defp maybe_filter_until(query, nil), do: query + defp maybe_filter_until(query, until), do: where(query, [event], event.created_at <= ^until) + + defp filter_by_tags(query, filter) do + filter + |> tag_filters() + |> Enum.reduce(query, fn {tag_name, values}, acc -> + where( + acc, + [event], + fragment( + "EXISTS (SELECT 1 FROM event_tags AS tag WHERE tag.event_created_at = ? AND tag.event_id = ? AND tag.name = ? AND tag.value = ANY(?))", + event.created_at, + event.id, + ^tag_name, + type(^values, {:array, :string}) + ) + ) + end) + end + + defp tag_filters(filter) do + Enum.reduce(filter, [], fn + {<<"#", tag_name::binary-size(1)>>, values}, acc -> [{tag_name, values} | acc] + _entry, acc -> acc + end) + end + + defp decode_hex_list(values, decode_case) when is_list(values) do + Enum.map(values, &Base.decode16!(&1, case: decode_case)) + end + + defp effective_filter_limit(filter, opts) do + filter_limit = Map.get(filter, "limit") + max_filter_limit = Keyword.get(opts, :max_filter_limit) + + cond do + is_integer(filter_limit) and is_integer(max_filter_limit) -> + min(filter_limit, max_filter_limit) + + is_integer(filter_limit) -> + filter_limit + + is_integer(max_filter_limit) -> + max_filter_limit + + true -> + nil + end + end + + defp maybe_limit_query(query, nil), do: query + defp maybe_limit_query(query, limit), do: limit(query, ^limit) + + defp deduplicate_events(events) do + events + |> Enum.reduce(%{}, fn event, acc -> Map.put_new(acc, event.id, event) end) + |> Map.values() + end + + defp sort_persisted_events(events) do + Enum.sort(events, fn left, right -> + cond do + left.created_at > right.created_at -> true + left.created_at < right.created_at -> false + true -> left.id < right.id + end + end) + end + + defp maybe_apply_query_limit(events, opts) do + case Keyword.get(opts, :limit) do + limit when is_integer(limit) and limit > 0 -> Enum.take(events, limit) + _other -> events + end + end + + defp load_tags([]), do: %{} + defp load_tags(event_keys) when is_list(event_keys) do created_at_values = Enum.map(event_keys, fn {created_at, _event_id} -> created_at end) event_id_values = Enum.map(event_keys, fn {_created_at, event_id} -> event_id end) diff --git a/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs b/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs new file mode 100644 index 0000000..29f350b --- /dev/null +++ b/test/parrhesia/storage/adapters/postgres/events_query_count_test.exs @@ -0,0 +1,174 @@ +defmodule Parrhesia.Storage.Adapters.Postgres.EventsQueryCountTest do + use ExUnit.Case, async: false + + alias Ecto.Adapters.SQL.Sandbox + alias Parrhesia.Protocol.EventValidator + alias Parrhesia.Repo + alias Parrhesia.Storage.Adapters.Postgres.Events + + setup_all do + start_supervised!(Repo) + Sandbox.mode(Repo, :manual) + :ok + end + + setup do + :ok = Sandbox.checkout(Repo) + end + + test "query/3 translates NIP filters including tag filters" do + author = String.duplicate("a", 64) + other_author = String.duplicate("b", 64) + target_pubkey = String.duplicate("c", 64) + other_target = String.duplicate("d", 64) + referenced_event = String.duplicate("e", 64) + + matching = + persist_event(%{ + "pubkey" => author, + "created_at" => 1_700_000_000, + "kind" => 1, + "tags" => [["p", target_pubkey], ["e", referenced_event], ["x", "topic"]], + "content" => "matching" + }) + + _non_matching_tag = + persist_event(%{ + "pubkey" => author, + "created_at" => 1_700_000_001, + "kind" => 1, + "tags" => [["p", other_target], ["e", referenced_event]], + "content" => "other-target" + }) + + _non_matching_author = + persist_event(%{ + "pubkey" => other_author, + "created_at" => 1_700_000_002, + "kind" => 1, + "tags" => [["p", target_pubkey], ["e", referenced_event]], + "content" => "other-author" + }) + + filters = [ + %{ + "authors" => [author], + "kinds" => [1], + "#p" => [target_pubkey], + "#e" => [referenced_event] + } + ] + + assert {:ok, [result]} = Events.query(%{}, filters, []) + assert result["id"] == matching["id"] + assert ["p", target_pubkey] in result["tags"] + assert ["e", referenced_event] in result["tags"] + end + + test "query/3 applies filter limit and deterministic tie-break ordering" do + author = String.duplicate("1", 64) + + tie_a = + persist_event(%{ + "pubkey" => author, + "created_at" => 1_700_000_100, + "kind" => 1, + "tags" => [], + "content" => "tie-a" + }) + + tie_b = + persist_event(%{ + "pubkey" => author, + "created_at" => 1_700_000_100, + "kind" => 1, + "tags" => [], + "content" => "tie-b" + }) + + newest = + persist_event(%{ + "pubkey" => author, + "created_at" => 1_700_000_101, + "kind" => 1, + "tags" => [], + "content" => "newest" + }) + + filters = [%{"authors" => [author], "kinds" => [1], "limit" => 2}] + + assert {:ok, results} = Events.query(%{}, filters, []) + + tie_winner_id = Enum.min([tie_a["id"], tie_b["id"]]) + assert Enum.map(results, & &1["id"]) == [newest["id"], tie_winner_id] + end + + test "count/3 ORs filters, deduplicates matches and respects tag filters" do + now = 1_700_001_000 + target_pubkey = String.duplicate("f", 64) + referenced_event = String.duplicate("0", 64) + + matching = + persist_event(%{ + "pubkey" => String.duplicate("2", 64), + "created_at" => 1_700_000_200, + "kind" => 7, + "tags" => [["p", target_pubkey], ["e", referenced_event]], + "content" => "reaction" + }) + + another_match = + persist_event(%{ + "pubkey" => String.duplicate("3", 64), + "created_at" => 1_700_000_201, + "kind" => 7, + "tags" => [["p", target_pubkey]], + "content" => "reaction-2" + }) + + _expired = + persist_event(%{ + "pubkey" => String.duplicate("4", 64), + "created_at" => 1_700_000_199, + "kind" => 7, + "tags" => [["p", target_pubkey], ["expiration", Integer.to_string(now - 1)]], + "content" => "expired" + }) + + _non_matching = + persist_event(%{ + "pubkey" => String.duplicate("5", 64), + "created_at" => 1_700_000_202, + "kind" => 7, + "tags" => [["p", String.duplicate("6", 64)]], + "content" => "other" + }) + + filters = [ + %{"kinds" => [7], "#p" => [target_pubkey], "#e" => [referenced_event]}, + %{"ids" => [matching["id"], another_match["id"]]} + ] + + assert {:ok, 2} = Events.count(%{}, filters, now: now) + end + + defp persist_event(overrides) do + event = build_event(overrides) + assert {:ok, _persisted} = Events.put_event(%{}, event) + event + end + + defp build_event(overrides) do + base_event = %{ + "pubkey" => String.duplicate("7", 64), + "created_at" => System.system_time(:second), + "kind" => 1, + "tags" => [], + "content" => "content-#{System.unique_integer([:positive])}", + "sig" => String.duplicate("8", 128) + } + + event = Map.merge(base_event, overrides) + Map.put(event, "id", EventValidator.compute_id(event)) + end +end