Implement NIP-66 relay discovery publishing
This commit is contained in:
400
lib/parrhesia/nip66.ex
Normal file
400
lib/parrhesia/nip66.ex
Normal file
@@ -0,0 +1,400 @@
|
||||
defmodule Parrhesia.NIP66 do
  # NIP-66 relay discovery/monitoring publisher: builds and publishes
  # kind 10166 (monitor announcement) and kind 30166 (relay discovery)
  # events for every configured, enabled Nostr listener target.
  @moduledoc false

  alias Parrhesia.API.Events
  alias Parrhesia.API.Identity
  alias Parrhesia.API.RequestContext
  alias Parrhesia.NIP66.Probe
  alias Parrhesia.Web.Listener
  alias Parrhesia.Web.RelayInfo

  @default_publish_interval_seconds 900
  @default_timeout_ms 5_000
  @default_checks [:open, :read, :nip11]
  @allowed_requirement_keys MapSet.new(~w[auth writes pow payment])

  # True when publishing is configured on AND at least one enabled
  # listener resolves to a valid target (so the supervisor can skip
  # starting the publisher entirely when there is nothing to publish).
  @spec enabled?(keyword()) :: boolean()
  def enabled?(opts \\ []) when is_list(opts) do
    config = config(opts)

    config_enabled?(config) and active_targets(config, listeners(opts)) != []
  end

  # Publishes one optional monitor announcement plus one discovery event
  # per active target. Returns only events that were both signed and
  # accepted; individual failures are dropped silently.
  @spec publish_snapshot(keyword()) :: {:ok, [map()]}
  def publish_snapshot(opts \\ []) when is_list(opts) do
    config = config(opts)
    targets = active_targets(config, listeners(opts))

    if config_enabled?(config) and targets != [] do
      probe_fun = Keyword.get(opts, :probe_fun, &Probe.probe/3)
      context = Keyword.get(opts, :context, %RequestContext{})
      now = Keyword.get(opts, :now, System.system_time(:second))
      identity_opts = identity_opts(opts)

      events =
        maybe_publish_monitor_announcement(config, now, context, identity_opts) ++
          publish_discovery_events(targets, config, probe_fun, now, context, identity_opts)

      {:ok, events}
    else
      {:ok, []}
    end
  end

  # Publish interval in milliseconds, falling back to the default when
  # the configured value is missing or not a positive integer.
  @spec publish_interval_ms(keyword()) :: pos_integer()
  def publish_interval_ms(opts \\ []) when is_list(opts) do
    config = config(opts)

    config
    |> Keyword.get(:publish_interval_seconds, @default_publish_interval_seconds)
    |> normalize_positive_integer(@default_publish_interval_seconds)
    |> Kernel.*(1_000)
  end

  # Kind 10166 announcement; can be disabled via :publish_monitor_announcement?.
  defp maybe_publish_monitor_announcement(config, now, context, identity_opts) do
    if Keyword.get(config, :publish_monitor_announcement?, true) do
      config
      |> build_monitor_announcement(now)
      |> sign_and_publish(context, identity_opts)
      |> maybe_wrap_event()
    else
      []
    end
  end

  # Probes each target and publishes one kind 30166 event per target.
  # A failed probe degrades to an empty probe result rather than aborting.
  defp publish_discovery_events(targets, config, probe_fun, now, context, identity_opts) do
    probe_opts = [
      timeout_ms:
        config
        |> Keyword.get(:timeout_ms, @default_timeout_ms)
        |> normalize_positive_integer(@default_timeout_ms),
      checks: normalize_checks(Keyword.get(config, :checks, @default_checks))
    ]

    Enum.flat_map(targets, fn target ->
      probe_result =
        case probe_fun.(target, probe_opts, identity_opts) do
          {:ok, result} when is_map(result) -> result
          _other -> %{checks: [], metrics: %{}, relay_info: nil, relay_info_body: nil}
        end

      target
      |> build_discovery_event(now, probe_result, identity_opts)
      |> sign_and_publish(context, identity_opts)
      |> maybe_wrap_event()
    end)
  end

  # Signs the event and publishes it locally; any sign/publish failure
  # (including publish returning accepted: false) collapses to :error.
  defp sign_and_publish(event, context, identity_opts) do
    with {:ok, signed_event} <- Identity.sign_event(event, identity_opts),
         {:ok, %{accepted: true}} <- Events.publish(signed_event, context: context) do
      {:ok, signed_event}
    else
      _other -> :error
    end
  end

  defp maybe_wrap_event({:ok, event}), do: [event]
  defp maybe_wrap_event(_other), do: []

  # Unsigned kind 10166 event: frequency, per-check timeout, check ("c")
  # tags and an optional geohash tag.
  defp build_monitor_announcement(config, now) do
    checks = normalize_checks(Keyword.get(config, :checks, @default_checks))
    timeout_ms = Keyword.get(config, :timeout_ms, @default_timeout_ms)
    frequency = Keyword.get(config, :publish_interval_seconds, @default_publish_interval_seconds)

    tags =
      [
        [
          "frequency",
          Integer.to_string(
            normalize_positive_integer(frequency, @default_publish_interval_seconds)
          )
        ]
      ] ++
        Enum.map(checks, fn check ->
          ["timeout", Atom.to_string(check), Integer.to_string(timeout_ms)]
        end) ++
        Enum.map(checks, fn check -> ["c", Atom.to_string(check)] end) ++
        maybe_geohash_tag(config)

    %{
      "created_at" => now,
      "kind" => 10_166,
      "tags" => tags,
      "content" => ""
    }
  end

  # Unsigned kind 30166 event for one target. The content is the NIP-11
  # document fetched by the probe when available, otherwise the locally
  # generated relay info document.
  defp build_discovery_event(target, now, probe_result, identity_opts) do
    relay_info = probe_result[:relay_info] || local_relay_info(target.listener, identity_opts)
    content = probe_result[:relay_info_body] || JSON.encode!(relay_info)

    tags =
      [["d", target.relay_url]]
      |> append_network_tag(target)
      |> append_relay_type_tag(target)
      |> append_geohash_tag(target)
      |> append_topic_tags(target)
      |> Kernel.++(nip_tags(relay_info))
      |> Kernel.++(requirement_tags(relay_info))
      |> Kernel.++(rtt_tags(probe_result[:metrics] || %{}))

    %{
      "created_at" => now,
      "kind" => 30_166,
      "tags" => tags,
      "content" => content
    }
  end

  # One ["N", nip] tag per supported NIP.
  # NOTE(review): assumes supported_nips entries are integers — TODO confirm
  # against upstream NIP-11 documents, which may contain strings.
  defp nip_tags(relay_info) do
    relay_info
    |> Map.get("supported_nips", [])
    |> Enum.map(&["N", Integer.to_string(&1)])
  end

  # ["R", key] / ["R", "!key"] tags derived from the NIP-11 limitation map.
  # The filter is defensive: requirement_value/2 only ever emits allowed keys.
  defp requirement_tags(relay_info) do
    limitation = Map.get(relay_info, "limitation", %{})

    [
      requirement_value("auth", Map.get(limitation, "auth_required", false)),
      requirement_value("writes", Map.get(limitation, "restricted_writes", false)),
      requirement_value("pow", Map.get(limitation, "min_pow_difficulty", 0) > 0),
      requirement_value("payment", Map.get(limitation, "payment_required", false))
    ]
    |> Enum.filter(&MapSet.member?(@allowed_requirement_keys, String.trim_leading(&1, "!")))
    |> Enum.map(&["R", &1])
  end

  defp requirement_value(name, true), do: name
  defp requirement_value(name, false), do: "!" <> name

  # rtt-* tags for whichever metrics the probe produced.
  defp rtt_tags(metrics) when is_map(metrics) do
    []
    |> maybe_put_metric_tag("rtt-open", Map.get(metrics, :rtt_open_ms))
    |> maybe_put_metric_tag("rtt-read", Map.get(metrics, :rtt_read_ms))
    |> maybe_put_metric_tag("rtt-write", Map.get(metrics, :rtt_write_ms))
  end

  defp append_network_tag(tags, target) do
    case target.network do
      nil -> tags
      value -> tags ++ [["n", value]]
    end
  end

  defp append_relay_type_tag(tags, target) do
    case target.relay_type do
      nil -> tags
      value -> tags ++ [["T", value]]
    end
  end

  defp append_geohash_tag(tags, target) do
    case target.geohash do
      nil -> tags
      value -> tags ++ [["g", value]]
    end
  end

  defp append_topic_tags(tags, target) do
    tags ++ Enum.map(target.topics, &["t", &1])
  end

  defp maybe_put_metric_tag(tags, _name, nil), do: tags

  defp maybe_put_metric_tag(tags, name, value) when is_integer(value) and value >= 0 do
    tags ++ [[name, Integer.to_string(value)]]
  end

  defp maybe_put_metric_tag(tags, _name, _value), do: tags

  # Local NIP-11 document with our pubkey injected when the identity
  # resolves; falls back to the unannotated document otherwise.
  defp local_relay_info(listener, identity_opts) do
    relay_info = RelayInfo.document(listener)

    case Identity.get(identity_opts) do
      {:ok, %{pubkey: pubkey}} ->
        relay_info
        |> Map.put("pubkey", pubkey)
        |> Map.put("self", pubkey)

      {:error, _reason} ->
        relay_info
    end
  end

  defp maybe_geohash_tag(config) do
    case fetch_value(config, :geohash) do
      value when is_binary(value) and value != "" -> [["g", value]]
      _other -> []
    end
  end

  # Resolves configured targets (or the implicit default target) against
  # the known listeners, dropping anything that fails normalization.
  defp active_targets(config, listeners) do
    listeners_by_id = Map.new(listeners, &{&1.id, &1})

    raw_targets =
      case Keyword.get(config, :targets, []) do
        [] -> [default_target()]
        targets when is_list(targets) -> targets
        _other -> []
      end

    Enum.flat_map(raw_targets, fn raw_target ->
      case normalize_target(raw_target, listeners_by_id) do
        {:ok, target} -> [target]
        :error -> []
      end
    end)
  end

  # A target is valid only when its listener exists, is enabled with the
  # :nostr feature, and its relay URL normalizes to a ws/wss URL.
  defp normalize_target(target, listeners_by_id) when is_map(target) or is_list(target) do
    listener_id = fetch_value(target, :listener) || :public
    relay_url = fetch_value(target, :relay_url) || Application.get_env(:parrhesia, :relay_url)

    with %{} = listener <- Map.get(listeners_by_id, normalize_listener_id(listener_id)),
         true <- listener.enabled and Listener.feature_enabled?(listener, :nostr),
         {:ok, normalized_relay_url} <- normalize_relay_url(relay_url) do
      {:ok,
       %{
         listener: listener,
         relay_url: normalized_relay_url,
         network: normalize_network(fetch_value(target, :network), normalized_relay_url),
         relay_type: normalize_optional_string(fetch_value(target, :relay_type)),
         geohash: normalize_optional_string(fetch_value(target, :geohash)),
         topics: normalize_string_list(fetch_value(target, :topics))
       }}
    else
      _other -> :error
    end
  end

  defp normalize_target(_target, _listeners_by_id), do: :error

  # Canonicalizes a ws/wss URL: lowercase scheme/host, "/" default path,
  # default ports stripped, query/fragment removed.
  defp normalize_relay_url(relay_url) when is_binary(relay_url) and relay_url != "" do
    case URI.parse(relay_url) do
      %URI{scheme: scheme, host: host} = uri
      when scheme in ["ws", "wss"] and is_binary(host) and host != "" ->
        normalized_uri = %URI{
          uri
          | scheme: String.downcase(scheme),
            host: String.downcase(host),
            path: normalize_path(uri.path),
            query: nil,
            fragment: nil,
            port: normalize_port(uri.port, scheme)
        }

        {:ok, URI.to_string(normalized_uri)}

      _other ->
        :error
    end
  end

  defp normalize_relay_url(_relay_url), do: :error

  defp normalize_path(nil), do: "/"
  defp normalize_path(""), do: "/"
  defp normalize_path(path), do: path

  defp normalize_port(80, "ws"), do: nil
  defp normalize_port(443, "wss"), do: nil
  defp normalize_port(port, _scheme), do: port

  defp normalize_network(value, _relay_url)
       when is_binary(value) and value in ["clearnet", "tor", "i2p", "loki"],
       do: value

  defp normalize_network(_value, relay_url) do
    relay_url
    |> URI.parse()
    |> Map.get(:host)
    |> infer_network()
  end

  defp infer_network(host) when is_binary(host) do
    cond do
      String.ends_with?(host, ".onion") -> "tor"
      String.ends_with?(host, ".i2p") -> "i2p"
      true -> "clearnet"
    end
  end

  defp infer_network(_host), do: "clearnet"

  defp normalize_checks(checks) when is_list(checks) do
    checks
    |> Enum.map(&normalize_check/1)
    |> Enum.reject(&is_nil/1)
    |> Enum.uniq()
  end

  defp normalize_checks(_checks), do: @default_checks

  defp normalize_check(:open), do: :open
  defp normalize_check("open"), do: :open
  defp normalize_check(:read), do: :read
  defp normalize_check("read"), do: :read
  defp normalize_check(:nip11), do: :nip11
  defp normalize_check("nip11"), do: :nip11
  defp normalize_check(_check), do: nil

  defp listeners(opts) do
    case Keyword.get(opts, :listeners) do
      listeners when is_list(listeners) -> listeners
      _other -> Listener.all()
    end
  end

  defp identity_opts(opts) do
    Keyword.take(opts, [:path, :private_key, :configured_private_key])
  end

  defp config(opts) do
    case Keyword.get(opts, :config) do
      config when is_list(config) -> config
      _other -> Application.get_env(:parrhesia, :nip66, [])
    end
  end

  defp config_enabled?(config), do: Keyword.get(config, :enabled, true)

  defp default_target do
    %{listener: :public, relay_url: Application.get_env(:parrhesia, :relay_url)}
  end

  defp normalize_listener_id(value) when is_atom(value), do: value

  defp normalize_listener_id(value) when is_binary(value) do
    # Existing-atom lookup avoids atom-table exhaustion from config input.
    String.to_existing_atom(value)
  rescue
    ArgumentError -> :public
  end

  defp normalize_listener_id(_value), do: :public

  defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0, do: value
  defp normalize_positive_integer(_value, default), do: default

  defp normalize_optional_string(value) when is_binary(value) and value != "", do: value
  defp normalize_optional_string(_value), do: nil

  defp normalize_string_list(values) when is_list(values) do
    Enum.filter(values, &(is_binary(&1) and &1 != ""))
  end

  defp normalize_string_list(_values), do: []

  # Reads a key from either a map (atom or string keys) or keyword list.
  defp fetch_value(map, key) when is_map(map) do
    Map.get(map, key) || Map.get(map, Atom.to_string(key))
  end

  defp fetch_value(list, key) when is_list(list) do
    if Keyword.keyword?(list), do: Keyword.get(list, key), else: nil
  end

  defp fetch_value(_container, _key), do: nil
end
|
||||
217
lib/parrhesia/nip66/probe.ex
Normal file
217
lib/parrhesia/nip66/probe.ex
Normal file
@@ -0,0 +1,217 @@
|
||||
defmodule Parrhesia.NIP66.Probe do
  # Active NIP-66 probes against a relay target: websocket open RTT,
  # read RTT via a COUNT round-trip, and NIP-11 document fetch.
  @moduledoc false

  alias Parrhesia.Sync.Transport.WebSockexClient

  @type result :: %{
          checks: [atom()],
          metrics: map(),
          relay_info: map() | nil,
          relay_info_body: String.t() | nil
        }

  # Runs the requested checks against the target and accumulates the
  # outcomes. Individual check failures are dropped (best-effort probing).
  @spec probe(map(), keyword(), keyword()) :: {:ok, result()}
  def probe(target, opts \\ [], publish_opts \\ [])

  def probe(target, opts, _publish_opts) when is_map(target) and is_list(opts) do
    timeout_ms = Keyword.get(opts, :timeout_ms, 5_000)
    checks = normalize_checks(Keyword.get(opts, :checks, [:open, :read, :nip11]))

    initial = %{checks: [], metrics: %{}, relay_info: nil, relay_info_body: nil}

    result =
      Enum.reduce(checks, initial, fn check, acc ->
        merge_probe_result(acc, check_result(check, target, timeout_ms))
      end)

    {:ok, result}
  end

  def probe(_target, _opts, _publish_opts),
    do: {:ok, %{checks: [], metrics: %{}, relay_info: nil, relay_info_body: nil}}

  defp merge_probe_result(acc, %{check: check, metric_key: metric_key, metric_value: metric_value}) do
    acc
    |> Map.update!(:checks, &[check | &1])
    |> Map.update!(:metrics, &Map.put(&1, metric_key, metric_value))
  end

  defp merge_probe_result(acc, %{
         check: check,
         relay_info: relay_info,
         relay_info_body: relay_info_body
       }) do
    acc
    |> Map.update!(:checks, &[check | &1])
    |> Map.put(:relay_info, relay_info)
    |> Map.put(:relay_info_body, relay_info_body)
  end

  defp merge_probe_result(acc, :skip), do: acc
  defp merge_probe_result(acc, {:error, _reason}), do: acc

  defp check_result(:open, target, timeout_ms) do
    case measure_websocket_connect(Map.fetch!(target, :relay_url), timeout_ms) do
      {:ok, metric_value} ->
        %{check: :open, metric_key: :rtt_open_ms, metric_value: metric_value}

      {:error, reason} ->
        {:error, reason}
    end
  end

  defp check_result(:read, %{listener: listener} = target, timeout_ms) do
    # A NIP-42-gated relay would reject the unauthenticated COUNT, so
    # the read check is skipped instead of reported as a failure.
    if listener.auth.nip42_required do
      :skip
    else
      case measure_websocket_read(Map.fetch!(target, :relay_url), timeout_ms) do
        {:ok, metric_value} ->
          %{check: :read, metric_key: :rtt_read_ms, metric_value: metric_value}

        {:error, reason} ->
          {:error, reason}
      end
    end
  end

  defp check_result(:nip11, target, timeout_ms) do
    case fetch_nip11(Map.fetch!(target, :relay_url), timeout_ms) do
      {:ok, relay_info, relay_info_body, _metric_value} ->
        %{check: :nip11, relay_info: relay_info, relay_info_body: relay_info_body}

      {:error, reason} ->
        {:error, reason}
    end
  end

  defp check_result(_check, _target, _timeout_ms), do: :skip

  defp measure_websocket_connect(relay_url, timeout_ms) do
    with {:ok, websocket} <- connect(relay_url, timeout_ms) do
      # BUGFIX: previously the websocket was only closed on the success
      # path, leaking a connection on every timeout/disconnect.
      close_after(websocket, fn -> await_connected(websocket, timeout_ms) end)
    end
  end

  defp measure_websocket_read(relay_url, timeout_ms) do
    with {:ok, websocket} <- connect(relay_url, timeout_ms) do
      # BUGFIX: close the socket on error paths too (see measure_websocket_connect).
      close_after(websocket, fn ->
        with {:ok, started_at} <- await_connected_started_at(websocket, timeout_ms),
             :ok <-
               WebSockexClient.send_json(websocket, ["COUNT", "nip66-probe", %{"kinds" => [1]}]) do
          await_count_response(websocket, timeout_ms, started_at)
        end
      end)
    end
  end

  # Runs `fun` and always closes the websocket afterwards, regardless of
  # the outcome, so failed probes do not leak connections.
  defp close_after(websocket, fun) do
    fun.()
  after
    _ = WebSockexClient.close(websocket)
  end

  defp connect(relay_url, timeout_ms) do
    server = %{url: relay_url, tls: tls_config(relay_url)}

    WebSockexClient.connect(self(), server, websocket_opts: [timeout: timeout_ms, protocols: nil])
  end

  defp await_connected(websocket, timeout_ms) do
    with {:ok, started_at} <- await_connected_started_at(websocket, timeout_ms) do
      {:ok, monotonic_duration_ms(started_at)}
    end
  end

  # NOTE(review): the clock starts after connect/2 has returned, so
  # rtt_open measures the wait for the :connected message only — if the
  # client performs part of the handshake synchronously inside connect/2,
  # that time is not counted. Confirm against WebSockexClient semantics.
  defp await_connected_started_at(websocket, timeout_ms) do
    started_at = System.monotonic_time()

    receive do
      {:sync_transport, ^websocket, :connected, _metadata} -> {:ok, started_at}
      {:sync_transport, ^websocket, :disconnected, reason} -> {:error, reason}
    after
      timeout_ms -> {:error, :timeout}
    end
  end

  defp await_count_response(websocket, timeout_ms, started_at) do
    receive do
      {:sync_transport, ^websocket, :frame, ["COUNT", "nip66-probe", _payload]} ->
        {:ok, monotonic_duration_ms(started_at)}

      {:sync_transport, ^websocket, :frame, ["CLOSED", "nip66-probe", _message]} ->
        {:error, :closed}

      {:sync_transport, ^websocket, :disconnected, reason} ->
        {:error, reason}
    after
      timeout_ms -> {:error, :timeout}
    end
  end

  # Fetches the NIP-11 relay information document over HTTP(S).
  defp fetch_nip11(relay_url, timeout_ms) do
    started_at = System.monotonic_time()

    case Req.get(
           url: relay_info_url(relay_url),
           headers: [{"accept", "application/nostr+json"}],
           decode_body: false,
           connect_options: [timeout: timeout_ms],
           receive_timeout: timeout_ms
         ) do
      {:ok, %Req.Response{status: 200, body: body}} when is_binary(body) ->
        case JSON.decode(body) do
          {:ok, relay_info} when is_map(relay_info) ->
            {:ok, relay_info, body, monotonic_duration_ms(started_at)}

          {:error, reason} ->
            {:error, reason}

          _other ->
            {:error, :invalid_relay_info}
        end

      {:ok, %Req.Response{status: status}} ->
        {:error, {:relay_info_request_failed, status}}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # ws -> http / wss -> https. Raises on any other scheme; callers only
  # pass URLs already normalized to ws/wss.
  defp relay_info_url(relay_url) do
    relay_url
    |> URI.parse()
    |> Map.update!(:scheme, fn
      "wss" -> "https"
      "ws" -> "http"
    end)
    |> URI.to_string()
  end

  defp tls_config(relay_url) do
    case URI.parse(relay_url) do
      %URI{scheme: "wss", host: host} when is_binary(host) and host != "" ->
        %{mode: :required, hostname: host, pins: []}

      _other ->
        %{mode: :disabled}
    end
  end

  defp normalize_checks(checks) when is_list(checks) do
    checks
    |> Enum.map(&normalize_check/1)
    |> Enum.reject(&is_nil/1)
    |> Enum.uniq()
  end

  defp normalize_checks(_checks), do: []

  defp normalize_check(:open), do: :open
  defp normalize_check("open"), do: :open
  defp normalize_check(:read), do: :read
  defp normalize_check("read"), do: :read
  defp normalize_check(:nip11), do: :nip11
  defp normalize_check("nip11"), do: :nip11
  defp normalize_check(_check), do: nil

  defp monotonic_duration_ms(started_at) do
    System.monotonic_time()
    |> Kernel.-(started_at)
    |> System.convert_time_unit(:native, :millisecond)
  end
end
|
||||
@@ -46,6 +46,13 @@ defmodule Parrhesia.Protocol.EventValidator do
|
||||
| :missing_marmot_group_tag
|
||||
| :invalid_marmot_group_tag
|
||||
| :invalid_marmot_group_content
|
||||
| :missing_nip66_d_tag
|
||||
| :invalid_nip66_d_tag
|
||||
| :invalid_nip66_discovery_tag
|
||||
| :missing_nip66_frequency_tag
|
||||
| :invalid_nip66_frequency_tag
|
||||
| :invalid_nip66_timeout_tag
|
||||
| :invalid_nip66_check_tag
|
||||
|
||||
@spec validate(map()) :: :ok | {:error, error_reason()}
|
||||
def validate(event) when is_map(event) do
|
||||
@@ -130,7 +137,19 @@ defmodule Parrhesia.Protocol.EventValidator do
|
||||
missing_marmot_group_tag: "invalid: kind 445 must include at least one h tag with a group id",
|
||||
invalid_marmot_group_tag:
|
||||
"invalid: kind 445 h tags must contain 32-byte lowercase hex group ids",
|
||||
invalid_marmot_group_content: "invalid: kind 445 content must be non-empty base64"
|
||||
invalid_marmot_group_content: "invalid: kind 445 content must be non-empty base64",
|
||||
missing_nip66_d_tag:
|
||||
"invalid: kind 30166 must include a single [\"d\", <normalized ws/wss url or relay pubkey>] tag",
|
||||
invalid_nip66_d_tag:
|
||||
"invalid: kind 30166 must include a single [\"d\", <normalized ws/wss url or relay pubkey>] tag",
|
||||
invalid_nip66_discovery_tag: "invalid: kind 30166 includes malformed NIP-66 discovery tags",
|
||||
missing_nip66_frequency_tag:
|
||||
"invalid: kind 10166 must include a single [\"frequency\", <seconds>] tag",
|
||||
invalid_nip66_frequency_tag:
|
||||
"invalid: kind 10166 must include a single [\"frequency\", <seconds>] tag",
|
||||
invalid_nip66_timeout_tag:
|
||||
"invalid: kind 10166 timeout tags must be [\"timeout\", <check>, <ms>]",
|
||||
invalid_nip66_check_tag: "invalid: kind 10166 c tags must contain lowercase check names"
|
||||
}
|
||||
|
||||
@spec error_message(error_reason()) :: String.t()
|
||||
@@ -252,6 +271,12 @@ defmodule Parrhesia.Protocol.EventValidator do
|
||||
# Dispatches kind-specific validation; unknown kinds validate trivially.
defp validate_kind_specific(%{"kind" => 1059} = event) do
  validate_giftwrap_event(event)
end

defp validate_kind_specific(%{"kind" => 30_166} = event) do
  validate_nip66_discovery_event(event)
end

defp validate_kind_specific(%{"kind" => 10_166} = event) do
  validate_nip66_monitor_announcement(event)
end

defp validate_kind_specific(_event), do: :ok
|
||||
|
||||
defp validate_marmot_keypackage_event(event) do
|
||||
@@ -325,6 +350,110 @@ defmodule Parrhesia.Protocol.EventValidator do
|
||||
end
|
||||
end
|
||||
|
||||
# Validates a kind 30166 (relay discovery) event's tags: a single "d"
# tag plus optional single/repeated NIP-66 tags. Returns :ok or the
# first failing check's {:error, reason}.
defp validate_nip66_discovery_event(event) do
  tags = Map.get(event, "tags", [])
  bad_tag = :invalid_nip66_discovery_tag

  single = fn tag_name, predicate ->
    validate_optional_single_string_tag_with_predicate(tags, tag_name, bad_tag, predicate)
  end

  repeated = fn tag_name, predicate ->
    validate_optional_repeated_tag(tags, tag_name, predicate, bad_tag)
  end

  checks = [
    fn -> validate_nip66_d_tag(tags) end,
    fn -> single.("n", &(&1 in ["clearnet", "tor", "i2p", "loki"])) end,
    fn -> single.("T", &valid_pascal_case?/1) end,
    fn -> single.("g", &non_empty_string?/1) end,
    fn -> repeated.("N", &positive_integer_string?/1) end,
    fn -> repeated.("R", &valid_nip66_requirement_value?/1) end,
    fn -> repeated.("k", &valid_nip66_kind_value?/1) end,
    fn -> repeated.("t", &non_empty_string?/1) end,
    fn -> single.("rtt-open", &positive_integer_string?/1) end,
    fn -> single.("rtt-read", &positive_integer_string?/1) end,
    fn -> single.("rtt-write", &positive_integer_string?/1) end
  ]

  Enum.reduce_while(checks, :ok, fn check, :ok ->
    case check.() do
      :ok -> {:cont, :ok}
      {:error, _reason} = failure -> {:halt, failure}
    end
  end)
end
|
||||
|
||||
# Validates a kind 10166 (monitor announcement) event's tags: exactly
# one "frequency" tag, optional "timeout"/"c" tags, and an optional
# single "g" geohash tag.
# NOTE(review): the "g" check reuses :invalid_nip66_discovery_tag even
# though this is a 10166 event — confirm the error atom is intentional.
defp validate_nip66_monitor_announcement(event) do
  tags = Map.get(event, "tags", [])

  frequency_result =
    validate_single_string_tag_with_predicate(
      tags,
      "frequency",
      :missing_nip66_frequency_tag,
      :invalid_nip66_frequency_tag,
      &positive_integer_string?/1
    )

  with :ok <- frequency_result,
       :ok <- validate_optional_repeated_timeout_tags(tags),
       :ok <-
         validate_optional_repeated_tag(
           tags,
           "c",
           &valid_nip66_check_name?/1,
           :invalid_nip66_check_tag
         ) do
    validate_optional_single_string_tag_with_predicate(
      tags,
      "g",
      :invalid_nip66_discovery_tag,
      &non_empty_string?/1
    )
  end
end
|
||||
|
||||
# Arity-1 convenience wrapper using the keypackage-specific error atom.
defp validate_non_empty_base64_content(event) do
  validate_non_empty_base64_content(event, :invalid_marmot_keypackage_content)
end
|
||||
|
||||
@@ -406,6 +535,25 @@ defmodule Parrhesia.Protocol.EventValidator do
|
||||
end
|
||||
end
|
||||
|
||||
# The tag may be absent (:ok); when present there must be exactly one
# two-element occurrence whose value satisfies `predicate`, otherwise
# {:error, invalid_error}.
defp validate_optional_single_string_tag_with_predicate(
       tags,
       tag_name,
       invalid_error,
       predicate
     )
     when is_function(predicate, 1) do
  with [_ | _] = matching <- Enum.filter(tags, &match_tag_name?(&1, tag_name)),
       [[^tag_name, value]] <- matching,
       true <- predicate.(value) do
    :ok
  else
    # Absent tag is fine.
    [] -> :ok
    # Duplicate, malformed, or predicate-failing tag.
    _other -> {:error, invalid_error}
  end
end
|
||||
|
||||
defp validate_mls_extensions_tag(tags) do
|
||||
with {:ok, ["mls_extensions" | extensions]} <-
|
||||
fetch_single_tag(tags, "mls_extensions", :missing_marmot_extensions_tag),
|
||||
@@ -444,6 +592,40 @@ defmodule Parrhesia.Protocol.EventValidator do
|
||||
end
|
||||
end
|
||||
|
||||
# Requires exactly one ["d", value] tag whose value is either a valid
# websocket URL or a 32-byte lowercase-hex pubkey.
defp validate_nip66_d_tag(tags) do
  case fetch_single_tag(tags, "d", :missing_nip66_d_tag) do
    {:ok, ["d", value]} ->
      if valid_websocket_url?(value) or lowercase_hex?(value, 32) do
        :ok
      else
        {:error, :invalid_nip66_d_tag}
      end

    {:ok, _malformed_tag} ->
      {:error, :invalid_nip66_d_tag}

    {:error, _reason} = error ->
      error
  end
end
|
||||
|
||||
# Every "timeout" tag (zero or more allowed) must be well-formed.
defp validate_optional_repeated_timeout_tags(tags) do
  malformed =
    tags
    |> Enum.filter(&match_tag_name?(&1, "timeout"))
    |> Enum.reject(&valid_nip66_timeout_tag?/1)

  case malformed do
    [] -> :ok
    _invalid -> {:error, :invalid_nip66_timeout_tag}
  end
end
|
||||
|
||||
# Zero or more occurrences of `tag_name` are allowed; each must be a
# two-element tag whose value satisfies `predicate`. Enum.all?/2
# short-circuits on the first failure, like the original reduce_while.
defp validate_optional_repeated_tag(tags, tag_name, predicate, invalid_error)
     when is_function(predicate, 1) do
  well_formed? = fn
    [^tag_name, value] -> predicate.(value)
    _other -> false
  end

  all_valid? =
    tags
    |> Enum.filter(&match_tag_name?(&1, tag_name))
    |> Enum.all?(well_formed?)

  if all_valid?, do: :ok, else: {:error, invalid_error}
end
|
||||
|
||||
defp fetch_single_tag(tags, tag_name, missing_error) do
|
||||
case Enum.filter(tags, &match_tag_name?(&1, tag_name)) do
|
||||
[tag] -> {:ok, tag}
|
||||
@@ -500,6 +682,49 @@ defmodule Parrhesia.Protocol.EventValidator do
|
||||
|
||||
defp valid_websocket_url?(_url), do: false
|
||||
|
||||
# ["timeout", <ms>] or ["timeout", <check>, <ms>]; anything else is invalid.
defp valid_nip66_timeout_tag?(tag) do
  case tag do
    ["timeout", milliseconds] ->
      positive_integer_string?(milliseconds)

    ["timeout", check, milliseconds] ->
      valid_nip66_check_name?(check) and positive_integer_string?(milliseconds)

    _other ->
      false
  end
end
|
||||
|
||||
# A requirement value is one of the known keys, optionally negated with "!".
defp valid_nip66_requirement_value?(value) when is_binary(value) do
  String.trim_leading(value, "!") in ~w[auth writes pow payment]
end

defp valid_nip66_requirement_value?(_value), do: false
|
||||
|
||||
# A kind value is an integer string, optionally prefixed with "!".
defp valid_nip66_kind_value?(value) do
  case value do
    "!" <> rest -> positive_integer_string?(rest)
    other -> positive_integer_string?(other)
  end
end
|
||||
|
||||
# Check names are non-empty lowercase alphanumerics with hyphens.
defp valid_nip66_check_name?(value) when is_binary(value),
  do: Regex.match?(~r/^[a-z0-9-]+$/, value)

defp valid_nip66_check_name?(_value), do: false
|
||||
|
||||
# PascalCase: an uppercase letter followed by alphanumerics.
defp valid_pascal_case?(value) when is_binary(value),
  do: Regex.match?(~r/^[A-Z][A-Za-z0-9]*$/, value)

defp valid_pascal_case?(_value), do: false
|
||||
|
||||
# True for non-negative base-10 integer strings, digits only.
#
# BUGFIX: the previous Integer.parse/1 implementation accepted a leading
# "+" sign ("+5" parsed as {5, ""}), letting signed strings pass protocol
# tag validation. A digits-only check rejects them.
#
# NOTE(review): despite the name, "0" is accepted (>= 0 in the original)
# — the publisher emits 0-ms rtt values, so zero must remain valid.
defp positive_integer_string?(value) when is_binary(value) do
  Regex.match?(~r/^[0-9]+$/, value)
end

defp positive_integer_string?(_value), do: false
|
||||
|
||||
# True only for binaries with at least one byte.
defp non_empty_string?(value), do: is_binary(value) and value != ""
|
||||
|
||||
defp valid_keypackage_ref?(value) when is_binary(value) do
|
||||
Enum.any?(@supported_keypackage_ref_sizes, &lowercase_hex?(value, &1))
|
||||
end
|
||||
|
||||
40
lib/parrhesia/tasks/nip66_publisher.ex
Normal file
40
lib/parrhesia/tasks/nip66_publisher.ex
Normal file
@@ -0,0 +1,40 @@
|
||||
defmodule Parrhesia.Tasks.Nip66Publisher do
  @moduledoc """
  Periodic worker that publishes NIP-66 monitor and discovery events.
  """

  use GenServer

  alias Parrhesia.NIP66

  # Client API

  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, __MODULE__))
  end

  # Server callbacks

  @impl true
  def init(opts) do
    interval_ms = Keyword.get(opts, :interval_ms, NIP66.publish_interval_ms())
    nip66_module = Keyword.get(opts, :nip66_module, NIP66)
    publish_opts = Keyword.drop(opts, [:name, :interval_ms, :nip66_module])

    # First snapshot is published immediately after startup.
    schedule_tick(0)

    {:ok, %{interval_ms: interval_ms, publish_opts: publish_opts, nip66_module: nip66_module}}
  end

  @impl true
  def handle_info(:tick, state) do
    # Best-effort: the snapshot result is intentionally ignored.
    _result = state.nip66_module.publish_snapshot(state.publish_opts)
    schedule_tick(state.interval_ms)
    {:noreply, state}
  end

  def handle_info(_message, state), do: {:noreply, state}

  defp schedule_tick(interval_ms), do: Process.send_after(self(), :tick, interval_ms)
end
|
||||
@@ -11,7 +11,7 @@ defmodule Parrhesia.Tasks.Supervisor do
|
||||
|
||||
@impl true
|
||||
def init(_init_arg) do
|
||||
children = expiration_children() ++ partition_retention_children()
|
||||
children = expiration_children() ++ partition_retention_children() ++ nip66_children()
|
||||
|
||||
Supervisor.init(children, strategy: :one_for_one)
|
||||
end
|
||||
@@ -29,4 +29,12 @@ defmodule Parrhesia.Tasks.Supervisor do
|
||||
{Parrhesia.Tasks.PartitionRetentionWorker, name: Parrhesia.Tasks.PartitionRetentionWorker}
|
||||
]
|
||||
end
|
||||
|
||||
# Starts the NIP-66 publisher only when publishing is enabled and at
# least one target exists.
defp nip66_children do
  if Parrhesia.NIP66.enabled?(),
    do: [{Parrhesia.Tasks.Nip66Publisher, name: Parrhesia.Tasks.Nip66Publisher}],
    else: []
end
|
||||
end
|
||||
|
||||
@@ -12,6 +12,7 @@ defmodule Parrhesia.Web.RelayInfo do
|
||||
"name" => "Parrhesia",
|
||||
"description" => "Nostr/Marmot relay",
|
||||
"pubkey" => relay_pubkey(),
|
||||
"self" => relay_pubkey(),
|
||||
"supported_nips" => supported_nips(),
|
||||
"software" => "https://git.teralink.net/self/parrhesia",
|
||||
"version" => Application.spec(:parrhesia, :vsn) |> to_string(),
|
||||
@@ -20,13 +21,20 @@ defmodule Parrhesia.Web.RelayInfo do
|
||||
end
|
||||
|
||||
defp supported_nips do
|
||||
base = [1, 9, 11, 13, 17, 40, 42, 43, 44, 45, 50, 59, 62, 66, 70]
|
||||
base = [1, 9, 11, 13, 17, 40, 42, 43, 44, 45, 50, 59, 62, 70]
|
||||
|
||||
with_nip66 =
|
||||
if Parrhesia.NIP66.enabled?() do
|
||||
base ++ [66]
|
||||
else
|
||||
base
|
||||
end
|
||||
|
||||
with_negentropy =
|
||||
if negentropy_enabled?() do
|
||||
base ++ [77]
|
||||
with_nip66 ++ [77]
|
||||
else
|
||||
base
|
||||
with_nip66
|
||||
end
|
||||
|
||||
with_negentropy ++ [86, 98]
|
||||
@@ -38,7 +46,12 @@ defmodule Parrhesia.Web.RelayInfo do
|
||||
"max_subscriptions" =>
|
||||
Parrhesia.Config.get([:limits, :max_subscriptions_per_connection], 32),
|
||||
"max_filters" => Parrhesia.Config.get([:limits, :max_filters_per_req], 16),
|
||||
"auth_required" => Listener.relay_auth_required?(listener)
|
||||
"max_limit" => Parrhesia.Config.get([:limits, :max_filter_limit], 500),
|
||||
"max_event_tags" => Parrhesia.Config.get([:limits, :max_tags_per_event], 256),
|
||||
"min_pow_difficulty" => Parrhesia.Config.get([:policies, :min_pow_difficulty], 0),
|
||||
"auth_required" => Listener.relay_auth_required?(listener),
|
||||
"payment_required" => false,
|
||||
"restricted_writes" => restricted_writes?(listener)
|
||||
}
|
||||
end
|
||||
|
||||
@@ -54,4 +67,12 @@ defmodule Parrhesia.Web.RelayInfo do
|
||||
{:error, _reason} -> nil
|
||||
end
|
||||
end
|
||||
|
||||
# Writes are restricted when NIP-42 auth is required, any baseline ACL
# write rule denies, auth is globally required for writes, or a minimum
# PoW difficulty is configured. Evaluation order (and short-circuiting)
# matches the original `or` chain.
defp restricted_writes?(listener) do
  acl_denies_writes? = fn rules ->
    rules != [] and Enum.any?(rules, &(&1.action == :deny))
  end

  cond do
    listener.auth.nip42_required -> true
    acl_denies_writes?.(listener.baseline_acl.write) -> true
    Parrhesia.Config.get([:policies, :auth_required_for_writes], false) -> true
    true -> Parrhesia.Config.get([:policies, :min_pow_difficulty], 0) > 0
  end
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user