@@ -7887,10 +7887,8 @@ func (p *Proxy) vlPostCoalesced(ctx context.Context, key, path string, params ur
78877887// --- Stats query proxying ---
78887888
78897889func (p * Proxy ) proxyStatsQueryRange (w http.ResponseWriter , r * http.Request , logsqlQuery string ) {
7890- if p .proxyStatsQueryRangeWindowed (w , r , logsqlQuery ) {
7891- return
7892- }
7893-
7890+ // Keep metric query_range as a single backend request. Window splitting and
7891+ // window-level cache reuse are for raw log queries only.
78947892 params := buildStatsQueryRangeParams (logsqlQuery , r .FormValue ("start" ), r .FormValue ("end" ), r .FormValue ("step" ))
78957893
78967894 resp , err := p .vlPost (r .Context (), "/select/logsql/stats_query_range" , params )
@@ -7918,115 +7916,6 @@ func (p *Proxy) proxyStatsQueryRange(w http.ResponseWriter, r *http.Request, log
79187916 w .Write (wrapAsLokiResponse (body , "matrix" ))
79197917}
79207918
7921- func (p * Proxy ) proxyStatsQueryRangeWindowed (w http.ResponseWriter , r * http.Request , logsqlQuery string ) bool {
7922- if ! p .queryRangeWindowing {
7923- return false
7924- }
7925-
7926- startNs , endNs , ok := parseLokiTimeRangeToUnixNano (r .FormValue ("start" ), r .FormValue ("end" ))
7927- if ! ok {
7928- return false
7929- }
7930-
7931- windows := splitQueryRangeWindowsWithOptions (
7932- startNs ,
7933- endNs ,
7934- p .queryRangeSplitInterval ,
7935- "forward" ,
7936- p .queryRangeAlignWindows ,
7937- )
7938- if len (windows ) <= 1 {
7939- return false
7940- }
7941-
7942- p .metrics .RecordQueryRangeWindowCount (len (windows ))
7943- mergedBody , err := p .fetchAndMergeStatsQueryRangeWindows (r .Context (), r , logsqlQuery , windows )
7944- if err != nil {
7945- p .writeError (w , statusFromQueryRangeWindowErr (err ), err .Error ())
7946- return true
7947- }
7948-
7949- mergedBody = p .translateStatsResponseLabelsWithContext (r .Context (), mergedBody , r .FormValue ("query" ))
7950- w .Header ().Set ("Content-Type" , "application/json" )
7951- _ , _ = w .Write (wrapAsLokiResponse (mergedBody , "matrix" ))
7952- return true
7953- }
7954-
7955- func (p * Proxy ) fetchAndMergeStatsQueryRangeWindows (ctx context.Context , r * http.Request , logsqlQuery string , windows []queryRangeWindow ) ([]byte , error ) {
7956- bodies := make ([][]byte , 0 , len (windows ))
7957- for _ , window := range windows {
7958- body , err := p .fetchStatsQueryRangeWindow (ctx , r , logsqlQuery , window )
7959- if err != nil {
7960- return nil , err
7961- }
7962- bodies = append (bodies , body )
7963- }
7964- return mergeStatsQueryRangeResponses (bodies )
7965- }
7966-
7967- func (p * Proxy ) fetchStatsQueryRangeWindow (ctx context.Context , r * http.Request , logsqlQuery string , window queryRangeWindow ) ([]byte , error ) {
7968- fetchCtx := ctx
7969- cancel := func () {}
7970- if p .queryRangeWindowTimeout > 0 {
7971- fetchCtx , cancel = context .WithTimeout (ctx , p .queryRangeWindowTimeout )
7972- }
7973- defer cancel ()
7974-
7975- params := buildStatsQueryRangeParams (
7976- logsqlQuery ,
7977- strconv .FormatInt (window .startNs , 10 ),
7978- strconv .FormatInt (window .endNs , 10 ),
7979- r .FormValue ("step" ),
7980- )
7981-
7982- for attempt := 1 ; attempt <= queryRangeWindowFetchAttempts ; attempt ++ {
7983- fetchStart := time .Now ()
7984- resp , err := p .vlPost (fetchCtx , "/select/logsql/stats_query_range" , params )
7985- fetchDuration := time .Since (fetchStart )
7986- p .metrics .RecordQueryRangeWindowFetchDuration (fetchDuration )
7987-
7988- if err == nil {
7989- body , readErr := readBodyLimited (resp .Body , maxBufferedBackendBodyBytes )
7990- _ = resp .Body .Close ()
7991- if readErr != nil {
7992- err = readErr
7993- } else if resp .StatusCode < http .StatusBadRequest {
7994- body = trimStatsQueryRangeResponseToEnd (body , strconv .FormatInt (window .endNs , 10 ))
7995- p .observeQueryRangeWindowFetch (fetchDuration , false )
7996- return body , nil
7997- } else {
7998- msg := strings .TrimSpace (string (body ))
7999- if msg == "" {
8000- msg = fmt .Sprintf ("VL backend returned %d" , resp .StatusCode )
8001- }
8002- err = & queryRangeWindowHTTPError {status : resp .StatusCode , msg : msg }
8003- }
8004- }
8005-
8006- p .observeQueryRangeWindowFetch (fetchDuration , true )
8007- if attempt >= queryRangeWindowFetchAttempts || ! shouldRetryQueryRangeWindow (err ) {
8008- return nil , err
8009- }
8010-
8011- p .metrics .RecordQueryRangeWindowRetry ()
8012- backoff := queryRangeWindowRetryBackoff (attempt )
8013- p .log .Warn (
8014- "stats_query_range window fetch retrying" ,
8015- "attempt" , attempt + 1 ,
8016- "max_attempts" , queryRangeWindowFetchAttempts ,
8017- "backoff" , backoff .String (),
8018- "error" , err ,
8019- )
8020- select {
8021- case <- fetchCtx .Done ():
8022- return nil , fetchCtx .Err ()
8023- case <- time .After (backoff ):
8024- }
8025- }
8026-
8027- return nil , errors .New ("stats_query_range window fetch failed" )
8028- }
8029-
80307919type statsQueryRangeSeries struct {
80317920 Metric map [string ]interface {} `json:"metric"`
80327921 Values [][]interface {} `json:"values"`
@@ -8039,85 +7928,6 @@ type statsQueryRangeResponse struct {
80397928 Results []statsQueryRangeSeries `json:"results"`
80407929}
80417930
8042- type mergedStatsQueryRangeSeries struct {
8043- metric map [string ]interface {}
8044- values [][]interface {}
8045- seen map [string ]struct {}
8046- }
8047-
8048- func mergeStatsQueryRangeResponses (bodies [][]byte ) ([]byte , error ) {
8049- seriesByKey := make (map [string ]* mergedStatsQueryRangeSeries , len (bodies ))
8050- keys := make ([]string , 0 , len (bodies ))
8051-
8052- for _ , body := range bodies {
8053- var resp statsQueryRangeResponse
8054- if err := json .Unmarshal (body , & resp ); err != nil {
8055- return nil , err
8056- }
8057-
8058- results := resp .Results
8059- if len (results ) == 0 {
8060- results = resp .Data .Result
8061- }
8062- for _ , series := range results {
8063- key := metricKey (series .Metric )
8064- merged , ok := seriesByKey [key ]
8065- if ! ok {
8066- merged = & mergedStatsQueryRangeSeries {
8067- metric : cloneStatsQueryRangeMetric (series .Metric ),
8068- values : make ([][]interface {}, 0 , len (series .Values )),
8069- seen : make (map [string ]struct {}, len (series .Values )),
8070- }
8071- seriesByKey [key ] = merged
8072- keys = append (keys , key )
8073- }
8074- for _ , point := range series .Values {
8075- pointKey := statsQueryRangePointKey (point )
8076- if _ , exists := merged .seen [pointKey ]; exists {
8077- continue
8078- }
8079- merged .seen [pointKey ] = struct {}{}
8080- merged .values = append (merged .values , cloneStatsQueryRangePoint (point ))
8081- }
8082- }
8083- }
8084-
8085- sort .Strings (keys )
8086- results := make ([]map [string ]interface {}, 0 , len (keys ))
8087- for _ , key := range keys {
8088- merged := seriesByKey [key ]
8089- sort .Slice (merged .values , func (i , j int ) bool {
8090- return statsQueryRangePointTimestamp (merged .values [i ]) < statsQueryRangePointTimestamp (merged .values [j ])
8091- })
8092- results = append (results , map [string ]interface {}{
8093- "metric" : merged .metric ,
8094- "values" : merged .values ,
8095- })
8096- }
8097-
8098- return json .Marshal (map [string ]interface {}{
8099- "resultType" : "matrix" ,
8100- "result" : results ,
8101- })
8102- }
8103-
8104- func cloneStatsQueryRangeMetric (metric map [string ]interface {}) map [string ]interface {} {
8105- if len (metric ) == 0 {
8106- return map [string ]interface {}{}
8107- }
8108- cloned := make (map [string ]interface {}, len (metric ))
8109- for k , v := range metric {
8110- cloned [k ] = v
8111- }
8112- return cloned
8113- }
8114-
8115- func cloneStatsQueryRangePoint (point []interface {}) []interface {} {
8116- cloned := make ([]interface {}, len (point ))
8117- copy (cloned , point )
8118- return cloned
8119- }
8120-
81217931func buildStatsQueryRangeParams (logsqlQuery , startRaw , endRaw , stepRaw string ) url.Values {
81227932 params := url.Values {}
81237933 params .Set ("query" , logsqlQuery )
@@ -8195,39 +8005,6 @@ func trimStatsQueryRangeResponseToEnd(body []byte, endRaw string) []byte {
81958005 return encoded
81968006}
81978007
8198- func statsQueryRangePointKey (point []interface {}) string {
8199- if len (point ) == 0 {
8200- return ""
8201- }
8202- return fmt .Sprintf ("%v" , point [0 ])
8203- }
8204-
8205- func statsQueryRangePointTimestamp (point []interface {}) float64 {
8206- if len (point ) == 0 {
8207- return 0
8208- }
8209- switch ts := point [0 ].(type ) {
8210- case float64 :
8211- return ts
8212- case float32 :
8213- return float64 (ts )
8214- case int :
8215- return float64 (ts )
8216- case int64 :
8217- return float64 (ts )
8218- case int32 :
8219- return float64 (ts )
8220- case json.Number :
8221- value , _ := ts .Float64 ()
8222- return value
8223- case string :
8224- value , _ := strconv .ParseFloat (ts , 64 )
8225- return value
8226- default :
8227- return 0
8228- }
8229- }
8230-
82318008func statsQueryRangePointUnixNano (point []interface {}) int64 {
82328009 if len (point ) == 0 {
82338010 return 0
0 commit comments