From 18e429e05a295ffb63c9b2537edbf2e4b86beb78 Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Sat, 14 Mar 2026 04:15:37 +0100 Subject: [PATCH] Fix medium findings: deletion coords, count SQL, cache startup --- .../storage/adapters/memory/events.ex | 77 +++++++- .../storage/adapters/postgres/events.ex | 180 +++++++++++++++--- .../storage/adapters/postgres/moderation.ex | 45 ++--- .../adapters/postgres/moderation_cache.ex | 28 +++ lib/parrhesia/storage/archiver.ex | 19 +- lib/parrhesia/storage/supervisor.ex | 2 + lib/parrhesia/subscriptions/index.ex | 139 +++++++++++--- lib/parrhesia/web/connection.ex | 127 +++++++----- .../fault_injection_group_flow_test.exs | 10 +- test/parrhesia/fault_injection_test.exs | 2 +- .../postgres/events_lifecycle_test.exs | 25 +++ test/parrhesia/storage/archiver_test.exs | 8 +- 12 files changed, 522 insertions(+), 140 deletions(-) create mode 100644 lib/parrhesia/storage/adapters/postgres/moderation_cache.ex diff --git a/lib/parrhesia/storage/adapters/memory/events.ex b/lib/parrhesia/storage/adapters/memory/events.ex index d54f12c..5e9870e 100644 --- a/lib/parrhesia/storage/adapters/memory/events.ex +++ b/lib/parrhesia/storage/adapters/memory/events.ex @@ -64,21 +64,49 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do @impl true def delete_by_request(_context, event) do - delete_ids = + deleter_pubkey = Map.get(event, "pubkey") + + delete_event_ids = event |> Map.get("tags", []) |> Enum.flat_map(fn - ["e", event_id | _rest] -> [event_id] + ["e", event_id | _rest] when is_binary(event_id) -> [event_id] _tag -> [] end) + delete_coordinates = + event + |> Map.get("tags", []) + |> Enum.flat_map(fn + ["a", coordinate | _rest] when is_binary(coordinate) -> + case parse_delete_coordinate(coordinate) do + {:ok, parsed_coordinate} -> [parsed_coordinate] + {:error, _reason} -> [] + end + + _tag -> + [] + end) + + coordinate_delete_ids = + Store.get(fn state -> + state.events + |> Map.values() + |> Enum.filter(fn candidate -> + matches_delete_coordinate?(candidate, delete_coordinates, deleter_pubkey) + end) + |> Enum.map(& &1["id"]) + end) + + all_delete_ids = Enum.uniq(delete_event_ids ++ coordinate_delete_ids) + Store.update(fn state -> - Enum.reduce(delete_ids, state, fn event_id, acc -> + Enum.reduce(all_delete_ids, state, fn event_id, acc -> update_in(acc.deleted, &MapSet.put(&1, event_id)) end) end) - {:ok, length(delete_ids)} + {:ok, length(all_delete_ids)} end @impl true @@ -105,6 +133,47 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do @impl true def purge_expired(_opts), do: {:ok, 0} + defp parse_delete_coordinate(coordinate) do + case String.split(coordinate, ":", parts: 3) do + [kind_part, pubkey, d_tag] -> + case Integer.parse(kind_part) do + {kind, ""} when kind >= 0 -> {:ok, %{kind: kind, pubkey: pubkey, d_tag: d_tag}} + _other -> {:error, :invalid_coordinate} + end + + _other -> + {:error, :invalid_coordinate} + end + end + + defp matches_delete_coordinate?(candidate, delete_coordinates, deleter_pubkey) do + Enum.any?(delete_coordinates, fn coordinate -> + coordinate.pubkey == deleter_pubkey and + candidate["pubkey"] == deleter_pubkey and + candidate["kind"] == coordinate.kind and + coordinate_match_for_kind?(candidate, coordinate) + end) + end + + defp coordinate_match_for_kind?(candidate, coordinate) do + if addressable_kind?(coordinate.kind) do + candidate_d_tag = + candidate + |> Map.get("tags", []) + |> Enum.find_value("", fn + ["d", value | _rest] -> value + _tag -> nil + end) + + candidate_d_tag == coordinate.d_tag + else + replaceable_kind?(coordinate.kind) + end + end + + defp replaceable_kind?(kind), do: kind in [0, 3] or (kind >= 10_000 and kind < 20_000) + defp addressable_kind?(kind), do: kind >= 30_000 and kind < 40_000 + defp giftwrap_visible_to_requester?(%{"kind" => 1059} = event, requester_pubkeys) do requester_pubkeys != [] and event_targets_any_recipient?(event, requester_pubkeys) diff --git a/lib/parrhesia/storage/adapters/postgres/events.ex b/lib/parrhesia/storage/adapters/postgres/events.ex index 582692d..ad1add8 100644 --- a/lib/parrhesia/storage/adapters/postgres/events.ex +++ b/lib/parrhesia/storage/adapters/postgres/events.ex @@ -114,13 +114,12 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do total_count = filters - |> Enum.flat_map(fn filter -> - filter - |> event_id_query_for_filter(now, opts) - |> Repo.all() + |> event_id_union_query_for_filters(now, opts) + |> subquery() + |> then(fn union_query -> + from(event in union_query, select: count(event.id, :distinct)) end) - |> MapSet.new() - |> MapSet.size() + |> Repo.one() {:ok, total_count} end @@ -131,18 +130,83 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do @impl true def delete_by_request(_context, event) do with {:ok, deleter_pubkey} <- decode_hex(Map.get(event, "pubkey"), 32, :invalid_pubkey), - {:ok, delete_ids} <- extract_delete_event_ids(event) do + {:ok, delete_targets} <- extract_delete_targets(event) do + deleted_at = System.system_time(:second) + + deleted_by_id_count = + delete_targets + |> Map.get(:event_ids, []) + |> delete_events_by_ids(deleter_pubkey, deleted_at) + + deleted_by_coordinate_count = + delete_targets + |> Map.get(:coordinates, []) + |> delete_events_by_coordinates(deleter_pubkey, deleted_at) + + {:ok, deleted_by_id_count + deleted_by_coordinate_count} + end + end + + defp delete_events_by_ids([], _deleter_pubkey, _deleted_at), do: 0 + + defp delete_events_by_ids(delete_ids, deleter_pubkey, deleted_at) do + query = + from(stored_event in "events", + where: + stored_event.id in ^delete_ids and + stored_event.pubkey == ^deleter_pubkey and + is_nil(stored_event.deleted_at) + ) + + {count, _result} = Repo.update_all(query, set: [deleted_at: deleted_at]) + count + end + + defp delete_events_by_coordinates([], _deleter_pubkey, _deleted_at), do: 0 + + defp delete_events_by_coordinates(coordinates, deleter_pubkey, deleted_at) do + relevant_coordinates = + Enum.filter(coordinates, fn coordinate -> + coordinate.pubkey == deleter_pubkey and + (replaceable_kind?(coordinate.kind) or addressable_kind?(coordinate.kind)) + end) + + if relevant_coordinates == [] do + 0 + else + dynamic_conditions = + Enum.reduce(relevant_coordinates, dynamic(false), fn coordinate, acc -> + coordinate_condition = + coordinate_delete_condition(coordinate, deleter_pubkey) + + dynamic([stored_event], ^acc or ^coordinate_condition) + end) + query = from(stored_event in "events", - where: - stored_event.id in ^delete_ids and - stored_event.pubkey == ^deleter_pubkey and - is_nil(stored_event.deleted_at) + where: is_nil(stored_event.deleted_at) ) + |> where(^dynamic_conditions) - deleted_at = System.system_time(:second) {count, _result} = Repo.update_all(query, set: [deleted_at: deleted_at]) - {:ok, count} + count + end + end + + defp coordinate_delete_condition(coordinate, deleter_pubkey) do + if addressable_kind?(coordinate.kind) do + dynamic( + [stored_event], + stored_event.kind == ^coordinate.kind and + stored_event.pubkey == ^deleter_pubkey and + stored_event.d_tag == ^coordinate.d_tag + ) + else + dynamic( + [stored_event], + stored_event.kind == ^coordinate.kind and + stored_event.pubkey == ^deleter_pubkey + ) end end @@ -598,6 +662,20 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do |> maybe_restrict_giftwrap_access(filter, opts) end + defp event_id_union_query_for_filters([], now, _opts) do + from(event in "events", + where: event.created_at > ^now and event.created_at < ^now, + select: event.id + ) + end + + defp event_id_union_query_for_filters([first_filter | rest_filters], now, opts) do + Enum.reduce(rest_filters, event_id_query_for_filter(first_filter, now, opts), fn filter, + acc -> + union_all(acc, ^event_id_query_for_filter(filter, now, opts)) + end) + end + defp maybe_filter_ids(query, nil), do: query defp maybe_filter_ids(query, ids) do @@ -826,23 +904,69 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do end) end - defp extract_delete_event_ids(event) do - delete_ids = - event - |> Map.get("tags", []) - |> Enum.reduce([], fn - ["e", event_id | _rest], acc when is_binary(event_id) -> [event_id | acc] - _tag, acc -> acc - end) - |> Enum.uniq() + defp extract_delete_targets(event) do + with {:ok, targets} <- parse_delete_targets(Map.get(event, "tags", [])) do + event_ids = targets.event_ids |> Enum.uniq() + coordinates = targets.coordinates |> Enum.uniq() - if delete_ids == [] do - {:error, :no_delete_targets} - else - {:ok, Enum.map(delete_ids, &Base.decode16!(&1, case: :mixed))} + if event_ids == [] and coordinates == [] do + {:error, :no_delete_targets} + else + {:ok, %{event_ids: event_ids, coordinates: coordinates}} + end + end + end + + defp parse_delete_targets(tags) when is_list(tags) do + Enum.reduce_while(tags, {:ok, %{event_ids: [], coordinates: []}}, fn tag, {:ok, acc} -> + case parse_delete_target(tag) do + {:ok, {:event_id, event_id}} -> + {:cont, {:ok, %{acc | event_ids: [event_id | acc.event_ids]}}} + + {:ok, {:coordinate, coordinate}} -> + {:cont, {:ok, %{acc | coordinates: [coordinate | acc.coordinates]}}} + + :ignore -> + {:cont, {:ok, acc}} + + {:error, _reason} = error -> + {:halt, error} + end + end) + end + + defp parse_delete_targets(_tags), do: {:error, :invalid_delete_target} + + defp parse_delete_target(["e", event_id | _rest]) when is_binary(event_id) do + case decode_hex(event_id, 32, :invalid_delete_target) do + {:ok, decoded_event_id} -> {:ok, {:event_id, decoded_event_id}} + {:error, _reason} -> {:error, :invalid_delete_target} + end + end + + defp parse_delete_target(["a", coordinate | _rest]) when is_binary(coordinate) do + case parse_address_coordinate(coordinate) do + {:ok, parsed_coordinate} -> {:ok, {:coordinate, parsed_coordinate}} + {:error, _reason} -> {:error, :invalid_delete_target} + end + end + + defp parse_delete_target(_tag), do: :ignore + + defp parse_address_coordinate(coordinate) do + case String.split(coordinate, ":", parts: 3) do + [kind_part, pubkey_hex, d_tag] -> + with {kind, ""} <- Integer.parse(kind_part), + true <- kind >= 0, + {:ok, pubkey} <- decode_hex(pubkey_hex, 32, :invalid_delete_target) do + {:ok, %{kind: kind, pubkey: pubkey, d_tag: d_tag}} + else + _other -> {:error, :invalid_delete_target} + end + + _other -> + {:error, :invalid_delete_target} end - rescue - ArgumentError -> {:error, :invalid_delete_target} end defp extract_expiration(tags) do diff --git a/lib/parrhesia/storage/adapters/postgres/moderation.ex b/lib/parrhesia/storage/adapters/postgres/moderation.ex index 22baf88..b24762f 100644 --- a/lib/parrhesia/storage/adapters/postgres/moderation.ex +++ b/lib/parrhesia/storage/adapters/postgres/moderation.ex @@ -147,17 +147,23 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do end defp exists_in_scope?(scope, value) do + {table, field} = cache_scope_source!(scope) + if moderation_cache_enabled?() do - ensure_cache_scope_loaded(scope) - :ets.member(cache_table_ref(), cache_member_key(scope, value)) + case cache_table_ref() do + :undefined -> + exists_in_table_db?(table, field, value) + + cache_table -> + ensure_cache_scope_loaded(scope, cache_table) + :ets.member(cache_table, cache_member_key(scope, value)) + end else - {table, field} = cache_scope_source!(scope) exists_in_table_db?(table, field, value) end end - defp ensure_cache_scope_loaded(scope) do - table = cache_table_ref() + defp ensure_cache_scope_loaded(scope, table) do loaded_key = cache_loaded_key(scope) if :ets.member(table, loaded_key) do @@ -188,7 +194,10 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do defp cache_put(scope, value) do if moderation_cache_enabled?() do - true = :ets.insert(cache_table_ref(), {cache_member_key(scope, value), true}) + case cache_table_ref() do + :undefined -> :ok + cache_table -> true = :ets.insert(cache_table, {cache_member_key(scope, value), true}) + end end :ok @@ -196,7 +205,10 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do defp cache_delete(scope, value) do if moderation_cache_enabled?() do - true = :ets.delete(cache_table_ref(), cache_member_key(scope, value)) + case cache_table_ref() do + :undefined -> :ok + cache_table -> true = :ets.delete(cache_table, cache_member_key(scope, value)) + end end :ok @@ -210,23 +222,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do defp cache_table_ref do case :ets.whereis(@cache_table) do - :undefined -> - try do - :ets.new(@cache_table, [ - :named_table, - :set, - :public, - read_concurrency: true, - write_concurrency: true - ]) - rescue - ArgumentError -> @cache_table - end - - @cache_table - - _table_ref -> - @cache_table + :undefined -> :undefined + _table_ref -> @cache_table end end diff --git a/lib/parrhesia/storage/adapters/postgres/moderation_cache.ex b/lib/parrhesia/storage/adapters/postgres/moderation_cache.ex new file mode 100644 index 0000000..cdebc9d --- /dev/null +++ b/lib/parrhesia/storage/adapters/postgres/moderation_cache.ex @@ -0,0 +1,28 @@ +defmodule Parrhesia.Storage.Adapters.Postgres.ModerationCache do + @moduledoc """ + ETS owner process for moderation cache tables. + """ + + use GenServer + + @cache_table :parrhesia_moderation_cache + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, :ok, opts) + end + + @impl true + def init(:ok) do + _table = + :ets.new(@cache_table, [ + :named_table, + :set, + :public, + read_concurrency: true, + write_concurrency: true + ]) + + {:ok, %{}} + end +end diff --git a/lib/parrhesia/storage/archiver.ex b/lib/parrhesia/storage/archiver.ex index f3ec01e..f380f59 100644 --- a/lib/parrhesia/storage/archiver.ex +++ b/lib/parrhesia/storage/archiver.ex @@ -24,11 +24,28 @@ defmodule Parrhesia.Storage.Archiver do Repo.all(query) end + @identifier_pattern ~r/^[a-zA-Z_][a-zA-Z0-9_]*$/ + @doc """ Generates an archive SQL statement for the given partition. """ @spec archive_sql(String.t(), String.t()) :: String.t() def archive_sql(partition_name, archive_table_name) do - "INSERT INTO #{archive_table_name} SELECT * FROM #{partition_name};" + quoted_archive_table_name = quote_identifier!(archive_table_name) + quoted_partition_name = quote_identifier!(partition_name) + + "INSERT INTO #{quoted_archive_table_name} SELECT * FROM #{quoted_partition_name};" + end + + defp quote_identifier!(identifier) when is_binary(identifier) do + if Regex.match?(@identifier_pattern, identifier) do + ~s("#{identifier}") + else + raise ArgumentError, "invalid SQL identifier: #{inspect(identifier)}" + end + end + + defp quote_identifier!(identifier) do + raise ArgumentError, "invalid SQL identifier: #{inspect(identifier)}" end end diff --git a/lib/parrhesia/storage/supervisor.ex b/lib/parrhesia/storage/supervisor.ex index 957d147..a478ff1 100644 --- a/lib/parrhesia/storage/supervisor.ex +++ b/lib/parrhesia/storage/supervisor.ex @@ -12,6 +12,8 @@ defmodule Parrhesia.Storage.Supervisor do @impl true def init(_init_arg) do children = [ + {Parrhesia.Storage.Adapters.Postgres.ModerationCache, + name: Parrhesia.Storage.Adapters.Postgres.ModerationCache}, Parrhesia.Repo ] diff --git a/lib/parrhesia/subscriptions/index.ex b/lib/parrhesia/subscriptions/index.ex index ef3ad63..a063071 100644 --- a/lib/parrhesia/subscriptions/index.ex +++ b/lib/parrhesia/subscriptions/index.ex @@ -11,6 +11,13 @@ defmodule Parrhesia.Subscriptions.Index do alias Parrhesia.Protocol.Filter @wildcard_key :all + @subscriptions_table_name :parrhesia_subscriptions_table + @kind_index_table_name :parrhesia_subscription_kind_index + @author_index_table_name :parrhesia_subscription_author_index + @tag_index_table_name :parrhesia_subscription_tag_index + @kind_wildcard_table_name :parrhesia_subscription_kind_wildcard_index + @author_wildcard_table_name :parrhesia_subscription_author_wildcard_index + @tag_wildcard_table_name :parrhesia_subscription_tag_wildcard_index @type subscription_id :: String.t() @type owner :: pid() @@ -20,11 +27,12 @@ defmodule Parrhesia.Subscriptions.Index do @spec start_link(keyword()) :: GenServer.on_start() def start_link(opts \\ []) do name = Keyword.get(opts, :name) + init_arg = %{named_tables?: name == __MODULE__} if is_nil(name) do - GenServer.start_link(__MODULE__, :ok) + GenServer.start_link(__MODULE__, init_arg) else - GenServer.start_link(__MODULE__, :ok, name: name) + GenServer.start_link(__MODULE__, init_arg, name: name) end end @@ -65,6 +73,13 @@ defmodule Parrhesia.Subscriptions.Index do end @spec candidate_subscription_keys(GenServer.server(), map()) :: [subscription_key()] + def candidate_subscription_keys(__MODULE__, event) do + case named_tables() do + {:ok, tables} -> candidate_subscription_keys_for_tables(tables, event) + :error -> GenServer.call(__MODULE__, {:candidate_subscription_keys, event}) + end + end + def candidate_subscription_keys(server, event) do GenServer.call(server, {:candidate_subscription_keys, event}) end @@ -76,20 +91,15 @@ defmodule Parrhesia.Subscriptions.Index do end @impl true - def init(:ok) do + def init(%{named_tables?: named_tables?}) do + tables = create_tables(named_tables?) + {:ok, - %{ - subscriptions_table: :ets.new(:subscriptions_table, [:set, :protected]), - kind_index_table: :ets.new(:subscription_kind_index, [:bag, :protected]), - author_index_table: :ets.new(:subscription_author_index, [:bag, :protected]), - tag_index_table: :ets.new(:subscription_tag_index, [:bag, :protected]), - kind_wildcard_table: :ets.new(:subscription_kind_wildcard_index, [:bag, :protected]), - author_wildcard_table: :ets.new(:subscription_author_wildcard_index, [:bag, :protected]), - tag_wildcard_table: :ets.new(:subscription_tag_wildcard_index, [:bag, :protected]), + Map.merge(tables, %{ owner_subscriptions: %{}, owner_monitors: %{}, monitor_owners: %{} - }} + })} end @impl true @@ -128,14 +138,7 @@ defmodule Parrhesia.Subscriptions.Index do end def handle_call({:candidate_subscription_keys, event}, _from, state) do - candidates = - state - |> kind_candidates(event) - |> MapSet.intersection(author_candidates(state, event)) - |> MapSet.intersection(tag_candidates(state, event)) - |> MapSet.to_list() - - {:reply, candidates, state} + {:reply, candidate_subscription_keys_for_tables(state, event), state} end def handle_call({:fetch_filters, owner_pid, subscription_id}, _from, state) do @@ -371,28 +374,110 @@ defmodule Parrhesia.Subscriptions.Index do |> update_in([:owner_subscriptions], &Map.delete(&1, owner_pid)) end - defp kind_candidates(state, event) do + defp create_tables(true) do + %{ + subscriptions_table: + :ets.new(@subscriptions_table_name, [ + :set, + :protected, + :named_table, + read_concurrency: true + ]), + kind_index_table: + :ets.new(@kind_index_table_name, [:bag, :protected, :named_table, read_concurrency: true]), + author_index_table: + :ets.new(@author_index_table_name, [ + :bag, + :protected, + :named_table, + read_concurrency: true + ]), + tag_index_table: + :ets.new(@tag_index_table_name, [:bag, :protected, :named_table, read_concurrency: true]), + kind_wildcard_table: + :ets.new(@kind_wildcard_table_name, [ + :bag, + :protected, + :named_table, + read_concurrency: true + ]), + author_wildcard_table: + :ets.new(@author_wildcard_table_name, [ + :bag, + :protected, + :named_table, + read_concurrency: true + ]), + tag_wildcard_table: + :ets.new(@tag_wildcard_table_name, [ + :bag, + :protected, + :named_table, + read_concurrency: true + ]) + } + end + + defp create_tables(false) do + %{ + subscriptions_table: :ets.new(:subscriptions_table, [:set, :protected]), + kind_index_table: :ets.new(:subscription_kind_index, [:bag, :protected]), + author_index_table: :ets.new(:subscription_author_index, [:bag, :protected]), + tag_index_table: :ets.new(:subscription_tag_index, [:bag, :protected]), + kind_wildcard_table: :ets.new(:subscription_kind_wildcard_index, [:bag, :protected]), + author_wildcard_table: :ets.new(:subscription_author_wildcard_index, [:bag, :protected]), + tag_wildcard_table: :ets.new(:subscription_tag_wildcard_index, [:bag, :protected]) + } + end + + defp named_tables do + tables = %{ + subscriptions_table: :ets.whereis(@subscriptions_table_name), + kind_index_table: :ets.whereis(@kind_index_table_name), + author_index_table: :ets.whereis(@author_index_table_name), + tag_index_table: :ets.whereis(@tag_index_table_name), + kind_wildcard_table: :ets.whereis(@kind_wildcard_table_name), + author_wildcard_table: :ets.whereis(@author_wildcard_table_name), + tag_wildcard_table: :ets.whereis(@tag_wildcard_table_name) + } + + if Enum.any?(tables, fn {_key, table_ref} -> table_ref == :undefined end) do + :error + else + {:ok, tables} + end + end + + defp candidate_subscription_keys_for_tables(tables, event) do + tables + |> kind_candidates(event) + |> MapSet.intersection(author_candidates(tables, event)) + |> MapSet.intersection(tag_candidates(tables, event)) + |> MapSet.to_list() + end + + defp kind_candidates(tables, event) do event |> Map.get("kind") - |> index_candidates_for_value(state.kind_index_table, state.kind_wildcard_table) + |> index_candidates_for_value(tables.kind_index_table, tables.kind_wildcard_table) end - defp author_candidates(state, event) do + defp author_candidates(tables, event) do event |> Map.get("pubkey") - |> index_candidates_for_value(state.author_index_table, state.author_wildcard_table) + |> index_candidates_for_value(tables.author_index_table, tables.author_wildcard_table) end - defp tag_candidates(state, event) do + defp tag_candidates(tables, event) do tag_pairs = event_tag_pairs(Map.get(event, "tags")) - wildcard_candidates = lookup_candidates(state.tag_wildcard_table, @wildcard_key) + wildcard_candidates = lookup_candidates(tables.tag_wildcard_table, @wildcard_key) if MapSet.size(tag_pairs) == 0 do wildcard_candidates else matched_candidates = Enum.reduce(tag_pairs, MapSet.new(), fn {tag_name, value}, acc -> - MapSet.union(acc, lookup_candidates(state.tag_index_table, {tag_name, value})) + MapSet.union(acc, lookup_candidates(tables.tag_index_table, {tag_name, value})) end) MapSet.union(matched_candidates, wildcard_candidates) diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index 0311b49..38f5fb9 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -227,37 +227,58 @@ defmodule Parrhesia.Web.Connection do case maybe_allow_event_ingest(state) do {:ok, next_state} -> - with :ok <- validate_event_payload_size(event, next_state.max_event_bytes), - :ok <- Protocol.validate_event(event), - :ok <- EventPolicy.authorize_write(event, next_state.authenticated_pubkeys), - :ok <- maybe_process_group_event(event), - {:ok, _result, message} <- persist_event(event) do - Telemetry.emit( - [:parrhesia, :ingest, :stop], - %{duration: System.monotonic_time() - started_at}, - telemetry_metadata_for_event(event) - ) + result = + with :ok <- validate_event_payload_size(event, next_state.max_event_bytes), + :ok <- Protocol.validate_event(event), + :ok <- EventPolicy.authorize_write(event, next_state.authenticated_pubkeys), + :ok <- maybe_process_group_event(event), + {:ok, _result, message} <- persist_event(event) do + {:ok, message} + end - send(self(), {@post_ack_ingest, event}) - - response = Protocol.encode_relay({:ok, event_id, true, message}) - {:push, {:text, response}, next_state} - else - {:error, reason} -> - message = error_message_for_ingest_failure(reason) - response = Protocol.encode_relay({:ok, event_id, false, message}) - - if reason in [:auth_required, :protected_event_requires_auth] do - with_auth_challenge_frame(next_state, {:push, {:text, response}, next_state}) - else - {:push, {:text, response}, next_state} - end - end + handle_event_ingest_result(result, next_state, event, event_id, started_at) {:error, reason} -> - message = error_message_for_ingest_failure(reason) - response = Protocol.encode_relay({:ok, event_id, false, message}) - {:push, {:text, response}, state} + ingest_error_response(state, event_id, reason) + end + end + + defp handle_event_ingest_result( + {:ok, message}, + %__MODULE__{} = state, + event, + event_id, + started_at + ) do + Telemetry.emit( + [:parrhesia, :ingest, :stop], + %{duration: System.monotonic_time() - started_at}, + telemetry_metadata_for_event(event) + ) + + send(self(), {@post_ack_ingest, event}) + + response = Protocol.encode_relay({:ok, event_id, true, message}) + {:push, {:text, response}, state} + end + + defp handle_event_ingest_result( + {:error, reason}, + %__MODULE__{} = state, + _event, + event_id, + _started_at + ), + do: ingest_error_response(state, event_id, reason) + + defp ingest_error_response(%__MODULE__{} = state, event_id, reason) do + message = error_message_for_ingest_failure(reason) + response = Protocol.encode_relay({:ok, event_id, false, message}) + + if reason in [:auth_required, :protected_event_requires_auth] do + with_auth_challenge_frame(state, {:push, {:text, response}, state}) + else + {:push, {:text, response}, state} end end @@ -468,28 +489,37 @@ defmodule Parrhesia.Web.Connection do kind = Map.get(event, "kind") cond do - kind == 5 -> - with {:ok, deleted_count} <- Storage.events().delete_by_request(%{}, event) do - {:ok, deleted_count, "ok: deletion request processed"} - end + kind in [5, 62] -> persist_control_event(kind, event) + ephemeral_kind?(kind) -> persist_ephemeral_event() + true -> persist_regular_event(event) + end + end - kind == 62 -> - with {:ok, deleted_count} <- Storage.events().vanish(%{}, event) do - {:ok, deleted_count, "ok: vanish request processed"} - end + defp persist_control_event(5, event) do + with {:ok, deleted_count} <- Storage.events().delete_by_request(%{}, event) do + {:ok, deleted_count, "ok: deletion request processed"} + end + end - ephemeral_kind?(kind) and accept_ephemeral_events?() -> - {:ok, :ephemeral, "ok: ephemeral event accepted"} + defp persist_control_event(62, event) do + with {:ok, deleted_count} <- Storage.events().vanish(%{}, event) do + {:ok, deleted_count, "ok: vanish request processed"} + end + end - ephemeral_kind?(kind) -> - {:error, :ephemeral_events_disabled} + defp persist_ephemeral_event do + if accept_ephemeral_events?() do + {:ok, :ephemeral, "ok: ephemeral event accepted"} + else + {:error, :ephemeral_events_disabled} + end + end - true -> - case Storage.events().put_event(%{}, event) do - {:ok, persisted_event} -> {:ok, persisted_event, "ok: event stored"} - {:error, :duplicate_event} -> {:error, :duplicate_event} - {:error, reason} -> {:error, reason} - end + defp persist_regular_event(event) do + case Storage.events().put_event(%{}, event) do + {:ok, persisted_event} -> {:ok, persisted_event, "ok: event stored"} + {:error, :duplicate_event} -> {:error, :duplicate_event} + {:error, reason} -> {:error, reason} end end @@ -644,9 +674,8 @@ defmodule Parrhesia.Web.Connection do end) with :ok <- maybe_validate(challenge_tag?, :missing_challenge_tag), - :ok <- validate_auth_relay_tag(state, tags), - :ok <- validate_auth_created_at_freshness(auth_event, state.auth_max_age_seconds) do - :ok + :ok <- validate_auth_relay_tag(state, tags) do + validate_auth_created_at_freshness(auth_event, state.auth_max_age_seconds) end end diff --git a/test/parrhesia/fault_injection_group_flow_test.exs b/test/parrhesia/fault_injection_group_flow_test.exs index 07c3b2f..78d2b5a 100644 --- a/test/parrhesia/fault_injection_group_flow_test.exs +++ b/test/parrhesia/fault_injection_group_flow_test.exs @@ -43,7 +43,7 @@ defmodule Parrhesia.FaultInjectionGroupFlowTest do payload = JSON.encode!(["EVENT", group_event]) - assert {:push, {:text, error_response}, ^state} = + assert {:push, {:text, error_response}, _next_state} = Connection.handle_in({payload, [opcode: :text]}, state) assert JSON.decode!(error_response) == ["OK", group_event["id"], false, "error: :db_down"] @@ -54,7 +54,7 @@ defmodule Parrhesia.FaultInjectionGroupFlowTest do previous_storage |> Keyword.put(:moderation, PermissiveModeration) ) - assert {:push, {:text, ok_response}, ^state} = + assert {:push, {:text, ok_response}, _next_state} = Connection.handle_in({payload, [opcode: :text]}, state) assert JSON.decode!(ok_response) == ["OK", group_event["id"], true, "ok: event stored"] @@ -87,7 +87,7 @@ defmodule Parrhesia.FaultInjectionGroupFlowTest do "content" => Base.encode64("newer") }) - assert {:push, {:text, outage_response}, ^state} = + assert {:push, {:text, outage_response}, _next_state} = Connection.handle_in( {JSON.encode!(["EVENT", older_event]), [opcode: :text]}, state @@ -101,7 +101,7 @@ defmodule Parrhesia.FaultInjectionGroupFlowTest do previous_storage |> Keyword.put(:moderation, PermissiveModeration) ) - assert {:push, {:text, newer_response}, ^state} = + assert {:push, {:text, newer_response}, _next_state} = Connection.handle_in( {JSON.encode!(["EVENT", newer_event]), [opcode: :text]}, state @@ -109,7 +109,7 @@ defmodule Parrhesia.FaultInjectionGroupFlowTest do assert JSON.decode!(newer_response) == ["OK", newer_event["id"], true, "ok: event stored"] - assert {:push, {:text, older_response}, ^state} = + assert {:push, {:text, older_response}, _next_state} = Connection.handle_in( {JSON.encode!(["EVENT", older_event]), [opcode: :text]}, state diff --git a/test/parrhesia/fault_injection_test.exs b/test/parrhesia/fault_injection_test.exs index 0060966..d338186 100644 --- a/test/parrhesia/fault_injection_test.exs +++ b/test/parrhesia/fault_injection_test.exs @@ -29,7 +29,7 @@ defmodule Parrhesia.FaultInjectionTest do {:ok, state} = Connection.init(subscription_index: nil) event = valid_event() - assert {:push, {:text, response}, ^state} = + assert {:push, {:text, response}, _next_state} = Connection.handle_in({JSON.encode!(["EVENT", event]), [opcode: :text]}, state) assert JSON.decode!(response) == ["OK", event["id"], false, "error: :db_down"] diff --git a/test/parrhesia/storage/adapters/postgres/events_lifecycle_test.exs b/test/parrhesia/storage/adapters/postgres/events_lifecycle_test.exs index c570b10..2ebf640 100644 --- a/test/parrhesia/storage/adapters/postgres/events_lifecycle_test.exs +++ b/test/parrhesia/storage/adapters/postgres/events_lifecycle_test.exs @@ -26,6 +26,31 @@ defmodule Parrhesia.Storage.Adapters.Postgres.EventsLifecycleTest do assert {:ok, nil} = Events.get_event(%{}, target["id"]) end + test "delete_by_request tombstones addressable targets referenced via a tags" do + author = String.duplicate("4", 64) + + target = + event(%{ + "pubkey" => author, + "kind" => 30_023, + "tags" => [["d", "topic"]], + "content" => "addressable-target" + }) + + assert {:ok, _event} = Events.put_event(%{}, target) + + delete_request = + event(%{ + "pubkey" => author, + "kind" => 5, + "tags" => [["a", "30023:#{author}:topic"]], + "content" => "delete-addressable" + }) + + assert {:ok, 1} = Events.delete_by_request(%{}, delete_request) + assert {:ok, nil} = Events.get_event(%{}, target["id"]) + end + test "vanish hard-deletes events authored by pubkey" do author = String.duplicate("3", 64) diff --git a/test/parrhesia/storage/archiver_test.exs b/test/parrhesia/storage/archiver_test.exs index 96b03f1..1f625ca 100644 --- a/test/parrhesia/storage/archiver_test.exs +++ b/test/parrhesia/storage/archiver_test.exs @@ -17,6 +17,12 @@ defmodule Parrhesia.Storage.ArchiverTest do test "archive_sql builds insert-select statement" do assert Archiver.archive_sql("events_2026_03", "events_archive") == - "INSERT INTO events_archive SELECT * FROM events_2026_03;" + ~s(INSERT INTO "events_archive" SELECT * FROM "events_2026_03";) + end + + test "archive_sql rejects invalid SQL identifiers" do + assert_raise ArgumentError, fn -> + Archiver.archive_sql("events_default; DROP TABLE events", "events_archive") + end end end