Improve ingest throughput with moderation cache and post-ack fanout

This commit is contained in:
2026-03-14 02:33:37 +01:00
parent d348eab69e
commit 54a54c026b
7 changed files with 175 additions and 30 deletions

View File

@@ -3,6 +3,7 @@ import Config
config :postgrex, :json_library, JSON config :postgrex, :json_library, JSON
config :parrhesia, config :parrhesia,
moderation_cache_enabled: true,
limits: [ limits: [
max_frame_bytes: 1_048_576, max_frame_bytes: 1_048_576,
max_event_bytes: 262_144, max_event_bytes: 262_144,

View File

@@ -1,5 +1,8 @@
import Config import Config
config :parrhesia, Parrhesia.Repo, pool_size: 32 config :parrhesia, Parrhesia.Repo,
pool_size: 32,
queue_target: 1_000,
queue_interval: 5_000
# Production runtime configuration lives in config/runtime.exs. # Production runtime configuration lives in config/runtime.exs.

View File

@@ -5,10 +5,11 @@ if config_env() == :prod do
System.get_env("DATABASE_URL") || System.get_env("DATABASE_URL") ||
raise "environment variable DATABASE_URL is missing. Example: ecto://USER:PASS@HOST/DATABASE" raise "environment variable DATABASE_URL is missing. Example: ecto://USER:PASS@HOST/DATABASE"
default_pool_size = repo_defaults = Application.get_env(:parrhesia, Parrhesia.Repo, [])
:parrhesia
|> Application.get_env(Parrhesia.Repo, []) default_pool_size = Keyword.get(repo_defaults, :pool_size, 32)
|> Keyword.get(:pool_size, 32) default_queue_target = Keyword.get(repo_defaults, :queue_target, 1_000)
default_queue_interval = Keyword.get(repo_defaults, :queue_interval, 5_000)
pool_size = pool_size =
case System.get_env("POOL_SIZE") do case System.get_env("POOL_SIZE") do
@@ -16,9 +17,23 @@ if config_env() == :prod do
value -> String.to_integer(value) value -> String.to_integer(value)
end end
queue_target =
case System.get_env("DB_QUEUE_TARGET_MS") do
nil -> default_queue_target
value -> String.to_integer(value)
end
queue_interval =
case System.get_env("DB_QUEUE_INTERVAL_MS") do
nil -> default_queue_interval
value -> String.to_integer(value)
end
config :parrhesia, Parrhesia.Repo, config :parrhesia, Parrhesia.Repo,
url: database_url, url: database_url,
pool_size: pool_size pool_size: pool_size,
queue_target: queue_target,
queue_interval: queue_interval
config :parrhesia, Parrhesia.Web.Endpoint, config :parrhesia, Parrhesia.Web.Endpoint,
port: String.to_integer(System.get_env("PORT") || "4000") port: String.to_integer(System.get_env("PORT") || "4000")

View File

@@ -12,7 +12,9 @@ config :parrhesia, Parrhesia.Web.Endpoint,
port: test_endpoint_port, port: test_endpoint_port,
ip: {127, 0, 0, 1} ip: {127, 0, 0, 1}
config :parrhesia, enable_expiration_worker: false config :parrhesia,
enable_expiration_worker: false,
moderation_cache_enabled: false
pg_host = System.get_env("PGHOST") pg_host = System.get_env("PGHOST")

View File

@@ -9,87 +9,111 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do
@behaviour Parrhesia.Storage.Moderation @behaviour Parrhesia.Storage.Moderation
@cache_table :parrhesia_moderation_cache
@cache_scope_sources %{
banned_pubkeys: {"banned_pubkeys", :pubkey},
allowed_pubkeys: {"allowed_pubkeys", :pubkey},
banned_events: {"banned_events", :event_id},
blocked_ips: {"blocked_ips", :ip}
}
@impl true @impl true
def ban_pubkey(_context, pubkey) do def ban_pubkey(_context, pubkey) do
with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey),
upsert_presence_table("banned_pubkeys", :pubkey, normalized_pubkey) :ok <- upsert_presence_table("banned_pubkeys", :pubkey, normalized_pubkey) do
cache_put(:banned_pubkeys, normalized_pubkey)
:ok
end end
end end
@impl true @impl true
def unban_pubkey(_context, pubkey) do def unban_pubkey(_context, pubkey) do
with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey),
delete_from_table("banned_pubkeys", :pubkey, normalized_pubkey) :ok <- delete_from_table("banned_pubkeys", :pubkey, normalized_pubkey) do
cache_delete(:banned_pubkeys, normalized_pubkey)
:ok
end end
end end
@impl true @impl true
def pubkey_banned?(_context, pubkey) do def pubkey_banned?(_context, pubkey) do
with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do
{:ok, exists_in_table?("banned_pubkeys", :pubkey, normalized_pubkey)} {:ok, exists_in_scope?(:banned_pubkeys, normalized_pubkey)}
end end
end end
@impl true @impl true
def allow_pubkey(_context, pubkey) do def allow_pubkey(_context, pubkey) do
with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey),
upsert_presence_table("allowed_pubkeys", :pubkey, normalized_pubkey) :ok <- upsert_presence_table("allowed_pubkeys", :pubkey, normalized_pubkey) do
cache_put(:allowed_pubkeys, normalized_pubkey)
:ok
end end
end end
@impl true @impl true
def disallow_pubkey(_context, pubkey) do def disallow_pubkey(_context, pubkey) do
with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey),
delete_from_table("allowed_pubkeys", :pubkey, normalized_pubkey) :ok <- delete_from_table("allowed_pubkeys", :pubkey, normalized_pubkey) do
cache_delete(:allowed_pubkeys, normalized_pubkey)
:ok
end end
end end
@impl true @impl true
def pubkey_allowed?(_context, pubkey) do def pubkey_allowed?(_context, pubkey) do
with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do
{:ok, exists_in_table?("allowed_pubkeys", :pubkey, normalized_pubkey)} {:ok, exists_in_scope?(:allowed_pubkeys, normalized_pubkey)}
end end
end end
@impl true @impl true
def ban_event(_context, event_id) do def ban_event(_context, event_id) do
with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id) do with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id),
upsert_presence_table("banned_events", :event_id, normalized_event_id) :ok <- upsert_presence_table("banned_events", :event_id, normalized_event_id) do
cache_put(:banned_events, normalized_event_id)
:ok
end end
end end
@impl true @impl true
def unban_event(_context, event_id) do def unban_event(_context, event_id) do
with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id) do with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id),
delete_from_table("banned_events", :event_id, normalized_event_id) :ok <- delete_from_table("banned_events", :event_id, normalized_event_id) do
cache_delete(:banned_events, normalized_event_id)
:ok
end end
end end
@impl true @impl true
def event_banned?(_context, event_id) do def event_banned?(_context, event_id) do
with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id) do with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id) do
{:ok, exists_in_table?("banned_events", :event_id, normalized_event_id)} {:ok, exists_in_scope?(:banned_events, normalized_event_id)}
end end
end end
@impl true @impl true
def block_ip(_context, ip_address) do def block_ip(_context, ip_address) do
with {:ok, normalized_ip} <- normalize_ip(ip_address) do with {:ok, normalized_ip} <- normalize_ip(ip_address),
upsert_presence_table("blocked_ips", :ip, normalized_ip) :ok <- upsert_presence_table("blocked_ips", :ip, normalized_ip) do
cache_put(:blocked_ips, normalized_ip)
:ok
end end
end end
@impl true @impl true
def unblock_ip(_context, ip_address) do def unblock_ip(_context, ip_address) do
with {:ok, normalized_ip} <- normalize_ip(ip_address) do with {:ok, normalized_ip} <- normalize_ip(ip_address),
delete_from_table("blocked_ips", :ip, normalized_ip) :ok <- delete_from_table("blocked_ips", :ip, normalized_ip) do
cache_delete(:blocked_ips, normalized_ip)
:ok
end end
end end
@impl true @impl true
def ip_blocked?(_context, ip_address) do def ip_blocked?(_context, ip_address) do
with {:ok, normalized_ip} <- normalize_ip(ip_address) do with {:ok, normalized_ip} <- normalize_ip(ip_address) do
{:ok, exists_in_table?("blocked_ips", :ip, normalized_ip)} {:ok, exists_in_scope?(:blocked_ips, normalized_ip)}
end end
end end
@@ -122,7 +146,99 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do
:ok :ok
end end
defp exists_in_table?(table, field, value) do defp exists_in_scope?(scope, value) do
if moderation_cache_enabled?() do
ensure_cache_scope_loaded(scope)
:ets.member(cache_table_ref(), cache_member_key(scope, value))
else
{table, field} = cache_scope_source!(scope)
exists_in_table_db?(table, field, value)
end
end
defp ensure_cache_scope_loaded(scope) do
table = cache_table_ref()
loaded_key = cache_loaded_key(scope)
if :ets.member(table, loaded_key) do
:ok
else
{db_table, db_field} = cache_scope_source!(scope)
values = load_scope_values(db_table, db_field)
entries = Enum.map(values, &{cache_member_key(scope, &1), true})
if entries != [] do
true = :ets.insert(table, entries)
end
true = :ets.insert(table, {loaded_key, true})
:ok
end
end
defp load_scope_values(table, field) do
query =
from(record in table,
select: field(record, ^field)
)
Repo.all(query)
end
defp cache_put(scope, value) do
if moderation_cache_enabled?() do
true = :ets.insert(cache_table_ref(), {cache_member_key(scope, value), true})
end
:ok
end
defp cache_delete(scope, value) do
if moderation_cache_enabled?() do
true = :ets.delete(cache_table_ref(), cache_member_key(scope, value))
end
:ok
end
defp cache_scope_source!(scope), do: Map.fetch!(@cache_scope_sources, scope)
defp cache_loaded_key(scope), do: {:loaded, scope}
defp cache_member_key(scope, value), do: {:member, scope, value}
defp cache_table_ref do
case :ets.whereis(@cache_table) do
:undefined ->
try do
:ets.new(@cache_table, [
:named_table,
:set,
:public,
read_concurrency: true,
write_concurrency: true
])
rescue
ArgumentError -> @cache_table
end
@cache_table
_table_ref ->
@cache_table
end
end
defp moderation_cache_enabled? do
case Application.get_env(:parrhesia, :moderation_cache_enabled, true) do
true -> true
false -> false
_other -> true
end
end
defp exists_in_table_db?(table, field, value) do
query = query =
from(record in table, from(record in table,
where: field(record, ^field) == ^value, where: field(record, ^field) == ^value,

View File

@@ -21,6 +21,7 @@ defmodule Parrhesia.Web.Connection do
@default_outbound_drain_batch_size 64 @default_outbound_drain_batch_size 64
@default_outbound_overflow_strategy :close @default_outbound_overflow_strategy :close
@drain_outbound_queue :drain_outbound_queue @drain_outbound_queue :drain_outbound_queue
@post_ack_ingest :post_ack_ingest
@outbound_queue_pressure_threshold 0.75 @outbound_queue_pressure_threshold 0.75
@marmot_kinds MapSet.new([ @marmot_kinds MapSet.new([
@@ -155,6 +156,12 @@ defmodule Parrhesia.Web.Connection do
handle_fanout_events(state, fanout_events) handle_fanout_events(state, fanout_events)
end end
def handle_info({@post_ack_ingest, event}, %__MODULE__{} = state) when is_map(event) do
fanout_event(event)
maybe_publish_multi_node(event)
{:ok, state}
end
def handle_info(@drain_outbound_queue, %__MODULE__{} = state) do def handle_info(@drain_outbound_queue, %__MODULE__{} = state) do
{frames, next_state} = drain_outbound_frames(state) {frames, next_state} = drain_outbound_frames(state)
@@ -190,8 +197,7 @@ defmodule Parrhesia.Web.Connection do
telemetry_metadata_for_event(event) telemetry_metadata_for_event(event)
) )
fanout_event(event) send(self(), {@post_ack_ingest, event})
maybe_publish_multi_node(event)
response = Protocol.encode_relay({:ok, event_id, true, message}) response = Protocol.encode_relay({:ok, event_id, true, message})
{:push, {:text, response}, state} {:push, {:text, response}, state}

View File

@@ -15,6 +15,8 @@ Runs nostr-bench against a temporary Parrhesia prod server started via
Pool tuning: Pool tuning:
POOL_SIZE optional override for prod pool size POOL_SIZE optional override for prod pool size
DB_QUEUE_TARGET_MS optional Repo queue target override (milliseconds)
DB_QUEUE_INTERVAL_MS optional Repo queue interval override (milliseconds)
Database lifecycle: Database lifecycle:
PGDATABASE optional override (auto-generated by default) PGDATABASE optional override (auto-generated by default)