From 4c40edfd8341d3854203ae4b0d6ffcabae96711e Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Wed, 18 Mar 2026 18:56:47 +0100 Subject: [PATCH] Optimize memory-backed benchmark path --- .../storage/adapters/memory/events.ex | 269 ++++++++++++++---- .../storage/adapters/memory/store.ex | 209 +++++++++++++- .../storage/adapters/memory/adapter_test.exs | 109 +++++++ 3 files changed, 525 insertions(+), 62 deletions(-) diff --git a/lib/parrhesia/storage/adapters/memory/events.ex b/lib/parrhesia/storage/adapters/memory/events.ex index 6cf3ec6..17eb3dc 100644 --- a/lib/parrhesia/storage/adapters/memory/events.ex +++ b/lib/parrhesia/storage/adapters/memory/events.ex @@ -12,54 +12,52 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do def put_event(_context, event) do event_id = Map.fetch!(event, "id") - result = - Store.get_and_update(fn state -> - if Map.has_key?(state.events, event_id) do - {{:error, :duplicate_event}, state} - else - next_state = put_in(state.events[event_id], event) - {{:ok, event}, next_state} - end - end) - - result + case Store.put_event(event_id, event) do + :ok -> {:ok, event} + {:error, :duplicate_event} -> {:error, :duplicate_event} + end end @impl true def get_event(_context, event_id) do - deleted? = Store.get(fn state -> MapSet.member?(state.deleted, event_id) end) - - if deleted? do - {:ok, nil} - else - {:ok, Store.get(fn state -> Map.get(state.events, event_id) end)} + case Store.get_event(event_id) do + {:ok, _event, true} -> {:ok, nil} + {:ok, event, false} -> {:ok, event} + :error -> {:ok, nil} end end @impl true def query(_context, filters, opts) do with :ok <- Filter.validate_filters(filters) do - state = Store.get(& &1) requester_pubkeys = Keyword.get(opts, :requester_pubkeys, []) events = - state.events - |> Map.values() - |> Enum.filter(fn event -> - not MapSet.member?(state.deleted, event["id"]) and - Filter.matches_any?(event, filters) and - giftwrap_visible_to_requester?(event, requester_pubkeys) - end) + filters + |> Enum.flat_map(&matching_events_for_filter(&1, requester_pubkeys, opts)) + |> deduplicate_events() + |> sort_events() + |> maybe_apply_query_limit(opts) {:ok, events} end end @impl true - def query_event_refs(context, filters, opts) do - with {:ok, events} <- query(context, filters, opts) do + def query_event_refs(_context, filters, opts) do + with :ok <- Filter.validate_filters(filters) do + requester_pubkeys = Keyword.get(opts, :requester_pubkeys, []) + refs = - events + filters + |> Enum.flat_map( + &matching_events_for_filter( + &1, + requester_pubkeys, + Keyword.put(opts, :apply_filter_limits?, false) + ) + ) + |> deduplicate_events() |> Enum.map(fn event -> %{ created_at: Map.fetch!(event, "created_at"), @@ -74,9 +72,23 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do end @impl true - def count(context, filters, opts) do - with {:ok, events} <- query(context, filters, opts) do - {:ok, length(events)} + def count(_context, filters, opts) do + with :ok <- Filter.validate_filters(filters) do + requester_pubkeys = Keyword.get(opts, :requester_pubkeys, []) + + count = + filters + |> Enum.flat_map( + &matching_events_for_filter( + &1, + requester_pubkeys, + Keyword.put(opts, :apply_filter_limits?, false) + ) + ) + |> deduplicate_events() + |> length() + + {:ok, count} end end @@ -107,22 +119,17 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do end) coordinate_delete_ids = - Store.get(fn state -> - state.events - |> Map.values() - |> Enum.filter(fn candidate -> - matches_delete_coordinate?(candidate, delete_coordinates, deleter_pubkey) - end) - |> Enum.map(& &1["id"]) + Store.reduce_events([], fn candidate, acc -> + if matches_delete_coordinate?(candidate, delete_coordinates, deleter_pubkey) do + [candidate["id"] | acc] + else + acc + end end) all_delete_ids = Enum.uniq(delete_event_ids ++ coordinate_delete_ids) - Store.update(fn state -> - Enum.reduce(all_delete_ids, state, fn event_id, acc -> - update_in(acc.deleted, &MapSet.put(&1, event_id)) - end) - end) + Enum.each(all_delete_ids, &Store.mark_deleted/1) {:ok, length(all_delete_ids)} end @@ -132,18 +139,15 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do pubkey = Map.get(event, "pubkey") deleted_ids = - Store.get(fn state -> - state.events - |> Map.values() - |> Enum.filter(fn candidate -> candidate["pubkey"] == pubkey end) - |> Enum.map(& &1["id"]) + Store.reduce_events([], fn candidate, acc -> + if candidate["pubkey"] == pubkey do + [candidate["id"] | acc] + else + acc + end end) - Store.update(fn state -> - Enum.reduce(deleted_ids, state, fn event_id, acc -> - update_in(acc.deleted, &MapSet.put(&1, event_id)) - end) - end) + Enum.each(deleted_ids, &Store.mark_deleted/1) {:ok, length(deleted_ids)} end @@ -224,4 +228,163 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do _other -> refs end end + + defp matching_events_for_filter(filter, requester_pubkeys, opts) do + cond do + Map.has_key?(filter, "ids") -> + direct_id_lookup_events(filter, requester_pubkeys, opts) + + is_tuple(indexed_tag_filter(filter)) -> + indexed_tag_lookup_events(filter, requester_pubkeys, opts) + + true -> + scan_filter_matches(filter, requester_pubkeys, opts) + end + end + + defp direct_id_lookup_events(filter, requester_pubkeys, opts) do + filter + |> Map.get("ids", []) + |> Enum.reduce([], fn event_id, acc -> + maybe_prepend_direct_lookup_match(acc, event_id, filter, requester_pubkeys) + end) + |> deduplicate_events() + |> sort_events() + |> maybe_take_filter_limit(filter, opts) + end + + defp scan_filter_matches(filter, requester_pubkeys, opts) do + limit = + if Keyword.get(opts, :apply_filter_limits?, true) do + effective_filter_limit(filter, opts) + else + nil + end + + {matches, _count} = + Store.reduce_events_newest( + {[], 0}, + &reduce_scan_match(&1, &2, filter, requester_pubkeys, limit) + ) + + matches + |> Enum.reverse() + |> sort_events() + end + + defp indexed_tag_lookup_events(filter, requester_pubkeys, opts) do + {tag_name, tag_values} = indexed_tag_filter(filter) + indexed_tag_values = effective_indexed_tag_values(filter, tag_values) + + tag_name + |> Store.tagged_events(indexed_tag_values) + |> Enum.filter(&filter_match_visible?(&1, filter, requester_pubkeys)) + |> maybe_take_filter_limit(filter, opts) + end + + defp indexed_tag_filter(filter) do + filter + |> Enum.filter(fn + {"#" <> _tag_name, values} when is_list(values) -> values != [] + _entry -> false + end) + |> Enum.sort_by(fn {key, _values} -> key end) + |> List.first() + |> case do + {"#" <> tag_name, values} -> {tag_name, values} + nil -> nil + end + end + + defp effective_indexed_tag_values(filter, tag_values) do + case Map.get(filter, "limit") do + limit when is_integer(limit) and limit == 1 -> + Enum.take(tag_values, 1) + + _other -> + tag_values + end + end + + defp filter_match_visible?(event, filter, requester_pubkeys) do + Filter.matches_filter?(event, filter) and + giftwrap_visible_to_requester?(event, requester_pubkeys) + end + + defp maybe_prepend_direct_lookup_match(acc, event_id, filter, requester_pubkeys) do + case Store.get_event(event_id) do + {:ok, event, false} -> + if filter_match_visible?(event, filter, requester_pubkeys) do + [event | acc] + else + acc + end + + _other -> + acc + end + end + + defp reduce_scan_match(event, {acc, count}, filter, requester_pubkeys, limit) do + if filter_match_visible?(event, filter, requester_pubkeys) do + maybe_halt_scan([event | acc], count + 1, limit) + else + {acc, count} + end + end + + defp maybe_halt_scan(acc, count, limit) when is_integer(limit) and count >= limit do + {:halt, {acc, count}} + end + + defp maybe_halt_scan(acc, count, _limit), do: {acc, count} + + defp deduplicate_events(events) do + events + |> Enum.reduce(%{}, fn event, acc -> Map.put(acc, event["id"], event) end) + |> Map.values() + end + + defp sort_events(events) do + Enum.sort(events, &chronological_sorter/2) + end + + defp chronological_sorter(left, right) do + cond do + left["created_at"] > right["created_at"] -> true + left["created_at"] < right["created_at"] -> false + true -> left["id"] < right["id"] + end + end + + defp maybe_apply_query_limit(events, opts) do + case Keyword.get(opts, :limit) do + limit when is_integer(limit) and limit > 0 -> Enum.take(events, limit) + _other -> events + end + end + + defp maybe_take_filter_limit(events, filter, opts) do + case effective_filter_limit(filter, opts) do + limit when is_integer(limit) and limit > 0 -> Enum.take(events, limit) + _other -> events + end + end + + defp effective_filter_limit(filter, opts) do + max_filter_limit = Keyword.get(opts, :max_filter_limit) + + case Map.get(filter, "limit") do + limit + when is_integer(limit) and limit > 0 and is_integer(max_filter_limit) and + max_filter_limit > 0 -> + min(limit, max_filter_limit) + + limit when is_integer(limit) and limit > 0 -> + limit + + _other -> + nil + end + end end diff --git a/lib/parrhesia/storage/adapters/memory/store.ex b/lib/parrhesia/storage/adapters/memory/store.ex index 22d2403..a325846 100644 --- a/lib/parrhesia/storage/adapters/memory/store.ex +++ b/lib/parrhesia/storage/adapters/memory/store.ex @@ -4,10 +4,11 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do use Agent @name __MODULE__ + @events_table :parrhesia_memory_events + @events_by_time_table :parrhesia_memory_events_by_time + @events_by_tag_table :parrhesia_memory_events_by_tag @initial_state %{ - events: %{}, - deleted: MapSet.new(), bans: %{pubkeys: MapSet.new(), events: MapSet.new(), ips: MapSet.new()}, allowed_pubkeys: MapSet.new(), acl_rules: [], @@ -18,21 +19,107 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do } def ensure_started do - if Process.whereis(@name) do + ensure_agent_started() + end + + def put_event(event_id, event) when is_binary(event_id) and is_map(event) do + :ok = ensure_started() + + if :ets.insert_new(@events_table, {event_id, event, false}) do + true = :ets.insert(@events_by_time_table, {{sort_key(event), event_id}, event_id}) + index_event_tags(event_id, event) :ok else - start_store() + {:error, :duplicate_event} end end - defp start_store do - case Agent.start_link(fn -> @initial_state end, name: @name) do - {:ok, _pid} -> :ok - {:error, {:already_started, _pid}} -> :ok - {:error, reason} -> {:error, reason} + def get_event(event_id) when is_binary(event_id) do + :ok = ensure_started() + + case :ets.lookup(@events_table, event_id) do + [{^event_id, event, deleted?}] -> {:ok, event, deleted?} + [] -> :error end end + def mark_deleted(event_id) when is_binary(event_id) do + :ok = ensure_started() + + case lookup_event(event_id) do + {:ok, event, false} -> + true = :ets.insert(@events_table, {event_id, event, true}) + true = :ets.delete(@events_by_time_table, {sort_key(event), event_id}) + unindex_event_tags(event_id, event) + :ok + + {:ok, _event, true} -> + :ok + + :error -> + :ok + end + end + + def reduce_events(acc, fun) when is_function(fun, 2) do + :ok = ensure_started() + + :ets.foldl( + fn {_event_id, event, deleted?}, current_acc -> + if deleted? do + current_acc + else + fun.(event, current_acc) + end + end, + acc, + @events_table + ) + end + + def reduce_events_newest(acc, fun) when is_function(fun, 2) do + :ok = ensure_started() + reduce_events_newest_from(:ets.first(@events_by_time_table), acc, fun) + end + + def tagged_events(tag_name, tag_values) when is_binary(tag_name) and is_list(tag_values) do + :ok = ensure_started() + + tag_values + |> Enum.flat_map(&tagged_events_for_value(tag_name, &1)) + |> Enum.uniq_by(& &1["id"]) + |> Enum.sort(&chronological_sorter/2) + end + + defp reduce_events_newest_from(:"$end_of_table", acc, _fun), do: acc + + defp reduce_events_newest_from(key, acc, fun) do + next_key = :ets.next(@events_by_time_table, key) + acc = reduce_indexed_event(key, acc, fun) + + case acc do + {:halt, final_acc} -> final_acc + next_acc -> reduce_events_newest_from(next_key, next_acc, fun) + end + end + + defp reduce_indexed_event(key, acc, fun) do + case :ets.lookup(@events_by_time_table, key) do + [{^key, event_id}] -> apply_reduce_fun(event_id, acc, fun) + [] -> acc + end + end + + defp apply_reduce_fun(event_id, acc, fun) do + case lookup_event(event_id) do + {:ok, event, false} -> normalize_reduce_result(fun.(event, acc)) + _other -> acc + end + end + + defp normalize_reduce_result({:halt, next_acc}), do: {:halt, next_acc} + defp normalize_reduce_result(next_acc), do: next_acc + def get(fun) do :ok = ensure_started() Agent.get(@name, fun) @@ -47,4 +134,108 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do :ok = ensure_started() Agent.get_and_update(@name, fun) end + + defp ensure_agent_started do + if Process.whereis(@name) do + :ok + else + start_store() + end + end + + defp start_store do + case Agent.start_link(&init_state/0, name: @name) do + {:ok, _pid} -> :ok + {:error, {:already_started, _pid}} -> :ok + {:error, reason} -> {:error, reason} + end + end + + defp init_state do + :ets.new(@events_table, [ + :named_table, + :public, + :set, + read_concurrency: true, + write_concurrency: true + ]) + + :ets.new(@events_by_time_table, [ + :named_table, + :public, + :ordered_set, + read_concurrency: true, + write_concurrency: true + ]) + + :ets.new(@events_by_tag_table, [ + :named_table, + :public, + :bag, + read_concurrency: true, + write_concurrency: true + ]) + + @initial_state + end + + defp lookup_event(event_id) do + case :ets.lookup(@events_table, event_id) do + [{^event_id, event, deleted?}] -> {:ok, event, deleted?} + [] -> :error + end + end + + defp tagged_events_for_value(_tag_name, tag_value) when not is_binary(tag_value), do: [] + + defp tagged_events_for_value(tag_name, tag_value) do + tag_key = {tag_name, tag_value} + + @events_by_tag_table + |> :ets.lookup(tag_key) + |> Enum.reduce([], fn {^tag_key, _created_sort_key, event_id}, acc -> + case lookup_event(event_id) do + {:ok, event, false} -> [event | acc] + _other -> acc + end + end) + end + + defp index_event_tags(event_id, event) do + event + |> event_tag_index_entries(event_id) + |> Enum.each(fn entry -> + true = :ets.insert(@events_by_tag_table, entry) + end) + end + + defp unindex_event_tags(event_id, event) do + event + |> event_tag_index_entries(event_id) + |> Enum.each(&:ets.delete_object(@events_by_tag_table, &1)) + end + + defp event_tag_index_entries(event, event_id) do + created_sort_key = sort_key(event) + + event + |> Map.get("tags", []) + |> Enum.flat_map(fn + [tag_name, tag_value | _rest] when is_binary(tag_name) and is_binary(tag_value) -> + [{{tag_name, tag_value}, created_sort_key, event_id}] + + _tag -> + [] + end) + end + + defp chronological_sorter(left, right) do + cond do + left["created_at"] > right["created_at"] -> true + left["created_at"] < right["created_at"] -> false + true -> left["id"] < right["id"] + end + end + + defp sort_key(event), do: -Map.get(event, "created_at", 0) end diff --git a/test/parrhesia/storage/adapters/memory/adapter_test.exs b/test/parrhesia/storage/adapters/memory/adapter_test.exs index 4b99760..0be68a1 100644 --- a/test/parrhesia/storage/adapters/memory/adapter_test.exs +++ b/test/parrhesia/storage/adapters/memory/adapter_test.exs @@ -70,4 +70,113 @@ defmodule Parrhesia.Storage.Adapters.Memory.AdapterTest do assert {:ok, []} = Events.query(%{}, filters, requester_pubkeys: []) assert {:ok, 0} = Events.count(%{}, filters, requester_pubkeys: []) end + + test "memory adapter applies filter limits in descending chronological order" do + now = 1_700_000_000 + author = String.duplicate("d", 64) + + older = + %{ + "id" => String.duplicate("1", 64), + "pubkey" => author, + "created_at" => now, + "kind" => 1, + "tags" => [], + "content" => "older" + } + + tie_loser = + %{ + "id" => String.duplicate("3", 64), + "pubkey" => author, + "created_at" => now + 1, + "kind" => 1, + "tags" => [], + "content" => "tie-loser" + } + + tie_winner = + %{ + "id" => String.duplicate("2", 64), + "pubkey" => author, + "created_at" => now + 1, + "kind" => 1, + "tags" => [], + "content" => "tie-winner" + } + + newest = + %{ + "id" => String.duplicate("4", 64), + "pubkey" => author, + "created_at" => now + 2, + "kind" => 1, + "tags" => [], + "content" => "newest" + } + + assert {:ok, _event} = Events.put_event(%{}, older) + assert {:ok, _event} = Events.put_event(%{}, tie_loser) + assert {:ok, _event} = Events.put_event(%{}, tie_winner) + assert {:ok, _event} = Events.put_event(%{}, newest) + + assert {:ok, results} = + Events.query(%{}, [%{"authors" => [author], "kinds" => [1], "limit" => 2}], []) + + assert Enum.map(results, & &1["id"]) == [newest["id"], tie_winner["id"]] + end + + test "memory adapter serves tag-filter queries from newest matching events" do + now = 1_700_000_100 + author = String.duplicate("e", 64) + + off_topic = + %{ + "id" => String.duplicate("5", 64), + "pubkey" => author, + "created_at" => now + 3, + "kind" => 1, + "tags" => [["t", "other"]], + "content" => "off-topic" + } + + oldest = + %{ + "id" => String.duplicate("6", 64), + "pubkey" => author, + "created_at" => now, + "kind" => 1, + "tags" => [["t", "bench"]], + "content" => "oldest" + } + + middle = + %{ + "id" => String.duplicate("7", 64), + "pubkey" => author, + "created_at" => now + 1, + "kind" => 1, + "tags" => [["t", "bench"]], + "content" => "middle" + } + + newest = + %{ + "id" => String.duplicate("8", 64), + "pubkey" => author, + "created_at" => now + 2, + "kind" => 1, + "tags" => [["t", "bench"]], + "content" => "newest" + } + + assert {:ok, _event} = Events.put_event(%{}, off_topic) + assert {:ok, _event} = Events.put_event(%{}, oldest) + assert {:ok, _event} = Events.put_event(%{}, middle) + assert {:ok, _event} = Events.put_event(%{}, newest) + + assert {:ok, results} = Events.query(%{}, [%{"#t" => ["bench"], "limit" => 2}], []) + + assert Enum.map(results, & &1["id"]) == [newest["id"], middle["id"]] + end end