diff --git a/README.md b/README.md index 4df91ee..6639201 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,8 @@ Current `supported_nips` list: `1, 9, 11, 13, 17, 40, 42, 43, 44, 45, 50, 59, 62, 66, 70, 77, 86, 98` +`66` is advertised when the built-in NIP-66 publisher is enabled and has at least one relay target. The default config enables it for the `public` relay URL. Parrhesia probes those target relays, collects the resulting NIP-11 / websocket liveness data, and then publishes the signed `10166` and `30166` events locally on this relay. + ## Requirements - Elixir `~> 1.19` @@ -181,6 +183,7 @@ CSV env vars use comma-separated values. Boolean env vars accept `1/0`, `true/fa | `:identity.private_key` | `PARRHESIA_IDENTITY_PRIVATE_KEY` | `nil` | Optional inline relay private key | | `:moderation_cache_enabled` | `PARRHESIA_MODERATION_CACHE_ENABLED` | `true` | Toggle moderation cache | | `:enable_expiration_worker` | `PARRHESIA_ENABLE_EXPIRATION_WORKER` | `true` | Toggle background expiration worker | +| `:nip66` | config-file driven | see table below | Built-in NIP-66 discovery / monitor publisher | | `:sync.path` | `PARRHESIA_SYNC_PATH` | `nil` | Optional path to sync peer config | | `:sync.start_workers?` | `PARRHESIA_SYNC_START_WORKERS` | `true` | Start outbound sync workers on boot | | `:limits` | `PARRHESIA_LIMITS_*` | see table below | Runtime override group | @@ -250,6 +253,19 @@ Every listener supports this config-file schema: | `:baseline_acl.write` | `-` | `[]` | Static write deny/allow rules | | `:bandit_options` | `-` | `[]` | Advanced Bandit / ThousandIsland passthrough | +#### `:nip66` + +| Atom key | ENV | Default | Notes | +| --- | --- | --- | --- | +| `:enabled` | `-` | `true` | Enables the built-in NIP-66 publisher worker | +| `:publish_interval_seconds` | `-` | `900` | Republish cadence for `10166` and `30166` events | +| `:publish_monitor_announcement?` | `-` | `true` | Publish a `10166` monitor announcement alongside discovery events | +| 
`:timeout_ms` | `-` | `5000` | Probe timeout for websocket and NIP-11 checks | +| `:checks` | `-` | `[:open, :read, :nip11]` | Checks advertised in `10166` and run against each target relay during probing | +| `:targets` | `-` | `[]` | Optional explicit relay targets to probe; when empty, Parrhesia uses `:relay_url` for the `public` listener | + +NIP-66 targets are probe sources, not publish destinations. Parrhesia connects to each target relay, collects the configured liveness / discovery data, and stores the resulting signed `10166` / `30166` events in its own local event store so clients can query them here. + #### `:limits` | Atom key | ENV | Default | diff --git a/config/config.exs b/config/config.exs index 991fa40..1937c7c 100644 --- a/config/config.exs +++ b/config/config.exs @@ -5,6 +5,14 @@ config :postgrex, :json_library, JSON config :parrhesia, moderation_cache_enabled: true, relay_url: "ws://localhost:4413/relay", + nip66: [ + enabled: true, + publish_interval_seconds: 900, + publish_monitor_announcement?: true, + timeout_ms: 5_000, + checks: [:open, :read, :nip11], + targets: [] + ], identity: [ path: nil, private_key: nil diff --git a/config/test.exs b/config/test.exs index 86aeac8..9b8a726 100644 --- a/config/test.exs +++ b/config/test.exs @@ -27,6 +27,7 @@ config :parrhesia, :listeners, config :parrhesia, enable_expiration_worker: false, moderation_cache_enabled: false, + nip66: [enabled: false], identity: [ path: Path.join(System.tmp_dir!(), "parrhesia_test_identity.json"), private_key: nil diff --git a/lib/parrhesia/nip66.ex b/lib/parrhesia/nip66.ex new file mode 100644 index 0000000..f68f3c9 --- /dev/null +++ b/lib/parrhesia/nip66.ex @@ -0,0 +1,400 @@ +defmodule Parrhesia.NIP66 do + @moduledoc false + + alias Parrhesia.API.Events + alias Parrhesia.API.Identity + alias Parrhesia.API.RequestContext + alias Parrhesia.NIP66.Probe + alias Parrhesia.Web.Listener + alias Parrhesia.Web.RelayInfo + + @default_publish_interval_seconds 900 + 
@default_timeout_ms 5_000 + @default_checks [:open, :read, :nip11] + @allowed_requirement_keys MapSet.new(~w[auth writes pow payment]) + + @spec enabled?(keyword()) :: boolean() + def enabled?(opts \\ []) do + config = config(opts) + config_enabled?(config) and active_targets(config, listeners(opts)) != [] + end + + @spec publish_snapshot(keyword()) :: {:ok, [map()]} + def publish_snapshot(opts \\ []) when is_list(opts) do + config = config(opts) + targets = active_targets(config, listeners(opts)) + + if config_enabled?(config) and targets != [] do + probe_fun = Keyword.get(opts, :probe_fun, &Probe.probe/3) + context = Keyword.get(opts, :context, %RequestContext{}) + now = Keyword.get(opts, :now, System.system_time(:second)) + identity_opts = identity_opts(opts) + + events = + maybe_publish_monitor_announcement(config, now, context, identity_opts) + |> Kernel.++( + publish_discovery_events(targets, config, probe_fun, now, context, identity_opts) + ) + + {:ok, events} + else + {:ok, []} + end + end + + @spec publish_interval_ms(keyword()) :: pos_integer() + def publish_interval_ms(opts \\ []) when is_list(opts) do + config = config(opts) + + config + |> Keyword.get(:publish_interval_seconds, @default_publish_interval_seconds) + |> normalize_positive_integer(@default_publish_interval_seconds) + |> Kernel.*(1_000) + end + + defp maybe_publish_monitor_announcement(config, now, context, identity_opts) do + if Keyword.get(config, :publish_monitor_announcement?, true) do + config + |> build_monitor_announcement(now) + |> sign_and_publish(context, identity_opts) + |> maybe_wrap_event() + else + [] + end + end + + defp publish_discovery_events(targets, config, probe_fun, now, context, identity_opts) do + probe_opts = [ + timeout_ms: + config + |> Keyword.get(:timeout_ms, @default_timeout_ms) + |> normalize_positive_integer(@default_timeout_ms), + checks: normalize_checks(Keyword.get(config, :checks, @default_checks)) + ] + + Enum.flat_map(targets, fn target -> + 
probe_result = + case probe_fun.(target, probe_opts, identity_opts) do + {:ok, result} when is_map(result) -> result + _other -> %{checks: [], metrics: %{}, relay_info: nil, relay_info_body: nil} + end + + target + |> build_discovery_event(now, probe_result, identity_opts) + |> sign_and_publish(context, identity_opts) + |> maybe_wrap_event() + end) + end + + defp sign_and_publish(event, context, identity_opts) do + with {:ok, signed_event} <- Identity.sign_event(event, identity_opts), + {:ok, %{accepted: true}} <- Events.publish(signed_event, context: context) do + {:ok, signed_event} + else + _other -> :error + end + end + + defp maybe_wrap_event({:ok, event}), do: [event] + defp maybe_wrap_event(_other), do: [] + + defp build_monitor_announcement(config, now) do + checks = normalize_checks(Keyword.get(config, :checks, @default_checks)) + timeout_ms = Keyword.get(config, :timeout_ms, @default_timeout_ms) + frequency = Keyword.get(config, :publish_interval_seconds, @default_publish_interval_seconds) + + tags = + [ + [ + "frequency", + Integer.to_string( + normalize_positive_integer(frequency, @default_publish_interval_seconds) + ) + ] + ] ++ + Enum.map(checks, fn check -> + ["timeout", Atom.to_string(check), Integer.to_string(timeout_ms)] + end) ++ + Enum.map(checks, fn check -> ["c", Atom.to_string(check)] end) ++ + maybe_geohash_tag(config) + + %{ + "created_at" => now, + "kind" => 10_166, + "tags" => tags, + "content" => "" + } + end + + defp build_discovery_event(target, now, probe_result, identity_opts) do + relay_info = probe_result[:relay_info] || local_relay_info(target.listener, identity_opts) + content = probe_result[:relay_info_body] || JSON.encode!(relay_info) + + tags = + [["d", target.relay_url]] + |> append_network_tag(target) + |> append_relay_type_tag(target) + |> append_geohash_tag(target) + |> append_topic_tags(target) + |> Kernel.++(nip_tags(relay_info)) + |> Kernel.++(requirement_tags(relay_info)) + |> Kernel.++(rtt_tags(probe_result[:metrics] 
|| %{})) + + %{ + "created_at" => now, + "kind" => 30_166, + "tags" => tags, + "content" => content + } + end + + defp nip_tags(relay_info) do + relay_info + |> Map.get("supported_nips", []) + |> Enum.map(&["N", Integer.to_string(&1)]) + end + + defp requirement_tags(relay_info) do + limitation = Map.get(relay_info, "limitation", %{}) + + [ + requirement_value("auth", Map.get(limitation, "auth_required", false)), + requirement_value("writes", Map.get(limitation, "restricted_writes", false)), + requirement_value("pow", Map.get(limitation, "min_pow_difficulty", 0) > 0), + requirement_value("payment", Map.get(limitation, "payment_required", false)) + ] + |> Enum.filter(&MapSet.member?(@allowed_requirement_keys, String.trim_leading(&1, "!"))) + |> Enum.map(&["R", &1]) + end + + defp requirement_value(name, true), do: name + defp requirement_value(name, false), do: "!" <> name + + defp rtt_tags(metrics) when is_map(metrics) do + [] + |> maybe_put_metric_tag("rtt-open", Map.get(metrics, :rtt_open_ms)) + |> maybe_put_metric_tag("rtt-read", Map.get(metrics, :rtt_read_ms)) + |> maybe_put_metric_tag("rtt-write", Map.get(metrics, :rtt_write_ms)) + end + + defp append_network_tag(tags, target) do + case target.network do + nil -> tags + value -> tags ++ [["n", value]] + end + end + + defp append_relay_type_tag(tags, target) do + case target.relay_type do + nil -> tags + value -> tags ++ [["T", value]] + end + end + + defp append_geohash_tag(tags, target) do + case target.geohash do + nil -> tags + value -> tags ++ [["g", value]] + end + end + + defp append_topic_tags(tags, target) do + tags ++ Enum.map(target.topics, &["t", &1]) + end + + defp maybe_put_metric_tag(tags, _name, nil), do: tags + + defp maybe_put_metric_tag(tags, name, value) when is_integer(value) and value >= 0 do + tags ++ [[name, Integer.to_string(value)]] + end + + defp maybe_put_metric_tag(tags, _name, _value), do: tags + + defp local_relay_info(listener, identity_opts) do + relay_info = 
RelayInfo.document(listener) + + case Identity.get(identity_opts) do + {:ok, %{pubkey: pubkey}} -> + relay_info + |> Map.put("pubkey", pubkey) + |> Map.put("self", pubkey) + + {:error, _reason} -> + relay_info + end + end + + defp maybe_geohash_tag(config) do + case fetch_value(config, :geohash) do + value when is_binary(value) and value != "" -> [["g", value]] + _other -> [] + end + end + + defp active_targets(config, listeners) do + listeners_by_id = Map.new(listeners, &{&1.id, &1}) + + raw_targets = + case Keyword.get(config, :targets, []) do + [] -> [default_target()] + targets when is_list(targets) -> targets + _other -> [] + end + + Enum.flat_map(raw_targets, fn raw_target -> + case normalize_target(raw_target, listeners_by_id) do + {:ok, target} -> [target] + :error -> [] + end + end) + end + + defp normalize_target(target, listeners_by_id) when is_map(target) or is_list(target) do + listener_id = fetch_value(target, :listener) || :public + relay_url = fetch_value(target, :relay_url) || Application.get_env(:parrhesia, :relay_url) + + with %{} = listener <- Map.get(listeners_by_id, normalize_listener_id(listener_id)), + true <- listener.enabled and Listener.feature_enabled?(listener, :nostr), + {:ok, normalized_relay_url} <- normalize_relay_url(relay_url) do + {:ok, + %{ + listener: listener, + relay_url: normalized_relay_url, + network: normalize_network(fetch_value(target, :network), normalized_relay_url), + relay_type: normalize_optional_string(fetch_value(target, :relay_type)), + geohash: normalize_optional_string(fetch_value(target, :geohash)), + topics: normalize_string_list(fetch_value(target, :topics)) + }} + else + _other -> :error + end + end + + defp normalize_target(_target, _listeners_by_id), do: :error + + defp normalize_relay_url(relay_url) when is_binary(relay_url) and relay_url != "" do + case URI.parse(relay_url) do + %URI{scheme: scheme, host: host} = uri + when scheme in ["ws", "wss"] and is_binary(host) and host != "" -> + normalized_uri 
= %URI{ + uri + | scheme: String.downcase(scheme), + host: String.downcase(host), + path: normalize_path(uri.path), + query: nil, + fragment: nil, + port: normalize_port(uri.port, scheme) + } + + {:ok, URI.to_string(normalized_uri)} + + _other -> + :error + end + end + + defp normalize_relay_url(_relay_url), do: :error + + defp normalize_path(nil), do: "/" + defp normalize_path(""), do: "/" + defp normalize_path(path), do: path + + defp normalize_port(80, "ws"), do: nil + defp normalize_port(443, "wss"), do: nil + defp normalize_port(port, _scheme), do: port + + defp normalize_network(value, _relay_url) + when is_binary(value) and value in ["clearnet", "tor", "i2p", "loki"], + do: value + + defp normalize_network(_value, relay_url) do + relay_url + |> URI.parse() + |> Map.get(:host) + |> infer_network() + end + + defp infer_network(host) when is_binary(host) do + cond do + String.ends_with?(host, ".onion") -> "tor" + String.ends_with?(host, ".i2p") -> "i2p" + true -> "clearnet" + end + end + + defp infer_network(_host), do: "clearnet" + + defp normalize_checks(checks) when is_list(checks) do + checks + |> Enum.map(&normalize_check/1) + |> Enum.reject(&is_nil/1) + |> Enum.uniq() + end + + defp normalize_checks(_checks), do: @default_checks + + defp normalize_check(:open), do: :open + defp normalize_check("open"), do: :open + defp normalize_check(:read), do: :read + defp normalize_check("read"), do: :read + defp normalize_check(:nip11), do: :nip11 + defp normalize_check("nip11"), do: :nip11 + defp normalize_check(_check), do: nil + + defp listeners(opts) do + case Keyword.get(opts, :listeners) do + listeners when is_list(listeners) -> listeners + _other -> Listener.all() + end + end + + defp identity_opts(opts) do + opts + |> Keyword.take([:path, :private_key, :configured_private_key]) + end + + defp config(opts) do + case Keyword.get(opts, :config) do + config when is_list(config) -> config + _other -> Application.get_env(:parrhesia, :nip66, []) + end + end + + defp 
config_enabled?(config), do: Keyword.get(config, :enabled, true) + + defp default_target do + %{listener: :public, relay_url: Application.get_env(:parrhesia, :relay_url)} + end + + defp normalize_listener_id(value) when is_atom(value), do: value + + defp normalize_listener_id(value) when is_binary(value) do + String.to_existing_atom(value) + rescue + ArgumentError -> :public + end + + defp normalize_listener_id(_value), do: :public + + defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0, do: value + defp normalize_positive_integer(_value, default), do: default + + defp normalize_optional_string(value) when is_binary(value) and value != "", do: value + defp normalize_optional_string(_value), do: nil + + defp normalize_string_list(values) when is_list(values) do + Enum.filter(values, &(is_binary(&1) and &1 != "")) + end + + defp normalize_string_list(_values), do: [] + + defp fetch_value(map, key) when is_map(map) do + Map.get(map, key) || Map.get(map, Atom.to_string(key)) + end + + defp fetch_value(list, key) when is_list(list) do + if Keyword.keyword?(list), do: Keyword.get(list, key), else: nil + end + + defp fetch_value(_container, _key), do: nil +end diff --git a/lib/parrhesia/nip66/probe.ex b/lib/parrhesia/nip66/probe.ex new file mode 100644 index 0000000..5435ad4 --- /dev/null +++ b/lib/parrhesia/nip66/probe.ex @@ -0,0 +1,217 @@ +defmodule Parrhesia.NIP66.Probe do + @moduledoc false + + alias Parrhesia.Sync.Transport.WebSockexClient + + @type result :: %{ + checks: [atom()], + metrics: map(), + relay_info: map() | nil, + relay_info_body: String.t() | nil + } + + @spec probe(map(), keyword(), keyword()) :: {:ok, result()} + def probe(target, opts \\ [], publish_opts \\ []) + + def probe(target, opts, _publish_opts) when is_map(target) and is_list(opts) do + timeout_ms = Keyword.get(opts, :timeout_ms, 5_000) + checks = normalize_checks(Keyword.get(opts, :checks, [:open, :read, :nip11])) + + initial = %{checks: [], metrics: %{}, 
relay_info: nil, relay_info_body: nil} + + result = + Enum.reduce(checks, initial, fn check, acc -> + merge_probe_result(acc, check_result(check, target, timeout_ms)) + end) + + {:ok, result} + end + + def probe(_target, _opts, _publish_opts), + do: {:ok, %{checks: [], metrics: %{}, relay_info: nil, relay_info_body: nil}} + + defp merge_probe_result(acc, %{check: check, metric_key: metric_key, metric_value: metric_value}) do + acc + |> Map.update!(:checks, &[check | &1]) + |> Map.update!(:metrics, &Map.put(&1, metric_key, metric_value)) + end + + defp merge_probe_result(acc, %{ + check: check, + relay_info: relay_info, + relay_info_body: relay_info_body + }) do + acc + |> Map.update!(:checks, &[check | &1]) + |> Map.put(:relay_info, relay_info) + |> Map.put(:relay_info_body, relay_info_body) + end + + defp merge_probe_result(acc, :skip), do: acc + defp merge_probe_result(acc, {:error, _reason}), do: acc + + defp check_result(:open, target, timeout_ms) do + case measure_websocket_connect(Map.fetch!(target, :relay_url), timeout_ms) do + {:ok, metric_value} -> + %{check: :open, metric_key: :rtt_open_ms, metric_value: metric_value} + + {:error, reason} -> + {:error, reason} + end + end + + defp check_result(:read, %{listener: listener} = target, timeout_ms) do + if listener.auth.nip42_required do + :skip + else + case measure_websocket_read(Map.fetch!(target, :relay_url), timeout_ms) do + {:ok, metric_value} -> + %{check: :read, metric_key: :rtt_read_ms, metric_value: metric_value} + + {:error, reason} -> + {:error, reason} + end + end + end + + defp check_result(:nip11, target, timeout_ms) do + case fetch_nip11(Map.fetch!(target, :relay_url), timeout_ms) do + {:ok, relay_info, relay_info_body, _metric_value} -> + %{check: :nip11, relay_info: relay_info, relay_info_body: relay_info_body} + + {:error, reason} -> + {:error, reason} + end + end + + defp check_result(_check, _target, _timeout_ms), do: :skip + + defp measure_websocket_connect(relay_url, timeout_ms) do + 
with {:ok, websocket} <- connect(relay_url, timeout_ms) do + result = await_connected(websocket, timeout_ms) + _ = WebSockexClient.close(websocket) + result + end + end + + defp measure_websocket_read(relay_url, timeout_ms) do + with {:ok, websocket} <- connect(relay_url, timeout_ms) do + result = + with {:ok, started_at} <- await_connected_started_at(websocket, timeout_ms), + :ok <- WebSockexClient.send_json(websocket, ["COUNT", "nip66-probe", %{"kinds" => [1]}]), + {:ok, metric_value} <- await_count_response(websocket, timeout_ms, started_at) do + {:ok, metric_value} + end + + _ = WebSockexClient.close(websocket) + result + end + end + + defp connect(relay_url, timeout_ms) do + server = %{url: relay_url, tls: tls_config(relay_url)} + + WebSockexClient.connect(self(), server, websocket_opts: [timeout: timeout_ms, protocols: nil]) + end + + defp await_connected(websocket, timeout_ms) do + with {:ok, started_at} <- await_connected_started_at(websocket, timeout_ms) do + {:ok, monotonic_duration_ms(started_at)} + end + end + + defp await_connected_started_at(websocket, timeout_ms) do + started_at = System.monotonic_time() + + receive do + {:sync_transport, ^websocket, :connected, _metadata} -> {:ok, started_at} + {:sync_transport, ^websocket, :disconnected, reason} -> {:error, reason} + after + timeout_ms -> {:error, :timeout} + end + end + + defp await_count_response(websocket, timeout_ms, started_at) do + receive do + {:sync_transport, ^websocket, :frame, ["COUNT", "nip66-probe", _payload]} -> + {:ok, monotonic_duration_ms(started_at)} + + {:sync_transport, ^websocket, :frame, ["CLOSED", "nip66-probe", _message]} -> + {:error, :closed} + + {:sync_transport, ^websocket, :disconnected, reason} -> + {:error, reason} + after + timeout_ms -> {:error, :timeout} + end + end + + defp fetch_nip11(relay_url, timeout_ms) do + started_at = System.monotonic_time() + + case Req.get( + url: relay_info_url(relay_url), + headers: [{"accept", "application/nostr+json"}], + decode_body: 
false, + connect_options: [timeout: timeout_ms], + receive_timeout: timeout_ms + ) do + {:ok, %Req.Response{status: 200, body: body}} when is_binary(body) -> + case JSON.decode(body) do + {:ok, relay_info} when is_map(relay_info) -> + {:ok, relay_info, body, monotonic_duration_ms(started_at)} + + {:error, reason} -> + {:error, reason} + + _other -> + {:error, :invalid_relay_info} + end + + {:ok, %Req.Response{status: status}} -> + {:error, {:relay_info_request_failed, status}} + + {:error, reason} -> + {:error, reason} + end + end + + defp relay_info_url(relay_url) do + relay_url + |> URI.parse() + |> Map.update!(:scheme, fn + "wss" -> "https" + "ws" -> "http" + end) + |> URI.to_string() + end + + defp tls_config(relay_url) do + case URI.parse(relay_url) do + %URI{scheme: "wss", host: host} when is_binary(host) and host != "" -> + %{mode: :required, hostname: host, pins: []} + + _other -> + %{mode: :disabled} + end + end + + defp normalize_checks(checks) when is_list(checks) do + checks + |> Enum.map(&normalize_check/1) + |> Enum.reject(&is_nil/1) + |> Enum.uniq() + end + + defp normalize_checks(_checks), do: [] + + defp normalize_check(:open), do: :open + defp normalize_check("open"), do: :open + defp normalize_check(:read), do: :read + defp normalize_check("read"), do: :read + defp normalize_check(:nip11), do: :nip11 + defp normalize_check("nip11"), do: :nip11 + defp normalize_check(_check), do: nil + + defp monotonic_duration_ms(started_at) do + System.monotonic_time() + |> Kernel.-(started_at) + |> System.convert_time_unit(:native, :millisecond) + end +end diff --git a/lib/parrhesia/protocol/event_validator.ex b/lib/parrhesia/protocol/event_validator.ex index de31e3c..e0ee211 100644 --- a/lib/parrhesia/protocol/event_validator.ex +++ b/lib/parrhesia/protocol/event_validator.ex @@ -46,6 +46,13 @@ defmodule Parrhesia.Protocol.EventValidator do | :missing_marmot_group_tag | :invalid_marmot_group_tag | :invalid_marmot_group_content + | :missing_nip66_d_tag + | 
:invalid_nip66_d_tag + | :invalid_nip66_discovery_tag + | :missing_nip66_frequency_tag + | :invalid_nip66_frequency_tag + | :invalid_nip66_timeout_tag + | :invalid_nip66_check_tag @spec validate(map()) :: :ok | {:error, error_reason()} def validate(event) when is_map(event) do @@ -130,7 +137,19 @@ defmodule Parrhesia.Protocol.EventValidator do missing_marmot_group_tag: "invalid: kind 445 must include at least one h tag with a group id", invalid_marmot_group_tag: "invalid: kind 445 h tags must contain 32-byte lowercase hex group ids", - invalid_marmot_group_content: "invalid: kind 445 content must be non-empty base64" + invalid_marmot_group_content: "invalid: kind 445 content must be non-empty base64", + missing_nip66_d_tag: + "invalid: kind 30166 must include a single [\"d\", ] tag", + invalid_nip66_d_tag: + "invalid: kind 30166 must include a single [\"d\", ] tag", + invalid_nip66_discovery_tag: "invalid: kind 30166 includes malformed NIP-66 discovery tags", + missing_nip66_frequency_tag: + "invalid: kind 10166 must include a single [\"frequency\", ] tag", + invalid_nip66_frequency_tag: + "invalid: kind 10166 must include a single [\"frequency\", ] tag", + invalid_nip66_timeout_tag: + "invalid: kind 10166 timeout tags must be [\"timeout\", , ]", + invalid_nip66_check_tag: "invalid: kind 10166 c tags must contain lowercase check names" } @spec error_message(error_reason()) :: String.t() @@ -252,6 +271,12 @@ defmodule Parrhesia.Protocol.EventValidator do defp validate_kind_specific(%{"kind" => 1059} = event), do: validate_giftwrap_event(event) + defp validate_kind_specific(%{"kind" => 30_166} = event), + do: validate_nip66_discovery_event(event) + + defp validate_kind_specific(%{"kind" => 10_166} = event), + do: validate_nip66_monitor_announcement(event) + defp validate_kind_specific(_event), do: :ok defp validate_marmot_keypackage_event(event) do @@ -325,6 +350,110 @@ defmodule Parrhesia.Protocol.EventValidator do end end + defp 
validate_nip66_discovery_event(event) do + tags = Map.get(event, "tags", []) + + with :ok <- validate_nip66_d_tag(tags), + :ok <- + validate_optional_single_string_tag_with_predicate( + tags, + "n", + :invalid_nip66_discovery_tag, + &(&1 in ["clearnet", "tor", "i2p", "loki"]) + ), + :ok <- + validate_optional_single_string_tag_with_predicate( + tags, + "T", + :invalid_nip66_discovery_tag, + &valid_pascal_case?/1 + ), + :ok <- + validate_optional_single_string_tag_with_predicate( + tags, + "g", + :invalid_nip66_discovery_tag, + &non_empty_string?/1 + ), + :ok <- + validate_optional_repeated_tag( + tags, + "N", + &positive_integer_string?/1, + :invalid_nip66_discovery_tag + ), + :ok <- + validate_optional_repeated_tag( + tags, + "R", + &valid_nip66_requirement_value?/1, + :invalid_nip66_discovery_tag + ), + :ok <- + validate_optional_repeated_tag( + tags, + "k", + &valid_nip66_kind_value?/1, + :invalid_nip66_discovery_tag + ), + :ok <- + validate_optional_repeated_tag( + tags, + "t", + &non_empty_string?/1, + :invalid_nip66_discovery_tag + ), + :ok <- + validate_optional_single_string_tag_with_predicate( + tags, + "rtt-open", + :invalid_nip66_discovery_tag, + &positive_integer_string?/1 + ), + :ok <- + validate_optional_single_string_tag_with_predicate( + tags, + "rtt-read", + :invalid_nip66_discovery_tag, + &positive_integer_string?/1 + ) do + validate_optional_single_string_tag_with_predicate( + tags, + "rtt-write", + :invalid_nip66_discovery_tag, + &positive_integer_string?/1 + ) + end + end + + defp validate_nip66_monitor_announcement(event) do + tags = Map.get(event, "tags", []) + + with :ok <- + validate_single_string_tag_with_predicate( + tags, + "frequency", + :missing_nip66_frequency_tag, + :invalid_nip66_frequency_tag, + &positive_integer_string?/1 + ), + :ok <- validate_optional_repeated_timeout_tags(tags), + :ok <- + validate_optional_repeated_tag( + tags, + "c", + &valid_nip66_check_name?/1, + :invalid_nip66_check_tag + ) do + 
validate_optional_single_string_tag_with_predicate( + tags, + "g", + :invalid_nip66_discovery_tag, + &non_empty_string?/1 + ) + end + end + defp validate_non_empty_base64_content(event), do: validate_non_empty_base64_content(event, :invalid_marmot_keypackage_content) @@ -406,6 +535,25 @@ defmodule Parrhesia.Protocol.EventValidator do end end + defp validate_optional_single_string_tag_with_predicate( + tags, + tag_name, + invalid_error, + predicate + ) + when is_function(predicate, 1) do + case Enum.filter(tags, &match_tag_name?(&1, tag_name)) do + [] -> + :ok + + [[^tag_name, value]] -> + if predicate.(value), do: :ok, else: {:error, invalid_error} + + _other -> + {:error, invalid_error} + end + end + defp validate_mls_extensions_tag(tags) do with {:ok, ["mls_extensions" | extensions]} <- fetch_single_tag(tags, "mls_extensions", :missing_marmot_extensions_tag), @@ -444,6 +592,40 @@ defmodule Parrhesia.Protocol.EventValidator do end end + defp validate_nip66_d_tag(tags) do + with {:ok, ["d", value]} <- fetch_single_tag(tags, "d", :missing_nip66_d_tag), + true <- valid_websocket_url?(value) or lowercase_hex?(value, 32) do + :ok + else + {:ok, _invalid_tag_shape} -> {:error, :invalid_nip66_d_tag} + false -> {:error, :invalid_nip66_d_tag} + {:error, _reason} = error -> error + end + end + + defp validate_optional_repeated_timeout_tags(tags) do + timeout_tags = Enum.filter(tags, &match_tag_name?(&1, "timeout")) + + if Enum.all?(timeout_tags, &valid_nip66_timeout_tag?/1) do + :ok + else + {:error, :invalid_nip66_timeout_tag} + end + end + + defp validate_optional_repeated_tag(tags, tag_name, predicate, invalid_error) + when is_function(predicate, 1) do + tags + |> Enum.filter(&match_tag_name?(&1, tag_name)) + |> Enum.reduce_while(:ok, fn + [^tag_name, value], :ok -> + if predicate.(value), do: {:cont, :ok}, else: {:halt, {:error, invalid_error}} + + _other, :ok -> + {:halt, {:error, invalid_error}} + end) + end + defp fetch_single_tag(tags, tag_name, missing_error) do 
case Enum.filter(tags, &match_tag_name?(&1, tag_name)) do [tag] -> {:ok, tag} @@ -500,6 +682,49 @@ defmodule Parrhesia.Protocol.EventValidator do defp valid_websocket_url?(_url), do: false + defp valid_nip66_timeout_tag?(["timeout", milliseconds]), + do: positive_integer_string?(milliseconds) + + defp valid_nip66_timeout_tag?(["timeout", check, milliseconds]) do + valid_nip66_check_name?(check) and positive_integer_string?(milliseconds) + end + + defp valid_nip66_timeout_tag?(_tag), do: false + + defp valid_nip66_requirement_value?(value) when is_binary(value) do + normalized = String.trim_leading(value, "!") + normalized in ["auth", "writes", "pow", "payment"] + end + + defp valid_nip66_requirement_value?(_value), do: false + + defp valid_nip66_kind_value?(<<"!", rest::binary>>), do: positive_integer_string?(rest) + defp valid_nip66_kind_value?(value), do: positive_integer_string?(value) + + defp valid_nip66_check_name?(value) when is_binary(value) do + String.match?(value, ~r/^[a-z0-9-]+$/) + end + + defp valid_nip66_check_name?(_value), do: false + + defp valid_pascal_case?(value) when is_binary(value) do + String.match?(value, ~r/^[A-Z][A-Za-z0-9]*$/) + end + + defp valid_pascal_case?(_value), do: false + + defp positive_integer_string?(value) when is_binary(value) do + case Integer.parse(value) do + {integer, ""} when integer >= 0 -> true + _other -> false + end + end + + defp positive_integer_string?(_value), do: false + + defp non_empty_string?(value) when is_binary(value), do: value != "" + defp non_empty_string?(_value), do: false + defp valid_keypackage_ref?(value) when is_binary(value) do Enum.any?(@supported_keypackage_ref_sizes, &lowercase_hex?(value, &1)) end diff --git a/lib/parrhesia/tasks/nip66_publisher.ex b/lib/parrhesia/tasks/nip66_publisher.ex new file mode 100644 index 0000000..6adfcbe --- /dev/null +++ b/lib/parrhesia/tasks/nip66_publisher.ex @@ -0,0 +1,40 @@ +defmodule Parrhesia.Tasks.Nip66Publisher do + @moduledoc """ + Periodic worker that 
publishes NIP-66 monitor and discovery events. + """ + + use GenServer + + alias Parrhesia.NIP66 + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + name = Keyword.get(opts, :name, __MODULE__) + GenServer.start_link(__MODULE__, opts, name: name) + end + + @impl true + def init(opts) do + state = %{ + interval_ms: Keyword.get(opts, :interval_ms, NIP66.publish_interval_ms()), + publish_opts: Keyword.drop(opts, [:name, :interval_ms, :nip66_module]), + nip66_module: Keyword.get(opts, :nip66_module, NIP66) + } + + schedule_tick(0) + {:ok, state} + end + + @impl true + def handle_info(:tick, state) do + _result = state.nip66_module.publish_snapshot(state.publish_opts) + schedule_tick(state.interval_ms) + {:noreply, state} + end + + def handle_info(_message, state), do: {:noreply, state} + + defp schedule_tick(interval_ms) do + Process.send_after(self(), :tick, interval_ms) + end +end diff --git a/lib/parrhesia/tasks/supervisor.ex b/lib/parrhesia/tasks/supervisor.ex index b95f1d6..eabeaa5 100644 --- a/lib/parrhesia/tasks/supervisor.ex +++ b/lib/parrhesia/tasks/supervisor.ex @@ -11,7 +11,7 @@ defmodule Parrhesia.Tasks.Supervisor do @impl true def init(_init_arg) do - children = expiration_children() ++ partition_retention_children() + children = expiration_children() ++ partition_retention_children() ++ nip66_children() Supervisor.init(children, strategy: :one_for_one) end @@ -29,4 +29,12 @@ defmodule Parrhesia.Tasks.Supervisor do {Parrhesia.Tasks.PartitionRetentionWorker, name: Parrhesia.Tasks.PartitionRetentionWorker} ] end + + defp nip66_children do + if Parrhesia.NIP66.enabled?() do + [{Parrhesia.Tasks.Nip66Publisher, name: Parrhesia.Tasks.Nip66Publisher}] + else + [] + end + end end diff --git a/lib/parrhesia/web/relay_info.ex b/lib/parrhesia/web/relay_info.ex index fe92f47..d34bc66 100644 --- a/lib/parrhesia/web/relay_info.ex +++ b/lib/parrhesia/web/relay_info.ex @@ -12,6 +12,7 @@ defmodule Parrhesia.Web.RelayInfo do "name" => 
"Parrhesia", "description" => "Nostr/Marmot relay", "pubkey" => relay_pubkey(), + "self" => relay_pubkey(), "supported_nips" => supported_nips(), "software" => "https://git.teralink.net/self/parrhesia", "version" => Application.spec(:parrhesia, :vsn) |> to_string(), @@ -20,13 +21,20 @@ defmodule Parrhesia.Web.RelayInfo do end defp supported_nips do - base = [1, 9, 11, 13, 17, 40, 42, 43, 44, 45, 50, 59, 62, 66, 70] + base = [1, 9, 11, 13, 17, 40, 42, 43, 44, 45, 50, 59, 62, 70] + + with_nip66 = + if Parrhesia.NIP66.enabled?() do + base ++ [66] + else + base + end with_negentropy = if negentropy_enabled?() do - base ++ [77] + with_nip66 ++ [77] else - base + with_nip66 end with_negentropy ++ [86, 98] @@ -38,7 +46,12 @@ defmodule Parrhesia.Web.RelayInfo do "max_subscriptions" => Parrhesia.Config.get([:limits, :max_subscriptions_per_connection], 32), "max_filters" => Parrhesia.Config.get([:limits, :max_filters_per_req], 16), - "auth_required" => Listener.relay_auth_required?(listener) + "max_limit" => Parrhesia.Config.get([:limits, :max_filter_limit], 500), + "max_event_tags" => Parrhesia.Config.get([:limits, :max_tags_per_event], 256), + "min_pow_difficulty" => Parrhesia.Config.get([:policies, :min_pow_difficulty], 0), + "auth_required" => Listener.relay_auth_required?(listener), + "payment_required" => false, + "restricted_writes" => restricted_writes?(listener) } end @@ -54,4 +67,12 @@ defmodule Parrhesia.Web.RelayInfo do {:error, _reason} -> nil end end + + defp restricted_writes?(listener) do + listener.auth.nip42_required or + (listener.baseline_acl.write != [] and + Enum.any?(listener.baseline_acl.write, &(&1.action == :deny))) or + Parrhesia.Config.get([:policies, :auth_required_for_writes], false) or + Parrhesia.Config.get([:policies, :min_pow_difficulty], 0) > 0 + end end diff --git a/test/parrhesia/nip66_test.exs b/test/parrhesia/nip66_test.exs new file mode 100644 index 0000000..612dd08 --- /dev/null +++ b/test/parrhesia/nip66_test.exs @@ -0,0 +1,114 @@ 
# -- test/parrhesia/nip66_test.exs --
defmodule Parrhesia.Nip66Test do
  use Parrhesia.IntegrationCase, async: false, sandbox: true

  alias Parrhesia.API.Events
  alias Parrhesia.API.RequestContext
  alias Parrhesia.NIP66
  alias Parrhesia.Protocol.EventValidator
  alias Parrhesia.Web.Listener
  alias Parrhesia.Web.RelayInfo

  # Each test mutates application env; snapshot and restore it around every run.
  setup do
    saved_nip66 = Application.get_env(:parrhesia, :nip66)
    saved_relay_url = Application.get_env(:parrhesia, :relay_url)

    on_exit(fn ->
      Application.put_env(:parrhesia, :nip66, saved_nip66)
      Application.put_env(:parrhesia, :relay_url, saved_relay_url)
    end)

    :ok
  end

  test "publish_snapshot stores monitor and discovery events for the configured relay" do
    identity_path = unique_identity_path()
    relay_url = "ws://127.0.0.1:4413/relay"

    Application.put_env(:parrhesia, :relay_url, relay_url)

    Application.put_env(:parrhesia, :nip66,
      enabled: true,
      publish_interval_seconds: 600,
      publish_monitor_announcement?: true,
      timeout_ms: 2_500,
      checks: [:open, :read, :nip11],
      geohash: "u33dc1",
      targets: [
        %{
          listener: :public,
          relay_url: relay_url,
          topics: ["marmot"],
          relay_type: "PublicInbox"
        }
      ]
    )

    # Stub out the network probe with canned check results and metrics.
    stub_probe = fn _target, _probe_opts, _publish_opts ->
      {:ok,
       %{
         checks: [:open, :read, :nip11],
         metrics: %{rtt_open_ms: 12, rtt_read_ms: 34},
         relay_info: nil,
         relay_info_body: nil
       }}
    end

    assert {:ok, [monitor_event, discovery_event]} =
             NIP66.publish_snapshot(
               path: identity_path,
               now: 1_700_000_000,
               context: %RequestContext{},
               probe_fun: stub_probe
             )

    assert monitor_event["kind"] == 10_166
    assert discovery_event["kind"] == 30_166
    assert :ok = EventValidator.validate(monitor_event)
    assert :ok = EventValidator.validate(discovery_event)

    ids = [monitor_event["id"], discovery_event["id"]]

    assert {:ok, stored_events} = Events.query([%{"ids" => ids}], context: %RequestContext{})
    assert stored_events |> Enum.map(& &1["kind"]) |> Enum.sort() == [10_166, 30_166]

    # Discovery event carries the probe results and target metadata as tags.
    tags = discovery_event["tags"]

    for expected_tag <- [
          ["d", relay_url],
          ["n", "clearnet"],
          ["T", "PublicInbox"],
          ["t", "marmot"],
          ["rtt-open", "12"],
          ["rtt-read", "34"],
          ["N", "66"],
          ["R", "!payment"]
        ] do
      assert expected_tag in tags
    end

    relay_info = JSON.decode!(discovery_event["content"])
    assert relay_info["self"] == discovery_event["pubkey"]
    assert 66 in relay_info["supported_nips"]
  end

  test "relay info only advertises NIP-66 when the publisher is enabled" do
    listener = Listener.from_opts(listener: %{id: :public, bind: %{port: 4413}})

    Application.put_env(:parrhesia, :nip66, enabled: false)
    refute 66 in RelayInfo.document(listener)["supported_nips"]

    Application.put_env(:parrhesia, :nip66, enabled: true)
    assert 66 in RelayInfo.document(listener)["supported_nips"]
  end

  # Returns a fresh tmp-dir identity path and registers its cleanup.
  defp unique_identity_path do
    suffix = System.unique_integer([:positive, :monotonic])
    path = Path.join(System.tmp_dir!(), "parrhesia_nip66_identity_#{suffix}.json")

    on_exit(fn -> _ = File.rm(path) end)

    path
  end
end

# -- test/parrhesia/protocol/event_validator_nip66_test.exs --
defmodule Parrhesia.Protocol.EventValidatorNip66Test do
  use ExUnit.Case, async: true

  alias Parrhesia.Protocol.EventValidator

  test "accepts valid kind 30166 relay discovery events" do
    assert :ok = EventValidator.validate(valid_discovery_event())
  end

  test "rejects kind 30166 discovery events without d tags" do
    event = valid_discovery_event(%{"tags" => [["N", "11"]]})

    assert {:error, :missing_nip66_d_tag} = EventValidator.validate(event)
  end

  test "accepts valid kind 10166 monitor announcements" do
    assert :ok = EventValidator.validate(valid_monitor_announcement())
  end

  test "rejects kind 10166 monitor announcements without frequency tags" do
    event = valid_monitor_announcement(%{"tags" => [["c", "open"]]})

    assert {:error, :missing_nip66_frequency_tag} = EventValidator.validate(event)
  end

  # Recomputes "id" after overrides so the event stays internally consistent.
  defp with_id(event), do: Map.put(event, "id", EventValidator.compute_id(event))

  defp valid_discovery_event(overrides \\ %{}) do
    %{
      "pubkey" => String.duplicate("1", 64),
      "created_at" => System.system_time(:second),
      "kind" => 30_166,
      "tags" => [
        ["d", "wss://relay.example.com/relay"],
        ["n", "clearnet"],
        ["N", "11"],
        ["R", "!payment"],
        ["R", "auth"],
        ["t", "marmot"],
        ["rtt-open", "12"]
      ],
      "content" => "{}",
      "sig" => String.duplicate("2", 128)
    }
    |> Map.merge(overrides)
    |> with_id()
  end

  defp valid_monitor_announcement(overrides \\ %{}) do
    %{
      "pubkey" => String.duplicate("3", 64),
      "created_at" => System.system_time(:second),
      "kind" => 10_166,
      "tags" => [
        ["frequency", "900"],
        ["timeout", "open", "5000"],
        ["c", "open"],
        ["c", "nip11"]
      ],
      "content" => "",
      "sig" => String.duplicate("4", 128)
    }
    |> Map.merge(overrides)
    |> with_id()
  end
end

# -- test/parrhesia/tasks/nip66_publisher_test.exs --
defmodule Parrhesia.Tasks.Nip66PublisherTest do
  use Parrhesia.IntegrationCase, async: false

  alias Parrhesia.API.Events
  alias Parrhesia.API.Identity
  alias Parrhesia.API.RequestContext
  alias Parrhesia.Tasks.Nip66Publisher

  test "publishes a NIP-66 snapshot when ticked" do
    suffix = System.unique_integer([:positive, :monotonic])
    path = Path.join(System.tmp_dir!(), "parrhesia_nip66_worker_#{suffix}.json")

    on_exit(fn -> _ = File.rm(path) end)

    stub_probe = fn _target, _probe_opts, _publish_opts ->
      {:ok, %{checks: [:open], metrics: %{rtt_open_ms: 8}, relay_info: nil, relay_info_body: nil}}
    end

    # `name: nil` starts the worker unregistered so it cannot collide with a
    # globally named instance.
    worker =
      start_supervised!(
        {Nip66Publisher,
         name: nil,
         interval_ms: 60_000,
         path: path,
         now: 1_700_000_123,
         probe_fun: stub_probe,
         config: [enabled: true, publish_interval_seconds: 600, targets: [%{listener: :public}]]}
      )

    # Force an immediate publish; :sys.get_state/1 is a synchronous call, so
    # the :tick message is guaranteed handled before we query the store.
    send(worker, :tick)
    _ = :sys.get_state(worker)

    {:ok, %{pubkey: pubkey}} = Identity.get(path: path)

    assert {:ok, events} =
             Events.query(
               [%{"authors" => [pubkey], "kinds" => [10_166, 30_166]}],
               context: %RequestContext{}
             )

    assert events |> Enum.map(& &1["kind"]) |> Enum.sort() == [10_166, 30_166]
  end
end