Add monthly partition maintenance and retention pruning

This commit is contained in:
2026-03-14 18:09:53 +01:00
parent 19664ac56c
commit 889d630c12
12 changed files with 1359 additions and 76 deletions

View File

@@ -7,6 +7,18 @@ defmodule Parrhesia.Storage.Archiver do
alias Parrhesia.Repo
@identifier_pattern ~r/^[a-zA-Z_][a-zA-Z0-9_]*$/
@monthly_partition_pattern ~r/^events_(\d{4})_(\d{2})$/
@default_months_ahead 2
@type monthly_partition :: %{
name: String.t(),
year: pos_integer(),
month: pos_integer(),
month_start_unix: non_neg_integer(),
month_end_unix: non_neg_integer()
}
@doc """
Lists all `events_*` partitions excluding the default partition.
"""
@@ -24,7 +36,79 @@ defmodule Parrhesia.Storage.Archiver do
Repo.all(query)
end
@identifier_pattern ~r/^[a-zA-Z_][a-zA-Z0-9_]*$/
@doc """
Lists monthly event partitions that match `events_YYYY_MM` naming.
"""
@spec list_monthly_partitions() :: [monthly_partition()]
def list_monthly_partitions do
list_partitions()
|> Enum.map(&parse_monthly_partition/1)
|> Enum.reject(&is_nil/1)
|> Enum.sort_by(&{&1.year, &1.month})
end
@doc """
Ensures monthly partitions exist for the current month and `months_ahead` future months.
"""
@spec ensure_monthly_partitions(keyword()) :: :ok | {:error, term()}
def ensure_monthly_partitions(opts \\ []) when is_list(opts) do
months_ahead =
opts
|> Keyword.get(:months_ahead, @default_months_ahead)
|> normalize_non_negative_integer(@default_months_ahead)
reference_date =
opts
|> Keyword.get(:reference_date, Date.utc_today())
|> normalize_reference_date()
reference_month = month_start(reference_date)
offsets =
if months_ahead == 0 do
[0]
else
Enum.to_list(0..months_ahead)
end
Enum.reduce_while(offsets, :ok, fn offset, :ok ->
target_month = shift_month(reference_month, offset)
case create_monthly_partition(target_month) do
:ok -> {:cont, :ok}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end
@doc """
Returns the current database size in bytes.
"""
@spec database_size_bytes() :: {:ok, non_neg_integer()} | {:error, term()}
def database_size_bytes do
case Repo.query("SELECT pg_database_size(current_database())") do
{:ok, %{rows: [[size]]}} when is_integer(size) and size >= 0 -> {:ok, size}
{:ok, _result} -> {:error, :unexpected_result}
{:error, reason} -> {:error, reason}
end
end
@doc """
Drops an event partition table by name.
"""
@spec drop_partition(String.t()) :: :ok | {:error, term()}
def drop_partition(partition_name) when is_binary(partition_name) do
if partition_name in ["events", "events_default"] do
{:error, :protected_partition}
else
quoted_partition_name = quote_identifier!(partition_name)
case Repo.query("DROP TABLE IF EXISTS #{quoted_partition_name}") do
{:ok, _result} -> :ok
{:error, reason} -> {:error, reason}
end
end
end
@doc """
Generates an archive SQL statement for the given partition.
@@ -37,6 +121,89 @@ defmodule Parrhesia.Storage.Archiver do
"INSERT INTO #{quoted_archive_table_name} SELECT * FROM #{quoted_partition_name};"
end
@doc """
Returns the monthly partition name for a date.
"""
@spec month_partition_name(Date.t()) :: String.t()
def month_partition_name(%Date{} = date) do
month = date.month |> Integer.to_string() |> String.pad_leading(2, "0")
"events_#{date.year}_#{month}"
end
defp create_monthly_partition(%Date{} = month_date) do
partition_name = month_partition_name(month_date)
{start_unix, end_unix} = month_bounds_unix(month_date.year, month_date.month)
quoted_partition_name = quote_identifier!(partition_name)
sql =
"""
CREATE TABLE IF NOT EXISTS #{quoted_partition_name}
PARTITION OF "events"
FOR VALUES FROM (#{start_unix}) TO (#{end_unix})
"""
case Repo.query(sql) do
{:ok, _result} -> :ok
{:error, reason} -> {:error, reason}
end
end
defp parse_monthly_partition(partition_name) do
case Regex.run(@monthly_partition_pattern, partition_name, capture: :all_but_first) do
[year_text, month_text] ->
{year, ""} = Integer.parse(year_text)
{month, ""} = Integer.parse(month_text)
if month in 1..12 do
{month_start_unix, month_end_unix} = month_bounds_unix(year, month)
%{
name: partition_name,
year: year,
month: month,
month_start_unix: month_start_unix,
month_end_unix: month_end_unix
}
else
nil
end
_other ->
nil
end
end
defp month_bounds_unix(year, month) do
month_date = Date.new!(year, month, 1)
next_month_date = shift_month(month_date, 1)
{date_to_unix(month_date), date_to_unix(next_month_date)}
end
defp date_to_unix(%Date{} = date) do
date
|> DateTime.new!(~T[00:00:00], "Etc/UTC")
|> DateTime.to_unix()
end
defp month_start(%Date{} = date), do: Date.new!(date.year, date.month, 1)
defp shift_month(%Date{} = date, month_delta) when is_integer(month_delta) do
month_index = date.year * 12 + date.month - 1 + month_delta
shifted_year = div(month_index, 12)
shifted_month = rem(month_index, 12) + 1
Date.new!(shifted_year, shifted_month, 1)
end
defp normalize_reference_date(%Date{} = date), do: date
defp normalize_reference_date(_other), do: Date.utc_today()
defp normalize_non_negative_integer(value, _default) when is_integer(value) and value >= 0,
do: value
defp normalize_non_negative_integer(_value, default), do: default
defp quote_identifier!(identifier) when is_binary(identifier) do
if Regex.match?(@identifier_pattern, identifier) do
~s("#{identifier}")

View File

@@ -0,0 +1,280 @@
defmodule Parrhesia.Tasks.PartitionRetentionWorker do
@moduledoc """
Periodic worker that ensures monthly event partitions and applies retention pruning.
"""
use GenServer
alias Parrhesia.Storage.Archiver
alias Parrhesia.Telemetry
@default_check_interval_hours 24
@default_months_ahead 2
@default_max_partitions_to_drop_per_run 1
@bytes_per_gib 1_073_741_824
@type monthly_partition :: Archiver.monthly_partition()
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts \\ []) do
name = Keyword.get(opts, :name, __MODULE__)
GenServer.start_link(__MODULE__, opts, name: name)
end
@impl true
def init(opts) do
retention_config = Application.get_env(:parrhesia, :retention, [])
state = %{
archiver: Keyword.get(opts, :archiver, Archiver),
interval_ms: interval_ms(opts, retention_config),
months_ahead: months_ahead(opts, retention_config),
max_db_gib: max_db_gib(opts, retention_config),
max_months_to_keep: max_months_to_keep(opts, retention_config),
max_partitions_to_drop_per_run: max_partitions_to_drop_per_run(opts, retention_config),
today_fun: today_fun(opts)
}
schedule_tick(0)
{:ok, state}
end
@impl true
def handle_info(:tick, state) do
started_at = System.monotonic_time()
{dropped_count, status} =
case run_maintenance(state) do
{:ok, count} -> {count, :ok}
{:error, _reason} -> {0, :error}
end
Telemetry.emit(
[:parrhesia, :maintenance, :partition_retention, :stop],
%{
duration: System.monotonic_time() - started_at,
dropped_partitions: dropped_count
},
%{status: status}
)
schedule_tick(state.interval_ms)
{:noreply, state}
end
def handle_info(_message, state), do: {:noreply, state}
defp run_maintenance(state) do
case state.archiver.ensure_monthly_partitions(months_ahead: state.months_ahead) do
:ok -> maybe_drop_oldest_partitions(state)
{:error, reason} -> {:error, reason}
end
end
defp maybe_drop_oldest_partitions(%{max_partitions_to_drop_per_run: max_drops})
when max_drops <= 0,
do: {:ok, 0}
defp maybe_drop_oldest_partitions(state) do
1..state.max_partitions_to_drop_per_run
|> Enum.reduce_while({:ok, 0}, fn _attempt, {:ok, dropped_count} ->
drop_oldest_partition_once(state, dropped_count)
end)
end
defp drop_oldest_partition_once(state, dropped_count) do
case next_partition_to_drop(state) do
{:ok, partition} -> apply_partition_drop(state, partition, dropped_count)
{:error, reason} -> {:halt, {:error, reason}}
end
end
defp apply_partition_drop(_state, nil, dropped_count), do: {:halt, {:ok, dropped_count}}
defp apply_partition_drop(state, partition, dropped_count) do
case state.archiver.drop_partition(partition.name) do
:ok -> {:cont, {:ok, dropped_count + 1}}
{:error, reason} -> {:halt, {:error, reason}}
end
end
defp next_partition_to_drop(state) do
partitions = state.archiver.list_monthly_partitions()
current_month_index = current_month_index(state.today_fun)
month_limit_candidate =
oldest_partition_exceeding_month_limit(
partitions,
state.max_months_to_keep,
current_month_index
)
with {:ok, size_limit_candidate} <-
oldest_partition_exceeding_size_limit(
partitions,
state.max_db_gib,
current_month_index,
state.archiver
) do
{:ok, pick_oldest_partition(month_limit_candidate, size_limit_candidate)}
end
end
defp oldest_partition_exceeding_month_limit(_partitions, :infinity, _current_month_index),
do: nil
defp oldest_partition_exceeding_month_limit(partitions, max_months_to_keep, current_month_index)
when is_integer(max_months_to_keep) and max_months_to_keep > 0 do
oldest_month_to_keep_index = current_month_index - (max_months_to_keep - 1)
partitions
|> Enum.filter(fn partition ->
month_index(partition) < current_month_index and
month_index(partition) < oldest_month_to_keep_index
end)
|> Enum.min_by(&month_index/1, fn -> nil end)
end
defp oldest_partition_exceeding_month_limit(
_partitions,
_max_months_to_keep,
_current_month_index
),
do: nil
defp oldest_partition_exceeding_size_limit(
_partitions,
:infinity,
_current_month_index,
_archiver
),
do: {:ok, nil}
defp oldest_partition_exceeding_size_limit(
partitions,
max_db_gib,
current_month_index,
archiver
)
when is_integer(max_db_gib) and max_db_gib > 0 do
with {:ok, current_size_bytes} <- archiver.database_size_bytes() do
max_size_bytes = max_db_gib * @bytes_per_gib
if current_size_bytes > max_size_bytes do
{:ok, oldest_completed_partition(partitions, current_month_index)}
else
{:ok, nil}
end
end
end
defp oldest_partition_exceeding_size_limit(
_partitions,
_max_db_gib,
_current_month_index,
_archiver
),
do: {:ok, nil}
defp oldest_completed_partition(partitions, current_month_index) do
partitions
|> Enum.filter(&(month_index(&1) < current_month_index))
|> Enum.min_by(&month_index/1, fn -> nil end)
end
defp pick_oldest_partition(nil, nil), do: nil
defp pick_oldest_partition(partition, nil), do: partition
defp pick_oldest_partition(nil, partition), do: partition
defp pick_oldest_partition(left, right) do
if month_index(left) <= month_index(right) do
left
else
right
end
end
defp month_index(%{year: year, month: month}) when is_integer(year) and is_integer(month) do
year * 12 + month
end
defp current_month_index(today_fun) do
today = today_fun.()
today.year * 12 + today.month
end
defp interval_ms(opts, retention_config) do
case Keyword.get(opts, :interval_ms) do
value when is_integer(value) and value > 0 ->
value
_other ->
retention_config
|> Keyword.get(:check_interval_hours, @default_check_interval_hours)
|> normalize_positive_integer(@default_check_interval_hours)
|> hours_to_ms()
end
end
defp months_ahead(opts, retention_config) do
opts
|> Keyword.get(
:months_ahead,
Keyword.get(retention_config, :months_ahead, @default_months_ahead)
)
|> normalize_non_negative_integer(@default_months_ahead)
end
defp max_db_gib(opts, retention_config) do
opts
|> Keyword.get(:max_db_bytes, Keyword.get(retention_config, :max_db_bytes, :infinity))
|> normalize_limit()
end
defp max_months_to_keep(opts, retention_config) do
opts
|> Keyword.get(
:max_months_to_keep,
Keyword.get(retention_config, :max_months_to_keep, :infinity)
)
|> normalize_limit()
end
defp max_partitions_to_drop_per_run(opts, retention_config) do
opts
|> Keyword.get(
:max_partitions_to_drop_per_run,
Keyword.get(
retention_config,
:max_partitions_to_drop_per_run,
@default_max_partitions_to_drop_per_run
)
)
|> normalize_non_negative_integer(@default_max_partitions_to_drop_per_run)
end
defp today_fun(opts) do
case Keyword.get(opts, :today_fun, &Date.utc_today/0) do
function when is_function(function, 0) -> function
_other -> &Date.utc_today/0
end
end
defp normalize_limit(:infinity), do: :infinity
defp normalize_limit(value) when is_integer(value) and value > 0, do: value
defp normalize_limit(_value), do: :infinity
defp normalize_positive_integer(value, _default) when is_integer(value) and value > 0, do: value
defp normalize_positive_integer(_value, default), do: default
defp normalize_non_negative_integer(value, _default) when is_integer(value) and value >= 0,
do: value
defp normalize_non_negative_integer(_value, default), do: default
defp hours_to_ms(hours), do: hours * 60 * 60 * 1000
defp schedule_tick(interval_ms) do
Process.send_after(self(), :tick, interval_ms)
end
end

View File

@@ -11,13 +11,22 @@ defmodule Parrhesia.Tasks.Supervisor do
@impl true
def init(_init_arg) do
children =
if Application.get_env(:parrhesia, :enable_expiration_worker, true) do
[{Parrhesia.Tasks.ExpirationWorker, name: Parrhesia.Tasks.ExpirationWorker}]
else
[]
end
children = expiration_children() ++ partition_retention_children()
Supervisor.init(children, strategy: :one_for_one)
end
defp expiration_children do
if Application.get_env(:parrhesia, :enable_expiration_worker, true) do
[{Parrhesia.Tasks.ExpirationWorker, name: Parrhesia.Tasks.ExpirationWorker}]
else
[]
end
end
defp partition_retention_children do
[
{Parrhesia.Tasks.PartitionRetentionWorker, name: Parrhesia.Tasks.PartitionRetentionWorker}
]
end
end

View File

@@ -0,0 +1,52 @@
defmodule Parrhesia.TestSupport.PartitionRetentionStubArchiver do
@moduledoc false
use Agent
@spec start_link(keyword()) :: Agent.on_start()
def start_link(opts \\ []) do
name = Keyword.get(opts, :name, __MODULE__)
initial_state = %{
partitions: Keyword.get(opts, :partitions, []),
db_size_bytes: Keyword.get(opts, :db_size_bytes, 0),
test_pid: Keyword.get(opts, :test_pid)
}
Agent.start_link(fn -> initial_state end, name: name)
end
@spec ensure_monthly_partitions(keyword()) :: :ok
def ensure_monthly_partitions(opts \\ []) do
notify({:ensure_monthly_partitions, opts})
:ok
end
@spec list_monthly_partitions() :: [map()]
def list_monthly_partitions do
Agent.get(__MODULE__, & &1.partitions)
end
@spec database_size_bytes() :: {:ok, non_neg_integer()}
def database_size_bytes do
notify(:database_size_bytes)
{:ok, Agent.get(__MODULE__, & &1.db_size_bytes)}
end
@spec drop_partition(String.t()) :: :ok
def drop_partition(partition_name) when is_binary(partition_name) do
Agent.update(__MODULE__, fn state ->
%{state | partitions: Enum.reject(state.partitions, &(&1.name == partition_name))}
end)
notify({:drop_partition, partition_name})
:ok
end
defp notify(message) do
case Agent.get(__MODULE__, & &1.test_pid) do
pid when is_pid(pid) -> send(pid, message)
_other -> :ok
end
end
end