From b402d95e47bf945bc8846d01fb5879cc4164d353 Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Thu, 26 Mar 2026 00:36:00 +0100 Subject: [PATCH] feat: add sync relay guard fanout gating and env config --- .env.example | 1 + README.md | 1 + config/config.exs | 3 +- config/runtime.exs | 8 +++- lib/parrhesia/api/events.ex | 14 ++++-- test/parrhesia/api/events_test.exs | 69 ++++++++++++++++++++++++++++++ test/parrhesia/config_test.exs | 1 + 7 files changed, 91 insertions(+), 6 deletions(-) diff --git a/.env.example b/.env.example index 8fec40e..5714077 100644 --- a/.env.example +++ b/.env.example @@ -13,6 +13,7 @@ POOL_SIZE=20 # PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_WRITES=false # PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_READS=false # PARRHESIA_POLICIES_MIN_POW_DIFFICULTY=0 +# PARRHESIA_SYNC_RELAY_GUARD=false # PARRHESIA_FEATURES_VERIFY_EVENT_SIGNATURES=true # PARRHESIA_METRICS_ENABLED_ON_MAIN_ENDPOINT=true # PARRHESIA_METRICS_PRIVATE_NETWORKS_ONLY=true diff --git a/README.md b/README.md index 8a66108..9dbc2bb 100644 --- a/README.md +++ b/README.md @@ -262,6 +262,7 @@ CSV env vars use comma-separated values. Boolean env vars accept `1/0`, `true/fa | `:nip66` | config-file driven | see table below | Built-in NIP-66 discovery / monitor publisher | | `:sync.path` | `PARRHESIA_SYNC_PATH` | `nil` | Optional path to sync peer config | | `:sync.start_workers?` | `PARRHESIA_SYNC_START_WORKERS` | `true` | Start outbound sync workers on boot | +| `:sync.relay_guard` | `PARRHESIA_SYNC_RELAY_GUARD` | `false` | Suppress multi-node re-fanout for sync-originated events | | `:limits` | `PARRHESIA_LIMITS_*` | see table below | Runtime override group | | `:policies` | `PARRHESIA_POLICIES_*` | see table below | Runtime override group | | `:listeners` | config-file driven | see notes below | Ingress listeners with bind, transport, feature, auth, network, and baseline ACL settings | diff --git a/config/config.exs b/config/config.exs index b37d7e3..ab39943 100644 --- a/config/config.exs +++ b/config/config.exs @@ -39,7 +39,8 @@ config :parrhesia, ], sync: [ path: nil, - start_workers?: true + start_workers?: true, + relay_guard: false ], limits: [ max_frame_bytes: 1_048_576, diff --git a/config/runtime.exs b/config/runtime.exs index fb8200f..9b411dd 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -161,6 +161,7 @@ if config_env() == :prod do retention_defaults = Application.get_env(:parrhesia, :retention, []) features_defaults = Application.get_env(:parrhesia, :features, []) acl_defaults = Application.get_env(:parrhesia, :acl, []) + sync_defaults = Application.get_env(:parrhesia, :sync, []) default_pool_size = Keyword.get(repo_defaults, :pool_size, 32) default_queue_target = Keyword.get(repo_defaults, :queue_target, 1_000) @@ -748,7 +749,12 @@ if config_env() == :prod do start_workers?: bool_env.( "PARRHESIA_SYNC_START_WORKERS", - Keyword.get(Application.get_env(:parrhesia, :sync, []), :start_workers?, true) + Keyword.get(sync_defaults, :start_workers?, true) + ), + relay_guard: + bool_env.( + "PARRHESIA_SYNC_RELAY_GUARD", + Keyword.get(sync_defaults, :relay_guard, false) ) ], moderation_cache_enabled: diff --git a/lib/parrhesia/api/events.ex b/lib/parrhesia/api/events.ex index b1df301..d3c126b 100644 --- a/lib/parrhesia/api/events.ex +++ b/lib/parrhesia/api/events.ex @@ -87,7 +87,7 @@ defmodule Parrhesia.API.Events do end Dispatcher.dispatch(event) - maybe_publish_multi_node(event) + maybe_publish_multi_node(event, context) {:ok, %PublishResult{ @@ -312,9 +312,15 @@ defmodule Parrhesia.API.Events do end end - defp maybe_publish_multi_node(event) do - MultiNode.publish(event) - :ok + defp maybe_publish_multi_node(event, %RequestContext{} = context) do + relay_guard? = Parrhesia.Config.get([:sync, :relay_guard], false) + + if relay_guard? and context.caller == :sync do + :ok + else + MultiNode.publish(event) + :ok + end catch :exit, _reason -> :ok end diff --git a/test/parrhesia/api/events_test.exs b/test/parrhesia/api/events_test.exs index 6d8f166..315f333 100644 --- a/test/parrhesia/api/events_test.exs +++ b/test/parrhesia/api/events_test.exs @@ -30,6 +30,45 @@ defmodule Parrhesia.API.EventsTest do assert second_result.message == "duplicate: event already stored" end + test "publish fanout includes sync-originated events when relay guard is disabled" do + with_sync_relay_guard(false) + join_multi_node_group!() + + event = valid_event() + event_id = event["id"] + + assert {:ok, %{accepted: true}} = + Events.publish(event, context: %RequestContext{caller: :sync}) + + assert_receive {:remote_fanout_event, %{"id" => ^event_id}}, 200 + end + + test "publish fanout skips sync-originated events when relay guard is enabled" do + with_sync_relay_guard(true) + join_multi_node_group!() + + event = valid_event() + event_id = event["id"] + + assert {:ok, %{accepted: true}} = + Events.publish(event, context: %RequestContext{caller: :sync}) + + refute_receive {:remote_fanout_event, %{"id" => ^event_id}}, 200 + end + + test "publish fanout still includes local-originated events when relay guard is enabled" do + with_sync_relay_guard(true) + join_multi_node_group!() + + event = valid_event() + event_id = event["id"] + + assert {:ok, %{accepted: true}} = + Events.publish(event, context: %RequestContext{caller: :local}) + + assert_receive {:remote_fanout_event, %{"id" => ^event_id}}, 200 + end + test "query and count preserve read semantics through the shared API" do now = System.system_time(:second) first = valid_event(%{"content" => "first", "created_at" => now}) @@ -53,6 +92,36 @@ defmodule Parrhesia.API.EventsTest do ) end + defp with_sync_relay_guard(enabled?) when is_boolean(enabled?) do + [{:config, previous}] = :ets.lookup(Parrhesia.Config, :config) + + sync = + previous + |> Map.get(:sync, []) + |> Keyword.put(:relay_guard, enabled?) + + :ets.insert(Parrhesia.Config, {:config, Map.put(previous, :sync, sync)}) + + on_exit(fn -> + :ets.insert(Parrhesia.Config, {:config, previous}) + end) + end + + defp join_multi_node_group! do + case Process.whereis(:pg) do + nil -> + case :pg.start_link() do + {:ok, _pid} -> :ok + {:error, {:already_started, _pid}} -> :ok + end + + _pid -> + :ok + end + + :ok = :pg.join(Parrhesia.Fanout.MultiNode, self()) + end + defp valid_event(overrides \\ %{}) do base_event = %{ "pubkey" => String.duplicate("1", 64), diff --git a/test/parrhesia/config_test.exs b/test/parrhesia/config_test.exs index 9bc0429..7cda128 100644 --- a/test/parrhesia/config_test.exs +++ b/test/parrhesia/config_test.exs @@ -22,6 +22,7 @@ defmodule Parrhesia.ConfigTest do assert Parrhesia.Config.get([:limits, :max_filter_limit]) == 500 assert Parrhesia.Config.get([:database, :separate_read_pool?]) == false assert Parrhesia.Config.get([:relay_url]) == "ws://localhost:4413/relay" + assert Parrhesia.Config.get([:sync, :relay_guard]) == false assert Parrhesia.Config.get([:policies, :auth_required_for_writes]) == false assert Parrhesia.Config.get([:policies, :marmot_media_max_imeta_tags_per_event]) == 8 assert Parrhesia.Config.get([:policies, :marmot_media_reject_mip04_v1]) == true