2 Commits

Author SHA1 Message Date
self 220e5e555f chore: declare git-hooks input in devenv.yaml
Newer devenv versions require git-hooks.nix to be declared as an
explicit input when devenv.nix uses `git-hooks.hooks`.
2026-05-24 15:20:11 +02:00
self c43d7f4843 fix: honor forwarded IPs from trusted proxies
Add an explicit trusted-proxy list for HLS IP fallback so deployments behind HAProxy can count original viewer addresses from X-Forwarded-For without trusting spoofed headers from arbitrary clients.
2026-05-17 18:25:11 +02:00
6 changed files with 168 additions and 16 deletions
+7 -1
View File
@@ -14,6 +14,11 @@ Viewer identity is resolved in this order:
Raw viewer IDs are only kept in memory and are never emitted as Prometheus labels.
When Vinyl runs behind a reverse proxy, configure the proxy address with
`--hls.identity.trusted-proxy`. If the Vinyl peer address is trusted, IP fallback
uses the first untrusted address from `X-Forwarded-For`; otherwise the header is
ignored.
## Example
```sh
@@ -21,7 +26,8 @@ vinyl_exporter \
--hls.path-prefix=/hls/ \
--hls.stream-components=1 \
--hls.identity.query-param=viewer \
--hls.identity.cookie=viewer_id
--hls.identity.cookie=viewer_id \
--hls.identity.trusted-proxy=127.0.0.1
```
For `/hls/channel-a/index.m3u8?viewer=abc`, the stream label is `channel-a`.
+3
View File
@@ -22,6 +22,7 @@ func main() {
var suffixes exporter.StringList = cfg.HLS.PlaylistSuffixes
var queryParams exporter.StringList
var cookies exporter.StringList
var trustedProxies exporter.StringList
flag.StringVar(&cfg.ListenAddress, "web.listen-address", cfg.ListenAddress, "Address on which to expose metrics.")
flag.StringVar(&cfg.MetricsPath, "web.telemetry-path", cfg.MetricsPath, "Path under which to expose metrics.")
@@ -37,11 +38,13 @@ func main() {
flag.Var(&queryParams, "hls.identity.query-param", "Query parameter to use for viewer identity. Repeatable.")
flag.Var(&cookies, "hls.identity.cookie", "Cookie name to use for viewer identity. Repeatable.")
flag.BoolVar(&cfg.HLS.IPFallback, "hls.identity.ip-fallback", cfg.HLS.IPFallback, "Fall back to client IP when query/cookie identity is unavailable.")
flag.Var(&trustedProxies, "hls.identity.trusted-proxy", "Trusted proxy IP or CIDR whose X-Forwarded-For header may be used for IP fallback. Repeatable.")
flag.Parse()
cfg.HLS.PlaylistSuffixes = []string(suffixes)
cfg.HLS.QueryParams = []string(queryParams)
cfg.HLS.Cookies = []string(cookies)
cfg.HLS.TrustedProxies = []string(trustedProxies)
normalizeConfig(&cfg)
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
+5
View File
@@ -2,6 +2,11 @@
inputs:
nixpkgs:
url: github:cachix/devenv-nixpkgs/rolling
git-hooks:
url: github:cachix/git-hooks.nix
inputs:
nixpkgs:
follows: nixpkgs
nixpkgs-unstable:
url: github:NixOS/nixpkgs/nixos-unstable
+4 -3
View File
@@ -21,9 +21,10 @@ type HLSConfig struct {
PlaylistSuffixes []string
ViewerTTL time.Duration
QueryParams []string
Cookies []string
IPFallback bool
QueryParams []string
Cookies []string
IPFallback bool
TrustedProxies []string
}
func DefaultConfig() Config {
+96 -12
View File
@@ -16,7 +16,8 @@ import (
)
type HLSState struct {
cfg HLSConfig
cfg HLSConfig
trustedProxies []*net.IPNet
mu sync.Mutex
lastSeen map[string]map[string]time.Time
@@ -46,18 +47,20 @@ type LogRunner struct {
}
type transaction struct {
URL string
Cookie string
ClientIP string
Status int
URL string
Cookie string
ForwardedFor string
ClientIP string
Status int
}
func NewHLSState(cfg HLSConfig) *HLSState {
return &HLSState{
cfg: cfg,
lastSeen: make(map[string]map[string]time.Time),
requests: make(map[hlsRequestKey]uint64),
missingID: make(map[string]uint64),
cfg: cfg,
trustedProxies: parseTrustedProxies(cfg.TrustedProxies),
lastSeen: make(map[string]map[string]time.Time),
requests: make(map[hlsRequestKey]uint64),
missingID: make(map[string]uint64),
}
}
@@ -176,12 +179,28 @@ func (s *HLSState) identity(tx transaction, parsed *url.URL) (string, string) {
}
}
}
if s.cfg.IPFallback && tx.ClientIP != "" {
return "ip", tx.ClientIP
if s.cfg.IPFallback {
if ip := s.fallbackIP(tx); ip != "" {
return "ip", ip
}
}
return "none", ""
}
func (s *HLSState) fallbackIP(tx transaction) string {
clientIP := normalizeIP(tx.ClientIP)
if clientIP == "" {
return ""
}
if len(s.trustedProxies) == 0 || !ipInNets(clientIP, s.trustedProxies) {
return clientIP
}
if forwardedIP := forwardedClientIP(tx.ForwardedFor, s.trustedProxies); forwardedIP != "" {
return forwardedIP
}
return clientIP
}
func (r LogRunner) Run(ctx context.Context) {
if r.State == nil {
return
@@ -257,8 +276,18 @@ func ParseVinyllog(r io.Reader, state *HLSState, now func() time.Time) {
tx.URL = value
case "ReqHeader":
name, headerValue, found := strings.Cut(value, ":")
if found && strings.EqualFold(strings.TrimSpace(name), "Cookie") {
if !found {
continue
}
switch {
case strings.EqualFold(strings.TrimSpace(name), "Cookie"):
tx.Cookie = strings.TrimSpace(headerValue)
case strings.EqualFold(strings.TrimSpace(name), "X-Forwarded-For"):
if tx.ForwardedFor == "" {
tx.ForwardedFor = strings.TrimSpace(headerValue)
} else {
tx.ForwardedFor += ", " + strings.TrimSpace(headerValue)
}
}
case "RespStatus":
fmt.Sscanf(value, "%d", &tx.Status)
@@ -293,3 +322,58 @@ func normalizeIP(value string) string {
}
return strings.Trim(value, "[]")
}
func parseTrustedProxies(values []string) []*net.IPNet {
proxies := make([]*net.IPNet, 0, len(values))
for _, raw := range values {
value := strings.TrimSpace(raw)
if value == "" {
continue
}
if strings.Contains(value, "/") {
if _, ipNet, err := net.ParseCIDR(value); err == nil {
proxies = append(proxies, ipNet)
}
continue
}
if ip := net.ParseIP(normalizeIP(value)); ip != nil {
bits := 32
if ip.To4() == nil {
bits = 128
}
proxies = append(proxies, &net.IPNet{IP: ip, Mask: net.CIDRMask(bits, bits)})
}
}
return proxies
}
func forwardedClientIP(header string, trustedProxies []*net.IPNet) string {
if header == "" {
return ""
}
parts := strings.Split(header, ",")
for i := len(parts) - 1; i >= 0; i-- {
candidate := normalizeIP(strings.TrimSpace(parts[i]))
if candidate == "" || net.ParseIP(candidate) == nil {
continue
}
if ipInNets(candidate, trustedProxies) {
continue
}
return candidate
}
return ""
}
func ipInNets(value string, nets []*net.IPNet) bool {
ip := net.ParseIP(normalizeIP(value))
if ip == nil {
return false
}
for _, ipNet := range nets {
if ipNet.Contains(ip) {
return true
}
}
return false
}
+53
View File
@@ -85,6 +85,59 @@ func TestParseVinyllogRequestTransactions(t *testing.T) {
}
}
func TestIPFallbackUsesForwardedForFromTrustedProxy(t *testing.T) {
cfg := testHLSConfig()
cfg.TrustedProxies = []string{"127.0.0.1", "10.0.0.0/8"}
state := NewHLSState(cfg)
now := time.Date(2026, 5, 15, 12, 0, 0, 0, time.UTC)
log := `
* << Request >> 1001
- ReqStart 127.0.0.1 12345
- ReqURL /hls/news/index.m3u8
- ReqHeader X-Forwarded-For: 198.51.100.42, 10.0.0.8
- RespStatus 200
* << Request >> 1002
- ReqStart 127.0.0.1 12345
- ReqURL /hls/news/index.m3u8
- ReqHeader X-Forwarded-For: 198.51.100.43, 10.0.0.8
- RespStatus 200
`
ParseVinyllog(strings.NewReader(log), state, func() time.Time { return now })
snapshot := state.Snapshot(now)
if got := snapshot.Active["news"]; got != 2 {
t.Fatalf("active viewers = %d, want 2", got)
}
if got := snapshot.Requests[hlsRequestKey{Stream: "news", IdentitySource: "ip", StatusClass: "2xx"}]; got != 2 {
t.Fatalf("ip requests = %d, want 2", got)
}
}
func TestIPFallbackIgnoresForwardedForFromUntrustedPeer(t *testing.T) {
cfg := testHLSConfig()
cfg.TrustedProxies = []string{"127.0.0.1"}
state := NewHLSState(cfg)
now := time.Date(2026, 5, 15, 12, 0, 0, 0, time.UTC)
state.Observe(transaction{
URL: "/hls/news/index.m3u8",
ClientIP: "203.0.113.10",
ForwardedFor: "198.51.100.42",
Status: 200,
}, now)
state.Observe(transaction{
URL: "/hls/news/index.m3u8",
ClientIP: "203.0.113.10",
ForwardedFor: "198.51.100.43",
Status: 200,
}, now)
snapshot := state.Snapshot(now)
if got := snapshot.Active["news"]; got != 1 {
t.Fatalf("active viewers = %d, want 1", got)
}
}
func TestMissingIdentityWhenFallbackDisabled(t *testing.T) {
cfg := testHLSConfig()
cfg.QueryParams = nil