Align event_tags partition lifecycle with events
This commit is contained in:
@@ -230,7 +230,7 @@ CSV env vars use comma-separated values. Boolean env vars accept `1/0`, `true/fa
|
||||
| Atom key | ENV | Default | Notes |
|
||||
| --- | --- | --- | --- |
|
||||
| `:check_interval_hours` | `PARRHESIA_RETENTION_CHECK_INTERVAL_HOURS` | `24` | Partition maintenance + pruning cadence |
|
||||
| `:months_ahead` | `PARRHESIA_RETENTION_MONTHS_AHEAD` | `2` | Pre-create current month plus N future monthly partitions |
|
||||
| `:months_ahead` | `PARRHESIA_RETENTION_MONTHS_AHEAD` | `2` | Pre-create current month plus N future monthly partitions for `events` and `event_tags` |
|
||||
| `:max_db_bytes` | `PARRHESIA_RETENTION_MAX_DB_BYTES` | `:infinity` | Interpreted as GiB threshold; accepts integer or `infinity` |
|
||||
| `:max_months_to_keep` | `PARRHESIA_RETENTION_MAX_MONTHS_TO_KEEP` | `:infinity` | Keep at most N months (including current month); accepts integer or `infinity` |
|
||||
| `:max_partitions_to_drop_per_run` | `PARRHESIA_RETENTION_MAX_PARTITIONS_TO_DROP_PER_RUN` | `1` | Safety cap for each maintenance run |
|
||||
|
||||
@@ -9,6 +9,8 @@ defmodule Parrhesia.Storage.Archiver do
|
||||
|
||||
@identifier_pattern ~r/^[a-zA-Z_][a-zA-Z0-9_]*$/
|
||||
@monthly_partition_pattern ~r/^events_(\d{4})_(\d{2})$/
|
||||
@events_partition_prefix "events"
|
||||
@event_tags_partition_prefix "event_tags"
|
||||
@default_months_ahead 2
|
||||
|
||||
@type monthly_partition :: %{
|
||||
@@ -74,7 +76,7 @@ defmodule Parrhesia.Storage.Archiver do
|
||||
Enum.reduce_while(offsets, :ok, fn offset, :ok ->
|
||||
target_month = shift_month(reference_month, offset)
|
||||
|
||||
case create_monthly_partition(target_month) do
|
||||
case create_monthly_partitions(target_month) do
|
||||
:ok -> {:cont, :ok}
|
||||
{:error, reason} -> {:halt, {:error, reason}}
|
||||
end
|
||||
@@ -95,18 +97,16 @@ defmodule Parrhesia.Storage.Archiver do
|
||||
|
||||
@doc """
|
||||
Drops an event partition table by name.
|
||||
|
||||
For monthly `events_YYYY_MM` partitions, the matching `event_tags_YYYY_MM`
|
||||
partition is dropped first to keep partition lifecycle aligned.
|
||||
"""
|
||||
@spec drop_partition(String.t()) :: :ok | {:error, term()}
|
||||
def drop_partition(partition_name) when is_binary(partition_name) do
|
||||
if partition_name in ["events", "events_default"] do
|
||||
if protected_partition?(partition_name) do
|
||||
{:error, :protected_partition}
|
||||
else
|
||||
quoted_partition_name = quote_identifier!(partition_name)
|
||||
|
||||
case Repo.query("DROP TABLE IF EXISTS #{quoted_partition_name}") do
|
||||
{:ok, _result} -> :ok
|
||||
{:error, reason} -> {:error, reason}
|
||||
end
|
||||
drop_partition_tables(partition_name)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -122,23 +122,56 @@ defmodule Parrhesia.Storage.Archiver do
|
||||
end
|
||||
|
||||
@doc """
|
||||
Returns the monthly partition name for a date.
|
||||
Returns the monthly `events` partition name for a date.
|
||||
"""
|
||||
@spec month_partition_name(Date.t()) :: String.t()
|
||||
def month_partition_name(%Date{} = date) do
|
||||
month = date.month |> Integer.to_string() |> String.pad_leading(2, "0")
|
||||
"events_#{date.year}_#{month}"
|
||||
monthly_partition_name(@events_partition_prefix, date)
|
||||
end
|
||||
|
||||
defp create_monthly_partition(%Date{} = month_date) do
|
||||
partition_name = month_partition_name(month_date)
|
||||
@doc """
|
||||
Returns the monthly `event_tags` partition name for a date.
|
||||
"""
|
||||
@spec event_tags_month_partition_name(Date.t()) :: String.t()
|
||||
def event_tags_month_partition_name(%Date{} = date) do
|
||||
monthly_partition_name(@event_tags_partition_prefix, date)
|
||||
end
|
||||
|
||||
defp monthly_partition_name(prefix, %Date{} = date) do
|
||||
month_suffix = date.month |> Integer.to_string() |> String.pad_leading(2, "0")
|
||||
"#{prefix}_#{date.year}_#{month_suffix}"
|
||||
end
|
||||
|
||||
defp create_monthly_partitions(%Date{} = month_date) do
|
||||
{start_unix, end_unix} = month_bounds_unix(month_date.year, month_date.month)
|
||||
|
||||
case create_monthly_partition(
|
||||
month_partition_name(month_date),
|
||||
@events_partition_prefix,
|
||||
start_unix,
|
||||
end_unix
|
||||
) do
|
||||
:ok ->
|
||||
create_monthly_partition(
|
||||
event_tags_month_partition_name(month_date),
|
||||
@event_tags_partition_prefix,
|
||||
start_unix,
|
||||
end_unix
|
||||
)
|
||||
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
defp create_monthly_partition(partition_name, parent_table_name, start_unix, end_unix) do
|
||||
quoted_partition_name = quote_identifier!(partition_name)
|
||||
quoted_parent_table_name = quote_identifier!(parent_table_name)
|
||||
|
||||
sql =
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS #{quoted_partition_name}
|
||||
PARTITION OF "events"
|
||||
PARTITION OF #{quoted_parent_table_name}
|
||||
FOR VALUES FROM (#{start_unix}) TO (#{end_unix})
|
||||
"""
|
||||
|
||||
@@ -148,6 +181,76 @@ defmodule Parrhesia.Storage.Archiver do
|
||||
end
|
||||
end
|
||||
|
||||
defp drop_partition_tables(partition_name) do
|
||||
case parse_monthly_partition(partition_name) do
|
||||
nil -> drop_table(partition_name)
|
||||
monthly_partition -> drop_monthly_partition(partition_name, monthly_partition)
|
||||
end
|
||||
end
|
||||
|
||||
defp drop_monthly_partition(partition_name, %{year: year, month: month}) do
|
||||
month_date = Date.new!(year, month, 1)
|
||||
tags_partition_name = monthly_partition_name(@event_tags_partition_prefix, month_date)
|
||||
|
||||
with :ok <- maybe_detach_events_partition(partition_name),
|
||||
:ok <- drop_table(tags_partition_name) do
|
||||
drop_table(partition_name)
|
||||
end
|
||||
end
|
||||
|
||||
defp maybe_detach_events_partition(partition_name) do
|
||||
if attached_partition?(partition_name, @events_partition_prefix) do
|
||||
quoted_parent_table_name = quote_identifier!(@events_partition_prefix)
|
||||
quoted_partition_name = quote_identifier!(partition_name)
|
||||
|
||||
case Repo.query(
|
||||
"ALTER TABLE #{quoted_parent_table_name} DETACH PARTITION #{quoted_partition_name}"
|
||||
) do
|
||||
{:ok, _result} -> :ok
|
||||
{:error, reason} -> {:error, reason}
|
||||
end
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
defp attached_partition?(partition_name, parent_table_name) do
|
||||
query =
|
||||
"""
|
||||
SELECT 1
|
||||
FROM pg_inherits AS inheritance
|
||||
JOIN pg_class AS child ON child.oid = inheritance.inhrelid
|
||||
JOIN pg_namespace AS child_ns ON child_ns.oid = child.relnamespace
|
||||
JOIN pg_class AS parent ON parent.oid = inheritance.inhparent
|
||||
JOIN pg_namespace AS parent_ns ON parent_ns.oid = parent.relnamespace
|
||||
WHERE child_ns.nspname = 'public'
|
||||
AND parent_ns.nspname = 'public'
|
||||
AND child.relname = $1
|
||||
AND parent.relname = $2
|
||||
LIMIT 1
|
||||
"""
|
||||
|
||||
case Repo.query(query, [partition_name, parent_table_name]) do
|
||||
{:ok, %{rows: [[1]]}} -> true
|
||||
{:ok, %{rows: []}} -> false
|
||||
{:ok, _result} -> false
|
||||
{:error, _reason} -> false
|
||||
end
|
||||
end
|
||||
|
||||
defp drop_table(table_name) do
|
||||
quoted_table_name = quote_identifier!(table_name)
|
||||
|
||||
case Repo.query("DROP TABLE IF EXISTS #{quoted_table_name}") do
|
||||
{:ok, _result} -> :ok
|
||||
{:error, reason} -> {:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
defp protected_partition?(partition_name) do
|
||||
partition_name in ["events", "events_default", "event_tags", "event_tags_default"]
|
||||
end
|
||||
|
||||
defp parse_monthly_partition(partition_name) do
|
||||
case Regex.run(@monthly_partition_pattern, partition_name, capture: :all_but_first) do
|
||||
[year_text, month_text] ->
|
||||
|
||||
@@ -30,7 +30,10 @@ defmodule Parrhesia.Repo.Migrations.CreateRelayStorage do
|
||||
create(index(:events, [:expires_at], where: "expires_at IS NOT NULL"))
|
||||
create(index(:events, [:deleted_at], where: "deleted_at IS NOT NULL"))
|
||||
|
||||
create table(:event_tags, primary_key: false) do
|
||||
create table(:event_tags,
|
||||
primary_key: false,
|
||||
options: "PARTITION BY RANGE (event_created_at)"
|
||||
) do
|
||||
add(:event_created_at, :bigint, null: false)
|
||||
add(:event_id, :binary, null: false)
|
||||
add(:name, :string, null: false)
|
||||
@@ -39,6 +42,8 @@ defmodule Parrhesia.Repo.Migrations.CreateRelayStorage do
|
||||
timestamps(updated_at: false, type: :utc_datetime_usec)
|
||||
end
|
||||
|
||||
execute("CREATE TABLE event_tags_default PARTITION OF event_tags DEFAULT")
|
||||
|
||||
execute("""
|
||||
ALTER TABLE event_tags
|
||||
ADD CONSTRAINT event_tags_event_fk
|
||||
@@ -149,6 +154,8 @@ defmodule Parrhesia.Repo.Migrations.CreateRelayStorage do
|
||||
drop(table(:banned_pubkeys))
|
||||
drop(table(:addressable_event_state))
|
||||
drop(table(:replaceable_event_state))
|
||||
|
||||
execute("DROP TABLE event_tags_default")
|
||||
drop(table(:event_tags))
|
||||
|
||||
execute("DROP TABLE events_default")
|
||||
|
||||
@@ -64,7 +64,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.QueryPlanRegressionTest do
|
||||
)
|
||||
|
||||
plan = Enum.map_join(explain.rows, "\n", &hd/1)
|
||||
assert plan =~ "event_tags_h_value_created_at_idx"
|
||||
assert plan =~ "Index Scan using event_tags_"
|
||||
refute plan =~ "Filter: ((name)::text = 'h'::text)"
|
||||
end
|
||||
|
||||
test "#i-heavy query plan uses dedicated event_tags i index" do
|
||||
@@ -111,7 +112,8 @@ defmodule Parrhesia.Storage.Adapters.Postgres.QueryPlanRegressionTest do
|
||||
)
|
||||
|
||||
plan = Enum.map_join(explain.rows, "\n", &hd/1)
|
||||
assert plan =~ "event_tags_i_value_created_at_idx"
|
||||
assert plan =~ "Index Scan using event_tags_"
|
||||
refute plan =~ "Filter: ((name)::text = 'i'::text)"
|
||||
end
|
||||
|
||||
defp persist_event(overrides) do
|
||||
|
||||
@@ -15,7 +15,7 @@ defmodule Parrhesia.Storage.ArchiverTest do
|
||||
assert is_list(partitions)
|
||||
end
|
||||
|
||||
test "ensure_monthly_partitions creates named monthly partitions" do
|
||||
test "ensure_monthly_partitions creates aligned monthly partitions for events and event_tags" do
|
||||
assert :ok =
|
||||
Archiver.ensure_monthly_partitions(reference_date: ~D[2026-06-14], months_ahead: 1)
|
||||
|
||||
@@ -25,6 +25,9 @@ defmodule Parrhesia.Storage.ArchiverTest do
|
||||
|
||||
assert "events_2026_06" in monthly_partition_names
|
||||
assert "events_2026_07" in monthly_partition_names
|
||||
|
||||
assert table_exists?("event_tags_2026_06")
|
||||
assert table_exists?("event_tags_2026_07")
|
||||
end
|
||||
|
||||
test "archive_sql builds insert-select statement" do
|
||||
@@ -35,6 +38,21 @@ defmodule Parrhesia.Storage.ArchiverTest do
|
||||
test "drop_partition returns an error for protected partitions" do
|
||||
assert {:error, :protected_partition} = Archiver.drop_partition("events_default")
|
||||
assert {:error, :protected_partition} = Archiver.drop_partition("events")
|
||||
assert {:error, :protected_partition} = Archiver.drop_partition("event_tags_default")
|
||||
assert {:error, :protected_partition} = Archiver.drop_partition("event_tags")
|
||||
end
|
||||
|
||||
test "drop_partition removes aligned event_tags partition for monthly event partition" do
|
||||
assert :ok =
|
||||
Archiver.ensure_monthly_partitions(reference_date: ~D[2026-08-14], months_ahead: 0)
|
||||
|
||||
assert table_exists?("events_2026_08")
|
||||
assert table_exists?("event_tags_2026_08")
|
||||
|
||||
assert :ok = Archiver.drop_partition("events_2026_08")
|
||||
|
||||
refute table_exists?("events_2026_08")
|
||||
refute table_exists?("event_tags_2026_08")
|
||||
end
|
||||
|
||||
test "database_size_bytes returns the current database size" do
|
||||
@@ -48,4 +66,12 @@ defmodule Parrhesia.Storage.ArchiverTest do
|
||||
Archiver.archive_sql("events_default; DROP TABLE events", "events_archive")
|
||||
end
|
||||
end
|
||||
|
||||
defp table_exists?(table_name) when is_binary(table_name) do
|
||||
case Repo.query("SELECT to_regclass($1)", ["public." <> table_name]) do
|
||||
{:ok, %{rows: [[nil]]}} -> false
|
||||
{:ok, %{rows: [[_relation_name]]}} -> true
|
||||
_other -> false
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user