You've already forked tribes-plugin-sender
forked from tribes/tribes-plugin-template
f8e2bfaada
Move Sender modules under TribeOne.TribesPlugin.Sender and replace the Aether-specific chat integration with the public chat@1 surface contract.
140 lines
4.0 KiB
Elixir
140 lines
4.0 KiB
Elixir
defmodule TribeOne.TribesPlugin.Sender.Management do
|
|
@moduledoc false
|
|
|
|
require Ash.Query
|
|
|
|
alias TribeOne.TribesPlugin.Sender.Streaming
|
|
alias TribeOne.TribesPlugin.Sender.Streaming.{Stream, StreamGeneration}
|
|
|
|
@default_stream_slug "default"
|
|
|
|
@doc """
Builds the keyword options used for Ash calls issued by sender management code.

Authorization is turned off (`authorize?: false`) and the private context marks
the call as a system call with purpose `:sender_management`. The optional
`context` argument is stored unchanged under `:plugin_management_context`.
"""
def ash_opts(context \\ nil) do
  private_context = %{
    plugin_management_context: context,
    system_purpose: :sender_management,
    system?: true
  }

  [authorize?: false, context: %{private: private_context}]
end
|
|
|
|
@doc """
Returns the slug stored in the `@default_stream_slug` module attribute.
"""
def default_stream_slug do
  @default_stream_slug
end
|
|
|
|
@doc """
Reads at most one `Stream` whose `slug` equals the requested slug.

## Options

  * `:slug` - slug to look up; defaults to `@default_stream_slug`.
  * `:ash_opts` - options passed to `Ash.read_one/2`; defaults to `ash_opts/0`.

Returns the result of `Ash.read_one/2` for that query.
"""
def get_default_stream(opts \\ []) do
  wanted_slug = Keyword.get(opts, :slug, @default_stream_slug)
  read_opts = Keyword.get(opts, :ash_opts, ash_opts())

  query =
    Stream
    |> Ash.Query.filter(slug == ^wanted_slug)
    |> Ash.Query.limit(1)

  Ash.read_one(query, read_opts)
end
|
|
|
|
@doc """
Returns the stream for the requested slug, creating it when no such stream exists.

`attrs` may use string or atom keys. The slug is taken from `"slug"`, then
`:slug`, and finally falls back to `@default_stream_slug`. When a stream has to
be created it is created under exactly that slug, and the title defaults to
`"Live Stream"` only when the caller supplied neither `"title"` nor `:title`.

## Options

  * `:ash_opts` - options used for both the lookup and the create call;
    defaults to `ash_opts/0`.

Returns `{:ok, stream}` when the stream already exists, the result of
`Streaming.create_stream/2` when it had to be created, or the `{:error, reason}`
tuple produced by the lookup.
"""
def ensure_default_stream(attrs \\ %{}, opts \\ []) when is_map(attrs) do
  slug = Map.get(attrs, "slug") || Map.get(attrs, :slug) || @default_stream_slug
  ash_opts = Keyword.get(opts, :ash_opts, ash_opts())

  # NOTE(review): lookup and create are two separate calls; whether concurrent
  # callers can both reach the create branch depends on constraints not visible here.
  case get_default_stream(slug: slug, ash_opts: ash_opts) do
    {:ok, nil} ->
      attrs
      |> put_looked_up_slug(slug)
      |> put_default_title()
      |> normalize_stream_attrs()
      |> Streaming.create_stream(ash_opts)

    {:ok, stream} ->
      {:ok, stream}

    {:error, _reason} = error ->
      error
  end
end

# Forces the slug that was just looked up, replacing a nil/false "slug" entry and
# removing an atom-keyed duplicate so only one slug reaches normalization.
defp put_looked_up_slug(attrs, slug) do
  attrs
  |> Map.delete(:slug)
  |> Map.put("slug", slug)
end

# Adds the fallback title only when the caller gave no title under either key
# form, so an atom-keyed :title is not shadowed by a string-keyed default.
defp put_default_title(attrs) do
  if Map.has_key?(attrs, "title") or Map.has_key?(attrs, :title) do
    attrs
  else
    Map.put(attrs, "title", "Live Stream")
  end
end
|
|
|
|
@doc """
Reads the newest `StreamGeneration` whose status is one of `:pending`,
`:listening`, `:live`, `:degraded` or `:ending`.

Candidates are sorted by `started_at` descending, then `inserted_at`
descending, and only the first one is read.

## Options

  * `:ash_opts` - options passed to `Ash.read_one/2`; defaults to `ash_opts/0`.

Returns the result of `Ash.read_one/2` for that query.
"""
def active_generation(opts \\ []) do
  read_opts = Keyword.get(opts, :ash_opts, ash_opts())

  newest_active =
    StreamGeneration
    |> Ash.Query.filter(status in [:pending, :listening, :live, :degraded, :ending])
    |> Ash.Query.sort(started_at: :desc, inserted_at: :desc)
    |> Ash.Query.limit(1)

  Ash.read_one(newest_active, read_opts)
end
|
|
|
|
@doc """
Converts a `StreamGeneration` struct into a string-keyed map.

`status` and `delivery_mode` are rendered with `Atom.to_string/1`, `started_at`
and `ended_at` go through `format_datetime/1`, and every other field is copied
as-is.
"""
def stream_generation_payload(%StreamGeneration{} = generation) do
  # Pairs are listed in the same order the fields are read, then collected into a map.
  Map.new([
    {"id", generation.id},
    {"stream_id", generation.stream_id},
    {"status", Atom.to_string(generation.status)},
    {"started_at", format_datetime(generation.started_at)},
    {"ended_at", format_datetime(generation.ended_at)},
    {"active_ingest_endpoint_id", generation.active_ingest_endpoint_id},
    {"primary_origin_endpoint_id", generation.primary_origin_endpoint_id},
    {"delivery_mode", Atom.to_string(generation.delivery_mode)},
    {"generation_token", generation.generation_token},
    {"playlist_namespace", generation.playlist_namespace},
    {"error", generation.error}
  ])
end
|
|
|
|
@doc """
Converts a `Stream` struct into a string-keyed map.

`visibility` and `latency_mode` are rendered with `Atom.to_string/1`; all other
fields are copied as-is.
"""
def stream_payload(%Stream{} = stream) do
  # Pairs are listed in the same order the fields are read, then collected into a map.
  Map.new([
    {"id", stream.id},
    {"slug", stream.slug},
    {"title", stream.title},
    {"description", stream.description},
    {"visibility", Atom.to_string(stream.visibility)},
    {"latency_mode", Atom.to_string(stream.latency_mode)},
    {"recording_policy", stream.recording_policy},
    {"default_rendition_policy", stream.default_rendition_policy}
  ])
end
|
|
|
|
@doc """
Reduces `attrs` to the stream attributes this module accepts and converts the
enum-like ones to atoms.

The map is first filtered through `take_known/2` with the permitted attribute
names, then `"visibility"` and `"latency_mode"` are passed to `atomize_enums/2`.
"""
def normalize_stream_attrs(attrs) when is_map(attrs) do
  permitted =
    ~w(slug title description visibility owner_id latency_mode
       recording_policy default_rendition_policy)

  enum_fields = ~w(visibility latency_mode)

  known = take_known(attrs, permitted)
  atomize_enums(known, enum_fields)
end
|
|
|
|
@doc """
Picks the entries named in `keys` out of `attrs`, keyed by atom where possible.

`keys` is a list of strings. For each key the value is looked up under the
string key first and under the matching atom key second, and stored in the
result under the atom key.

Atoms are resolved with `String.to_existing_atom/1`, one key at a time. A key
whose atom does not exist cannot appear in `attrs` as an atom key, so only its
string-keyed entry (if any) is kept, still under the string key. Resolving per
key means one unresolvable key no longer discards the atom-keyed entries and
the normalization of every other key.

Keys absent from `attrs` are omitted from the result.
"""
def take_known(attrs, keys) do
  Enum.reduce(keys, %{}, fn key, acc ->
    case known_key_atom(key) do
      {:ok, atom_key} ->
        # String key wins over the atom key when both are present.
        with :error <- Map.fetch(attrs, key),
             :error <- Map.fetch(attrs, atom_key) do
          acc
        else
          {:ok, value} -> Map.put(acc, atom_key, value)
        end

      :error ->
        case Map.fetch(attrs, key) do
          {:ok, value} -> Map.put(acc, key, value)
          :error -> acc
        end
    end
  end)
end

# Resolves a key to an already-existing atom without ever creating a new one.
defp known_key_atom(key) do
  {:ok, String.to_existing_atom(key)}
rescue
  ArgumentError -> :error
end
|
|
|
|
@doc """
Converts the string values stored under the given keys into existing atoms.

`keys` is a list of strings; each is resolved to its atom and looked up in
`attrs` under that atom key. A binary value is replaced by the atom of the same
name when that atom already exists.

Each key is handled independently. Values that are not binaries, values with
no existing atom, keys missing from `attrs` and keys with no existing atom are
left untouched, and they do not undo conversions made for other keys. No new
atoms are ever created.
"""
def atomize_enums(attrs, keys) do
  Enum.reduce(keys, attrs, fn key, acc ->
    with {:ok, atom_key} <- safe_existing_atom(key),
         {:ok, value} when is_binary(value) <- Map.fetch(acc, atom_key),
         {:ok, enum_atom} <- safe_existing_atom(value) do
      Map.put(acc, atom_key, enum_atom)
    else
      _skip -> acc
    end
  end)
end

# Tagged wrapper around String.to_existing_atom/1 so a miss is a value, not an exception.
defp safe_existing_atom(name) do
  {:ok, String.to_existing_atom(name)}
rescue
  ArgumentError -> :error
end
|
|
|
|
@doc """
Formats a `DateTime` as an ISO 8601 string; `nil` is passed through as `nil`.
"""
@spec format_datetime(DateTime.t() | nil) :: String.t() | nil
def format_datetime(%DateTime{} = datetime) do
  DateTime.to_iso8601(datetime)
end

def format_datetime(nil) do
  nil
end
|
|
end
|