Files
self f8e2bfaada refactor: use chat capability surfaces
Move Sender modules under TribeOne.TribesPlugin.Sender and replace the Aether-specific chat integration with the public chat@1 surface contract.
2026-05-26 01:13:38 +02:00

431 lines
12 KiB
Elixir

defmodule TribeOne.TribesPlugin.Sender.Stats do
  @moduledoc """
  Node-local live streaming counters.

  This process intentionally keeps raw counters local. Cluster-wide state is
  published separately as coarse `EndpointSnapshot` rows and host metrics
  rollups.

  Two kinds of state live here:

    * Per-entry stats keyed by `{endpoint_id, stream_id, generation_id}`: a
      manual viewer counter, ingress/egress bitrate and segment timing.
    * Viewer presence keyed by `{stream_id, generation_id}`, derived from viewer
      heartbeats (one per viewer session id) and from HLS requests (one per
      remote IP when the request carries no session id). A viewer stays counted
      for `:viewer_ttl_ms` after it was last seen; stale presence is pruned on
      access and on a periodic cleanup tick.

  Requests whose remote IP is missing or a loopback address are not counted.
  Changes are reported through `:telemetry` events `[:sender, :stats, event]`
  and `[:sender, :metrics, :viewer_count]`.
  """
  use GenServer

  require Logger

  @type key :: {String.t() | nil, String.t() | nil, String.t() | nil}
  @type generation_key :: {String.t() | nil, String.t() | nil}
  @type entry :: %{
          viewer_count: non_neg_integer(),
          ingress_bps: term(),
          egress_bps: term(),
          segment_lag_ms: term(),
          playlist_age_ms: term(),
          last_segment_at: term()
        }

  @viewer_ttl_ms 90_000
  @cleanup_interval_ms 15_000

  ## Client API

  @doc """
  Starts the stats server, registered under this module's name.

  Options (keyword list or map):

    * `:viewer_ttl_ms` - how long a viewer stays counted after it was last seen
      (default `90_000`).
    * `:cleanup_interval_ms` - period of the stale-viewer cleanup tick
      (default `15_000`).
  """
  @spec start_link(keyword() | map()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Increments the manual viewer counter of an entry and returns the updated entry."
  @spec increment_viewers(term(), term(), term()) :: entry()
  def increment_viewers(endpoint_id, stream_id, generation_id) do
    GenServer.call(__MODULE__, {:increment_viewers, key(endpoint_id, stream_id, generation_id)})
  end

  @doc "Decrements the manual viewer counter of an entry (never below 0) and returns the entry."
  @spec decrement_viewers(term(), term(), term()) :: entry()
  def decrement_viewers(endpoint_id, stream_id, generation_id) do
    GenServer.call(__MODULE__, {:decrement_viewers, key(endpoint_id, stream_id, generation_id)})
  end

  @doc """
  Stores `:ingress_bps` and/or `:egress_bps` from `measurements` on an entry.

  Keys absent from `measurements` leave the stored value untouched.
  Returns the updated entry.
  """
  @spec record_bitrate(term(), term(), term(), map()) :: entry()
  def record_bitrate(endpoint_id, stream_id, generation_id, measurements)
      when is_map(measurements) do
    GenServer.call(
      __MODULE__,
      {:record_bitrate, key(endpoint_id, stream_id, generation_id), measurements}
    )
  end

  @doc """
  Records segment activity for an entry and returns the updated entry.

  `attrs` is a keyword list. `:last_segment_at` defaults to the current UTC time
  truncated to seconds. `:segment_lag_ms` and `:playlist_age_ms` are stored only
  when present.
  """
  @spec mark_segment(term(), term(), term(), keyword()) :: entry()
  def mark_segment(endpoint_id, stream_id, generation_id, attrs \\ []) do
    GenServer.call(__MODULE__, {:mark_segment, key(endpoint_id, stream_id, generation_id), attrs})
  end

  @doc """
  Records a heartbeat from a viewer session of a stream generation.

  The session id is normalized exactly like in `record_hls_access/3`, so both
  paths refer to the same session. Returns the current viewer count of the
  generation, or `:ignored` when `remote_ip` is missing or loopback.
  """
  @spec record_viewer_heartbeat(term(), term(), term(), term()) ::
          non_neg_integer() | :ignored
  def record_viewer_heartbeat(stream_id, generation_id, viewer_session_id, remote_ip) do
    case normalize_countable_ip(remote_ip) do
      {:ok, ip} ->
        GenServer.call(
          __MODULE__,
          {:record_viewer_heartbeat, generation_key(stream_id, generation_id),
           normalize_viewer_session_id(viewer_session_id), ip}
        )

      {:error, _reason} ->
        :ignored
    end
  end

  @doc """
  Records an HLS request for the generation encoded in `path_segments`.

  Accepted path shapes are `["streams", stream_id, generation_id | _]` and
  `[stream_id, generation_id | _]`. With a non-blank session id the request
  refreshes that viewer session; otherwise it is counted per remote IP.
  Returns the current viewer count, or `:ignored` when the path is not
  recognised or the IP is missing/loopback.
  """
  @spec record_hls_access([String.t()], term(), term()) :: non_neg_integer() | :ignored
  def record_hls_access(path_segments, remote_ip, viewer_session_id \\ nil)
      when is_list(path_segments) do
    with {:ok, generation_key} <- generation_key_from_hls_path(path_segments),
         {:ok, ip} <- normalize_countable_ip(remote_ip) do
      GenServer.call(
        __MODULE__,
        {:record_hls_access, generation_key, ip, normalize_viewer_session_id(viewer_session_id)}
      )
    else
      {:error, _reason} -> :ignored
    end
  end

  @doc "Returns the stats entry for a key, or a zeroed default entry if none exists."
  @spec snapshot(term(), term(), term()) :: entry()
  def snapshot(endpoint_id, stream_id, generation_id) do
    GenServer.call(__MODULE__, {:snapshot, key(endpoint_id, stream_id, generation_id)})
  end

  @doc "Returns the number of currently active viewers of a stream generation."
  @spec viewer_count(term(), term()) :: non_neg_integer()
  def viewer_count(stream_id, generation_id) do
    GenServer.call(__MODULE__, {:viewer_count, generation_key(stream_id, generation_id)})
  end

  @doc "Drops all counters, keeping the configured TTL and cleanup interval."
  @spec reset() :: :ok
  def reset do
    GenServer.call(__MODULE__, :reset)
  end

  ## Server callbacks

  @impl true
  def init(opts) do
    state = fresh_state(opts)
    schedule_cleanup(state)
    {:ok, state}
  end

  @impl true
  def handle_call({:increment_viewers, key}, _from, state) do
    state
    |> update_metric(key, :viewer_count, &(&1 + 1))
    |> reply_with_entry(key, :viewer_count)
  end

  def handle_call({:decrement_viewers, key}, _from, state) do
    state
    |> update_metric(key, :viewer_count, &max(&1 - 1, 0))
    |> reply_with_entry(key, :viewer_count)
  end

  def handle_call({:record_bitrate, key, measurements}, _from, state) do
    state
    |> update_entry(key, fn entry ->
      entry
      |> maybe_put(:ingress_bps, measurements)
      |> maybe_put(:egress_bps, measurements)
    end)
    |> reply_with_entry(key, :bitrate)
  end

  def handle_call({:mark_segment, key, attrs}, _from, state) do
    now = DateTime.utc_now() |> DateTime.truncate(:second)

    state
    |> update_entry(key, fn entry ->
      entry
      |> Map.put(:last_segment_at, Keyword.get(attrs, :last_segment_at, now))
      |> maybe_put_kw(:segment_lag_ms, attrs)
      |> maybe_put_kw(:playlist_age_ms, attrs)
    end)
    |> reply_with_entry(key, :segment)
  end

  def handle_call({:record_viewer_heartbeat, generation_key, viewer_session_id, ip}, _from, state) do
    now_ms = now_ms()

    # Heartbeats always emit the viewer count, even when it did not change.
    next =
      state
      |> prune_stale_viewers(now_ms)
      |> put_heartbeat_session(generation_key, viewer_session_id, ip, now_ms)
      |> emit_viewer_count(generation_key, now_ms)

    {:reply, effective_viewer_count(next, generation_key, now_ms), next}
  end

  def handle_call({:record_hls_access, generation_key, ip, viewer_session_id}, _from, state) do
    now_ms = now_ms()

    # HLS requests are frequent (one per segment), so only emit on change.
    next =
      state
      |> prune_stale_viewers(now_ms)
      |> put_hls_viewer(generation_key, ip, viewer_session_id, now_ms)
      |> emit_viewer_count_if_changed(generation_key, now_ms)

    {:reply, effective_viewer_count(next, generation_key, now_ms), next}
  end

  def handle_call({:snapshot, key}, _from, state) do
    {:reply, Map.get(state.entries, key, default_entry()), state}
  end

  def handle_call({:viewer_count, generation_key}, _from, state) do
    now_ms = now_ms()
    next = prune_stale_viewers(state, now_ms)
    {:reply, effective_viewer_count(next, generation_key, now_ms), next}
  end

  def handle_call(:reset, _from, state) do
    # `state` carries the configured options, so they survive the reset.
    {:reply, :ok, fresh_state(state)}
  end

  @impl true
  def handle_info(:cleanup_viewers, state) do
    now_ms = now_ms()

    next =
      state
      |> prune_stale_viewers(now_ms)
      |> emit_all_viewer_counts(now_ms)

    schedule_cleanup(next)
    {:noreply, next}
  end

  # Defining handle_info/2 replaces GenServer's default clause, so without this
  # catch-all any stray message would crash the stats process.
  def handle_info(message, state) do
    Logger.debug("#{inspect(__MODULE__)} ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  ## State helpers

  defp fresh_state(opts) do
    %{
      entries: %{},
      heartbeat_sessions: %{},
      hls_ips: %{},
      last_emitted_viewer_counts: %{},
      viewer_ttl_ms: option(opts, :viewer_ttl_ms, @viewer_ttl_ms),
      cleanup_interval_ms: option(opts, :cleanup_interval_ms, @cleanup_interval_ms)
    }
  end

  defp option(opts, key, default) when is_list(opts), do: Keyword.get(opts, key, default)
  defp option(%{} = opts, key, default), do: Map.get(opts, key, default)

  # Fetches the updated entry for `key`, emits `event` for it and builds the reply.
  defp reply_with_entry(state, key, event) do
    entry = Map.fetch!(state.entries, key)
    emit(event, key, entry)
    {:reply, entry, state}
  end

  defp update_metric(state, key, metric, callback) do
    update_entry(state, key, &Map.update!(&1, metric, callback))
  end

  # Applies `callback` to the entry for `key`, starting from `default_entry/0`
  # when the key has not been seen yet.
  defp update_entry(state, key, callback) do
    entry = Map.get(state.entries, key, default_entry())
    %{state | entries: Map.put(state.entries, key, callback.(entry))}
  end

  defp maybe_put(entry, key, measurements) do
    case Map.fetch(measurements, key) do
      {:ok, value} -> Map.put(entry, key, value)
      :error -> entry
    end
  end

  defp maybe_put_kw(entry, key, attrs) do
    case Keyword.fetch(attrs, key) do
      {:ok, value} -> Map.put(entry, key, value)
      :error -> entry
    end
  end

  defp default_entry do
    %{
      viewer_count: 0,
      ingress_bps: nil,
      egress_bps: nil,
      segment_lag_ms: nil,
      playlist_age_ms: nil,
      last_segment_at: nil
    }
  end

  defp key(endpoint_id, stream_id, generation_id) do
    {nullable_string(endpoint_id), nullable_string(stream_id), nullable_string(generation_id)}
  end

  defp generation_key(stream_id, generation_id) do
    {nullable_string(stream_id), nullable_string(generation_id)}
  end

  defp generation_key_from_hls_path(["streams", stream_id, generation_id | _rest]),
    do: {:ok, generation_key(stream_id, generation_id)}

  # A "streams"-prefixed path without a generation segment must not fall through
  # to the bare two-segment clause, which would count a stream named "streams".
  defp generation_key_from_hls_path(["streams" | _rest]),
    do: {:error, :unknown_hls_generation}

  defp generation_key_from_hls_path([stream_id, generation_id | _rest]),
    do: {:ok, generation_key(stream_id, generation_id)}

  defp generation_key_from_hls_path(_path_segments), do: {:error, :unknown_hls_generation}

  ## Viewer presence

  defp put_heartbeat_session(state, generation_key, viewer_session_id, ip, now_ms)
       when is_binary(viewer_session_id) and viewer_session_id != "" do
    session_key = {generation_key, viewer_session_id}
    put_in(state.heartbeat_sessions[session_key], %{ip: ip, last_seen_ms: now_ms})
  end

  defp put_heartbeat_session(state, _generation_key, _viewer_session_id, _ip, _now_ms), do: state

  defp put_hls_ip(state, generation_key, ip, now_ms) do
    put_in(state.hls_ips[{generation_key, ip}], now_ms)
  end

  # HLS requests carrying a session id refresh that session; anonymous ones are
  # tracked per remote IP instead.
  defp put_hls_viewer(state, generation_key, ip, viewer_session_id, now_ms)
       when is_binary(viewer_session_id) and viewer_session_id != "" do
    put_heartbeat_session(state, generation_key, viewer_session_id, ip, now_ms)
  end

  defp put_hls_viewer(state, generation_key, ip, _viewer_session_id, now_ms) do
    put_hls_ip(state, generation_key, ip, now_ms)
  end

  # Active sessions count individually; anonymous HLS IPs count only when no
  # active session of the same generation already uses that IP.
  defp effective_viewer_count(state, generation_key, now_ms) do
    heartbeat_sessions = active_heartbeat_sessions(state, generation_key, now_ms)

    heartbeat_ips =
      heartbeat_sessions
      |> Enum.map(fn {_key, session} -> session.ip end)
      |> Enum.reject(&is_nil/1)
      |> MapSet.new()

    hls_ip_count =
      Enum.count(state.hls_ips, fn
        {{^generation_key, ip}, last_seen_ms} ->
          active?(state, last_seen_ms, now_ms) and not MapSet.member?(heartbeat_ips, ip)

        _other ->
          false
      end)

    Enum.count(heartbeat_sessions) + hls_ip_count
  end

  defp active_heartbeat_sessions(state, generation_key, now_ms) do
    Enum.filter(state.heartbeat_sessions, fn
      {{^generation_key, _session_id}, %{last_seen_ms: last_seen_ms}} ->
        active?(state, last_seen_ms, now_ms)

      _other ->
        false
    end)
  end

  defp active?(state, last_seen_ms, now_ms), do: now_ms - last_seen_ms <= state.viewer_ttl_ms

  defp prune_stale_viewers(state, now_ms) do
    %{
      state
      | heartbeat_sessions:
          Map.reject(state.heartbeat_sessions, fn {_key, session} ->
            not active?(state, session.last_seen_ms, now_ms)
          end),
        hls_ips:
          Map.reject(state.hls_ips, fn {_key, last_seen_ms} ->
            not active?(state, last_seen_ms, now_ms)
          end)
    }
  end

  ## Telemetry

  defp emit_all_viewer_counts(state, now_ms) do
    state
    |> active_generation_keys()
    |> Enum.reduce(state, &emit_viewer_count_if_changed(&2, &1, now_ms))
  end

  # Includes previously emitted keys so a generation whose viewers all expired
  # gets one final 0 emission.
  defp active_generation_keys(state) do
    heartbeat_keys =
      Enum.map(state.heartbeat_sessions, fn {{generation_key, _session_id}, _session} ->
        generation_key
      end)

    hls_keys =
      Enum.map(state.hls_ips, fn {{generation_key, _ip}, _last_seen_ms} -> generation_key end)

    emitted_keys = Map.keys(state.last_emitted_viewer_counts)

    Enum.uniq(heartbeat_keys ++ hls_keys ++ emitted_keys)
  end

  defp emit_viewer_count_if_changed(state, generation_key, now_ms) do
    count = effective_viewer_count(state, generation_key, now_ms)

    case Map.get(state.last_emitted_viewer_counts, generation_key) do
      ^count ->
        state

      _previous ->
        emit_viewer_metric(generation_key, count)
        put_last_emitted_viewer_count(state, generation_key, count)
    end
  end

  defp emit_viewer_count(state, generation_key, now_ms) do
    count = effective_viewer_count(state, generation_key, now_ms)
    emit_viewer_metric(generation_key, count)
    put_last_emitted_viewer_count(state, generation_key, count)
  end

  # A zero count is forgotten so idle generations do not accumulate forever.
  defp put_last_emitted_viewer_count(state, generation_key, 0) do
    update_in(state.last_emitted_viewer_counts, &Map.delete(&1, generation_key))
  end

  defp put_last_emitted_viewer_count(state, generation_key, count) do
    put_in(state.last_emitted_viewer_counts[generation_key], count)
  end

  defp emit_viewer_metric({stream_id, generation_id}, count) do
    :telemetry.execute(
      [:sender, :metrics, :viewer_count],
      %{value: count},
      %{stream_id: stream_id, generation_id: generation_id}
    )
  end

  defp emit(event, {endpoint_id, stream_id, generation_id}, entry) do
    :telemetry.execute(
      [:sender, :stats, event],
      %{count: 1},
      %{
        endpoint_id: endpoint_id,
        stream_id: stream_id,
        generation_id: generation_id,
        stats: entry
      }
    )
  end

  ## Input normalization

  defp normalize_countable_ip(remote_ip) do
    case normalize_ip(remote_ip) do
      nil -> {:error, :missing_ip}
      ip -> if localhost_ip?(ip), do: {:error, :localhost_ip}, else: {:ok, ip}
    end
  end

  defp normalize_ip(nil), do: nil

  defp normalize_ip(ip) when is_tuple(ip) do
    # `:inet.ntoa/1` returns `{:error, :einval}` for tuples that are not addresses.
    case :inet.ntoa(ip) do
      {:error, _reason} -> nil
      chars -> List.to_string(chars)
    end
  end

  # Textual addresses are trimmed and downcased (IPv6 hex is case-insensitive)
  # so equal addresses dedupe; blank strings count as missing.
  defp normalize_ip(ip) when is_binary(ip) do
    case ip |> String.trim() |> String.downcase() do
      "" -> nil
      normalized -> normalized
    end
  end

  defp normalize_ip(_ip), do: nil

  # Values without a String.Chars implementation are treated as "no session".
  defp normalize_viewer_session_id(nil), do: nil
  defp normalize_viewer_session_id(id) when is_binary(id), do: String.trim(id)

  defp normalize_viewer_session_id(id) do
    if String.Chars.impl_for(id), do: id |> to_string() |> String.trim(), else: nil
  end

  defp localhost_ip?("::1"), do: true
  defp localhost_ip?("0:0:0:0:0:0:0:1"), do: true
  defp localhost_ip?("localhost"), do: true
  defp localhost_ip?("127." <> _rest), do: true
  # IPv4-mapped IPv6 loopback, e.g. an IPv4 client seen by an IPv6 listener.
  defp localhost_ip?("::ffff:127." <> _rest), do: true
  defp localhost_ip?(_ip), do: false

  ## Misc

  defp schedule_cleanup(state) do
    Process.send_after(self(), :cleanup_viewers, state.cleanup_interval_ms)
  end

  defp now_ms, do: System.monotonic_time(:millisecond)

  defp nullable_string(nil), do: nil
  defp nullable_string(value), do: to_string(value)
end