You've already forked tribes-plugin-sender
forked from tribes/tribes-plugin-template
3e288a7e95
Rename Sender plugin identity to tribe-one-sender across manifest, runtime API routes, metrics, e2e fixtures, and tests.
641 lines
19 KiB
Elixir
641 lines
19 KiB
Elixir
defmodule TribeOne.TribesPlugin.Sender.E2ERunner do
|
|
alias Parrhesia.API.Auth
|
|
|
|
# Admin identity used to sign the NIP-98 authorization events sent with every
# management call.
# NOTE(review): presumably a throwaway e2e fixture key pair — confirm it is
# never provisioned outside test environments.
@admin_pubkey "985826ac4ec99f6304785ebfaa303ec42973653a682cfa0a06caf9fec662eaa7"
@admin_privkey Base.decode16!(
                 "7ee088208557dcc06405ccf6547115b62a597c5d107ee6044bb430e71a68655c",
                 case: :lower
               )

# Base URLs of the two nodes under test and of the edge VictoriaMetrics instance.
@origin "http://127.0.0.1:46101"
@edge "http://127.0.0.1:46102"
@edge_vm "http://127.0.0.1:48112"

# HLS base URL registered for the origin media endpoint.
@origin_hls_base "http://sender-origin:4000/sender/hls"

# RTMP target that ffmpeg pushes the synthetic test stream to.
@rtmp_push_url "rtmp://127.0.0.1:49135/live/source"
|
|
|
|
# Runs the whole sender e2e scenario: waits for the services, registers the
# media endpoints, starts the origin and edge sessions, pushes a test stream,
# then checks playlist/segment delivery and the exported viewer-count metrics.
# Any failed assertion halts the VM with exit status 1 (see assert_ok).
def main(_argv) do
  :ok = ensure_http_client_started()

  # Wait until every service answers before issuing management calls.
  assert_ok(wait_victoria_metrics(@edge_vm), "edge VictoriaMetrics ready")
  assert_ok(wait_http(@origin <> "/sign-in"), "origin node ready")
  assert_ok(wait_http(@edge <> "/sign-in"), "edge node ready")

  node_info_retry = [attempts: 90, delay_ms: 1_000]
  {:ok, origin_info} = admin_with_retry(@origin, "node_info", %{}, node_info_retry)
  {:ok, edge_info} = admin_with_retry(@edge, "node_info", %{}, node_info_retry)

  assert_plugin_loaded!(@origin, "origin")
  assert_plugin_loaded!(@edge, "edge")

  :ok = connect_cluster_pair(@origin, origin_info, @edge, edge_info)

  # The origin endpoint is created on the origin node and then mirrored onto
  # the edge node under the same id.
  origin_endpoint_params = %{
    "type" => "tribes_origin",
    "node_pubkey" => node_pubkey(origin_info),
    "display_name" => "Origin",
    "hls_base_url" => @origin_hls_base,
    "rtmp_ingest_url" => "rtmp://sender-origin:1935/live/source"
  }

  {:ok, %{"endpoint" => origin_endpoint}} =
    sender(@origin, "media_endpoints.upsert", origin_endpoint_params)

  origin_endpoint_id = origin_endpoint["id"]

  {:ok, _mirrored_origin_endpoint} =
    sender(
      @edge,
      "media_endpoints.upsert",
      Map.put(origin_endpoint_params, "id", origin_endpoint_id)
    )

  {:ok, %{"endpoint" => edge_endpoint}} =
    sender(@edge, "media_endpoints.upsert", %{
      "type" => "tribes_edge",
      "node_pubkey" => node_pubkey(edge_info),
      "source_endpoint_id" => origin_endpoint_id,
      "display_name" => "Edge"
    })

  origin_start_params = %{
    "endpoint_id" => origin_endpoint_id,
    "input_url" => "rtmp://0.0.0.0:1935/live/source",
    "ffmpeg_loglevel" => "info",
    "ffmpeg_log_output" => "warning",
    "backend_restart_delay_ms" => 250,
    "backend_restart_max_delay_ms" => 1_000,
    "readiness_poll_interval_ms" => 250
  }

  {:ok, %{"generation" => generation, "stream" => stream}} =
    sender(@origin, "stream.start", origin_start_params)

  generation_id = generation["id"]

  edge_start_params = %{
    "mode" => "hls_edge",
    "endpoint_id" => edge_endpoint["id"],
    "generation_id" => generation_id,
    "backend_restart_delay_ms" => 250,
    "backend_restart_max_delay_ms" => 1_000,
    "edge_poll_interval_ms" => 250,
    "source_request_headers" => %{"x-forwarded-proto" => "https"}
  }

  # The edge start is retried (up to 90 times, one second apart) until it succeeds.
  {:ok, _edge_start} =
    sender_with_retry(@edge, "stream.start", edge_start_params, attempts: 90, delay_ms: 1_000)

  :ok = push_test_stream()

  origin_live? = wait_sender_status(@origin, generation_id, "origin_ingest", "live") == :ok
  assert_ok(origin_live?, "origin generation becomes live", &dump_debug/0)

  stream_id = stream["id"]
  {_playlist, segment_ref} = wait_edge_playlist!(stream_id, generation_id)

  assert_ok(fetch_edge_segment(stream_id, generation_id, segment_ref), "edge segment served")

  {:ok, %{"session" => edge_status}} =
    sender(@edge, "stream.status", %{"mode" => "hls_edge", "generation_id" => generation_id})

  assert_ok(edge_status["status"] == "running", "edge session running", &dump_debug/0)

  :ok = emit_viewer_heartbeats!(stream_id, generation_id)

  exported? = wait_for_sender_metric_export(@edge, 2) == :ok
  assert_ok(exported?, "sender viewer count exported on /metrics", &dump_metrics_debug/0)

  # Heartbeats are re-sent on every poll while waiting for the rollup.
  keepalive = fn -> emit_viewer_heartbeats!(stream_id, generation_id) end
  rolled_up? = wait_for_sender_metric_rollup(@edge, 2, keepalive) == :ok

  assert_ok(
    rolled_up?,
    "sender viewer count appears in metrics rollup",
    &dump_metrics_debug/0
  )

  # Best-effort teardown: the stop results are deliberately ignored.
  _ = sender(@edge, "stream.stop", %{"mode" => "hls_edge", "generation_id" => generation_id})
  _ = sender(@origin, "stream.stop", %{"generation_id" => generation_id})

  IO.puts("sender e2e assertions passed")
end
|
|
|
|
# Asserts that the node at `base_url` lists the "tribe-one-sender" plugin with
# status "loaded". `label` only appears in the assertion message.
defp assert_plugin_loaded!(base_url, label) do
  {:ok, listing} = admin_with_retry(base_url, "plugin_list", %{}, attempts: 30, delay_ms: 1_000)

  sender_loaded? =
    Enum.any?(
      listing["plugins"],
      &match?(%{"name" => "tribe-one-sender", "status" => "loaded"}, &1)
    )

  assert_ok(sender_loaded?, "#{label} sender plugin loaded")
end
|
|
|
|
# Invokes `method` (API version "1") of the "tribe-one-sender" plugin through
# the admin "plugin.call" management method and returns admin/3's result.
defp sender(base_url, method, params) do
  call = %{
    "plugin" => "tribe-one-sender",
    "method" => method,
    "version" => "1",
    "params" => params
  }

  admin(base_url, "plugin.call", call)
end
|
|
|
|
# Calls sender/3 until it returns `{:ok, _}`. `{:error, _}` results are retried
# (with occasional logging and a `:delay_ms` pause) while attempts remain;
# anything else, or the final attempt's result, is returned as-is.
# Both `:attempts` and `:delay_ms` are required options.
defp sender_with_retry(base_url, method, params, opts) do
  attempts = Keyword.fetch!(opts, :attempts)
  delay_ms = Keyword.fetch!(opts, :delay_ms)

  Enum.reduce_while(1..attempts, {:error, :retry_exhausted}, fn attempt, _previous ->
    outcome = sender(base_url, method, params)
    retryable? = match?({:error, _}, outcome) and attempt < attempts

    if retryable? do
      {:error, reason} = outcome
      maybe_log_retry(method, attempt, attempts, reason)
      Process.sleep(delay_ms)
      {:cont, outcome}
    else
      {:halt, outcome}
    end
  end)
end
|
|
|
|
# Prints a retry notice on the first attempt and on every tenth attempt, so a
# long retry loop does not flood the output.
defp maybe_log_retry(method, attempt, attempts, reason) do
  first_attempt? = attempt == 1
  tenth_attempt? = rem(attempt, 10) == 0

  if first_attempt? or tenth_attempt? do
    IO.puts("sender #{method} retry #{attempt}/#{attempts}: #{retry_reason(reason)}")
  end
end
|
|
|
|
# Renders a retry reason for logging. An error map carrying an HTTP status and
# a string "error" body becomes "<status> <first 160 chars of the error>";
# every other term is rendered with inspect/1.
defp retry_reason(reason) do
  case reason do
    %{status: status, body: %{"error" => message}} when is_binary(message) ->
      "#{status} #{String.slice(message, 0, 160)}"

    _anything_else ->
      inspect(reason)
  end
end
|
|
|
|
# Pushes an 18-second synthetic audio/video stream to @rtmp_push_url with
# ffmpeg. A non-zero exit is retried once per second, up to 30 attempts in
# total; if all attempts fail the run is aborted through assert_ok/3 after a
# debug dump.
defp push_test_stream do
  max_attempts = 30

  logging = ~w[-hide_banner -loglevel warning]
  video_source = ~w[-re -f lavfi -i testsrc=size=320x180:rate=15]
  audio_source = ~w[-f lavfi -i sine=frequency=1000:sample_rate=48000]
  duration = ~w[-t 18]

  video_encoding =
    ~w[-c:v libx264 -preset ultrafast -tune zerolatency -pix_fmt yuv420p] ++
      ~w[-g 30 -keyint_min 30 -sc_threshold 0]

  audio_encoding = ~w[-c:a aac]
  output = ["-f", "flv", @rtmp_push_url]

  args =
    Enum.concat([
      logging,
      video_source,
      audio_source,
      duration,
      video_encoding,
      audio_encoding,
      output
    ])

  result =
    Enum.reduce_while(1..max_attempts, {:error, :retry_exhausted}, fn attempt, _previous ->
      {ffmpeg_output, exit_status} = System.cmd("ffmpeg", args, stderr_to_stdout: true)

      cond do
        exit_status == 0 ->
          {:halt, :ok}

        attempt < max_attempts ->
          IO.puts("ffmpeg push retry #{attempt}/#{max_attempts}: #{String.trim(ffmpeg_output)}")
          Process.sleep(1_000)
          {:cont, {:error, ffmpeg_output}}

        true ->
          {:halt, {:error, {:ffmpeg_push_failed, exit_status, ffmpeg_output}}}
      end
    end)

  case result do
    :ok ->
      :ok

    {:error, reason} ->
      assert_ok(false, "ffmpeg push failed: #{inspect(reason)}", &dump_debug/0)
  end
end
|
|
|
|
# Polls "stream.status" once per second (up to 90 times) until the reported
# generation status equals `expected_generation_status`. Returns :ok on
# success; otherwise the last non-matching response is returned.
# Any `role` other than "hls_edge" is queried in "origin_ingest" mode.
defp wait_sender_status(base_url, generation_id, role, expected_generation_status) do
  mode =
    case role do
      "hls_edge" -> "hls_edge"
      _other -> "origin_ingest"
    end

  query = %{"mode" => mode, "generation_id" => generation_id}

  Enum.reduce_while(1..90, {:error, :timeout}, fn _round, _previous ->
    response = sender(base_url, "stream.status", query)

    if match?({:ok, %{"generation" => %{"status" => ^expected_generation_status}}}, response) do
      {:halt, :ok}
    else
      Process.sleep(1_000)
      {:cont, response}
    end
  end)
end
|
|
|
|
# Polls the edge media playlist (as viewer session "browser-1") once per second,
# up to 120 times, until it is a valid HLS playlist that references at least
# one segment.
#
# Returns `{playlist, segment_ref}` on success. If the playlist never becomes
# ready, the run is aborted through assert_ok/3 after a debug dump.
defp wait_edge_playlist!(stream_id, generation_id) do
  path = viewer_playlist_path(stream_id, generation_id, "browser-1")

  outcome =
    Enum.reduce_while(1..120, {:error, :timeout}, fn _attempt, _acc ->
      case http_get(@edge <> path, viewer_headers()) do
        {:ok, 200, "#EXTM3U" <> _rest = playlist} ->
          case first_segment_ref(playlist) do
            # Success carries its own tag so that it cannot be confused with a
            # failure reason, several of which are 2-tuples as well
            # ({:error, _}, {:missing_segment, _}, {:non_hls_playlist, _}).
            {:ok, segment_ref} -> {:halt, {:ready, playlist, segment_ref}}
            {:error, _reason} -> retry_playlist({:missing_segment, playlist})
          end

        {:ok, 200, body} ->
          retry_playlist({:non_hls_playlist, String.slice(body, 0, 200)})

        other ->
          retry_playlist(other)
      end
    end)

  case outcome do
    {:ready, playlist, segment_ref} ->
      {playlist, segment_ref}

    failure ->
      assert_ok(false, "edge playlist not ready: #{inspect(failure)}", &dump_debug/0)
  end
end
|
|
|
|
# Pauses for one second, then tells Enum.reduce_while/3 to keep polling with
# `reason` carried as the latest failure.
defp retry_playlist(reason) do
  :ok = Process.sleep(1_000)
  {:cont, reason}
end
|
|
|
|
# Returns `{:ok, line}` for the first playlist line that, after trimming, is
# neither blank nor a "#" tag/comment line, or `{:error, :missing_segment}`
# when the playlist has no such line.
defp first_segment_ref(playlist) do
  entries =
    for raw_line <- String.split(playlist, "\n"),
        line = String.trim(raw_line),
        line != "",
        not String.starts_with?(line, "#"),
        do: line

  case entries do
    [segment | _rest] -> {:ok, segment}
    [] -> {:error, :missing_segment}
  end
end
|
|
|
|
# Fetches one segment from the edge node. Returns true only for a 200 response
# with a non-empty body, and false for any other outcome.
defp fetch_edge_segment(stream_id, generation_id, segment_ref) do
  segment_url = @edge <> "/sender/hls/streams/#{stream_id}/#{generation_id}/#{segment_ref}"

  with {:ok, 200, body} <- http_get(segment_url, viewer_headers()) do
    byte_size(body) > 0
  else
    _failure -> false
  end
end
|
|
|
|
# Builds the media playlist path for a stream generation, passing the viewer
# session id in the `vsid` query parameter.
defp viewer_playlist_path(stream_id, generation_id, viewer_session_id) do
  playlist = "/sender/hls/streams/#{stream_id}/#{generation_id}/media.m3u8"
  playlist <> "?vsid=" <> to_string(viewer_session_id)
end
|
|
|
|
# Sends one heartbeat for each of the two simulated viewers and asserts the
# reported viewer count. A count of 2 is accepted for either session because
# this function is called repeatedly while both sessions are already counted.
# Crashes with a MatchError if a heartbeat is not answered with 202 and "ok".
defp emit_viewer_heartbeats!(stream_id, generation_id) do
  for {session_id, position} <- [{"browser-1", 1}, {"browser-2", 2}] do
    {:ok, 202, %{"ok" => true, "viewer_count" => count}} =
      post_viewer_heartbeat(stream_id, generation_id, session_id)

    counted? = count == position or count == 2

    assert_ok(
      counted?,
      "viewer heartbeat counted session #{session_id}, got #{inspect(count)}"
    )
  end

  :ok
end
|
|
|
|
# Posts a "heartbeat" player event for the given viewer session to the edge
# node's plugin API and returns http_post_json/2's result.
defp post_viewer_heartbeat(stream_id, generation_id, viewer_session_id) do
  endpoint = @edge <> "/plugins-api/tribe-one-sender/player-events"

  event = %{
    "event" => "heartbeat",
    "stream_id" => stream_id,
    "generation_id" => generation_id,
    "viewer_session_id" => viewer_session_id
  }

  http_post_json(endpoint, event)
end
|
|
|
|
# Polls "metrics_rollups.list" once per second (up to 150 times) until some
# rollup reports the sender viewer count as `expected_value`.
# `keepalive_fun` is invoked before every poll. Returns :ok on success,
# otherwise the last failure value.
defp wait_for_sender_metric_rollup(base_url, expected_value, keepalive_fun) do
  pause_and_continue = fn carry ->
    Process.sleep(1_000)
    {:cont, carry}
  end

  Enum.reduce_while(1..150, {:error, :timeout}, fn _round, _previous ->
    keepalive_fun.()

    case admin(base_url, "metrics_rollups.list", %{"limit" => 120}) do
      {:ok, %{"rollups" => rollups}} ->
        matched? =
          Enum.any?(rollups, fn rollup -> sender_metric_rollup?(rollup, expected_value) end)

        if matched?, do: {:halt, :ok}, else: pause_and_continue.({:error, rollups})

      unexpected ->
        pause_and_continue.(unexpected)
    end
  end)
end
|
|
|
|
# Polls `/metrics` once per second (up to 30 times) until the sender viewer
# count sample equals `expected_value`. Returns :ok on success, otherwise the
# last failure value (for a 200 response: the first 2000 characters of the body).
defp wait_for_sender_metric_export(base_url, expected_value) do
  metrics_url = base_url <> "/metrics"

  pause_and_continue = fn carry ->
    Process.sleep(1_000)
    {:cont, carry}
  end

  Enum.reduce_while(1..30, {:error, :timeout}, fn _round, _previous ->
    case http_get(metrics_url) do
      {:ok, 200, body} ->
        if metrics_body_has_sender_viewer_count?(body, expected_value),
          do: {:halt, :ok},
          else: pause_and_continue.({:error, String.slice(body, 0, 2_000)})

      unexpected ->
        pause_and_continue.(unexpected)
    end
  end)
end
|
|
|
|
# True when some line of the metrics body is exactly the (unlabelled) sample
# "plugins_tribe_one_sender_viewer_count <expected>", written either as an
# integer or with a ".0" suffix. Trailing whitespace on the line is ignored.
defp metrics_body_has_sender_viewer_count?(body, expected_value) do
  sample = "plugins_tribe_one_sender_viewer_count " <> Integer.to_string(expected_value)
  accepted = [sample, sample <> ".0"]

  body
  |> String.split("\n")
  |> Enum.any?(fn line -> String.trim_trailing(line) in accepted end)
end
|
|
|
|
# True when a rollup's sender viewer count equals `expected_value`. The "last"
# value is used when present, falling back to "max". Rollups without a
# "plugins" map never match.
defp sender_metric_rollup?(rollup, expected_value) do
  case rollup do
    %{"plugins" => plugins} when is_map(plugins) ->
      last = get_in(plugins, ["tribe-one-sender", "viewer_count", "last"])

      observed =
        if is_nil(last),
          do: get_in(plugins, ["tribe-one-sender", "viewer_count", "max"]),
          else: last

      numeric_equal?(observed, expected_value)

    _no_plugins ->
      false
  end
end
|
|
|
|
# Compares a metric value with `expected` using `==`. Numbers are compared
# directly; strings must parse completely as a float (no trailing characters);
# any other term is never equal.
defp numeric_equal?(value, expected) do
  cond do
    is_number(value) ->
      value == expected

    is_binary(value) ->
      match?({parsed, ""} when parsed == expected, Float.parse(value))

    true ->
      false
  end
end
|
|
|
|
# Registers each node as a cluster peer of the other (left learns about right
# first). Returns :ok; an `{:error, _}` from either upsert aborts the run
# through assert_ok/2.
defp connect_cluster_pair(left_base_url, left_node_info, right_base_url, right_node_info) do
  register_peer = fn base_url, peer_info ->
    case upsert_cluster_node(base_url, peer_info) do
      {:ok, _} ->
        :ok

      {:error, reason} ->
        assert_ok(false, "unable to connect cluster pair: #{inspect(reason)}")
    end
  end

  :ok = register_peer.(left_base_url, right_node_info)
  :ok = register_peer.(right_base_url, left_node_info)
  :ok
end
|
|
|
|
# Upserts the node described by `node_info` as an active, all-scope cluster
# node on the node at `base_url`, using its "sync_url" as transport address.
defp upsert_cluster_node(base_url, node_info) do
  peer = %{
    "pubkey" => node_pubkey(node_info),
    "transport_address" => node_info["sync_url"],
    "scope" => "all",
    "status" => "active"
  }

  admin(base_url, "cluster_nodes.upsert", peer)
end
|
|
|
|
# Extracts a node's public key from its node-info map, preferring
# "node_pubkey" over "pubkey". The binary pattern `<<_, _::binary>>` only
# matches non-empty binaries; if neither key holds one, no clause matches.
defp node_pubkey(%{"node_pubkey" => <<_, _::binary>> = pubkey}), do: pubkey
defp node_pubkey(%{"pubkey" => <<_, _::binary>> = pubkey}), do: pubkey
|
|
|
|
# Calls a management method on the node at `base_url`.
#
# The request is a JSON POST to "/api/admin/management", authorized with a
# freshly signed NIP-98 header.
#
# Returns:
#   * `{:ok, result}` when the node answers 200 with a JSON object whose "ok"
#     field is true (`result` is its "result" field, or nil when absent);
#   * `{:error, %{status: status, body: decoded}}` for any other HTTP response;
#   * `{:error, reason}` when the HTTP request itself fails.
defp admin(base_url, method, params) do
  url = base_url <> "/api/admin/management"
  body = JSON.encode!(%{"method" => method, "params" => params})

  headers = [
    {~c"content-type", ~c"application/json"},
    {~c"authorization", to_charlist(nip98_authorization("POST", url))}
  ]

  request = {to_charlist(url), headers, ~c"application/json", body}

  case :httpc.request(:post, request, [], body_format: :binary) do
    {:ok, {{_version, status, _reason}, _headers, response_body}} ->
      # Match on the decoded shape rather than indexing into it: a 200 response
      # whose body is valid JSON but not an object (list, string, number) must
      # become an error tuple instead of raising from the Access lookup.
      case {status, decode_json(response_body)} do
        {200, %{"ok" => true} = decoded} ->
          {:ok, Map.get(decoded, "result")}

        {_status, decoded} ->
          {:error, %{status: status, body: decoded}}
      end

    {:error, reason} ->
      {:error, reason}
  end
end
|
|
|
|
# Calls admin/3 until it returns `{:ok, _}`, sleeping `:delay_ms` (default 500)
# between tries, for at most `:attempts` tries (default 10). The result of the
# last try is returned when none succeeds.
defp admin_with_retry(base_url, method, params, opts) do
  attempts = Keyword.get(opts, :attempts, 10)
  delay_ms = Keyword.get(opts, :delay_ms, 500)

  Enum.reduce_while(1..attempts, {:error, :retry_exhausted}, fn attempt, _previous ->
    outcome = admin(base_url, method, params)

    cond do
      match?({:ok, _}, outcome) ->
        {:halt, outcome}

      attempt < attempts ->
        Process.sleep(delay_ms)
        {:cont, outcome}

      true ->
        {:halt, outcome}
    end
  end)
end
|
|
|
|
# Performs a GET request through :httpc with the body returned as a binary.
# Returns `{:ok, status, body}`, or `{:error, reason}` when the request fails.
# `headers` defaults to base_headers/0.
defp http_get(url, headers \\ base_headers()) do
  request = {to_charlist(url), headers}

  with {:ok, {{_version, status, _reason}, _response_headers, body}} <-
         :httpc.request(:get, request, [], body_format: :binary) do
    {:ok, status, body}
  end
end
|
|
|
|
# POSTs `payload` as JSON using the viewer headers. Returns
# `{:ok, status, decoded_body}` (the body goes through decode_json/1), or
# `{:error, reason}` when the request fails.
defp http_post_json(url, payload) do
  encoded = JSON.encode!(payload)
  headers = [{~c"content-type", ~c"application/json"} | viewer_headers()]
  request = {to_charlist(url), headers, ~c"application/json", encoded}

  with {:ok, {{_version, status, _reason}, _response_headers, response_body}} <-
         :httpc.request(:post, request, [], body_format: :binary) do
    {:ok, status, decode_json(response_body)}
  end
end
|
|
|
|
# Headers sent with every plain HTTP request: the request is marked as having
# been forwarded over https.
defp base_headers do
  [{~c"x-forwarded-proto", ~c"https"}]
end

# Viewer requests currently use the same headers as any other request.
defp viewer_headers do
  base_headers()
end
|
|
|
|
# Polls `url` once per second (up to 90 times) until it answers with any
# status in 200..499. Returns :ok on success, otherwise the last failed result.
defp wait_http(url) do
  Enum.reduce_while(1..90, {:error, :timeout}, fn _round, _previous ->
    probe = http_get(url)

    if match?({:ok, status, _body} when status in 200..499, probe) do
      {:halt, :ok}
    else
      Process.sleep(1_000)
      {:cont, probe}
    end
  end)
end
|
|
|
|
# Polls the query API at `base_url` with the trivial query `vector(1)` once per
# second (up to 90 times) until it answers 200. Returns :ok on success,
# otherwise the last failed result.
defp wait_victoria_metrics(base_url) do
  probe_url = base_url <> "/api/v1/query?query=vector(1)"

  Enum.reduce_while(1..90, {:error, :timeout}, fn _round, _previous ->
    with {:ok, 200, _body} <- http_get(probe_url) do
      {:halt, :ok}
    else
      not_ready ->
        Process.sleep(1_000)
        {:cont, not_ready}
    end
  end)
end
|
|
|
|
# Builds the value of a NIP-98 "Authorization" header for `method` and `url`:
# a kind-27235 event signed with the admin key, JSON-encoded, base64-encoded
# and prefixed with "Nostr ". The content embeds a unique integer so that
# every generated event differs.
defp nip98_authorization(method, url) do
  unsigned_event = %{
    "pubkey" => @admin_pubkey,
    "created_at" => System.system_time(:second),
    "kind" => 27_235,
    "tags" => [["method", method], ["u", url]],
    "content" => "sender-e2e-#{System.unique_integer([:positive, :monotonic])}"
  }

  id = Auth.compute_event_id(unsigned_event)
  raw_id = Base.decode16!(id, case: :lower)
  {:ok, signature} = Tribes.Keyring.sign_event(raw_id, @admin_privkey)

  token =
    unsigned_event
    |> Map.put("id", id)
    |> Map.put("sig", signature)
    |> JSON.encode!()
    |> Base.encode64()

  "Nostr " <> token
end
|
|
|
|
# Decodes a JSON response body. Bodies that are not valid JSON are wrapped as
# `%{"raw" => body}` so callers always receive a term they can inspect.
defp decode_json(body) when is_binary(body) do
  with {:ok, decoded} <- JSON.decode(body) do
    decoded
  else
    {:error, _reason} -> %{"raw" => body}
  end
end
|
|
|
|
# Starts the :inets application that :httpc runs under. An "already started"
# error counts as success; any other start result is returned unchanged.
defp ensure_http_client_started do
  with {:error, {:already_started, :inets}} <- :inets.start() do
    :ok
  end
end
|
|
|
|
# Prints node info, the plugin list and the sender stream status of both nodes
# to stderr. Used as the failure callback of assertions.
defp dump_debug do
  IO.puts(:stderr, "==== sender e2e debug dump ====")

  for {name, base_url} <- [origin: @origin, edge: @edge] do
    IO.puts(:stderr, "-- #{name} #{base_url} --")

    # Each probe is only executed when its line is printed, in this order.
    probes = [
      {"node_info", fn -> admin(base_url, "node_info", %{}) end},
      {"plugin_list", fn -> admin(base_url, "plugin_list", %{}) end},
      {"sender status", fn -> sender(base_url, "stream.status", %{}) end}
    ]

    Enum.each(probes, fn {title, probe} ->
      IO.puts(:stderr, "#{title}: #{inspect(probe.(), limit: :infinity)}")
    end)
  end

  IO.puts(:stderr, "==== end sender e2e debug dump ====")
end
|
|
|
|
# Prints the ten most recent metrics rollups ("limit" => 10) of both nodes to
# stderr. Used as the failure callback of the metrics assertions.
defp dump_metrics_debug do
  IO.puts(:stderr, "==== sender e2e metrics debug dump ====")

  for {name, base_url} <- [origin: @origin, edge: @edge] do
    IO.puts(:stderr, "-- #{name} #{base_url} --")

    rollups = admin(base_url, "metrics_rollups.list", %{"limit" => 10})
    IO.puts(:stderr, "metrics_rollups.list: #{inspect(rollups, limit: :infinity)}")
  end

  IO.puts(:stderr, "==== end sender e2e metrics debug dump ====")
end
|
|
|
|
# Assertion helper for the script. `true` and `:ok` pass and return :ok.
# Anything else prints the message to stderr and halts the VM with exit
# status 1; for values other than `false` the offending value is appended.
defp assert_ok(outcome, _message) when outcome in [true, :ok], do: :ok

defp assert_ok(outcome, message) do
  detail = if outcome == false, do: "", else: " (got: #{inspect(outcome)})"

  IO.puts(:stderr, "assertion failed: #{message}" <> detail)
  System.halt(1)
end
|
|
|
|
# Variant of assert_ok/2 that runs `on_failure` (a zero-arity function, in this
# script a debug dump) before reporting a failed assertion.
#
# `true` and `:ok` pass without invoking the callback, mirroring the values
# assert_ok/2 accepts. Every other value invokes `on_failure` and is then
# handed to assert_ok/2, which prints the failure and halts the VM.
defp assert_ok(true, _message, _on_failure), do: :ok

# Without this clause a successful `:ok` would fall through to the failure
# clause and trigger the debug dump on success.
defp assert_ok(:ok, _message, _on_failure), do: :ok

defp assert_ok(other, message, on_failure) when is_function(on_failure, 0) do
  on_failure.()
  assert_ok(other, message)
end
|
|
end
|
|
|
|
# Script entry point: run the e2e scenario with the command-line arguments.
TribeOne.TribesPlugin.Sender.E2ERunner.main(System.argv())
|