feat: add sync relay guard fanout gating and env config
This commit is contained in:
@@ -13,6 +13,7 @@ POOL_SIZE=20
|
|||||||
# PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_WRITES=false
|
# PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_WRITES=false
|
||||||
# PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_READS=false
|
# PARRHESIA_POLICIES_AUTH_REQUIRED_FOR_READS=false
|
||||||
# PARRHESIA_POLICIES_MIN_POW_DIFFICULTY=0
|
# PARRHESIA_POLICIES_MIN_POW_DIFFICULTY=0
|
||||||
|
# PARRHESIA_SYNC_RELAY_GUARD=false
|
||||||
# PARRHESIA_FEATURES_VERIFY_EVENT_SIGNATURES=true
|
# PARRHESIA_FEATURES_VERIFY_EVENT_SIGNATURES=true
|
||||||
# PARRHESIA_METRICS_ENABLED_ON_MAIN_ENDPOINT=true
|
# PARRHESIA_METRICS_ENABLED_ON_MAIN_ENDPOINT=true
|
||||||
# PARRHESIA_METRICS_PRIVATE_NETWORKS_ONLY=true
|
# PARRHESIA_METRICS_PRIVATE_NETWORKS_ONLY=true
|
||||||
|
|||||||
@@ -262,6 +262,7 @@ CSV env vars use comma-separated values. Boolean env vars accept `1/0`, `true/fa
|
|||||||
| `:nip66` | config-file driven | see table below | Built-in NIP-66 discovery / monitor publisher |
|
| `:nip66` | config-file driven | see table below | Built-in NIP-66 discovery / monitor publisher |
|
||||||
| `:sync.path` | `PARRHESIA_SYNC_PATH` | `nil` | Optional path to sync peer config |
|
| `:sync.path` | `PARRHESIA_SYNC_PATH` | `nil` | Optional path to sync peer config |
|
||||||
| `:sync.start_workers?` | `PARRHESIA_SYNC_START_WORKERS` | `true` | Start outbound sync workers on boot |
|
| `:sync.start_workers?` | `PARRHESIA_SYNC_START_WORKERS` | `true` | Start outbound sync workers on boot |
|
||||||
|
| `:sync.relay_guard` | `PARRHESIA_SYNC_RELAY_GUARD` | `false` | Suppress multi-node re-fanout for sync-originated events |
|
||||||
| `:limits` | `PARRHESIA_LIMITS_*` | see table below | Runtime override group |
|
| `:limits` | `PARRHESIA_LIMITS_*` | see table below | Runtime override group |
|
||||||
| `:policies` | `PARRHESIA_POLICIES_*` | see table below | Runtime override group |
|
| `:policies` | `PARRHESIA_POLICIES_*` | see table below | Runtime override group |
|
||||||
| `:listeners` | config-file driven | see notes below | Ingress listeners with bind, transport, feature, auth, network, and baseline ACL settings |
|
| `:listeners` | config-file driven | see notes below | Ingress listeners with bind, transport, feature, auth, network, and baseline ACL settings |
|
||||||
|
|||||||
@@ -39,7 +39,8 @@ config :parrhesia,
|
|||||||
],
|
],
|
||||||
sync: [
|
sync: [
|
||||||
path: nil,
|
path: nil,
|
||||||
start_workers?: true
|
start_workers?: true,
|
||||||
|
relay_guard: false
|
||||||
],
|
],
|
||||||
limits: [
|
limits: [
|
||||||
max_frame_bytes: 1_048_576,
|
max_frame_bytes: 1_048_576,
|
||||||
|
|||||||
@@ -161,6 +161,7 @@ if config_env() == :prod do
|
|||||||
retention_defaults = Application.get_env(:parrhesia, :retention, [])
|
retention_defaults = Application.get_env(:parrhesia, :retention, [])
|
||||||
features_defaults = Application.get_env(:parrhesia, :features, [])
|
features_defaults = Application.get_env(:parrhesia, :features, [])
|
||||||
acl_defaults = Application.get_env(:parrhesia, :acl, [])
|
acl_defaults = Application.get_env(:parrhesia, :acl, [])
|
||||||
|
sync_defaults = Application.get_env(:parrhesia, :sync, [])
|
||||||
|
|
||||||
default_pool_size = Keyword.get(repo_defaults, :pool_size, 32)
|
default_pool_size = Keyword.get(repo_defaults, :pool_size, 32)
|
||||||
default_queue_target = Keyword.get(repo_defaults, :queue_target, 1_000)
|
default_queue_target = Keyword.get(repo_defaults, :queue_target, 1_000)
|
||||||
@@ -748,7 +749,12 @@ if config_env() == :prod do
|
|||||||
start_workers?:
|
start_workers?:
|
||||||
bool_env.(
|
bool_env.(
|
||||||
"PARRHESIA_SYNC_START_WORKERS",
|
"PARRHESIA_SYNC_START_WORKERS",
|
||||||
Keyword.get(Application.get_env(:parrhesia, :sync, []), :start_workers?, true)
|
Keyword.get(sync_defaults, :start_workers?, true)
|
||||||
|
),
|
||||||
|
relay_guard:
|
||||||
|
bool_env.(
|
||||||
|
"PARRHESIA_SYNC_RELAY_GUARD",
|
||||||
|
Keyword.get(sync_defaults, :relay_guard, false)
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
moderation_cache_enabled:
|
moderation_cache_enabled:
|
||||||
|
|||||||
@@ -87,7 +87,7 @@ defmodule Parrhesia.API.Events do
|
|||||||
end
|
end
|
||||||
|
|
||||||
Dispatcher.dispatch(event)
|
Dispatcher.dispatch(event)
|
||||||
maybe_publish_multi_node(event)
|
maybe_publish_multi_node(event, context)
|
||||||
|
|
||||||
{:ok,
|
{:ok,
|
||||||
%PublishResult{
|
%PublishResult{
|
||||||
@@ -312,9 +312,15 @@ defmodule Parrhesia.API.Events do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp maybe_publish_multi_node(event) do
|
defp maybe_publish_multi_node(event, %RequestContext{} = context) do
|
||||||
MultiNode.publish(event)
|
relay_guard? = Parrhesia.Config.get([:sync, :relay_guard], false)
|
||||||
:ok
|
|
||||||
|
if relay_guard? and context.caller == :sync do
|
||||||
|
:ok
|
||||||
|
else
|
||||||
|
MultiNode.publish(event)
|
||||||
|
:ok
|
||||||
|
end
|
||||||
catch
|
catch
|
||||||
:exit, _reason -> :ok
|
:exit, _reason -> :ok
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -30,6 +30,45 @@ defmodule Parrhesia.API.EventsTest do
|
|||||||
assert second_result.message == "duplicate: event already stored"
|
assert second_result.message == "duplicate: event already stored"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "publish fanout includes sync-originated events when relay guard is disabled" do
|
||||||
|
with_sync_relay_guard(false)
|
||||||
|
join_multi_node_group!()
|
||||||
|
|
||||||
|
event = valid_event()
|
||||||
|
event_id = event["id"]
|
||||||
|
|
||||||
|
assert {:ok, %{accepted: true}} =
|
||||||
|
Events.publish(event, context: %RequestContext{caller: :sync})
|
||||||
|
|
||||||
|
assert_receive {:remote_fanout_event, %{"id" => ^event_id}}, 200
|
||||||
|
end
|
||||||
|
|
||||||
|
test "publish fanout skips sync-originated events when relay guard is enabled" do
|
||||||
|
with_sync_relay_guard(true)
|
||||||
|
join_multi_node_group!()
|
||||||
|
|
||||||
|
event = valid_event()
|
||||||
|
event_id = event["id"]
|
||||||
|
|
||||||
|
assert {:ok, %{accepted: true}} =
|
||||||
|
Events.publish(event, context: %RequestContext{caller: :sync})
|
||||||
|
|
||||||
|
refute_receive {:remote_fanout_event, %{"id" => ^event_id}}, 200
|
||||||
|
end
|
||||||
|
|
||||||
|
test "publish fanout still includes local-originated events when relay guard is enabled" do
|
||||||
|
with_sync_relay_guard(true)
|
||||||
|
join_multi_node_group!()
|
||||||
|
|
||||||
|
event = valid_event()
|
||||||
|
event_id = event["id"]
|
||||||
|
|
||||||
|
assert {:ok, %{accepted: true}} =
|
||||||
|
Events.publish(event, context: %RequestContext{caller: :local})
|
||||||
|
|
||||||
|
assert_receive {:remote_fanout_event, %{"id" => ^event_id}}, 200
|
||||||
|
end
|
||||||
|
|
||||||
test "query and count preserve read semantics through the shared API" do
|
test "query and count preserve read semantics through the shared API" do
|
||||||
now = System.system_time(:second)
|
now = System.system_time(:second)
|
||||||
first = valid_event(%{"content" => "first", "created_at" => now})
|
first = valid_event(%{"content" => "first", "created_at" => now})
|
||||||
@@ -53,6 +92,36 @@ defmodule Parrhesia.API.EventsTest do
|
|||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp with_sync_relay_guard(enabled?) when is_boolean(enabled?) do
|
||||||
|
[{:config, previous}] = :ets.lookup(Parrhesia.Config, :config)
|
||||||
|
|
||||||
|
sync =
|
||||||
|
previous
|
||||||
|
|> Map.get(:sync, [])
|
||||||
|
|> Keyword.put(:relay_guard, enabled?)
|
||||||
|
|
||||||
|
:ets.insert(Parrhesia.Config, {:config, Map.put(previous, :sync, sync)})
|
||||||
|
|
||||||
|
on_exit(fn ->
|
||||||
|
:ets.insert(Parrhesia.Config, {:config, previous})
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp join_multi_node_group! do
|
||||||
|
case Process.whereis(:pg) do
|
||||||
|
nil ->
|
||||||
|
case :pg.start_link() do
|
||||||
|
{:ok, _pid} -> :ok
|
||||||
|
{:error, {:already_started, _pid}} -> :ok
|
||||||
|
end
|
||||||
|
|
||||||
|
_pid ->
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
:ok = :pg.join(Parrhesia.Fanout.MultiNode, self())
|
||||||
|
end
|
||||||
|
|
||||||
defp valid_event(overrides \\ %{}) do
|
defp valid_event(overrides \\ %{}) do
|
||||||
base_event = %{
|
base_event = %{
|
||||||
"pubkey" => String.duplicate("1", 64),
|
"pubkey" => String.duplicate("1", 64),
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ defmodule Parrhesia.ConfigTest do
|
|||||||
assert Parrhesia.Config.get([:limits, :max_filter_limit]) == 500
|
assert Parrhesia.Config.get([:limits, :max_filter_limit]) == 500
|
||||||
assert Parrhesia.Config.get([:database, :separate_read_pool?]) == false
|
assert Parrhesia.Config.get([:database, :separate_read_pool?]) == false
|
||||||
assert Parrhesia.Config.get([:relay_url]) == "ws://localhost:4413/relay"
|
assert Parrhesia.Config.get([:relay_url]) == "ws://localhost:4413/relay"
|
||||||
|
assert Parrhesia.Config.get([:sync, :relay_guard]) == false
|
||||||
assert Parrhesia.Config.get([:policies, :auth_required_for_writes]) == false
|
assert Parrhesia.Config.get([:policies, :auth_required_for_writes]) == false
|
||||||
assert Parrhesia.Config.get([:policies, :marmot_media_max_imeta_tags_per_event]) == 8
|
assert Parrhesia.Config.get([:policies, :marmot_media_max_imeta_tags_per_event]) == 8
|
||||||
assert Parrhesia.Config.get([:policies, :marmot_media_reject_mip04_v1]) == true
|
assert Parrhesia.Config.get([:policies, :marmot_media_reject_mip04_v1]) == true
|
||||||
|
|||||||
Reference in New Issue
Block a user