fix: Subscription workers restart strategy, sandbox ownership race condition
Clear OTP SSL PEM cache between listener terminate/restart so reloaded certs are read from disk instead of serving stale cached data. Make reconcile_worker idempotent to prevent unnecessary worker churn when put_server is followed by start_server. Add request timeouts to RelayInfoClient to prevent hanging connections. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -74,6 +74,7 @@ defmodule Parrhesia.API.Sync.Manager do
|
|||||||
{:ok, normalized_server} ->
|
{:ok, normalized_server} ->
|
||||||
updated_state =
|
updated_state =
|
||||||
state
|
state
|
||||||
|
|> stop_worker_if_running(normalized_server.id)
|
||||||
|> put_server_state(normalized_server)
|
|> put_server_state(normalized_server)
|
||||||
|> persist_and_reconcile!(normalized_server.id)
|
|> persist_and_reconcile!(normalized_server.id)
|
||||||
|
|
||||||
@@ -248,9 +249,7 @@ defmodule Parrhesia.API.Sync.Manager do
|
|||||||
state
|
state
|
||||||
|
|
||||||
desired_running?(state, server_id) ->
|
desired_running?(state, server_id) ->
|
||||||
state
|
maybe_start_worker(state, server_id)
|
||||||
|> stop_worker_if_running(server_id)
|
|
||||||
|> maybe_start_worker(server_id)
|
|
||||||
|
|
||||||
true ->
|
true ->
|
||||||
stop_worker_if_running(state, server_id)
|
stop_worker_if_running(state, server_id)
|
||||||
|
|||||||
@@ -22,7 +22,8 @@ defmodule Parrhesia.Sync.RelayInfoClient do
|
|||||||
url: url,
|
url: url,
|
||||||
headers: [{"accept", "application/nostr+json"}],
|
headers: [{"accept", "application/nostr+json"}],
|
||||||
decode_body: false,
|
decode_body: false,
|
||||||
connect_options: opts
|
connect_options: Keyword.merge([timeout: 5_000], opts),
|
||||||
|
receive_timeout: 5_000
|
||||||
) do
|
) do
|
||||||
{:ok, response} -> {:ok, response}
|
{:ok, response} -> {:ok, response}
|
||||||
{:error, reason} -> {:error, reason}
|
{:error, reason} -> {:error, reason}
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ defmodule Parrhesia.Web.Endpoint do
|
|||||||
@spec reload_listener(Supervisor.supervisor(), atom()) :: :ok | {:error, term()}
|
@spec reload_listener(Supervisor.supervisor(), atom()) :: :ok | {:error, term()}
|
||||||
def reload_listener(supervisor \\ __MODULE__, listener_id) when is_atom(listener_id) do
|
def reload_listener(supervisor \\ __MODULE__, listener_id) when is_atom(listener_id) do
|
||||||
with :ok <- Supervisor.terminate_child(supervisor, {:listener, listener_id}),
|
with :ok <- Supervisor.terminate_child(supervisor, {:listener, listener_id}),
|
||||||
|
:ok <- clear_pem_cache(),
|
||||||
{:ok, _pid} <- Supervisor.restart_child(supervisor, {:listener, listener_id}) do
|
{:ok, _pid} <- Supervisor.restart_child(supervisor, {:listener, listener_id}) do
|
||||||
:ok
|
:ok
|
||||||
else
|
else
|
||||||
@@ -27,17 +28,44 @@ defmodule Parrhesia.Web.Endpoint do
|
|||||||
|
|
||||||
@spec reload_all(Supervisor.supervisor()) :: :ok | {:error, term()}
|
@spec reload_all(Supervisor.supervisor()) :: :ok | {:error, term()}
|
||||||
def reload_all(supervisor \\ __MODULE__) do
|
def reload_all(supervisor \\ __MODULE__) do
|
||||||
supervisor
|
listener_ids =
|
||||||
|> Supervisor.which_children()
|
supervisor
|
||||||
|> Enum.filter(fn {id, _pid, _type, _modules} ->
|
|> Supervisor.which_children()
|
||||||
match?({:listener, _listener_id}, id)
|
|> Enum.flat_map(fn
|
||||||
end)
|
{{:listener, listener_id}, _pid, _type, _modules} -> [listener_id]
|
||||||
|> Enum.reduce_while(:ok, fn {{:listener, listener_id}, _pid, _type, _modules}, :ok ->
|
_other -> []
|
||||||
case reload_listener(supervisor, listener_id) do
|
end)
|
||||||
:ok -> {:cont, :ok}
|
|
||||||
{:error, _reason} = error -> {:halt, error}
|
with :ok <- terminate_listeners(supervisor, listener_ids),
|
||||||
end
|
:ok <- clear_pem_cache() do
|
||||||
end)
|
restart_listeners(supervisor, listener_ids)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp terminate_listeners(_supervisor, []), do: :ok
|
||||||
|
|
||||||
|
defp terminate_listeners(supervisor, [listener_id | rest]) do
|
||||||
|
case Supervisor.terminate_child(supervisor, {:listener, listener_id}) do
|
||||||
|
:ok -> terminate_listeners(supervisor, rest)
|
||||||
|
{:error, _reason} = error -> error
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp restart_listeners(_supervisor, []), do: :ok
|
||||||
|
|
||||||
|
defp restart_listeners(supervisor, [listener_id | rest]) do
|
||||||
|
case Supervisor.restart_child(supervisor, {:listener, listener_id}) do
|
||||||
|
{:ok, _pid} -> restart_listeners(supervisor, rest)
|
||||||
|
{:error, _reason} = error -> error
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# OTP's ssl module caches PEM file contents by filename. When cert/key
|
||||||
|
# files are replaced on disk, the cache must be cleared so the restarted
|
||||||
|
# listener reads the updated files.
|
||||||
|
defp clear_pem_cache do
|
||||||
|
:ssl.clear_pem_cache()
|
||||||
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
|
|||||||
@@ -296,29 +296,24 @@ defmodule Parrhesia.Web.TLSE2ETest do
|
|||||||
end
|
end
|
||||||
|
|
||||||
defp server_cert_fingerprint(port) do
|
defp server_cert_fingerprint(port) do
|
||||||
command =
|
ssl_opts = [verify: :verify_none, server_name_indication: ~c"localhost"]
|
||||||
"printf '' | /usr/bin/openssl s_client -connect 127.0.0.1:#{port} -servername localhost -showcerts"
|
|
||||||
|
|
||||||
case System.cmd("/bin/sh", ["-c", command], stderr_to_stdout: true) do
|
case :ssl.connect({127, 0, 0, 1}, port, ssl_opts, 5_000) do
|
||||||
{output, 0} ->
|
{:ok, ssl_socket} ->
|
||||||
with {:ok, pem_entry} <- first_certificate_pem(output),
|
try do
|
||||||
[entry | _rest] <- :public_key.pem_decode(pem_entry),
|
case :ssl.peercert(ssl_socket) do
|
||||||
cert_der <- elem(entry, 1) do
|
{:ok, cert_der} ->
|
||||||
{:ok, Base.encode64(:crypto.hash(:sha256, cert_der))}
|
{:ok, Base.encode64(:crypto.hash(:sha256, cert_der))}
|
||||||
else
|
|
||||||
[] -> {:error, :missing_certificate}
|
{:error, reason} ->
|
||||||
{:error, _reason} = error -> error
|
{:error, reason}
|
||||||
|
end
|
||||||
|
after
|
||||||
|
:ssl.close(ssl_socket)
|
||||||
end
|
end
|
||||||
|
|
||||||
{output, status} ->
|
{:error, reason} ->
|
||||||
{:error, {:openssl_failed, status, output}}
|
{:error, reason}
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp first_certificate_pem(output) do
|
|
||||||
case Regex.run(~r/-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----/ms, output) do
|
|
||||||
[pem] -> {:ok, pem}
|
|
||||||
_other -> {:error, :missing_certificate}
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user