You've already forked vinyl_exporter
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 220e5e555f | |||
| c43d7f4843 |
@@ -14,6 +14,11 @@ Viewer identity is resolved in this order:
|
||||
|
||||
Raw viewer IDs are only kept in memory and are never emitted as Prometheus labels.
|
||||
|
||||
When Vinyl runs behind a reverse proxy, configure the proxy address with
|
||||
`--hls.identity.trusted-proxy`. If the Vinyl peer address is trusted, IP fallback
|
||||
uses the first untrusted address from `X-Forwarded-For`; otherwise the header is
|
||||
ignored.
|
||||
|
||||
## Example
|
||||
|
||||
```sh
|
||||
@@ -21,7 +26,8 @@ vinyl_exporter \
|
||||
--hls.path-prefix=/hls/ \
|
||||
--hls.stream-components=1 \
|
||||
--hls.identity.query-param=viewer \
|
||||
--hls.identity.cookie=viewer_id
|
||||
--hls.identity.cookie=viewer_id \
|
||||
--hls.identity.trusted-proxy=127.0.0.1
|
||||
```
|
||||
|
||||
For `/hls/channel-a/index.m3u8?viewer=abc`, the stream label is `channel-a`.
|
||||
|
||||
@@ -22,6 +22,7 @@ func main() {
|
||||
var suffixes exporter.StringList = cfg.HLS.PlaylistSuffixes
|
||||
var queryParams exporter.StringList
|
||||
var cookies exporter.StringList
|
||||
var trustedProxies exporter.StringList
|
||||
|
||||
flag.StringVar(&cfg.ListenAddress, "web.listen-address", cfg.ListenAddress, "Address on which to expose metrics.")
|
||||
flag.StringVar(&cfg.MetricsPath, "web.telemetry-path", cfg.MetricsPath, "Path under which to expose metrics.")
|
||||
@@ -37,11 +38,13 @@ func main() {
|
||||
flag.Var(&queryParams, "hls.identity.query-param", "Query parameter to use for viewer identity. Repeatable.")
|
||||
flag.Var(&cookies, "hls.identity.cookie", "Cookie name to use for viewer identity. Repeatable.")
|
||||
flag.BoolVar(&cfg.HLS.IPFallback, "hls.identity.ip-fallback", cfg.HLS.IPFallback, "Fall back to client IP when query/cookie identity is unavailable.")
|
||||
flag.Var(&trustedProxies, "hls.identity.trusted-proxy", "Trusted proxy IP or CIDR whose X-Forwarded-For header may be used for IP fallback. Repeatable.")
|
||||
flag.Parse()
|
||||
|
||||
cfg.HLS.PlaylistSuffixes = []string(suffixes)
|
||||
cfg.HLS.QueryParams = []string(queryParams)
|
||||
cfg.HLS.Cookies = []string(cookies)
|
||||
cfg.HLS.TrustedProxies = []string(trustedProxies)
|
||||
normalizeConfig(&cfg)
|
||||
|
||||
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
|
||||
|
||||
@@ -2,6 +2,11 @@
|
||||
inputs:
|
||||
nixpkgs:
|
||||
url: github:cachix/devenv-nixpkgs/rolling
|
||||
git-hooks:
|
||||
url: github:cachix/git-hooks.nix
|
||||
inputs:
|
||||
nixpkgs:
|
||||
follows: nixpkgs
|
||||
nixpkgs-unstable:
|
||||
url: github:NixOS/nixpkgs/nixos-unstable
|
||||
|
||||
|
||||
@@ -21,9 +21,10 @@ type HLSConfig struct {
|
||||
PlaylistSuffixes []string
|
||||
ViewerTTL time.Duration
|
||||
|
||||
QueryParams []string
|
||||
Cookies []string
|
||||
IPFallback bool
|
||||
QueryParams []string
|
||||
Cookies []string
|
||||
IPFallback bool
|
||||
TrustedProxies []string
|
||||
}
|
||||
|
||||
func DefaultConfig() Config {
|
||||
|
||||
+96
-12
@@ -16,7 +16,8 @@ import (
|
||||
)
|
||||
|
||||
type HLSState struct {
|
||||
cfg HLSConfig
|
||||
cfg HLSConfig
|
||||
trustedProxies []*net.IPNet
|
||||
|
||||
mu sync.Mutex
|
||||
lastSeen map[string]map[string]time.Time
|
||||
@@ -46,18 +47,20 @@ type LogRunner struct {
|
||||
}
|
||||
|
||||
type transaction struct {
|
||||
URL string
|
||||
Cookie string
|
||||
ClientIP string
|
||||
Status int
|
||||
URL string
|
||||
Cookie string
|
||||
ForwardedFor string
|
||||
ClientIP string
|
||||
Status int
|
||||
}
|
||||
|
||||
func NewHLSState(cfg HLSConfig) *HLSState {
|
||||
return &HLSState{
|
||||
cfg: cfg,
|
||||
lastSeen: make(map[string]map[string]time.Time),
|
||||
requests: make(map[hlsRequestKey]uint64),
|
||||
missingID: make(map[string]uint64),
|
||||
cfg: cfg,
|
||||
trustedProxies: parseTrustedProxies(cfg.TrustedProxies),
|
||||
lastSeen: make(map[string]map[string]time.Time),
|
||||
requests: make(map[hlsRequestKey]uint64),
|
||||
missingID: make(map[string]uint64),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -176,12 +179,28 @@ func (s *HLSState) identity(tx transaction, parsed *url.URL) (string, string) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if s.cfg.IPFallback && tx.ClientIP != "" {
|
||||
return "ip", tx.ClientIP
|
||||
if s.cfg.IPFallback {
|
||||
if ip := s.fallbackIP(tx); ip != "" {
|
||||
return "ip", ip
|
||||
}
|
||||
}
|
||||
return "none", ""
|
||||
}
|
||||
|
||||
func (s *HLSState) fallbackIP(tx transaction) string {
|
||||
clientIP := normalizeIP(tx.ClientIP)
|
||||
if clientIP == "" {
|
||||
return ""
|
||||
}
|
||||
if len(s.trustedProxies) == 0 || !ipInNets(clientIP, s.trustedProxies) {
|
||||
return clientIP
|
||||
}
|
||||
if forwardedIP := forwardedClientIP(tx.ForwardedFor, s.trustedProxies); forwardedIP != "" {
|
||||
return forwardedIP
|
||||
}
|
||||
return clientIP
|
||||
}
|
||||
|
||||
func (r LogRunner) Run(ctx context.Context) {
|
||||
if r.State == nil {
|
||||
return
|
||||
@@ -257,8 +276,18 @@ func ParseVinyllog(r io.Reader, state *HLSState, now func() time.Time) {
|
||||
tx.URL = value
|
||||
case "ReqHeader":
|
||||
name, headerValue, found := strings.Cut(value, ":")
|
||||
if found && strings.EqualFold(strings.TrimSpace(name), "Cookie") {
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
switch {
|
||||
case strings.EqualFold(strings.TrimSpace(name), "Cookie"):
|
||||
tx.Cookie = strings.TrimSpace(headerValue)
|
||||
case strings.EqualFold(strings.TrimSpace(name), "X-Forwarded-For"):
|
||||
if tx.ForwardedFor == "" {
|
||||
tx.ForwardedFor = strings.TrimSpace(headerValue)
|
||||
} else {
|
||||
tx.ForwardedFor += ", " + strings.TrimSpace(headerValue)
|
||||
}
|
||||
}
|
||||
case "RespStatus":
|
||||
fmt.Sscanf(value, "%d", &tx.Status)
|
||||
@@ -293,3 +322,58 @@ func normalizeIP(value string) string {
|
||||
}
|
||||
return strings.Trim(value, "[]")
|
||||
}
|
||||
|
||||
func parseTrustedProxies(values []string) []*net.IPNet {
|
||||
proxies := make([]*net.IPNet, 0, len(values))
|
||||
for _, raw := range values {
|
||||
value := strings.TrimSpace(raw)
|
||||
if value == "" {
|
||||
continue
|
||||
}
|
||||
if strings.Contains(value, "/") {
|
||||
if _, ipNet, err := net.ParseCIDR(value); err == nil {
|
||||
proxies = append(proxies, ipNet)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if ip := net.ParseIP(normalizeIP(value)); ip != nil {
|
||||
bits := 32
|
||||
if ip.To4() == nil {
|
||||
bits = 128
|
||||
}
|
||||
proxies = append(proxies, &net.IPNet{IP: ip, Mask: net.CIDRMask(bits, bits)})
|
||||
}
|
||||
}
|
||||
return proxies
|
||||
}
|
||||
|
||||
func forwardedClientIP(header string, trustedProxies []*net.IPNet) string {
|
||||
if header == "" {
|
||||
return ""
|
||||
}
|
||||
parts := strings.Split(header, ",")
|
||||
for i := len(parts) - 1; i >= 0; i-- {
|
||||
candidate := normalizeIP(strings.TrimSpace(parts[i]))
|
||||
if candidate == "" || net.ParseIP(candidate) == nil {
|
||||
continue
|
||||
}
|
||||
if ipInNets(candidate, trustedProxies) {
|
||||
continue
|
||||
}
|
||||
return candidate
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func ipInNets(value string, nets []*net.IPNet) bool {
|
||||
ip := net.ParseIP(normalizeIP(value))
|
||||
if ip == nil {
|
||||
return false
|
||||
}
|
||||
for _, ipNet := range nets {
|
||||
if ipNet.Contains(ip) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -85,6 +85,59 @@ func TestParseVinyllogRequestTransactions(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestIPFallbackUsesForwardedForFromTrustedProxy(t *testing.T) {
|
||||
cfg := testHLSConfig()
|
||||
cfg.TrustedProxies = []string{"127.0.0.1", "10.0.0.0/8"}
|
||||
state := NewHLSState(cfg)
|
||||
now := time.Date(2026, 5, 15, 12, 0, 0, 0, time.UTC)
|
||||
|
||||
log := `
|
||||
* << Request >> 1001
|
||||
- ReqStart 127.0.0.1 12345
|
||||
- ReqURL /hls/news/index.m3u8
|
||||
- ReqHeader X-Forwarded-For: 198.51.100.42, 10.0.0.8
|
||||
- RespStatus 200
|
||||
* << Request >> 1002
|
||||
- ReqStart 127.0.0.1 12345
|
||||
- ReqURL /hls/news/index.m3u8
|
||||
- ReqHeader X-Forwarded-For: 198.51.100.43, 10.0.0.8
|
||||
- RespStatus 200
|
||||
`
|
||||
ParseVinyllog(strings.NewReader(log), state, func() time.Time { return now })
|
||||
snapshot := state.Snapshot(now)
|
||||
if got := snapshot.Active["news"]; got != 2 {
|
||||
t.Fatalf("active viewers = %d, want 2", got)
|
||||
}
|
||||
if got := snapshot.Requests[hlsRequestKey{Stream: "news", IdentitySource: "ip", StatusClass: "2xx"}]; got != 2 {
|
||||
t.Fatalf("ip requests = %d, want 2", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIPFallbackIgnoresForwardedForFromUntrustedPeer(t *testing.T) {
|
||||
cfg := testHLSConfig()
|
||||
cfg.TrustedProxies = []string{"127.0.0.1"}
|
||||
state := NewHLSState(cfg)
|
||||
now := time.Date(2026, 5, 15, 12, 0, 0, 0, time.UTC)
|
||||
|
||||
state.Observe(transaction{
|
||||
URL: "/hls/news/index.m3u8",
|
||||
ClientIP: "203.0.113.10",
|
||||
ForwardedFor: "198.51.100.42",
|
||||
Status: 200,
|
||||
}, now)
|
||||
state.Observe(transaction{
|
||||
URL: "/hls/news/index.m3u8",
|
||||
ClientIP: "203.0.113.10",
|
||||
ForwardedFor: "198.51.100.43",
|
||||
Status: 200,
|
||||
}, now)
|
||||
|
||||
snapshot := state.Snapshot(now)
|
||||
if got := snapshot.Active["news"]; got != 1 {
|
||||
t.Fatalf("active viewers = %d, want 1", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMissingIdentityWhenFallbackDisabled(t *testing.T) {
|
||||
cfg := testHLSConfig()
|
||||
cfg.QueryParams = nil
|
||||
|
||||
Reference in New Issue
Block a user