You've already forked tribes-plugin-sender
forked from tribes/tribes-plugin-template
3e288a7e95
Rename Sender plugin identity to tribe-one-sender across manifest, runtime API routes, metrics, e2e fixtures, and tests.
376 lines
12 KiB
Elixir
376 lines
12 KiB
Elixir
defmodule TribeOne.TribesPlugin.Sender.LocalStreams do
|
|
@moduledoc """
|
|
Local Sender stream control helpers.
|
|
|
|
This module is intended for IEx and local operation when an external
|
|
orchestrator such as Legion is not managing TribeOne.TribesPlugin.Sender. It composes the same
|
|
management handlers that remote orchestration uses, and can optionally launch
|
|
a local ffmpeg test-video publisher with MuonTrap.
|
|
"""
|
|
|
|
alias TribeOne.TribesPlugin.Sender.Management.{StreamLifecycle, Topology}
|
|
alias Tribes.Plugin.ManagementContext
|
|
|
|
# Defaults used when the caller does not override the matching option.

# "input_url" stream-start parameter (the `:input_url` option).
@default_input_url "rtmp://0.0.0.0:1935/live/source"

# Target URL of the optional ffmpeg test publisher (the `:push_url` option).
@default_push_url "rtmp://127.0.0.1:1935/live/source"

# Values registered on the local origin endpoint record.
@default_rtmp_ingest_url "rtmp://localhost:1935/live/source"
@default_hls_base_url "http://localhost:4000/sender/hls"
@default_display_name "Local Sender origin"

# Name of the environment variable that can point at a specific ffmpeg binary.
@ffmpeg_executable_env "SENDER_FFMPEG_EXECUTABLE"
|
|
|
|
@doc """
Register this node as a local origin endpoint and start an origin ingest stream.

The function runs three steps in order and stops at the first one that does
not succeed, returning that step's result unchanged:

  1. resolve the local node pubkey,
  2. upsert the local origin media endpoint,
  3. start the stream through `StreamLifecycle.start/2`.

## Options

  * `:push_test?` - start a generated ffmpeg test publisher after opening ingest.
  * `:duration` - optional test publisher duration in seconds.
  * `:input_url` - RTMP listener URL for node-side ffmpeg.
  * `:push_url` - RTMP URL used by the optional test publisher.
  * `:spool_root` - HLS spool root override.
  * `:node_pubkey_fun` - a pubkey binary, or a zero-arity function returning
    one; defaults to `Tribes.Identity.node_pubkey/0`.

## Returns

On success, `{:ok, payload}` where the stream-start payload has the upserted
endpoint added under `"endpoint"` and, when `:push_test?` is set, the test
publisher description under `"test_video"`.
"""
def start_origin(opts \\ []) when is_list(opts) do
  with {:ok, pubkey} <- local_node_pubkey(opts),
       {:ok, %{"endpoint" => origin}} <- upsert_local_origin(pubkey, opts),
       ctx = context("stream.start", pubkey),
       params = start_params(origin, opts),
       {:ok, started} <- StreamLifecycle.start(ctx, params) do
    maybe_attach_test_video(started, origin, opts)
  end
end
|
|
|
|
@doc """
Return current local origin ingest status.

Accepts either:

  * a keyword list, where `:generation_id` selects a specific generation and
    `:actor_pubkey` is placed on the management context, or
  * a map holding a generation under `"generation"` or `:generation` whose
    `"id"` is a binary (for example a payload returned by `start_origin/1`);
    this is shorthand for `status(generation_id: id)`.

The result is whatever `StreamLifecycle.status/2` returns.
"""
def status(opts \\ [])

def status(opts) when is_list(opts) do
  ctx = context("stream.status", Keyword.get(opts, :actor_pubkey))
  StreamLifecycle.status(ctx, status_params(opts))
end

def status(%{"generation" => %{"id" => id}}) when is_binary(id), do: status(generation_id: id)

def status(%{generation: %{"id" => id}}) when is_binary(id), do: status(generation_id: id)
|
|
|
|
@doc """
Stop the active, or specified, local origin ingest stream.

Accepts either:

  * a keyword list; `:generation_id`, `:spool_root` and `:ended_retention_ms`
    are forwarded as stop parameters when present, and `:actor_pubkey` is
    placed on the management context, or
  * a map holding a generation under `"generation"` or `:generation` whose
    `"id"` is a binary; this is shorthand for `stop(generation_id: id)`.

The result is whatever `StreamLifecycle.stop/2` returns.
"""
def stop(opts \\ [])

def stop(opts) when is_list(opts) do
  ctx = context("stream.stop", Keyword.get(opts, :actor_pubkey))
  StreamLifecycle.stop(ctx, stop_params(opts))
end

def stop(%{"generation" => %{"id" => id}}) when is_binary(id), do: stop(generation_id: id)

def stop(%{generation: %{"id" => id}}) when is_binary(id), do: stop(generation_id: id)
|
|
|
|
@doc """
Start a local generated ffmpeg test-video publisher.

The publisher is started under `TribeOne.TribesPlugin.Sender.MediaBackendSupervisor`
by default, so it is not linked to the IEx shell process. Pass
`duration: seconds` to make it exit on its own.

## Options

  * `:retry_attempts` - how many start attempts to make (default `30`).
  * `:retry_interval_ms` - pause between attempts in milliseconds (default `1_000`).

The remaining options are passed through to the start attempt and to
`test_video_args/1`.

Returns `{:ok, test_video}` or `{:error, reason}`.
"""
def push_test_video(opts \\ []) when is_list(opts) do
  retry_start_test_video(
    opts,
    Keyword.get(opts, :retry_attempts, 30),
    Keyword.get(opts, :retry_interval_ms, 1_000)
  )
end
|
|
|
|
@doc """
Stop a test-video publisher returned by `push_test_video/1` or `start_origin/1`.

Accepts the publisher pid itself, a map with the pid under `"pid"` / `:pid`,
or a map that nests such a map under `"test_video"` / `:test_video`.

## Returns

  * `:ok` - the publisher process was stopped.
  * `{:error, :not_running}` - the pid refers to a process that has already
    exited (for example a publisher started with `:duration` that finished).
  * `{:error, :missing_test_video_pid}` - no pid could be found in the argument.
"""
def stop_test_video(pid) when is_pid(pid) do
  GenServer.stop(pid, :normal)
catch
  # GenServer.stop/2 exits the caller when the target process is gone. Report
  # that as an error tuple, like every other failure of this function, rather
  # than crashing the calling shell.
  :exit, {:noproc, _details} -> {:error, :not_running}
end

def stop_test_video(%{"pid" => pid}) when is_pid(pid), do: stop_test_video(pid)

def stop_test_video(%{pid: pid}) when is_pid(pid), do: stop_test_video(pid)

def stop_test_video(%{"test_video" => %{"pid" => pid}}) when is_pid(pid) do
  stop_test_video(pid)
end

def stop_test_video(%{test_video: %{pid: pid}}) when is_pid(pid) do
  stop_test_video(pid)
end

def stop_test_video(_other), do: {:error, :missing_test_video_pid}
|
|
|
|
@doc """
Return the ffmpeg executable and args used by `push_test_video/1`.

Returns `{:ok, {executable, args}}` when an ffmpeg executable is found, with
`args` built by `test_video_args/1`. Otherwise the lookup result is returned
unchanged.
"""
def test_video_command(opts \\ []) when is_list(opts) do
  case ffmpeg_executable() do
    {:ok, path} -> {:ok, {path, test_video_args(opts)}}
    other -> other
  end
end
|
|
|
|
@doc """
Return the ffmpeg args used for generated test video.

The argument list describes two `lavfi` inputs (a `testsrc` video pattern and
a `sine` audio tone), an optional `-t` duration, and an x264/AAC FLV output
sent to the push URL.

## Options

  * `:duration` - when set, emitted as `-t <duration>`.
  * `:loglevel` - ffmpeg `-loglevel` (default `"warning"`).
  * `:size` - video size (default `"320x180"`).
  * `:rate` - video frame rate (default `15`).
  * `:frequency` - audio tone frequency (default `1000`).
  * `:sample_rate` - audio sample rate (default `48_000`).
  * `:gop` - value for `-g` (default `30`).
  * `:keyint_min` - value for `-keyint_min` (defaults to the `:gop` value).
  * `:push_url` - output URL (defaults to the module's default push URL).
"""
def test_video_args(opts \\ []) when is_list(opts) do
  gop = Keyword.get(opts, :gop, 30)

  video_source =
    "testsrc=size=#{Keyword.get(opts, :size, "320x180")}:rate=#{Keyword.get(opts, :rate, 15)}"

  audio_source =
    "sine=frequency=#{Keyword.get(opts, :frequency, 1000)}:sample_rate=#{Keyword.get(opts, :sample_rate, 48_000)}"

  input_args = [
    "-hide_banner",
    "-loglevel",
    to_string(Keyword.get(opts, :loglevel, "warning")),
    "-re",
    "-f",
    "lavfi",
    "-i",
    video_source,
    "-f",
    "lavfi",
    "-i",
    audio_source
  ]

  # Only limit the run time when the caller asked for it.
  time_limit_args =
    case Keyword.get(opts, :duration) do
      nil -> []
      seconds -> ["-t", to_string(seconds)]
    end

  output_args = [
    "-c:v",
    "libx264",
    "-preset",
    "ultrafast",
    "-tune",
    "zerolatency",
    "-pix_fmt",
    "yuv420p",
    "-g",
    to_string(gop),
    "-keyint_min",
    to_string(Keyword.get(opts, :keyint_min, gop)),
    "-sc_threshold",
    "0",
    "-c:a",
    "aac",
    "-f",
    "flv",
    Keyword.get(opts, :push_url, @default_push_url)
  ]

  input_args ++ time_limit_args ++ output_args
end
|
|
|
|
# Upserts the media endpoint record describing this node as a local origin,
# acting as `node_pubkey`. Returns the result of `Topology.upsert_endpoint/2`.
defp upsert_local_origin(node_pubkey, opts) do
  ctx = context("media_endpoints.upsert", node_pubkey)
  params = endpoint_params(node_pubkey, opts)

  Topology.upsert_endpoint(ctx, params)
end
|
|
|
|
# Builds the string-keyed parameter map for the endpoint upsert.
#
# The type, node pubkey, display name, RTMP ingest URL and HLS base URL are
# always present (the last three fall back to module defaults). The remaining
# entries are added only when the caller supplied a non-nil option.
defp endpoint_params(node_pubkey, opts) do
  required = %{
    "type" => "tribes_origin",
    "node_pubkey" => node_pubkey,
    "display_name" => Keyword.get(opts, :display_name, @default_display_name),
    "rtmp_ingest_url" => Keyword.get(opts, :rtmp_ingest_url, @default_rtmp_ingest_url),
    "hls_base_url" => Keyword.get(opts, :hls_base_url, @default_hls_base_url)
  }

  # {parameter name, option key}
  optional = [
    {"id", :endpoint_id},
    {"public_base_url", :public_base_url},
    {"internal_base_url", :internal_base_url},
    {"metadata", :metadata}
  ]

  Enum.reduce(optional, required, fn {param, option}, acc ->
    maybe_put_string(acc, param, Keyword.get(opts, option))
  end)
end
|
|
|
|
# Builds the string-keyed parameter map for starting an origin ingest stream
# on `endpoint`.
#
# "mode", "endpoint_id" and "input_url" are always present. Every other entry
# is copied from the option of the same name and left out when its value is
# nil. Two options carry a default, so they are present unless the caller
# explicitly passes nil: `:ffmpeg_loglevel` ("info") and `:ffmpeg_log_output`
# (:warning).
defp start_params(endpoint, opts) do
  base = %{
    "mode" => "origin_ingest",
    "endpoint_id" => endpoint["id"],
    "input_url" => Keyword.get(opts, :input_url, @default_input_url)
  }

  without_default =
    ~w(stream_id slug title description visibility latency_mode recording_policy
       default_rendition_policy spool_root hls_time hls_list_size hls_delete_threshold
       readiness_poll_interval_ms)a

  optional =
    Enum.map(without_default, &{&1, nil}) ++
      [ffmpeg_loglevel: "info", ffmpeg_log_output: :warning]

  Enum.reduce(optional, base, fn {option, default}, acc ->
    maybe_put_string(acc, Atom.to_string(option), Keyword.get(opts, option, default))
  end)
end
|
|
|
|
# Parameters for a status query: always origin-ingest mode, plus the
# generation id when the caller named one.
defp status_params(opts) do
  generation_id = Keyword.get(opts, :generation_id)

  maybe_put_string(%{"mode" => "origin_ingest"}, "generation_id", generation_id)
end
|
|
|
|
# Parameters for a stop request: the status parameters, extended with the
# spool root and ended-retention window when those options are given.
defp stop_params(opts) do
  base = status_params(opts)
  with_spool = maybe_put_string(base, "spool_root", Keyword.get(opts, :spool_root))

  maybe_put_string(with_spool, "ended_retention_ms", Keyword.get(opts, :ended_retention_ms))
end
|
|
|
|
# Finishes `start_origin/1`: adds the endpoint to the stream-start payload and,
# when `:push_test?` is truthy, starts the test publisher and adds it under
# "test_video".
#
# If the publisher cannot be started, the stream that was just started is
# stopped again (its result is ignored) and the publisher error is returned.
defp maybe_attach_test_video(payload, endpoint, opts) do
  publisher_result =
    if Keyword.get(opts, :push_test?, false) do
      push_test_video(opts)
    else
      :skipped
    end

  case publisher_result do
    :skipped ->
      {:ok, Map.put(payload, "endpoint", endpoint)}

    {:ok, publisher} ->
      {:ok, Map.merge(payload, %{"endpoint" => endpoint, "test_video" => publisher})}

    {:error, reason} ->
      _ = stop_started_origin(payload, opts)
      {:error, reason}
  end
end
|
|
|
|
# Rolls back a stream started by `start_origin/1`. When the payload names a
# generation id, that generation is stopped, with the retention window
# defaulting to 10 ms. Payloads without a generation id are a no-op.
defp stop_started_origin(payload, opts) do
  case payload do
    %{"generation" => %{"id" => generation_id}} ->
      stop(
        generation_id: generation_id,
        spool_root: Keyword.get(opts, :spool_root),
        ended_retention_ms: Keyword.get(opts, :ended_retention_ms, 10)
      )

    _no_generation ->
      :ok
  end
end
|
|
|
|
# Tries to start the test publisher up to `attempts` times, sleeping
# `interval_ms` between failed attempts.
#
# Returns `{:ok, test_video}` on the first success, `{:error, reason}` with the
# last failure once attempts run out, or `{:error, :retry_exhausted}` when
# called with fewer than one attempt.
defp retry_start_test_video(_opts, attempts, _interval_ms) when attempts < 1 do
  {:error, :retry_exhausted}
end

defp retry_start_test_video(opts, attempts, interval_ms) do
  case start_test_video_once(opts) do
    {:ok, test_video} ->
      {:ok, test_video}

    # A missing ffmpeg binary is not a transient condition: retrying only
    # delays the failure by attempts * interval_ms, so report it right away.
    {:error, {:missing_executable, _name}} = error ->
      error

    {:error, _reason} when attempts > 1 ->
      Process.sleep(interval_ms)
      retry_start_test_video(opts, attempts - 1, interval_ms)

    {:error, reason} ->
      {:error, reason}
  end
end
|
|
|
|
# Makes a single attempt to launch the ffmpeg test publisher.
#
# Resolves the command, starts it as a supervised MuonTrap daemon, then waits
# `:startup_grace_ms` (default 300) to check the process did not exit straight
# away. On success returns `{:ok, map}` with the daemon pid, its OS pid and the
# push URL; otherwise the failing step's result is returned unchanged.
defp start_test_video_once(opts) do
  grace_ms = Keyword.get(opts, :startup_grace_ms, 300)

  with {:ok, {ffmpeg, argv}} <- test_video_command(opts),
       {:ok, daemon} <- start_muontrap(ffmpeg, argv, opts),
       :ok <- wait_for_startup(daemon, grace_ms) do
    publisher = %{
      "pid" => daemon,
      "os_pid" => MuonTrap.Daemon.os_pid(daemon),
      "push_url" => Keyword.get(opts, :push_url, @default_push_url)
    }

    {:ok, publisher}
  end
end
|
|
|
|
# Starts `executable` with `args` as a MuonTrap daemon under a dynamic
# supervisor: the `:supervisor` option, or the media backend supervisor by
# default. The child is `:temporary`, so it is not restarted after it exits.
defp start_muontrap(executable, args, opts) do
  spec = %{
    # Unique id per launch so several publishers can coexist.
    id: {__MODULE__, :test_video, System.unique_integer([:positive, :monotonic])},
    start: {MuonTrap.Daemon, :start_link, [executable, args, muontrap_opts(opts)]},
    restart: :temporary,
    type: :worker
  }

  opts
  |> Keyword.get(:supervisor, TribeOne.TribesPlugin.Sender.MediaBackendSupervisor)
  |> DynamicSupervisor.start_child(spec)
end
|
|
|
|
# Watches `pid` for `grace_ms` milliseconds.
#
# Returns `{:error, {:ffmpeg_exited, reason}}` if the process goes down within
# that window, otherwise `:ok`. The monitor is removed (and any late DOWN
# message flushed) before returning `:ok`, so nothing is left in the caller's
# mailbox.
defp wait_for_startup(pid, grace_ms) do
  monitor = Process.monitor(pid)

  receive do
    {:DOWN, ^monitor, :process, ^pid, exit_reason} ->
      {:error, {:ffmpeg_exited, exit_reason}}
  after
    grace_ms ->
      Process.demonitor(monitor, [:flush])
      :ok
  end
end
|
|
|
|
# Options handed to `MuonTrap.Daemon.start_link/3` for the test publisher.
#
# `:log_output` defaults to `:warning`; passing `log_output: nil` leaves the
# option out entirely.
defp muontrap_opts(opts) do
  daemon_opts = [
    stderr_to_stdout: true,
    log_prefix: "sender test-video ffmpeg ",
    logger_metadata: [sender_test_video: true],
    exit_status_to_reason: &exit_status_to_reason/1
  ]

  case Keyword.fetch(opts, :log_output) do
    {:ok, nil} -> daemon_opts
    {:ok, level} -> [{:log_output, level} | daemon_opts]
    :error -> [{:log_output, :warning} | daemon_opts]
  end
end

# Maps the ffmpeg exit status to a process exit reason: a zero status is a
# normal exit, anything else is tagged with the status.
defp exit_status_to_reason(status) do
  case status do
    0 -> :normal
    failed -> {:ffmpeg_exit, failed}
  end
end
|
|
|
|
# Locates the ffmpeg binary.
#
# The name comes from the environment variable named by
# `@ffmpeg_executable_env`, falling back to "ffmpeg". An absolute path that
# exists is used as-is; anything else is looked up with
# `System.find_executable/1`. Returns `{:ok, path}` or
# `{:error, {:missing_executable, name}}`.
defp ffmpeg_executable do
  name = System.get_env(@ffmpeg_executable_env, "ffmpeg")

  if Path.type(name) == :absolute and File.exists?(name) do
    {:ok, name}
  else
    case System.find_executable(name) do
      nil -> {:error, {:missing_executable, name}}
      resolved -> {:ok, resolved}
    end
  end
end
|
|
|
|
# Resolves the pubkey of the local node.
#
# The `:node_pubkey_fun` option may be a pubkey binary or a zero-arity function
# producing one; it defaults to `Tribes.Identity.node_pubkey/0`. Any other
# value is reported as `{:error, {:invalid_node_pubkey, value}}` instead of
# raising, so callers chaining on ok/error tuples see a regular error.
#
# Returns `{:ok, pubkey}` for a non-empty binary, otherwise `{:error, reason}`.
defp local_node_pubkey(opts) do
  case Keyword.get(opts, :node_pubkey_fun, &Tribes.Identity.node_pubkey/0) do
    pubkey when is_binary(pubkey) -> validate_node_pubkey(pubkey)
    fun when is_function(fun, 0) -> normalize_node_pubkey(fun.())
    other -> {:error, {:invalid_node_pubkey, other}}
  end
end

# Accepts the shapes a pubkey function may return: `{:ok, pubkey}`, a bare
# binary, or `{:error, reason}` (passed through untouched).
defp normalize_node_pubkey({:ok, pubkey}), do: validate_node_pubkey(pubkey)
defp normalize_node_pubkey({:error, _reason} = error), do: error
defp normalize_node_pubkey(pubkey) when is_binary(pubkey), do: validate_node_pubkey(pubkey)
defp normalize_node_pubkey(other), do: {:error, {:invalid_node_pubkey, other}}

# A usable pubkey is a non-empty binary.
defp validate_node_pubkey(pubkey) when is_binary(pubkey) and pubkey != "", do: {:ok, pubkey}
defp validate_node_pubkey(other), do: {:error, {:invalid_node_pubkey, other}}
|
|
|
|
# Builds the management context used for every locally issued call.
#
# Local operation always acts as an admin of the "tribe-one-sender" plugin at
# version "1"; only the method name and the acting pubkey vary. The pubkey is
# nil when the caller did not supply one.
defp context(method, actor_pubkey) do
  %ManagementContext{
    plugin: "tribe-one-sender",
    version: "1",
    method: method,
    actor_pubkey: actor_pubkey,
    admin?: true
  }
end
|
|
|
|
# Puts `value` under `key` unless it is nil, in which case the map is returned
# untouched. The value is stored as given; no conversion is applied.
defp maybe_put_string(map, key, value) do
  case value do
    nil -> map
    present -> Map.put(map, key, present)
  end
end
|
|
end
|