Refactor ingress to listener-based configuration

This commit is contained in:
2026-03-16 23:47:17 +01:00
parent 5f4f086d28
commit 1f608ee2bd
18 changed files with 1231 additions and 210 deletions

View File

@@ -14,7 +14,6 @@ defmodule Parrhesia.Application do
Parrhesia.Sync.Supervisor,
Parrhesia.Policy.Supervisor,
Parrhesia.Web.Endpoint,
Parrhesia.Web.MetricsEndpoint,
Parrhesia.Tasks.Supervisor
]

View File

@@ -15,6 +15,7 @@ defmodule Parrhesia.Web.Connection do
alias Parrhesia.Protocol.Filter
alias Parrhesia.Subscriptions.Index
alias Parrhesia.Telemetry
alias Parrhesia.Web.Listener
@default_max_subscriptions_per_connection 32
@default_max_outbound_queue 256
@@ -43,6 +44,7 @@ defmodule Parrhesia.Web.Connection do
defstruct subscriptions: %{},
authenticated_pubkeys: MapSet.new(),
listener: nil,
max_subscriptions_per_connection: @default_max_subscriptions_per_connection,
subscription_index: Index,
auth_challenges: Challenges,
@@ -74,6 +76,7 @@ defmodule Parrhesia.Web.Connection do
@type t :: %__MODULE__{
subscriptions: %{String.t() => subscription()},
authenticated_pubkeys: MapSet.t(String.t()),
listener: map() | nil,
max_subscriptions_per_connection: pos_integer(),
subscription_index: GenServer.server() | nil,
auth_challenges: GenServer.server() | nil,
@@ -101,6 +104,7 @@ defmodule Parrhesia.Web.Connection do
auth_challenges = auth_challenges(opts)
state = %__MODULE__{
listener: Listener.from_opts(opts),
max_subscriptions_per_connection: max_subscriptions_per_connection(opts),
subscription_index: subscription_index(opts),
auth_challenges: auth_challenges,
@@ -222,7 +226,10 @@ defmodule Parrhesia.Web.Connection do
case maybe_allow_event_ingest(state) do
{:ok, next_state} ->
publish_event_response(next_state, event)
case authorize_listener_write(next_state, event) do
:ok -> publish_event_response(next_state, event)
{:error, reason} -> ingest_error_response(state, event_id, reason)
end
{:error, reason} ->
ingest_error_response(state, event_id, reason)
@@ -265,6 +272,7 @@ defmodule Parrhesia.Web.Connection do
defp handle_req(%__MODULE__{} = state, subscription_id, filters) do
with :ok <- Filter.validate_filters(filters),
:ok <- authorize_listener_read(state, filters),
:ok <-
EventPolicy.authorize_read(
filters,
@@ -302,6 +310,13 @@ defmodule Parrhesia.Web.Connection do
EventPolicy.error_message(:sync_read_not_allowed)
)
{:error, :listener_read_not_allowed} ->
restricted_close(
state,
subscription_id,
"restricted: listener baseline denies requested filters"
)
{:error, :marmot_group_h_tag_required} ->
restricted_close(
state,
@@ -364,13 +379,21 @@ defmodule Parrhesia.Web.Connection do
end
defp handle_count(%__MODULE__{} = state, subscription_id, filters, options) do
case Events.count(filters,
context: request_context(state, subscription_id),
options: options
) do
{:ok, payload} ->
response = Protocol.encode_relay({:count, subscription_id, payload})
{:push, {:text, response}, state}
with :ok <- authorize_listener_read(state, filters),
{:ok, payload} <-
Events.count(filters,
context: request_context(state, subscription_id),
options: options
) do
response = Protocol.encode_relay({:count, subscription_id, payload})
{:push, {:text, response}, state}
else
{:error, :listener_read_not_allowed} ->
restricted_count_notice(
state,
subscription_id,
"restricted: listener baseline denies requested filters"
)
{:error, reason} ->
handle_count_error(state, subscription_id, reason)
@@ -422,6 +445,7 @@ defmodule Parrhesia.Web.Connection do
defp handle_neg_open(%__MODULE__{} = state, subscription_id, filter, message) do
with :ok <- Filter.validate_filters([filter]),
:ok <- authorize_listener_read(state, [filter]),
:ok <-
EventPolicy.authorize_read(
[filter],
@@ -471,6 +495,9 @@ defmodule Parrhesia.Web.Connection do
defp error_message_for_ingest_failure(:event_too_large),
do: "invalid: event exceeds max event size"
defp error_message_for_ingest_failure(:listener_write_not_allowed),
do: "restricted: listener baseline denies event"
defp error_message_for_ingest_failure(:ephemeral_events_disabled),
do: "blocked: ephemeral events are disabled"
@@ -480,6 +507,7 @@ defmodule Parrhesia.Web.Connection do
:pubkey_not_allowed,
:restricted_giftwrap,
:sync_write_not_allowed,
:listener_write_not_allowed,
:protected_event_requires_auth,
:protected_event_pubkey_mismatch,
:pow_below_minimum,
@@ -576,6 +604,7 @@ defmodule Parrhesia.Web.Connection do
:pubkey_not_allowed,
:restricted_giftwrap,
:sync_read_not_allowed,
:listener_read_not_allowed,
:marmot_group_h_tag_required,
:marmot_group_h_values_exceeded,
:marmot_group_filter_window_too_wide
@@ -627,6 +656,9 @@ defmodule Parrhesia.Web.Connection do
],
do: Filter.error_message(reason)
defp negentropy_policy_or_filter_error_message(:listener_read_not_allowed),
do: "restricted: listener baseline denies requested filters"
defp negentropy_policy_or_filter_error_message(reason), do: EventPolicy.error_message(reason)
defp validate_auth_event(%__MODULE__{} = state, %{"kind" => 22_242} = auth_event) do
@@ -706,6 +738,31 @@ defmodule Parrhesia.Web.Connection do
defp auth_error_message(reason) when is_binary(reason), do: reason
defp auth_error_message(reason), do: "invalid: #{inspect(reason)}"
defp authorize_listener_read(%__MODULE__{} = state, filters) do
case maybe_require_listener_auth(state) do
:ok -> Listener.authorize_read(state.listener, filters)
error -> error
end
end
defp authorize_listener_write(%__MODULE__{} = state, event) do
case maybe_require_listener_auth(state) do
:ok -> Listener.authorize_write(state.listener, event)
error -> error
end
end
defp maybe_require_listener_auth(%__MODULE__{
listener: listener,
authenticated_pubkeys: pubkeys
}) do
if Listener.nip42_required?(listener) and MapSet.size(pubkeys) == 0 do
{:error, :auth_required}
else
:ok
end
end
defp with_auth_challenge_frame(
%__MODULE__{auth_challenge: nil},
result
@@ -1417,7 +1474,8 @@ defmodule Parrhesia.Web.Connection do
authenticated_pubkeys: state.authenticated_pubkeys,
caller: :websocket,
remote_ip: state.remote_ip,
subscription_id: subscription_id
subscription_id: subscription_id,
metadata: %{listener_id: state.listener.id}
}
end

View File

@@ -1,29 +1,27 @@
defmodule Parrhesia.Web.Endpoint do
@moduledoc """
Supervision entrypoint for WS/HTTP ingress.
Supervision entrypoint for configured ingress listeners.
"""
use Supervisor
def start_link(init_arg \\ []) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
alias Parrhesia.Web.Listener
def start_link(_init_arg \\ []) do
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end
@impl true
def init(init_arg) do
children = [
{Bandit, bandit_options(init_arg)}
]
def init(:ok) do
children =
Listener.all()
|> Enum.map(fn listener ->
%{
id: {:listener, listener.id},
start: {Bandit, :start_link, [Listener.bandit_options(listener)]}
}
end)
Supervisor.init(children, strategy: :one_for_one)
end
defp bandit_options(overrides) do
configured = Application.get_env(:parrhesia, __MODULE__, [])
configured
|> Keyword.merge(overrides)
|> Keyword.put_new(:scheme, :http)
|> Keyword.put_new(:plug, Parrhesia.Web.Router)
end
end

View File

@@ -0,0 +1,627 @@
defmodule Parrhesia.Web.Listener do
@moduledoc false
import Bitwise
alias Parrhesia.Protocol.Filter
@private_cidrs [
"127.0.0.0/8",
"10.0.0.0/8",
"172.16.0.0/12",
"192.168.0.0/16",
"169.254.0.0/16",
"::1/128",
"fc00::/7",
"fe80::/10"
]
@type t :: %{
id: atom(),
enabled: boolean(),
bind: %{ip: tuple(), port: pos_integer()},
transport: map(),
proxy: map(),
network: map(),
features: map(),
auth: map(),
baseline_acl: map(),
bandit_options: keyword()
}
@spec all() :: [t()]
def all do
:parrhesia
|> Application.get_env(:listeners, %{})
|> normalize_listeners()
|> Enum.filter(& &1.enabled)
end
@spec from_opts(keyword() | map()) :: t()
def from_opts(opts) when is_list(opts) do
opts
|> Keyword.get(:listener, default_listener())
|> normalize_listener()
end
def from_opts(opts) when is_map(opts) do
opts
|> Map.get(:listener, default_listener())
|> normalize_listener()
end
@spec from_conn(Plug.Conn.t()) :: t()
def from_conn(conn) do
conn.private
|> Map.get(:parrhesia_listener, default_listener())
|> normalize_listener()
end
@spec put_conn(Plug.Conn.t(), keyword()) :: Plug.Conn.t()
def put_conn(conn, opts) when is_list(opts) do
Plug.Conn.put_private(conn, :parrhesia_listener, from_opts(opts))
end
@spec feature_enabled?(t(), atom()) :: boolean()
def feature_enabled?(listener, feature) when is_map(listener) and is_atom(feature) do
listener
|> Map.get(:features, %{})
|> Map.get(feature, %{})
|> Map.get(:enabled, false)
end
@spec nip42_required?(t()) :: boolean()
def nip42_required?(listener), do: listener.auth.nip42_required
@spec admin_auth_required?(t()) :: boolean()
def admin_auth_required?(listener), do: listener.auth.nip98_required_for_admin
@spec trusted_proxies(t()) :: [String.t()]
def trusted_proxies(listener) do
listener.proxy.trusted_cidrs
end
@spec remote_ip_allowed?(t(), tuple() | String.t() | nil) :: boolean()
def remote_ip_allowed?(listener, remote_ip) do
access_allowed?(listener.network, remote_ip)
end
@spec metrics_allowed?(t(), Plug.Conn.t()) :: boolean()
def metrics_allowed?(listener, conn) do
metrics = Map.get(listener.features, :metrics, %{})
feature_enabled?(listener, :metrics) and
access_allowed?(Map.get(metrics, :access, %{}), conn.remote_ip) and
metrics_token_allowed?(metrics, conn)
end
@spec relay_url(t(), Plug.Conn.t()) :: String.t()
def relay_url(listener, conn) do
scheme = listener.transport.scheme
ws_scheme = if scheme == :https, do: "wss", else: "ws"
port_segment =
if default_http_port?(scheme, conn.port) do
""
else
":#{conn.port}"
end
"#{ws_scheme}://#{conn.host}#{port_segment}#{conn.request_path}"
end
@spec relay_auth_required?(t()) :: boolean()
def relay_auth_required?(listener), do: listener.auth.nip42_required
@spec authorize_read(t(), [map()]) :: :ok | {:error, :listener_read_not_allowed}
def authorize_read(listener, filters) when is_list(filters) do
case evaluate_rules(listener.baseline_acl.read, filters, :read) do
:allow -> :ok
:deny -> {:error, :listener_read_not_allowed}
end
end
@spec authorize_write(t(), map()) :: :ok | {:error, :listener_write_not_allowed}
def authorize_write(listener, event) when is_map(event) do
case evaluate_rules(listener.baseline_acl.write, event, :write) do
:allow -> :ok
:deny -> {:error, :listener_write_not_allowed}
end
end
@spec bandit_options(t()) :: keyword()
def bandit_options(listener) do
[
ip: listener.bind.ip,
port: listener.bind.port,
scheme: listener.transport.scheme,
plug: {Parrhesia.Web.ListenerPlug, listener: listener}
] ++ listener.bandit_options
end
defp normalize_listeners(listeners) when is_list(listeners) do
Enum.map(listeners, fn
{id, listener} when is_atom(id) and is_map(listener) ->
normalize_listener(Map.put(listener, :id, id))
listener when is_map(listener) ->
normalize_listener(listener)
end)
end
defp normalize_listeners(listeners) when is_map(listeners) do
listeners
|> Enum.map(fn {id, listener} -> normalize_listener(Map.put(listener, :id, id)) end)
|> Enum.sort_by(& &1.id)
end
defp normalize_listener(listener) when is_map(listener) do
id = normalize_atom(fetch_value(listener, :id), :listener)
enabled = normalize_boolean(fetch_value(listener, :enabled), true)
bind = normalize_bind(fetch_value(listener, :bind), listener)
transport = normalize_transport(fetch_value(listener, :transport))
proxy = normalize_proxy(fetch_value(listener, :proxy))
network = normalize_access(fetch_value(listener, :network), %{allow_all?: true})
features = normalize_features(fetch_value(listener, :features))
auth = normalize_auth(fetch_value(listener, :auth))
baseline_acl = normalize_baseline_acl(fetch_value(listener, :baseline_acl))
bandit_options = normalize_bandit_options(fetch_value(listener, :bandit_options))
%{
id: id,
enabled: enabled,
bind: bind,
transport: transport,
proxy: proxy,
network: network,
features: features,
auth: auth,
baseline_acl: baseline_acl,
bandit_options: bandit_options
}
end
defp normalize_listener(_listener), do: default_listener()
defp normalize_bind(bind, listener) when is_map(bind) do
%{
ip: normalize_ip(fetch_value(bind, :ip), default_bind_ip(listener)),
port: normalize_port(fetch_value(bind, :port), 4413)
}
end
defp normalize_bind(_bind, listener) do
%{
ip: default_bind_ip(listener),
port: normalize_port(fetch_value(listener, :port), 4413)
}
end
defp default_bind_ip(listener) do
normalize_ip(fetch_value(listener, :ip), {0, 0, 0, 0})
end
defp normalize_transport(transport) when is_map(transport) do
%{
scheme: normalize_scheme(fetch_value(transport, :scheme), :http),
tls: normalize_map(fetch_value(transport, :tls))
}
end
defp normalize_transport(_transport), do: %{scheme: :http, tls: %{}}
defp normalize_proxy(proxy) when is_map(proxy) do
%{
trusted_cidrs: normalize_string_list(fetch_value(proxy, :trusted_cidrs)),
honor_x_forwarded_for: normalize_boolean(fetch_value(proxy, :honor_x_forwarded_for), true)
}
end
defp normalize_proxy(_proxy), do: %{trusted_cidrs: [], honor_x_forwarded_for: true}
defp normalize_features(features) when is_map(features) do
%{
nostr: normalize_simple_feature(fetch_value(features, :nostr), true),
admin: normalize_simple_feature(fetch_value(features, :admin), true),
metrics: normalize_metrics_feature(fetch_value(features, :metrics))
}
end
defp normalize_features(_features) do
%{
nostr: %{enabled: true},
admin: %{enabled: true},
metrics: %{enabled: false, access: default_feature_access()}
}
end
defp normalize_simple_feature(feature, default_enabled) when is_map(feature) do
%{enabled: normalize_boolean(fetch_value(feature, :enabled), default_enabled)}
end
defp normalize_simple_feature(feature, _default_enabled) when is_boolean(feature) do
%{enabled: feature}
end
defp normalize_simple_feature(_feature, default_enabled), do: %{enabled: default_enabled}
defp normalize_metrics_feature(feature) when is_map(feature) do
%{
enabled: normalize_boolean(fetch_value(feature, :enabled), false),
auth_token: normalize_optional_string(fetch_value(feature, :auth_token)),
access:
normalize_access(fetch_value(feature, :access), %{
private_networks_only?: false,
allow_all?: true
})
}
end
defp normalize_metrics_feature(feature) when is_boolean(feature) do
%{enabled: feature, auth_token: nil, access: default_feature_access()}
end
defp normalize_metrics_feature(_feature),
do: %{enabled: false, auth_token: nil, access: default_feature_access()}
defp default_feature_access do
%{public?: false, private_networks_only?: false, allow_cidrs: [], allow_all?: true}
end
defp normalize_auth(auth) when is_map(auth) do
%{
nip42_required: normalize_boolean(fetch_value(auth, :nip42_required), false),
nip98_required_for_admin:
normalize_boolean(fetch_value(auth, :nip98_required_for_admin), true)
}
end
defp normalize_auth(_auth), do: %{nip42_required: false, nip98_required_for_admin: true}
defp normalize_baseline_acl(acl) when is_map(acl) do
%{
read: normalize_baseline_rules(fetch_value(acl, :read)),
write: normalize_baseline_rules(fetch_value(acl, :write))
}
end
defp normalize_baseline_acl(_acl), do: %{read: [], write: []}
defp normalize_baseline_rules(rules) when is_list(rules) do
Enum.flat_map(rules, fn
%{match: match} = rule when is_map(match) ->
[
%{
action: normalize_rule_action(fetch_value(rule, :action)),
match: normalize_filter_map(match)
}
]
_other ->
[]
end)
end
defp normalize_baseline_rules(_rules), do: []
defp normalize_rule_action(:deny), do: :deny
defp normalize_rule_action("deny"), do: :deny
defp normalize_rule_action(_action), do: :allow
defp normalize_bandit_options(options) when is_list(options), do: options
defp normalize_bandit_options(_options), do: []
defp normalize_access(access, defaults) when is_map(access) do
%{
public?:
normalize_boolean(
first_present(access, [:public, :public?]),
Map.get(defaults, :public?, false)
),
private_networks_only?:
normalize_boolean(
first_present(access, [:private_networks_only, :private_networks_only?]),
Map.get(defaults, :private_networks_only?, false)
),
allow_cidrs: normalize_string_list(fetch_value(access, :allow_cidrs)),
allow_all?:
normalize_boolean(
first_present(access, [:allow_all, :allow_all?]),
Map.get(defaults, :allow_all?, false)
)
}
end
defp normalize_access(_access, defaults) do
%{
public?: Map.get(defaults, :public?, false),
private_networks_only?: Map.get(defaults, :private_networks_only?, false),
allow_cidrs: [],
allow_all?: Map.get(defaults, :allow_all?, false)
}
end
defp access_allowed?(%{public?: true}, _remote_ip), do: true
defp access_allowed?(%{allow_cidrs: allow_cidrs}, remote_ip) when allow_cidrs != [] do
Enum.any?(allow_cidrs, &ip_in_cidr?(remote_ip, &1))
end
defp access_allowed?(%{private_networks_only?: true}, remote_ip) do
Enum.any?(@private_cidrs, &ip_in_cidr?(remote_ip, &1))
end
defp access_allowed?(%{allow_all?: allow_all?}, _remote_ip), do: allow_all?
defp metrics_token_allowed?(metrics, conn) do
case metrics.auth_token do
nil ->
true
token ->
conn
|> Plug.Conn.get_req_header("authorization")
|> List.first()
|> normalize_authorization_header()
|> Kernel.==(token)
end
end
defp normalize_authorization_header("Bearer " <> token), do: token
defp normalize_authorization_header(token) when is_binary(token), do: token
defp normalize_authorization_header(_header), do: nil
defp evaluate_rules([], _subject, _mode), do: :allow
defp evaluate_rules(rules, subject, mode) do
has_allow_rules? = Enum.any?(rules, &(&1.action == :allow))
case Enum.find(rules, &rule_matches?(&1, subject, mode)) do
%{action: :deny} -> :deny
%{action: :allow} -> :allow
nil when has_allow_rules? -> :deny
nil -> :allow
end
end
defp rule_matches?(rule, filters, :read) when is_list(filters) do
Enum.any?(filters, &filters_overlap?(&1, rule.match))
end
defp rule_matches?(rule, event, :write) when is_map(event) do
Filter.matches_filter?(event, rule.match)
end
defp rule_matches?(_rule, _subject, _mode), do: false
defp filters_overlap?(left, right) when is_map(left) and is_map(right) do
comparable_keys =
left
|> Map.keys()
|> Kernel.++(Map.keys(right))
|> Enum.uniq()
|> Enum.reject(&(&1 in ["limit", "search", "since", "until"]))
Enum.all?(comparable_keys, fn key ->
filter_constraint_compatible?(Map.get(left, key), Map.get(right, key))
end) and filter_ranges_overlap?(left, right)
end
defp filter_constraint_compatible?(nil, _right), do: true
defp filter_constraint_compatible?(_left, nil), do: true
defp filter_constraint_compatible?(left, right) when is_list(left) and is_list(right) do
not MapSet.disjoint?(MapSet.new(left), MapSet.new(right))
end
defp filter_constraint_compatible?(left, right), do: left == right
defp filter_ranges_overlap?(left, right) do
since = max(Map.get(left, "since", 0), Map.get(right, "since", 0))
until =
min(
Map.get(left, "until", 9_223_372_036_854_775_807),
Map.get(right, "until", 9_223_372_036_854_775_807)
)
since <= until
end
defp default_listener do
case configured_default_listener() do
nil -> fallback_listener()
listener -> normalize_listener(listener)
end
end
defp configured_default_listener do
listeners = Application.get_env(:parrhesia, :listeners, %{})
case fetch_public_listener(listeners) do
nil -> first_configured_listener(listeners)
listener -> listener
end
end
defp fetch_public_listener(%{public: listener}) when is_map(listener),
do: Map.put_new(listener, :id, :public)
defp fetch_public_listener(listeners) when is_list(listeners) do
case Keyword.fetch(listeners, :public) do
{:ok, listener} when is_map(listener) -> Map.put_new(listener, :id, :public)
_other -> nil
end
end
defp fetch_public_listener(_listeners), do: nil
defp first_configured_listener(listeners) when is_list(listeners) do
case listeners do
[{id, listener} | _rest] when is_atom(id) and is_map(listener) ->
Map.put_new(listener, :id, id)
_other ->
nil
end
end
defp first_configured_listener(listeners) when is_map(listeners) and map_size(listeners) > 0 do
{id, listener} = Enum.at(Enum.sort_by(listeners, fn {key, _value} -> key end), 0)
Map.put_new(listener, :id, id)
end
defp first_configured_listener(_listeners), do: nil
defp fallback_listener do
%{
id: :public,
enabled: true,
bind: %{ip: {0, 0, 0, 0}, port: 4413},
transport: %{scheme: :http, tls: %{}},
proxy: %{trusted_cidrs: [], honor_x_forwarded_for: true},
network: %{public?: false, private_networks_only?: false, allow_cidrs: [], allow_all?: true},
features: %{
nostr: %{enabled: true},
admin: %{enabled: true},
metrics: %{enabled: false, auth_token: nil, access: default_feature_access()}
},
auth: %{nip42_required: false, nip98_required_for_admin: true},
baseline_acl: %{read: [], write: []},
bandit_options: []
}
end
defp fetch_value(map, key) when is_map(map) do
cond do
Map.has_key?(map, key) ->
Map.get(map, key)
is_atom(key) and Map.has_key?(map, Atom.to_string(key)) ->
Map.get(map, Atom.to_string(key))
true ->
nil
end
end
defp first_present(map, keys) do
Enum.find_value(keys, fn key ->
cond do
Map.has_key?(map, key) ->
{:present, Map.get(map, key)}
is_atom(key) and Map.has_key?(map, Atom.to_string(key)) ->
{:present, Map.get(map, Atom.to_string(key))}
true ->
nil
end
end)
|> case do
{:present, value} -> value
nil -> nil
end
end
defp normalize_map(value) when is_map(value), do: value
defp normalize_map(_value), do: %{}
defp normalize_boolean(value, _default) when is_boolean(value), do: value
defp normalize_boolean(nil, default), do: default
defp normalize_boolean(_value, default), do: default
defp normalize_optional_string(value) when is_binary(value) and value != "", do: value
defp normalize_optional_string(_value), do: nil
defp normalize_string_list(values) when is_list(values) do
Enum.filter(values, &(is_binary(&1) and &1 != ""))
end
defp normalize_string_list(_values), do: []
defp normalize_ip({_, _, _, _} = ip, _default), do: ip
defp normalize_ip({_, _, _, _, _, _, _, _} = ip, _default), do: ip
defp normalize_ip(_ip, default), do: default
defp normalize_port(port, _default) when is_integer(port) and port > 0, do: port
defp normalize_port(0, _default), do: 0
defp normalize_port(_port, default), do: default
defp normalize_scheme(:https, _default), do: :https
defp normalize_scheme("https", _default), do: :https
defp normalize_scheme(_scheme, default), do: default
defp normalize_atom(value, _default) when is_atom(value), do: value
defp normalize_atom(_value, default), do: default
defp normalize_filter_map(filter) when is_map(filter) do
Map.new(filter, fn
{key, value} when is_atom(key) -> {Atom.to_string(key), value}
{key, value} -> {key, value}
end)
end
defp normalize_filter_map(filter), do: filter
defp default_http_port?(:http, 80), do: true
defp default_http_port?(:https, 443), do: true
defp default_http_port?(_scheme, _port), do: false
defp ip_in_cidr?(ip, cidr) do
with {network, prefix_len} <- parse_cidr(cidr),
{:ok, ip_size, ip_value} <- ip_to_int(ip),
{:ok, network_size, network_value} <- ip_to_int(network),
true <- ip_size == network_size,
true <- prefix_len >= 0,
true <- prefix_len <= ip_size do
mask = network_mask(ip_size, prefix_len)
(ip_value &&& mask) == (network_value &&& mask)
else
_other -> false
end
end
defp parse_cidr(cidr) when is_binary(cidr) do
case String.split(cidr, "/", parts: 2) do
[address, prefix_str] ->
with {prefix_len, ""} <- Integer.parse(prefix_str),
{:ok, ip} <- :inet.parse_address(String.to_charlist(address)) do
{ip, prefix_len}
else
_other -> :error
end
[address] ->
case :inet.parse_address(String.to_charlist(address)) do
{:ok, {_, _, _, _} = ip} -> {ip, 32}
{:ok, {_, _, _, _, _, _, _, _} = ip} -> {ip, 128}
_other -> :error
end
_other ->
:error
end
end
defp parse_cidr(_cidr), do: :error
defp ip_to_int({a, b, c, d}) do
{:ok, 32, (a <<< 24) + (b <<< 16) + (c <<< 8) + d}
end
defp ip_to_int({a, b, c, d, e, f, g, h}) do
{:ok, 128,
(a <<< 112) + (b <<< 96) + (c <<< 80) + (d <<< 64) + (e <<< 48) + (f <<< 32) + (g <<< 16) +
h}
end
defp ip_to_int(_ip), do: :error
defp network_mask(_size, 0), do: 0
defp network_mask(size, prefix_len) do
all_ones = (1 <<< size) - 1
all_ones <<< (size - prefix_len)
end
end

View File

@@ -0,0 +1,14 @@
defmodule Parrhesia.Web.ListenerPlug do
@moduledoc false
alias Parrhesia.Web.Listener
alias Parrhesia.Web.Router
def init(opts), do: opts
def call(conn, opts) do
conn
|> Listener.put_conn(opts)
|> Router.call([])
end
end

View File

@@ -8,13 +8,15 @@ defmodule Parrhesia.Web.Management do
alias Parrhesia.API.Admin
alias Parrhesia.API.Auth
@spec handle(Plug.Conn.t()) :: Plug.Conn.t()
def handle(conn) do
@spec handle(Plug.Conn.t(), keyword()) :: Plug.Conn.t()
def handle(conn, opts \\ []) do
full_url = full_request_url(conn)
method = conn.method
authorization = get_req_header(conn, "authorization") |> List.first()
auth_required? = admin_auth_required?(opts)
with {:ok, auth_context} <- Auth.validate_nip98(authorization, method, full_url),
with {:ok, auth_context} <-
maybe_validate_nip98(auth_required?, authorization, method, full_url),
{:ok, payload} <- parse_payload(conn.body_params),
{:ok, result} <- execute_method(payload),
:ok <- append_audit_log(auth_context, payload, result) do
@@ -46,6 +48,14 @@ defmodule Parrhesia.Web.Management do
end
end
defp maybe_validate_nip98(true, authorization, method, url) do
Auth.validate_nip98(authorization, method, url)
end
defp maybe_validate_nip98(false, _authorization, _method, _url) do
{:ok, %{pubkey: nil}}
end
defp parse_payload(%{"method" => method} = payload) when is_binary(method) do
params = Map.get(payload, "params", %{})
@@ -99,4 +109,13 @@ defmodule Parrhesia.Web.Management do
"#{scheme}://#{host}#{port_suffix}#{conn.request_path}#{query_suffix}"
end
defp admin_auth_required?(opts) do
opts
|> Keyword.get(:listener)
|> case do
%{auth: %{nip98_required_for_admin: value}} -> value
_other -> true
end
end
end

View File

@@ -4,18 +4,13 @@ defmodule Parrhesia.Web.Metrics do
import Plug.Conn
alias Parrhesia.Telemetry
alias Parrhesia.Web.MetricsAccess
@spec enabled_on_main_endpoint?() :: boolean()
def enabled_on_main_endpoint? do
:parrhesia
|> Application.get_env(:metrics, [])
|> Keyword.get(:enabled_on_main_endpoint, true)
end
alias Parrhesia.Web.Listener
@spec handle(Plug.Conn.t()) :: Plug.Conn.t()
def handle(conn) do
if MetricsAccess.allowed?(conn) do
listener = Listener.from_conn(conn)
if Listener.metrics_allowed?(listener, conn) do
body = TelemetryMetricsPrometheus.Core.scrape(Telemetry.prometheus_reporter())
conn

View File

@@ -4,9 +4,10 @@ defmodule Parrhesia.Web.RelayInfo do
"""
alias Parrhesia.API.Identity
alias Parrhesia.Web.Listener
@spec document() :: map()
def document do
@spec document(Listener.t()) :: map()
def document(listener) do
%{
"name" => "Parrhesia",
"description" => "Nostr/Marmot relay",
@@ -14,7 +15,7 @@ defmodule Parrhesia.Web.RelayInfo do
"supported_nips" => supported_nips(),
"software" => "https://git.teralink.net/self/parrhesia",
"version" => Application.spec(:parrhesia, :vsn) |> to_string(),
"limitation" => limitations()
"limitation" => limitations(listener)
}
end
@@ -31,13 +32,13 @@ defmodule Parrhesia.Web.RelayInfo do
with_negentropy ++ [86, 98]
end
defp limitations do
defp limitations(listener) do
%{
"max_message_length" => Parrhesia.Config.get([:limits, :max_frame_bytes], 1_048_576),
"max_subscriptions" =>
Parrhesia.Config.get([:limits, :max_subscriptions_per_connection], 32),
"max_filters" => Parrhesia.Config.get([:limits, :max_filters_per_req], 16),
"auth_required" => Parrhesia.Config.get([:policies, :auth_required_for_reads], false)
"auth_required" => Listener.relay_auth_required?(listener)
}
end

View File

@@ -3,12 +3,14 @@ defmodule Parrhesia.Web.RemoteIp do
import Bitwise
alias Parrhesia.Web.Listener
@spec init(term()) :: term()
def init(opts), do: opts
@spec call(Plug.Conn.t(), term()) :: Plug.Conn.t()
def call(conn, _opts) do
if trusted_proxy?(conn.remote_ip) do
if trusted_proxy?(conn) do
case forwarded_ip(conn) do
nil -> conn
forwarded_ip -> %{conn | remote_ip: forwarded_ip}
@@ -50,14 +52,22 @@ defmodule Parrhesia.Web.RemoteIp do
defp fallback_real_ip(ip, _conn), do: ip
defp trusted_proxy?(remote_ip) do
Enum.any?(trusted_proxies(), &ip_in_cidr?(remote_ip, &1))
defp trusted_proxy?(conn) do
Enum.any?(trusted_proxies(conn), &ip_in_cidr?(conn.remote_ip, &1))
end
defp trusted_proxies do
:parrhesia
|> Application.get_env(:trusted_proxies, [])
|> Enum.filter(&is_binary/1)
defp trusted_proxies(conn) do
listener = Listener.from_conn(conn)
case Listener.trusted_proxies(listener) do
[] ->
:parrhesia
|> Application.get_env(:trusted_proxies, [])
|> Enum.filter(&is_binary/1)
trusted_proxies ->
trusted_proxies
end
end
defp parse_x_forwarded_for(value) when is_binary(value) do

View File

@@ -4,11 +4,14 @@ defmodule Parrhesia.Web.Router do
use Plug.Router
alias Parrhesia.Policy.ConnectionPolicy
alias Parrhesia.Web.Listener
alias Parrhesia.Web.Management
alias Parrhesia.Web.Metrics
alias Parrhesia.Web.Readiness
alias Parrhesia.Web.RelayInfo
plug(:put_listener)
plug(Plug.Parsers,
parsers: [:json],
pass: ["application/json"],
@@ -32,42 +35,63 @@ defmodule Parrhesia.Web.Router do
end
get "/metrics" do
if Metrics.enabled_on_main_endpoint?() do
Metrics.handle(conn)
listener = Listener.from_conn(conn)
if Listener.feature_enabled?(listener, :metrics) do
case authorize_listener_request(conn, listener) do
:ok -> Metrics.handle(conn)
{:error, :forbidden} -> send_resp(conn, 403, "forbidden")
end
else
send_resp(conn, 404, "not found")
end
end
post "/management" do
case ConnectionPolicy.authorize_remote_ip(conn.remote_ip) do
:ok -> Management.handle(conn)
{:error, :ip_blocked} -> send_resp(conn, 403, "forbidden")
listener = Listener.from_conn(conn)
if Listener.feature_enabled?(listener, :admin) do
case authorize_listener_request(conn, listener) do
:ok -> Management.handle(conn, listener: listener)
{:error, :forbidden} -> send_resp(conn, 403, "forbidden")
end
else
send_resp(conn, 404, "not found")
end
end
get "/relay" do
case ConnectionPolicy.authorize_remote_ip(conn.remote_ip) do
:ok ->
if accepts_nip11?(conn) do
body = JSON.encode!(RelayInfo.document())
listener = Listener.from_conn(conn)
conn
|> put_resp_content_type("application/nostr+json")
|> send_resp(200, body)
else
conn
|> WebSockAdapter.upgrade(
Parrhesia.Web.Connection,
%{relay_url: relay_url(conn), remote_ip: remote_ip(conn)},
timeout: 60_000,
max_frame_size: max_frame_bytes()
)
|> halt()
end
if Listener.feature_enabled?(listener, :nostr) do
case authorize_listener_request(conn, listener) do
:ok ->
if accepts_nip11?(conn) do
body = JSON.encode!(RelayInfo.document(listener))
{:error, :ip_blocked} ->
send_resp(conn, 403, "forbidden")
conn
|> put_resp_content_type("application/nostr+json")
|> send_resp(200, body)
else
conn
|> WebSockAdapter.upgrade(
Parrhesia.Web.Connection,
%{
listener: listener,
relay_url: Listener.relay_url(listener, conn),
remote_ip: remote_ip(conn)
},
timeout: 60_000,
max_frame_size: max_frame_bytes()
)
|> halt()
end
{:error, :forbidden} ->
send_resp(conn, 403, "forbidden")
end
else
send_resp(conn, 404, "not found")
end
end
@@ -75,33 +99,37 @@ defmodule Parrhesia.Web.Router do
send_resp(conn, 404, "not found")
end
defp put_listener(conn, opts) do
case conn.private do
%{parrhesia_listener: _listener} -> conn
_other -> Listener.put_conn(conn, opts)
end
end
defp accepts_nip11?(conn) do
conn
|> get_req_header("accept")
|> Enum.any?(&String.contains?(&1, "application/nostr+json"))
end
defp relay_url(conn) do
ws_scheme = if conn.scheme == :https, do: "wss", else: "ws"
port_segment =
if default_http_port?(conn.scheme, conn.port) do
""
else
":#{conn.port}"
end
"#{ws_scheme}://#{conn.host}#{port_segment}#{conn.request_path}"
end
defp default_http_port?(:http, 80), do: true
defp default_http_port?(:https, 443), do: true
defp default_http_port?(_scheme, _port), do: false
defp max_frame_bytes do
Parrhesia.Config.get([:limits, :max_frame_bytes], 1_048_576)
end
defp authorize_listener_request(conn, listener) do
with :ok <- authorize_remote_ip(conn),
true <- Listener.remote_ip_allowed?(listener, conn.remote_ip) do
:ok
else
{:error, :ip_blocked} -> {:error, :forbidden}
false -> {:error, :forbidden}
end
end
defp authorize_remote_ip(conn) do
ConnectionPolicy.authorize_remote_ip(conn.remote_ip)
end
defp remote_ip(conn) do
case conn.remote_ip do
{_, _, _, _} = remote_ip -> :inet.ntoa(remote_ip) |> to_string()