diff --git a/README.md b/README.md index 63e0437..ad71f22 100644 --- a/README.md +++ b/README.md @@ -210,6 +210,16 @@ CSV env vars use comma-separated values. Boolean env vars accept `1/0`, `true/fa | `:queue_interval` | `DB_QUEUE_INTERVAL_MS` | `5000` | Ecto queue interval in ms | | `:types` | `-` | `Parrhesia.PostgresTypes` | Internal config-file setting | +#### `Parrhesia.ReadRepo` + +| Atom key | ENV | Default | Notes | +| --- | --- | --- | --- | +| `:url` | `DATABASE_URL` | required | Shares the primary DB URL with the write repo | +| `:pool_size` | `DB_READ_POOL_SIZE` | `32` | Read-only query pool size | +| `:queue_target` | `DB_READ_QUEUE_TARGET_MS` | `1000` | Read pool Ecto queue target in ms | +| `:queue_interval` | `DB_READ_QUEUE_INTERVAL_MS` | `5000` | Read pool Ecto queue interval in ms | +| `:types` | `-` | `Parrhesia.PostgresTypes` | Internal config-file setting | + #### `:listeners` | Atom key | ENV | Default | Notes | diff --git a/config/config.exs b/config/config.exs index 79f1299..5d93cae 100644 --- a/config/config.exs +++ b/config/config.exs @@ -3,6 +3,9 @@ import Config config :postgrex, :json_library, JSON config :parrhesia, + database: [ + separate_read_pool?: config_env() != :test + ], moderation_cache_enabled: true, relay_url: "ws://localhost:4413/relay", nip43: [ @@ -120,6 +123,7 @@ config :parrhesia, ] config :parrhesia, Parrhesia.Repo, types: Parrhesia.PostgresTypes +config :parrhesia, Parrhesia.ReadRepo, types: Parrhesia.PostgresTypes config :parrhesia, ecto_repos: [Parrhesia.Repo] diff --git a/config/dev.exs b/config/dev.exs index b2afdef..9ac1e47 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -23,3 +23,13 @@ config :parrhesia, show_sensitive_data_on_connection_error: true, pool_size: 10 ] ++ repo_host_opts + +config :parrhesia, + Parrhesia.ReadRepo, + [ + username: System.get_env("PGUSER") || System.get_env("USER") || "agent", + password: System.get_env("PGPASSWORD"), + database: System.get_env("PGDATABASE") || "parrhesia_dev", + show_sensitive_data_on_connection_error: true, + pool_size: 10 + ] ++ repo_host_opts diff --git a/config/prod.exs b/config/prod.exs index 83ada1c..94e6e8c 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -5,4 +5,9 @@ config :parrhesia, Parrhesia.Repo, queue_target: 1_000, queue_interval: 5_000 +config :parrhesia, Parrhesia.ReadRepo, + pool_size: 32, + queue_target: 1_000, + queue_interval: 5_000 + # Production runtime configuration lives in config/runtime.exs. diff --git a/config/runtime.exs b/config/runtime.exs index 9d9c800..7932fcb 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -130,6 +130,7 @@ if config_env() == :prod do raise "environment variable DATABASE_URL is missing. Example: ecto://USER:PASS@HOST/DATABASE" repo_defaults = Application.get_env(:parrhesia, Parrhesia.Repo, []) + read_repo_defaults = Application.get_env(:parrhesia, Parrhesia.ReadRepo, []) relay_url_default = Application.get_env(:parrhesia, :relay_url) moderation_cache_enabled_default = @@ -148,10 +149,18 @@ if config_env() == :prod do default_pool_size = Keyword.get(repo_defaults, :pool_size, 32) default_queue_target = Keyword.get(repo_defaults, :queue_target, 1_000) default_queue_interval = Keyword.get(repo_defaults, :queue_interval, 5_000) + default_read_pool_size = Keyword.get(read_repo_defaults, :pool_size, default_pool_size) + default_read_queue_target = Keyword.get(read_repo_defaults, :queue_target, default_queue_target) + + default_read_queue_interval = + Keyword.get(read_repo_defaults, :queue_interval, default_queue_interval) pool_size = int_env.("POOL_SIZE", default_pool_size) queue_target = int_env.("DB_QUEUE_TARGET_MS", default_queue_target) queue_interval = int_env.("DB_QUEUE_INTERVAL_MS", default_queue_interval) + read_pool_size = int_env.("DB_READ_POOL_SIZE", default_read_pool_size) + read_queue_target = int_env.("DB_READ_QUEUE_TARGET_MS", default_read_queue_target) + read_queue_interval = int_env.("DB_READ_QUEUE_INTERVAL_MS", default_read_queue_interval) limits = [ max_frame_bytes: @@ -629,6 +638,12 @@ if config_env() == :prod do queue_target: queue_target, queue_interval: queue_interval + config :parrhesia, Parrhesia.ReadRepo, + url: database_url, + pool_size: read_pool_size, + queue_target: read_queue_target, + queue_interval: read_queue_interval + config :parrhesia, relay_url: string_env.("PARRHESIA_RELAY_URL", relay_url_default), acl: [ diff --git a/lib/parrhesia/api/stream/subscription.ex b/lib/parrhesia/api/stream/subscription.ex index f66622b..b04cf0a 100644 --- a/lib/parrhesia/api/stream/subscription.ex +++ b/lib/parrhesia/api/stream/subscription.ex @@ -5,6 +5,7 @@ defmodule Parrhesia.API.Stream.Subscription do alias Parrhesia.Protocol.Filter alias Parrhesia.Subscriptions.Index + alias Parrhesia.Telemetry defstruct [ :ref, @@ -57,6 +58,7 @@ defmodule Parrhesia.API.Stream.Subscription do buffered_events: [] } + Telemetry.emit_process_mailbox_depth(:subscription) {:ok, state} else {:error, reason} -> {:stop, reason} @@ -72,20 +74,27 @@ defmodule Parrhesia.API.Stream.Subscription do end) {:reply, :ok, %__MODULE__{state | ready?: true, buffered_events: []}} + |> emit_mailbox_depth() end @impl true def handle_info({:fanout_event, subscription_id, event}, %__MODULE__{} = state) when is_binary(subscription_id) and is_map(event) do - handle_fanout_event(state, subscription_id, event) + state + |> handle_fanout_event(subscription_id, event) + |> emit_mailbox_depth() end def handle_info({:DOWN, monitor_ref, :process, subscriber, _reason}, %__MODULE__{} = state) when monitor_ref == state.subscriber_monitor_ref and subscriber == state.subscriber do {:stop, :normal, state} + |> emit_mailbox_depth() end - def handle_info(_message, %__MODULE__{} = state), do: {:noreply, state} + def handle_info(_message, %__MODULE__{} = state) do + {:noreply, state} + |> emit_mailbox_depth() + end @impl true def terminate(reason, %__MODULE__{} = state) do @@ -175,4 +184,9 @@ defmodule Parrhesia.API.Stream.Subscription do {:noreply, %__MODULE__{state | buffered_events: buffered_events}} end end + + defp emit_mailbox_depth(result) do + Telemetry.emit_process_mailbox_depth(:subscription) + result + end end diff --git a/lib/parrhesia/postgres_repos.ex b/lib/parrhesia/postgres_repos.ex new file mode 100644 index 0000000..5eaed18 --- /dev/null +++ b/lib/parrhesia/postgres_repos.ex @@ -0,0 +1,45 @@ +defmodule Parrhesia.PostgresRepos do + @moduledoc false + + alias Parrhesia.Config + alias Parrhesia.ReadRepo + alias Parrhesia.Repo + + @spec write() :: module() + def write, do: Repo + + @spec read() :: module() + def read do + if separate_read_pool_enabled?() and is_pid(Process.whereis(ReadRepo)) do + ReadRepo + else + Repo + end + end + + @spec started_repos() :: [module()] + def started_repos do + if separate_read_pool_enabled?() do + [Repo, ReadRepo] + else + [Repo] + end + end + + @spec separate_read_pool_enabled?() :: boolean() + def separate_read_pool_enabled? do + case Process.whereis(Config) do + pid when is_pid(pid) -> + Config.get([:database, :separate_read_pool?], application_default()) + + nil -> + application_default() + end + end + + defp application_default do + :parrhesia + |> Application.get_env(:database, []) + |> Keyword.get(:separate_read_pool?, false) + end +end diff --git a/lib/parrhesia/read_repo.ex b/lib/parrhesia/read_repo.ex new file mode 100644 index 0000000..3348ab7 --- /dev/null +++ b/lib/parrhesia/read_repo.ex @@ -0,0 +1,9 @@ +defmodule Parrhesia.ReadRepo do + @moduledoc """ + PostgreSQL repository dedicated to read-heavy workloads when a separate read pool is enabled. + """ + + use Ecto.Repo, + otp_app: :parrhesia, + adapter: Ecto.Adapters.Postgres +end diff --git a/lib/parrhesia/storage/adapters/postgres/acl.ex b/lib/parrhesia/storage/adapters/postgres/acl.ex index 160645e..5224c94 100644 --- a/lib/parrhesia/storage/adapters/postgres/acl.ex +++ b/lib/parrhesia/storage/adapters/postgres/acl.ex @@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.ACL do import Ecto.Query + alias Parrhesia.PostgresRepos alias Parrhesia.Repo @behaviour Parrhesia.Storage.ACL @@ -74,7 +75,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.ACL do |> maybe_filter_principal(Keyword.get(opts, :principal)) |> maybe_filter_capability(Keyword.get(opts, :capability)) - {:ok, Enum.map(Repo.all(query), &normalize_persisted_rule/1)} + repo = read_repo() + {:ok, Enum.map(repo.all(query), &normalize_persisted_rule/1)} end def list_rules(_context, _opts), do: {:error, :invalid_opts} @@ -133,12 +135,16 @@ defmodule Parrhesia.Storage.Adapters.Postgres.ACL do } ) - case Repo.one(query) do + repo = read_repo() + + case repo.one(query) do nil -> nil stored_rule -> normalize_persisted_rule(stored_rule) end end + defp read_repo, do: PostgresRepos.read() + defp insert_rule(normalized_rule) do now = DateTime.utc_now() |> DateTime.truncate(:microsecond) diff --git a/lib/parrhesia/storage/adapters/postgres/admin.ex b/lib/parrhesia/storage/adapters/postgres/admin.ex index efc8328..5d26200 100644 --- a/lib/parrhesia/storage/adapters/postgres/admin.ex +++ b/lib/parrhesia/storage/adapters/postgres/admin.ex @@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Admin do import Ecto.Query + alias Parrhesia.PostgresRepos alias Parrhesia.Repo @behaviour Parrhesia.Storage.Admin @@ -73,8 +74,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Admin do |> maybe_filter_actor_pubkey(Keyword.get(opts, :actor_pubkey)) logs = - query - |> Repo.all() + read_repo() + |> then(fn repo -> repo.all(query) end) |> Enum.map(&to_audit_log_map/1) {:ok, logs} @@ -83,11 +84,12 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Admin do def list_audit_logs(_context, _opts), do: {:error, :invalid_opts} defp relay_stats do - events_count = Repo.aggregate("events", :count, :id) - banned_pubkeys = Repo.aggregate("banned_pubkeys", :count, :pubkey) - allowed_pubkeys = Repo.aggregate("allowed_pubkeys", :count, :pubkey) - blocked_ips = Repo.aggregate("blocked_ips", :count, :ip) - acl_rules = Repo.aggregate("acl_rules", :count, :id) + repo = read_repo() + events_count = repo.aggregate("events", :count, :id) + banned_pubkeys = repo.aggregate("banned_pubkeys", :count, :pubkey) + allowed_pubkeys = repo.aggregate("allowed_pubkeys", :count, :pubkey) + blocked_ips = repo.aggregate("blocked_ips", :count, :ip) + acl_rules = repo.aggregate("acl_rules", :count, :id) %{ "events" => events_count, @@ -234,6 +236,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Admin do defp normalize_pubkey(_value), do: {:error, :invalid_actor_pubkey} + defp read_repo, do: PostgresRepos.read() + defp invalid_key_reason(:params), do: :invalid_params defp invalid_key_reason(:result), do: :invalid_result diff --git a/lib/parrhesia/storage/adapters/postgres/events.ex b/lib/parrhesia/storage/adapters/postgres/events.ex index bb128f3..627cfdf 100644 --- a/lib/parrhesia/storage/adapters/postgres/events.ex +++ b/lib/parrhesia/storage/adapters/postgres/events.ex @@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do import Ecto.Query + alias Parrhesia.PostgresRepos alias Parrhesia.Protocol.Filter alias Parrhesia.Repo @@ -67,7 +68,9 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do } ) - case Repo.one(event_query) do + repo = read_repo() + + case repo.one(event_query) do nil -> {:ok, nil} @@ -81,13 +84,14 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do def query(_context, filters, opts) when is_list(opts) do with :ok <- Filter.validate_filters(filters) do now = Keyword.get(opts, :now, System.system_time(:second)) + repo = read_repo() persisted_events = filters |> Enum.flat_map(fn filter -> filter |> event_query_for_filter(now, opts) - |> Repo.all() + |> repo.all() end) |> deduplicate_events() |> sort_persisted_events(filters) @@ -365,30 +369,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do defp maybe_upsert_replaceable_state(normalized_event, now, deleted_at) do if replaceable_kind?(normalized_event.kind) do - lookup_query = - from(state in "replaceable_event_state", - where: - state.pubkey == ^normalized_event.pubkey and state.kind == ^normalized_event.kind, - select: %{event_created_at: state.event_created_at, event_id: state.event_id} - ) - - update_query = - from(state in "replaceable_event_state", - where: - state.pubkey == ^normalized_event.pubkey and - state.kind == ^normalized_event.kind - ) - - upsert_state_table( - "replaceable_event_state", - lookup_query, - update_query, - replaceable_state_row(normalized_event, now), - normalized_event, - now, - deleted_at, - :replaceable_state_update_failed - ) + upsert_replaceable_state_table(normalized_event, now, deleted_at) else :ok end @@ -396,159 +377,94 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do defp maybe_upsert_addressable_state(normalized_event, now, deleted_at) do if addressable_kind?(normalized_event.kind) do - lookup_query = - from(state in "addressable_event_state", - where: - state.pubkey == ^normalized_event.pubkey and - state.kind == ^normalized_event.kind and - state.d_tag == ^normalized_event.d_tag, - select: %{event_created_at: state.event_created_at, event_id: state.event_id} - ) - - update_query = - from(state in "addressable_event_state", - where: - state.pubkey == ^normalized_event.pubkey and - state.kind == ^normalized_event.kind and - state.d_tag == ^normalized_event.d_tag - ) - - upsert_state_table( - "addressable_event_state", - lookup_query, - update_query, - addressable_state_row(normalized_event, now), - normalized_event, - now, - deleted_at, - :addressable_state_update_failed - ) + upsert_addressable_state_table(normalized_event, now, deleted_at) else :ok end end - defp upsert_state_table( - table_name, - lookup_query, - update_query, - insert_row, - normalized_event, - now, - deleted_at, - failure_reason - ) do - case Repo.one(lookup_query) do - nil -> - insert_state_or_resolve_race( - table_name, - lookup_query, - update_query, - insert_row, - normalized_event, - now, - deleted_at, - failure_reason - ) + defp upsert_replaceable_state_table(normalized_event, now, deleted_at) do + params = [ + normalized_event.pubkey, + normalized_event.kind, + normalized_event.created_at, + normalized_event.id, + now, + now + ] - current_state -> - maybe_update_state( - update_query, - normalized_event, - current_state, - now, - deleted_at, - failure_reason - ) + case Repo.query(replaceable_state_upsert_sql(), params) do + {:ok, %{rows: [row]}} -> + finalize_state_upsert(row, normalized_event, deleted_at, :replaceable_state_update_failed) + + {:ok, _result} -> + Repo.rollback(:replaceable_state_update_failed) + + {:error, _reason} -> + Repo.rollback(:replaceable_state_update_failed) end end - defp insert_state_or_resolve_race( - table_name, - lookup_query, - update_query, - insert_row, + defp upsert_addressable_state_table(normalized_event, now, deleted_at) do + params = [ + normalized_event.pubkey, + normalized_event.kind, + normalized_event.d_tag, + normalized_event.created_at, + normalized_event.id, + now, + now + ] + + case Repo.query(addressable_state_upsert_sql(), params) do + {:ok, %{rows: [row]}} -> + finalize_state_upsert(row, normalized_event, deleted_at, :addressable_state_update_failed) + + {:ok, _result} -> + Repo.rollback(:addressable_state_update_failed) + + {:error, _reason} -> + Repo.rollback(:addressable_state_update_failed) + end + end + + defp finalize_state_upsert( + [retired_event_created_at, retired_event_id, winner_event_created_at, winner_event_id], normalized_event, - now, deleted_at, failure_reason ) do - case Repo.insert_all(table_name, [insert_row], on_conflict: :nothing) do - {1, _result} -> - :ok - - {0, _result} -> - resolve_state_race( - lookup_query, - update_query, - normalized_event, - now, + case {winner_event_created_at, winner_event_id} do + {created_at, event_id} + when created_at == normalized_event.created_at and event_id == normalized_event.id -> + maybe_retire_previous_state_event( + retired_event_created_at, + retired_event_id, deleted_at, failure_reason ) - {_inserted, _result} -> - Repo.rollback(failure_reason) - end - end - - defp resolve_state_race( - lookup_query, - update_query, - normalized_event, - now, - deleted_at, - failure_reason - ) do - case Repo.one(lookup_query) do - nil -> - Repo.rollback(failure_reason) - - current_state -> - maybe_update_state( - update_query, - normalized_event, - current_state, - now, - deleted_at, - failure_reason - ) - end - end - - defp maybe_update_state( - update_query, - normalized_event, - current_state, - now, - deleted_at, - failure_reason - ) do - if candidate_wins_state?(normalized_event, current_state) do - {updated, _result} = - Repo.update_all(update_query, - set: [ - event_created_at: normalized_event.created_at, - event_id: normalized_event.id, - updated_at: now - ] - ) - - if updated == 1 do + {_created_at, _event_id} -> retire_event!( - current_state.event_created_at, - current_state.event_id, + normalized_event.created_at, + normalized_event.id, deleted_at, failure_reason ) - else - Repo.rollback(failure_reason) - end - else - retire_event!(normalized_event.created_at, normalized_event.id, deleted_at, failure_reason) end end + defp maybe_retire_previous_state_event(nil, nil, _deleted_at, _failure_reason), do: :ok + + defp maybe_retire_previous_state_event( + retired_event_created_at, + retired_event_id, + deleted_at, + failure_reason + ) do + retire_event!(retired_event_created_at, retired_event_id, deleted_at, failure_reason) + end + defp retire_event!(event_created_at, event_id, deleted_at, failure_reason) do {updated, _result} = Repo.update_all( @@ -572,27 +488,147 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do defp addressable_kind?(kind), do: kind >= 30_000 and kind < 40_000 - defp replaceable_state_row(normalized_event, now) do - %{ - pubkey: normalized_event.pubkey, - kind: normalized_event.kind, - event_created_at: normalized_event.created_at, - event_id: normalized_event.id, - inserted_at: now, - updated_at: now - } + defp replaceable_state_upsert_sql do + """ + WITH inserted AS ( + INSERT INTO replaceable_event_state ( + pubkey, + kind, + event_created_at, + event_id, + inserted_at, + updated_at + ) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (pubkey, kind) DO NOTHING + RETURNING + NULL::bigint AS retired_event_created_at, + NULL::bytea AS retired_event_id, + event_created_at AS winner_event_created_at, + event_id AS winner_event_id + ), + updated AS ( + UPDATE replaceable_event_state AS state + SET + event_created_at = $3, + event_id = $4, + updated_at = $6 + FROM ( + SELECT current.event_created_at, current.event_id + FROM replaceable_event_state AS current + WHERE current.pubkey = $1 AND current.kind = $2 + FOR UPDATE + ) AS previous + WHERE + NOT EXISTS (SELECT 1 FROM inserted) + AND state.pubkey = $1 + AND state.kind = $2 + AND ( + state.event_created_at < $3 + OR (state.event_created_at = $3 AND state.event_id > $4) + ) + RETURNING + previous.event_created_at AS retired_event_created_at, + previous.event_id AS retired_event_id, + state.event_created_at AS winner_event_created_at, + state.event_id AS winner_event_id + ), + current AS ( + SELECT + NULL::bigint AS retired_event_created_at, + NULL::bytea AS retired_event_id, + state.event_created_at AS winner_event_created_at, + state.event_id AS winner_event_id + FROM replaceable_event_state AS state + WHERE + NOT EXISTS (SELECT 1 FROM inserted) + AND NOT EXISTS (SELECT 1 FROM updated) + AND state.pubkey = $1 + AND state.kind = $2 + ) + SELECT * + FROM inserted + UNION ALL + SELECT * + FROM updated + UNION ALL + SELECT * + FROM current + LIMIT 1 + """ end - defp addressable_state_row(normalized_event, now) do - %{ - pubkey: normalized_event.pubkey, - kind: normalized_event.kind, - d_tag: normalized_event.d_tag, - event_created_at: normalized_event.created_at, - event_id: normalized_event.id, - inserted_at: now, - updated_at: now - } + defp addressable_state_upsert_sql do + """ + WITH inserted AS ( + INSERT INTO addressable_event_state ( + pubkey, + kind, + d_tag, + event_created_at, + event_id, + inserted_at, + updated_at + ) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (pubkey, kind, d_tag) DO NOTHING + RETURNING + NULL::bigint AS retired_event_created_at, + NULL::bytea AS retired_event_id, + event_created_at AS winner_event_created_at, + event_id AS winner_event_id + ), + updated AS ( + UPDATE addressable_event_state AS state + SET + event_created_at = $4, + event_id = $5, + updated_at = $7 + FROM ( + SELECT current.event_created_at, current.event_id + FROM addressable_event_state AS current + WHERE current.pubkey = $1 AND current.kind = $2 AND current.d_tag = $3 + FOR UPDATE + ) AS previous + WHERE + NOT EXISTS (SELECT 1 FROM inserted) + AND state.pubkey = $1 + AND state.kind = $2 + AND state.d_tag = $3 + AND ( + state.event_created_at < $4 + OR (state.event_created_at = $4 AND state.event_id > $5) + ) + RETURNING + previous.event_created_at AS retired_event_created_at, + previous.event_id AS retired_event_id, + state.event_created_at AS winner_event_created_at, + state.event_id AS winner_event_id + ), + current AS ( + SELECT + NULL::bigint AS retired_event_created_at, + NULL::bytea AS retired_event_id, + state.event_created_at AS winner_event_created_at, + state.event_id AS winner_event_id + FROM addressable_event_state AS state + WHERE + NOT EXISTS (SELECT 1 FROM inserted) + AND NOT EXISTS (SELECT 1 FROM updated) + AND state.pubkey = $1 + AND state.kind = $2 + AND state.d_tag = $3 + ) + SELECT * + FROM inserted + UNION ALL + SELECT * + FROM updated + UNION ALL + SELECT * + FROM current + LIMIT 1 + """ end defp event_row(normalized_event, now) do @@ -683,45 +719,57 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do end defp fetch_event_refs([filter], now, opts) do - filter - |> event_ref_query_for_filter(now, opts) - |> maybe_limit_query(Keyword.get(opts, :limit)) - |> Repo.all() + query = + filter + |> event_ref_query_for_filter(now, opts) + |> maybe_limit_query(Keyword.get(opts, :limit)) + + read_repo() + |> then(fn repo -> repo.all(query) end) end defp fetch_event_refs(filters, now, opts) do - filters - |> event_ref_union_query_for_filters(now, opts) - |> subquery() - |> then(fn union_query -> - from(ref in union_query, - group_by: [ref.created_at, ref.id], - order_by: [asc: ref.created_at, asc: ref.id], - select: %{created_at: ref.created_at, id: ref.id} - ) - end) - |> maybe_limit_query(Keyword.get(opts, :limit)) - |> Repo.all() + query = + filters + |> event_ref_union_query_for_filters(now, opts) + |> subquery() + |> then(fn union_query -> + from(ref in union_query, + group_by: [ref.created_at, ref.id], + order_by: [asc: ref.created_at, asc: ref.id], + select: %{created_at: ref.created_at, id: ref.id} + ) + end) + |> maybe_limit_query(Keyword.get(opts, :limit)) + + read_repo() + |> then(fn repo -> repo.all(query) end) end defp count_events([filter], now, opts) do - filter - |> event_id_query_for_filter(now, opts) - |> subquery() - |> then(fn query -> - from(event in query, select: count()) - end) - |> Repo.one() + query = + filter + |> event_id_query_for_filter(now, opts) + |> subquery() + |> then(fn query -> + from(event in query, select: count()) + end) + + read_repo() + |> then(fn repo -> repo.one(query) end) end defp count_events(filters, now, opts) do - filters - |> event_id_distinct_union_query_for_filters(now, opts) - |> subquery() - |> then(fn union_query -> - from(event in union_query, select: count()) - end) - |> Repo.one() + query = + filters + |> event_id_distinct_union_query_for_filters(now, opts) + |> subquery() + |> then(fn union_query -> + from(event in union_query, select: count()) + end) + + read_repo() + |> then(fn repo -> repo.one(query) end) end defp event_source_query(filter, now) do @@ -1195,4 +1243,6 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Events do end defp maybe_apply_mls_group_retention(expires_at, _kind, _created_at), do: expires_at + + defp read_repo, do: PostgresRepos.read() end diff --git a/lib/parrhesia/storage/adapters/postgres/groups.ex b/lib/parrhesia/storage/adapters/postgres/groups.ex index 5e56ec3..2903162 100644 --- a/lib/parrhesia/storage/adapters/postgres/groups.ex +++ b/lib/parrhesia/storage/adapters/postgres/groups.ex @@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Groups do import Ecto.Query + alias Parrhesia.PostgresRepos alias Parrhesia.Repo @behaviour Parrhesia.Storage.Groups @@ -46,7 +47,9 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Groups do limit: 1 ) - case Repo.one(query) do + repo = read_repo() + + case repo.one(query) do nil -> {:ok, nil} @@ -94,8 +97,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Groups do ) memberships = - query - |> Repo.all() + read_repo() + |> then(fn repo -> repo.all(query) end) |> Enum.map(fn membership -> to_membership_map( membership.group_id, @@ -163,8 +166,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Groups do ) roles = - query - |> Repo.all() + read_repo() + |> then(fn repo -> repo.all(query) end) |> Enum.map(fn role -> to_role_map(role.group_id, role.pubkey, role.role, role.metadata) end) @@ -242,6 +245,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Groups do defp unwrap_transaction_result({:ok, result}), do: {:ok, result} defp unwrap_transaction_result({:error, reason}), do: {:error, reason} + defp read_repo, do: PostgresRepos.read() defp fetch_required_string(map, key) do map diff --git a/lib/parrhesia/storage/adapters/postgres/moderation.ex b/lib/parrhesia/storage/adapters/postgres/moderation.ex index 47c2a49..a165cd1 100644 --- a/lib/parrhesia/storage/adapters/postgres/moderation.ex +++ b/lib/parrhesia/storage/adapters/postgres/moderation.ex @@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do import Ecto.Query + alias Parrhesia.PostgresRepos alias Parrhesia.Repo @behaviour Parrhesia.Storage.Moderation @@ -212,7 +213,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do select: field(record, ^field) ) - Repo.all(query) + read_repo() + |> then(fn repo -> repo.all(query) end) end defp cache_put(scope, value) do @@ -266,7 +268,9 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do limit: 1 ) - Repo.one(query) == 1 + read_repo() + |> then(fn repo -> repo.one(query) end) + |> Kernel.==(1) end defp scope_populated_db?(table, field) do @@ -276,7 +280,10 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do limit: 1 ) - not is_nil(Repo.one(query)) + read_repo() + |> then(fn repo -> repo.one(query) end) + |> is_nil() + |> Kernel.not() end defp normalize_hex_or_binary(value, expected_bytes, _reason) @@ -315,4 +322,6 @@ defmodule Parrhesia.Storage.Adapters.Postgres.Moderation do defp to_inet({_, _, _, _, _, _, _, _} = ip_tuple), do: %Postgrex.INET{address: ip_tuple, netmask: 128} + + defp read_repo, do: PostgresRepos.read() end diff --git a/lib/parrhesia/storage/partitions.ex b/lib/parrhesia/storage/partitions.ex index 9c5114f..ffdc32d 100644 --- a/lib/parrhesia/storage/partitions.ex +++ b/lib/parrhesia/storage/partitions.ex @@ -5,6 +5,7 @@ defmodule Parrhesia.Storage.Partitions do import Ecto.Query + alias Parrhesia.PostgresRepos alias Parrhesia.Repo @identifier_pattern ~r/^[a-zA-Z_][a-zA-Z0-9_]*$/ @@ -35,7 +36,8 @@ defmodule Parrhesia.Storage.Partitions do order_by: [asc: table.tablename] ) - Repo.all(query) + read_repo() + |> then(fn repo -> repo.all(query) end) end @doc """ @@ -88,7 +90,9 @@ defmodule Parrhesia.Storage.Partitions do """ @spec database_size_bytes() :: {:ok, non_neg_integer()} | {:error, term()} def database_size_bytes do - case Repo.query("SELECT pg_database_size(current_database())") do + repo = read_repo() + + case repo.query("SELECT pg_database_size(current_database())") do {:ok, %{rows: [[size]]}} when is_integer(size) and size >= 0 -> {:ok, size} {:ok, _result} -> {:error, :unexpected_result} {:error, reason} -> {:error, reason} @@ -219,7 +223,9 @@ defmodule Parrhesia.Storage.Partitions do LIMIT 1 """ - case Repo.query(query, [partition_name, parent_table_name]) do + repo = read_repo() + + case repo.query(query, [partition_name, parent_table_name]) do {:ok, %{rows: [[1]]}} -> true {:ok, %{rows: []}} -> false {:ok, _result} -> false @@ -278,6 +284,8 @@ defmodule Parrhesia.Storage.Partitions do |> DateTime.to_unix() end + defp read_repo, do: PostgresRepos.read() + defp month_start(%Date{} = date), do: Date.new!(date.year, date.month, 1) defp shift_month(%Date{} = date, month_delta) when is_integer(month_delta) do diff --git a/lib/parrhesia/storage/supervisor.ex b/lib/parrhesia/storage/supervisor.ex index a478ff1..4b58c43 100644 --- a/lib/parrhesia/storage/supervisor.ex +++ b/lib/parrhesia/storage/supervisor.ex @@ -5,17 +5,19 @@ defmodule Parrhesia.Storage.Supervisor do use Supervisor + alias Parrhesia.PostgresRepos + def start_link(init_arg \\ []) do Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) end @impl true def init(_init_arg) do - children = [ - {Parrhesia.Storage.Adapters.Postgres.ModerationCache, - name: Parrhesia.Storage.Adapters.Postgres.ModerationCache}, - Parrhesia.Repo - ] + children = + [ + {Parrhesia.Storage.Adapters.Postgres.ModerationCache, + name: Parrhesia.Storage.Adapters.Postgres.ModerationCache} + ] ++ PostgresRepos.started_repos() Supervisor.init(children, strategy: :one_for_one) end diff --git a/lib/parrhesia/telemetry.ex b/lib/parrhesia/telemetry.ex index 3af231f..bea907c 100644 --- a/lib/parrhesia/telemetry.ex +++ b/lib/parrhesia/telemetry.ex @@ -80,6 +80,13 @@ defmodule Parrhesia.Telemetry do tags: [:traffic_class], tag_values: &traffic_class_tag_values/1 ), + last_value("parrhesia.process.mailbox.depth", + event_name: [:parrhesia, :process, :mailbox], + measurement: :depth, + tags: [:process_type], + tag_values: &process_tag_values/1, + reporter_options: [prometheus_type: :gauge] + ), last_value("parrhesia.vm.memory.total.bytes", event_name: [:parrhesia, :vm, :memory], measurement: :total, @@ -95,6 +102,22 @@ defmodule Parrhesia.Telemetry do :telemetry.execute(event_name, measurements, metadata) end + @spec emit_process_mailbox_depth(atom(), map()) :: :ok + def emit_process_mailbox_depth(process_type, metadata \\ %{}) + when is_atom(process_type) and is_map(metadata) do + case Process.info(self(), :message_queue_len) do + {:message_queue_len, depth} -> + emit( + [:parrhesia, :process, :mailbox], + %{depth: depth}, + Map.put(metadata, :process_type, process_type) + ) + + nil -> + :ok + end + end + defp periodic_measurements do [ {__MODULE__, :emit_vm_memory, []} @@ -111,4 +134,9 @@ defmodule Parrhesia.Telemetry do traffic_class = metadata |> Map.get(:traffic_class, :generic) |> to_string() %{traffic_class: traffic_class} end + + defp process_tag_values(metadata) do + process_type = metadata |> Map.get(:process_type, :unknown) |> to_string() + %{process_type: process_type} + end end diff --git a/lib/parrhesia/web/connection.ex b/lib/parrhesia/web/connection.ex index 8d470d7..d096d96 100644 --- a/lib/parrhesia/web/connection.ex +++ b/lib/parrhesia/web/connection.ex @@ -111,6 +111,7 @@ defmodule Parrhesia.Web.Connection do @impl true def init(opts) do + maybe_configure_exit_trapping(opts) auth_challenges = auth_challenges(opts) state = %__MODULE__{ @@ -136,29 +137,33 @@ defmodule Parrhesia.Web.Connection do auth_max_age_seconds: auth_max_age_seconds(opts) } + Telemetry.emit_process_mailbox_depth(:connection) {:ok, state} end @impl true def handle_in({payload, [opcode: :text]}, %__MODULE__{} = state) do - if byte_size(payload) > state.max_frame_bytes do - response = - Protocol.encode_relay({ - :notice, - "invalid: websocket frame exceeds max frame size" - }) + result = + if byte_size(payload) > state.max_frame_bytes do + response = + Protocol.encode_relay({ + :notice, + "invalid: websocket frame exceeds max frame size" + }) - {:push, {:text, response}, state} - else - case Protocol.decode_client(payload) do - {:ok, decoded_message} -> - handle_decoded_message(decoded_message, state) + {:push, {:text, response}, state} + else + case Protocol.decode_client(payload) do + {:ok, decoded_message} -> + handle_decoded_message(decoded_message, state) - {:error, reason} -> - response = Protocol.encode_relay({:notice, Protocol.decode_error_notice(reason)}) - {:push, {:text, response}, state} + {:error, reason} -> + response = Protocol.encode_relay({:notice, Protocol.decode_error_notice(reason)}) + {:push, {:text, response}, state} + end end - end + + emit_connection_mailbox_depth(result) end @impl true @@ -167,6 +172,7 @@ defmodule Parrhesia.Web.Connection do Protocol.encode_relay({:notice, "invalid: binary websocket frames are not supported"}) {:push, {:text, response}, state} + |> emit_connection_mailbox_depth() end defp handle_decoded_message({:event, event}, state), do: handle_event_ingest(state, event) @@ -211,8 +217,10 @@ defmodule Parrhesia.Web.Connection do when is_reference(ref) and is_binary(subscription_id) and is_map(event) do if current_subscription_ref?(state, subscription_id, ref) do handle_fanout_events(state, [{subscription_id, event}]) + |> emit_connection_mailbox_depth() else {:ok, state} + |> emit_connection_mailbox_depth() end end @@ -224,9 +232,12 @@ defmodule Parrhesia.Web.Connection do if current_subscription_ref?(state, subscription_id, ref) and not subscription_eose_sent?(state, subscription_id) do response = Protocol.encode_relay({:eose, subscription_id}) + {:push, {:text, response}, mark_subscription_eose_sent(state, subscription_id)} + |> emit_connection_mailbox_depth() else {:ok, state} + |> emit_connection_mailbox_depth() end end @@ -242,20 +253,25 @@ defmodule Parrhesia.Web.Connection do |> drop_queued_subscription_events(subscription_id) response = Protocol.encode_relay({:closed, subscription_id, stream_closed_reason(reason)}) + {:push, {:text, response}, next_state} + |> emit_connection_mailbox_depth() else {:ok, state} + |> emit_connection_mailbox_depth() end end def handle_info({:fanout_event, subscription_id, event}, %__MODULE__{} = state) when is_binary(subscription_id) and is_map(event) do handle_fanout_events(state, [{subscription_id, event}]) + |> emit_connection_mailbox_depth() end def handle_info({:fanout_events, fanout_events}, %__MODULE__{} = state) when is_list(fanout_events) do handle_fanout_events(state, fanout_events) + |> emit_connection_mailbox_depth() end def handle_info(@drain_outbound_queue, %__MODULE__{} = state) do @@ -263,13 +279,26 @@ defmodule Parrhesia.Web.Connection do if frames == [] do {:ok, next_state} + |> emit_connection_mailbox_depth() else {:push, frames, next_state} + |> emit_connection_mailbox_depth() end end + def handle_info({:EXIT, _from, :shutdown}, %__MODULE__{} = state) do + close_with_drained_outbound_frames(state) + |> emit_connection_mailbox_depth() + end + + def handle_info({:EXIT, _from, {:shutdown, _detail}}, %__MODULE__{} = state) do + close_with_drained_outbound_frames(state) + |> emit_connection_mailbox_depth() + end + def handle_info(_message, %__MODULE__{} = state) do {:ok, state} + |> emit_connection_mailbox_depth() end @impl true @@ -988,6 +1017,11 @@ defmodule Parrhesia.Web.Connection do {:stop, :normal, {1008, message}, [{:text, notice}], state} end + defp close_with_drained_outbound_frames(state) do + {frames, next_state} = drain_all_outbound_frames(state) + {:stop, :normal, {1012, "service restart"}, frames, next_state} + end + defp enqueue_fanout_events(state, fanout_events) do Enum.reduce_while(fanout_events, {:ok, state}, fn {subscription_id, event}, {:ok, acc} when is_binary(subscription_id) and is_map(event) -> @@ -1094,9 +1128,37 @@ defmodule Parrhesia.Web.Connection do {Enum.reverse(frames), next_state} end + defp drain_all_outbound_frames(%__MODULE__{} = state) do + {frames, next_queue, remaining_size} = + pop_frames(state.outbound_queue, state.outbound_queue_size, :infinity, []) + + next_state = + %__MODULE__{ + state + | outbound_queue: next_queue, + outbound_queue_size: remaining_size, + drain_scheduled?: false + } + + emit_outbound_queue_depth(next_state) + + {Enum.reverse(frames), next_state} + end + defp pop_frames(queue, queue_size, _remaining_batch, acc) when queue_size == 0, do: {acc, queue, queue_size} + defp pop_frames(queue, queue_size, :infinity, acc) do + case :queue.out(queue) do + {{:value, {subscription_id, event}}, next_queue} -> + frame = {:text, Protocol.encode_relay({:event, subscription_id, event})} + pop_frames(next_queue, queue_size - 1, :infinity, [frame | acc]) + + {:empty, _same_queue} -> + {acc, :queue.new(), 0} + end + end + defp pop_frames(queue, queue_size, remaining_batch, acc) when remaining_batch <= 0, do: {acc, queue, queue_size} @@ -1145,6 +1207,11 @@ defmodule Parrhesia.Web.Connection do end end + defp emit_connection_mailbox_depth(result) do + Telemetry.emit_process_mailbox_depth(:connection) + result + end + defp ensure_subscription_capacity(%__MODULE__{} = state, subscription_id) do cond do Map.has_key?(state.subscriptions, subscription_id) -> @@ -1641,6 +1708,18 @@ defmodule Parrhesia.Web.Connection do |> Keyword.get(:auth_max_age_seconds, @default_auth_max_age_seconds) end + defp maybe_configure_exit_trapping(opts) do + if trap_exit?(opts) do + Process.flag(:trap_exit, true) + end + + :ok + end + + defp trap_exit?(opts) when is_list(opts), do: Keyword.get(opts, :trap_exit?, true) + defp trap_exit?(opts) when is_map(opts), do: Map.get(opts, :trap_exit?, true) + defp trap_exit?(_opts), do: true + defp request_context(%__MODULE__{} = state, subscription_id \\ nil) do %RequestContext{ authenticated_pubkeys: state.authenticated_pubkeys, diff --git a/lib/parrhesia/web/readiness.ex b/lib/parrhesia/web/readiness.ex index df59fd4..d834d40 100644 --- a/lib/parrhesia/web/readiness.ex +++ b/lib/parrhesia/web/readiness.ex @@ -1,12 +1,14 @@ defmodule Parrhesia.Web.Readiness do @moduledoc false + alias Parrhesia.PostgresRepos + @spec ready?() :: boolean() def ready? do process_ready?(Parrhesia.Subscriptions.Index) and process_ready?(Parrhesia.Auth.Challenges) and negentropy_ready?() and - process_ready?(Parrhesia.Repo) + repos_ready?() end defp negentropy_ready? do @@ -29,4 +31,8 @@ defmodule Parrhesia.Web.Readiness do nil -> false end end + + defp repos_ready? do + Enum.all?(PostgresRepos.started_repos(), &process_ready?/1) + end end diff --git a/test/parrhesia/application_test.exs b/test/parrhesia/application_test.exs index 6f79298..fdcf44e 100644 --- a/test/parrhesia/application_test.exs +++ b/test/parrhesia/application_test.exs @@ -1,6 +1,8 @@ defmodule Parrhesia.ApplicationTest do use Parrhesia.IntegrationCase, async: false + alias Parrhesia.PostgresRepos + test "starts the core supervision tree" do assert is_pid(Process.whereis(Parrhesia.Supervisor)) assert is_pid(Process.whereis(Parrhesia.Telemetry)) @@ -25,6 +27,7 @@ defmodule Parrhesia.ApplicationTest do assert is_pid(Process.whereis(Parrhesia.Auth.Nip98ReplayCache)) assert is_pid(Process.whereis(Parrhesia.API.Identity.Manager)) assert is_pid(Process.whereis(Parrhesia.API.Sync.Manager)) + assert Enum.all?(PostgresRepos.started_repos(), &is_pid(Process.whereis(&1))) if negentropy_enabled?() do assert is_pid(Process.whereis(Parrhesia.Negentropy.Sessions)) diff --git a/test/parrhesia/config_test.exs b/test/parrhesia/config_test.exs index 93c41b3..e6342ef 100644 --- a/test/parrhesia/config_test.exs +++ b/test/parrhesia/config_test.exs @@ -18,6 +18,7 @@ defmodule Parrhesia.ConfigTest do assert Parrhesia.Config.get([:limits, :auth_max_age_seconds]) == 600 assert Parrhesia.Config.get([:limits, :max_outbound_queue]) == 256 assert Parrhesia.Config.get([:limits, :max_filter_limit]) == 500 + assert Parrhesia.Config.get([:database, :separate_read_pool?]) == false assert Parrhesia.Config.get([:relay_url]) == "ws://localhost:4413/relay" assert Parrhesia.Config.get([:policies, :auth_required_for_writes]) == false assert Parrhesia.Config.get([:policies, :marmot_media_max_imeta_tags_per_event]) == 8 diff --git a/test/parrhesia/performance/load_soak_test.exs b/test/parrhesia/performance/load_soak_test.exs index a93004a..1a3926c 100644 --- a/test/parrhesia/performance/load_soak_test.exs +++ b/test/parrhesia/performance/load_soak_test.exs @@ -5,7 +5,8 @@ defmodule Parrhesia.Performance.LoadSoakTest do @tag :performance test "fanout enqueue/drain stays within relaxed p95 budget" do - {:ok, state} = Connection.init(subscription_index: nil, max_outbound_queue: 10_000) + {:ok, state} = + Connection.init(subscription_index: nil, max_outbound_queue: 10_000, trap_exit?: false) req_payload = JSON.encode!(["REQ", "sub-load", %{"kinds" => [1]}]) diff --git a/test/parrhesia/telemetry_test.exs b/test/parrhesia/telemetry_test.exs index 1351091..85dd8ee 100644 --- a/test/parrhesia/telemetry_test.exs +++ b/test/parrhesia/telemetry_test.exs @@ -12,6 +12,7 @@ defmodule Parrhesia.TelemetryTest do assert [:parrhesia, :connection, :outbound_queue, :depth] in metric_names assert [:parrhesia, :connection, :outbound_queue, :pressure] in metric_names assert [:parrhesia, :connection, :outbound_queue, :pressure_events, :count] in metric_names + assert [:parrhesia, :process, :mailbox, :depth] in metric_names end test "emit/3 accepts traffic-class metadata" do @@ -22,4 +23,26 @@ defmodule Parrhesia.TelemetryTest do %{traffic_class: :marmot} ) end + + test "emit_process_mailbox_depth/2 tags process type" do + handler_id = "telemetry-mailbox-depth-test" + + :ok = + :telemetry.attach( + handler_id, + [:parrhesia, :process, :mailbox], + fn _event_name, measurements, metadata, test_pid -> + send(test_pid, {:mailbox_depth, measurements, metadata}) + end, + self() + ) + + on_exit(fn -> :telemetry.detach(handler_id) end) + + assert :ok = Telemetry.emit_process_mailbox_depth(:connection) + + assert_receive {:mailbox_depth, %{depth: depth}, %{process_type: :connection}} + assert is_integer(depth) + assert depth >= 0 + end end diff --git a/test/parrhesia/web/connection_test.exs b/test/parrhesia/web/connection_test.exs index da4823a..d0f7af3 100644 --- a/test/parrhesia/web/connection_test.exs +++ b/test/parrhesia/web/connection_test.exs @@ -931,6 +931,30 @@ defmodule Parrhesia.Web.ConnectionTest do assert delivered_ids == Enum.map(events, & &1["id"]) end + test "shutdown drains queued outbound frames before closing" do + state = subscribed_connection_state(outbound_drain_batch_size: 1) + first = live_event(String.duplicate("a", 64), 1) + second = live_event(String.duplicate("b", 64), 1) + + assert {:ok, queued_state} = + Connection.handle_info( + {:fanout_events, [{"sub-1", first}, {"sub-1", second}]}, + state + ) + + assert queued_state.outbound_queue_size == 2 + + assert {:stop, :normal, {1012, "service restart"}, frames, drained_state} = + Connection.handle_info({:EXIT, self(), :shutdown}, queued_state) + + assert drained_state.outbound_queue_size == 0 + + assert Enum.map(frames, fn {:text, payload} -> JSON.decode!(payload) end) == [ + ["EVENT", "sub-1", first], + ["EVENT", "sub-1", second] + ] + end + test "outbound queue overflow closes connection when strategy is close" do state = subscribed_connection_state( @@ -975,7 +999,12 @@ defmodule Parrhesia.Web.ConnectionTest do end defp connection_state(opts \\ []) do - {:ok, state} = Connection.init(Keyword.put_new(opts, :subscription_index, nil)) + opts = + opts + |> Keyword.put_new(:subscription_index, nil) + |> Keyword.put_new(:trap_exit?, false) + + {:ok, state} = Connection.init(opts) state end diff --git a/test/support/runtime.ex b/test/support/runtime.ex index 179aa7d..210725e 100644 --- a/test/support/runtime.ex +++ b/test/support/runtime.ex @@ -1,14 +1,7 @@ defmodule Parrhesia.TestSupport.Runtime do @moduledoc false - @required_processes [ - Parrhesia.Supervisor, - Parrhesia.Config, - Parrhesia.Repo, - Parrhesia.Subscriptions.Supervisor, - Parrhesia.API.Stream.Supervisor, - Parrhesia.Web.Endpoint - ] + alias Parrhesia.PostgresRepos def ensure_started! do if healthy?() do @@ -19,7 +12,7 @@ defmodule Parrhesia.TestSupport.Runtime do end defp healthy? do - Enum.all?(@required_processes, &is_pid(Process.whereis(&1))) + Enum.all?(required_processes(), &is_pid(Process.whereis(&1))) end defp restart! do @@ -32,4 +25,14 @@ defmodule Parrhesia.TestSupport.Runtime do {:ok, _apps} = Application.ensure_all_started(:parrhesia) :ok end + + defp required_processes do + [ + Parrhesia.Supervisor, + Parrhesia.Config, + Parrhesia.Subscriptions.Supervisor, + Parrhesia.API.Stream.Supervisor, + Parrhesia.Web.Endpoint + ] ++ PostgresRepos.started_repos() + end end