From 953ccb60f4fd9a87e98a6f4dc6dd1001659cd159 Mon Sep 17 00:00:00 2001 From: Steffen Beyer Date: Fri, 13 Mar 2026 19:00:41 +0100 Subject: [PATCH] phase1: add websocket edge and nostr message codec --- PROGRESS.md | 4 +- config/test.exs | 4 + lib/parrhesia/protocol.ex | 113 +++++++++++++++++++++++++ lib/parrhesia/web/connection.ex | 80 +++++++++++++++++ lib/parrhesia/web/endpoint.ex | 17 +++- lib/parrhesia/web/router.ex | 22 +++++ mix.exs | 1 + mix.lock | 1 + test/parrhesia/application_test.exs | 5 ++ test/parrhesia/protocol_test.exs | 48 +++++++++++ test/parrhesia/web/connection_test.exs | 69 +++++++++++++++ 11 files changed, 360 insertions(+), 4 deletions(-) create mode 100644 lib/parrhesia/protocol.ex create mode 100644 lib/parrhesia/web/connection.ex create mode 100644 lib/parrhesia/web/router.ex create mode 100644 test/parrhesia/protocol_test.exs create mode 100644 test/parrhesia/web/connection_test.exs diff --git a/PROGRESS.md b/PROGRESS.md index 8639717..1f67189 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -11,8 +11,8 @@ Implementation checklist for Parrhesia relay. ## Phase 1 — protocol core (NIP-01) -- [ ] Implement websocket endpoint + per-connection process -- [ ] Implement message decode/encode for `EVENT`, `REQ`, `CLOSE` +- [x] Implement websocket endpoint + per-connection process +- [x] Implement message decode/encode for `EVENT`, `REQ`, `CLOSE` - [ ] Implement strict event validation (`id`, `sig`, shape, timestamps) - [ ] Implement filter evaluation engine (AND/OR semantics) - [ ] Implement subscription lifecycle + `EOSE` behavior diff --git a/config/test.exs b/config/test.exs index 63787d4..f533db2 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,3 +1,7 @@ import Config config :logger, level: :warning + +config :parrhesia, Parrhesia.Web.Endpoint, + port: 0, + ip: {127, 0, 0, 1} diff --git a/lib/parrhesia/protocol.ex b/lib/parrhesia/protocol.ex new file mode 100644 index 0000000..8c6db80 --- /dev/null +++ b/lib/parrhesia/protocol.ex @@ -0,0 +1,113 @@ +defmodule Parrhesia.Protocol do + @moduledoc """ + Nostr protocol message decode/encode helpers. + """ + + @type event :: map() + @type filter :: map() + + @type client_message :: + {:event, event()} + | {:req, String.t(), [filter()]} + | {:close, String.t()} + + @type relay_message :: + {:notice, String.t()} + | {:ok, String.t(), boolean(), String.t()} + | {:closed, String.t(), String.t()} + | {:eose, String.t()} + | {:event, String.t(), event()} + + @type decode_error :: + :invalid_json + | :invalid_message + | :invalid_event + | :invalid_subscription_id + | :invalid_filters + + @spec decode_client(binary()) :: {:ok, client_message()} | {:error, decode_error()} + def decode_client(payload) when is_binary(payload) do + with {:ok, decoded} <- decode_json(payload) do + decode_message(decoded) + end + end + + @spec encode_relay(relay_message()) :: binary() + def encode_relay(message) do + message + |> relay_frame() + |> Jason.encode!() + end + + @spec decode_error_notice(decode_error()) :: String.t() + def decode_error_notice(reason) do + case reason do + :invalid_json -> "error:invalid: malformed JSON" + :invalid_message -> "error:invalid: unsupported message shape" + :invalid_event -> "error:invalid: invalid EVENT shape" + :invalid_subscription_id -> "error:invalid: invalid subscription id" + :invalid_filters -> "error:invalid: invalid filters" + end + end + + defp decode_json(payload) do + case Jason.decode(payload) do + {:ok, decoded} -> {:ok, decoded} + {:error, _reason} -> {:error, :invalid_json} + end + end + + defp decode_message(["EVENT", event]) do + case valid_event?(event) do + true -> {:ok, {:event, event}} + false -> {:error, :invalid_event} + end + end + + defp decode_message(["REQ", subscription_id | filters]) when is_binary(subscription_id) do + cond do + filters == [] -> + {:error, :invalid_filters} + + Enum.all?(filters, &is_map/1) -> + {:ok, {:req, subscription_id, filters}} + + true -> + {:error, :invalid_filters} + end + end + + defp decode_message(["REQ", _subscription_id | _filters]), + do: {:error, :invalid_subscription_id} + + defp decode_message(["CLOSE", subscription_id]) when is_binary(subscription_id) do + {:ok, {:close, subscription_id}} + end + + defp decode_message(["CLOSE", _subscription_id]), do: {:error, :invalid_subscription_id} + defp decode_message(_other), do: {:error, :invalid_message} + + defp valid_event?(%{ + "id" => id, + "pubkey" => pubkey, + "created_at" => created_at, + "kind" => kind, + "tags" => tags, + "content" => content, + "sig" => sig + }) do + is_binary(id) and is_binary(pubkey) and is_integer(created_at) and is_integer(kind) and + is_list(tags) and Enum.all?(tags, &valid_tag?/1) and is_binary(content) and is_binary(sig) + end + + defp valid_event?(_other), do: false + + defp valid_tag?(tag) when is_list(tag), do: Enum.all?(tag, &is_binary/1) + defp valid_tag?(_other), do: false + + defp relay_frame({:notice, message}), do: ["NOTICE", message] + defp relay_frame({:ok, event_id, accepted, message}), do: ["OK", event_id, accepted, message] + defp relay_frame({:closed, subscription_id, message}), do: ["CLOSED", subscription_id, message] + defp relay_frame({:eose, subscription_id}), do: ["EOSE", subscription_id] + defp relay_frame({:event, subscription_id, event}), do: ["EVENT", subscription_id, event] +end diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex new file mode 100644 index 0000000..cfd0fd5 --- /dev/null +++ b/lib/parrhesia/web/connection.ex @@ -0,0 +1,80 @@ +defmodule Parrhesia.Web.Connection do + @moduledoc """ + Per-connection websocket process state and message handling. + """ + + @behaviour WebSock + + alias Parrhesia.Protocol + + defstruct subscriptions: MapSet.new(), authenticated_pubkeys: MapSet.new() + + @type t :: %__MODULE__{ + subscriptions: MapSet.t(String.t()), + authenticated_pubkeys: MapSet.t(String.t()) + } + + @impl true + def init(_opts) do + {:ok, %__MODULE__{}} + end + + @impl true + def handle_in({payload, [opcode: :text]}, %__MODULE__{} = state) do + case Protocol.decode_client(payload) do + {:ok, {:event, event}} -> + event_id = Map.get(event, "id", "") + + response = + Protocol.encode_relay({ + :ok, + event_id, + false, + "error:unsupported: EVENT ingest not implemented" + }) + + {:push, {:text, response}, state} + + {:ok, {:req, subscription_id, _filters}} -> + next_state = put_subscription(state, subscription_id) + response = Protocol.encode_relay({:eose, subscription_id}) + + {:push, {:text, response}, next_state} + + {:ok, {:close, subscription_id}} -> + next_state = drop_subscription(state, subscription_id) + + response = + Protocol.encode_relay({:closed, subscription_id, "closed: subscription closed"}) + + {:push, {:text, response}, next_state} + + {:error, reason} -> + response = Protocol.encode_relay({:notice, Protocol.decode_error_notice(reason)}) + {:push, {:text, response}, state} + end + end + + @impl true + def handle_in({_payload, [opcode: :binary]}, %__MODULE__{} = state) do + response = + Protocol.encode_relay({:notice, "error:invalid: binary websocket frames are not supported"}) + + {:push, {:text, response}, state} + end + + @impl true + def handle_info(_message, %__MODULE__{} = state) do + {:ok, state} + end + + defp put_subscription(%__MODULE__{} = state, subscription_id) do + subscriptions = MapSet.put(state.subscriptions, subscription_id) + %__MODULE__{state | subscriptions: subscriptions} + end + + defp drop_subscription(%__MODULE__{} = state, subscription_id) do + subscriptions = MapSet.delete(state.subscriptions, subscription_id) + %__MODULE__{state | subscriptions: subscriptions} + end +end diff --git a/lib/parrhesia/web/endpoint.ex b/lib/parrhesia/web/endpoint.ex index 929f97f..5f0ee3b 100644 --- a/lib/parrhesia/web/endpoint.ex +++ b/lib/parrhesia/web/endpoint.ex @@ -10,7 +10,20 @@ defmodule Parrhesia.Web.Endpoint do end @impl true - def init(_init_arg) do - Supervisor.init([], strategy: :one_for_one) + def init(init_arg) do + children = [ + {Bandit, bandit_options(init_arg)} + ] + + Supervisor.init(children, strategy: :one_for_one) + end + + defp bandit_options(overrides) do + configured = Application.get_env(:parrhesia, __MODULE__, []) + + configured + |> Keyword.merge(overrides) + |> Keyword.put_new(:scheme, :http) + |> Keyword.put_new(:plug, Parrhesia.Web.Router) end end diff --git a/lib/parrhesia/web/router.ex b/lib/parrhesia/web/router.ex new file mode 100644 index 0000000..2bf8218 --- /dev/null +++ b/lib/parrhesia/web/router.ex @@ -0,0 +1,22 @@ +defmodule Parrhesia.Web.Router do + @moduledoc false + + use Plug.Router + + plug(:match) + plug(:dispatch) + + get "/health" do + send_resp(conn, 200, "ok") + end + + get "/relay" do + conn + |> WebSockAdapter.upgrade(Parrhesia.Web.Connection, %{}, timeout: 60_000) + |> halt() + end + + match _ do + send_resp(conn, 404, "not found") + end +end diff --git a/mix.exs b/mix.exs index ce1ac59..b823a42 100644 --- a/mix.exs +++ b/mix.exs @@ -30,6 +30,7 @@ defmodule Parrhesia.MixProject do # Runtime: web + protocol edge {:bandit, "~> 1.5"}, {:plug, "~> 1.15"}, + {:websock_adapter, "~> 0.5"}, # Runtime: storage adapter (Postgres first) {:ecto_sql, "~> 3.12"}, diff --git a/mix.lock b/mix.lock index 3f5d323..7036cd0 100644 --- a/mix.lock +++ b/mix.lock @@ -42,5 +42,6 @@ "text_diff": {:hex, :text_diff, "0.1.0", "1caf3175e11a53a9a139bc9339bd607c47b9e376b073d4571c031913317fecaa", [:mix], [], "hexpm", "d1ffaaecab338e49357b6daa82e435f877e0649041ace7755583a0ea3362dbd7"}, "thousand_island": {:hex, :thousand_island, "1.4.3", "2158209580f633be38d43ec4e3ce0a01079592b9657afff9080d5d8ca149a3af", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6e4ce09b0fd761a58594d02814d40f77daff460c48a7354a15ab353bb998ea0b"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, + "websock_adapter": {:hex, :websock_adapter, "0.5.9", "43dc3ba6d89ef5dec5b1d0a39698436a1e856d000d84bf31a3149862b01a287f", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "5534d5c9adad3c18a0f58a9371220d75a803bf0b9a3d87e6fe072faaeed76a08"}, "websockex": {:hex, :websockex, "0.5.1", "9de28d37bbe34f371eb46e29b79c94c94fff79f93c960d842fbf447253558eb4", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "8ef39576ed56bc3804c9cd8626f8b5d6b5721848d2726c0ccd4f05385a3c9f14"}, } diff --git a/test/parrhesia/application_test.exs b/test/parrhesia/application_test.exs index 64d4274..f089c39 100644 --- a/test/parrhesia/application_test.exs +++ b/test/parrhesia/application_test.exs @@ -11,5 +11,10 @@ defmodule Parrhesia.ApplicationTest do assert is_pid(Process.whereis(Parrhesia.Policy.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Web.Endpoint)) assert is_pid(Process.whereis(Parrhesia.Tasks.Supervisor)) + + assert Enum.any?(Supervisor.which_children(Parrhesia.Web.Endpoint), fn {_id, pid, _type, + modules} -> + is_pid(pid) and modules == [Bandit] + end) end end diff --git a/test/parrhesia/protocol_test.exs b/test/parrhesia/protocol_test.exs new file mode 100644 index 0000000..2481b95 --- /dev/null +++ b/test/parrhesia/protocol_test.exs @@ -0,0 +1,48 @@ +defmodule Parrhesia.ProtocolTest do + use ExUnit.Case, async: true + + alias Parrhesia.Protocol + + test "decodes valid EVENT frame" do + payload = + Jason.encode!([ + "EVENT", + %{ + "id" => String.duplicate("0", 64), + "pubkey" => String.duplicate("1", 64), + "created_at" => 1_715_000_000, + "kind" => 1, + "tags" => [["p", String.duplicate("2", 64)]], + "content" => "hello", + "sig" => String.duplicate("3", 128) + } + ]) + + assert {:ok, {:event, event}} = Protocol.decode_client(payload) + assert event["kind"] == 1 + assert event["content"] == "hello" + end + + test "decodes valid REQ and CLOSE frames" do + req_payload = Jason.encode!(["REQ", "sub-1", %{"authors" => [String.duplicate("a", 64)]}]) + close_payload = Jason.encode!(["CLOSE", "sub-1"]) + + assert {:ok, {:req, "sub-1", [%{"authors" => [_author]}]}} = + Protocol.decode_client(req_payload) + + assert {:ok, {:close, "sub-1"}} = Protocol.decode_client(close_payload) + end + + test "returns decode errors for malformed messages" do + assert {:error, :invalid_json} = Protocol.decode_client("not-json") + assert {:error, :invalid_filters} = Protocol.decode_client(Jason.encode!(["REQ", "sub-1"])) + + assert {:error, :invalid_event} = + Protocol.decode_client(Jason.encode!(["EVENT", %{"id" => "nope"}])) + end + + test "encodes relay messages" do + frame = Protocol.encode_relay({:closed, "sub-1", "closed: subscription closed"}) + assert Jason.decode!(frame) == ["CLOSED", "sub-1", "closed: subscription closed"] + end +end diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs new file mode 100644 index 0000000..39172e0 --- /dev/null +++ b/test/parrhesia/web/connection_test.exs @@ -0,0 +1,69 @@ +defmodule Parrhesia.Web.ConnectionTest do + use ExUnit.Case, async: true + + alias Parrhesia.Web.Connection + + test "REQ registers subscription and replies with EOSE" do + {:ok, state} = Connection.init(%{}) + + req_payload = Jason.encode!(["REQ", "sub-123", %{"kinds" => [1]}]) + + assert {:push, {:text, response}, next_state} = + Connection.handle_in({req_payload, [opcode: :text]}, state) + + assert MapSet.member?(next_state.subscriptions, "sub-123") + assert Jason.decode!(response) == ["EOSE", "sub-123"] + end + + test "CLOSE removes subscription and replies with CLOSED" do + {:ok, state} = Connection.init(%{}) + + req_payload = Jason.encode!(["REQ", "sub-123", %{"kinds" => [1]}]) + {:push, _, subscribed_state} = Connection.handle_in({req_payload, [opcode: :text]}, state) + + close_payload = Jason.encode!(["CLOSE", "sub-123"]) + + assert {:push, {:text, response}, next_state} = + Connection.handle_in({close_payload, [opcode: :text]}, subscribed_state) + + refute MapSet.member?(next_state.subscriptions, "sub-123") + assert Jason.decode!(response) == ["CLOSED", "sub-123", "closed: subscription closed"] + end + + test "invalid input returns NOTICE" do + {:ok, state} = Connection.init(%{}) + + assert {:push, {:text, response}, ^state} = + Connection.handle_in({"not-json", [opcode: :text]}, state) + + assert Jason.decode!(response) == ["NOTICE", "error:invalid: malformed JSON"] + end + + test "EVENT currently replies with unsupported OK" do + {:ok, state} = Connection.init(%{}) + + payload = + Jason.encode!([ + "EVENT", + %{ + "id" => String.duplicate("0", 64), + "pubkey" => String.duplicate("1", 64), + "created_at" => 1_715_000_000, + "kind" => 1, + "tags" => [], + "content" => "hello", + "sig" => String.duplicate("3", 128) + } + ]) + + assert {:push, {:text, response}, ^state} = + Connection.handle_in({payload, [opcode: :text]}, state) + + assert Jason.decode!(response) == [ + "OK", + String.duplicate("0", 64), + false, + "error:unsupported: EVENT ingest not implemented" + ] + end +end