Fix medium findings: deletion coords, count SQL, cache startup

This commit is contained in:
2026-03-14 04:15:37 +01:00
parent c7a9f152f9
commit 18e429e05a
12 changed files with 522 additions and 140 deletions

View File

@@ -64,21 +64,49 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do
@impl true @impl true
def delete_by_request(_context, event) do def delete_by_request(_context, event) do
delete_ids = deleter_pubkey = Map.get(event, "pubkey")
delete_event_ids =
event event
|> Map.get("tags", []) |> Map.get("tags", [])
|> Enum.flat_map(fn |> Enum.flat_map(fn
["e", event_id | _rest] -> [event_id] ["e", event_id | _rest] when is_binary(event_id) -> [event_id]
_tag -> [] _tag -> []
end) end)
delete_coordinates =
event
|> Map.get("tags", [])
|> Enum.flat_map(fn
["a", coordinate | _rest] when is_binary(coordinate) ->
case parse_delete_coordinate(coordinate) do
{:ok, parsed_coordinate} -> [parsed_coordinate]
{:error, _reason} -> []
end
_tag ->
[]
end)
coordinate_delete_ids =
Store.get(fn state ->
state.events
|> Map.values()
|> Enum.filter(fn candidate ->
matches_delete_coordinate?(candidate, delete_coordinates, deleter_pubkey)
end)
|> Enum.map(& &1["id"])
end)
all_delete_ids = Enum.uniq(delete_event_ids ++ coordinate_delete_ids)
Store.update(fn state -> Store.update(fn state ->
Enum.reduce(delete_ids, state, fn event_id, acc -> Enum.reduce(all_delete_ids, state, fn event_id, acc ->
update_in(acc.deleted, &MapSet.put(&1, event_id)) update_in(acc.deleted, &MapSet.put(&1, event_id))
end) end)
end) end)
{:ok, length(delete_ids)} {:ok, length(all_delete_ids)}
end end
@impl true @impl true
@@ -105,6 +133,47 @@ defmodule Parrhesia.Storage.Adapters.Memory.Events do
@impl true @impl true
def purge_expired(_opts), do: {:ok, 0} def purge_expired(_opts), do: {:ok, 0}
# Parses an "a"-tag address coordinate of the form "kind:pubkey:d_tag".
#
# Returns `{:ok, %{kind: kind, pubkey: pubkey, d_tag: d_tag}}` when the kind
# part is a plain non-negative integer, `{:error, :invalid_coordinate}` for
# anything else. `parts: 3` means a d_tag containing ":" is kept intact.
defp parse_delete_coordinate(coordinate) do
  with [kind_text, author_pubkey, d_identifier] <- String.split(coordinate, ":", parts: 3),
       {parsed_kind, ""} when parsed_kind >= 0 <- Integer.parse(kind_text) do
    {:ok, %{kind: parsed_kind, pubkey: author_pubkey, d_tag: d_identifier}}
  else
    # Wrong number of segments, non-numeric kind, trailing junk after the
    # integer, or a negative kind all fall through to here.
    _mismatch -> {:error, :invalid_coordinate}
  end
end
# True when `candidate` is targeted by any of the parsed delete coordinates.
#
# Both the coordinate's pubkey AND the candidate event's pubkey must equal the
# deleter's pubkey, so a deletion request can only ever tombstone the
# requester's own events; the kind must match as well, and the d-tag/kind-class
# check is delegated to coordinate_match_for_kind?/2.
defp matches_delete_coordinate?(candidate, delete_coordinates, deleter_pubkey) do
  Enum.any?(delete_coordinates, fn coordinate ->
    coordinate.pubkey == deleter_pubkey and
      candidate["pubkey"] == deleter_pubkey and
      candidate["kind"] == coordinate.kind and
      coordinate_match_for_kind?(candidate, coordinate)
  end)
end
# Kind-class-specific part of the coordinate match.
#
# Addressable kinds are identified by (kind, pubkey, d-tag); an event with no
# "d" tag gets the default "" here, so it matches a coordinate whose d_tag is
# the empty string. Replaceable kinds carry no d-tag, so matching kind+pubkey
# (already checked by the caller) is sufficient. Any other kind never matches.
defp coordinate_match_for_kind?(candidate, coordinate) do
  if addressable_kind?(coordinate.kind) do
    candidate_d_tag =
      candidate
      |> Map.get("tags", [])
      # find_value default "" — an absent "d" tag behaves like d_tag "".
      |> Enum.find_value("", fn
        ["d", value | _rest] -> value
        _tag -> nil
      end)

    candidate_d_tag == coordinate.d_tag
  else
    replaceable_kind?(coordinate.kind)
  end
end
# Nostr kind ranges: 0 and 3 plus 10000..19999 are replaceable;
# 30000..39999 are addressable (parameterized-replaceable, keyed by d-tag).
defp replaceable_kind?(kind), do: kind in [0, 3] or (kind >= 10_000 and kind < 20_000)
defp addressable_kind?(kind), do: kind >= 30_000 and kind < 40_000
defp giftwrap_visible_to_requester?(%{"kind" => 1059} = event, requester_pubkeys) do defp giftwrap_visible_to_requester?(%{"kind" => 1059} = event, requester_pubkeys) do
requester_pubkeys != [] and requester_pubkeys != [] and
event_targets_any_recipient?(event, requester_pubkeys) event_targets_any_recipient?(event, requester_pubkeys)

View File

@@ -114,13 +114,12 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
total_count = total_count =
filters filters
|> Enum.flat_map(fn filter -> |> event_id_union_query_for_filters(now, opts)
filter |> subquery()
|> event_id_query_for_filter(now, opts) |> then(fn union_query ->
|> Repo.all() from(event in union_query, select: count(event.id, :distinct))
end) end)
|> MapSet.new() |> Repo.one()
|> MapSet.size()
{:ok, total_count} {:ok, total_count}
end end
@@ -131,18 +130,83 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
@impl true @impl true
def delete_by_request(_context, event) do def delete_by_request(_context, event) do
with {:ok, deleter_pubkey} <- decode_hex(Map.get(event, "pubkey"), 32, :invalid_pubkey), with {:ok, deleter_pubkey} <- decode_hex(Map.get(event, "pubkey"), 32, :invalid_pubkey),
{:ok, delete_ids} <- extract_delete_event_ids(event) do {:ok, delete_targets} <- extract_delete_targets(event) do
deleted_at = System.system_time(:second)
deleted_by_id_count =
delete_targets
|> Map.get(:event_ids, [])
|> delete_events_by_ids(deleter_pubkey, deleted_at)
deleted_by_coordinate_count =
delete_targets
|> Map.get(:coordinates, [])
|> delete_events_by_coordinates(deleter_pubkey, deleted_at)
{:ok, deleted_by_id_count + deleted_by_coordinate_count}
end
end
# Tombstones the events whose ids were named in "e" tags and returns how many
# rows were updated. Only rows authored by the deleter that are not already
# tombstoned are touched; deletion is soft (sets deleted_at), not a hard DELETE.
defp delete_events_by_ids([], _deleter_pubkey, _deleted_at), do: 0

defp delete_events_by_ids(delete_ids, deleter_pubkey, deleted_at) do
  query =
    from(stored_event in "events",
      where:
        stored_event.id in ^delete_ids and
          stored_event.pubkey == ^deleter_pubkey and
          is_nil(stored_event.deleted_at)
    )

  {count, _result} = Repo.update_all(query, set: [deleted_at: deleted_at])
  count
end
defp delete_events_by_coordinates([], _deleter_pubkey, _deleted_at), do: 0
defp delete_events_by_coordinates(coordinates, deleter_pubkey, deleted_at) do
relevant_coordinates =
Enum.filter(coordinates, fn coordinate ->
coordinate.pubkey == deleter_pubkey and
(replaceable_kind?(coordinate.kind) or addressable_kind?(coordinate.kind))
end)
if relevant_coordinates == [] do
0
else
dynamic_conditions =
Enum.reduce(relevant_coordinates, dynamic(false), fn coordinate, acc ->
coordinate_condition =
coordinate_delete_condition(coordinate, deleter_pubkey)
dynamic([stored_event], ^acc or ^coordinate_condition)
end)
query = query =
from(stored_event in "events", from(stored_event in "events",
where: where: is_nil(stored_event.deleted_at)
stored_event.id in ^delete_ids and
stored_event.pubkey == ^deleter_pubkey and
is_nil(stored_event.deleted_at)
) )
|> where(^dynamic_conditions)
deleted_at = System.system_time(:second)
{count, _result} = Repo.update_all(query, set: [deleted_at: deleted_at]) {count, _result} = Repo.update_all(query, set: [deleted_at: deleted_at])
{:ok, count} count
end
end
defp coordinate_delete_condition(coordinate, deleter_pubkey) do
if addressable_kind?(coordinate.kind) do
dynamic(
[stored_event],
stored_event.kind == ^coordinate.kind and
stored_event.pubkey == ^deleter_pubkey and
stored_event.d_tag == ^coordinate.d_tag
)
else
dynamic(
[stored_event],
stored_event.kind == ^coordinate.kind and
stored_event.pubkey == ^deleter_pubkey
)
end end
end end
@@ -598,6 +662,20 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
|> maybe_restrict_giftwrap_access(filter, opts) |> maybe_restrict_giftwrap_access(filter, opts)
end end
# Builds one UNION ALL query of event ids across all filters, so counting can
# happen in SQL instead of materializing ids per filter in Elixir.
#
# The empty-filter clause returns a deliberately contradictory query
# (created_at both > and < the same instant can never hold), i.e. a valid
# query that selects zero rows, keeping callers' subquery/count plumbing uniform.
defp event_id_union_query_for_filters([], now, _opts) do
  from(event in "events",
    where: event.created_at > ^now and event.created_at < ^now,
    select: event.id
  )
end

defp event_id_union_query_for_filters([first_filter | rest_filters], now, opts) do
  # Fold the remaining filters onto the first one with UNION ALL; duplicate
  # ids across filters are expected and left for the caller to de-duplicate
  # (e.g. with a distinct count).
  Enum.reduce(rest_filters, event_id_query_for_filter(first_filter, now, opts), fn filter,
                                                                                   acc ->
    union_all(acc, ^event_id_query_for_filter(filter, now, opts))
  end)
end
defp maybe_filter_ids(query, nil), do: query defp maybe_filter_ids(query, nil), do: query
defp maybe_filter_ids(query, ids) do defp maybe_filter_ids(query, ids) do
@@ -826,23 +904,69 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
end) end)
end end
defp extract_delete_event_ids(event) do defp extract_delete_targets(event) do
delete_ids = with {:ok, targets} <- parse_delete_targets(Map.get(event, "tags", [])) do
event event_ids = targets.event_ids |> Enum.uniq()
|> Map.get("tags", []) coordinates = targets.coordinates |> Enum.uniq()
|> Enum.reduce([], fn
["e", event_id | _rest], acc when is_binary(event_id) -> [event_id | acc]
_tag, acc -> acc
end)
|> Enum.uniq()
if delete_ids == [] do if event_ids == [] and coordinates == [] do
{:error, :no_delete_targets} {:error, :no_delete_targets}
else else
{:ok, Enum.map(delete_ids, &Base.decode16!(&1, case: :mixed))} {:ok, %{event_ids: event_ids, coordinates: coordinates}}
end
end
end
# Walks the deletion event's tags, accumulating decoded "e" event ids and
# parsed "a" coordinates. Halts on the first malformed target and propagates
# its {:error, _} — a deletion request is all-or-nothing. Accumulators are
# built by prepending, so results come back in reverse tag order; callers
# de-duplicate with Enum.uniq/1 anyway.
defp parse_delete_targets(tags) when is_list(tags) do
  Enum.reduce_while(tags, {:ok, %{event_ids: [], coordinates: []}}, fn tag, {:ok, acc} ->
    case parse_delete_target(tag) do
      {:ok, {:event_id, event_id}} ->
        {:cont, {:ok, %{acc | event_ids: [event_id | acc.event_ids]}}}

      {:ok, {:coordinate, coordinate}} ->
        {:cont, {:ok, %{acc | coordinates: [coordinate | acc.coordinates]}}}

      # Unrelated tags ("p", "k", ...) are simply not delete targets.
      :ignore ->
        {:cont, {:ok, acc}}

      {:error, _reason} = error ->
        {:halt, error}
    end
  end)
end

# A non-list "tags" value is a malformed event.
defp parse_delete_targets(_tags), do: {:error, :invalid_delete_target}
# Classifies a single tag as a delete target.
#
# "e" tags must carry a 32-byte hex event id (decoded to raw bytes here);
# "a" tags must carry a parseable "kind:pubkey:d_tag" coordinate. Both map
# their failures to :invalid_delete_target; every other tag is :ignore.
defp parse_delete_target(["e", event_id | _rest]) when is_binary(event_id) do
  case decode_hex(event_id, 32, :invalid_delete_target) do
    {:ok, decoded_event_id} -> {:ok, {:event_id, decoded_event_id}}
    {:error, _reason} -> {:error, :invalid_delete_target}
  end
end

defp parse_delete_target(["a", coordinate | _rest]) when is_binary(coordinate) do
  case parse_address_coordinate(coordinate) do
    {:ok, parsed_coordinate} -> {:ok, {:coordinate, parsed_coordinate}}
    {:error, _reason} -> {:error, :invalid_delete_target}
  end
end

defp parse_delete_target(_tag), do: :ignore
defp parse_address_coordinate(coordinate) do
case String.split(coordinate, ":", parts: 3) do
[kind_part, pubkey_hex, d_tag] ->
with {kind, ""} <- Integer.parse(kind_part),
true <- kind >= 0,
{:ok, pubkey} <- decode_hex(pubkey_hex, 32, :invalid_delete_target) do
{:ok, %{kind: kind, pubkey: pubkey, d_tag: d_tag}}
else
_other -> {:error, :invalid_delete_target}
end
_other ->
{:error, :invalid_delete_target}
end end
rescue
ArgumentError -> {:error, :invalid_delete_target}
end end
defp extract_expiration(tags) do defp extract_expiration(tags) do

View File

@@ -147,17 +147,23 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do
end end
defp exists_in_scope?(scope, value) do defp exists_in_scope?(scope, value) do
{table, field} = cache_scope_source!(scope)
if moderation_cache_enabled?() do if moderation_cache_enabled?() do
ensure_cache_scope_loaded(scope) case cache_table_ref() do
:ets.member(cache_table_ref(), cache_member_key(scope, value)) :undefined ->
exists_in_table_db?(table, field, value)
cache_table ->
ensure_cache_scope_loaded(scope, cache_table)
:ets.member(cache_table, cache_member_key(scope, value))
end
else else
{table, field} = cache_scope_source!(scope)
exists_in_table_db?(table, field, value) exists_in_table_db?(table, field, value)
end end
end end
defp ensure_cache_scope_loaded(scope) do defp ensure_cache_scope_loaded(scope, table) do
table = cache_table_ref()
loaded_key = cache_loaded_key(scope) loaded_key = cache_loaded_key(scope)
if :ets.member(table, loaded_key) do if :ets.member(table, loaded_key) do
@@ -188,7 +194,10 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do
defp cache_put(scope, value) do defp cache_put(scope, value) do
if moderation_cache_enabled?() do if moderation_cache_enabled?() do
true = :ets.insert(cache_table_ref(), {cache_member_key(scope, value), true}) case cache_table_ref() do
:undefined -> :ok
cache_table -> true = :ets.insert(cache_table, {cache_member_key(scope, value), true})
end
end end
:ok :ok
@@ -196,7 +205,10 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do
defp cache_delete(scope, value) do defp cache_delete(scope, value) do
if moderation_cache_enabled?() do if moderation_cache_enabled?() do
true = :ets.delete(cache_table_ref(), cache_member_key(scope, value)) case cache_table_ref() do
:undefined -> :ok
cache_table -> true = :ets.delete(cache_table, cache_member_key(scope, value))
end
end end
:ok :ok
@@ -210,23 +222,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do
defp cache_table_ref do defp cache_table_ref do
case :ets.whereis(@cache_table) do case :ets.whereis(@cache_table) do
:undefined -> :undefined -> :undefined
try do _table_ref -> @cache_table
:ets.new(@cache_table, [
:named_table,
:set,
:public,
read_concurrency: true,
write_concurrency: true
])
rescue
ArgumentError -> @cache_table
end
@cache_table
_table_ref ->
@cache_table
end end
end end

View File

@@ -0,0 +1,28 @@
defmodule Parrhesia.Storage.Adapters.Postgres.ModerationCache do
  @moduledoc """
  Owner process for the named ETS table backing the moderation cache.

  Creating the table inside `init/1` ties its lifetime to this supervised
  process: if the process dies, the table goes with it and a restart
  recreates a fresh one.
  """

  use GenServer

  @cache_table :parrhesia_moderation_cache

  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    # :public + concurrency flags: readers/writers hit the table directly
    # without routing through this process; it exists only to own the table.
    table_options = [
      :named_table,
      :set,
      :public,
      read_concurrency: true,
      write_concurrency: true
    ]

    _table = :ets.new(@cache_table, table_options)
    {:ok, %{}}
  end
end

View File

@@ -24,11 +24,28 @@ defmodule Parrhesia.Storage.Archiver do
Repo.all(query) Repo.all(query)
end end
@identifier_pattern ~r/^[a-zA-Z_][a-zA-Z0-9_]*$/
@doc """ @doc """
Generates an archive SQL statement for the given partition. Generates an archive SQL statement for the given partition.
""" """
@spec archive_sql(String.t(), String.t()) :: String.t() @spec archive_sql(String.t(), String.t()) :: String.t()
def archive_sql(partition_name, archive_table_name) do def archive_sql(partition_name, archive_table_name) do
"INSERT INTO #{archive_table_name} SELECT * FROM #{partition_name};" quoted_archive_table_name = quote_identifier!(archive_table_name)
quoted_partition_name = quote_identifier!(partition_name)
"INSERT INTO #{quoted_archive_table_name} SELECT * FROM #{quoted_partition_name};"
end
defp quote_identifier!(identifier) when is_binary(identifier) do
if Regex.match?(@identifier_pattern, identifier) do
~s("#{identifier}")
else
raise ArgumentError, "invalid SQL identifier: #{inspect(identifier)}"
end
end
defp quote_identifier!(identifier) do
raise ArgumentError, "invalid SQL identifier: #{inspect(identifier)}"
end end
end end

View File

@@ -12,6 +12,8 @@ defmodule Parrhesia.Storage.Supervisor do
@impl true @impl true
def init(_init_arg) do def init(_init_arg) do
children = [ children = [
{Parrhesia.Storage.Adapters.Postgres.ModerationCache,
name: Parrhesia.Storage.Adapters.Postgres.ModerationCache},
Parrhesia.Repo Parrhesia.Repo
] ]

View File

@@ -11,6 +11,13 @@ defmodule Parrhesia.Subscriptions.Index do
alias Parrhesia.Protocol.Filter alias Parrhesia.Protocol.Filter
@wildcard_key :all @wildcard_key :all
@subscriptions_table_name :parrhesia_subscriptions_table
@kind_index_table_name :parrhesia_subscription_kind_index
@author_index_table_name :parrhesia_subscription_author_index
@tag_index_table_name :parrhesia_subscription_tag_index
@kind_wildcard_table_name :parrhesia_subscription_kind_wildcard_index
@author_wildcard_table_name :parrhesia_subscription_author_wildcard_index
@tag_wildcard_table_name :parrhesia_subscription_tag_wildcard_index
@type subscription_id :: String.t() @type subscription_id :: String.t()
@type owner :: pid() @type owner :: pid()
@@ -20,11 +27,12 @@ defmodule Parrhesia.Subscriptions.Index do
@spec start_link(keyword()) :: GenServer.on_start() @spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts \\ []) do def start_link(opts \\ []) do
name = Keyword.get(opts, :name) name = Keyword.get(opts, :name)
init_arg = %{named_tables?: name == __MODULE__}
if is_nil(name) do if is_nil(name) do
GenServer.start_link(__MODULE__, :ok) GenServer.start_link(__MODULE__, init_arg)
else else
GenServer.start_link(__MODULE__, :ok, name: name) GenServer.start_link(__MODULE__, init_arg, name: name)
end end
end end
@@ -65,6 +73,13 @@ defmodule Parrhesia.Subscriptions.Index do
end end
@spec candidate_subscription_keys(GenServer.server(), map()) :: [subscription_key()] @spec candidate_subscription_keys(GenServer.server(), map()) :: [subscription_key()]
# Fast path for the globally-named index: read the shared named ETS tables
# directly from the caller's process, avoiding a round-trip through the
# GenServer. Falls back to a server call when the named tables are not (yet)
# available — e.g. before the index process has finished starting.
def candidate_subscription_keys(__MODULE__, event) do
  case named_tables() do
    {:ok, tables} -> candidate_subscription_keys_for_tables(tables, event)
    :error -> GenServer.call(__MODULE__, {:candidate_subscription_keys, event})
  end
end
def candidate_subscription_keys(server, event) do def candidate_subscription_keys(server, event) do
GenServer.call(server, {:candidate_subscription_keys, event}) GenServer.call(server, {:candidate_subscription_keys, event})
end end
@@ -76,20 +91,15 @@ defmodule Parrhesia.Subscriptions.Index do
end end
@impl true @impl true
def init(:ok) do def init(%{named_tables?: named_tables?}) do
tables = create_tables(named_tables?)
{:ok, {:ok,
%{ Map.merge(tables, %{
subscriptions_table: :ets.new(:subscriptions_table, [:set, :protected]),
kind_index_table: :ets.new(:subscription_kind_index, [:bag, :protected]),
author_index_table: :ets.new(:subscription_author_index, [:bag, :protected]),
tag_index_table: :ets.new(:subscription_tag_index, [:bag, :protected]),
kind_wildcard_table: :ets.new(:subscription_kind_wildcard_index, [:bag, :protected]),
author_wildcard_table: :ets.new(:subscription_author_wildcard_index, [:bag, :protected]),
tag_wildcard_table: :ets.new(:subscription_tag_wildcard_index, [:bag, :protected]),
owner_subscriptions: %{}, owner_subscriptions: %{},
owner_monitors: %{}, owner_monitors: %{},
monitor_owners: %{} monitor_owners: %{}
}} })}
end end
@impl true @impl true
@@ -128,14 +138,7 @@ defmodule Parrhesia.Subscriptions.Index do
end end
def handle_call({:candidate_subscription_keys, event}, _from, state) do def handle_call({:candidate_subscription_keys, event}, _from, state) do
candidates = {:reply, candidate_subscription_keys_for_tables(state, event), state}
state
|> kind_candidates(event)
|> MapSet.intersection(author_candidates(state, event))
|> MapSet.intersection(tag_candidates(state, event))
|> MapSet.to_list()
{:reply, candidates, state}
end end
def handle_call({:fetch_filters, owner_pid, subscription_id}, _from, state) do def handle_call({:fetch_filters, owner_pid, subscription_id}, _from, state) do
@@ -371,28 +374,110 @@ defmodule Parrhesia.Subscriptions.Index do
|> update_in([:owner_subscriptions], &Map.delete(&1, owner_pid)) |> update_in([:owner_subscriptions], &Map.delete(&1, owner_pid))
end end
defp kind_candidates(state, event) do defp create_tables(true) do
%{
subscriptions_table:
:ets.new(@subscriptions_table_name, [
:set,
:protected,
:named_table,
read_concurrency: true
]),
kind_index_table:
:ets.new(@kind_index_table_name, [:bag, :protected, :named_table, read_concurrency: true]),
author_index_table:
:ets.new(@author_index_table_name, [
:bag,
:protected,
:named_table,
read_concurrency: true
]),
tag_index_table:
:ets.new(@tag_index_table_name, [:bag, :protected, :named_table, read_concurrency: true]),
kind_wildcard_table:
:ets.new(@kind_wildcard_table_name, [
:bag,
:protected,
:named_table,
read_concurrency: true
]),
author_wildcard_table:
:ets.new(@author_wildcard_table_name, [
:bag,
:protected,
:named_table,
read_concurrency: true
]),
tag_wildcard_table:
:ets.new(@tag_wildcard_table_name, [
:bag,
:protected,
:named_table,
read_concurrency: true
])
}
end
# Anonymous-table variant, used when the index is started under a custom name
# (named_tables?: false in init/1): each instance gets private, unnamed
# tables, so multiple instances — presumably tests — can coexist without
# clashing on global table names.
defp create_tables(false) do
  %{
    subscriptions_table: :ets.new(:subscriptions_table, [:set, :protected]),
    kind_index_table: :ets.new(:subscription_kind_index, [:bag, :protected]),
    author_index_table: :ets.new(:subscription_author_index, [:bag, :protected]),
    tag_index_table: :ets.new(:subscription_tag_index, [:bag, :protected]),
    kind_wildcard_table: :ets.new(:subscription_kind_wildcard_index, [:bag, :protected]),
    author_wildcard_table: :ets.new(:subscription_author_wildcard_index, [:bag, :protected]),
    tag_wildcard_table: :ets.new(:subscription_tag_wildcard_index, [:bag, :protected])
  }
end
# Resolves all shared named index tables at once.
#
# Returns {:ok, tables} only when every table currently exists; :error as
# soon as any lookup yields :undefined, signalling the caller to fall back
# to the GenServer path.
defp named_tables do
  table_refs = %{
    subscriptions_table: :ets.whereis(@subscriptions_table_name),
    kind_index_table: :ets.whereis(@kind_index_table_name),
    author_index_table: :ets.whereis(@author_index_table_name),
    tag_index_table: :ets.whereis(@tag_index_table_name),
    kind_wildcard_table: :ets.whereis(@kind_wildcard_table_name),
    author_wildcard_table: :ets.whereis(@author_wildcard_table_name),
    tag_wildcard_table: :ets.whereis(@tag_wildcard_table_name)
  }

  case Enum.find(table_refs, fn {_key, table_ref} -> table_ref == :undefined end) do
    nil -> {:ok, table_refs}
    _missing_table -> :error
  end
end
# Shared matching core used by both the direct-ETS fast path and the
# GenServer handler: a subscription is a candidate only when it matches the
# event on kind AND author AND tags (intersection of the three index lookups).
defp candidate_subscription_keys_for_tables(tables, event) do
  tables
  |> kind_candidates(event)
  |> MapSet.intersection(author_candidates(tables, event))
  |> MapSet.intersection(tag_candidates(tables, event))
  |> MapSet.to_list()
end
defp kind_candidates(tables, event) do
event event
|> Map.get("kind") |> Map.get("kind")
|> index_candidates_for_value(state.kind_index_table, state.kind_wildcard_table) |> index_candidates_for_value(tables.kind_index_table, tables.kind_wildcard_table)
end end
defp author_candidates(state, event) do defp author_candidates(tables, event) do
event event
|> Map.get("pubkey") |> Map.get("pubkey")
|> index_candidates_for_value(state.author_index_table, state.author_wildcard_table) |> index_candidates_for_value(tables.author_index_table, tables.author_wildcard_table)
end end
defp tag_candidates(state, event) do defp tag_candidates(tables, event) do
tag_pairs = event_tag_pairs(Map.get(event, "tags")) tag_pairs = event_tag_pairs(Map.get(event, "tags"))
wildcard_candidates = lookup_candidates(state.tag_wildcard_table, @wildcard_key) wildcard_candidates = lookup_candidates(tables.tag_wildcard_table, @wildcard_key)
if MapSet.size(tag_pairs) == 0 do if MapSet.size(tag_pairs) == 0 do
wildcard_candidates wildcard_candidates
else else
matched_candidates = matched_candidates =
Enum.reduce(tag_pairs, MapSet.new(), fn {tag_name, value}, acc -> Enum.reduce(tag_pairs, MapSet.new(), fn {tag_name, value}, acc ->
MapSet.union(acc, lookup_candidates(state.tag_index_table, {tag_name, value})) MapSet.union(acc, lookup_candidates(tables.tag_index_table, {tag_name, value}))
end) end)
MapSet.union(matched_candidates, wildcard_candidates) MapSet.union(matched_candidates, wildcard_candidates)

View File

@@ -227,37 +227,58 @@ defmodule Parrhesia.Web.Connection do
case maybe_allow_event_ingest(state) do case maybe_allow_event_ingest(state) do
{:ok, next_state} -> {:ok, next_state} ->
with :ok <- validate_event_payload_size(event, next_state.max_event_bytes), result =
:ok <- Protocol.validate_event(event), with :ok <- validate_event_payload_size(event, next_state.max_event_bytes),
:ok <- EventPolicy.authorize_write(event, next_state.authenticated_pubkeys), :ok <- Protocol.validate_event(event),
:ok <- maybe_process_group_event(event), :ok <- EventPolicy.authorize_write(event, next_state.authenticated_pubkeys),
{:ok, _result, message} <- persist_event(event) do :ok <- maybe_process_group_event(event),
Telemetry.emit( {:ok, _result, message} <- persist_event(event) do
[:parrhesia, :ingest, :stop], {:ok, message}
%{duration: System.monotonic_time() - started_at}, end
telemetry_metadata_for_event(event)
)
send(self(), {@post_ack_ingest, event}) handle_event_ingest_result(result, next_state, event, event_id, started_at)
response = Protocol.encode_relay({:ok, event_id, true, message})
{:push, {:text, response}, next_state}
else
{:error, reason} ->
message = error_message_for_ingest_failure(reason)
response = Protocol.encode_relay({:ok, event_id, false, message})
if reason in [:auth_required, :protected_event_requires_auth] do
with_auth_challenge_frame(next_state, {:push, {:text, response}, next_state})
else
{:push, {:text, response}, next_state}
end
end
{:error, reason} -> {:error, reason} ->
message = error_message_for_ingest_failure(reason) ingest_error_response(state, event_id, reason)
response = Protocol.encode_relay({:ok, event_id, false, message}) end
{:push, {:text, response}, state} end
# Success path: emit the ingest-stop telemetry with the measured duration,
# schedule post-ack work (subscription fan-out etc.) via a message to self,
# and push the positive OK frame to the client.
defp handle_event_ingest_result(
       {:ok, message},
       %__MODULE__{} = state,
       event,
       event_id,
       started_at
     ) do
  Telemetry.emit(
    [:parrhesia, :ingest, :stop],
    %{duration: System.monotonic_time() - started_at},
    telemetry_metadata_for_event(event)
  )

  # Deferred so the OK ack is pushed before any post-ingest processing runs.
  send(self(), {@post_ack_ingest, event})
  response = Protocol.encode_relay({:ok, event_id, true, message})
  {:push, {:text, response}, state}
end

# Failure path: no telemetry stop event, no post-ack message — just the
# negative OK frame (plus an AUTH challenge for auth-related reasons,
# handled inside ingest_error_response/3).
defp handle_event_ingest_result(
       {:error, reason},
       %__MODULE__{} = state,
       _event,
       event_id,
       _started_at
     ),
     do: ingest_error_response(state, event_id, reason)
defp ingest_error_response(%__MODULE__{} = state, event_id, reason) do
message = error_message_for_ingest_failure(reason)
response = Protocol.encode_relay({:ok, event_id, false, message})
if reason in [:auth_required, :protected_event_requires_auth] do
with_auth_challenge_frame(state, {:push, {:text, response}, state})
else
{:push, {:text, response}, state}
end end
end end
@@ -468,28 +489,37 @@ defmodule Parrhesia.Web.Connection do
kind = Map.get(event, "kind") kind = Map.get(event, "kind")
cond do cond do
kind == 5 -> kind in [5, 62] -> persist_control_event(kind, event)
with {:ok, deleted_count} <- Storage.events().delete_by_request(%{}, event) do ephemeral_kind?(kind) -> persist_ephemeral_event()
{:ok, deleted_count, "ok: deletion request processed"} true -> persist_regular_event(event)
end end
end
kind == 62 -> defp persist_control_event(5, event) do
with {:ok, deleted_count} <- Storage.events().vanish(%{}, event) do with {:ok, deleted_count} <- Storage.events().delete_by_request(%{}, event) do
{:ok, deleted_count, "ok: vanish request processed"} {:ok, deleted_count, "ok: deletion request processed"}
end end
end
ephemeral_kind?(kind) and accept_ephemeral_events?() -> defp persist_control_event(62, event) do
{:ok, :ephemeral, "ok: ephemeral event accepted"} with {:ok, deleted_count} <- Storage.events().vanish(%{}, event) do
{:ok, deleted_count, "ok: vanish request processed"}
end
end
ephemeral_kind?(kind) -> defp persist_ephemeral_event do
{:error, :ephemeral_events_disabled} if accept_ephemeral_events?() do
{:ok, :ephemeral, "ok: ephemeral event accepted"}
else
{:error, :ephemeral_events_disabled}
end
end
true -> defp persist_regular_event(event) do
case Storage.events().put_event(%{}, event) do case Storage.events().put_event(%{}, event) do
{:ok, persisted_event} -> {:ok, persisted_event, "ok: event stored"} {:ok, persisted_event} -> {:ok, persisted_event, "ok: event stored"}
{:error, :duplicate_event} -> {:error, :duplicate_event} {:error, :duplicate_event} -> {:error, :duplicate_event}
{:error, reason} -> {:error, reason} {:error, reason} -> {:error, reason}
end
end end
end end
@@ -644,9 +674,8 @@ defmodule Parrhesia.Web.Connection do
end) end)
with :ok <- maybe_validate(challenge_tag?, :missing_challenge_tag), with :ok <- maybe_validate(challenge_tag?, :missing_challenge_tag),
:ok <- validate_auth_relay_tag(state, tags), :ok <- validate_auth_relay_tag(state, tags) do
:ok <- validate_auth_created_at_freshness(auth_event, state.auth_max_age_seconds) do validate_auth_created_at_freshness(auth_event, state.auth_max_age_seconds)
:ok
end end
end end

View File

@@ -43,7 +43,7 @@ defmodule Parrhesia.FaultInjectionGroupFlowTest do
payload = JSON.encode!(["EVENT", group_event]) payload = JSON.encode!(["EVENT", group_event])
assert {:push, {:text, error_response}, ^state} = assert {:push, {:text, error_response}, _next_state} =
Connection.handle_in({payload, [opcode: :text]}, state) Connection.handle_in({payload, [opcode: :text]}, state)
assert JSON.decode!(error_response) == ["OK", group_event["id"], false, "error: :db_down"] assert JSON.decode!(error_response) == ["OK", group_event["id"], false, "error: :db_down"]
@@ -54,7 +54,7 @@ defmodule Parrhesia.FaultInjectionGroupFlowTest do
previous_storage |> Keyword.put(:moderation, PermissiveModeration) previous_storage |> Keyword.put(:moderation, PermissiveModeration)
) )
assert {:push, {:text, ok_response}, ^state} = assert {:push, {:text, ok_response}, _next_state} =
Connection.handle_in({payload, [opcode: :text]}, state) Connection.handle_in({payload, [opcode: :text]}, state)
assert JSON.decode!(ok_response) == ["OK", group_event["id"], true, "ok: event stored"] assert JSON.decode!(ok_response) == ["OK", group_event["id"], true, "ok: event stored"]
@@ -87,7 +87,7 @@ defmodule Parrhesia.FaultInjectionGroupFlowTest do
"content" => Base.encode64("newer") "content" => Base.encode64("newer")
}) })
assert {:push, {:text, outage_response}, ^state} = assert {:push, {:text, outage_response}, _next_state} =
Connection.handle_in( Connection.handle_in(
{JSON.encode!(["EVENT", older_event]), [opcode: :text]}, {JSON.encode!(["EVENT", older_event]), [opcode: :text]},
state state
@@ -101,7 +101,7 @@ defmodule Parrhesia.FaultInjectionGroupFlowTest do
previous_storage |> Keyword.put(:moderation, PermissiveModeration) previous_storage |> Keyword.put(:moderation, PermissiveModeration)
) )
assert {:push, {:text, newer_response}, ^state} = assert {:push, {:text, newer_response}, _next_state} =
Connection.handle_in( Connection.handle_in(
{JSON.encode!(["EVENT", newer_event]), [opcode: :text]}, {JSON.encode!(["EVENT", newer_event]), [opcode: :text]},
state state
@@ -109,7 +109,7 @@ defmodule Parrhesia.FaultInjectionGroupFlowTest do
assert JSON.decode!(newer_response) == ["OK", newer_event["id"], true, "ok: event stored"] assert JSON.decode!(newer_response) == ["OK", newer_event["id"], true, "ok: event stored"]
assert {:push, {:text, older_response}, ^state} = assert {:push, {:text, older_response}, _next_state} =
Connection.handle_in( Connection.handle_in(
{JSON.encode!(["EVENT", older_event]), [opcode: :text]}, {JSON.encode!(["EVENT", older_event]), [opcode: :text]},
state state

View File

@@ -29,7 +29,7 @@ defmodule Parrhesia.FaultInjectionTest do
{:ok, state} = Connection.init(subscription_index: nil) {:ok, state} = Connection.init(subscription_index: nil)
event = valid_event() event = valid_event()
assert {:push, {:text, response}, ^state} = assert {:push, {:text, response}, _next_state} =
Connection.handle_in({JSON.encode!(["EVENT", event]), [opcode: :text]}, state) Connection.handle_in({JSON.encode!(["EVENT", event]), [opcode: :text]}, state)
assert JSON.decode!(response) == ["OK", event["id"], false, "error: :db_down"] assert JSON.decode!(response) == ["OK", event["id"], false, "error: :db_down"]

View File

@@ -26,6 +26,31 @@ defmodule Parrhesia.Storage.Adapters.Postgres.EventsLifecycleTest do
assert {:ok, nil} = Events.get_event(%{}, target["id"]) assert {:ok, nil} = Events.get_event(%{}, target["id"])
end end
# A kind-5 deletion request referencing an addressable (kind 30023) event via
# an "a" tag ("kind:pubkey:d_tag") must tombstone that event, after which
# get_event/2 no longer returns it.
test "delete_by_request tombstones addressable targets referenced via a tags" do
  author = String.duplicate("4", 64)

  target =
    event(%{
      "pubkey" => author,
      "kind" => 30_023,
      "tags" => [["d", "topic"]],
      "content" => "addressable-target"
    })

  assert {:ok, _event} = Events.put_event(%{}, target)

  delete_request =
    event(%{
      "pubkey" => author,
      "kind" => 5,
      "tags" => [["a", "30023:#{author}:topic"]],
      "content" => "delete-addressable"
    })

  # Exactly one event should be tombstoned by the coordinate match.
  assert {:ok, 1} = Events.delete_by_request(%{}, delete_request)
  assert {:ok, nil} = Events.get_event(%{}, target["id"])
end
test "vanish hard-deletes events authored by pubkey" do test "vanish hard-deletes events authored by pubkey" do
author = String.duplicate("3", 64) author = String.duplicate("3", 64)

View File

@@ -17,6 +17,12 @@ defmodule Parrhesia.Storage.ArchiverTest do
test "archive_sql builds insert-select statement" do test "archive_sql builds insert-select statement" do
assert Archiver.archive_sql("events_2026_03", "events_archive") == assert Archiver.archive_sql("events_2026_03", "events_archive") ==
"INSERT INTO events_archive SELECT * FROM events_2026_03;" ~s(INSERT INTO "events_archive" SELECT * FROM "events_2026_03";)
end
test "archive_sql rejects invalid SQL identifiers" do
assert_raise ArgumentError, fn ->
Archiver.archive_sql("events_default; DROP TABLE events", "events_archive")
end
end end
end end