Files
self f8e2bfaada refactor: use chat capability surfaces
Move Sender modules under TribeOne.TribesPlugin.Sender and replace the Aether-specific chat integration with the public chat@1 surface contract.
2026-05-26 01:13:38 +02:00

140 lines
4.0 KiB
Elixir

defmodule TribeOne.TribesPlugin.Sender.Management do
@moduledoc false
require Ash.Query
alias TribeOne.TribesPlugin.Sender.Streaming
alias TribeOne.TribesPlugin.Sender.Streaming.{Stream, StreamGeneration}
@default_stream_slug "default"
def ash_opts(context \\ nil) do
[
authorize?: false,
context: %{
private: %{
system?: true,
system_purpose: :sender_management,
plugin_management_context: context
}
}
]
end
def default_stream_slug, do: @default_stream_slug
def get_default_stream(opts \\ []) do
slug = Keyword.get(opts, :slug, @default_stream_slug)
ash_opts = Keyword.get(opts, :ash_opts, ash_opts())
Stream
|> Ash.Query.filter(slug == ^slug)
|> Ash.Query.limit(1)
|> Ash.read_one(ash_opts)
end
def ensure_default_stream(attrs \\ %{}, opts \\ []) when is_map(attrs) do
slug = Map.get(attrs, "slug") || Map.get(attrs, :slug) || @default_stream_slug
ash_opts = Keyword.get(opts, :ash_opts, ash_opts())
case get_default_stream(slug: slug, ash_opts: ash_opts) do
{:ok, nil} ->
attrs
|> Map.put_new("slug", slug)
|> Map.put_new("title", "Live Stream")
|> normalize_stream_attrs()
|> Streaming.create_stream(ash_opts)
{:ok, stream} ->
{:ok, stream}
{:error, _reason} = error ->
error
end
end
def active_generation(opts \\ []) do
ash_opts = Keyword.get(opts, :ash_opts, ash_opts())
StreamGeneration
|> Ash.Query.filter(status in [:pending, :listening, :live, :degraded, :ending])
|> Ash.Query.sort(started_at: :desc, inserted_at: :desc)
|> Ash.Query.limit(1)
|> Ash.read_one(ash_opts)
end
def stream_generation_payload(%StreamGeneration{} = generation) do
%{
"id" => generation.id,
"stream_id" => generation.stream_id,
"status" => Atom.to_string(generation.status),
"started_at" => format_datetime(generation.started_at),
"ended_at" => format_datetime(generation.ended_at),
"active_ingest_endpoint_id" => generation.active_ingest_endpoint_id,
"primary_origin_endpoint_id" => generation.primary_origin_endpoint_id,
"delivery_mode" => Atom.to_string(generation.delivery_mode),
"generation_token" => generation.generation_token,
"playlist_namespace" => generation.playlist_namespace,
"error" => generation.error
}
end
def stream_payload(%Stream{} = stream) do
%{
"id" => stream.id,
"slug" => stream.slug,
"title" => stream.title,
"description" => stream.description,
"visibility" => Atom.to_string(stream.visibility),
"latency_mode" => Atom.to_string(stream.latency_mode),
"recording_policy" => stream.recording_policy,
"default_rendition_policy" => stream.default_rendition_policy
}
end
def normalize_stream_attrs(attrs) when is_map(attrs) do
attrs
|> take_known([
"slug",
"title",
"description",
"visibility",
"owner_id",
"latency_mode",
"recording_policy",
"default_rendition_policy"
])
|> atomize_enums(["visibility", "latency_mode"])
end
def take_known(attrs, keys) do
Enum.reduce(keys, %{}, fn key, acc ->
atom_key = String.to_existing_atom(key)
cond do
Map.has_key?(attrs, key) -> Map.put(acc, atom_key, Map.fetch!(attrs, key))
Map.has_key?(attrs, atom_key) -> Map.put(acc, atom_key, Map.fetch!(attrs, atom_key))
true -> acc
end
end)
rescue
ArgumentError -> Map.take(attrs, keys)
end
def atomize_enums(attrs, keys) do
Enum.reduce(keys, attrs, fn key, acc ->
atom_key = String.to_existing_atom(key)
case Map.get(acc, atom_key) do
value when is_binary(value) -> Map.put(acc, atom_key, String.to_existing_atom(value))
_other -> acc
end
end)
rescue
ArgumentError -> attrs
end
def format_datetime(nil), do: nil
def format_datetime(%DateTime{} = datetime), do: DateTime.to_iso8601(datetime)
end