Skip to content

Commit 2241a73

Browse files
authored
fix(drilldown): stop injecting unknown_service for non-service volume grouping (#229)
* fix(drilldown): avoid unknown_service volume buckets for non-service grouping * fix(drilldown): honor fieldBy fallback for volume grouping * fix(metrics): harden binary scalar handling and add ops matrix * fix: align rate and unwrap parity with Loki * fix(translator): keep rate parity for non-stream labels
1 parent f7bbc05 commit 2241a73

13 files changed

Lines changed: 1467 additions & 103 deletions

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Bug Fixes
11+
12+
- drilldown/volume: stop injecting synthetic `service_name="unknown_service"` into `index/volume` and `index/volume_range` buckets when requests are grouped by non-service labels (for example `cluster`), while preserving service-aware grouping behavior.
13+
- drilldown/volume: honor Drilldown grouping hints (`drillDownLabel`, `fieldBy`, and `var-fieldBy`) as target-label fallbacks when `targetLabels` is omitted, so field/label include-exclude actions keep grouping on the selected dimension instead of falling back to selector-order inference.
14+
- translator/metrics: make `rate` and `bytes_rate` preserve Loki per-second semantics via window normalization, make parser+unwrap metric paths preserve Loki-like cardinality with outer-aggregation composition, emit VictoriaLogs-compatible byte aggregations via `sum_len(_msg)`, and implement `stdvar_over_time` via proxy-side `stddev^2` composition to avoid backend `stdvar` parser failures.
15+
- proxy/binary-metrics: fix scalar and binary post-processing to mutate both legacy `results` payloads and Prometheus-style `data.result` payloads (including instant-vector `value` samples), preventing silent no-op arithmetic on valid `stats_query` responses.
16+
17+
### Tests
18+
19+
- drilldown/volume: add regression coverage for inferred non-service target labels (no synthetic `unknown_service`) and for Drilldown `fieldBy` fallback mapping on both vector and matrix volume endpoints.
20+
- compat/matrix: add operation/filter/function matrix coverage across translator and proxy scalar paths, with deterministic e2e checks for binary scalar operators, filter operators, and metric function families (cross-engine parity for compatible functions plus proxy-local expected-value checks where semantics intentionally differ), and add scalar/binary fuzz coverage for response-shape robustness.
21+
1022
## [1.11.0] - 2026-04-21
1123

1224
### Bug Fixes

docs/translation-reference.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,29 +73,29 @@ These stages are executed at the proxy level (VL has no native equivalents):
7373

7474
| LogQL | LogsQL |
7575
|---|---|
76-
| `rate({...}[5m])` | `... \| stats rate()` |
76+
| `rate({...}[5m])` | `stats count()` + `math` normalization by window seconds, then `stats sum(...)` per grouping |
7777
| `count_over_time({...}[5m])` | `... \| stats count()` |
78-
| `bytes_over_time({...}[5m])` | `... \| stats sum(len(_msg))` |
79-
| `bytes_rate({...}[5m])` | `... \| stats rate_sum(len(_msg))` |
78+
| `bytes_over_time({...}[5m])` | `... \| stats sum_len(_msg)` |
79+
| `bytes_rate({...}[5m])` | `stats sum_len(_msg)` + `math` normalization by window seconds, then `stats sum(...)` per grouping |
8080
| `sum_over_time({...} \| unwrap f [5m])` | `... \| stats sum(f)` |
8181
| `avg_over_time({...} \| unwrap f [5m])` | `... \| stats avg(f)` |
8282
| `max_over_time({...} \| unwrap f [5m])` | `... \| stats max(f)` |
8383
| `min_over_time({...} \| unwrap f [5m])` | `... \| stats min(f)` |
8484
| `first_over_time({...} \| unwrap f [5m])` | `... \| stats first(f)` |
8585
| `last_over_time({...} \| unwrap f [5m])` | `... \| stats last(f)` |
8686
| `stddev_over_time({...} \| unwrap f [5m])` | `... \| stats stddev(f)` |
87-
| `stdvar_over_time({...} \| unwrap f [5m])` | `... \| stats stdvar(f)` |
87+
| `stdvar_over_time({...} \| unwrap f [5m])` | proxy binary expression: `(... \| stats stddev(f)) ^ 2` |
8888
| `quantile_over_time(0.95, {...} \| unwrap f [5m])` | `... \| stats quantile(0.95, f)` |
8989
| `absent_over_time({...}[5m])` | `... \| stats count()` |
9090

9191
### Outer Aggregations
9292

9393
| LogQL | LogsQL |
9494
|---|---|
95-
| `sum(rate({...}[5m]))` | `... \| stats rate()` |
96-
| `sum(rate({...}[5m])) by (x)` | `... \| stats by (x) rate()` |
97-
| `avg(rate({...}[5m])) by (x)` | `... \| stats by (x) rate()` |
98-
| `topk(10, rate({...}[5m]))` | `... \| stats rate()` |
95+
| `sum(rate({...}[5m]))` | normalized per-stream/per-group rate, then proxy applies outer aggregation where supported |
96+
| `sum(rate({...}[5m])) by (x)` | `... \| stats by (x) count() as __lvp_inner \| math __lvp_inner/window as __lvp_rate \| stats by (x) sum(__lvp_rate)` |
97+
| `avg(rate({...}[5m])) by (x)` | same normalized-rate path grouped by `(x)` |
98+
| `topk(10, rate({...}[5m]))` | normalized-rate path with stream grouping; top-level selection remains simplified |
9999

100100
Supported: `sum`, `avg`, `max`, `min`, `count`, `topk`, `bottomk`, `stddev`, `stdvar`, `sort`, `sort_desc`.
101101

internal/proxy/drilldown_compat_test.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,12 @@ func TestDrilldown_IndexVolume_InfersPrimaryTargetLabelForAdditionalTabs(t *test
407407
if len(result) != 2 {
408408
t.Fatalf("expected cluster buckets, got %v", result)
409409
}
410+
for _, item := range result {
411+
metric := item.(map[string]interface{})["metric"].(map[string]interface{})
412+
if _, ok := metric["service_name"]; ok {
413+
t.Fatalf("expected inferred cluster volume metrics to omit synthetic unknown service_name, got %v", metric)
414+
}
415+
}
410416
}
411417

412418
func TestDrilldown_IndexVolume_TranslatesInferredTargetLabelMetrics(t *testing.T) {
@@ -461,6 +467,140 @@ func TestDrilldown_IndexVolume_TranslatesInferredTargetLabelMetrics(t *testing.T
461467
}
462468
}
463469

470+
func TestDrilldown_IndexVolume_UsesDrilldownFieldByFallbackForTargetLabels(t *testing.T) {
471+
var receivedField string
472+
vlBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
473+
if r.URL.Path != "/select/logsql/hits" {
474+
t.Fatalf("unexpected backend path %s", r.URL.Path)
475+
}
476+
receivedField = r.URL.Query().Get("field")
477+
json.NewEncoder(w).Encode(map[string]interface{}{
478+
"hints": map[string]interface{}{},
479+
"hits": []map[string]interface{}{
480+
{
481+
"fields": map[string]string{"method": "GET"},
482+
"timestamps": []string{"2026-04-04T17:18:49Z"},
483+
"values": []int{4},
484+
},
485+
},
486+
})
487+
}))
488+
defer vlBackend.Close()
489+
490+
p := newGapTestProxy(t, vlBackend.URL)
491+
w := httptest.NewRecorder()
492+
r := httptest.NewRequest(
493+
"GET",
494+
"/loki/api/v1/index/volume?query=%7Bcluster%3D~%60.%2B%60%7D&start=1&end=2&var-fieldBy=method&drillDownLabel=method",
495+
nil,
496+
)
497+
p.handleVolume(w, r)
498+
499+
if receivedField != "method" {
500+
t.Fatalf("expected drilldown fieldBy fallback to drive target label mapping, got %q", receivedField)
501+
}
502+
503+
var resp map[string]interface{}
504+
mustUnmarshal(t, w.Body.Bytes(), &resp)
505+
data := assertDataIsObject(t, resp)
506+
result := assertResultIsArray(t, data)
507+
if len(result) != 1 {
508+
t.Fatalf("expected one method bucket, got %v", result)
509+
}
510+
metric := result[0].(map[string]interface{})["metric"].(map[string]interface{})
511+
if metric["method"] != "GET" {
512+
t.Fatalf("expected method grouping bucket in metric, got %v", metric)
513+
}
514+
}
515+
516+
func TestDrilldown_IndexVolumeRange_InfersPrimaryTargetLabelWithoutUnknownService(t *testing.T) {
517+
var receivedField string
518+
vlBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
519+
if r.URL.Path != "/select/logsql/hits" {
520+
t.Fatalf("unexpected backend path %s", r.URL.Path)
521+
}
522+
receivedField = r.URL.Query().Get("field")
523+
json.NewEncoder(w).Encode(map[string]interface{}{
524+
"hints": map[string]interface{}{},
525+
"hits": []map[string]interface{}{
526+
{
527+
"fields": map[string]string{"cluster": "us-east-1"},
528+
"timestamps": []string{"2026-04-04T17:18:49Z"},
529+
"values": []int{12},
530+
},
531+
},
532+
})
533+
}))
534+
defer vlBackend.Close()
535+
536+
p := newGapTestProxy(t, vlBackend.URL)
537+
w := httptest.NewRecorder()
538+
r := httptest.NewRequest("GET", "/loki/api/v1/index/volume_range?query=%7Bcluster%3D~%60.%2B%60%7D&start=1&end=2&step=60", nil)
539+
p.handleVolumeRange(w, r)
540+
541+
if receivedField != "cluster" {
542+
t.Fatalf("expected inferred volume_range field=cluster, got %q", receivedField)
543+
}
544+
545+
var resp map[string]interface{}
546+
mustUnmarshal(t, w.Body.Bytes(), &resp)
547+
data := assertDataIsObject(t, resp)
548+
result := assertResultIsArray(t, data)
549+
if len(result) != 1 {
550+
t.Fatalf("expected one cluster series, got %v", result)
551+
}
552+
metric := result[0].(map[string]interface{})["metric"].(map[string]interface{})
553+
if _, ok := metric["service_name"]; ok {
554+
t.Fatalf("expected inferred cluster volume_range metric to omit synthetic unknown service_name, got %v", metric)
555+
}
556+
}
557+
558+
func TestDrilldown_IndexVolumeRange_UsesDrilldownFieldByFallbackForTargetLabels(t *testing.T) {
559+
var receivedField string
560+
vlBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
561+
if r.URL.Path != "/select/logsql/hits" {
562+
t.Fatalf("unexpected backend path %s", r.URL.Path)
563+
}
564+
receivedField = r.URL.Query().Get("field")
565+
json.NewEncoder(w).Encode(map[string]interface{}{
566+
"hints": map[string]interface{}{},
567+
"hits": []map[string]interface{}{
568+
{
569+
"fields": map[string]string{"method": "POST"},
570+
"timestamps": []string{"2026-04-04T17:18:49Z"},
571+
"values": []int{6},
572+
},
573+
},
574+
})
575+
}))
576+
defer vlBackend.Close()
577+
578+
p := newGapTestProxy(t, vlBackend.URL)
579+
w := httptest.NewRecorder()
580+
r := httptest.NewRequest(
581+
"GET",
582+
"/loki/api/v1/index/volume_range?query=%7Bcluster%3D~%60.%2B%60%7D&start=1&end=2&step=60&var-fieldBy=method&drillDownLabel=method",
583+
nil,
584+
)
585+
p.handleVolumeRange(w, r)
586+
587+
if receivedField != "method" {
588+
t.Fatalf("expected drilldown fieldBy fallback to drive volume_range target label mapping, got %q", receivedField)
589+
}
590+
591+
var resp map[string]interface{}
592+
mustUnmarshal(t, w.Body.Bytes(), &resp)
593+
data := assertDataIsObject(t, resp)
594+
result := assertResultIsArray(t, data)
595+
if len(result) != 1 {
596+
t.Fatalf("expected one method series, got %v", result)
597+
}
598+
metric := result[0].(map[string]interface{})["metric"].(map[string]interface{})
599+
if metric["method"] != "POST" {
600+
t.Fatalf("expected method grouping series in volume_range metric, got %v", metric)
601+
}
602+
}
603+
464604
func TestDrilldown_IndexVolumeRange_TargetLabelsDetectedLevelUsesDerivedAggregation(t *testing.T) {
465605
vlBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
466606
if r.URL.Path != "/select/logsql/hits" {

internal/proxy/fuzz_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package proxy
22

33
import (
44
"encoding/json"
5+
"fmt"
6+
"strconv"
57
"testing"
68
)
79

@@ -148,3 +150,52 @@ func FuzzNormalizeMetadataPairs(f *testing.F) {
148150
}
149151
})
150152
}
153+
154+
// FuzzCombineBinaryMetricResultsScalarPath validates scalar/vector merge shaping for all operators.
155+
func FuzzCombineBinaryMetricResultsScalarPath(f *testing.F) {
156+
ops := []string{"+", "-", "*", "/", "%", "^", "==", "!=", ">", "<", ">=", "<=", "unknown"}
157+
for _, op := range ops {
158+
f.Add(op, 10.0, 3.0, false, true)
159+
f.Add(op, 10.0, 3.0, true, false)
160+
f.Add(op, 10.0, 3.0, false, false)
161+
}
162+
163+
f.Fuzz(func(t *testing.T, op string, metricValue, scalarValue float64, leftScalar, rightScalar bool) {
164+
if !isFinite(metricValue) || !isFinite(scalarValue) {
165+
return
166+
}
167+
168+
metricBody := []byte(fmt.Sprintf(
169+
`{"results":[{"metric":{"app":"fuzz"},"values":[[1,"%s"]]}]}`,
170+
strconv.FormatFloat(metricValue, 'f', -1, 64),
171+
))
172+
scalarBody := []byte(fmt.Sprintf(
173+
`{"status":"success","data":{"resultType":"scalar","result":[0,"%s"]}}`,
174+
strconv.FormatFloat(scalarValue, 'f', -1, 64),
175+
))
176+
177+
leftBody := metricBody
178+
rightBody := metricBody
179+
leftQL := ""
180+
rightQL := ""
181+
182+
if leftScalar {
183+
leftBody = scalarBody
184+
leftQL = strconv.FormatFloat(scalarValue, 'f', -1, 64)
185+
}
186+
if rightScalar {
187+
rightBody = scalarBody
188+
rightQL = strconv.FormatFloat(scalarValue, 'f', -1, 64)
189+
}
190+
191+
out := combineBinaryMetricResults(leftBody, rightBody, op, "matrix", leftScalar, rightScalar, leftQL, rightQL)
192+
var payload map[string]interface{}
193+
if err := json.Unmarshal(out, &payload); err != nil {
194+
t.Fatalf("combined output must stay valid JSON: %v; body=%s", err, string(out))
195+
}
196+
status, _ := payload["status"].(string)
197+
if status == "" {
198+
t.Fatalf("combined output missing status field: %#v", payload)
199+
}
200+
})
201+
}

0 commit comments

Comments
 (0)