# File: parrhesia/test/parrhesia/web/connection_test.exs
#
# Original listing metadata: 286 lines, 8.9 KiB, Elixir

defmodule Parrhesia.Web.ConnectionTest do
use ExUnit.Case, async: false
alias Ecto.Adapters.SQL.Sandbox
alias Parrhesia.Protocol.EventValidator
alias Parrhesia.Repo
alias Parrhesia.Web.Connection
# Check out a sandboxed Repo connection before every test. The assertive
# match also yields the `:ok` value that the setup callback returns.
setup do
  :ok = Sandbox.checkout(Repo)
end
# A REQ frame must leave a tracked subscription in the connection state and
# end its pushed frames with an EOSE for the same subscription id.
test "REQ registers subscription, streams initial events and replies with EOSE" do
  initial_state = connection_state()
  frame = Jason.encode!(["REQ", "sub-123", %{"kinds" => [1]}])

  assert {:push, pushed, updated_state} =
           Connection.handle_in({frame, [opcode: :text]}, initial_state)

  subscriptions = updated_state.subscriptions
  assert Map.has_key?(subscriptions, "sub-123")

  subscription = subscriptions["sub-123"]
  assert subscription.filters == [%{"kinds" => [1]}]
  assert subscription.eose_sent?

  # Decode every pushed frame so a non-text or non-JSON frame fails the test.
  decoded = Enum.map(pushed, fn {:text, raw} -> Jason.decode!(raw) end)
  assert List.last(decoded) == ["EOSE", "sub-123"]
end
# A COUNT request is answered with a single ["COUNT", id, result] text frame
# and leaves the connection state unchanged.
test "COUNT returns exact count payload" do
  state = connection_state()
  request = Jason.encode!(["COUNT", "sub-count", %{"kinds" => [1]}])

  assert {:push, {:text, response}, ^state} =
           Connection.handle_in({request, [opcode: :text]}, state)

  # Bind "count" through the pattern so a missing key fails the match outright.
  assert ["COUNT", "sub-count", %{"count" => count} = result] = Jason.decode!(response)

  # Guard with is_integer/1: in Elixir's term ordering `nil >= 0` (and any
  # string or atom >= 0) is true, so a bare comparison would pass vacuously.
  assert is_integer(count) and count >= 0
  assert result["approximate"] == false
end
# An AUTH event carrying the connection's current challenge is acknowledged
# with OK/true and its pubkey is recorded as authenticated.
test "AUTH accepts valid challenge event" do
  initial_state = connection_state()
  challenge_event = valid_auth_event(initial_state.auth_challenge)
  frame = Jason.encode!(["AUTH", challenge_event])

  assert {:push, {:text, reply}, authed_state} =
           Connection.handle_in({frame, [opcode: :text]}, initial_state)

  assert Jason.decode!(reply) == ["OK", challenge_event["id"], true, "ok: auth accepted"]
  assert MapSet.member?(authed_state.authenticated_pubkeys, challenge_event["pubkey"])

  # The challenge held in state must differ from the one that was just used.
  assert authed_state.auth_challenge != initial_state.auth_challenge
end
# An AUTH event built for a different challenge string must be refused with
# OK/false, re-announce the current challenge, and leave the state unchanged.
test "AUTH rejects mismatched challenge and returns AUTH frame" do
  state = connection_state()
  frame = Jason.encode!(["AUTH", valid_auth_event("wrong-challenge")])

  assert {:push, pushed, ^state} = Connection.handle_in({frame, [opcode: :text]}, state)

  decoded = Enum.map(pushed, fn {:text, raw} -> Jason.decode!(raw) end)

  assert ["AUTH", state.auth_challenge] in decoded
  assert Enum.any?(decoded, &match?(["OK", _, false, _], &1))
end
# Publishing an event tagged ["-"] on an unauthenticated connection must yield
# an auth-required OK/false frame plus an AUTH frame with the current challenge.
test "protected event is rejected unless authenticated" do
  state = connection_state()

  # Replace the tags first, then recompute the id from the modified event.
  tagged = Map.put(valid_event(), "tags", [["-"]])
  protected_event = Map.put(tagged, "id", EventValidator.compute_id(tagged))

  frame = Jason.encode!(["EVENT", protected_event])

  assert {:push, pushed, ^state} = Connection.handle_in({frame, [opcode: :text]}, state)

  decoded = Enum.map(pushed, fn {:text, raw} -> Jason.decode!(raw) end)

  assert ["OK", _, false, "auth-required: protected events require authenticated pubkey"] =
           Enum.find(decoded, &(List.first(&1) == "OK"))

  assert ["AUTH", state.auth_challenge] in decoded
end
# A REQ whose only filter asks for kind 445 and has no "#h" entry must be
# answered with a CLOSED frame and must not change the connection state.
test "kind 445 REQ without #h is rejected" do
  state = connection_state()
  frame = Jason.encode!(["REQ", "sub-445", %{"kinds" => [445]}])

  assert {:push, pushed, ^state} = Connection.handle_in({frame, [opcode: :text]}, state)

  closed_frame =
    pushed
    |> Enum.map(fn {:text, raw} -> Jason.decode!(raw) end)
    |> Enum.find(&(List.first(&1) == "CLOSED"))

  assert ["CLOSED", "sub-445", "restricted: kind 445 queries must include a #h tag"] =
           closed_frame
end
# A well-formed EVENT is acknowledged with a single OK/true frame and the
# connection state stays exactly as it was.
test "valid EVENT stores event and returns accepted OK" do
  state = connection_state()
  event = valid_event()
  frame = Jason.encode!(["EVENT", event])

  result = Connection.handle_in({frame, [opcode: :text]}, state)

  assert {:push, {:text, reply}, ^state} = result
  assert Jason.decode!(reply) == ["OK", event["id"], true, "ok: event stored"]
end
# An EVENT with a malformed signature is answered with OK/false and an
# "invalid:" reason, without touching the connection state.
test "invalid EVENT replies with OK false invalid prefix" do
  state = connection_state()

  # Only "sig" is overwritten; the id from valid_event/0 is kept as-is.
  bad_event = Map.put(valid_event(), "sig", "nope")
  frame = Jason.encode!(["EVENT", bad_event])

  assert {:push, {:text, reply}, ^state} =
           Connection.handle_in({frame, [opcode: :text]}, state)

  expected = ["OK", bad_event["id"], false, "invalid: sig must be 64-byte lowercase hex"]
  assert Jason.decode!(reply) == expected
end
# A kind 1059 event that carries no "p" tag must be refused with OK/false and
# the matching "invalid:" reason.
test "malformed wrapped welcome EVENT is rejected" do
  state = connection_state()

  # The only tag is an "e" tag, so the event has no recipient "p" tag at all.
  overrides = %{
    "kind" => 1059,
    "tags" => [["e", String.duplicate("a", 64)]],
    "content" => "ciphertext"
  }

  # Apply the overrides first, then recompute the id from the modified event.
  modified = Map.merge(valid_event(), overrides)
  event = Map.put(modified, "id", EventValidator.compute_id(modified))

  frame = Jason.encode!(["EVENT", event])

  assert {:push, {:text, reply}, ^state} =
           Connection.handle_in({frame, [opcode: :text]}, state)

  expected_reason = "invalid: kind 1059 must include at least one recipient p tag"
  assert Jason.decode!(reply) == ["OK", event["id"], false, expected_reason]
end
# NEG-OPEN followed by NEG-CLOSE for the same session id: each is answered
# with one NEG-MSG frame and neither changes the connection state.
test "NEG sessions open and close" do
  state = connection_state()

  # Both messages are sent against the same (unchanged) state.
  push = fn message ->
    Connection.handle_in({Jason.encode!(message), [opcode: :text]}, state)
  end

  assert {:push, {:text, open_reply}, ^state} =
           push.(["NEG-OPEN", "neg-1", %{"cursor" => 0}])

  assert ["NEG-MSG", "neg-1", %{"status" => "open", "cursor" => 0}] =
           Jason.decode!(open_reply)

  assert {:push, {:text, close_reply}, ^state} = push.(["NEG-CLOSE", "neg-1"])

  assert Jason.decode!(close_reply) == ["NEG-MSG", "neg-1", %{"status" => "closed"}]
end
# CLOSE for an existing subscription drops it from the state and is answered
# with a CLOSED frame.
test "CLOSE removes subscription and replies with CLOSED" do
  # subscribed_connection_state/1 has already issued a REQ for "sub-1".
  state = subscribed_connection_state([])
  frame = Jason.encode!(["CLOSE", "sub-1"])

  assert {:push, {:text, reply}, closed_state} =
           Connection.handle_in({frame, [opcode: :text]}, state)

  refute is_map_key(closed_state.subscriptions, "sub-1")
  assert Jason.decode!(reply) == ["CLOSED", "sub-1", "error: subscription closed"]
end
# A fanout event for a subscribed id is queued first and only pushed to the
# client once the drain message is handled.
test "fanout_event enqueues and drains matching events" do
  state = subscribed_connection_state([])
  event = live_event("event-1", 1)

  fanout_result = Connection.handle_info({:fanout_event, "sub-1", event}, state)

  assert {:ok, queued_state} = fanout_result
  assert queued_state.outbound_queue_size == 1

  # Queuing is expected to leave a :drain_outbound_queue message in this
  # process's mailbox; the test then feeds that message to handle_info by hand.
  assert_receive :drain_outbound_queue

  drain_result = Connection.handle_info(:drain_outbound_queue, queued_state)

  assert {:push, [{:text, frame}], drained_state} = drain_result
  assert drained_state.outbound_queue_size == 0
  assert Jason.decode!(frame) == ["EVENT", "sub-1", event]
end
# With a queue limit of one and the :close overflow strategy, a second fanout
# event stops the connection with close code 1008 and a NOTICE frame.
test "outbound queue overflow closes connection when strategy is close" do
  opts = [
    max_outbound_queue: 1,
    outbound_overflow_strategy: :close,
    outbound_drain_batch_size: 1
  ]

  state = subscribed_connection_state(opts)

  first_fanout = {:fanout_event, "sub-1", live_event("event-1", 1)}
  assert {:ok, queued_state} = Connection.handle_info(first_fanout, state)
  assert_receive :drain_outbound_queue

  # The drain message is received but never handled, so the first event is
  # still queued when the second one arrives.
  second_fanout = {:fanout_event, "sub-1", live_event("event-2", 1)}
  overflow_result = Connection.handle_info(second_fanout, queued_state)

  assert {:stop, :normal, {1008, reason}, [{:text, notice}], _final_state} = overflow_result
  assert reason == "rate-limited: outbound queue overflow"
  assert Jason.decode!(notice) == ["NOTICE", reason]
end
# Builds a connection state from `opts` via connection_state/1, sends a REQ
# for "sub-1" with a kind-1 filter, and returns the state after that REQ.
# Fails the calling test if the REQ does not produce a :push result.
defp subscribed_connection_state(opts) do
  subscribe_frame = Jason.encode!(["REQ", "sub-1", %{"kinds" => [1]}])

  result =
    opts
    |> connection_state()
    |> then(&Connection.handle_in({subscribe_frame, [opcode: :text]}, &1))

  assert {:push, _frames, subscribed_state} = result
  subscribed_state
end
# Initialises a connection with `opts` and returns its state. Unless the
# caller supplies :subscription_index, it is passed as nil.
defp connection_state(opts \\ []) do
  init_opts = Keyword.put_new(opts, :subscription_index, nil)
  {:ok, state} = Connection.init(init_opts)
  state
end
# Builds an event map with the given id and kind, the current time in seconds
# as "created_at", and fixed filler values for every other field.
defp live_event(id, kind) do
  filler_pubkey = String.duplicate("a", 64)
  filler_sig = String.duplicate("b", 128)

  Map.new([
    {"id", id},
    {"kind", kind},
    {"pubkey", filler_pubkey},
    {"sig", filler_sig},
    {"created_at", System.system_time(:second)},
    {"tags", []},
    {"content", "live"}
  ])
end
# Builds a kind 22242 event whose only tag is ["challenge", challenge].
# Pubkey and sig are fixed filler hex strings; the id is whatever
# EventValidator.compute_id/1 returns for the event without an id.
defp valid_auth_event(challenge) do
  without_id = %{
    "kind" => 22_242,
    "pubkey" => String.duplicate("9", 64),
    "sig" => String.duplicate("8", 128),
    "created_at" => System.system_time(:second),
    "tags" => [["challenge", challenge]],
    "content" => ""
  }

  id = EventValidator.compute_id(without_id)
  Map.put(without_id, "id", id)
end
# Builds a kind 1 event with content "hello", no tags, and fixed filler hex
# strings for pubkey and sig. The id is whatever
# EventValidator.compute_id/1 returns for the event without an id.
defp valid_event do
  without_id = %{
    "kind" => 1,
    "pubkey" => String.duplicate("1", 64),
    "sig" => String.duplicate("3", 128),
    "created_at" => System.system_time(:second),
    "tags" => [],
    "content" => "hello"
  }

  id = EventValidator.compute_id(without_id)
  Map.put(without_id, "id", id)
end
end