Merge remote-tracking branch 'public/master' (GH actions, test stability)
This commit is contained in:
@@ -22,7 +22,17 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
||||
audit_logs: []
|
||||
}
|
||||
|
||||
def ensure_started, do: start_store()
|
||||
def start_link(opts \\ []) do
|
||||
name = Keyword.get(opts, :name, @name)
|
||||
Agent.start_link(&init_state/0, name: name)
|
||||
end
|
||||
|
||||
def ensure_started do
|
||||
case Process.whereis(@name) do
|
||||
pid when is_pid(pid) -> :ok
|
||||
nil -> start_store()
|
||||
end
|
||||
end
|
||||
|
||||
def put_event(event_id, event) when is_binary(event_id) and is_map(event) do
|
||||
:ok = ensure_started()
|
||||
@@ -159,28 +169,72 @@ defmodule Parrhesia.Storage.Adapters.Memory.Store do
|
||||
defp normalize_reduce_result(next_acc), do: next_acc
|
||||
|
||||
def get(fun) do
|
||||
:ok = ensure_started()
|
||||
Agent.get(@name, fun)
|
||||
with_store(fn pid -> Agent.get(pid, fun) end)
|
||||
end
|
||||
|
||||
def update(fun) do
|
||||
:ok = ensure_started()
|
||||
Agent.update(@name, fun)
|
||||
with_store(fn pid -> Agent.update(pid, fun) end)
|
||||
end
|
||||
|
||||
def get_and_update(fun) do
|
||||
:ok = ensure_started()
|
||||
Agent.get_and_update(@name, fun)
|
||||
with_store(fn pid -> Agent.get_and_update(pid, fun) end)
|
||||
end
|
||||
|
||||
defp start_store do
|
||||
case Agent.start_link(&init_state/0, name: @name) do
|
||||
{:ok, _pid} -> :ok
|
||||
{:error, {:already_started, _pid}} -> :ok
|
||||
{:error, reason} -> {:error, reason}
|
||||
case start_link() do
|
||||
{:ok, _pid} ->
|
||||
:ok
|
||||
|
||||
{:error, {:already_started, pid}} ->
|
||||
if Process.alive?(pid) do
|
||||
:ok
|
||||
else
|
||||
wait_for_store_exit(pid)
|
||||
end
|
||||
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
defp with_store(fun, attempts \\ 2)
|
||||
|
||||
defp with_store(fun, attempts) when attempts > 0 do
|
||||
:ok = ensure_started()
|
||||
|
||||
case Process.whereis(@name) do
|
||||
pid when is_pid(pid) ->
|
||||
try do
|
||||
fun.(pid)
|
||||
catch
|
||||
:exit, reason ->
|
||||
if noproc_exit?(reason) and attempts > 1 do
|
||||
with_store(fun, attempts - 1)
|
||||
else
|
||||
exit(reason)
|
||||
end
|
||||
end
|
||||
|
||||
nil ->
|
||||
with_store(fun, attempts - 1)
|
||||
end
|
||||
end
|
||||
|
||||
defp with_store(_fun, 0), do: exit(:noproc)
|
||||
|
||||
defp wait_for_store_exit(pid) do
|
||||
ref = Process.monitor(pid)
|
||||
|
||||
receive do
|
||||
{:DOWN, ^ref, :process, ^pid, _reason} -> start_store()
|
||||
after
|
||||
100 -> start_store()
|
||||
end
|
||||
end
|
||||
|
||||
defp noproc_exit?({:noproc, _details}), do: true
|
||||
defp noproc_exit?(_reason), do: false
|
||||
|
||||
defp init_state do
|
||||
ensure_tables_started()
|
||||
|
||||
|
||||
@@ -13,11 +13,19 @@ defmodule Parrhesia.Storage.Supervisor do
|
||||
|
||||
@impl true
|
||||
def init(_init_arg) do
|
||||
children = moderation_cache_children() ++ PostgresRepos.started_repos()
|
||||
children =
|
||||
memory_store_children() ++ moderation_cache_children() ++ PostgresRepos.started_repos()
|
||||
|
||||
Supervisor.init(children, strategy: :one_for_one)
|
||||
end
|
||||
|
||||
defp memory_store_children do
|
||||
case Application.get_env(:parrhesia, :storage, [])[:backend] do
|
||||
:memory -> [Parrhesia.Storage.Adapters.Memory.Store]
|
||||
_other -> []
|
||||
end
|
||||
end
|
||||
|
||||
defp moderation_cache_children do
|
||||
if PostgresRepos.postgres_enabled?() and
|
||||
Application.get_env(:parrhesia, :moderation_cache_enabled, true) do
|
||||
|
||||
Reference in New Issue
Block a user