Files
tribes-plugin-sender/test/sender/hls_pull_through_test.exs
self f8e2bfaada refactor: use chat capability surfaces
Move Sender modules under TribeOne.TribesPlugin.Sender and replace the Aether-specific chat integration with the public chat@1 surface contract.
2026-05-26 01:13:38 +02:00

128 lines
3.7 KiB
Elixir

defmodule TribeOne.TribesPlugin.Sender.HLSPullThroughTest do
  # Exercises the HLS pull-through worker against a stubbed origin: each test
  # injects a `fetch_fun` that serves one fixed playlist and one segment, then
  # inspects what ends up in the worker's spool directory.
  use ExUnit.Case, async: true

  alias TribeOne.TribesPlugin.Sender.MediaBackend.HLSPullThrough.Worker
  alias TribeOne.TribesPlugin.Sender.Streaming.StreamGeneration

  # Base URL handed to the worker; the stubbed origin URLs are derived from it.
  @source_base_url "https://origin.example/sender/hls"

  test "mirrors a live playlist and segments into the local spool" do
    generation = new_generation()
    root = tmp_spool_root("sender-hls-edge-test")
    output_dir = spool_dir(root, generation)

    playlist = """
    #EXTM3U
    #EXTINF:2.0,
    segment-00001.ts
    """

    playlist_url = origin_url(generation, "media.m3u8")
    segment_url = origin_url(generation, "segment-00001.ts")

    # Any URL other than the two expected ones is reported as an error so an
    # unexpected request is visible to the worker instead of silently served.
    fetch_fun = fn
      ^playlist_url -> {:ok, playlist}
      ^segment_url -> {:ok, "segment"}
      url -> {:error, {:unexpected_url, url}}
    end

    {:ok, pid} =
      start_supervised(
        {Worker,
         generation: generation,
         source_base_url: @source_base_url,
         spool_root: root,
         poll_interval_ms: 60_000,
         fetch_fun: fetch_fun}
      )

    assert_eventually(fn ->
      assert File.read!(Path.join(output_dir, "segment-00001.ts")) == "segment"
      assert File.read!(Path.join(output_dir, "media.m3u8")) == playlist
    end)

    assert %{status: :running, output_dir: ^output_dir, source_tls_verify: true} =
             Worker.health(pid)
  end

  test "prunes unreferenced segments beyond the configured delete threshold" do
    generation = new_generation()
    root = tmp_spool_root("sender-hls-edge-prune-test")
    output_dir = spool_dir(root, generation)

    # Seed the spool with two segments that the playlist below no longer lists.
    File.mkdir_p!(output_dir)
    File.write!(Path.join(output_dir, "segment-00001.ts"), "stale-1")
    File.write!(Path.join(output_dir, "segment-00002.ts"), "stale-2")

    playlist = """
    #EXTM3U
    #EXT-X-MEDIA-SEQUENCE:3
    #EXTINF:2.0,
    segment-00003.ts
    """

    playlist_url = origin_url(generation, "media.m3u8")
    segment_url = origin_url(generation, "segment-00003.ts")

    fetch_fun = fn
      ^playlist_url -> {:ok, playlist}
      ^segment_url -> {:ok, "fresh-3"}
      url -> {:error, {:unexpected_url, url}}
    end

    {:ok, _pid} =
      start_supervised(
        {Worker,
         generation: generation,
         source_base_url: @source_base_url,
         spool_root: root,
         poll_interval_ms: 60_000,
         hls_delete_threshold: 1,
         fetch_fun: fetch_fun}
      )

    # With a delete threshold of 1 the test expects the oldest stale segment to
    # be removed while the one just before the live window is kept.
    assert_eventually(fn ->
      refute File.exists?(Path.join(output_dir, "segment-00001.ts"))
      assert File.read!(Path.join(output_dir, "segment-00002.ts")) == "stale-2"
      assert File.read!(Path.join(output_dir, "segment-00003.ts")) == "fresh-3"
      assert File.read!(Path.join(output_dir, "media.m3u8")) == playlist
    end)
  end

  # Builds a stream generation with freshly generated ids so concurrently
  # running tests never share URLs or spool paths.
  defp new_generation do
    %StreamGeneration{id: Ash.UUID.generate(), stream_id: Ash.UUID.generate()}
  end

  # Returns a unique spool root path under the system temp dir and registers an
  # `on_exit` callback that deletes it, so test runs do not leave spool files
  # behind. The directory itself is not created here.
  defp tmp_spool_root(prefix) do
    root = Path.join(System.tmp_dir!(), "#{prefix}-#{System.unique_integer([:positive])}")
    on_exit(fn -> File.rm_rf!(root) end)
    root
  end

  # Directory under `root` where the tests expect the worker to write the files
  # for `generation`.
  defp spool_dir(root, %StreamGeneration{id: generation_id, stream_id: stream_id}) do
    Path.join([root, "streams", stream_id, generation_id])
  end

  # URL at the stubbed origin for `file` within `generation`.
  defp origin_url(%StreamGeneration{id: generation_id, stream_id: stream_id}, file) do
    "#{@source_base_url}/streams/#{stream_id}/#{generation_id}/#{file}"
  end

  # Runs `callback` until it stops raising, retrying up to `attempts` times
  # with a 50 ms pause between tries. Only `ExUnit.AssertionError` and
  # `File.Error` are retried (the spool files may not exist yet); the failure
  # from the final attempt is re-raised with its original stacktrace.
  defp assert_eventually(callback, attempts \\ 20)

  defp assert_eventually(callback, attempts) when attempts > 0 do
    callback.()
  rescue
    error in [ExUnit.AssertionError, File.Error] ->
      if attempts == 1 do
        reraise(error, __STACKTRACE__)
      else
        Process.sleep(50)
        assert_eventually(callback, attempts - 1)
      end
  end
end