Separate read pool and harden fanout state handling

This commit is contained in:
2026-03-18 17:21:58 +01:00
parent dce473662f
commit c377ed4b62
24 changed files with 626 additions and 258 deletions

View File

@@ -5,6 +5,7 @@ defmodule Parrhesia.API.Stream.Subscription do
alias Parrhesia.Protocol.Filter
alias Parrhesia.Subscriptions.Index
alias Parrhesia.Telemetry
defstruct [
:ref,
@@ -57,6 +58,7 @@ defmodule Parrhesia.API.Stream.Subscription do
buffered_events: []
}
Telemetry.emit_process_mailbox_depth(:subscription)
{:ok, state}
else
{:error, reason} -> {:stop, reason}
@@ -72,20 +74,27 @@ defmodule Parrhesia.API.Stream.Subscription do
end)
{:reply, :ok, %__MODULE__{state | ready?: true, buffered_events: []}}
|> emit_mailbox_depth()
end
@impl true
def handle_info({:fanout_event, subscription_id, event}, %__MODULE__{} = state)
when is_binary(subscription_id) and is_map(event) do
handle_fanout_event(state, subscription_id, event)
state
|> handle_fanout_event(subscription_id, event)
|> emit_mailbox_depth()
end
def handle_info({:DOWN, monitor_ref, :process, subscriber, _reason}, %__MODULE__{} = state)
when monitor_ref == state.subscriber_monitor_ref and subscriber == state.subscriber do
{:stop, :normal, state}
|> emit_mailbox_depth()
end
def handle_info(_message, %__MODULE__{} = state), do: {:noreply, state}
def handle_info(_message, %__MODULE__{} = state) do
{:noreply, state}
|> emit_mailbox_depth()
end
@impl true
def terminate(reason, %__MODULE__{} = state) do
@@ -175,4 +184,9 @@ defmodule Parrhesia.API.Stream.Subscription do
{:noreply, %__MODULE__{state | buffered_events: buffered_events}}
end
end
# Emits the subscription-process mailbox-depth gauge as a side effect and
# passes the callback return value through untouched, so it can terminate a
# handler pipeline.
defp emit_mailbox_depth(result) do
  tap(result, fn _reply -> Telemetry.emit_process_mailbox_depth(:subscription) end)
end
end

View File

@@ -0,0 +1,45 @@
defmodule Parrhesia.PostgresRepos do
  @moduledoc false

  alias Parrhesia.Config
  alias Parrhesia.ReadRepo
  alias Parrhesia.Repo

  # Writes always go through the primary pool.
  @spec write() :: module()
  def write, do: Repo

  # Reads prefer the dedicated read pool, but only when the feature flag is
  # on AND the ReadRepo process is actually registered; otherwise fall back
  # to the primary pool so callers never hit a repo that is not running.
  @spec read() :: module()
  def read do
    if read_pool_available?(), do: ReadRepo, else: Repo
  end

  # Repos the storage supervisor should start, driven by configuration.
  @spec started_repos() :: [module()]
  def started_repos do
    if separate_read_pool_enabled?() do
      [Repo, ReadRepo]
    else
      [Repo]
    end
  end

  # Consults the runtime Config server when it is up; before it has started
  # (e.g. during boot) the application environment default is used instead.
  @spec separate_read_pool_enabled?() :: boolean()
  def separate_read_pool_enabled? do
    if is_pid(Process.whereis(Config)) do
      Config.get([:database, :separate_read_pool?], application_default())
    else
      application_default()
    end
  end

  # `and` short-circuits, so Process.whereis/1 is only consulted when the
  # feature flag is enabled.
  defp read_pool_available? do
    separate_read_pool_enabled?() and is_pid(Process.whereis(ReadRepo))
  end

  defp application_default do
    Keyword.get(Application.get_env(:parrhesia, :database, []), :separate_read_pool?, false)
  end
end

View File

@@ -0,0 +1,9 @@
defmodule Parrhesia.ReadRepo do
  @moduledoc """
  PostgreSQL repository dedicated to read-heavy workloads when a separate read pool is enabled.

  Started alongside `Parrhesia.Repo` only when the `:separate_read_pool?`
  database setting is on (see `Parrhesia.PostgresRepos.started_repos/0`).
  Read-side callers should resolve the repo through
  `Parrhesia.PostgresRepos.read/0` rather than referencing this module
  directly, so they fall back to the primary pool when this one is disabled.
  """

  use Ecto.Repo,
    otp_app: :parrhesia,
    adapter: Ecto.Adapters.Postgres
end

View File

@@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.ACL do
import Ecto.Query
alias Parrhesia.PostgresRepos
alias Parrhesia.Repo
@behaviour Parrhesia.Storage.ACL
@@ -74,7 +75,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.ACL do
|> maybe_filter_principal(Keyword.get(opts, :principal))
|> maybe_filter_capability(Keyword.get(opts, :capability))
{:ok, Enum.map(Repo.all(query), &normalize_persisted_rule/1)}
repo = read_repo()
{:ok, Enum.map(repo.all(query), &normalize_persisted_rule/1)}
end
def list_rules(_context, _opts), do: {:error, :invalid_opts}
@@ -133,12 +135,16 @@ defmodule Parrhesia.Storage.Adapters.Postgres.ACL do
}
)
case Repo.one(query) do
repo = read_repo()
case repo.one(query) do
nil -> nil
stored_rule -> normalize_persisted_rule(stored_rule)
end
end
# Read-path repo: the dedicated read pool when enabled and running,
# otherwise the primary Parrhesia.Repo (see Parrhesia.PostgresRepos.read/0).
defp read_repo, do: PostgresRepos.read()
defp insert_rule(normalized_rule) do
now = DateTime.utc_now() |> DateTime.truncate(:microsecond)

View File

@@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Admin do
import Ecto.Query
alias Parrhesia.PostgresRepos
alias Parrhesia.Repo
@behaviour Parrhesia.Storage.Admin
@@ -73,8 +74,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Admin do
|> maybe_filter_actor_pubkey(Keyword.get(opts, :actor_pubkey))
logs =
query
|> Repo.all()
read_repo()
|> then(fn repo -> repo.all(query) end)
|> Enum.map(&to_audit_log_map/1)
{:ok, logs}
@@ -83,11 +84,12 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Admin do
def list_audit_logs(_context, _opts), do: {:error, :invalid_opts}
defp relay_stats do
events_count = Repo.aggregate("events", :count, :id)
banned_pubkeys = Repo.aggregate("banned_pubkeys", :count, :pubkey)
allowed_pubkeys = Repo.aggregate("allowed_pubkeys", :count, :pubkey)
blocked_ips = Repo.aggregate("blocked_ips", :count, :ip)
acl_rules = Repo.aggregate("acl_rules", :count, :id)
repo = read_repo()
events_count = repo.aggregate("events", :count, :id)
banned_pubkeys = repo.aggregate("banned_pubkeys", :count, :pubkey)
allowed_pubkeys = repo.aggregate("allowed_pubkeys", :count, :pubkey)
blocked_ips = repo.aggregate("blocked_ips", :count, :ip)
acl_rules = repo.aggregate("acl_rules", :count, :id)
%{
"events" => events_count,
@@ -234,6 +236,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Admin do
defp normalize_pubkey(_value), do: {:error, :invalid_actor_pubkey}
# Read-path repo: the dedicated read pool when enabled and running,
# otherwise the primary Parrhesia.Repo (see Parrhesia.PostgresRepos.read/0).
defp read_repo, do: PostgresRepos.read()
defp invalid_key_reason(:params), do: :invalid_params
defp invalid_key_reason(:result), do: :invalid_result

View File

@@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
import Ecto.Query
alias Parrhesia.PostgresRepos
alias Parrhesia.Protocol.Filter
alias Parrhesia.Repo
@@ -67,7 +68,9 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
}
)
case Repo.one(event_query) do
repo = read_repo()
case repo.one(event_query) do
nil ->
{:ok, nil}
@@ -81,13 +84,14 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
def query(_context, filters, opts) when is_list(opts) do
with :ok <- Filter.validate_filters(filters) do
now = Keyword.get(opts, :now, System.system_time(:second))
repo = read_repo()
persisted_events =
filters
|> Enum.flat_map(fn filter ->
filter
|> event_query_for_filter(now, opts)
|> Repo.all()
|> repo.all()
end)
|> deduplicate_events()
|> sort_persisted_events(filters)
@@ -365,30 +369,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
defp maybe_upsert_replaceable_state(normalized_event, now, deleted_at) do
if replaceable_kind?(normalized_event.kind) do
lookup_query =
from(state in "replaceable_event_state",
where:
state.pubkey == ^normalized_event.pubkey and state.kind == ^normalized_event.kind,
select: %{event_created_at: state.event_created_at, event_id: state.event_id}
)
update_query =
from(state in "replaceable_event_state",
where:
state.pubkey == ^normalized_event.pubkey and
state.kind == ^normalized_event.kind
)
upsert_state_table(
"replaceable_event_state",
lookup_query,
update_query,
replaceable_state_row(normalized_event, now),
normalized_event,
now,
deleted_at,
:replaceable_state_update_failed
)
upsert_replaceable_state_table(normalized_event, now, deleted_at)
else
:ok
end
@@ -396,159 +377,94 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
defp maybe_upsert_addressable_state(normalized_event, now, deleted_at) do
if addressable_kind?(normalized_event.kind) do
lookup_query =
from(state in "addressable_event_state",
where:
state.pubkey == ^normalized_event.pubkey and
state.kind == ^normalized_event.kind and
state.d_tag == ^normalized_event.d_tag,
select: %{event_created_at: state.event_created_at, event_id: state.event_id}
)
update_query =
from(state in "addressable_event_state",
where:
state.pubkey == ^normalized_event.pubkey and
state.kind == ^normalized_event.kind and
state.d_tag == ^normalized_event.d_tag
)
upsert_state_table(
"addressable_event_state",
lookup_query,
update_query,
addressable_state_row(normalized_event, now),
normalized_event,
now,
deleted_at,
:addressable_state_update_failed
)
upsert_addressable_state_table(normalized_event, now, deleted_at)
else
:ok
end
end
defp upsert_state_table(
table_name,
lookup_query,
update_query,
insert_row,
normalized_event,
now,
deleted_at,
failure_reason
) do
case Repo.one(lookup_query) do
nil ->
insert_state_or_resolve_race(
table_name,
lookup_query,
update_query,
insert_row,
normalized_event,
now,
deleted_at,
failure_reason
)
defp upsert_replaceable_state_table(normalized_event, now, deleted_at) do
params = [
normalized_event.pubkey,
normalized_event.kind,
normalized_event.created_at,
normalized_event.id,
now,
now
]
current_state ->
maybe_update_state(
update_query,
normalized_event,
current_state,
now,
deleted_at,
failure_reason
)
case Repo.query(replaceable_state_upsert_sql(), params) do
{:ok, %{rows: [row]}} ->
finalize_state_upsert(row, normalized_event, deleted_at, :replaceable_state_update_failed)
{:ok, _result} ->
Repo.rollback(:replaceable_state_update_failed)
{:error, _reason} ->
Repo.rollback(:replaceable_state_update_failed)
end
end
defp insert_state_or_resolve_race(
table_name,
lookup_query,
update_query,
insert_row,
defp upsert_addressable_state_table(normalized_event, now, deleted_at) do
params = [
normalized_event.pubkey,
normalized_event.kind,
normalized_event.d_tag,
normalized_event.created_at,
normalized_event.id,
now,
now
]
case Repo.query(addressable_state_upsert_sql(), params) do
{:ok, %{rows: [row]}} ->
finalize_state_upsert(row, normalized_event, deleted_at, :addressable_state_update_failed)
{:ok, _result} ->
Repo.rollback(:addressable_state_update_failed)
{:error, _reason} ->
Repo.rollback(:addressable_state_update_failed)
end
end
defp finalize_state_upsert(
[retired_event_created_at, retired_event_id, winner_event_created_at, winner_event_id],
normalized_event,
now,
deleted_at,
failure_reason
) do
case Repo.insert_all(table_name, [insert_row], on_conflict: :nothing) do
{1, _result} ->
:ok
{0, _result} ->
resolve_state_race(
lookup_query,
update_query,
normalized_event,
now,
case {winner_event_created_at, winner_event_id} do
{created_at, event_id}
when created_at == normalized_event.created_at and event_id == normalized_event.id ->
maybe_retire_previous_state_event(
retired_event_created_at,
retired_event_id,
deleted_at,
failure_reason
)
{_inserted, _result} ->
Repo.rollback(failure_reason)
end
end
defp resolve_state_race(
lookup_query,
update_query,
normalized_event,
now,
deleted_at,
failure_reason
) do
case Repo.one(lookup_query) do
nil ->
Repo.rollback(failure_reason)
current_state ->
maybe_update_state(
update_query,
normalized_event,
current_state,
now,
deleted_at,
failure_reason
)
end
end
defp maybe_update_state(
update_query,
normalized_event,
current_state,
now,
deleted_at,
failure_reason
) do
if candidate_wins_state?(normalized_event, current_state) do
{updated, _result} =
Repo.update_all(update_query,
set: [
event_created_at: normalized_event.created_at,
event_id: normalized_event.id,
updated_at: now
]
)
if updated == 1 do
{_created_at, _event_id} ->
retire_event!(
current_state.event_created_at,
current_state.event_id,
normalized_event.created_at,
normalized_event.id,
deleted_at,
failure_reason
)
else
Repo.rollback(failure_reason)
end
else
retire_event!(normalized_event.created_at, normalized_event.id, deleted_at, failure_reason)
end
end
# The upsert retired no previous winner (fresh insert: both retired columns
# came back NULL), so there is no prior event row to flag.
defp maybe_retire_previous_state_event(nil, nil, _deleted_at, _failure_reason), do: :ok

# A prior winner was displaced by the upsert: hand it to retire_event!/4,
# which marks the old event row with deleted_at (presumably a soft delete —
# retire_event!/4 body not fully shown here) or rolls the transaction back
# with failure_reason.
defp maybe_retire_previous_state_event(
       retired_event_created_at,
       retired_event_id,
       deleted_at,
       failure_reason
     ) do
  retire_event!(retired_event_created_at, retired_event_id, deleted_at, failure_reason)
end
defp retire_event!(event_created_at, event_id, deleted_at, failure_reason) do
{updated, _result} =
Repo.update_all(
@@ -572,27 +488,147 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
defp addressable_kind?(kind), do: kind >= 30_000 and kind < 40_000
defp replaceable_state_row(normalized_event, now) do
%{
pubkey: normalized_event.pubkey,
kind: normalized_event.kind,
event_created_at: normalized_event.created_at,
event_id: normalized_event.id,
inserted_at: now,
updated_at: now
}
# Raw SQL upsert for replaceable_event_state, executed as one statement so
# insert / compare / update happen in a single round trip.
#
# Parameters: $1 pubkey, $2 kind, $3 event_created_at, $4 event_id,
# $5 inserted_at, $6 updated_at.
#
# At most one of the three CTEs produces the single returned row
# (retired_event_created_at, retired_event_id, winner_event_created_at,
# winner_event_id):
#   * inserted — no prior row for (pubkey, kind); the candidate wins and
#     nothing is retired (NULL retired columns).
#   * updated  — a prior row existed and the candidate wins the tie-break
#     (strictly newer created_at, or equal created_at with a lexically
#     smaller event_id); the displaced winner comes back in the retired
#     columns. NOTE(review): the FOR UPDATE subquery locks the current row
#     to serialize concurrent upserts — confirm this matches the isolation
#     level of the enclosing transaction.
#   * current  — the candidate lost; the standing row is echoed as winner.
#     Data-modifying CTEs all read the same snapshot, so `current` sees the
#     pre-update row, which is correct because it is only selected when the
#     update matched nothing.
defp replaceable_state_upsert_sql do
"""
WITH inserted AS (
INSERT INTO replaceable_event_state (
pubkey,
kind,
event_created_at,
event_id,
inserted_at,
updated_at
)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (pubkey, kind) DO NOTHING
RETURNING
NULL::bigint AS retired_event_created_at,
NULL::bytea AS retired_event_id,
event_created_at AS winner_event_created_at,
event_id AS winner_event_id
),
updated AS (
UPDATE replaceable_event_state AS state
SET
event_created_at = $3,
event_id = $4,
updated_at = $6
FROM (
SELECT current.event_created_at, current.event_id
FROM replaceable_event_state AS current
WHERE current.pubkey = $1 AND current.kind = $2
FOR UPDATE
) AS previous
WHERE
NOT EXISTS (SELECT 1 FROM inserted)
AND state.pubkey = $1
AND state.kind = $2
AND (
state.event_created_at < $3
OR (state.event_created_at = $3 AND state.event_id > $4)
)
RETURNING
previous.event_created_at AS retired_event_created_at,
previous.event_id AS retired_event_id,
state.event_created_at AS winner_event_created_at,
state.event_id AS winner_event_id
),
current AS (
SELECT
NULL::bigint AS retired_event_created_at,
NULL::bytea AS retired_event_id,
state.event_created_at AS winner_event_created_at,
state.event_id AS winner_event_id
FROM replaceable_event_state AS state
WHERE
NOT EXISTS (SELECT 1 FROM inserted)
AND NOT EXISTS (SELECT 1 FROM updated)
AND state.pubkey = $1
AND state.kind = $2
)
SELECT *
FROM inserted
UNION ALL
SELECT *
FROM updated
UNION ALL
SELECT *
FROM current
LIMIT 1
"""
end
defp addressable_state_row(normalized_event, now) do
%{
pubkey: normalized_event.pubkey,
kind: normalized_event.kind,
d_tag: normalized_event.d_tag,
event_created_at: normalized_event.created_at,
event_id: normalized_event.id,
inserted_at: now,
updated_at: now
}
# Raw SQL upsert for addressable_event_state — identical structure to
# replaceable_state_upsert_sql/0 but keyed on (pubkey, kind, d_tag).
#
# Parameters: $1 pubkey, $2 kind, $3 d_tag, $4 event_created_at,
# $5 event_id, $6 inserted_at, $7 updated_at.
#
# At most one of the three CTEs yields the single returned row
# (retired_event_created_at, retired_event_id, winner_event_created_at,
# winner_event_id):
#   * inserted — no prior row for the key; candidate wins, nothing retired.
#   * updated  — candidate wins the tie-break (newer created_at, or equal
#     created_at with a lexically smaller event_id); the displaced winner is
#     returned in the retired columns. NOTE(review): FOR UPDATE locks the
#     current row to serialize concurrent upserts — confirm against the
#     enclosing transaction's isolation level.
#   * current  — candidate lost; the standing row is echoed as the winner.
defp addressable_state_upsert_sql do
"""
WITH inserted AS (
INSERT INTO addressable_event_state (
pubkey,
kind,
d_tag,
event_created_at,
event_id,
inserted_at,
updated_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (pubkey, kind, d_tag) DO NOTHING
RETURNING
NULL::bigint AS retired_event_created_at,
NULL::bytea AS retired_event_id,
event_created_at AS winner_event_created_at,
event_id AS winner_event_id
),
updated AS (
UPDATE addressable_event_state AS state
SET
event_created_at = $4,
event_id = $5,
updated_at = $7
FROM (
SELECT current.event_created_at, current.event_id
FROM addressable_event_state AS current
WHERE current.pubkey = $1 AND current.kind = $2 AND current.d_tag = $3
FOR UPDATE
) AS previous
WHERE
NOT EXISTS (SELECT 1 FROM inserted)
AND state.pubkey = $1
AND state.kind = $2
AND state.d_tag = $3
AND (
state.event_created_at < $4
OR (state.event_created_at = $4 AND state.event_id > $5)
)
RETURNING
previous.event_created_at AS retired_event_created_at,
previous.event_id AS retired_event_id,
state.event_created_at AS winner_event_created_at,
state.event_id AS winner_event_id
),
current AS (
SELECT
NULL::bigint AS retired_event_created_at,
NULL::bytea AS retired_event_id,
state.event_created_at AS winner_event_created_at,
state.event_id AS winner_event_id
FROM addressable_event_state AS state
WHERE
NOT EXISTS (SELECT 1 FROM inserted)
AND NOT EXISTS (SELECT 1 FROM updated)
AND state.pubkey = $1
AND state.kind = $2
AND state.d_tag = $3
)
SELECT *
FROM inserted
UNION ALL
SELECT *
FROM updated
UNION ALL
SELECT *
FROM current
LIMIT 1
"""
end
defp event_row(normalized_event, now) do
@@ -683,45 +719,57 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
end
defp fetch_event_refs([filter], now, opts) do
filter
|> event_ref_query_for_filter(now, opts)
|> maybe_limit_query(Keyword.get(opts, :limit))
|> Repo.all()
query =
filter
|> event_ref_query_for_filter(now, opts)
|> maybe_limit_query(Keyword.get(opts, :limit))
read_repo()
|> then(fn repo -> repo.all(query) end)
end
defp fetch_event_refs(filters, now, opts) do
filters
|> event_ref_union_query_for_filters(now, opts)
|> subquery()
|> then(fn union_query ->
from(ref in union_query,
group_by: [ref.created_at, ref.id],
order_by: [asc: ref.created_at, asc: ref.id],
select: %{created_at: ref.created_at, id: ref.id}
)
end)
|> maybe_limit_query(Keyword.get(opts, :limit))
|> Repo.all()
query =
filters
|> event_ref_union_query_for_filters(now, opts)
|> subquery()
|> then(fn union_query ->
from(ref in union_query,
group_by: [ref.created_at, ref.id],
order_by: [asc: ref.created_at, asc: ref.id],
select: %{created_at: ref.created_at, id: ref.id}
)
end)
|> maybe_limit_query(Keyword.get(opts, :limit))
read_repo()
|> then(fn repo -> repo.all(query) end)
end
defp count_events([filter], now, opts) do
filter
|> event_id_query_for_filter(now, opts)
|> subquery()
|> then(fn query ->
from(event in query, select: count())
end)
|> Repo.one()
query =
filter
|> event_id_query_for_filter(now, opts)
|> subquery()
|> then(fn query ->
from(event in query, select: count())
end)
read_repo()
|> then(fn repo -> repo.one(query) end)
end
defp count_events(filters, now, opts) do
filters
|> event_id_distinct_union_query_for_filters(now, opts)
|> subquery()
|> then(fn union_query ->
from(event in union_query, select: count())
end)
|> Repo.one()
query =
filters
|> event_id_distinct_union_query_for_filters(now, opts)
|> subquery()
|> then(fn union_query ->
from(event in union_query, select: count())
end)
read_repo()
|> then(fn repo -> repo.one(query) end)
end
defp event_source_query(filter, now) do
@@ -1195,4 +1243,6 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do
end
defp maybe_apply_mls_group_retention(expires_at, _kind, _created_at), do: expires_at
# Read-path repo: the dedicated read pool when enabled and running,
# otherwise the primary Parrhesia.Repo (see Parrhesia.PostgresRepos.read/0).
defp read_repo, do: PostgresRepos.read()
end

View File

@@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Groups do
import Ecto.Query
alias Parrhesia.PostgresRepos
alias Parrhesia.Repo
@behaviour Parrhesia.Storage.Groups
@@ -46,7 +47,9 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Groups do
limit: 1
)
case Repo.one(query) do
repo = read_repo()
case repo.one(query) do
nil ->
{:ok, nil}
@@ -94,8 +97,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Groups do
)
memberships =
query
|> Repo.all()
read_repo()
|> then(fn repo -> repo.all(query) end)
|> Enum.map(fn membership ->
to_membership_map(
membership.group_id,
@@ -163,8 +166,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Groups do
)
roles =
query
|> Repo.all()
read_repo()
|> then(fn repo -> repo.all(query) end)
|> Enum.map(fn role ->
to_role_map(role.group_id, role.pubkey, role.role, role.metadata)
end)
@@ -242,6 +245,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Groups do
defp unwrap_transaction_result({:ok, result}), do: {:ok, result}
defp unwrap_transaction_result({:error, reason}), do: {:error, reason}
# Read-path repo: the dedicated read pool when enabled and running,
# otherwise the primary Parrhesia.Repo (see Parrhesia.PostgresRepos.read/0).
defp read_repo, do: PostgresRepos.read()
defp fetch_required_string(map, key) do
map

View File

@@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do
import Ecto.Query
alias Parrhesia.PostgresRepos
alias Parrhesia.Repo
@behaviour Parrhesia.Storage.Moderation
@@ -212,7 +213,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do
select: field(record, ^field)
)
Repo.all(query)
read_repo()
|> then(fn repo -> repo.all(query) end)
end
defp cache_put(scope, value) do
@@ -266,7 +268,9 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do
limit: 1
)
Repo.one(query) == 1
read_repo()
|> then(fn repo -> repo.one(query) end)
|> Kernel.==(1)
end
defp scope_populated_db?(table, field) do
@@ -276,7 +280,10 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do
limit: 1
)
not is_nil(Repo.one(query))
read_repo()
|> then(fn repo -> repo.one(query) end)
|> is_nil()
|> Kernel.not()
end
defp normalize_hex_or_binary(value, expected_bytes, _reason)
@@ -315,4 +322,6 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do
defp to_inet({_, _, _, _, _, _, _, _} = ip_tuple),
do: %Postgrex.INET{address: ip_tuple, netmask: 128}
# Read-path repo: the dedicated read pool when enabled and running,
# otherwise the primary Parrhesia.Repo (see Parrhesia.PostgresRepos.read/0).
defp read_repo, do: PostgresRepos.read()
end

View File

@@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Partitions do
import Ecto.Query
alias Parrhesia.PostgresRepos
alias Parrhesia.Repo
@identifier_pattern ~r/^[a-zA-Z_][a-zA-Z0-9_]*$/
@@ -35,7 +36,8 @@ defmodule Parrhesia.Storage.Partitions do
order_by: [asc: table.tablename]
)
Repo.all(query)
read_repo()
|> then(fn repo -> repo.all(query) end)
end
@doc """
@@ -88,7 +90,9 @@ defmodule Parrhesia.Storage.Partitions do
"""
@spec database_size_bytes() :: {:ok, non_neg_integer()} | {:error, term()}
def database_size_bytes do
case Repo.query("SELECT pg_database_size(current_database())") do
repo = read_repo()
case repo.query("SELECT pg_database_size(current_database())") do
{:ok, %{rows: [[size]]}} when is_integer(size) and size >= 0 -> {:ok, size}
{:ok, _result} -> {:error, :unexpected_result}
{:error, reason} -> {:error, reason}
@@ -219,7 +223,9 @@ defmodule Parrhesia.Storage.Partitions do
LIMIT 1
"""
case Repo.query(query, [partition_name, parent_table_name]) do
repo = read_repo()
case repo.query(query, [partition_name, parent_table_name]) do
{:ok, %{rows: [[1]]}} -> true
{:ok, %{rows: []}} -> false
{:ok, _result} -> false
@@ -278,6 +284,8 @@ defmodule Parrhesia.Storage.Partitions do
|> DateTime.to_unix()
end
# Read-path repo: the dedicated read pool when enabled and running,
# otherwise the primary Parrhesia.Repo (see Parrhesia.PostgresRepos.read/0).
defp read_repo, do: PostgresRepos.read()
defp month_start(%Date{} = date), do: Date.new!(date.year, date.month, 1)
defp shift_month(%Date{} = date, month_delta) when is_integer(month_delta) do

View File

@@ -5,17 +5,19 @@ defmodule Parrhesia.Storage.Supervisor do
use Supervisor
alias Parrhesia.PostgresRepos
def start_link(init_arg \\ []) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_init_arg) do
children = [
{Parrhesia.Storage.Adapters.Postgres.ModerationCache,
name: Parrhesia.Storage.Adapters.Postgres.ModerationCache},
Parrhesia.Repo
]
children =
[
{Parrhesia.Storage.Adapters.Postgres.ModerationCache,
name: Parrhesia.Storage.Adapters.Postgres.ModerationCache}
] ++ PostgresRepos.started_repos()
Supervisor.init(children, strategy: :one_for_one)
end

View File

@@ -80,6 +80,13 @@ defmodule Parrhesia.Telemetry do
tags: [:traffic_class],
tag_values: &traffic_class_tag_values/1
),
last_value("parrhesia.process.mailbox.depth",
event_name: [:parrhesia, :process, :mailbox],
measurement: :depth,
tags: [:process_type],
tag_values: &process_tag_values/1,
reporter_options: [prometheus_type: :gauge]
),
last_value("parrhesia.vm.memory.total.bytes",
event_name: [:parrhesia, :vm, :memory],
measurement: :total,
@@ -95,6 +102,22 @@ defmodule Parrhesia.Telemetry do
:telemetry.execute(event_name, measurements, metadata)
end
@spec emit_process_mailbox_depth(atom(), map()) :: :ok
def emit_process_mailbox_depth(process_type, metadata \\ %{})
    when is_atom(process_type) and is_map(metadata) do
  # Samples the calling process's own message queue. Process.info/2 on
  # self() only returns nil for a dead process, so the fallback is purely
  # defensive.
  case Process.info(self(), :message_queue_len) do
    {:message_queue_len, depth} ->
      tagged_metadata = Map.put(metadata, :process_type, process_type)
      emit([:parrhesia, :process, :mailbox], %{depth: depth}, tagged_metadata)

    nil ->
      :ok
  end
end
defp periodic_measurements do
[
{__MODULE__, :emit_vm_memory, []}
@@ -111,4 +134,9 @@ defmodule Parrhesia.Telemetry do
traffic_class = metadata |> Map.get(:traffic_class, :generic) |> to_string()
%{traffic_class: traffic_class}
end
# Reduces event metadata to the single :process_type tag, stringified for
# the metric reporter; defaults to "unknown" when the emitter set none.
defp process_tag_values(metadata) do
  %{process_type: to_string(Map.get(metadata, :process_type, :unknown))}
end
end

View File

@@ -111,6 +111,7 @@ defmodule Parrhesia.Web.Connection do
@impl true
def init(opts) do
maybe_configure_exit_trapping(opts)
auth_challenges = auth_challenges(opts)
state = %__MODULE__{
@@ -136,29 +137,33 @@ defmodule Parrhesia.Web.Connection do
auth_max_age_seconds: auth_max_age_seconds(opts)
}
Telemetry.emit_process_mailbox_depth(:connection)
{:ok, state}
end
@impl true
def handle_in({payload, [opcode: :text]}, %__MODULE__{} = state) do
if byte_size(payload) > state.max_frame_bytes do
response =
Protocol.encode_relay({
:notice,
"invalid: websocket frame exceeds max frame size"
})
result =
if byte_size(payload) > state.max_frame_bytes do
response =
Protocol.encode_relay({
:notice,
"invalid: websocket frame exceeds max frame size"
})
{:push, {:text, response}, state}
else
case Protocol.decode_client(payload) do
{:ok, decoded_message} ->
handle_decoded_message(decoded_message, state)
{:push, {:text, response}, state}
else
case Protocol.decode_client(payload) do
{:ok, decoded_message} ->
handle_decoded_message(decoded_message, state)
{:error, reason} ->
response = Protocol.encode_relay({:notice, Protocol.decode_error_notice(reason)})
{:push, {:text, response}, state}
{:error, reason} ->
response = Protocol.encode_relay({:notice, Protocol.decode_error_notice(reason)})
{:push, {:text, response}, state}
end
end
end
emit_connection_mailbox_depth(result)
end
@impl true
@@ -167,6 +172,7 @@ defmodule Parrhesia.Web.Connection do
Protocol.encode_relay({:notice, "invalid: binary websocket frames are not supported"})
{:push, {:text, response}, state}
|> emit_connection_mailbox_depth()
end
defp handle_decoded_message({:event, event}, state), do: handle_event_ingest(state, event)
@@ -211,8 +217,10 @@ defmodule Parrhesia.Web.Connection do
when is_reference(ref) and is_binary(subscription_id) and is_map(event) do
if current_subscription_ref?(state, subscription_id, ref) do
handle_fanout_events(state, [{subscription_id, event}])
|> emit_connection_mailbox_depth()
else
{:ok, state}
|> emit_connection_mailbox_depth()
end
end
@@ -224,9 +232,12 @@ defmodule Parrhesia.Web.Connection do
if current_subscription_ref?(state, subscription_id, ref) and
not subscription_eose_sent?(state, subscription_id) do
response = Protocol.encode_relay({:eose, subscription_id})
{:push, {:text, response}, mark_subscription_eose_sent(state, subscription_id)}
|> emit_connection_mailbox_depth()
else
{:ok, state}
|> emit_connection_mailbox_depth()
end
end
@@ -242,20 +253,25 @@ defmodule Parrhesia.Web.Connection do
|> drop_queued_subscription_events(subscription_id)
response = Protocol.encode_relay({:closed, subscription_id, stream_closed_reason(reason)})
{:push, {:text, response}, next_state}
|> emit_connection_mailbox_depth()
else
{:ok, state}
|> emit_connection_mailbox_depth()
end
end
def handle_info({:fanout_event, subscription_id, event}, %__MODULE__{} = state)
when is_binary(subscription_id) and is_map(event) do
handle_fanout_events(state, [{subscription_id, event}])
|> emit_connection_mailbox_depth()
end
def handle_info({:fanout_events, fanout_events}, %__MODULE__{} = state)
when is_list(fanout_events) do
handle_fanout_events(state, fanout_events)
|> emit_connection_mailbox_depth()
end
def handle_info(@drain_outbound_queue, %__MODULE__{} = state) do
@@ -263,13 +279,26 @@ defmodule Parrhesia.Web.Connection do
if frames == [] do
{:ok, next_state}
|> emit_connection_mailbox_depth()
else
{:push, frames, next_state}
|> emit_connection_mailbox_depth()
end
end
def handle_info({:EXIT, _from, :shutdown}, %__MODULE__{} = state) do
close_with_drained_outbound_frames(state)
|> emit_connection_mailbox_depth()
end
def handle_info({:EXIT, _from, {:shutdown, _detail}}, %__MODULE__{} = state) do
close_with_drained_outbound_frames(state)
|> emit_connection_mailbox_depth()
end
def handle_info(_message, %__MODULE__{} = state) do
{:ok, state}
|> emit_connection_mailbox_depth()
end
@impl true
@@ -988,6 +1017,11 @@ defmodule Parrhesia.Web.Connection do
{:stop, :normal, {1008, message}, [{:text, notice}], state}
end
# Flushes every queued outbound frame, then stops the connection with
# WebSocket close code 1012 ("service restart") so clients know the drop is
# transient and should reconnect. Used by the :shutdown EXIT handlers.
defp close_with_drained_outbound_frames(state) do
  {frames, next_state} = drain_all_outbound_frames(state)
  {:stop, :normal, {1012, "service restart"}, frames, next_state}
end
defp enqueue_fanout_events(state, fanout_events) do
Enum.reduce_while(fanout_events, {:ok, state}, fn
{subscription_id, event}, {:ok, acc} when is_binary(subscription_id) and is_map(event) ->
@@ -1094,9 +1128,37 @@ defmodule Parrhesia.Web.Connection do
{Enum.reverse(frames), next_state}
end
# Pops the entire outbound queue (batch limit :infinity) into frames and
# returns {frames_in_fifo_order, next_state}. The state is reset to an
# empty queue with drain_scheduled? cleared, and the queue-depth metric is
# re-emitted so dashboards see the drain.
defp drain_all_outbound_frames(%__MODULE__{} = state) do
  {frames, next_queue, remaining_size} =
    pop_frames(state.outbound_queue, state.outbound_queue_size, :infinity, [])

  next_state =
    %__MODULE__{
      state
      | outbound_queue: next_queue,
        outbound_queue_size: remaining_size,
        drain_scheduled?: false
    }

  emit_outbound_queue_depth(next_state)
  # pop_frames/4 prepends, so reverse restores send order.
  {Enum.reverse(frames), next_state}
end
# Queue exhausted: nothing left to pop regardless of the requested batch.
defp pop_frames(queue, queue_size, _remaining_batch, acc) when queue_size == 0,
  do: {acc, queue, queue_size}

# :infinity batch — drain the whole queue, encoding each queued
# {subscription_id, event} as a text EVENT frame. Frames are prepended to
# acc, so callers must Enum.reverse/1 to restore FIFO order.
defp pop_frames(queue, queue_size, :infinity, acc) do
  case :queue.out(queue) do
    {{:value, {subscription_id, event}}, next_queue} ->
      frame = {:text, Protocol.encode_relay({:event, subscription_id, event})}
      pop_frames(next_queue, queue_size - 1, :infinity, [frame | acc])

    {:empty, _same_queue} ->
      # Defensive resync: the tracked size disagreed with the actual queue,
      # so report an empty queue rather than looping.
      {acc, :queue.new(), 0}
  end
end

# Finite batch exhausted: stop and report what remains in the queue.
defp pop_frames(queue, queue_size, remaining_batch, acc) when remaining_batch <= 0,
  do: {acc, queue, queue_size}
@@ -1145,6 +1207,11 @@ defmodule Parrhesia.Web.Connection do
end
end
# Emits the connection-process mailbox-depth gauge as a side effect and
# passes the handler return value through untouched, so it can terminate a
# handler pipeline.
defp emit_connection_mailbox_depth(result) do
  tap(result, fn _reply -> Telemetry.emit_process_mailbox_depth(:connection) end)
end
defp ensure_subscription_capacity(%__MODULE__{} = state, subscription_id) do
cond do
Map.has_key?(state.subscriptions, subscription_id) ->
@@ -1641,6 +1708,18 @@ defmodule Parrhesia.Web.Connection do
|> Keyword.get(:auth_max_age_seconds, @default_auth_max_age_seconds)
end
# Enables exit trapping (opt-out via the :trap_exit? option) so :shutdown
# EXIT signals arrive as handle_info messages, letting the connection drain
# its outbound frames before the socket closes. Always returns :ok.
defp maybe_configure_exit_trapping(opts) do
  if trap_exit?(opts) do
    Process.flag(:trap_exit, true)
  end

  :ok
end

# Reads :trap_exit? from keyword-list or map opts; any other opts shape
# defaults to trapping exits.
defp trap_exit?(opts) when is_list(opts), do: Keyword.get(opts, :trap_exit?, true)
defp trap_exit?(opts) when is_map(opts), do: Map.get(opts, :trap_exit?, true)
defp trap_exit?(_opts), do: true
defp request_context(%__MODULE__{} = state, subscription_id \\ nil) do
%RequestContext{
authenticated_pubkeys: state.authenticated_pubkeys,

View File

@@ -1,12 +1,14 @@
defmodule Parrhesia.Web.Readiness do
@moduledoc false
alias Parrhesia.PostgresRepos
@spec ready?() :: boolean()
def ready? do
process_ready?(Parrhesia.Subscriptions.Index) and
process_ready?(Parrhesia.Auth.Challenges) and
negentropy_ready?() and
process_ready?(Parrhesia.Repo)
repos_ready?()
end
defp negentropy_ready? do
@@ -29,4 +31,8 @@ defmodule Parrhesia.Web.Readiness do
nil -> false
end
end
# Readiness requires every configured Postgres repo (the primary, plus the
# read pool when enabled) to be registered and alive.
defp repos_ready? do
  Enum.all?(PostgresRepos.started_repos(), &process_ready?/1)
end
end