Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion internal/generator/vector/conf/complex.toml
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,21 @@ source = '''
# escape 'new line' symbol see: LOG-8090
msg = to_string!(del(._internal.kubernetes.event.message))
._internal.message = replace(msg, "\n", s'\n')
._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp
# Determine event timestamp, preferring the most specific field available:
# lastTimestamp (core/v1 events) -> firstTimestamp -> eventTime (events.k8s.io/v1)
# -> metadata.creationTimestamp
ts = ._internal.kubernetes.event.lastTimestamp
if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" {
ts = ._internal.kubernetes.event.firstTimestamp
}
if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" {
ts = ._internal.kubernetes.event.eventTime
}
if ts == null || ts == "" {
ts = ._internal.kubernetes.event.metadata.creationTimestamp
}
._internal."@timestamp" = ts
._internal.timestamp = ts
} else {
log("Unable to merge EventRouter log message into record: " + err, level: "info")
}
Expand Down
16 changes: 15 additions & 1 deletion internal/generator/vector/conf/complex_http_receiver.toml
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,21 @@ source = '''
# escape 'new line' symbol see: LOG-8090
msg = to_string!(del(._internal.kubernetes.event.message))
._internal.message = replace(msg, "\n", s'\n')
._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp
# Determine event timestamp, preferring the most specific field available:
# lastTimestamp (core/v1 events) -> firstTimestamp -> eventTime (events.k8s.io/v1)
# -> metadata.creationTimestamp
ts = ._internal.kubernetes.event.lastTimestamp
if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" {
ts = ._internal.kubernetes.event.firstTimestamp
}
if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" {
ts = ._internal.kubernetes.event.eventTime
}
if ts == null || ts == "" {
ts = ._internal.kubernetes.event.metadata.creationTimestamp
}
._internal."@timestamp" = ts
._internal.timestamp = ts
} else {
log("Unable to merge EventRouter log message into record: " + err, level: "info")
}
Expand Down
16 changes: 15 additions & 1 deletion internal/generator/vector/conf/container.toml
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,21 @@ source = '''
# escape 'new line' symbol see: LOG-8090
msg = to_string!(del(._internal.kubernetes.event.message))
._internal.message = replace(msg, "\n", s'\n')
._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp
# Determine event timestamp, preferring the most specific field available:
# lastTimestamp (core/v1 events) -> firstTimestamp -> eventTime (events.k8s.io/v1)
# -> metadata.creationTimestamp
ts = ._internal.kubernetes.event.lastTimestamp
if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" {
ts = ._internal.kubernetes.event.firstTimestamp
}
if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" {
ts = ._internal.kubernetes.event.eventTime
}
if ts == null || ts == "" {
ts = ._internal.kubernetes.event.metadata.creationTimestamp
}
._internal."@timestamp" = ts
._internal.timestamp = ts
} else {
log("Unable to merge EventRouter log message into record: " + err, level: "info")
}
Expand Down
45 changes: 28 additions & 17 deletions internal/generator/vector/filter/openshift/viaq/v1/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,33 @@ const (
HandleEventRouterLog = `

if exists(._internal.kubernetes.pod_name) && starts_with(string!(._internal.kubernetes.pod_name), "eventrouter-") {

parsed, err = parse_json(._internal.message)
if err != null {
log("Unable to process EventRouter log: " + err, level: "info")
} else {
._internal.event = parsed
if exists(._internal.event.event) && is_object(._internal.event.event) {
._internal.kubernetes.event = del(._internal.event.event)
._internal.kubernetes.event.verb = del(._internal.event.verb)
# escape 'new line' symbol see: LOG-8090
msg = to_string!(del(._internal.kubernetes.event.message))
._internal.message = replace(msg, "\n", s'\n')
._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp
._internal.kubernetes.event = del(._internal.event.event)
._internal.kubernetes.event.verb = del(._internal.event.verb)
# escape 'new line' symbol see: LOG-8090
msg = to_string!(del(._internal.kubernetes.event.message))
._internal.message = replace(msg, "\n", s'\n')
# Determine event timestamp, preferring the most specific field available:
# lastTimestamp (core/v1 events) -> firstTimestamp -> eventTime (events.k8s.io/v1)
# -> metadata.creationTimestamp
ts = ._internal.kubernetes.event.lastTimestamp
if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" {
ts = ._internal.kubernetes.event.firstTimestamp
}
if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" {
ts = ._internal.kubernetes.event.eventTime
}
if ts == null || ts == "" {
ts = ._internal.kubernetes.event.metadata.creationTimestamp
}
._internal."@timestamp" = ts
._internal.timestamp = ts
Comment on lines +22 to +33
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Null timestamp clobbers time 🐞 Bug ≡ Correctness

The EventRouter timestamp selection unconditionally assigns ._internal.timestamp = ts even when
every candidate event timestamp field is null or empty. This can erase the original CRI/source
timestamp and emit records with a null or zero @timestamp/timestamp, breaking downstream
time-based indexing and sorting. It also changes prior behavior, in which the CRI timestamp
remained intact when the event carried no timestamps of its own.
Agent Prompt
### Issue description
EventRouter timestamp fallback assigns `._internal.timestamp = ts` even when `ts` remains `null`/empty after checking `lastTimestamp`, `firstTimestamp`, `eventTime`, and `metadata.creationTimestamp`. This can overwrite the original container log timestamp (from the kubernetes_logs source) and lead to output records with missing/zero `@timestamp` and `timestamp`.

### Issue Context
The Vector pipeline ultimately sets root `.timestamp` and `.@timestamp` from `._internal.timestamp`, so clobbering it with null propagates to all sinks.

### Fix Focus Areas
- internal/generator/vector/filter/openshift/viaq/v1/normalize.go[19-33]
- internal/generator/vector/conf/complex.toml[722-736]
- internal/generator/vector/conf/complex_http_receiver.toml[760-774]
- internal/generator/vector/conf/container.toml[313-327]

### Suggested change
After the fallback chain, add a final safeguard:
- If `ts` is still `null`/""/"0001-01-01T00:00:00Z", fall back to the pre-existing `._internal.timestamp` (the CRI/source timestamp) or `now()`.
- Only override `._internal.timestamp` (and any related fields) when `ts` is valid.

Example VRL pattern:
```vrl
# ...existing fallback chain...
if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" {
  ts = ._internal.timestamp
}
if ts != null && ts != "" && ts != "0001-01-01T00:00:00Z" {
  ._internal.timestamp = ts
  ._internal."@timestamp" = ts
}
```

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

} else {
log("Unable to merge EventRouter log message into record: " + err, level: "info")
}
Expand All @@ -34,22 +48,20 @@ if !exists(._internal.level) {
message = ._internal.message

# attempt 1: parse as logfmt (e.g. level=error msg="Failed to connect")

parsed_logfmt, err = parse_logfmt(message)
parsed_logfmt, err = parse_logfmt(message)
if err == null && is_string(parsed_logfmt.level) {
level = downcase!(parsed_logfmt.level)
}

# attempt 2: parse as klog (e.g. I0920 14:22:00.089385 1 scheduler.go:592] "Successfully bound pod to node")
if level == null {
parsed_klog, err = parse_klog(message)
parsed_klog, err = parse_klog(message)
if err == null && is_string(parsed_klog.level) {
level = parsed_klog.level
}
}

# attempt 3: parse with groks template (if previous attempts failed) for classic text logs like Logback, Log4j etc.

# attempt 3: parse with groks template (if previous attempts failed) for classic text logs like Logback, Log4j etc.
if level == null {
parsed_grok, err = parse_groks(message,
patterns: [
Expand All @@ -62,18 +74,17 @@ if !exists(._internal.level) {
"_message": "%{GREEDYDATA:message}"
}
)

if err == null && is_string(parsed_grok.level) {
level = downcase!(parsed_grok.level)
level = downcase!(parsed_grok.level)
}
}

if level == null {
level = "default"

# attempt 4: Match on well known structured patterns
# Order: emergency, alert, critical, error, warn, notice, info, debug, trace

if match!(message, r'^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"') {
level = "emergency"
} else if match!(message, r'^A[0-9]+|level=alert|Value:alert|"level":"alert"') {
Expand All @@ -93,7 +104,7 @@ if !exists(._internal.level) {
} else if match!(message, r'^T[0-9]+|level=trace|Value:trace|"level":"trace"') {
level = "trace"
}

# attempt 5: Match on the keyword that appears earliest in the message
if level == "default" {
level_patterns = r'(?i)(?<emergency>emergency|<emergency>)|(?<alert>alert|<alert>)|(?<critical>critical|<critical>)|(?<error>error|<error>)|(?<warn>warn(?:ing)?|<warn>)|(?<notice>notice|<notice>)|(?:\b(?<info>info)\b|<info>)|(?<debug>debug|<debug>)|(?<trace>trace|<trace>)'
Expand Down
115 changes: 95 additions & 20 deletions test/functional/normalization/eventrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package normalization

import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/openshift/cluster-logging-operator/test/framework/functional"
"github.com/openshift/cluster-logging-operator/test/helpers/loki"
"github.com/openshift/cluster-logging-operator/test/helpers/syslog"
testruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand All @@ -23,33 +26,39 @@ import (

var _ = Describe("[Functional][Normalization] Messages from EventRouter", func() {

const timestamp string = "1985-10-21T09:00:00.00000+00:00"
var (
framework *functional.CollectorFunctionalFramework
writeMsg func(msg string) error
framework *functional.CollectorFunctionalFramework
l *loki.Receiver
ts = time.Now().UTC()

templateForAnyKubernetesWithEvents = types.KubernetesWithEvent{
Kubernetes: functional.TemplateForAnyKubernetes,
}

NewEventDataBuilder = func(verb, message string, podRef *corev1.ObjectReference) types.EventData {
newEvent := types.NewEvent(podRef, corev1.EventTypeNormal, "reason", message)
if verb == "UPDATED" {
oldEvent := types.NewEvent(podRef, corev1.EventTypeWarning, "old_reason", "old_"+message)
return types.EventData{Verb: "UPDATED", Event: newEvent, OldEvent: oldEvent}
} else {
return types.EventData{Verb: "ADDED", Event: newEvent}
}
return types.EventData{Verb: "ADDED", Event: newEvent}
}

ExpectedLogTemplateBuilder = func(event, oldEvent *corev1.Event) types.EventRouterLog {
ExpectedLogTemplateBuilder = func(event, oldEvent *corev1.Event, outputType obs.OutputType) types.EventRouterLog {
tsTruncated := ts.Truncate(time.Second)
timestamp := tsTruncated
if outputType == obs.OutputTypeLoki {
timestamp = time.Time{}
}
tmpl := types.EventRouterLog{
Kubernetes: templateForAnyKubernetesWithEvents,
ViaQCommon: types.ViaQCommon{
Message: event.Message,
Level: types.AnyString,
Hostname: types.AnyString,
PipelineMetadata: types.PipelineMetadata{},
Timestamp: time.Time{},
TimestampLegacy: time.Time{},
Timestamp: timestamp,
TimestampLegacy: tsTruncated,
LogSource: string(obs.InfrastructureSourceContainer),
LogType: string(obs.InputTypeApplication),
Openshift: types.OpenshiftMeta{
Expand Down Expand Up @@ -77,7 +86,7 @@ var _ = Describe("[Functional][Normalization] Messages from EventRouter", func()
parseLogs = func(raw []string, outputType obs.OutputType) ([]types.EventRouterLog, error) {
var logs []types.EventRouterLog
switch outputType {
case obs.OutputTypeHTTP:
case obs.OutputTypeHTTP, obs.OutputTypeLoki:
err := types.StrictlyParseLogs(utils.ToJsonLogs(raw), &logs)
return logs, err
case obs.OutputTypeSyslog:
Expand All @@ -94,39 +103,62 @@ var _ = Describe("[Functional][Normalization] Messages from EventRouter", func()
)

DescribeTable("should be normalized to the ViaQ data model when sinking to different outputs", func(outputType obs.OutputType, verb, message, expectedMessage string) {
crioTimestamp := functional.CRIOTime(ts)
framework = functional.NewCollectorFunctionalFramework()
if outputType == obs.OutputTypeLoki {
l = loki.NewReceiver(framework.Namespace, "loki-server")
Expect(l.Create(framework.Test.Client)).To(Succeed())
}
builder := testruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
FromInput(obs.InputTypeApplication)
if outputType == obs.OutputTypeHTTP {
switch outputType {
case obs.OutputTypeHTTP:
builder.ToHttpOutput()
}
if outputType == obs.OutputTypeSyslog {
case obs.OutputTypeSyslog:
builder.ToSyslogOutput(obs.SyslogRFC5424)
case obs.OutputTypeLoki:
builder.ToLokiOutput(*l.InternalURL(""))
}

writeMsg = func(msg string) error {
return framework.WriteMessagesToApplicationLog(msg, 1)
}
framework.VisitConfig = func(conf string) string {
return strings.Replace(conf, `"eventrouter-"`, `"functional"`, 1)
}
Expect(framework.Deploy()).To(BeNil())
podRef, err := reference.GetReference(scheme.Scheme, types.NewMockPod())
Expect(err).To(BeNil())
newEventData := NewEventDataBuilder(verb, message, podRef)
tsSec := ts.Truncate(time.Second)
newEventData.Event.FirstTimestamp = metav1.NewTime(tsSec)
newEventData.Event.LastTimestamp = metav1.NewTime(tsSec)
if verb == "UPDATED" {
tsOld := tsSec.Add(-10 * time.Minute)
newEventData.Event.FirstTimestamp = metav1.NewTime(tsOld)
newEventData.OldEvent.FirstTimestamp = metav1.NewTime(tsOld)
newEventData.OldEvent.LastTimestamp = metav1.NewTime(tsSec)
}
jsonBytes, _ := json.Marshal(newEventData)
jsonStr := string(jsonBytes)
msg := functional.NewCRIOLogMessage(timestamp, jsonStr, false)
err = writeMsg(msg)
msg := functional.NewCRIOLogMessage(crioTimestamp, jsonStr, false)
err = framework.WriteMessagesToApplicationLog(msg, 1)
Expect(err).To(BeNil())

raw, err := framework.ReadRawApplicationLogsFrom(string(outputType))
Expect(err).To(BeNil(), "Expected no errors reading the logs")
var raw []string
if outputType == obs.OutputTypeLoki {
query := fmt.Sprintf(`{kubernetes_namespace_name=%q}`, framework.Namespace)
result, err := l.QueryUntil(query, "", 1)
Expect(err).To(BeNil())
Expect(result).To(HaveLen(1))
raw = result[0].Lines()
} else {
raw, err = framework.ReadRawApplicationLogsFrom(string(outputType))
Expect(err).To(BeNil(), "Expected no errors reading the logs")
}

logs, err := parseLogs(raw, outputType)
Expect(err).To(BeNil(), "Expected no errors parsing the logs")
expectedEventData := newEventData
expectedEventData.Event.Message = expectedMessage
var expectedLogTemplate = ExpectedLogTemplateBuilder(expectedEventData.Event, expectedEventData.OldEvent)
expectedLogTemplate := ExpectedLogTemplateBuilder(expectedEventData.Event, expectedEventData.OldEvent, outputType)
Expect(logs[0]).To(matchers.FitLogFormatTemplate(expectedLogTemplate))
},
Entry("with HTTP output for ADDED events", obs.OutputTypeHTTP, "ADDED", "simple syslog message", "simple syslog message"),
Expand All @@ -135,6 +167,49 @@ var _ = Describe("[Functional][Normalization] Messages from EventRouter", func()
Entry("with Syslog output for UPDATED events", obs.OutputTypeSyslog, "UPDATED", "simple syslog message", "simple syslog message"),
Entry("with Syslog output for ADDED events and new line symbol", obs.OutputTypeSyslog, "ADDED", "syslog message\n with new line", "syslog message\\n with new line"),
Entry("with Syslog output for UPDATED events and new line symbol", obs.OutputTypeSyslog, "UPDATED", "syslog message\n with new line", "syslog message\\n with new line"),
Entry("with Loki output for ADDED events", obs.OutputTypeLoki, "ADDED", "simple syslog message", "simple syslog message"),
Entry("with Loki output for UPDATED events", obs.OutputTypeLoki, "UPDATED", "simple syslog message", "simple syslog message"),
)

DescribeTable("should use the correct timestamp fallback for @timestamp", func(setTimestamps func(event *corev1.Event, ts time.Time)) {
crioTimestamp := functional.CRIOTime(ts)
framework = functional.NewCollectorFunctionalFramework()
builder := testruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
FromInput(obs.InputTypeApplication)
builder.ToHttpOutput()

framework.VisitConfig = func(conf string) string {
return strings.Replace(conf, `"eventrouter-"`, `"functional"`, 1)
}
Expect(framework.Deploy()).To(BeNil())
podRef, err := reference.GetReference(scheme.Scheme, types.NewMockPod())
Expect(err).To(BeNil())
newEventData := NewEventDataBuilder("ADDED", "test message", podRef)
setTimestamps(newEventData.Event, ts.Truncate(time.Second))
jsonBytes, _ := json.Marshal(newEventData)
msg := functional.NewCRIOLogMessage(crioTimestamp, string(jsonBytes), false)
err = framework.WriteMessagesToApplicationLog(msg, 1)
Expect(err).To(BeNil())

raw, err := framework.ReadRawApplicationLogsFrom(string(obs.OutputTypeHTTP))
Expect(err).To(BeNil(), "Expected no errors reading the logs")
logs, err := parseLogs(raw, obs.OutputTypeHTTP)
Expect(err).To(BeNil(), "Expected no errors parsing the logs")
Expect(logs[0].TimestampLegacy.IsZero()).To(BeFalse(), "Expected @timestamp to be set via timestamp fallback chain")
},
Entry("lastTimestamp is set", func(event *corev1.Event, ts time.Time) {
event.LastTimestamp = metav1.NewTime(ts)
event.FirstTimestamp = metav1.NewTime(ts.Add(-5 * time.Minute))
}),
Entry("lastTimestamp is zero, falls back to firstTimestamp", func(event *corev1.Event, ts time.Time) {
event.FirstTimestamp = metav1.NewTime(ts)
}),
Entry("lastTimestamp and firstTimestamp are zero, falls back to eventTime", func(event *corev1.Event, ts time.Time) {
event.EventTime = metav1.NewMicroTime(ts)
}),
Entry("all timestamps zero, falls back to creationTimestamp", func(event *corev1.Event, ts time.Time) {
event.CreationTimestamp = metav1.NewTime(ts)
}),
)

AfterEach(func() {
Expand Down
3 changes: 3 additions & 0 deletions test/helpers/loki/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ func (r *Receiver) Query(logQL string, orgID string, limit int) ([]StreamValues,
q.Add("query", logQL)
q.Add("limit", strconv.Itoa(limit))
q.Add("direction", "FORWARD")
now := time.Now()
q.Add("start", strconv.FormatInt(now.Add(-24*time.Hour).UnixNano(), 10))
q.Add("end", strconv.FormatInt(now.Add(24*time.Hour).UnixNano(), 10))
u.RawQuery = q.Encode()
log.V(3).Info("Loki Query", "url", u.String(), "org-id", orgID)
header := http.Header{}
Expand Down