diff --git a/config/config.exs b/config/config.exs index cdeeae6..cd9df39 100644 --- a/config/config.exs +++ b/config/config.exs @@ -3,6 +3,7 @@ import Config config :postgrex, :json_library, JSON config :parrhesia, + moderation_cache_enabled: true, limits: [ max_frame_bytes: 1_048_576, max_event_bytes: 262_144, diff --git a/config/prod.exs b/config/prod.exs index 2961960..83ada1c 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -1,5 +1,8 @@ import Config -config :parrhesia, Parrhesia.Repo, pool_size: 32 +config :parrhesia, Parrhesia.Repo, + pool_size: 32, + queue_target: 1_000, + queue_interval: 5_000 # Production runtime configuration lives in config/runtime.exs. diff --git a/config/runtime.exs b/config/runtime.exs index a8a99ba..58c223f 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -5,10 +5,11 @@ if config_env() == :prod do System.get_env("DATABASE_URL") || raise "environment variable DATABASE_URL is missing. Example: ecto://USER:PASS@HOST/DATABASE" - default_pool_size = - :parrhesia - |> Application.get_env(Parrhesia.Repo, []) - |> Keyword.get(:pool_size, 32) + repo_defaults = Application.get_env(:parrhesia, Parrhesia.Repo, []) + + default_pool_size = Keyword.get(repo_defaults, :pool_size, 32) + default_queue_target = Keyword.get(repo_defaults, :queue_target, 1_000) + default_queue_interval = Keyword.get(repo_defaults, :queue_interval, 5_000) pool_size = case System.get_env("POOL_SIZE") do @@ -16,9 +17,23 @@ if config_env() == :prod do value -> String.to_integer(value) end + queue_target = + case System.get_env("DB_QUEUE_TARGET_MS") do + nil -> default_queue_target + value -> String.to_integer(value) + end + + queue_interval = + case System.get_env("DB_QUEUE_INTERVAL_MS") do + nil -> default_queue_interval + value -> String.to_integer(value) + end + config :parrhesia, Parrhesia.Repo, url: database_url, - pool_size: pool_size + pool_size: pool_size, + queue_target: queue_target, + queue_interval: queue_interval config :parrhesia, Parrhesia.Web.Endpoint, port: String.to_integer(System.get_env("PORT") || "4000") diff --git a/config/test.exs b/config/test.exs index f840d03..d9e1c4a 100644 --- a/config/test.exs +++ b/config/test.exs @@ -12,7 +12,9 @@ config :parrhesia, Parrhesia.Web.Endpoint, port: test_endpoint_port, ip: {127, 0, 0, 1} -config :parrhesia, enable_expiration_worker: false +config :parrhesia, + enable_expiration_worker: false, + moderation_cache_enabled: false pg_host = System.get_env("PGHOST") diff --git a/lib/parrhesia/storage/adapters/postgres/moderation.ex b/lib/parrhesia/storage/adapters/postgres/moderation.ex index 4f29c58..22baf88 100644 --- a/lib/parrhesia/storage/adapters/postgres/moderation.ex +++ b/lib/parrhesia/storage/adapters/postgres/moderation.ex @@ -9,87 +9,111 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do @behaviour Parrhesia.Storage.Moderation + @cache_table :parrhesia_moderation_cache + @cache_scope_sources %{ + banned_pubkeys: {"banned_pubkeys", :pubkey}, + allowed_pubkeys: {"allowed_pubkeys", :pubkey}, + banned_events: {"banned_events", :event_id}, + blocked_ips: {"blocked_ips", :ip} + } + @impl true def ban_pubkey(_context, pubkey) do - with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do - upsert_presence_table("banned_pubkeys", :pubkey, normalized_pubkey) + with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey), + :ok <- upsert_presence_table("banned_pubkeys", :pubkey, normalized_pubkey) do + cache_put(:banned_pubkeys, normalized_pubkey) + :ok end end @impl true def unban_pubkey(_context, pubkey) do - with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do - delete_from_table("banned_pubkeys", :pubkey, normalized_pubkey) + with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey), + :ok <- delete_from_table("banned_pubkeys", :pubkey, normalized_pubkey) do + cache_delete(:banned_pubkeys, normalized_pubkey) + :ok end end @impl true def pubkey_banned?(_context, pubkey) do with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do - {:ok, exists_in_table?("banned_pubkeys", :pubkey, normalized_pubkey)} + {:ok, exists_in_scope?(:banned_pubkeys, normalized_pubkey)} end end @impl true def allow_pubkey(_context, pubkey) do - with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do - upsert_presence_table("allowed_pubkeys", :pubkey, normalized_pubkey) + with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey), + :ok <- upsert_presence_table("allowed_pubkeys", :pubkey, normalized_pubkey) do + cache_put(:allowed_pubkeys, normalized_pubkey) + :ok end end @impl true def disallow_pubkey(_context, pubkey) do - with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do - delete_from_table("allowed_pubkeys", :pubkey, normalized_pubkey) + with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey), + :ok <- delete_from_table("allowed_pubkeys", :pubkey, normalized_pubkey) do + cache_delete(:allowed_pubkeys, normalized_pubkey) + :ok end end @impl true def pubkey_allowed?(_context, pubkey) do with {:ok, normalized_pubkey} <- normalize_hex_or_binary(pubkey, 32, :invalid_pubkey) do - {:ok, exists_in_table?("allowed_pubkeys", :pubkey, normalized_pubkey)} + {:ok, exists_in_scope?(:allowed_pubkeys, normalized_pubkey)} end end @impl true def ban_event(_context, event_id) do - with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id) do - upsert_presence_table("banned_events", :event_id, normalized_event_id) + with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id), + :ok <- upsert_presence_table("banned_events", :event_id, normalized_event_id) do + cache_put(:banned_events, normalized_event_id) + :ok end end @impl true def unban_event(_context, event_id) do - with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id) do - delete_from_table("banned_events", :event_id, normalized_event_id) + with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id), + :ok <- delete_from_table("banned_events", :event_id, normalized_event_id) do + cache_delete(:banned_events, normalized_event_id) + :ok end end @impl true def event_banned?(_context, event_id) do with {:ok, normalized_event_id} <- normalize_hex_or_binary(event_id, 32, :invalid_event_id) do - {:ok, exists_in_table?("banned_events", :event_id, normalized_event_id)} + {:ok, exists_in_scope?(:banned_events, normalized_event_id)} end end @impl true def block_ip(_context, ip_address) do - with {:ok, normalized_ip} <- normalize_ip(ip_address) do - upsert_presence_table("blocked_ips", :ip, normalized_ip) + with {:ok, normalized_ip} <- normalize_ip(ip_address), + :ok <- upsert_presence_table("blocked_ips", :ip, normalized_ip) do + cache_put(:blocked_ips, normalized_ip) + :ok end end @impl true def unblock_ip(_context, ip_address) do - with {:ok, normalized_ip} <- normalize_ip(ip_address) do - delete_from_table("blocked_ips", :ip, normalized_ip) + with {:ok, normalized_ip} <- normalize_ip(ip_address), + :ok <- delete_from_table("blocked_ips", :ip, normalized_ip) do + cache_delete(:blocked_ips, normalized_ip) + :ok end end @impl true def ip_blocked?(_context, ip_address) do with {:ok, normalized_ip} <- normalize_ip(ip_address) do - {:ok, exists_in_table?("blocked_ips", :ip, normalized_ip)} + {:ok, exists_in_scope?(:blocked_ips, normalized_ip)} end end @@ -122,7 +146,99 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do :ok end - defp exists_in_table?(table, field, value) do + defp exists_in_scope?(scope, value) do + if moderation_cache_enabled?() do + ensure_cache_scope_loaded(scope) + :ets.member(cache_table_ref(), cache_member_key(scope, value)) + else + {table, field} = cache_scope_source!(scope) + exists_in_table_db?(table, field, value) + end + end + + defp ensure_cache_scope_loaded(scope) do + table = cache_table_ref() + loaded_key = cache_loaded_key(scope) + + if :ets.member(table, loaded_key) do + :ok + else + {db_table, db_field} = cache_scope_source!(scope) + values = load_scope_values(db_table, db_field) + + entries = Enum.map(values, &{cache_member_key(scope, &1), true}) + + if entries != [] do + true = :ets.insert(table, entries) + end + + true = :ets.insert(table, {loaded_key, true}) + :ok + end + end + + defp load_scope_values(table, field) do + query = + from(record in table, + select: field(record, ^field) + ) + + Repo.all(query) + end + + defp cache_put(scope, value) do + if moderation_cache_enabled?() do + true = :ets.insert(cache_table_ref(), {cache_member_key(scope, value), true}) + end + + :ok + end + + defp cache_delete(scope, value) do + if moderation_cache_enabled?() do + true = :ets.delete(cache_table_ref(), cache_member_key(scope, value)) + end + + :ok + end + + defp cache_scope_source!(scope), do: Map.fetch!(@cache_scope_sources, scope) + + defp cache_loaded_key(scope), do: {:loaded, scope} + + defp cache_member_key(scope, value), do: {:member, scope, value} + + defp cache_table_ref do + case :ets.whereis(@cache_table) do + :undefined -> + try do + :ets.new(@cache_table, [ + :named_table, + :set, + :public, + read_concurrency: true, + write_concurrency: true + ]) + rescue + ArgumentError -> @cache_table + end + + @cache_table + + _table_ref -> + @cache_table + end + end + + defp moderation_cache_enabled? do + case Application.get_env(:parrhesia, :moderation_cache_enabled, true) do + true -> true + false -> false + _other -> true + end + end + + defp exists_in_table_db?(table, field, value) do query = from(record in table, where: field(record, ^field) == ^value, diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index f3ceefb..6495ec0 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -21,6 +21,7 @@ defmodule Parrhesia.Web.Connection do @default_outbound_drain_batch_size 64 @default_outbound_overflow_strategy :close @drain_outbound_queue :drain_outbound_queue + @post_ack_ingest :post_ack_ingest @outbound_queue_pressure_threshold 0.75 @marmot_kinds MapSet.new([ @@ -155,6 +156,12 @@ defmodule Parrhesia.Web.Connection do handle_fanout_events(state, fanout_events) end + def handle_info({@post_ack_ingest, event}, %__MODULE__{} = state) when is_map(event) do + fanout_event(event) + maybe_publish_multi_node(event) + {:ok, state} + end + def handle_info(@drain_outbound_queue, %__MODULE__{} = state) do {frames, next_state} = drain_outbound_frames(state) @@ -190,8 +197,7 @@ defmodule Parrhesia.Web.Connection do telemetry_metadata_for_event(event) ) - fanout_event(event) - maybe_publish_multi_node(event) + send(self(), {@post_ack_ingest, event}) response = Protocol.encode_relay({:ok, event_id, true, message}) {:push, {:text, response}, state} diff --git a/scripts/run_nostr_bench.sh b/scripts/run_nostr_bench.sh index b04d56e..1f668fc 100755 --- a/scripts/run_nostr_bench.sh +++ b/scripts/run_nostr_bench.sh @@ -15,6 +15,8 @@ Runs nostr-bench against a temporary Parrhesia prod server started via Pool tuning: POOL_SIZE optional override for prod pool size + DB_QUEUE_TARGET_MS optional Repo queue target override + DB_QUEUE_INTERVAL_MS optional Repo queue interval override Database lifecycle: PGDATABASE optional override (auto-generated by default)