improve: NIF-77 single-filter fast path
This commit is contained in:
@@ -98,22 +98,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
|
||||
def query_event_refs(_context, filters, opts) when is_list(opts) do
|
||||
with :ok <- Filter.validate_filters(filters) do
|
||||
now = Keyword.get(opts, :now, System.system_time(:second))
|
||||
|
||||
refs =
|
||||
filters
|
||||
|> event_ref_union_query_for_filters(now, opts)
|
||||
|> subquery()
|
||||
|> then(fn union_query ->
|
||||
from(ref in union_query,
|
||||
group_by: [ref.created_at, ref.id],
|
||||
order_by: [asc: ref.created_at, asc: ref.id],
|
||||
select: %{created_at: ref.created_at, id: ref.id}
|
||||
)
|
||||
end)
|
||||
|> maybe_limit_query(Keyword.get(opts, :limit))
|
||||
|> Repo.all()
|
||||
|
||||
{:ok, refs}
|
||||
{:ok, fetch_event_refs(filters, now, opts)}
|
||||
end
|
||||
end
|
||||
|
||||
@@ -123,17 +108,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
|
||||
def count(_context, filters, opts) when is_list(opts) do
|
||||
with :ok <- Filter.validate_filters(filters) do
|
||||
now = Keyword.get(opts, :now, System.system_time(:second))
|
||||
|
||||
total_count =
|
||||
filters
|
||||
|> event_id_union_query_for_filters(now, opts)
|
||||
|> subquery()
|
||||
|> then(fn union_query ->
|
||||
from(event in union_query, select: count(event.id, :distinct))
|
||||
end)
|
||||
|> Repo.one()
|
||||
|
||||
{:ok, total_count}
|
||||
{:ok, count_events(filters, now, opts)}
|
||||
end
|
||||
end
|
||||
|
||||
@@ -632,11 +607,12 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
|
||||
end
|
||||
|
||||
defp event_query_for_filter(filter, now, opts) do
|
||||
base_query =
|
||||
from(event in "events",
|
||||
where: is_nil(event.deleted_at) and (is_nil(event.expires_at) or event.expires_at > ^now),
|
||||
order_by: [desc: event.created_at, asc: event.id],
|
||||
select: %{
|
||||
{base_query, remaining_tag_filters} = event_source_query(filter, now)
|
||||
|
||||
base_query
|
||||
|> apply_common_event_filters(filter, remaining_tag_filters, opts)
|
||||
|> order_by([event: event], desc: event.created_at, asc: event.id)
|
||||
|> select([event: event], %{
|
||||
id: event.id,
|
||||
pubkey: event.pubkey,
|
||||
created_at: event.created_at,
|
||||
@@ -644,69 +620,42 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
|
||||
tags: event.tags,
|
||||
content: event.content,
|
||||
sig: event.sig
|
||||
}
|
||||
)
|
||||
|
||||
query =
|
||||
base_query
|
||||
|> maybe_filter_ids(Map.get(filter, "ids"))
|
||||
|> maybe_filter_authors(Map.get(filter, "authors"))
|
||||
|> maybe_filter_kinds(Map.get(filter, "kinds"))
|
||||
|> maybe_filter_since(Map.get(filter, "since"))
|
||||
|> maybe_filter_until(Map.get(filter, "until"))
|
||||
|> maybe_filter_search(Map.get(filter, "search"))
|
||||
|> filter_by_tags(filter)
|
||||
|> maybe_restrict_giftwrap_access(filter, opts)
|
||||
|
||||
maybe_limit_query(query, effective_filter_limit(filter, opts))
|
||||
})
|
||||
|> maybe_limit_query(effective_filter_limit(filter, opts))
|
||||
end
|
||||
|
||||
defp event_id_query_for_filter(filter, now, opts) do
|
||||
from(event in "events",
|
||||
where: is_nil(event.deleted_at) and (is_nil(event.expires_at) or event.expires_at > ^now),
|
||||
select: event.id
|
||||
)
|
||||
|> maybe_filter_ids(Map.get(filter, "ids"))
|
||||
|> maybe_filter_authors(Map.get(filter, "authors"))
|
||||
|> maybe_filter_kinds(Map.get(filter, "kinds"))
|
||||
|> maybe_filter_since(Map.get(filter, "since"))
|
||||
|> maybe_filter_until(Map.get(filter, "until"))
|
||||
|> maybe_filter_search(Map.get(filter, "search"))
|
||||
|> filter_by_tags(filter)
|
||||
|> maybe_restrict_giftwrap_access(filter, opts)
|
||||
{base_query, remaining_tag_filters} = event_source_query(filter, now)
|
||||
|
||||
base_query
|
||||
|> apply_common_event_filters(filter, remaining_tag_filters, opts)
|
||||
|> select([event: event], event.id)
|
||||
end
|
||||
|
||||
defp event_id_union_query_for_filters([], now, _opts) do
|
||||
defp event_id_distinct_union_query_for_filters([], now, _opts) do
|
||||
from(event in "events",
|
||||
where: event.created_at > ^now and event.created_at < ^now,
|
||||
select: event.id
|
||||
)
|
||||
end
|
||||
|
||||
defp event_id_union_query_for_filters([first_filter | rest_filters], now, opts) do
|
||||
defp event_id_distinct_union_query_for_filters([first_filter | rest_filters], now, opts) do
|
||||
Enum.reduce(rest_filters, event_id_query_for_filter(first_filter, now, opts), fn filter,
|
||||
acc ->
|
||||
union_all(acc, ^event_id_query_for_filter(filter, now, opts))
|
||||
union(acc, ^event_id_query_for_filter(filter, now, opts))
|
||||
end)
|
||||
end
|
||||
|
||||
defp event_ref_query_for_filter(filter, now, opts) do
|
||||
from(event in "events",
|
||||
where: is_nil(event.deleted_at) and (is_nil(event.expires_at) or event.expires_at > ^now),
|
||||
order_by: [asc: event.created_at, asc: event.id],
|
||||
select: %{
|
||||
{base_query, remaining_tag_filters} = event_source_query(filter, now)
|
||||
|
||||
base_query
|
||||
|> apply_common_event_filters(filter, remaining_tag_filters, opts)
|
||||
|> order_by([event: event], asc: event.created_at, asc: event.id)
|
||||
|> select([event: event], %{
|
||||
created_at: event.created_at,
|
||||
id: event.id
|
||||
}
|
||||
)
|
||||
|> maybe_filter_ids(Map.get(filter, "ids"))
|
||||
|> maybe_filter_authors(Map.get(filter, "authors"))
|
||||
|> maybe_filter_kinds(Map.get(filter, "kinds"))
|
||||
|> maybe_filter_since(Map.get(filter, "since"))
|
||||
|> maybe_filter_until(Map.get(filter, "until"))
|
||||
|> maybe_filter_search(Map.get(filter, "search"))
|
||||
|> filter_by_tags(filter)
|
||||
|> maybe_restrict_giftwrap_access(filter, opts)
|
||||
})
|
||||
|> maybe_limit_query(effective_filter_limit(filter, opts))
|
||||
end
|
||||
|
||||
@@ -724,34 +673,128 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
|
||||
end)
|
||||
end
|
||||
|
||||
defp fetch_event_refs([filter], now, opts) do
|
||||
filter
|
||||
|> event_ref_query_for_filter(now, opts)
|
||||
|> maybe_limit_query(Keyword.get(opts, :limit))
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
defp fetch_event_refs(filters, now, opts) do
|
||||
filters
|
||||
|> event_ref_union_query_for_filters(now, opts)
|
||||
|> subquery()
|
||||
|> then(fn union_query ->
|
||||
from(ref in union_query,
|
||||
group_by: [ref.created_at, ref.id],
|
||||
order_by: [asc: ref.created_at, asc: ref.id],
|
||||
select: %{created_at: ref.created_at, id: ref.id}
|
||||
)
|
||||
end)
|
||||
|> maybe_limit_query(Keyword.get(opts, :limit))
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
defp count_events([filter], now, opts) do
|
||||
filter
|
||||
|> event_id_query_for_filter(now, opts)
|
||||
|> subquery()
|
||||
|> then(fn query ->
|
||||
from(event in query, select: count())
|
||||
end)
|
||||
|> Repo.one()
|
||||
end
|
||||
|
||||
defp count_events(filters, now, opts) do
|
||||
filters
|
||||
|> event_id_distinct_union_query_for_filters(now, opts)
|
||||
|> subquery()
|
||||
|> then(fn union_query ->
|
||||
from(event in union_query, select: count())
|
||||
end)
|
||||
|> Repo.one()
|
||||
end
|
||||
|
||||
defp event_source_query(filter, now) do
|
||||
tag_filters = tag_filters(filter)
|
||||
|
||||
case primary_tag_filter(tag_filters) do
|
||||
nil ->
|
||||
{from(event in "events",
|
||||
as: :event,
|
||||
where:
|
||||
is_nil(event.deleted_at) and
|
||||
(is_nil(event.expires_at) or event.expires_at > ^now)
|
||||
), []}
|
||||
|
||||
{tag_name, values} = primary_tag_filter ->
|
||||
remaining_tag_filters = List.delete(tag_filters, primary_tag_filter)
|
||||
|
||||
{from(tag in "event_tags",
|
||||
as: :primary_tag,
|
||||
where: tag.name == ^tag_name and tag.value in ^values,
|
||||
join: event in "events",
|
||||
as: :event,
|
||||
on: event.created_at == tag.event_created_at and event.id == tag.event_id,
|
||||
where:
|
||||
is_nil(event.deleted_at) and
|
||||
(is_nil(event.expires_at) or event.expires_at > ^now),
|
||||
distinct: [event.created_at, event.id]
|
||||
), remaining_tag_filters}
|
||||
end
|
||||
end
|
||||
|
||||
defp apply_common_event_filters(query, filter, remaining_tag_filters, opts) do
|
||||
query
|
||||
|> maybe_filter_ids(Map.get(filter, "ids"))
|
||||
|> maybe_filter_authors(Map.get(filter, "authors"))
|
||||
|> maybe_filter_kinds(Map.get(filter, "kinds"))
|
||||
|> maybe_filter_since(Map.get(filter, "since"))
|
||||
|> maybe_filter_until(Map.get(filter, "until"))
|
||||
|> maybe_filter_search(Map.get(filter, "search"))
|
||||
|> filter_by_tag_filters(remaining_tag_filters)
|
||||
|> maybe_restrict_giftwrap_access(filter, opts)
|
||||
end
|
||||
|
||||
defp primary_tag_filter([]), do: nil
|
||||
|
||||
defp primary_tag_filter(tag_filters) do
|
||||
Enum.find(tag_filters, fn {tag_name, _values} -> tag_name in ["h", "i"] end) ||
|
||||
List.first(tag_filters)
|
||||
end
|
||||
|
||||
defp maybe_filter_ids(query, nil), do: query
|
||||
|
||||
defp maybe_filter_ids(query, ids) do
|
||||
decoded_ids = decode_hex_list(ids, :lower)
|
||||
where(query, [event], event.id in ^decoded_ids)
|
||||
where(query, [event: event], event.id in ^decoded_ids)
|
||||
end
|
||||
|
||||
defp maybe_filter_authors(query, nil), do: query
|
||||
|
||||
defp maybe_filter_authors(query, authors) do
|
||||
decoded_authors = decode_hex_list(authors, :lower)
|
||||
where(query, [event], event.pubkey in ^decoded_authors)
|
||||
where(query, [event: event], event.pubkey in ^decoded_authors)
|
||||
end
|
||||
|
||||
defp maybe_filter_kinds(query, nil), do: query
|
||||
defp maybe_filter_kinds(query, kinds), do: where(query, [event], event.kind in ^kinds)
|
||||
defp maybe_filter_kinds(query, kinds), do: where(query, [event: event], event.kind in ^kinds)
|
||||
|
||||
defp maybe_filter_since(query, nil), do: query
|
||||
defp maybe_filter_since(query, since), do: where(query, [event], event.created_at >= ^since)
|
||||
|
||||
defp maybe_filter_since(query, since),
|
||||
do: where(query, [event: event], event.created_at >= ^since)
|
||||
|
||||
defp maybe_filter_until(query, nil), do: query
|
||||
defp maybe_filter_until(query, until), do: where(query, [event], event.created_at <= ^until)
|
||||
|
||||
defp maybe_filter_until(query, until),
|
||||
do: where(query, [event: event], event.created_at <= ^until)
|
||||
|
||||
defp maybe_filter_search(query, nil), do: query
|
||||
|
||||
defp maybe_filter_search(query, search) when is_binary(search) and search != "" do
|
||||
escaped_search = escape_like_pattern(search)
|
||||
where(query, [event], ilike(event.content, ^"%#{escaped_search}%"))
|
||||
where(query, [event: event], ilike(event.content, ^"%#{escaped_search}%"))
|
||||
end
|
||||
|
||||
defp maybe_filter_search(query, _search), do: query
|
||||
@@ -763,13 +806,11 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
|
||||
|> String.replace("_", "\\_")
|
||||
end
|
||||
|
||||
defp filter_by_tags(query, filter) do
|
||||
filter
|
||||
|> tag_filters()
|
||||
|> Enum.reduce(query, fn {tag_name, values}, acc ->
|
||||
defp filter_by_tag_filters(query, tag_filters) do
|
||||
Enum.reduce(tag_filters, query, fn {tag_name, values}, acc ->
|
||||
where(
|
||||
acc,
|
||||
[event],
|
||||
[event: event],
|
||||
fragment(
|
||||
"EXISTS (SELECT 1 FROM event_tags AS tag WHERE tag.event_created_at = ? AND tag.event_id = ? AND tag.name = ? AND tag.value = ANY(?))",
|
||||
event.created_at,
|
||||
@@ -799,7 +840,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
|
||||
targets_giftwrap?(filter) and requester_pubkeys != [] ->
|
||||
where(
|
||||
query,
|
||||
[event],
|
||||
[event: event],
|
||||
fragment(
|
||||
"EXISTS (SELECT 1 FROM event_tags AS tag WHERE tag.event_created_at = ? AND tag.event_id = ? AND tag.name = 'p' AND tag.value = ANY(?))",
|
||||
event.created_at,
|
||||
@@ -809,7 +850,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
|
||||
)
|
||||
|
||||
targets_giftwrap?(filter) ->
|
||||
where(query, [_event], false)
|
||||
where(query, [event: _event], false)
|
||||
|
||||
true ->
|
||||
query
|
||||
|
||||
Reference in New Issue
Block a user