You've already forked tribes-plugin-sender
forked from tribes/tribes-plugin-template
f8e2bfaada
Move Sender modules under TribeOne.TribesPlugin.Sender and replace the Aether-specific chat integration with the public chat@1 surface contract.
128 lines
3.7 KiB
Elixir
128 lines
3.7 KiB
Elixir
defmodule TribeOne.TribesPlugin.Sender.HLSPullThroughTest do
|
|
use ExUnit.Case, async: true
|
|
|
|
alias TribeOne.TribesPlugin.Sender.MediaBackend.HLSPullThrough.Worker
|
|
alias TribeOne.TribesPlugin.Sender.Streaming.StreamGeneration
|
|
|
|
test "mirrors a live playlist and segments into the local spool" do
|
|
stream_id = Ash.UUID.generate()
|
|
generation_id = Ash.UUID.generate()
|
|
|
|
root =
|
|
Path.join(System.tmp_dir!(), "sender-hls-edge-test-#{System.unique_integer([:positive])}")
|
|
|
|
generation = %StreamGeneration{id: generation_id, stream_id: stream_id}
|
|
|
|
playlist = """
|
|
#EXTM3U
|
|
#EXTINF:2.0,
|
|
segment-00001.ts
|
|
"""
|
|
|
|
playlist_url =
|
|
"https://origin.example/sender/hls/streams/#{stream_id}/#{generation_id}/media.m3u8"
|
|
|
|
segment_url =
|
|
"https://origin.example/sender/hls/streams/#{stream_id}/#{generation_id}/segment-00001.ts"
|
|
|
|
fetch_fun = fn
|
|
^playlist_url ->
|
|
{:ok, playlist}
|
|
|
|
^segment_url ->
|
|
{:ok, "segment"}
|
|
|
|
url ->
|
|
{:error, {:unexpected_url, url}}
|
|
end
|
|
|
|
{:ok, pid} =
|
|
start_supervised(
|
|
{Worker,
|
|
generation: generation,
|
|
source_base_url: "https://origin.example/sender/hls",
|
|
spool_root: root,
|
|
poll_interval_ms: 60_000,
|
|
fetch_fun: fetch_fun}
|
|
)
|
|
|
|
output_dir = Path.join([root, "streams", stream_id, generation_id])
|
|
|
|
assert_eventually(fn ->
|
|
assert File.read!(Path.join(output_dir, "segment-00001.ts")) == "segment"
|
|
assert File.read!(Path.join(output_dir, "media.m3u8")) == playlist
|
|
end)
|
|
|
|
assert %{status: :running, output_dir: ^output_dir, source_tls_verify: true} =
|
|
Worker.health(pid)
|
|
end
|
|
|
|
test "prunes unreferenced segments beyond the configured delete threshold" do
|
|
stream_id = Ash.UUID.generate()
|
|
generation_id = Ash.UUID.generate()
|
|
|
|
root =
|
|
Path.join(
|
|
System.tmp_dir!(),
|
|
"sender-hls-edge-prune-test-#{System.unique_integer([:positive])}"
|
|
)
|
|
|
|
generation = %StreamGeneration{id: generation_id, stream_id: stream_id}
|
|
output_dir = Path.join([root, "streams", stream_id, generation_id])
|
|
File.mkdir_p!(output_dir)
|
|
File.write!(Path.join(output_dir, "segment-00001.ts"), "stale-1")
|
|
File.write!(Path.join(output_dir, "segment-00002.ts"), "stale-2")
|
|
|
|
playlist = """
|
|
#EXTM3U
|
|
#EXT-X-MEDIA-SEQUENCE:3
|
|
#EXTINF:2.0,
|
|
segment-00003.ts
|
|
"""
|
|
|
|
playlist_url =
|
|
"https://origin.example/sender/hls/streams/#{stream_id}/#{generation_id}/media.m3u8"
|
|
|
|
segment_url =
|
|
"https://origin.example/sender/hls/streams/#{stream_id}/#{generation_id}/segment-00003.ts"
|
|
|
|
fetch_fun = fn
|
|
^playlist_url -> {:ok, playlist}
|
|
^segment_url -> {:ok, "fresh-3"}
|
|
url -> {:error, {:unexpected_url, url}}
|
|
end
|
|
|
|
{:ok, _pid} =
|
|
start_supervised(
|
|
{Worker,
|
|
generation: generation,
|
|
source_base_url: "https://origin.example/sender/hls",
|
|
spool_root: root,
|
|
poll_interval_ms: 60_000,
|
|
hls_delete_threshold: 1,
|
|
fetch_fun: fetch_fun}
|
|
)
|
|
|
|
assert_eventually(fn ->
|
|
refute File.exists?(Path.join(output_dir, "segment-00001.ts"))
|
|
assert File.read!(Path.join(output_dir, "segment-00002.ts")) == "stale-2"
|
|
assert File.read!(Path.join(output_dir, "segment-00003.ts")) == "fresh-3"
|
|
assert File.read!(Path.join(output_dir, "media.m3u8")) == playlist
|
|
end)
|
|
end
|
|
|
|
defp assert_eventually(callback, attempts \\ 20)
|
|
|
|
defp assert_eventually(callback, attempts) when attempts > 0 do
|
|
callback.()
|
|
rescue
|
|
error in [ExUnit.AssertionError, File.Error] ->
|
|
if attempts == 1 do
|
|
reraise(error, __STACKTRACE__)
|
|
else
|
|
Process.sleep(50)
|
|
assert_eventually(callback, attempts - 1)
|
|
end
|
|
end
|
|
end
|