You've already forked tribes-plugin-sender
forked from tribes/tribes-plugin-template
f8e2bfaada
Move Sender modules under TribeOne.TribesPlugin.Sender and replace the Aether-specific chat integration with the public chat@1 surface contract.
431 lines
12 KiB
Elixir
431 lines
12 KiB
Elixir
defmodule TribeOne.TribesPlugin.Sender.Stats do
|
|
@moduledoc """
|
|
Node-local live streaming counters.
|
|
|
|
This process intentionally keeps raw counters local. Cluster-wide state is
|
|
published separately as coarse `EndpointSnapshot` rows and host metrics
|
|
rollups.
|
|
"""
|
|
|
|
use GenServer
|
|
|
|
@type key :: {String.t() | nil, String.t() | nil, String.t() | nil}
|
|
@type generation_key :: {String.t() | nil, String.t() | nil}
|
|
|
|
@viewer_ttl_ms 90_000
|
|
@cleanup_interval_ms 15_000
|
|
|
|
def start_link(opts \\ []) do
|
|
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
|
|
end
|
|
|
|
def increment_viewers(endpoint_id, stream_id, generation_id) do
|
|
GenServer.call(__MODULE__, {:increment_viewers, key(endpoint_id, stream_id, generation_id)})
|
|
end
|
|
|
|
def decrement_viewers(endpoint_id, stream_id, generation_id) do
|
|
GenServer.call(__MODULE__, {:decrement_viewers, key(endpoint_id, stream_id, generation_id)})
|
|
end
|
|
|
|
def record_bitrate(endpoint_id, stream_id, generation_id, measurements)
|
|
when is_map(measurements) do
|
|
GenServer.call(
|
|
__MODULE__,
|
|
{:record_bitrate, key(endpoint_id, stream_id, generation_id), measurements}
|
|
)
|
|
end
|
|
|
|
def mark_segment(endpoint_id, stream_id, generation_id, attrs \\ []) do
|
|
GenServer.call(__MODULE__, {:mark_segment, key(endpoint_id, stream_id, generation_id), attrs})
|
|
end
|
|
|
|
def record_viewer_heartbeat(stream_id, generation_id, viewer_session_id, remote_ip) do
|
|
case normalize_countable_ip(remote_ip) do
|
|
{:ok, ip} ->
|
|
GenServer.call(
|
|
__MODULE__,
|
|
{:record_viewer_heartbeat, generation_key(stream_id, generation_id), viewer_session_id,
|
|
ip}
|
|
)
|
|
|
|
{:error, _reason} ->
|
|
:ignored
|
|
end
|
|
end
|
|
|
|
def record_hls_access(path_segments, remote_ip, viewer_session_id \\ nil)
|
|
when is_list(path_segments) do
|
|
with {:ok, generation_key} <- generation_key_from_hls_path(path_segments),
|
|
{:ok, ip} <- normalize_countable_ip(remote_ip) do
|
|
GenServer.call(
|
|
__MODULE__,
|
|
{:record_hls_access, generation_key, ip, normalize_viewer_session_id(viewer_session_id)}
|
|
)
|
|
else
|
|
_reason -> :ignored
|
|
end
|
|
end
|
|
|
|
def snapshot(endpoint_id, stream_id, generation_id) do
|
|
GenServer.call(__MODULE__, {:snapshot, key(endpoint_id, stream_id, generation_id)})
|
|
end
|
|
|
|
def viewer_count(stream_id, generation_id) do
|
|
GenServer.call(__MODULE__, {:viewer_count, generation_key(stream_id, generation_id)})
|
|
end
|
|
|
|
def reset do
|
|
GenServer.call(__MODULE__, :reset)
|
|
end
|
|
|
|
@impl true
|
|
def init(opts) do
|
|
state = fresh_state(opts)
|
|
schedule_cleanup(state)
|
|
{:ok, state}
|
|
end
|
|
|
|
@impl true
|
|
def handle_call({:increment_viewers, key}, _from, state) do
|
|
next = update_metric(state, key, :viewer_count, &(&1 + 1))
|
|
entry = Map.fetch!(next.entries, key)
|
|
emit(:viewer_count, key, entry)
|
|
{:reply, entry, next}
|
|
end
|
|
|
|
def handle_call({:decrement_viewers, key}, _from, state) do
|
|
next = update_metric(state, key, :viewer_count, &max(&1 - 1, 0))
|
|
entry = Map.fetch!(next.entries, key)
|
|
emit(:viewer_count, key, entry)
|
|
{:reply, entry, next}
|
|
end
|
|
|
|
def handle_call({:record_bitrate, key, measurements}, _from, state) do
|
|
next =
|
|
update_entry(state, key, fn entry ->
|
|
entry
|
|
|> maybe_put(:ingress_bps, measurements)
|
|
|> maybe_put(:egress_bps, measurements)
|
|
end)
|
|
|
|
entry = Map.fetch!(next.entries, key)
|
|
emit(:bitrate, key, entry)
|
|
{:reply, entry, next}
|
|
end
|
|
|
|
def handle_call({:mark_segment, key, attrs}, _from, state) do
|
|
now = DateTime.utc_now() |> DateTime.truncate(:second)
|
|
|
|
next =
|
|
update_entry(state, key, fn entry ->
|
|
entry
|
|
|> Map.put(:last_segment_at, Keyword.get(attrs, :last_segment_at, now))
|
|
|> maybe_put_kw(:segment_lag_ms, attrs)
|
|
|> maybe_put_kw(:playlist_age_ms, attrs)
|
|
end)
|
|
|
|
entry = Map.fetch!(next.entries, key)
|
|
emit(:segment, key, entry)
|
|
{:reply, entry, next}
|
|
end
|
|
|
|
def handle_call({:record_viewer_heartbeat, generation_key, viewer_session_id, ip}, _from, state) do
|
|
now_ms = now_ms()
|
|
|
|
next =
|
|
state
|
|
|> prune_stale_viewers(now_ms)
|
|
|> put_heartbeat_session(generation_key, viewer_session_id, ip, now_ms)
|
|
|> emit_viewer_count(generation_key, now_ms)
|
|
|
|
{:reply, effective_viewer_count(next, generation_key, now_ms), next}
|
|
end
|
|
|
|
def handle_call({:record_hls_access, generation_key, ip, viewer_session_id}, _from, state) do
|
|
now_ms = now_ms()
|
|
|
|
next =
|
|
state
|
|
|> prune_stale_viewers(now_ms)
|
|
|> put_hls_viewer(generation_key, ip, viewer_session_id, now_ms)
|
|
|> emit_viewer_count_if_changed(generation_key, now_ms)
|
|
|
|
{:reply, effective_viewer_count(next, generation_key, now_ms), next}
|
|
end
|
|
|
|
def handle_call({:snapshot, key}, _from, state) do
|
|
{:reply, Map.get(state.entries, key, default_entry()), state}
|
|
end
|
|
|
|
def handle_call({:viewer_count, generation_key}, _from, state) do
|
|
now_ms = now_ms()
|
|
next = prune_stale_viewers(state, now_ms)
|
|
{:reply, effective_viewer_count(next, generation_key, now_ms), next}
|
|
end
|
|
|
|
def handle_call(:reset, _from, state) do
|
|
{:reply, :ok, fresh_state(state)}
|
|
end
|
|
|
|
@impl true
|
|
def handle_info(:cleanup_viewers, state) do
|
|
now_ms = now_ms()
|
|
|
|
next =
|
|
state
|
|
|> prune_stale_viewers(now_ms)
|
|
|> emit_all_viewer_counts(now_ms)
|
|
|
|
schedule_cleanup(next)
|
|
{:noreply, next}
|
|
end
|
|
|
|
defp fresh_state(opts) do
|
|
%{
|
|
entries: %{},
|
|
heartbeat_sessions: %{},
|
|
hls_ips: %{},
|
|
last_emitted_viewer_counts: %{},
|
|
viewer_ttl_ms: option(opts, :viewer_ttl_ms, @viewer_ttl_ms),
|
|
cleanup_interval_ms: option(opts, :cleanup_interval_ms, @cleanup_interval_ms)
|
|
}
|
|
end
|
|
|
|
defp option(opts, key, default) when is_list(opts), do: Keyword.get(opts, key, default)
|
|
defp option(%{} = opts, key, default), do: Map.get(opts, key, default)
|
|
|
|
defp update_metric(state, key, metric, callback) do
|
|
update_entry(state, key, fn entry ->
|
|
Map.update!(entry, metric, callback)
|
|
end)
|
|
end
|
|
|
|
defp update_entry(state, key, callback) do
|
|
update_in(state.entries, &Map.update(&1, key, callback.(default_entry()), callback))
|
|
end
|
|
|
|
defp maybe_put(entry, key, measurements) do
|
|
case Map.fetch(measurements, key) do
|
|
{:ok, value} -> Map.put(entry, key, value)
|
|
:error -> entry
|
|
end
|
|
end
|
|
|
|
defp maybe_put_kw(entry, key, measurements) do
|
|
case Keyword.fetch(measurements, key) do
|
|
{:ok, value} -> Map.put(entry, key, value)
|
|
:error -> entry
|
|
end
|
|
end
|
|
|
|
defp default_entry do
|
|
%{
|
|
viewer_count: 0,
|
|
ingress_bps: nil,
|
|
egress_bps: nil,
|
|
segment_lag_ms: nil,
|
|
playlist_age_ms: nil,
|
|
last_segment_at: nil
|
|
}
|
|
end
|
|
|
|
defp key(endpoint_id, stream_id, generation_id) do
|
|
{nullable_string(endpoint_id), nullable_string(stream_id), nullable_string(generation_id)}
|
|
end
|
|
|
|
defp generation_key(stream_id, generation_id) do
|
|
{nullable_string(stream_id), nullable_string(generation_id)}
|
|
end
|
|
|
|
defp generation_key_from_hls_path(["streams", stream_id, generation_id | _rest]),
|
|
do: {:ok, generation_key(stream_id, generation_id)}
|
|
|
|
defp generation_key_from_hls_path([stream_id, generation_id | _rest]),
|
|
do: {:ok, generation_key(stream_id, generation_id)}
|
|
|
|
defp generation_key_from_hls_path(_path_segments), do: {:error, :unknown_hls_generation}
|
|
|
|
defp put_heartbeat_session(state, generation_key, viewer_session_id, ip, now_ms)
|
|
when is_binary(viewer_session_id) and viewer_session_id != "" do
|
|
session_key = {generation_key, viewer_session_id}
|
|
|
|
put_in(state.heartbeat_sessions[session_key], %{ip: ip, last_seen_ms: now_ms})
|
|
end
|
|
|
|
defp put_heartbeat_session(state, _generation_key, _viewer_session_id, _ip, _now_ms), do: state
|
|
|
|
defp put_hls_ip(state, generation_key, ip, now_ms) do
|
|
put_in(state.hls_ips[{generation_key, ip}], now_ms)
|
|
end
|
|
|
|
defp put_hls_viewer(state, generation_key, ip, viewer_session_id, now_ms)
|
|
when is_binary(viewer_session_id) and viewer_session_id != "" do
|
|
put_heartbeat_session(state, generation_key, viewer_session_id, ip, now_ms)
|
|
end
|
|
|
|
defp put_hls_viewer(state, generation_key, ip, _viewer_session_id, now_ms) do
|
|
put_hls_ip(state, generation_key, ip, now_ms)
|
|
end
|
|
|
|
defp effective_viewer_count(state, generation_key, now_ms) do
|
|
heartbeat_sessions = active_heartbeat_sessions(state, generation_key, now_ms)
|
|
|
|
heartbeat_ips =
|
|
heartbeat_sessions
|
|
|> Enum.map(fn {_key, session} -> session.ip end)
|
|
|> Enum.reject(&is_nil/1)
|
|
|> MapSet.new()
|
|
|
|
hls_ip_count =
|
|
state.hls_ips
|
|
|> Enum.count(fn
|
|
{{^generation_key, ip}, last_seen_ms} ->
|
|
active?(state, last_seen_ms, now_ms) and not MapSet.member?(heartbeat_ips, ip)
|
|
|
|
_other ->
|
|
false
|
|
end)
|
|
|
|
Enum.count(heartbeat_sessions) + hls_ip_count
|
|
end
|
|
|
|
defp active_heartbeat_sessions(state, generation_key, now_ms) do
|
|
Enum.filter(state.heartbeat_sessions, fn
|
|
{{^generation_key, _session_id}, %{last_seen_ms: last_seen_ms}} ->
|
|
active?(state, last_seen_ms, now_ms)
|
|
|
|
_other ->
|
|
false
|
|
end)
|
|
end
|
|
|
|
defp active?(state, last_seen_ms, now_ms), do: now_ms - last_seen_ms <= state.viewer_ttl_ms
|
|
|
|
defp prune_stale_viewers(state, now_ms) do
|
|
%{
|
|
state
|
|
| heartbeat_sessions:
|
|
Map.reject(state.heartbeat_sessions, fn {_key, session} ->
|
|
not active?(state, session.last_seen_ms, now_ms)
|
|
end),
|
|
hls_ips:
|
|
Map.reject(state.hls_ips, fn {_key, last_seen_ms} ->
|
|
not active?(state, last_seen_ms, now_ms)
|
|
end)
|
|
}
|
|
end
|
|
|
|
defp emit_all_viewer_counts(state, now_ms) do
|
|
state
|
|
|> active_generation_keys()
|
|
|> Enum.reduce(state, &emit_viewer_count_if_changed(&2, &1, now_ms))
|
|
end
|
|
|
|
defp active_generation_keys(state) do
|
|
heartbeat_keys =
|
|
Enum.map(state.heartbeat_sessions, fn {{generation_key, _session_id}, _session} ->
|
|
generation_key
|
|
end)
|
|
|
|
hls_keys =
|
|
Enum.map(state.hls_ips, fn {{generation_key, _ip}, _last_seen_ms} -> generation_key end)
|
|
|
|
emitted_keys = Map.keys(state.last_emitted_viewer_counts)
|
|
|
|
(heartbeat_keys ++ hls_keys ++ emitted_keys)
|
|
|> Enum.uniq()
|
|
end
|
|
|
|
defp emit_viewer_count_if_changed(state, generation_key, now_ms) do
|
|
count = effective_viewer_count(state, generation_key, now_ms)
|
|
|
|
case Map.get(state.last_emitted_viewer_counts, generation_key) do
|
|
^count ->
|
|
state
|
|
|
|
_previous ->
|
|
emit_viewer_metric(generation_key, count)
|
|
put_last_emitted_viewer_count(state, generation_key, count)
|
|
end
|
|
end
|
|
|
|
defp emit_viewer_count(state, generation_key, now_ms) do
|
|
count = effective_viewer_count(state, generation_key, now_ms)
|
|
|
|
emit_viewer_metric(generation_key, count)
|
|
put_last_emitted_viewer_count(state, generation_key, count)
|
|
end
|
|
|
|
defp put_last_emitted_viewer_count(state, generation_key, 0) do
|
|
update_in(state.last_emitted_viewer_counts, &Map.delete(&1, generation_key))
|
|
end
|
|
|
|
defp put_last_emitted_viewer_count(state, generation_key, count) do
|
|
put_in(state.last_emitted_viewer_counts[generation_key], count)
|
|
end
|
|
|
|
defp emit_viewer_metric({stream_id, generation_id}, count) do
|
|
:telemetry.execute(
|
|
[:sender, :metrics, :viewer_count],
|
|
%{value: count},
|
|
%{stream_id: stream_id, generation_id: generation_id}
|
|
)
|
|
end
|
|
|
|
defp normalize_countable_ip(remote_ip) do
|
|
case normalize_ip(remote_ip) do
|
|
nil -> {:error, :missing_ip}
|
|
ip -> if localhost_ip?(ip), do: {:error, :localhost_ip}, else: {:ok, ip}
|
|
end
|
|
end
|
|
|
|
defp normalize_ip(nil), do: nil
|
|
|
|
defp normalize_ip(ip) when is_tuple(ip) do
|
|
ip
|
|
|> :inet.ntoa()
|
|
|> to_string()
|
|
rescue
|
|
_error -> nil
|
|
end
|
|
|
|
defp normalize_ip(ip) when is_binary(ip), do: ip
|
|
defp normalize_ip(_ip), do: nil
|
|
|
|
defp normalize_viewer_session_id(nil), do: nil
|
|
|
|
defp normalize_viewer_session_id(viewer_session_id) do
|
|
viewer_session_id
|
|
|> to_string()
|
|
|> String.trim()
|
|
end
|
|
|
|
defp localhost_ip?("::1"), do: true
|
|
defp localhost_ip?("0:0:0:0:0:0:0:1"), do: true
|
|
defp localhost_ip?("localhost"), do: true
|
|
defp localhost_ip?(<<"127.", _rest::binary>>), do: true
|
|
defp localhost_ip?(_ip), do: false
|
|
|
|
defp schedule_cleanup(state) do
|
|
Process.send_after(self(), :cleanup_viewers, state.cleanup_interval_ms)
|
|
end
|
|
|
|
defp now_ms, do: System.monotonic_time(:millisecond)
|
|
|
|
defp nullable_string(nil), do: nil
|
|
defp nullable_string(value), do: to_string(value)
|
|
|
|
defp emit(event, {endpoint_id, stream_id, generation_id}, entry) do
|
|
:telemetry.execute(
|
|
[:sender, :stats, event],
|
|
%{count: 1},
|
|
%{
|
|
endpoint_id: endpoint_id,
|
|
stream_id: stream_id,
|
|
generation_id: generation_id,
|
|
stats: entry
|
|
}
|
|
)
|
|
end
|
|
end
|