Rename archiver to partitions and drop archive SQL helper
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
defmodule Parrhesia.Storage.Archiver do
|
defmodule Parrhesia.Storage.Partitions do
|
||||||
@moduledoc """
|
@moduledoc """
|
||||||
Partition-aware archival helpers for Postgres event partitions.
|
Partition lifecycle helpers for Postgres `events` and `event_tags` monthly partitions.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import Ecto.Query
|
import Ecto.Query
|
||||||
@@ -110,17 +110,6 @@ defmodule Parrhesia.Storage.Archiver do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
|
||||||
Generates an archive SQL statement for the given partition.
|
|
||||||
"""
|
|
||||||
@spec archive_sql(String.t(), String.t()) :: String.t()
|
|
||||||
def archive_sql(partition_name, archive_table_name) do
|
|
||||||
quoted_archive_table_name = quote_identifier!(archive_table_name)
|
|
||||||
quoted_partition_name = quote_identifier!(partition_name)
|
|
||||||
|
|
||||||
"INSERT INTO #{quoted_archive_table_name} SELECT * FROM #{quoted_partition_name};"
|
|
||||||
end
|
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Returns the monthly `events` partition name for a date.
|
Returns the monthly `events` partition name for a date.
|
||||||
"""
|
"""
|
||||||
@@ -5,7 +5,7 @@ defmodule Parrhesia.Tasks.PartitionRetentionWorker do
|
|||||||
|
|
||||||
use GenServer
|
use GenServer
|
||||||
|
|
||||||
alias Parrhesia.Storage.Archiver
|
alias Parrhesia.Storage.Partitions
|
||||||
alias Parrhesia.Telemetry
|
alias Parrhesia.Telemetry
|
||||||
|
|
||||||
@default_check_interval_hours 24
|
@default_check_interval_hours 24
|
||||||
@@ -13,7 +13,7 @@ defmodule Parrhesia.Tasks.PartitionRetentionWorker do
|
|||||||
@default_max_partitions_to_drop_per_run 1
|
@default_max_partitions_to_drop_per_run 1
|
||||||
@bytes_per_gib 1_073_741_824
|
@bytes_per_gib 1_073_741_824
|
||||||
|
|
||||||
@type monthly_partition :: Archiver.monthly_partition()
|
@type monthly_partition :: Partitions.monthly_partition()
|
||||||
|
|
||||||
@spec start_link(keyword()) :: GenServer.on_start()
|
@spec start_link(keyword()) :: GenServer.on_start()
|
||||||
def start_link(opts \\ []) do
|
def start_link(opts \\ []) do
|
||||||
@@ -26,7 +26,7 @@ defmodule Parrhesia.Tasks.PartitionRetentionWorker do
|
|||||||
retention_config = Application.get_env(:parrhesia, :retention, [])
|
retention_config = Application.get_env(:parrhesia, :retention, [])
|
||||||
|
|
||||||
state = %{
|
state = %{
|
||||||
archiver: Keyword.get(opts, :archiver, Archiver),
|
partition_ops: Keyword.get(opts, :partition_ops, Partitions),
|
||||||
interval_ms: interval_ms(opts, retention_config),
|
interval_ms: interval_ms(opts, retention_config),
|
||||||
months_ahead: months_ahead(opts, retention_config),
|
months_ahead: months_ahead(opts, retention_config),
|
||||||
max_db_gib: max_db_gib(opts, retention_config),
|
max_db_gib: max_db_gib(opts, retention_config),
|
||||||
@@ -65,7 +65,7 @@ defmodule Parrhesia.Tasks.PartitionRetentionWorker do
|
|||||||
def handle_info(_message, state), do: {:noreply, state}
|
def handle_info(_message, state), do: {:noreply, state}
|
||||||
|
|
||||||
defp run_maintenance(state) do
|
defp run_maintenance(state) do
|
||||||
case state.archiver.ensure_monthly_partitions(months_ahead: state.months_ahead) do
|
case state.partition_ops.ensure_monthly_partitions(months_ahead: state.months_ahead) do
|
||||||
:ok -> maybe_drop_oldest_partitions(state)
|
:ok -> maybe_drop_oldest_partitions(state)
|
||||||
{:error, reason} -> {:error, reason}
|
{:error, reason} -> {:error, reason}
|
||||||
end
|
end
|
||||||
@@ -92,14 +92,14 @@ defmodule Parrhesia.Tasks.PartitionRetentionWorker do
|
|||||||
defp apply_partition_drop(_state, nil, dropped_count), do: {:halt, {:ok, dropped_count}}
|
defp apply_partition_drop(_state, nil, dropped_count), do: {:halt, {:ok, dropped_count}}
|
||||||
|
|
||||||
defp apply_partition_drop(state, partition, dropped_count) do
|
defp apply_partition_drop(state, partition, dropped_count) do
|
||||||
case state.archiver.drop_partition(partition.name) do
|
case state.partition_ops.drop_partition(partition.name) do
|
||||||
:ok -> {:cont, {:ok, dropped_count + 1}}
|
:ok -> {:cont, {:ok, dropped_count + 1}}
|
||||||
{:error, reason} -> {:halt, {:error, reason}}
|
{:error, reason} -> {:halt, {:error, reason}}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp next_partition_to_drop(state) do
|
defp next_partition_to_drop(state) do
|
||||||
partitions = state.archiver.list_monthly_partitions()
|
partitions = state.partition_ops.list_monthly_partitions()
|
||||||
current_month_index = current_month_index(state.today_fun)
|
current_month_index = current_month_index(state.today_fun)
|
||||||
|
|
||||||
month_limit_candidate =
|
month_limit_candidate =
|
||||||
@@ -114,7 +114,7 @@ defmodule Parrhesia.Tasks.PartitionRetentionWorker do
|
|||||||
partitions,
|
partitions,
|
||||||
state.max_db_gib,
|
state.max_db_gib,
|
||||||
current_month_index,
|
current_month_index,
|
||||||
state.archiver
|
state.partition_ops
|
||||||
) do
|
) do
|
||||||
{:ok, pick_oldest_partition(month_limit_candidate, size_limit_candidate)}
|
{:ok, pick_oldest_partition(month_limit_candidate, size_limit_candidate)}
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
defmodule Parrhesia.TestSupport.PartitionRetentionStubArchiver do
|
defmodule Parrhesia.TestSupport.PartitionRetentionStubPartitions do
|
||||||
@moduledoc false
|
@moduledoc false
|
||||||
|
|
||||||
use Agent
|
use Agent
|
||||||
@@ -1,9 +1,9 @@
|
|||||||
defmodule Parrhesia.Storage.ArchiverTest do
|
defmodule Parrhesia.Storage.PartitionsTest do
|
||||||
use ExUnit.Case, async: false
|
use ExUnit.Case, async: false
|
||||||
|
|
||||||
alias Ecto.Adapters.SQL.Sandbox
|
alias Ecto.Adapters.SQL.Sandbox
|
||||||
alias Parrhesia.Repo
|
alias Parrhesia.Repo
|
||||||
alias Parrhesia.Storage.Archiver
|
alias Parrhesia.Storage.Partitions
|
||||||
|
|
||||||
setup do
|
setup do
|
||||||
:ok = Sandbox.checkout(Repo)
|
:ok = Sandbox.checkout(Repo)
|
||||||
@@ -11,16 +11,16 @@ defmodule Parrhesia.Storage.ArchiverTest do
|
|||||||
end
|
end
|
||||||
|
|
||||||
test "list_partitions returns partition tables" do
|
test "list_partitions returns partition tables" do
|
||||||
partitions = Archiver.list_partitions()
|
partitions = Partitions.list_partitions()
|
||||||
assert is_list(partitions)
|
assert is_list(partitions)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "ensure_monthly_partitions creates aligned monthly partitions for events and event_tags" do
|
test "ensure_monthly_partitions creates aligned monthly partitions for events and event_tags" do
|
||||||
assert :ok =
|
assert :ok =
|
||||||
Archiver.ensure_monthly_partitions(reference_date: ~D[2026-06-14], months_ahead: 1)
|
Partitions.ensure_monthly_partitions(reference_date: ~D[2026-06-14], months_ahead: 1)
|
||||||
|
|
||||||
monthly_partition_names =
|
monthly_partition_names =
|
||||||
Archiver.list_monthly_partitions()
|
Partitions.list_monthly_partitions()
|
||||||
|> Enum.map(& &1.name)
|
|> Enum.map(& &1.name)
|
||||||
|
|
||||||
assert "events_2026_06" in monthly_partition_names
|
assert "events_2026_06" in monthly_partition_names
|
||||||
@@ -30,43 +30,32 @@ defmodule Parrhesia.Storage.ArchiverTest do
|
|||||||
assert table_exists?("event_tags_2026_07")
|
assert table_exists?("event_tags_2026_07")
|
||||||
end
|
end
|
||||||
|
|
||||||
test "archive_sql builds insert-select statement" do
|
|
||||||
assert Archiver.archive_sql("events_2026_03", "events_archive") ==
|
|
||||||
~s(INSERT INTO "events_archive" SELECT * FROM "events_2026_03";)
|
|
||||||
end
|
|
||||||
|
|
||||||
test "drop_partition returns an error for protected partitions" do
|
test "drop_partition returns an error for protected partitions" do
|
||||||
assert {:error, :protected_partition} = Archiver.drop_partition("events_default")
|
assert {:error, :protected_partition} = Partitions.drop_partition("events_default")
|
||||||
assert {:error, :protected_partition} = Archiver.drop_partition("events")
|
assert {:error, :protected_partition} = Partitions.drop_partition("events")
|
||||||
assert {:error, :protected_partition} = Archiver.drop_partition("event_tags_default")
|
assert {:error, :protected_partition} = Partitions.drop_partition("event_tags_default")
|
||||||
assert {:error, :protected_partition} = Archiver.drop_partition("event_tags")
|
assert {:error, :protected_partition} = Partitions.drop_partition("event_tags")
|
||||||
end
|
end
|
||||||
|
|
||||||
test "drop_partition removes aligned event_tags partition for monthly event partition" do
|
test "drop_partition removes aligned event_tags partition for monthly event partition" do
|
||||||
assert :ok =
|
assert :ok =
|
||||||
Archiver.ensure_monthly_partitions(reference_date: ~D[2026-08-14], months_ahead: 0)
|
Partitions.ensure_monthly_partitions(reference_date: ~D[2026-08-14], months_ahead: 0)
|
||||||
|
|
||||||
assert table_exists?("events_2026_08")
|
assert table_exists?("events_2026_08")
|
||||||
assert table_exists?("event_tags_2026_08")
|
assert table_exists?("event_tags_2026_08")
|
||||||
|
|
||||||
assert :ok = Archiver.drop_partition("events_2026_08")
|
assert :ok = Partitions.drop_partition("events_2026_08")
|
||||||
|
|
||||||
refute table_exists?("events_2026_08")
|
refute table_exists?("events_2026_08")
|
||||||
refute table_exists?("event_tags_2026_08")
|
refute table_exists?("event_tags_2026_08")
|
||||||
end
|
end
|
||||||
|
|
||||||
test "database_size_bytes returns the current database size" do
|
test "database_size_bytes returns the current database size" do
|
||||||
assert {:ok, size} = Archiver.database_size_bytes()
|
assert {:ok, size} = Partitions.database_size_bytes()
|
||||||
assert is_integer(size)
|
assert is_integer(size)
|
||||||
assert size >= 0
|
assert size >= 0
|
||||||
end
|
end
|
||||||
|
|
||||||
test "archive_sql rejects invalid SQL identifiers" do
|
|
||||||
assert_raise ArgumentError, fn ->
|
|
||||||
Archiver.archive_sql("events_default; DROP TABLE events", "events_archive")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp table_exists?(table_name) when is_binary(table_name) do
|
defp table_exists?(table_name) when is_binary(table_name) do
|
||||||
case Repo.query("SELECT to_regclass($1)", ["public." <> table_name]) do
|
case Repo.query("SELECT to_regclass($1)", ["public." <> table_name]) do
|
||||||
{:ok, %{rows: [[nil]]}} -> false
|
{:ok, %{rows: [[nil]]}} -> false
|
||||||
@@ -2,13 +2,13 @@ defmodule Parrhesia.Tasks.PartitionRetentionWorkerTest do
|
|||||||
use ExUnit.Case, async: false
|
use ExUnit.Case, async: false
|
||||||
|
|
||||||
alias Parrhesia.Tasks.PartitionRetentionWorker
|
alias Parrhesia.Tasks.PartitionRetentionWorker
|
||||||
alias Parrhesia.TestSupport.PartitionRetentionStubArchiver
|
alias Parrhesia.TestSupport.PartitionRetentionStubPartitions
|
||||||
|
|
||||||
@bytes_per_gib 1_073_741_824
|
@bytes_per_gib 1_073_741_824
|
||||||
|
|
||||||
test "drops oldest partition when max_months_to_keep is exceeded" do
|
test "drops oldest partition when max_months_to_keep is exceeded" do
|
||||||
start_supervised!(
|
start_supervised!(
|
||||||
{PartitionRetentionStubArchiver,
|
{PartitionRetentionStubPartitions,
|
||||||
partitions: [
|
partitions: [
|
||||||
partition(2026, 1),
|
partition(2026, 1),
|
||||||
partition(2026, 2),
|
partition(2026, 2),
|
||||||
@@ -24,7 +24,7 @@ defmodule Parrhesia.Tasks.PartitionRetentionWorkerTest do
|
|||||||
start_supervised!(
|
start_supervised!(
|
||||||
{PartitionRetentionWorker,
|
{PartitionRetentionWorker,
|
||||||
name: nil,
|
name: nil,
|
||||||
archiver: PartitionRetentionStubArchiver,
|
partition_ops: PartitionRetentionStubPartitions,
|
||||||
interval_ms: :timer.hours(24),
|
interval_ms: :timer.hours(24),
|
||||||
months_ahead: 0,
|
months_ahead: 0,
|
||||||
max_db_bytes: :infinity,
|
max_db_bytes: :infinity,
|
||||||
@@ -42,7 +42,7 @@ defmodule Parrhesia.Tasks.PartitionRetentionWorkerTest do
|
|||||||
|
|
||||||
test "drops oldest completed partition when size exceeds max_db_bytes" do
|
test "drops oldest completed partition when size exceeds max_db_bytes" do
|
||||||
start_supervised!(
|
start_supervised!(
|
||||||
{PartitionRetentionStubArchiver,
|
{PartitionRetentionStubPartitions,
|
||||||
partitions: [partition(2026, 3), partition(2026, 4), partition(2026, 5)],
|
partitions: [partition(2026, 3), partition(2026, 4), partition(2026, 5)],
|
||||||
db_size_bytes: 12 * @bytes_per_gib,
|
db_size_bytes: 12 * @bytes_per_gib,
|
||||||
test_pid: self()}
|
test_pid: self()}
|
||||||
@@ -52,7 +52,7 @@ defmodule Parrhesia.Tasks.PartitionRetentionWorkerTest do
|
|||||||
start_supervised!(
|
start_supervised!(
|
||||||
{PartitionRetentionWorker,
|
{PartitionRetentionWorker,
|
||||||
name: nil,
|
name: nil,
|
||||||
archiver: PartitionRetentionStubArchiver,
|
partition_ops: PartitionRetentionStubPartitions,
|
||||||
interval_ms: :timer.hours(24),
|
interval_ms: :timer.hours(24),
|
||||||
months_ahead: 0,
|
months_ahead: 0,
|
||||||
max_db_bytes: 10,
|
max_db_bytes: 10,
|
||||||
@@ -69,7 +69,7 @@ defmodule Parrhesia.Tasks.PartitionRetentionWorkerTest do
|
|||||||
|
|
||||||
test "does not drop partitions when both limits are infinity" do
|
test "does not drop partitions when both limits are infinity" do
|
||||||
start_supervised!(
|
start_supervised!(
|
||||||
{PartitionRetentionStubArchiver,
|
{PartitionRetentionStubPartitions,
|
||||||
partitions: [partition(2026, 1), partition(2026, 2), partition(2026, 3)],
|
partitions: [partition(2026, 1), partition(2026, 2), partition(2026, 3)],
|
||||||
db_size_bytes: 50 * @bytes_per_gib,
|
db_size_bytes: 50 * @bytes_per_gib,
|
||||||
test_pid: self()}
|
test_pid: self()}
|
||||||
@@ -79,7 +79,7 @@ defmodule Parrhesia.Tasks.PartitionRetentionWorkerTest do
|
|||||||
start_supervised!(
|
start_supervised!(
|
||||||
{PartitionRetentionWorker,
|
{PartitionRetentionWorker,
|
||||||
name: nil,
|
name: nil,
|
||||||
archiver: PartitionRetentionStubArchiver,
|
partition_ops: PartitionRetentionStubPartitions,
|
||||||
interval_ms: :timer.hours(24),
|
interval_ms: :timer.hours(24),
|
||||||
months_ahead: 0,
|
months_ahead: 0,
|
||||||
max_db_bytes: :infinity,
|
max_db_bytes: :infinity,
|
||||||
|
|||||||
Reference in New Issue
Block a user