storage: implement postgres event query/count filter translation
This commit is contained in:
@@ -29,4 +29,6 @@ config :parrhesia,
|
|||||||
|
|
||||||
config :parrhesia, Parrhesia.Web.Endpoint, port: 4000
|
config :parrhesia, Parrhesia.Web.Endpoint, port: 4000
|
||||||
|
|
||||||
|
config :parrhesia, ecto_repos: [Parrhesia.Repo]
|
||||||
|
|
||||||
import_config "#{config_env()}.exs"
|
import_config "#{config_env()}.exs"
|
||||||
|
|||||||
@@ -1,3 +1,25 @@
|
|||||||
import Config
|
import Config
|
||||||
|
|
||||||
config :logger, :console, format: "[$level] $message\n"
|
config :logger, :console, format: "[$level] $message\n"
|
||||||
|
|
||||||
|
pg_host = System.get_env("PGHOST")
|
||||||
|
|
||||||
|
repo_host_opts =
|
||||||
|
if is_binary(pg_host) and String.starts_with?(pg_host, "/") do
|
||||||
|
[socket_dir: pg_host]
|
||||||
|
else
|
||||||
|
[
|
||||||
|
hostname: pg_host || "localhost",
|
||||||
|
port: String.to_integer(System.get_env("PGPORT") || "5432")
|
||||||
|
]
|
||||||
|
end
|
||||||
|
|
||||||
|
config :parrhesia,
|
||||||
|
Parrhesia.Repo,
|
||||||
|
[
|
||||||
|
username: System.get_env("PGUSER") || System.get_env("USER") || "agent",
|
||||||
|
password: System.get_env("PGPASSWORD"),
|
||||||
|
database: System.get_env("PGDATABASE") || "parrhesia_dev",
|
||||||
|
show_sensitive_data_on_connection_error: true,
|
||||||
|
pool_size: 10
|
||||||
|
] ++ repo_host_opts
|
||||||
|
|||||||
@@ -1 +1,9 @@
|
|||||||
import Config
|
import Config
|
||||||
|
|
||||||
|
database_url =
|
||||||
|
System.get_env("DATABASE_URL") ||
|
||||||
|
raise "environment variable DATABASE_URL is missing. Example: ecto://USER:PASS@HOST/DATABASE"
|
||||||
|
|
||||||
|
config :parrhesia, Parrhesia.Repo,
|
||||||
|
url: database_url,
|
||||||
|
pool_size: String.to_integer(System.get_env("POOL_SIZE") || "10")
|
||||||
|
|||||||
@@ -5,3 +5,25 @@ config :logger, level: :warning
|
|||||||
config :parrhesia, Parrhesia.Web.Endpoint,
|
config :parrhesia, Parrhesia.Web.Endpoint,
|
||||||
port: 0,
|
port: 0,
|
||||||
ip: {127, 0, 0, 1}
|
ip: {127, 0, 0, 1}
|
||||||
|
|
||||||
|
pg_host = System.get_env("PGHOST")
|
||||||
|
|
||||||
|
repo_host_opts =
|
||||||
|
if is_binary(pg_host) and String.starts_with?(pg_host, "/") do
|
||||||
|
[socket_dir: pg_host]
|
||||||
|
else
|
||||||
|
[
|
||||||
|
hostname: pg_host || "localhost",
|
||||||
|
port: String.to_integer(System.get_env("PGPORT") || "5432")
|
||||||
|
]
|
||||||
|
end
|
||||||
|
|
||||||
|
config :parrhesia,
|
||||||
|
Parrhesia.Repo,
|
||||||
|
[
|
||||||
|
username: System.get_env("PGUSER") || System.get_env("USER") || "agent",
|
||||||
|
password: System.get_env("PGPASSWORD"),
|
||||||
|
database: System.get_env("PGDATABASE") || "parrhesia_test",
|
||||||
|
pool: Ecto.Adapters.SQL.Sandbox,
|
||||||
|
pool_size: 10
|
||||||
|
] ++ repo_host_opts
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
|
|||||||
|
|
||||||
import Ecto.Query
|
import Ecto.Query
|
||||||
|
|
||||||
|
alias Parrhesia.Protocol.Filter
|
||||||
alias Parrhesia.Repo
|
alias Parrhesia.Repo
|
||||||
|
|
||||||
@behaviour Parrhesia.Storage.Events
|
@behaviour Parrhesia.Storage.Events
|
||||||
@@ -77,10 +78,55 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
|
|||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def query(_context, _filters, _opts), do: {:error, :not_implemented}
|
def query(_context, filters, opts) when is_list(opts) do
|
||||||
|
with :ok <- Filter.validate_filters(filters) do
|
||||||
|
now = Keyword.get(opts, :now, System.system_time(:second))
|
||||||
|
|
||||||
|
persisted_events =
|
||||||
|
filters
|
||||||
|
|> Enum.flat_map(fn filter ->
|
||||||
|
filter
|
||||||
|
|> event_query_for_filter(now, opts)
|
||||||
|
|> Repo.all()
|
||||||
|
end)
|
||||||
|
|> deduplicate_events()
|
||||||
|
|> sort_persisted_events()
|
||||||
|
|> maybe_apply_query_limit(opts)
|
||||||
|
|
||||||
|
event_keys = Enum.map(persisted_events, fn event -> {event.created_at, event.id} end)
|
||||||
|
tags_by_event = load_tags(event_keys)
|
||||||
|
|
||||||
|
nostr_events =
|
||||||
|
Enum.map(persisted_events, fn event ->
|
||||||
|
to_nostr_event(event, Map.get(tags_by_event, {event.created_at, event.id}, []))
|
||||||
|
end)
|
||||||
|
|
||||||
|
{:ok, nostr_events}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def query(_context, _filters, _opts), do: {:error, :invalid_opts}
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def count(_context, _filters, _opts), do: {:error, :not_implemented}
|
def count(_context, filters, opts) when is_list(opts) do
|
||||||
|
with :ok <- Filter.validate_filters(filters) do
|
||||||
|
now = Keyword.get(opts, :now, System.system_time(:second))
|
||||||
|
|
||||||
|
total_count =
|
||||||
|
filters
|
||||||
|
|> Enum.flat_map(fn filter ->
|
||||||
|
filter
|
||||||
|
|> event_id_query_for_filter(now)
|
||||||
|
|> Repo.all()
|
||||||
|
end)
|
||||||
|
|> MapSet.new()
|
||||||
|
|> MapSet.size()
|
||||||
|
|
||||||
|
{:ok, total_count}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def count(_context, _filters, _opts), do: {:error, :invalid_opts}
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def delete_by_request(_context, _event), do: {:error, :not_implemented}
|
def delete_by_request(_context, _event), do: {:error, :not_implemented}
|
||||||
@@ -364,6 +410,145 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
|
|||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp event_query_for_filter(filter, now, opts) do
|
||||||
|
base_query =
|
||||||
|
from(event in "events",
|
||||||
|
where: is_nil(event.deleted_at) and (is_nil(event.expires_at) or event.expires_at > ^now),
|
||||||
|
order_by: [desc: event.created_at, asc: event.id],
|
||||||
|
select: %{
|
||||||
|
id: event.id,
|
||||||
|
pubkey: event.pubkey,
|
||||||
|
created_at: event.created_at,
|
||||||
|
kind: event.kind,
|
||||||
|
content: event.content,
|
||||||
|
sig: event.sig
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
query =
|
||||||
|
base_query
|
||||||
|
|> maybe_filter_ids(Map.get(filter, "ids"))
|
||||||
|
|> maybe_filter_authors(Map.get(filter, "authors"))
|
||||||
|
|> maybe_filter_kinds(Map.get(filter, "kinds"))
|
||||||
|
|> maybe_filter_since(Map.get(filter, "since"))
|
||||||
|
|> maybe_filter_until(Map.get(filter, "until"))
|
||||||
|
|> filter_by_tags(filter)
|
||||||
|
|
||||||
|
maybe_limit_query(query, effective_filter_limit(filter, opts))
|
||||||
|
end
|
||||||
|
|
||||||
|
defp event_id_query_for_filter(filter, now) do
|
||||||
|
from(event in "events",
|
||||||
|
where: is_nil(event.deleted_at) and (is_nil(event.expires_at) or event.expires_at > ^now),
|
||||||
|
select: event.id
|
||||||
|
)
|
||||||
|
|> maybe_filter_ids(Map.get(filter, "ids"))
|
||||||
|
|> maybe_filter_authors(Map.get(filter, "authors"))
|
||||||
|
|> maybe_filter_kinds(Map.get(filter, "kinds"))
|
||||||
|
|> maybe_filter_since(Map.get(filter, "since"))
|
||||||
|
|> maybe_filter_until(Map.get(filter, "until"))
|
||||||
|
|> filter_by_tags(filter)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp maybe_filter_ids(query, nil), do: query
|
||||||
|
|
||||||
|
defp maybe_filter_ids(query, ids) do
|
||||||
|
decoded_ids = decode_hex_list(ids, :lower)
|
||||||
|
where(query, [event], event.id in ^decoded_ids)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp maybe_filter_authors(query, nil), do: query
|
||||||
|
|
||||||
|
defp maybe_filter_authors(query, authors) do
|
||||||
|
decoded_authors = decode_hex_list(authors, :lower)
|
||||||
|
where(query, [event], event.pubkey in ^decoded_authors)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp maybe_filter_kinds(query, nil), do: query
|
||||||
|
defp maybe_filter_kinds(query, kinds), do: where(query, [event], event.kind in ^kinds)
|
||||||
|
|
||||||
|
defp maybe_filter_since(query, nil), do: query
|
||||||
|
defp maybe_filter_since(query, since), do: where(query, [event], event.created_at >= ^since)
|
||||||
|
|
||||||
|
defp maybe_filter_until(query, nil), do: query
|
||||||
|
defp maybe_filter_until(query, until), do: where(query, [event], event.created_at <= ^until)
|
||||||
|
|
||||||
|
defp filter_by_tags(query, filter) do
|
||||||
|
filter
|
||||||
|
|> tag_filters()
|
||||||
|
|> Enum.reduce(query, fn {tag_name, values}, acc ->
|
||||||
|
where(
|
||||||
|
acc,
|
||||||
|
[event],
|
||||||
|
fragment(
|
||||||
|
"EXISTS (SELECT 1 FROM event_tags AS tag WHERE tag.event_created_at = ? AND tag.event_id = ? AND tag.name = ? AND tag.value = ANY(?))",
|
||||||
|
event.created_at,
|
||||||
|
event.id,
|
||||||
|
^tag_name,
|
||||||
|
type(^values, {:array, :string})
|
||||||
|
)
|
||||||
|
)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp tag_filters(filter) do
|
||||||
|
Enum.reduce(filter, [], fn
|
||||||
|
{<<"#", tag_name::binary-size(1)>>, values}, acc -> [{tag_name, values} | acc]
|
||||||
|
_entry, acc -> acc
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp decode_hex_list(values, decode_case) when is_list(values) do
|
||||||
|
Enum.map(values, &Base.decode16!(&1, case: decode_case))
|
||||||
|
end
|
||||||
|
|
||||||
|
defp effective_filter_limit(filter, opts) do
|
||||||
|
filter_limit = Map.get(filter, "limit")
|
||||||
|
max_filter_limit = Keyword.get(opts, :max_filter_limit)
|
||||||
|
|
||||||
|
cond do
|
||||||
|
is_integer(filter_limit) and is_integer(max_filter_limit) ->
|
||||||
|
min(filter_limit, max_filter_limit)
|
||||||
|
|
||||||
|
is_integer(filter_limit) ->
|
||||||
|
filter_limit
|
||||||
|
|
||||||
|
is_integer(max_filter_limit) ->
|
||||||
|
max_filter_limit
|
||||||
|
|
||||||
|
true ->
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp maybe_limit_query(query, nil), do: query
|
||||||
|
defp maybe_limit_query(query, limit), do: limit(query, ^limit)
|
||||||
|
|
||||||
|
defp deduplicate_events(events) do
|
||||||
|
events
|
||||||
|
|> Enum.reduce(%{}, fn event, acc -> Map.put_new(acc, event.id, event) end)
|
||||||
|
|> Map.values()
|
||||||
|
end
|
||||||
|
|
||||||
|
defp sort_persisted_events(events) do
|
||||||
|
Enum.sort(events, fn left, right ->
|
||||||
|
cond do
|
||||||
|
left.created_at > right.created_at -> true
|
||||||
|
left.created_at < right.created_at -> false
|
||||||
|
true -> left.id < right.id
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp maybe_apply_query_limit(events, opts) do
|
||||||
|
case Keyword.get(opts, :limit) do
|
||||||
|
limit when is_integer(limit) and limit > 0 -> Enum.take(events, limit)
|
||||||
|
_other -> events
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp load_tags([]), do: %{}
|
||||||
|
|
||||||
defp load_tags(event_keys) when is_list(event_keys) do
|
defp load_tags(event_keys) when is_list(event_keys) do
|
||||||
created_at_values = Enum.map(event_keys, fn {created_at, _event_id} -> created_at end)
|
created_at_values = Enum.map(event_keys, fn {created_at, _event_id} -> created_at end)
|
||||||
event_id_values = Enum.map(event_keys, fn {_created_at, event_id} -> event_id end)
|
event_id_values = Enum.map(event_keys, fn {_created_at, event_id} -> event_id end)
|
||||||
|
|||||||
@@ -0,0 +1,174 @@
|
|||||||
|
defmodule Parrhesia.Storage.Adapters.Postgres.EventsQueryCountTest do
|
||||||
|
use ExUnit.Case, async: false
|
||||||
|
|
||||||
|
alias Ecto.Adapters.SQL.Sandbox
|
||||||
|
alias Parrhesia.Protocol.EventValidator
|
||||||
|
alias Parrhesia.Repo
|
||||||
|
alias Parrhesia.Storage.Adapters.Postgres.Events
|
||||||
|
|
||||||
|
setup_all do
|
||||||
|
start_supervised!(Repo)
|
||||||
|
Sandbox.mode(Repo, :manual)
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
setup do
|
||||||
|
:ok = Sandbox.checkout(Repo)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "query/3 translates NIP filters including tag filters" do
|
||||||
|
author = String.duplicate("a", 64)
|
||||||
|
other_author = String.duplicate("b", 64)
|
||||||
|
target_pubkey = String.duplicate("c", 64)
|
||||||
|
other_target = String.duplicate("d", 64)
|
||||||
|
referenced_event = String.duplicate("e", 64)
|
||||||
|
|
||||||
|
matching =
|
||||||
|
persist_event(%{
|
||||||
|
"pubkey" => author,
|
||||||
|
"created_at" => 1_700_000_000,
|
||||||
|
"kind" => 1,
|
||||||
|
"tags" => [["p", target_pubkey], ["e", referenced_event], ["x", "topic"]],
|
||||||
|
"content" => "matching"
|
||||||
|
})
|
||||||
|
|
||||||
|
_non_matching_tag =
|
||||||
|
persist_event(%{
|
||||||
|
"pubkey" => author,
|
||||||
|
"created_at" => 1_700_000_001,
|
||||||
|
"kind" => 1,
|
||||||
|
"tags" => [["p", other_target], ["e", referenced_event]],
|
||||||
|
"content" => "other-target"
|
||||||
|
})
|
||||||
|
|
||||||
|
_non_matching_author =
|
||||||
|
persist_event(%{
|
||||||
|
"pubkey" => other_author,
|
||||||
|
"created_at" => 1_700_000_002,
|
||||||
|
"kind" => 1,
|
||||||
|
"tags" => [["p", target_pubkey], ["e", referenced_event]],
|
||||||
|
"content" => "other-author"
|
||||||
|
})
|
||||||
|
|
||||||
|
filters = [
|
||||||
|
%{
|
||||||
|
"authors" => [author],
|
||||||
|
"kinds" => [1],
|
||||||
|
"#p" => [target_pubkey],
|
||||||
|
"#e" => [referenced_event]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
assert {:ok, [result]} = Events.query(%{}, filters, [])
|
||||||
|
assert result["id"] == matching["id"]
|
||||||
|
assert ["p", target_pubkey] in result["tags"]
|
||||||
|
assert ["e", referenced_event] in result["tags"]
|
||||||
|
end
|
||||||
|
|
||||||
|
test "query/3 applies filter limit and deterministic tie-break ordering" do
|
||||||
|
author = String.duplicate("1", 64)
|
||||||
|
|
||||||
|
tie_a =
|
||||||
|
persist_event(%{
|
||||||
|
"pubkey" => author,
|
||||||
|
"created_at" => 1_700_000_100,
|
||||||
|
"kind" => 1,
|
||||||
|
"tags" => [],
|
||||||
|
"content" => "tie-a"
|
||||||
|
})
|
||||||
|
|
||||||
|
tie_b =
|
||||||
|
persist_event(%{
|
||||||
|
"pubkey" => author,
|
||||||
|
"created_at" => 1_700_000_100,
|
||||||
|
"kind" => 1,
|
||||||
|
"tags" => [],
|
||||||
|
"content" => "tie-b"
|
||||||
|
})
|
||||||
|
|
||||||
|
newest =
|
||||||
|
persist_event(%{
|
||||||
|
"pubkey" => author,
|
||||||
|
"created_at" => 1_700_000_101,
|
||||||
|
"kind" => 1,
|
||||||
|
"tags" => [],
|
||||||
|
"content" => "newest"
|
||||||
|
})
|
||||||
|
|
||||||
|
filters = [%{"authors" => [author], "kinds" => [1], "limit" => 2}]
|
||||||
|
|
||||||
|
assert {:ok, results} = Events.query(%{}, filters, [])
|
||||||
|
|
||||||
|
tie_winner_id = Enum.min([tie_a["id"], tie_b["id"]])
|
||||||
|
assert Enum.map(results, & &1["id"]) == [newest["id"], tie_winner_id]
|
||||||
|
end
|
||||||
|
|
||||||
|
test "count/3 ORs filters, deduplicates matches and respects tag filters" do
|
||||||
|
now = 1_700_001_000
|
||||||
|
target_pubkey = String.duplicate("f", 64)
|
||||||
|
referenced_event = String.duplicate("0", 64)
|
||||||
|
|
||||||
|
matching =
|
||||||
|
persist_event(%{
|
||||||
|
"pubkey" => String.duplicate("2", 64),
|
||||||
|
"created_at" => 1_700_000_200,
|
||||||
|
"kind" => 7,
|
||||||
|
"tags" => [["p", target_pubkey], ["e", referenced_event]],
|
||||||
|
"content" => "reaction"
|
||||||
|
})
|
||||||
|
|
||||||
|
another_match =
|
||||||
|
persist_event(%{
|
||||||
|
"pubkey" => String.duplicate("3", 64),
|
||||||
|
"created_at" => 1_700_000_201,
|
||||||
|
"kind" => 7,
|
||||||
|
"tags" => [["p", target_pubkey]],
|
||||||
|
"content" => "reaction-2"
|
||||||
|
})
|
||||||
|
|
||||||
|
_expired =
|
||||||
|
persist_event(%{
|
||||||
|
"pubkey" => String.duplicate("4", 64),
|
||||||
|
"created_at" => 1_700_000_199,
|
||||||
|
"kind" => 7,
|
||||||
|
"tags" => [["p", target_pubkey], ["expiration", Integer.to_string(now - 1)]],
|
||||||
|
"content" => "expired"
|
||||||
|
})
|
||||||
|
|
||||||
|
_non_matching =
|
||||||
|
persist_event(%{
|
||||||
|
"pubkey" => String.duplicate("5", 64),
|
||||||
|
"created_at" => 1_700_000_202,
|
||||||
|
"kind" => 7,
|
||||||
|
"tags" => [["p", String.duplicate("6", 64)]],
|
||||||
|
"content" => "other"
|
||||||
|
})
|
||||||
|
|
||||||
|
filters = [
|
||||||
|
%{"kinds" => [7], "#p" => [target_pubkey], "#e" => [referenced_event]},
|
||||||
|
%{"ids" => [matching["id"], another_match["id"]]}
|
||||||
|
]
|
||||||
|
|
||||||
|
assert {:ok, 2} = Events.count(%{}, filters, now: now)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp persist_event(overrides) do
|
||||||
|
event = build_event(overrides)
|
||||||
|
assert {:ok, _persisted} = Events.put_event(%{}, event)
|
||||||
|
event
|
||||||
|
end
|
||||||
|
|
||||||
|
defp build_event(overrides) do
|
||||||
|
base_event = %{
|
||||||
|
"pubkey" => String.duplicate("7", 64),
|
||||||
|
"created_at" => System.system_time(:second),
|
||||||
|
"kind" => 1,
|
||||||
|
"tags" => [],
|
||||||
|
"content" => "content-#{System.unique_integer([:positive])}",
|
||||||
|
"sig" => String.duplicate("8", 128)
|
||||||
|
}
|
||||||
|
|
||||||
|
event = Map.merge(base_event, overrides)
|
||||||
|
Map.put(event, "id", EventValidator.compute_id(event))
|
||||||
|
end
|
||||||
|
end
|
||||||
Reference in New Issue
Block a user