Skip to content

Commit 8b37e7f

Browse files
authored
patterns: compact persisted snapshot state (#214)
* patterns: compact persisted snapshot state * docs: restore unreleased patterns entry * fix: remove dead pattern snapshot helper type
1 parent ed4660c commit 8b37e7f

3 files changed

Lines changed: 204 additions & 16 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Bug Fixes
11+
12+
- patterns/persistence: store query-level compact pattern snapshots for persistence instead of range-shaped filled payload variants, reducing snapshot entry churn and write amplification when the same logical Drilldown query is refreshed across different time windows.
13+
1014
## [1.9.4] - 2026-04-20
1115

1216
### Bug Fixes

internal/proxy/patterns_persistence_test.go

Lines changed: 83 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -423,14 +423,74 @@ func TestPatternSnapshotDedup_Update_RemovesExactDuplicatePayload(t *testing.T)
423423
if len(p.patternsSnapshotEntries) != 1 {
424424
t.Fatalf("expected one deduplicated snapshot entry, got %d", len(p.patternsSnapshotEntries))
425425
}
426-
if _, ok := p.patternsSnapshotEntries[keyNew]; !ok {
427-
t.Fatalf("expected newer cache key %q to be retained after deduplication", keyNew)
426+
canonicalKey := canonicalPatternSnapshotCacheKey(keyNew)
427+
if _, ok := p.patternsSnapshotEntries[canonicalKey]; !ok {
428+
t.Fatalf("expected canonical snapshot key %q to be retained after deduplication", canonicalKey)
428429
}
429-
if _, ok := p.patternsSnapshotEntries[keyOld]; ok {
430-
t.Fatalf("expected older duplicate cache key %q to be removed", keyOld)
430+
if _, _, hit := p.cache.GetWithTTL(keyOld); !hit {
431+
t.Fatalf("expected request-scoped cache key %q to remain untouched", keyOld)
431432
}
432-
if _, _, hit := p.cache.GetWithTTL(keyOld); hit {
433-
t.Fatalf("expected dropped cache key %q to be invalidated", keyOld)
433+
}
434+
435+
func TestPatternSnapshotEntry_CanonicalizesRangeKeysAndCompactsSamples(t *testing.T) {
436+
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
437+
w.WriteHeader(http.StatusOK)
438+
}))
439+
defer backend.Close()
440+
441+
p := newPatternPersistenceProxy(t, backend.URL, filepath.Join(t.TempDir(), "patterns.snapshot.json"), true)
442+
t.Cleanup(func() {
443+
_ = p.Shutdown(context.Background())
444+
})
445+
446+
query := `{service_name="api"}`
447+
keyA := p.patternsAutodetectCacheKey("org-a", query, "1", "181", "60s")
448+
keyB := p.patternsAutodetectCacheKey("org-a", query, "61", "241", "60s")
449+
payload := mustMarshalJSON(t, patternsResponse{
450+
Status: "success",
451+
Data: []patternResultEntry{
452+
{
453+
Pattern: "request <_> completed",
454+
Samples: [][]interface{}{
455+
{int64(60), 0},
456+
{int64(120), 4},
457+
{int64(180), 0},
458+
},
459+
},
460+
},
461+
})
462+
463+
p.recordPatternSnapshotEntry(keyA, payload, time.Unix(1710000000, 0).UTC())
464+
465+
canonicalKey := canonicalPatternSnapshotCacheKey(keyA)
466+
p.patternsSnapshotMu.RLock()
467+
entry, ok := p.patternsSnapshotEntries[canonicalKey]
468+
p.patternsSnapshotMu.RUnlock()
469+
if !ok {
470+
t.Fatalf("expected canonical snapshot key %q to be stored", canonicalKey)
471+
}
472+
473+
var stored patternsResponse
474+
if err := json.Unmarshal(entry.Value, &stored); err != nil {
475+
t.Fatalf("decode stored snapshot payload: %v", err)
476+
}
477+
if len(stored.Data) != 1 {
478+
t.Fatalf("expected one persisted pattern entry, got %#v", stored.Data)
479+
}
480+
if got := stored.Data[0].Samples; len(got) != 1 || got[0][0] != float64(120) || got[0][1] != float64(4) {
481+
t.Fatalf("expected compacted non-zero samples only, got %#v", got)
482+
}
483+
484+
p.patternsPersistDirty.Store(false)
485+
p.recordPatternSnapshotEntry(keyB, payload, time.Unix(1710000001, 0).UTC())
486+
if p.patternsPersistDirty.Load() {
487+
t.Fatal("expected canonical identical snapshot update to avoid marking persistence dirty")
488+
}
489+
490+
p.patternsSnapshotMu.RLock()
491+
defer p.patternsSnapshotMu.RUnlock()
492+
if len(p.patternsSnapshotEntries) != 1 {
493+
t.Fatalf("expected one canonical snapshot entry, got %d", len(p.patternsSnapshotEntries))
434494
}
435495
}
436496

@@ -466,8 +526,20 @@ func TestPatternSnapshotDedup_Update_PreservesDistinctPayloads(t *testing.T) {
466526

467527
p.patternsSnapshotMu.RLock()
468528
defer p.patternsSnapshotMu.RUnlock()
469-
if len(p.patternsSnapshotEntries) != 2 {
470-
t.Fatalf("expected distinct payloads to remain separate, got %d entries", len(p.patternsSnapshotEntries))
529+
if len(p.patternsSnapshotEntries) != 1 {
530+
t.Fatalf("expected latest query-level snapshot to replace older range variants, got %d entries", len(p.patternsSnapshotEntries))
531+
}
532+
canonicalKey := canonicalPatternSnapshotCacheKey(keyB)
533+
entry, ok := p.patternsSnapshotEntries[canonicalKey]
534+
if !ok {
535+
t.Fatalf("expected canonical key %q to be retained", canonicalKey)
536+
}
537+
var stored patternsResponse
538+
if err := json.Unmarshal(entry.Value, &stored); err != nil {
539+
t.Fatalf("decode stored payload: %v", err)
540+
}
541+
if len(stored.Data) != 1 || stored.Data[0].Pattern != "request <_> failed" {
542+
t.Fatalf("expected latest payload to win for canonical snapshot, got %#v", stored.Data)
471543
}
472544
}
473545

@@ -508,8 +580,9 @@ func TestPatternSnapshotDedup_PeerMerge_RemovesExactDuplicates(t *testing.T) {
508580
if len(p.patternsSnapshotEntries) != 1 {
509581
t.Fatalf("expected peer merge dedup to keep one entry, got %d", len(p.patternsSnapshotEntries))
510582
}
511-
if _, ok := p.patternsSnapshotEntries[keyB]; !ok {
512-
t.Fatalf("expected newer peer key %q to be retained", keyB)
583+
canonicalKey := canonicalPatternSnapshotCacheKey(keyB)
584+
if _, ok := p.patternsSnapshotEntries[canonicalKey]; !ok {
585+
t.Fatalf("expected canonical peer key %q to be retained", canonicalKey)
513586
}
514587
if _, _, hit := p.cache.GetWithTTL(keyA); hit {
515588
t.Fatalf("expected duplicate peer key %q to be invalidated from cache", keyA)

internal/proxy/proxy.go

Lines changed: 117 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2776,6 +2776,11 @@ func (p *Proxy) recordPatternSnapshotEntry(cacheKey string, payload []byte, now
27762776
if !p.patternsEnabled {
27772777
return
27782778
}
2779+
canonicalKey := canonicalPatternSnapshotCacheKey(cacheKey)
2780+
if canonicalKey != "" {
2781+
cacheKey = canonicalKey
2782+
}
2783+
payload = normalizePatternSnapshotPayload(payload)
27792784
patternCount := patternCountFromPayload(payload)
27802785
if patternCount > 0 {
27812786
p.metrics.RecordPatternsStored(patternCount)
@@ -2907,29 +2912,34 @@ func (p *Proxy) applyPatternsSnapshot(snapshot patternsSnapshot, source string)
29072912
if strings.TrimSpace(key) == "" || len(incoming.Value) == 0 {
29082913
continue
29092914
}
2915+
canonicalKey := canonicalPatternSnapshotCacheKey(key)
2916+
if strings.TrimSpace(canonicalKey) == "" {
2917+
canonicalKey = key
2918+
}
2919+
incoming.Value = normalizePatternSnapshotPayload(incoming.Value)
29102920
if incoming.UpdatedAtUnixNano <= 0 {
29112921
incoming.UpdatedAtUnixNano = nowUnix
29122922
}
2913-
if existing, ok := p.patternsSnapshotEntries[key]; ok && existing.UpdatedAtUnixNano >= incoming.UpdatedAtUnixNano {
2923+
if existing, ok := p.patternsSnapshotEntries[canonicalKey]; ok && existing.UpdatedAtUnixNano >= incoming.UpdatedAtUnixNano {
29142924
continue
29152925
}
29162926
incomingPatternCount := incoming.PatternCount
29172927
if incomingPatternCount <= 0 {
29182928
incomingPatternCount = patternCountFromPayload(incoming.Value)
29192929
}
29202930
copied := append([]byte(nil), incoming.Value...)
2921-
if existing, ok := p.patternsSnapshotEntries[key]; ok {
2931+
if existing, ok := p.patternsSnapshotEntries[canonicalKey]; ok {
29222932
p.patternsSnapshotPatternCount -= int64(existing.PatternCount)
29232933
p.patternsSnapshotPayloadBytes -= int64(len(existing.Value))
29242934
}
2925-
p.patternsSnapshotEntries[key] = patternSnapshotEntry{
2935+
p.patternsSnapshotEntries[canonicalKey] = patternSnapshotEntry{
29262936
Value: copied,
29272937
UpdatedAtUnixNano: incoming.UpdatedAtUnixNano,
29282938
PatternCount: incomingPatternCount,
29292939
}
29302940
p.patternsSnapshotPatternCount += int64(incomingPatternCount)
29312941
p.patternsSnapshotPayloadBytes += int64(len(copied))
2932-
p.cache.SetWithTTL(key, copied, patternsCacheRetention)
2942+
p.cache.SetWithTTL(canonicalKey, copied, patternsCacheRetention)
29332943
appliedEntries++
29342944
appliedPatterns += incomingPatternCount
29352945
}
@@ -2967,6 +2977,93 @@ func patternSnapshotIdentityFromCacheKey(cacheKey string) string {
29672977
return orgID + "\x00" + query
29682978
}
29692979

2980+
func canonicalPatternSnapshotCacheKey(cacheKey string) string {
2981+
cacheKey = strings.TrimSpace(cacheKey)
2982+
if cacheKey == "" {
2983+
return ""
2984+
}
2985+
parts := strings.SplitN(cacheKey, ":", 3)
2986+
if len(parts) != 3 || parts[0] != "patterns" {
2987+
return cacheKey
2988+
}
2989+
orgID := strings.TrimSpace(parts[1])
2990+
params, err := url.ParseQuery(parts[2])
2991+
if err != nil {
2992+
return cacheKey
2993+
}
2994+
query := patternScopeQuery(params.Get("query"))
2995+
if strings.TrimSpace(query) == "" {
2996+
return cacheKey
2997+
}
2998+
canonical := url.Values{}
2999+
canonical.Set("query", query)
3000+
return "patterns:" + orgID + ":" + canonical.Encode()
3001+
}
3002+
3003+
func normalizePatternSnapshotPayload(payload []byte) []byte {
3004+
if len(payload) == 0 {
3005+
return payload
3006+
}
3007+
var resp patternsResponse
3008+
if err := json.Unmarshal(payload, &resp); err != nil {
3009+
return payload
3010+
}
3011+
changed := false
3012+
for i := range resp.Data {
3013+
entry := resp.Data[i]
3014+
if len(entry.Samples) == 0 {
3015+
continue
3016+
}
3017+
compacted := compactPatternSnapshotSamples(entry.Samples)
3018+
if len(compacted) != len(entry.Samples) {
3019+
changed = true
3020+
}
3021+
entry.Samples = compacted
3022+
resp.Data[i] = entry
3023+
}
3024+
if !changed {
3025+
return payload
3026+
}
3027+
encoded, err := json.Marshal(resp)
3028+
if err != nil {
3029+
return payload
3030+
}
3031+
return encoded
3032+
}
3033+
3034+
func compactPatternSnapshotSamples(samples [][]interface{}) [][]interface{} {
3035+
if len(samples) == 0 {
3036+
return samples
3037+
}
3038+
byTimestamp := make(map[int64]int, len(samples))
3039+
order := make([]int64, 0, len(samples))
3040+
for _, pair := range samples {
3041+
if len(pair) < 2 {
3042+
continue
3043+
}
3044+
ts, okTS := numberToInt64(pair[0])
3045+
count, okCount := numberToInt(pair[1])
3046+
if !okTS || !okCount || count <= 0 {
3047+
continue
3048+
}
3049+
if _, seen := byTimestamp[ts]; !seen {
3050+
order = append(order, ts)
3051+
}
3052+
if count > byTimestamp[ts] {
3053+
byTimestamp[ts] = count
3054+
}
3055+
}
3056+
if len(byTimestamp) == 0 {
3057+
return [][]interface{}{}
3058+
}
3059+
sort.Slice(order, func(i, j int) bool { return order[i] < order[j] })
3060+
compacted := make([][]interface{}, 0, len(order))
3061+
for _, ts := range order {
3062+
compacted = append(compacted, []interface{}{ts, byTimestamp[ts]})
3063+
}
3064+
return compacted
3065+
}
3066+
29703067
func betterPatternSnapshotEntry(current, incoming patternSnapshotEntry, currentKey, incomingKey string) bool {
29713068
if incoming.UpdatedAtUnixNano != current.UpdatedAtUnixNano {
29723069
return incoming.UpdatedAtUnixNano > current.UpdatedAtUnixNano
@@ -5328,6 +5425,16 @@ func (p *Proxy) handlePatterns(w http.ResponseWriter, r *http.Request) {
53285425
}
53295426
p.recordPatternFetchDiagnostics(diag)
53305427
p.metrics.RecordPatternsDetected(len(entries))
5428+
snapshotBody := []byte(nil)
5429+
if len(entries) > 0 {
5430+
rawSnapshotBody, marshalErr := json.Marshal(patternsResponse{
5431+
Status: "success",
5432+
Data: entries,
5433+
})
5434+
if marshalErr == nil {
5435+
snapshotBody = rawSnapshotBody
5436+
}
5437+
}
53315438
entries = p.prependCustomPatternEntries(entries, startParam, stepParam, patternLimit)
53325439
entries = fillPatternSamplesAcrossRequestedRange(entries, startParam, endParam, stepParam)
53335440
resultBody, err := json.Marshal(patternsResponse{
@@ -5344,11 +5451,15 @@ func (p *Proxy) handlePatterns(w http.ResponseWriter, r *http.Request) {
53445451
// Avoid sticky empty results: first-call empty probes should not poison long-lived pattern cache entries.
53455452
if len(entries) > 0 {
53465453
now := time.Now().UTC()
5454+
snapshotPayload := snapshotBody
5455+
if len(snapshotPayload) == 0 {
5456+
snapshotPayload = resultBody
5457+
}
53475458
p.cache.SetWithTTL(cacheWriteKey, resultBody, patternsCacheRetention)
5348-
p.recordPatternSnapshotEntry(cacheWriteKey, resultBody, now)
5459+
p.recordPatternSnapshotEntry(cacheWriteKey, snapshotPayload, now)
53495460
if derivedStepCacheKey != "" && derivedStepCacheKey != cacheWriteKey {
53505461
p.cache.SetWithTTL(derivedStepCacheKey, resultBody, patternsCacheRetention)
5351-
p.recordPatternSnapshotEntry(derivedStepCacheKey, resultBody, now)
5462+
p.recordPatternSnapshotEntry(derivedStepCacheKey, snapshotPayload, now)
53525463
}
53535464
}
53545465
w.Header().Set("Content-Type", "application/json")

0 commit comments

Comments
 (0)