Files
self 3e288a7e95 feat: prefix Sender plugin slug
Rename Sender plugin identity to tribe-one-sender across manifest, runtime API routes, metrics, e2e fixtures, and tests.
2026-06-17 22:33:33 +02:00

376 lines
12 KiB
Elixir

defmodule TribeOne.TribesPlugin.Sender.LocalStreams do
@moduledoc """
Local Sender stream control helpers.
This module is intended for IEx and local operation when an external
orchestrator such as Legion is not managing TribeOne.TribesPlugin.Sender. It composes the same
management handlers that remote orchestration uses, and can optionally launch
a local ffmpeg test-video publisher with MuonTrap.
"""
alias TribeOne.TribesPlugin.Sender.Management.{StreamLifecycle, Topology}
alias Tribes.Plugin.ManagementContext
@default_input_url "rtmp://0.0.0.0:1935/live/source"
@default_push_url "rtmp://127.0.0.1:1935/live/source"
@default_rtmp_ingest_url "rtmp://localhost:1935/live/source"
@default_hls_base_url "http://localhost:4000/sender/hls"
@default_display_name "Local Sender origin"
@ffmpeg_executable_env "SENDER_FFMPEG_EXECUTABLE"
@doc """
Register this node as a local origin endpoint and open its ingest stream.

Takes a keyword list of options. Frequently used ones:

  * `:push_test?` - once ingest is open, also launch the generated ffmpeg
    test publisher.
  * `:duration` - optional lifetime of the test publisher, in seconds.
  * `:input_url` - RTMP listener URL for node-side ffmpeg.
  * `:push_url` - RTMP URL the optional test publisher pushes to.
  * `:spool_root` - override for the HLS spool root.
  * `:node_pubkey_fun` - a pubkey binary, or a zero-arity function producing
    one; defaults to `Tribes.Identity.node_pubkey/0`.

A local node pubkey is mandatory. On success the result is `{:ok, payload}`
where the stream payload additionally carries the upserted endpoint under
`"endpoint"` (and the publisher under `"test_video"` when `:push_test?` is
set). Any step that fails is returned unchanged.
"""
def start_origin(opts \\ []) when is_list(opts) do
  with {:ok, pubkey} <- local_node_pubkey(opts),
       {:ok, %{"endpoint" => origin}} <- upsert_local_origin(pubkey, opts),
       {:ok, started} <-
         StreamLifecycle.start(context("stream.start", pubkey), start_params(origin, opts)) do
    maybe_attach_test_video(started, origin, opts)
  end
end
@doc """
Report the status of the local origin ingest stream.

Accepts a keyword list (`:generation_id`, `:actor_pubkey`) or a payload map
whose `"generation"` / `:generation` entry holds a map with a binary `"id"`;
in the map form only the generation id is carried over.
"""
def status(opts \\ [])

def status(%{"generation" => %{"id" => id}}) when is_binary(id),
  do: status(generation_id: id)

def status(%{generation: %{"id" => id}}) when is_binary(id),
  do: status(generation_id: id)

def status(opts) when is_list(opts) do
  ctx = context("stream.status", Keyword.get(opts, :actor_pubkey))
  StreamLifecycle.status(ctx, status_params(opts))
end
@doc """
Stop the local origin ingest stream, either the active one or the one
identified by `:generation_id`.

Accepts a keyword list (`:generation_id`, `:spool_root`,
`:ended_retention_ms`, `:actor_pubkey`) or a payload map whose
`"generation"` / `:generation` entry holds a map with a binary `"id"`; in the
map form only the generation id is carried over.
"""
def stop(opts \\ [])

def stop(%{"generation" => %{"id" => id}}) when is_binary(id),
  do: stop(generation_id: id)

def stop(%{generation: %{"id" => id}}) when is_binary(id),
  do: stop(generation_id: id)

def stop(opts) when is_list(opts) do
  ctx = context("stream.stop", Keyword.get(opts, :actor_pubkey))
  StreamLifecycle.stop(ctx, stop_params(opts))
end
@doc """
Launch a locally generated ffmpeg test-video publisher.

By default the publisher runs under
`TribeOne.TribesPlugin.Sender.MediaBackendSupervisor`, so it is not linked to
the IEx shell process. Give `duration: seconds` to have it exit on its own.

Start-up is retried: `:retry_attempts` (default `30`) bounds the number of
tries and `:retry_interval_ms` (default `1_000`) is the pause between them.
"""
def push_test_video(opts \\ []) when is_list(opts) do
  retry_start_test_video(
    opts,
    Keyword.get(opts, :retry_attempts, 30),
    Keyword.get(opts, :retry_interval_ms, 1_000)
  )
end
@doc """
Stop a test-video publisher returned by `push_test_video/1` or `start_origin/1`.

Accepts the publisher pid itself, a publisher map (`"pid"` / `:pid`), or a
payload map carrying the publisher under `"test_video"` / `:test_video`.

Returns `:ok` once the publisher is stopped. Stopping a publisher that has
already exited (for example one started with `:duration`) is also `:ok`, so
the call is safe to repeat. Returns `{:error, :missing_test_video_pid}` when
no pid can be found in the argument.
"""
def stop_test_video(pid) when is_pid(pid) do
  GenServer.stop(pid, :normal)
catch
  # GenServer.stop/2 exits in the *caller* when the target is already gone.
  # A finished publisher is exactly the state the caller asked for, so report
  # success instead of crashing the shell.
  :exit, {:noproc, _} -> :ok
  :exit, :noproc -> :ok
end

def stop_test_video(%{"pid" => pid}) when is_pid(pid), do: stop_test_video(pid)
def stop_test_video(%{pid: pid}) when is_pid(pid), do: stop_test_video(pid)

def stop_test_video(%{"test_video" => %{"pid" => pid}}) when is_pid(pid) do
  stop_test_video(pid)
end

def stop_test_video(%{test_video: %{pid: pid}}) when is_pid(pid) do
  stop_test_video(pid)
end

def stop_test_video(_other), do: {:error, :missing_test_video_pid}
@doc """
Resolve the command `push_test_video/1` would run.

Returns `{:ok, {executable, args}}`, or the error from the ffmpeg executable
lookup unchanged.
"""
def test_video_command(opts \\ []) when is_list(opts) do
  case ffmpeg_executable() do
    {:ok, ffmpeg} -> {:ok, {ffmpeg, test_video_args(opts)}}
    failure -> failure
  end
end
@doc """
Build the ffmpeg argument list for the generated test video.

Recognised options (with defaults): `:loglevel` (`"warning"`), `:size`
(`"320x180"`), `:rate` (`15`), `:frequency` (`1000`), `:sample_rate`
(`48_000`), `:duration` (none), `:gop` (`30`), `:keyint_min` (the `:gop`
value) and `:push_url`.
"""
def test_video_args(opts \\ []) when is_list(opts) do
  gop = Keyword.get(opts, :gop, 30)

  video_source =
    "testsrc=size=#{Keyword.get(opts, :size, "320x180")}:rate=#{Keyword.get(opts, :rate, 15)}"

  audio_source =
    "sine=frequency=#{Keyword.get(opts, :frequency, 1000)}:sample_rate=#{Keyword.get(opts, :sample_rate, 48_000)}"

  # Only bound the run time when a duration was actually requested.
  time_limit =
    case Keyword.get(opts, :duration) do
      nil -> []
      seconds -> ["-t", to_string(seconds)]
    end

  Enum.concat([
    ["-hide_banner", "-loglevel", to_string(Keyword.get(opts, :loglevel, "warning"))],
    ["-re", "-f", "lavfi", "-i", video_source],
    ["-f", "lavfi", "-i", audio_source],
    time_limit,
    ["-c:v", "libx264", "-preset", "ultrafast", "-tune", "zerolatency"],
    ["-pix_fmt", "yuv420p"],
    ["-g", to_string(gop), "-keyint_min", to_string(Keyword.get(opts, :keyint_min, gop))],
    ["-sc_threshold", "0", "-c:a", "aac"],
    ["-f", "flv", Keyword.get(opts, :push_url, @default_push_url)]
  ])
end
# Upserts the media endpoint describing this node through the Topology
# management handler, acting as the node itself.
defp upsert_local_origin(node_pubkey, opts) do
  ctx = context("media_endpoints.upsert", node_pubkey)
  params = endpoint_params(node_pubkey, opts)

  Topology.upsert_endpoint(ctx, params)
end
# Builds the endpoint upsert params: fixed/defaulted fields first, then each
# optional field only when the caller supplied a non-nil value for it.
defp endpoint_params(node_pubkey, opts) do
  required = %{
    "type" => "tribes_origin",
    "node_pubkey" => node_pubkey,
    "display_name" => Keyword.get(opts, :display_name, @default_display_name),
    "rtmp_ingest_url" => Keyword.get(opts, :rtmp_ingest_url, @default_rtmp_ingest_url),
    "hls_base_url" => Keyword.get(opts, :hls_base_url, @default_hls_base_url)
  }

  optional = [
    {"id", :endpoint_id},
    {"public_base_url", :public_base_url},
    {"internal_base_url", :internal_base_url},
    {"metadata", :metadata}
  ]

  Enum.reduce(optional, required, fn {field, opt_key}, params ->
    maybe_put_string(params, field, Keyword.get(opts, opt_key))
  end)
end
# Builds the "stream.start" params for origin ingest. Every pass-through
# option maps onto the param of the same name and is omitted when nil; two of
# them carry defaults.
defp start_params(endpoint, opts) do
  base = %{
    "mode" => "origin_ingest",
    "endpoint_id" => endpoint["id"],
    "input_url" => Keyword.get(opts, :input_url, @default_input_url)
  }

  defaults = [ffmpeg_loglevel: "info", ffmpeg_log_output: :warning]

  passthrough = [
    :stream_id,
    :slug,
    :title,
    :description,
    :visibility,
    :latency_mode,
    :recording_policy,
    :default_rendition_policy,
    :spool_root,
    :hls_time,
    :hls_list_size,
    :hls_delete_threshold,
    :ffmpeg_loglevel,
    :ffmpeg_log_output,
    :readiness_poll_interval_ms
  ]

  Enum.reduce(passthrough, base, fn opt_key, params ->
    value = Keyword.get(opts, opt_key, defaults[opt_key])
    maybe_put_string(params, Atom.to_string(opt_key), value)
  end)
end
# Params for "stream.status": the origin-ingest mode plus the generation id
# when one was given.
defp status_params(opts) do
  generation_id = Keyword.get(opts, :generation_id)
  maybe_put_string(%{"mode" => "origin_ingest"}, "generation_id", generation_id)
end
# Params for "stream.stop": the status params extended with the optional
# spool root and ended-retention settings.
defp stop_params(opts) do
  with_spool_root =
    maybe_put_string(status_params(opts), "spool_root", Keyword.get(opts, :spool_root))

  maybe_put_string(
    with_spool_root,
    "ended_retention_ms",
    Keyword.get(opts, :ended_retention_ms)
  )
end
# Adds the endpoint to the start payload and, when `:push_test?` is truthy,
# also starts the test publisher and attaches it. If the publisher cannot be
# started, the freshly started origin is stopped again and the error returned.
defp maybe_attach_test_video(payload, endpoint, opts) do
  case Keyword.get(opts, :push_test?, false) do
    disabled when disabled in [nil, false] ->
      {:ok, Map.put(payload, "endpoint", endpoint)}

    _enabled ->
      case push_test_video(opts) do
        {:ok, publisher} ->
          {:ok, Map.merge(payload, %{"endpoint" => endpoint, "test_video" => publisher})}

        {:error, _reason} = failure ->
          # Best-effort cleanup; its own result is deliberately ignored.
          _ = stop_started_origin(payload, opts)
          failure
      end
  end
end
# Stops the generation named in a start payload, using a retention of 10 ms
# unless the caller specified one. Payloads without a generation id are a
# no-op.
defp stop_started_origin(payload, opts) do
  case payload do
    %{"generation" => %{"id" => generation_id}} ->
      stop_opts = [
        generation_id: generation_id,
        spool_root: Keyword.get(opts, :spool_root),
        ended_retention_ms: Keyword.get(opts, :ended_retention_ms, 10)
      ]

      stop(stop_opts)

    _no_generation ->
      :ok
  end
end
# Tries to start the test publisher up to `attempts` times, sleeping
# `interval_ms` between failed tries.
#
# Returns `{:ok, test_video}` on the first success, the last `{:error, reason}`
# once attempts run out, or `{:error, :retry_exhausted}` when called with
# fewer than one attempt.
defp retry_start_test_video(_opts, attempts, _interval_ms) when attempts < 1 do
  {:error, :retry_exhausted}
end

defp retry_start_test_video(opts, attempts, interval_ms) do
  case start_test_video_once(opts) do
    {:ok, test_video} ->
      {:ok, test_video}

    # A missing ffmpeg binary will not appear between attempts; fail fast
    # instead of sleeping through the whole retry budget.
    {:error, {:missing_executable, _executable}} = error ->
      error

    {:error, _reason} when attempts > 1 ->
      Process.sleep(interval_ms)
      retry_start_test_video(opts, attempts - 1, interval_ms)

    {:error, reason} ->
      {:error, reason}
  end
end
# One start attempt: resolve the command, launch it under MuonTrap, and make
# sure it survives the start-up grace period (`:startup_grace_ms`, default
# 300 ms). Returns the publisher description map on success.
defp start_test_video_once(opts) do
  grace_ms = Keyword.get(opts, :startup_grace_ms, 300)

  with {:ok, {ffmpeg, argv}} <- test_video_command(opts),
       {:ok, daemon} <- start_muontrap(ffmpeg, argv, opts),
       :ok <- wait_for_startup(daemon, grace_ms) do
    publisher = %{
      "pid" => daemon,
      "os_pid" => MuonTrap.Daemon.os_pid(daemon),
      "push_url" => Keyword.get(opts, :push_url, @default_push_url)
    }

    {:ok, publisher}
  end
end
# Starts the ffmpeg command as a temporary MuonTrap.Daemon child of the
# configured DynamicSupervisor (`:supervisor` option, defaulting to the
# Sender media backend supervisor).
defp start_muontrap(executable, args, opts) do
  unique = System.unique_integer([:positive, :monotonic])

  spec = %{
    id: {__MODULE__, :test_video, unique},
    start: {MuonTrap.Daemon, :start_link, [executable, args, muontrap_opts(opts)]},
    restart: :temporary,
    type: :worker
  }

  opts
  |> Keyword.get(:supervisor, TribeOne.TribesPlugin.Sender.MediaBackendSupervisor)
  |> DynamicSupervisor.start_child(spec)
end
# Watches `pid` for `grace_ms` milliseconds. If it goes down within that
# window the attempt counts as failed; otherwise the monitor is removed
# (flushing any late :DOWN message) and the start is considered good.
defp wait_for_startup(pid, grace_ms) do
  monitor = Process.monitor(pid)

  receive do
    {:DOWN, ^monitor, :process, ^pid, exit_reason} ->
      {:error, {:ffmpeg_exited, exit_reason}}
  after
    grace_ms ->
      Process.demonitor(monitor, [:flush])
      :ok
  end
end
# MuonTrap.Daemon options for the test publisher. `:log_output` defaults to
# :warning and is left out entirely when the caller passes `log_output: nil`.
defp muontrap_opts(opts) do
  daemon_opts = [
    stderr_to_stdout: true,
    log_prefix: "sender test-video ffmpeg ",
    logger_metadata: [sender_test_video: true],
    exit_status_to_reason: &exit_status_to_reason/1
  ]

  level = Keyword.get(opts, :log_output, :warning)

  if is_nil(level) do
    daemon_opts
  else
    Keyword.put(daemon_opts, :log_output, level)
  end
end
# Maps an ffmpeg exit status to a process exit reason: status 0 is a normal
# exit, anything else is tagged as an ffmpeg failure.
defp exit_status_to_reason(status) do
  case status do
    0 -> :normal
    failed -> {:ffmpeg_exit, failed}
  end
end
# Resolves the ffmpeg binary to run.
#
# The name or path comes from the SENDER_FFMPEG_EXECUTABLE environment
# variable; an unset *or empty* variable falls back to "ffmpeg". An absolute
# path is accepted when it points at a regular file; anything else is looked
# up with System.find_executable/1.
#
# Returns `{:ok, path}` or `{:error, {:missing_executable, configured}}`.
defp ffmpeg_executable do
  configured =
    case System.get_env(@ffmpeg_executable_env) do
      # An exported-but-empty variable is truthy in Elixir, so it must be
      # treated as unset explicitly or the lookup would search for "".
      unset when unset in [nil, ""] -> "ffmpeg"
      value -> value
    end

  cond do
    # File.regular?/1 rather than File.exists?/1: a directory at that path is
    # not something MuonTrap could execute.
    Path.type(configured) == :absolute and File.regular?(configured) -> {:ok, configured}
    found = System.find_executable(configured) -> {:ok, found}
    true -> {:error, {:missing_executable, configured}}
  end
end
# Resolves the local node pubkey.
#
# The `:node_pubkey_fun` option may be a pubkey binary or a zero-arity
# function producing one (bare, or wrapped in `{:ok, _}` / `{:error, _}`);
# it defaults to `Tribes.Identity.node_pubkey/0`.
#
# Returns `{:ok, pubkey}` or an error tuple. Any other option value is
# reported as `{:error, {:invalid_node_pubkey, value}}` rather than raising,
# so the caller's `with` chain sees a regular error.
defp local_node_pubkey(opts) do
  case Keyword.get(opts, :node_pubkey_fun, &Tribes.Identity.node_pubkey/0) do
    pubkey when is_binary(pubkey) -> validate_node_pubkey(pubkey)
    fun when is_function(fun, 0) -> normalize_node_pubkey(fun.())
    other -> {:error, {:invalid_node_pubkey, other}}
  end
end
# Normalises whatever the pubkey function returned: `{:ok, pubkey}` and bare
# binaries are validated, `{:error, _}` passes through untouched, and any
# other shape is rejected as an invalid pubkey.
defp normalize_node_pubkey(result) do
  case result do
    {:ok, pubkey} -> validate_node_pubkey(pubkey)
    {:error, _reason} -> result
    pubkey when is_binary(pubkey) -> validate_node_pubkey(pubkey)
    _unexpected -> {:error, {:invalid_node_pubkey, result}}
  end
end
# A usable pubkey is a non-empty binary; everything else is an error that
# carries the rejected value.
defp validate_node_pubkey(candidate) do
  if is_binary(candidate) and candidate != "" do
    {:ok, candidate}
  else
    {:error, {:invalid_node_pubkey, candidate}}
  end
end
# Builds the admin management context for a plugin method call, version "1",
# made on behalf of `actor` (which may be nil).
defp context(rpc_method, actor) do
  %ManagementContext{
    admin?: true,
    actor_pubkey: actor,
    version: "1",
    method: rpc_method,
    plugin: "tribe-one-sender"
  }
end
# Puts `value` under `key` unless it is nil, in which case the map is returned
# untouched. Despite the name, the value is stored as-is, whatever its type.
defp maybe_put_string(map, key, value) do
  if is_nil(value), do: map, else: Map.put(map, key, value)
end
end