phase1: add websocket edge and nostr message codec

This commit is contained in:
2026-03-13 19:00:41 +01:00
parent 5e478cd305
commit 953ccb60f4
11 changed files with 360 additions and 4 deletions

View File

@@ -11,8 +11,8 @@ Implementation checklist for Parrhesia relay.
## Phase 1 — protocol core (NIP-01) ## Phase 1 — protocol core (NIP-01)
- [ ] Implement websocket endpoint + per-connection process - [x] Implement websocket endpoint + per-connection process
- [ ] Implement message decode/encode for `EVENT`, `REQ`, `CLOSE` - [x] Implement message decode/encode for `EVENT`, `REQ`, `CLOSE`
- [ ] Implement strict event validation (`id`, `sig`, shape, timestamps) - [ ] Implement strict event validation (`id`, `sig`, shape, timestamps)
- [ ] Implement filter evaluation engine (AND/OR semantics) - [ ] Implement filter evaluation engine (AND/OR semantics)
- [ ] Implement subscription lifecycle + `EOSE` behavior - [ ] Implement subscription lifecycle + `EOSE` behavior

View File

@@ -1,3 +1,7 @@
import Config import Config
config :logger, level: :warning config :logger, level: :warning
config :parrhesia, Parrhesia.Web.Endpoint,
port: 0,
ip: {127, 0, 0, 1}

113
lib/parrhesia/protocol.ex Normal file
View File

@@ -0,0 +1,113 @@
defmodule Parrhesia.Protocol do
@moduledoc """
Nostr protocol message decode/encode helpers.
"""
@type event :: map()
@type filter :: map()
@type client_message ::
{:event, event()}
| {:req, String.t(), [filter()]}
| {:close, String.t()}
@type relay_message ::
{:notice, String.t()}
| {:ok, String.t(), boolean(), String.t()}
| {:closed, String.t(), String.t()}
| {:eose, String.t()}
| {:event, String.t(), event()}
@type decode_error ::
:invalid_json
| :invalid_message
| :invalid_event
| :invalid_subscription_id
| :invalid_filters
@spec decode_client(binary()) :: {:ok, client_message()} | {:error, decode_error()}
def decode_client(payload) when is_binary(payload) do
with {:ok, decoded} <- decode_json(payload) do
decode_message(decoded)
end
end
@spec encode_relay(relay_message()) :: binary()
def encode_relay(message) do
message
|> relay_frame()
|> Jason.encode!()
end
@spec decode_error_notice(decode_error()) :: String.t()
def decode_error_notice(reason) do
case reason do
:invalid_json -> "error:invalid: malformed JSON"
:invalid_message -> "error:invalid: unsupported message shape"
:invalid_event -> "error:invalid: invalid EVENT shape"
:invalid_subscription_id -> "error:invalid: invalid subscription id"
:invalid_filters -> "error:invalid: invalid filters"
end
end
defp decode_json(payload) do
case Jason.decode(payload) do
{:ok, decoded} -> {:ok, decoded}
{:error, _reason} -> {:error, :invalid_json}
end
end
defp decode_message(["EVENT", event]) do
case valid_event?(event) do
true -> {:ok, {:event, event}}
false -> {:error, :invalid_event}
end
end
defp decode_message(["REQ", subscription_id | filters]) when is_binary(subscription_id) do
cond do
filters == [] ->
{:error, :invalid_filters}
Enum.all?(filters, &is_map/1) ->
{:ok, {:req, subscription_id, filters}}
true ->
{:error, :invalid_filters}
end
end
defp decode_message(["REQ", _subscription_id | _filters]),
do: {:error, :invalid_subscription_id}
defp decode_message(["CLOSE", subscription_id]) when is_binary(subscription_id) do
{:ok, {:close, subscription_id}}
end
defp decode_message(["CLOSE", _subscription_id]), do: {:error, :invalid_subscription_id}
defp decode_message(_other), do: {:error, :invalid_message}
defp valid_event?(%{
"id" => id,
"pubkey" => pubkey,
"created_at" => created_at,
"kind" => kind,
"tags" => tags,
"content" => content,
"sig" => sig
}) do
is_binary(id) and is_binary(pubkey) and is_integer(created_at) and is_integer(kind) and
is_list(tags) and Enum.all?(tags, &valid_tag?/1) and is_binary(content) and is_binary(sig)
end
defp valid_event?(_other), do: false
defp valid_tag?(tag) when is_list(tag), do: Enum.all?(tag, &is_binary/1)
defp valid_tag?(_other), do: false
defp relay_frame({:notice, message}), do: ["NOTICE", message]
defp relay_frame({:ok, event_id, accepted, message}), do: ["OK", event_id, accepted, message]
defp relay_frame({:closed, subscription_id, message}), do: ["CLOSED", subscription_id, message]
defp relay_frame({:eose, subscription_id}), do: ["EOSE", subscription_id]
defp relay_frame({:event, subscription_id, event}), do: ["EVENT", subscription_id, event]
end

View File

@@ -0,0 +1,80 @@
defmodule Parrhesia.Web.Connection do
@moduledoc """
Per-connection websocket process state and message handling.
"""
@behaviour WebSock
alias Parrhesia.Protocol
defstruct subscriptions: MapSet.new(), authenticated_pubkeys: MapSet.new()
@type t :: %__MODULE__{
subscriptions: MapSet.t(String.t()),
authenticated_pubkeys: MapSet.t(String.t())
}
@impl true
def init(_opts) do
{:ok, %__MODULE__{}}
end
@impl true
def handle_in({payload, [opcode: :text]}, %__MODULE__{} = state) do
case Protocol.decode_client(payload) do
{:ok, {:event, event}} ->
event_id = Map.get(event, "id", "")
response =
Protocol.encode_relay({
:ok,
event_id,
false,
"error:unsupported: EVENT ingest not implemented"
})
{:push, {:text, response}, state}
{:ok, {:req, subscription_id, _filters}} ->
next_state = put_subscription(state, subscription_id)
response = Protocol.encode_relay({:eose, subscription_id})
{:push, {:text, response}, next_state}
{:ok, {:close, subscription_id}} ->
next_state = drop_subscription(state, subscription_id)
response =
Protocol.encode_relay({:closed, subscription_id, "closed: subscription closed"})
{:push, {:text, response}, next_state}
{:error, reason} ->
response = Protocol.encode_relay({:notice, Protocol.decode_error_notice(reason)})
{:push, {:text, response}, state}
end
end
@impl true
def handle_in({_payload, [opcode: :binary]}, %__MODULE__{} = state) do
response =
Protocol.encode_relay({:notice, "error:invalid: binary websocket frames are not supported"})
{:push, {:text, response}, state}
end
@impl true
def handle_info(_message, %__MODULE__{} = state) do
{:ok, state}
end
defp put_subscription(%__MODULE__{} = state, subscription_id) do
subscriptions = MapSet.put(state.subscriptions, subscription_id)
%__MODULE__{state | subscriptions: subscriptions}
end
defp drop_subscription(%__MODULE__{} = state, subscription_id) do
subscriptions = MapSet.delete(state.subscriptions, subscription_id)
%__MODULE__{state | subscriptions: subscriptions}
end
end

View File

@@ -10,7 +10,20 @@ defmodule Parrhesia.Web.Endpoint do
end end
@impl true @impl true
def init(_init_arg) do def init(init_arg) do
Supervisor.init([], strategy: :one_for_one) children = [
{Bandit, bandit_options(init_arg)}
]
Supervisor.init(children, strategy: :one_for_one)
end
defp bandit_options(overrides) do
configured = Application.get_env(:parrhesia, __MODULE__, [])
configured
|> Keyword.merge(overrides)
|> Keyword.put_new(:scheme, :http)
|> Keyword.put_new(:plug, Parrhesia.Web.Router)
end end
end end

View File

@@ -0,0 +1,22 @@
defmodule Parrhesia.Web.Router do
@moduledoc false
use Plug.Router
plug(:match)
plug(:dispatch)
get "/health" do
send_resp(conn, 200, "ok")
end
get "/relay" do
conn
|> WebSockAdapter.upgrade(Parrhesia.Web.Connection, %{}, timeout: 60_000)
|> halt()
end
match _ do
send_resp(conn, 404, "not found")
end
end

View File

@@ -30,6 +30,7 @@ defmodule Parrhesia.MixProject do
# Runtime: web + protocol edge # Runtime: web + protocol edge
{:bandit, "~> 1.5"}, {:bandit, "~> 1.5"},
{:plug, "~> 1.15"}, {:plug, "~> 1.15"},
{:websock_adapter, "~> 0.5"},
# Runtime: storage adapter (Postgres first) # Runtime: storage adapter (Postgres first)
{:ecto_sql, "~> 3.12"}, {:ecto_sql, "~> 3.12"},

View File

@@ -42,5 +42,6 @@
"text_diff": {:hex, :text_diff, "0.1.0", "1caf3175e11a53a9a139bc9339bd607c47b9e376b073d4571c031913317fecaa", [:mix], [], "hexpm", "d1ffaaecab338e49357b6daa82e435f877e0649041ace7755583a0ea3362dbd7"}, "text_diff": {:hex, :text_diff, "0.1.0", "1caf3175e11a53a9a139bc9339bd607c47b9e376b073d4571c031913317fecaa", [:mix], [], "hexpm", "d1ffaaecab338e49357b6daa82e435f877e0649041ace7755583a0ea3362dbd7"},
"thousand_island": {:hex, :thousand_island, "1.4.3", "2158209580f633be38d43ec4e3ce0a01079592b9657afff9080d5d8ca149a3af", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6e4ce09b0fd761a58594d02814d40f77daff460c48a7354a15ab353bb998ea0b"}, "thousand_island": {:hex, :thousand_island, "1.4.3", "2158209580f633be38d43ec4e3ce0a01079592b9657afff9080d5d8ca149a3af", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6e4ce09b0fd761a58594d02814d40f77daff460c48a7354a15ab353bb998ea0b"},
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
"websock_adapter": {:hex, :websock_adapter, "0.5.9", "43dc3ba6d89ef5dec5b1d0a39698436a1e856d000d84bf31a3149862b01a287f", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "5534d5c9adad3c18a0f58a9371220d75a803bf0b9a3d87e6fe072faaeed76a08"},
"websockex": {:hex, :websockex, "0.5.1", "9de28d37bbe34f371eb46e29b79c94c94fff79f93c960d842fbf447253558eb4", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "8ef39576ed56bc3804c9cd8626f8b5d6b5721848d2726c0ccd4f05385a3c9f14"}, "websockex": {:hex, :websockex, "0.5.1", "9de28d37bbe34f371eb46e29b79c94c94fff79f93c960d842fbf447253558eb4", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "8ef39576ed56bc3804c9cd8626f8b5d6b5721848d2726c0ccd4f05385a3c9f14"},
} }

View File

@@ -11,5 +11,10 @@ defmodule Parrhesia.ApplicationTest do
assert is_pid(Process.whereis(Parrhesia.Policy.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Policy.Supervisor))
assert is_pid(Process.whereis(Parrhesia.Web.Endpoint)) assert is_pid(Process.whereis(Parrhesia.Web.Endpoint))
assert is_pid(Process.whereis(Parrhesia.Tasks.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Tasks.Supervisor))
assert Enum.any?(Supervisor.which_children(Parrhesia.Web.Endpoint), fn {_id, pid, _type,
modules} ->
is_pid(pid) and modules == [Bandit]
end)
end end
end end

View File

@@ -0,0 +1,48 @@
defmodule Parrhesia.ProtocolTest do
use ExUnit.Case, async: true
alias Parrhesia.Protocol
test "decodes valid EVENT frame" do
payload =
Jason.encode!([
"EVENT",
%{
"id" => String.duplicate("0", 64),
"pubkey" => String.duplicate("1", 64),
"created_at" => 1_715_000_000,
"kind" => 1,
"tags" => [["p", String.duplicate("2", 64)]],
"content" => "hello",
"sig" => String.duplicate("3", 128)
}
])
assert {:ok, {:event, event}} = Protocol.decode_client(payload)
assert event["kind"] == 1
assert event["content"] == "hello"
end
test "decodes valid REQ and CLOSE frames" do
req_payload = Jason.encode!(["REQ", "sub-1", %{"authors" => [String.duplicate("a", 64)]}])
close_payload = Jason.encode!(["CLOSE", "sub-1"])
assert {:ok, {:req, "sub-1", [%{"authors" => [_author]}]}} =
Protocol.decode_client(req_payload)
assert {:ok, {:close, "sub-1"}} = Protocol.decode_client(close_payload)
end
test "returns decode errors for malformed messages" do
assert {:error, :invalid_json} = Protocol.decode_client("not-json")
assert {:error, :invalid_filters} = Protocol.decode_client(Jason.encode!(["REQ", "sub-1"]))
assert {:error, :invalid_event} =
Protocol.decode_client(Jason.encode!(["EVENT", %{"id" => "nope"}]))
end
test "encodes relay messages" do
frame = Protocol.encode_relay({:closed, "sub-1", "closed: subscription closed"})
assert Jason.decode!(frame) == ["CLOSED", "sub-1", "closed: subscription closed"]
end
end

View File

@@ -0,0 +1,69 @@
defmodule Parrhesia.Web.ConnectionTest do
use ExUnit.Case, async: true
alias Parrhesia.Web.Connection
test "REQ registers subscription and replies with EOSE" do
{:ok, state} = Connection.init(%{})
req_payload = Jason.encode!(["REQ", "sub-123", %{"kinds" => [1]}])
assert {:push, {:text, response}, next_state} =
Connection.handle_in({req_payload, [opcode: :text]}, state)
assert MapSet.member?(next_state.subscriptions, "sub-123")
assert Jason.decode!(response) == ["EOSE", "sub-123"]
end
test "CLOSE removes subscription and replies with CLOSED" do
{:ok, state} = Connection.init(%{})
req_payload = Jason.encode!(["REQ", "sub-123", %{"kinds" => [1]}])
{:push, _, subscribed_state} = Connection.handle_in({req_payload, [opcode: :text]}, state)
close_payload = Jason.encode!(["CLOSE", "sub-123"])
assert {:push, {:text, response}, next_state} =
Connection.handle_in({close_payload, [opcode: :text]}, subscribed_state)
refute MapSet.member?(next_state.subscriptions, "sub-123")
assert Jason.decode!(response) == ["CLOSED", "sub-123", "closed: subscription closed"]
end
test "invalid input returns NOTICE" do
{:ok, state} = Connection.init(%{})
assert {:push, {:text, response}, ^state} =
Connection.handle_in({"not-json", [opcode: :text]}, state)
assert Jason.decode!(response) == ["NOTICE", "error:invalid: malformed JSON"]
end
test "EVENT currently replies with unsupported OK" do
{:ok, state} = Connection.init(%{})
payload =
Jason.encode!([
"EVENT",
%{
"id" => String.duplicate("0", 64),
"pubkey" => String.duplicate("1", 64),
"created_at" => 1_715_000_000,
"kind" => 1,
"tags" => [],
"content" => "hello",
"sig" => String.duplicate("3", 128)
}
])
assert {:push, {:text, response}, ^state} =
Connection.handle_in({payload, [opcode: :text]}, state)
assert Jason.decode!(response) == [
"OK",
String.duplicate("0", 64),
false,
"error:unsupported: EVENT ingest not implemented"
]
end
end