Expand in-memory storage indexes
This commit is contained in:
@@ -6,6 +6,9 @@ defmodule Parrhesia.Storage.Adapters.Memory.Admin do
|
|||||||
alias Parrhesia.Storage.Adapters.Memory.Store
|
alias Parrhesia.Storage.Adapters.Memory.Store
|
||||||
|
|
||||||
@behaviour Parrhesia.Storage.Admin
|
@behaviour Parrhesia.Storage.Admin
|
||||||
|
@default_limit 100
|
||||||
|
@max_limit 1_000
|
||||||
|
@max_audit_logs 1_000
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def execute(_context, method, _params) do
|
def execute(_context, method, _params) do
|
||||||
@@ -17,18 +20,59 @@ defmodule Parrhesia.Storage.Adapters.Memory.Admin do
|
|||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def append_audit_log(_context, audit_entry) when is_map(audit_entry) do
|
def append_audit_log(_context, audit_entry) when is_map(audit_entry) do
|
||||||
Store.update(fn state -> update_in(state.audit_logs, &[audit_entry | &1]) end)
|
Store.update(fn state ->
|
||||||
|
update_in(state.audit_logs, fn logs ->
|
||||||
|
[audit_entry | logs] |> Enum.take(@max_audit_logs)
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
|
||||||
:ok
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
def append_audit_log(_context, _audit_entry), do: {:error, :invalid_audit_entry}
|
def append_audit_log(_context, _audit_entry), do: {:error, :invalid_audit_entry}
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def list_audit_logs(_context, _opts) do
|
def list_audit_logs(_context, opts) when is_list(opts) do
|
||||||
{:ok, Store.get(fn state -> Enum.reverse(state.audit_logs) end)}
|
limit = normalize_limit(Keyword.get(opts, :limit, @default_limit))
|
||||||
|
method = normalize_method_filter(Keyword.get(opts, :method))
|
||||||
|
actor_pubkey = Keyword.get(opts, :actor_pubkey)
|
||||||
|
|
||||||
|
logs =
|
||||||
|
Store.get(fn state ->
|
||||||
|
state.audit_logs
|
||||||
|
|> Enum.filter(&matches_filters?(&1, method, actor_pubkey))
|
||||||
|
|> Enum.take(limit)
|
||||||
|
end)
|
||||||
|
|
||||||
|
{:ok, logs}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def list_audit_logs(_context, _opts), do: {:error, :invalid_opts}
|
||||||
|
|
||||||
defp normalize_method(method) when is_binary(method), do: method
|
defp normalize_method(method) when is_binary(method), do: method
|
||||||
defp normalize_method(method) when is_atom(method), do: Atom.to_string(method)
|
defp normalize_method(method) when is_atom(method), do: Atom.to_string(method)
|
||||||
defp normalize_method(method), do: inspect(method)
|
defp normalize_method(method), do: inspect(method)
|
||||||
|
|
||||||
|
defp normalize_limit(limit) when is_integer(limit) and limit > 0, do: min(limit, @max_limit)
|
||||||
|
defp normalize_limit(_limit), do: @default_limit
|
||||||
|
|
||||||
|
defp normalize_method_filter(nil), do: nil
|
||||||
|
defp normalize_method_filter(method), do: normalize_method(method)
|
||||||
|
|
||||||
|
defp matches_method?(_entry, nil), do: true
|
||||||
|
|
||||||
|
defp matches_method?(entry, method) do
|
||||||
|
normalize_method(Map.get(entry, :method) || Map.get(entry, "method")) == method
|
||||||
|
end
|
||||||
|
|
||||||
|
defp matches_actor_pubkey?(_entry, nil), do: true
|
||||||
|
|
||||||
|
defp matches_actor_pubkey?(entry, actor_pubkey) do
|
||||||
|
Map.get(entry, :actor_pubkey) == actor_pubkey or
|
||||||
|
Map.get(entry, "actor_pubkey") == actor_pubkey
|
||||||
|
end
|
||||||
|
|
||||||
|
defp matches_filters?(entry, method, actor_pubkey) do
|
||||||
|
matches_method?(entry, method) and matches_actor_pubkey?(entry, actor_pubkey)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -47,25 +47,19 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do
|
|||||||
def query_event_refs(_context, filters, opts) do
|
def query_event_refs(_context, filters, opts) do
|
||||||
with :ok <- Filter.validate_filters(filters) do
|
with :ok <- Filter.validate_filters(filters) do
|
||||||
requester_pubkeys = Keyword.get(opts, :requester_pubkeys, [])
|
requester_pubkeys = Keyword.get(opts, :requester_pubkeys, [])
|
||||||
|
query_opts = Keyword.put(opts, :apply_filter_limits?, false)
|
||||||
|
|
||||||
|
{_, refs} =
|
||||||
|
reduce_unique_matching_events(
|
||||||
|
filters,
|
||||||
|
requester_pubkeys,
|
||||||
|
query_opts,
|
||||||
|
{MapSet.new(), []},
|
||||||
|
&append_unique_event_ref/2
|
||||||
|
)
|
||||||
|
|
||||||
refs =
|
refs =
|
||||||
filters
|
refs |> Enum.sort(&(compare_event_refs(&1, &2) != :gt)) |> maybe_limit_event_refs(opts)
|
||||||
|> Enum.flat_map(
|
|
||||||
&matching_events_for_filter(
|
|
||||||
&1,
|
|
||||||
requester_pubkeys,
|
|
||||||
Keyword.put(opts, :apply_filter_limits?, false)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|> deduplicate_events()
|
|
||||||
|> Enum.map(fn event ->
|
|
||||||
%{
|
|
||||||
created_at: Map.fetch!(event, "created_at"),
|
|
||||||
id: Base.decode16!(Map.fetch!(event, "id"), case: :mixed)
|
|
||||||
}
|
|
||||||
end)
|
|
||||||
|> Enum.sort(&(compare_event_refs(&1, &2) != :gt))
|
|
||||||
|> maybe_limit_event_refs(opts)
|
|
||||||
|
|
||||||
{:ok, refs}
|
{:ok, refs}
|
||||||
end
|
end
|
||||||
@@ -75,18 +69,16 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do
|
|||||||
def count(_context, filters, opts) do
|
def count(_context, filters, opts) do
|
||||||
with :ok <- Filter.validate_filters(filters) do
|
with :ok <- Filter.validate_filters(filters) do
|
||||||
requester_pubkeys = Keyword.get(opts, :requester_pubkeys, [])
|
requester_pubkeys = Keyword.get(opts, :requester_pubkeys, [])
|
||||||
|
query_opts = Keyword.put(opts, :apply_filter_limits?, false)
|
||||||
|
|
||||||
count =
|
{_seen_ids, count} =
|
||||||
filters
|
reduce_unique_matching_events(
|
||||||
|> Enum.flat_map(
|
filters,
|
||||||
&matching_events_for_filter(
|
requester_pubkeys,
|
||||||
&1,
|
query_opts,
|
||||||
requester_pubkeys,
|
{MapSet.new(), 0},
|
||||||
Keyword.put(opts, :apply_filter_limits?, false)
|
&count_unique_event/2
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|> deduplicate_events()
|
|
||||||
|> length()
|
|
||||||
|
|
||||||
{:ok, count}
|
{:ok, count}
|
||||||
end
|
end
|
||||||
@@ -119,13 +111,10 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do
|
|||||||
end)
|
end)
|
||||||
|
|
||||||
coordinate_delete_ids =
|
coordinate_delete_ids =
|
||||||
Store.reduce_events([], fn candidate, acc ->
|
delete_coordinates
|
||||||
if matches_delete_coordinate?(candidate, delete_coordinates, deleter_pubkey) do
|
|> coordinate_delete_candidates(deleter_pubkey)
|
||||||
[candidate["id"] | acc]
|
|> Enum.filter(&matches_delete_coordinate?(&1, delete_coordinates, deleter_pubkey))
|
||||||
else
|
|> Enum.map(& &1["id"])
|
||||||
acc
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
|
|
||||||
all_delete_ids = Enum.uniq(delete_event_ids ++ coordinate_delete_ids)
|
all_delete_ids = Enum.uniq(delete_event_ids ++ coordinate_delete_ids)
|
||||||
|
|
||||||
@@ -139,13 +128,9 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do
|
|||||||
pubkey = Map.get(event, "pubkey")
|
pubkey = Map.get(event, "pubkey")
|
||||||
|
|
||||||
deleted_ids =
|
deleted_ids =
|
||||||
Store.reduce_events([], fn candidate, acc ->
|
pubkey
|
||||||
if candidate["pubkey"] == pubkey do
|
|> vanish_candidates(Map.get(event, "created_at"))
|
||||||
[candidate["id"] | acc]
|
|> Enum.map(& &1["id"])
|
||||||
else
|
|
||||||
acc
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
|
|
||||||
Enum.each(deleted_ids, &Store.mark_deleted/1)
|
Enum.each(deleted_ids, &Store.mark_deleted/1)
|
||||||
|
|
||||||
@@ -234,7 +219,7 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do
|
|||||||
Map.has_key?(filter, "ids") ->
|
Map.has_key?(filter, "ids") ->
|
||||||
direct_id_lookup_events(filter, requester_pubkeys, opts)
|
direct_id_lookup_events(filter, requester_pubkeys, opts)
|
||||||
|
|
||||||
is_tuple(indexed_tag_filter(filter)) ->
|
indexed_candidate_spec(filter) != nil ->
|
||||||
indexed_tag_lookup_events(filter, requester_pubkeys, opts)
|
indexed_tag_lookup_events(filter, requester_pubkeys, opts)
|
||||||
|
|
||||||
true ->
|
true ->
|
||||||
@@ -273,11 +258,8 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do
|
|||||||
end
|
end
|
||||||
|
|
||||||
defp indexed_tag_lookup_events(filter, requester_pubkeys, opts) do
|
defp indexed_tag_lookup_events(filter, requester_pubkeys, opts) do
|
||||||
{tag_name, tag_values} = indexed_tag_filter(filter)
|
filter
|
||||||
indexed_tag_values = effective_indexed_tag_values(filter, tag_values)
|
|> indexed_candidate_events()
|
||||||
|
|
||||||
tag_name
|
|
||||||
|> Store.tagged_events(indexed_tag_values)
|
|
||||||
|> Enum.filter(&filter_match_visible?(&1, filter, requester_pubkeys))
|
|> Enum.filter(&filter_match_visible?(&1, filter, requester_pubkeys))
|
||||||
|> maybe_take_filter_limit(filter, opts)
|
|> maybe_take_filter_limit(filter, opts)
|
||||||
end
|
end
|
||||||
@@ -296,6 +278,49 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp indexed_candidate_spec(filter) do
|
||||||
|
authors = Map.get(filter, "authors")
|
||||||
|
kinds = Map.get(filter, "kinds")
|
||||||
|
tag_filter = indexed_tag_filter(filter)
|
||||||
|
|
||||||
|
cond do
|
||||||
|
is_tuple(tag_filter) ->
|
||||||
|
{tag_name, tag_values} = tag_filter
|
||||||
|
{:tag, tag_name, effective_indexed_tag_values(filter, tag_values)}
|
||||||
|
|
||||||
|
is_list(authors) and is_list(kinds) ->
|
||||||
|
{:pubkey_kind, authors, kinds}
|
||||||
|
|
||||||
|
is_list(authors) ->
|
||||||
|
{:pubkey, authors}
|
||||||
|
|
||||||
|
is_list(kinds) ->
|
||||||
|
{:kind, kinds}
|
||||||
|
|
||||||
|
true ->
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp indexed_candidate_events(filter) do
|
||||||
|
case indexed_candidate_spec(filter) do
|
||||||
|
{:tag, tag_name, tag_values} ->
|
||||||
|
Store.tagged_events(tag_name, tag_values)
|
||||||
|
|
||||||
|
{:pubkey_kind, authors, kinds} ->
|
||||||
|
Store.events_by_pubkeys_and_kinds(authors, kinds)
|
||||||
|
|
||||||
|
{:pubkey, authors} ->
|
||||||
|
Store.events_by_pubkeys(authors)
|
||||||
|
|
||||||
|
{:kind, kinds} ->
|
||||||
|
Store.events_by_kinds(kinds)
|
||||||
|
|
||||||
|
nil ->
|
||||||
|
[]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
defp effective_indexed_tag_values(filter, tag_values) do
|
defp effective_indexed_tag_values(filter, tag_values) do
|
||||||
case Map.get(filter, "limit") do
|
case Map.get(filter, "limit") do
|
||||||
limit when is_integer(limit) and limit == 1 ->
|
limit when is_integer(limit) and limit == 1 ->
|
||||||
@@ -339,6 +364,114 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do
|
|||||||
|
|
||||||
defp maybe_halt_scan(acc, count, _limit), do: {acc, count}
|
defp maybe_halt_scan(acc, count, _limit), do: {acc, count}
|
||||||
|
|
||||||
|
defp reduce_unique_matching_events(filters, requester_pubkeys, opts, acc, reducer) do
|
||||||
|
Enum.reduce(filters, acc, fn filter, current_acc ->
|
||||||
|
reduce_matching_events_for_filter(filter, requester_pubkeys, opts, current_acc, reducer)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp reduce_matching_events_for_filter(filter, requester_pubkeys, _opts, acc, reducer) do
|
||||||
|
cond do
|
||||||
|
Map.has_key?(filter, "ids") ->
|
||||||
|
filter
|
||||||
|
|> Map.get("ids", [])
|
||||||
|
|> Enum.reduce(acc, &reduce_event_id_match(&1, filter, requester_pubkeys, &2, reducer))
|
||||||
|
|
||||||
|
indexed_candidate_spec(filter) != nil ->
|
||||||
|
filter
|
||||||
|
|> indexed_candidate_events()
|
||||||
|
|> Enum.reduce(
|
||||||
|
acc,
|
||||||
|
&maybe_reduce_visible_event(&1, filter, requester_pubkeys, &2, reducer)
|
||||||
|
)
|
||||||
|
|
||||||
|
true ->
|
||||||
|
Store.reduce_events_newest(
|
||||||
|
acc,
|
||||||
|
&maybe_reduce_visible_event(&1, filter, requester_pubkeys, &2, reducer)
|
||||||
|
)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp coordinate_delete_candidates(delete_coordinates, deleter_pubkey) do
|
||||||
|
delete_coordinates
|
||||||
|
|> Enum.flat_map(fn coordinate ->
|
||||||
|
cond do
|
||||||
|
coordinate.pubkey != deleter_pubkey ->
|
||||||
|
[]
|
||||||
|
|
||||||
|
addressable_kind?(coordinate.kind) ->
|
||||||
|
Store.events_by_addresses([{coordinate.kind, deleter_pubkey, coordinate.d_tag}])
|
||||||
|
|
||||||
|
replaceable_kind?(coordinate.kind) ->
|
||||||
|
Store.events_by_pubkeys_and_kinds([deleter_pubkey], [coordinate.kind])
|
||||||
|
|
||||||
|
true ->
|
||||||
|
[]
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
|> deduplicate_events()
|
||||||
|
end
|
||||||
|
|
||||||
|
defp vanish_candidates(pubkey, created_at) do
|
||||||
|
own_events =
|
||||||
|
Store.events_by_pubkeys([pubkey])
|
||||||
|
|> Enum.filter(&(&1["created_at"] <= created_at))
|
||||||
|
|
||||||
|
giftwrap_events =
|
||||||
|
Store.tagged_events("p", [pubkey])
|
||||||
|
|> Enum.filter(&(&1["kind"] == 1059 and &1["created_at"] <= created_at))
|
||||||
|
|
||||||
|
deduplicate_events(own_events ++ giftwrap_events)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp event_ref(event) do
|
||||||
|
%{
|
||||||
|
created_at: Map.fetch!(event, "created_at"),
|
||||||
|
id: Base.decode16!(Map.fetch!(event, "id"), case: :mixed)
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp append_unique_event_ref(event, {seen_ids, acc}) do
|
||||||
|
reduce_unique_event(event, {seen_ids, acc}, fn _event_id, next_seen_ids ->
|
||||||
|
{next_seen_ids, [event_ref(event) | acc]}
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp count_unique_event(event, {seen_ids, acc}) do
|
||||||
|
reduce_unique_event(event, {seen_ids, acc}, fn _event_id, next_seen_ids ->
|
||||||
|
{next_seen_ids, acc + 1}
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp reduce_unique_event(event, {seen_ids, acc}, fun) do
|
||||||
|
event_id = Map.fetch!(event, "id")
|
||||||
|
|
||||||
|
if MapSet.member?(seen_ids, event_id) do
|
||||||
|
{seen_ids, acc}
|
||||||
|
else
|
||||||
|
fun.(event_id, MapSet.put(seen_ids, event_id))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp maybe_reduce_visible_event(event, filter, requester_pubkeys, acc, reducer) do
|
||||||
|
if filter_match_visible?(event, filter, requester_pubkeys) do
|
||||||
|
reducer.(event, acc)
|
||||||
|
else
|
||||||
|
acc
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp reduce_event_id_match(event_id, filter, requester_pubkeys, acc, reducer) do
|
||||||
|
case Store.get_event(event_id) do
|
||||||
|
{:ok, event, false} ->
|
||||||
|
maybe_reduce_visible_event(event, filter, requester_pubkeys, acc, reducer)
|
||||||
|
|
||||||
|
_other ->
|
||||||
|
acc
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
defp deduplicate_events(events) do
|
defp deduplicate_events(events) do
|
||||||
events
|
events
|
||||||
|> Enum.reduce(%{}, fn event, acc -> Map.put(acc, event["id"], event) end)
|
|> Enum.reduce(%{}, fn event, acc -> Map.put(acc, event["id"], event) end)
|
||||||
|
|||||||
@@ -7,6 +7,10 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
|||||||
@events_table :parrhesia_memory_events
|
@events_table :parrhesia_memory_events
|
||||||
@events_by_time_table :parrhesia_memory_events_by_time
|
@events_by_time_table :parrhesia_memory_events_by_time
|
||||||
@events_by_tag_table :parrhesia_memory_events_by_tag
|
@events_by_tag_table :parrhesia_memory_events_by_tag
|
||||||
|
@events_by_pubkey_table :parrhesia_memory_events_by_pubkey
|
||||||
|
@events_by_kind_table :parrhesia_memory_events_by_kind
|
||||||
|
@events_by_pubkey_kind_table :parrhesia_memory_events_by_pubkey_kind
|
||||||
|
@events_by_address_table :parrhesia_memory_events_by_address
|
||||||
|
|
||||||
@initial_state %{
|
@initial_state %{
|
||||||
bans: %{pubkeys: MapSet.new(), events: MapSet.new(), ips: MapSet.new()},
|
bans: %{pubkeys: MapSet.new(), events: MapSet.new(), ips: MapSet.new()},
|
||||||
@@ -19,7 +23,14 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
|||||||
}
|
}
|
||||||
|
|
||||||
def ensure_started do
|
def ensure_started do
|
||||||
ensure_agent_started()
|
with :ok <- ensure_agent_started() do
|
||||||
|
Agent.get(@name, fn state ->
|
||||||
|
ensure_tables_started()
|
||||||
|
state
|
||||||
|
end)
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def put_event(event_id, event) when is_binary(event_id) and is_map(event) do
|
def put_event(event_id, event) when is_binary(event_id) and is_map(event) do
|
||||||
@@ -28,6 +39,7 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
|||||||
if :ets.insert_new(@events_table, {event_id, event, false}) do
|
if :ets.insert_new(@events_table, {event_id, event, false}) do
|
||||||
true = :ets.insert(@events_by_time_table, {{sort_key(event), event_id}, event_id})
|
true = :ets.insert(@events_by_time_table, {{sort_key(event), event_id}, event_id})
|
||||||
index_event_tags(event_id, event)
|
index_event_tags(event_id, event)
|
||||||
|
index_event_secondary_keys(event_id, event)
|
||||||
:ok
|
:ok
|
||||||
else
|
else
|
||||||
{:error, :duplicate_event}
|
{:error, :duplicate_event}
|
||||||
@@ -51,6 +63,7 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
|||||||
true = :ets.insert(@events_table, {event_id, event, true})
|
true = :ets.insert(@events_table, {event_id, event, true})
|
||||||
true = :ets.delete(@events_by_time_table, {sort_key(event), event_id})
|
true = :ets.delete(@events_by_time_table, {sort_key(event), event_id})
|
||||||
unindex_event_tags(event_id, event)
|
unindex_event_tags(event_id, event)
|
||||||
|
unindex_event_secondary_keys(event_id, event)
|
||||||
:ok
|
:ok
|
||||||
|
|
||||||
{:ok, _event, true} ->
|
{:ok, _event, true} ->
|
||||||
@@ -86,9 +99,43 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
|||||||
:ok = ensure_started()
|
:ok = ensure_started()
|
||||||
|
|
||||||
tag_values
|
tag_values
|
||||||
|> Enum.flat_map(&tagged_events_for_value(tag_name, &1))
|
|> Enum.flat_map(&indexed_events_for_value(@events_by_tag_table, {tag_name, &1}))
|
||||||
|> Enum.uniq_by(& &1["id"])
|
|> sort_and_deduplicate_events()
|
||||||
|> Enum.sort(&chronological_sorter/2)
|
end
|
||||||
|
|
||||||
|
def events_by_pubkeys(pubkeys) when is_list(pubkeys) do
|
||||||
|
:ok = ensure_started()
|
||||||
|
|
||||||
|
pubkeys
|
||||||
|
|> Enum.flat_map(&indexed_events_for_value(@events_by_pubkey_table, &1))
|
||||||
|
|> sort_and_deduplicate_events()
|
||||||
|
end
|
||||||
|
|
||||||
|
def events_by_kinds(kinds) when is_list(kinds) do
|
||||||
|
:ok = ensure_started()
|
||||||
|
|
||||||
|
kinds
|
||||||
|
|> Enum.flat_map(&indexed_events_for_value(@events_by_kind_table, &1))
|
||||||
|
|> sort_and_deduplicate_events()
|
||||||
|
end
|
||||||
|
|
||||||
|
def events_by_pubkeys_and_kinds(pubkeys, kinds) when is_list(pubkeys) and is_list(kinds) do
|
||||||
|
:ok = ensure_started()
|
||||||
|
|
||||||
|
pubkeys
|
||||||
|
|> Enum.flat_map(fn pubkey ->
|
||||||
|
kinds
|
||||||
|
|> Enum.flat_map(&indexed_events_for_value(@events_by_pubkey_kind_table, {pubkey, &1}))
|
||||||
|
end)
|
||||||
|
|> sort_and_deduplicate_events()
|
||||||
|
end
|
||||||
|
|
||||||
|
def events_by_addresses(addresses) when is_list(addresses) do
|
||||||
|
:ok = ensure_started()
|
||||||
|
|
||||||
|
addresses
|
||||||
|
|> Enum.flat_map(&indexed_events_for_value(@events_by_address_table, &1))
|
||||||
|
|> sort_and_deduplicate_events()
|
||||||
end
|
end
|
||||||
|
|
||||||
defp reduce_events_newest_from(:"$end_of_table", acc, _fun), do: acc
|
defp reduce_events_newest_from(:"$end_of_table", acc, _fun), do: acc
|
||||||
@@ -152,7 +199,13 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
|||||||
end
|
end
|
||||||
|
|
||||||
defp init_state do
|
defp init_state do
|
||||||
:ets.new(@events_table, [
|
ensure_tables_started()
|
||||||
|
|
||||||
|
@initial_state
|
||||||
|
end
|
||||||
|
|
||||||
|
defp ensure_tables_started do
|
||||||
|
ensure_table(@events_table, [
|
||||||
:named_table,
|
:named_table,
|
||||||
:public,
|
:public,
|
||||||
:set,
|
:set,
|
||||||
@@ -160,7 +213,7 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
|||||||
write_concurrency: true
|
write_concurrency: true
|
||||||
])
|
])
|
||||||
|
|
||||||
:ets.new(@events_by_time_table, [
|
ensure_table(@events_by_time_table, [
|
||||||
:named_table,
|
:named_table,
|
||||||
:public,
|
:public,
|
||||||
:ordered_set,
|
:ordered_set,
|
||||||
@@ -168,7 +221,7 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
|||||||
write_concurrency: true
|
write_concurrency: true
|
||||||
])
|
])
|
||||||
|
|
||||||
:ets.new(@events_by_tag_table, [
|
ensure_table(@events_by_tag_table, [
|
||||||
:named_table,
|
:named_table,
|
||||||
:public,
|
:public,
|
||||||
:bag,
|
:bag,
|
||||||
@@ -176,7 +229,44 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
|||||||
write_concurrency: true
|
write_concurrency: true
|
||||||
])
|
])
|
||||||
|
|
||||||
@initial_state
|
ensure_table(@events_by_pubkey_table, [
|
||||||
|
:named_table,
|
||||||
|
:public,
|
||||||
|
:bag,
|
||||||
|
read_concurrency: true,
|
||||||
|
write_concurrency: true
|
||||||
|
])
|
||||||
|
|
||||||
|
ensure_table(@events_by_kind_table, [
|
||||||
|
:named_table,
|
||||||
|
:public,
|
||||||
|
:bag,
|
||||||
|
read_concurrency: true,
|
||||||
|
write_concurrency: true
|
||||||
|
])
|
||||||
|
|
||||||
|
ensure_table(@events_by_pubkey_kind_table, [
|
||||||
|
:named_table,
|
||||||
|
:public,
|
||||||
|
:bag,
|
||||||
|
read_concurrency: true,
|
||||||
|
write_concurrency: true
|
||||||
|
])
|
||||||
|
|
||||||
|
ensure_table(@events_by_address_table, [
|
||||||
|
:named_table,
|
||||||
|
:public,
|
||||||
|
:bag,
|
||||||
|
read_concurrency: true,
|
||||||
|
write_concurrency: true
|
||||||
|
])
|
||||||
|
end
|
||||||
|
|
||||||
|
defp ensure_table(name, options) do
|
||||||
|
case :ets.whereis(name) do
|
||||||
|
:undefined -> :ets.new(name, options)
|
||||||
|
_table -> :ok
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp lookup_event(event_id) do
|
defp lookup_event(event_id) do
|
||||||
@@ -186,21 +276,6 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp tagged_events_for_value(_tag_name, tag_value) when not is_binary(tag_value), do: []
|
|
||||||
|
|
||||||
defp tagged_events_for_value(tag_name, tag_value) do
|
|
||||||
tag_key = {tag_name, tag_value}
|
|
||||||
|
|
||||||
@events_by_tag_table
|
|
||||||
|> :ets.lookup(tag_key)
|
|
||||||
|> Enum.reduce([], fn {^tag_key, _created_sort_key, event_id}, acc ->
|
|
||||||
case lookup_event(event_id) do
|
|
||||||
{:ok, event, false} -> [event | acc]
|
|
||||||
_other -> acc
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
defp index_event_tags(event_id, event) do
|
defp index_event_tags(event_id, event) do
|
||||||
event
|
event
|
||||||
|> event_tag_index_entries(event_id)
|
|> event_tag_index_entries(event_id)
|
||||||
@@ -209,12 +284,28 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
|||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp index_event_secondary_keys(event_id, event) do
|
||||||
|
event
|
||||||
|
|> secondary_index_entries(event_id)
|
||||||
|
|> Enum.each(fn {table, entry} ->
|
||||||
|
true = :ets.insert(table, entry)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
defp unindex_event_tags(event_id, event) do
|
defp unindex_event_tags(event_id, event) do
|
||||||
event
|
event
|
||||||
|> event_tag_index_entries(event_id)
|
|> event_tag_index_entries(event_id)
|
||||||
|> Enum.each(&:ets.delete_object(@events_by_tag_table, &1))
|
|> Enum.each(&:ets.delete_object(@events_by_tag_table, &1))
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp unindex_event_secondary_keys(event_id, event) do
|
||||||
|
event
|
||||||
|
|> secondary_index_entries(event_id)
|
||||||
|
|> Enum.each(fn {table, entry} ->
|
||||||
|
:ets.delete_object(table, entry)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
defp event_tag_index_entries(event, event_id) do
|
defp event_tag_index_entries(event, event_id) do
|
||||||
created_sort_key = sort_key(event)
|
created_sort_key = sort_key(event)
|
||||||
|
|
||||||
@@ -229,6 +320,70 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
|||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp secondary_index_entries(event, event_id) do
|
||||||
|
created_sort_key = sort_key(event)
|
||||||
|
pubkey = Map.get(event, "pubkey")
|
||||||
|
kind = Map.get(event, "kind")
|
||||||
|
|
||||||
|
[]
|
||||||
|
|> maybe_put_secondary_entry(@events_by_pubkey_table, pubkey, created_sort_key, event_id)
|
||||||
|
|> maybe_put_secondary_entry(@events_by_kind_table, kind, created_sort_key, event_id)
|
||||||
|
|> maybe_put_pubkey_kind_entry(pubkey, kind, created_sort_key, event_id)
|
||||||
|
|> maybe_put_address_entry(event, pubkey, kind, event_id)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp maybe_put_secondary_entry(entries, _table, key, _created_sort_key, _event_id)
|
||||||
|
when is_nil(key),
|
||||||
|
do: entries
|
||||||
|
|
||||||
|
defp maybe_put_secondary_entry(entries, table, key, created_sort_key, event_id) do
|
||||||
|
[{table, {key, created_sort_key, event_id}} | entries]
|
||||||
|
end
|
||||||
|
|
||||||
|
defp maybe_put_pubkey_kind_entry(entries, pubkey, kind, created_sort_key, event_id)
|
||||||
|
when is_binary(pubkey) and is_integer(kind) do
|
||||||
|
[{@events_by_pubkey_kind_table, {{pubkey, kind}, created_sort_key, event_id}} | entries]
|
||||||
|
end
|
||||||
|
|
||||||
|
defp maybe_put_pubkey_kind_entry(entries, _pubkey, _kind, _created_sort_key, _event_id),
|
||||||
|
do: entries
|
||||||
|
|
||||||
|
defp maybe_put_address_entry(entries, event, pubkey, kind, event_id)
|
||||||
|
when is_binary(pubkey) and is_integer(kind) and kind >= 30_000 and kind < 40_000 do
|
||||||
|
d_tag =
|
||||||
|
event
|
||||||
|
|> Map.get("tags", [])
|
||||||
|
|> Enum.find_value("", fn
|
||||||
|
["d", value | _rest] -> value
|
||||||
|
_tag -> nil
|
||||||
|
end)
|
||||||
|
|
||||||
|
[{@events_by_address_table, {{kind, pubkey, d_tag}, sort_key(event), event_id}} | entries]
|
||||||
|
end
|
||||||
|
|
||||||
|
defp maybe_put_address_entry(entries, _event, _pubkey, _kind, _event_id), do: entries
|
||||||
|
|
||||||
|
defp indexed_events_for_value(_table, value)
|
||||||
|
when not is_binary(value) and not is_integer(value) and not is_tuple(value),
|
||||||
|
do: []
|
||||||
|
|
||||||
|
defp indexed_events_for_value(table, value) do
|
||||||
|
table
|
||||||
|
|> :ets.lookup(value)
|
||||||
|
|> Enum.reduce([], fn {^value, _created_sort_key, event_id}, acc ->
|
||||||
|
case lookup_event(event_id) do
|
||||||
|
{:ok, event, false} -> [event | acc]
|
||||||
|
_other -> acc
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp sort_and_deduplicate_events(events) do
|
||||||
|
events
|
||||||
|
|> Enum.uniq_by(& &1["id"])
|
||||||
|
|> Enum.sort(&chronological_sorter/2)
|
||||||
|
end
|
||||||
|
|
||||||
defp chronological_sorter(left, right) do
|
defp chronological_sorter(left, right) do
|
||||||
cond do
|
cond do
|
||||||
left["created_at"] > right["created_at"] -> true
|
left["created_at"] > right["created_at"] -> true
|
||||||
|
|||||||
@@ -179,4 +179,127 @@ defmodule Parrhesia.Storage.Adapters.Memory.AdapterTest do
|
|||||||
|
|
||||||
assert Enum.map(results, & &1["id"]) == [newest["id"], middle["id"]]
|
assert Enum.map(results, & &1["id"]) == [newest["id"], middle["id"]]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "memory adapter counts and returns refs without duplicate overlaps across indexed filters" do
|
||||||
|
author = String.duplicate("9", 64)
|
||||||
|
tag = "shared"
|
||||||
|
|
||||||
|
older =
|
||||||
|
%{
|
||||||
|
"id" => String.duplicate("a", 64),
|
||||||
|
"pubkey" => author,
|
||||||
|
"created_at" => 1_700_000_200,
|
||||||
|
"kind" => 1,
|
||||||
|
"tags" => [["t", tag]],
|
||||||
|
"content" => "older"
|
||||||
|
}
|
||||||
|
|
||||||
|
newer =
|
||||||
|
%{
|
||||||
|
"id" => String.duplicate("b", 64),
|
||||||
|
"pubkey" => author,
|
||||||
|
"created_at" => 1_700_000_201,
|
||||||
|
"kind" => 1,
|
||||||
|
"tags" => [["t", tag]],
|
||||||
|
"content" => "newer"
|
||||||
|
}
|
||||||
|
|
||||||
|
unrelated =
|
||||||
|
%{
|
||||||
|
"id" => String.duplicate("c", 64),
|
||||||
|
"pubkey" => String.duplicate("d", 64),
|
||||||
|
"created_at" => 1_700_000_202,
|
||||||
|
"kind" => 2,
|
||||||
|
"tags" => [["t", "other"]],
|
||||||
|
"content" => "unrelated"
|
||||||
|
}
|
||||||
|
|
||||||
|
assert {:ok, _event} = Events.put_event(%{}, older)
|
||||||
|
assert {:ok, _event} = Events.put_event(%{}, newer)
|
||||||
|
assert {:ok, _event} = Events.put_event(%{}, unrelated)
|
||||||
|
|
||||||
|
filters = [
|
||||||
|
%{"authors" => [author], "kinds" => [1]},
|
||||||
|
%{"#t" => [tag]}
|
||||||
|
]
|
||||||
|
|
||||||
|
assert {:ok, 2} = Events.count(%{}, filters, [])
|
||||||
|
|
||||||
|
assert {:ok, refs} = Events.query_event_refs(%{}, filters, [])
|
||||||
|
|
||||||
|
assert Enum.map(refs, &Base.encode16(&1.id, case: :lower)) == [older["id"], newer["id"]]
|
||||||
|
end
|
||||||
|
|
||||||
|
test "memory adapter uses indexes for vanish and keeps unrelated events" do
|
||||||
|
author = String.duplicate("e", 64)
|
||||||
|
other = String.duplicate("f", 64)
|
||||||
|
|
||||||
|
own_event =
|
||||||
|
%{
|
||||||
|
"id" => String.duplicate("1", 64),
|
||||||
|
"pubkey" => author,
|
||||||
|
"created_at" => 1_700_000_300,
|
||||||
|
"kind" => 1,
|
||||||
|
"tags" => [],
|
||||||
|
"content" => "own-event"
|
||||||
|
}
|
||||||
|
|
||||||
|
giftwrap_event =
|
||||||
|
%{
|
||||||
|
"id" => String.duplicate("2", 64),
|
||||||
|
"pubkey" => other,
|
||||||
|
"created_at" => 1_700_000_301,
|
||||||
|
"kind" => 1059,
|
||||||
|
"tags" => [["p", author]],
|
||||||
|
"content" => "giftwrap"
|
||||||
|
}
|
||||||
|
|
||||||
|
other_event =
|
||||||
|
%{
|
||||||
|
"id" => String.duplicate("3", 64),
|
||||||
|
"pubkey" => other,
|
||||||
|
"created_at" => 1_700_000_302,
|
||||||
|
"kind" => 1,
|
||||||
|
"tags" => [],
|
||||||
|
"content" => "other"
|
||||||
|
}
|
||||||
|
|
||||||
|
assert {:ok, _event} = Events.put_event(%{}, own_event)
|
||||||
|
assert {:ok, _event} = Events.put_event(%{}, giftwrap_event)
|
||||||
|
assert {:ok, _event} = Events.put_event(%{}, other_event)
|
||||||
|
|
||||||
|
assert {:ok, 2} =
|
||||||
|
Events.vanish(%{}, %{"pubkey" => author, "created_at" => 1_700_000_400})
|
||||||
|
|
||||||
|
assert {:ok, nil} = Events.get_event(%{}, own_event["id"])
|
||||||
|
assert {:ok, nil} = Events.get_event(%{}, giftwrap_event["id"])
|
||||||
|
assert {:ok, remaining} = Events.get_event(%{}, other_event["id"])
|
||||||
|
assert remaining["id"] == other_event["id"]
|
||||||
|
end
|
||||||
|
|
||||||
|
test "memory admin keeps a bounded newest-first audit log" do
|
||||||
|
Enum.each(1..1_005, fn index ->
|
||||||
|
assert :ok =
|
||||||
|
Admin.append_audit_log(%{}, %{
|
||||||
|
method: if(rem(index, 2) == 0, do: "stats", else: "ping"),
|
||||||
|
actor_pubkey: if(rem(index, 3) == 0, do: "actor-a", else: "actor-b"),
|
||||||
|
params: %{index: index}
|
||||||
|
})
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert {:ok, latest_logs} = Admin.list_audit_logs(%{}, limit: 3)
|
||||||
|
assert length(latest_logs) == 3
|
||||||
|
assert Enum.map(latest_logs, & &1.params.index) == [1005, 1004, 1003]
|
||||||
|
|
||||||
|
assert {:ok, actor_logs} = Admin.list_audit_logs(%{}, actor_pubkey: "actor-a", limit: 2)
|
||||||
|
assert Enum.all?(actor_logs, &(&1.actor_pubkey == "actor-a"))
|
||||||
|
|
||||||
|
assert {:ok, stats_logs} = Admin.list_audit_logs(%{}, method: :stats, limit: 2)
|
||||||
|
assert Enum.all?(stats_logs, &(&1.method == "stats"))
|
||||||
|
|
||||||
|
assert {:ok, retained_logs} = Admin.list_audit_logs(%{}, limit: 2_000)
|
||||||
|
assert length(retained_logs) == 1_000
|
||||||
|
assert Enum.any?(retained_logs, &(&1.params.index == 1005))
|
||||||
|
refute Enum.any?(retained_logs, &(&1.params.index == 1))
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user