diff --git a/internal/generator/vector/conf/complex.toml b/internal/generator/vector/conf/complex.toml index 86a7625f44..b244e97646 100644 --- a/internal/generator/vector/conf/complex.toml +++ b/internal/generator/vector/conf/complex.toml @@ -719,7 +719,21 @@ source = ''' # escape 'new line' symbol see: LOG-8090 msg = to_string!(del(._internal.kubernetes.event.message)) ._internal.message = replace(msg, "\n", s'\n') - ._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp + # Determine event timestamp: prefer lastTimestamp, then firstTimestamp (v1 events), + # fall back to eventTime (events.k8s.io/v1), then creationTimestamp + # lastTimestamp -> firstTimestamp -> eventTime -> creationTimestamp + ts = ._internal.kubernetes.event.lastTimestamp + if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" { + ts = ._internal.kubernetes.event.firstTimestamp + } + if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" { + ts = ._internal.kubernetes.event.eventTime + } + if ts == null || ts == "" { + ts = ._internal.kubernetes.event.metadata.creationTimestamp + } + ._internal."@timestamp" = ts + ._internal.timestamp = ts } else { log("Unable to merge EventRouter log message into record: " + err, level: "info") } diff --git a/internal/generator/vector/conf/complex_http_receiver.toml b/internal/generator/vector/conf/complex_http_receiver.toml index da1fa4802b..2b5c7bf66a 100644 --- a/internal/generator/vector/conf/complex_http_receiver.toml +++ b/internal/generator/vector/conf/complex_http_receiver.toml @@ -757,7 +757,21 @@ source = ''' # escape 'new line' symbol see: LOG-8090 msg = to_string!(del(._internal.kubernetes.event.message)) ._internal.message = replace(msg, "\n", s'\n') - ._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp + # Determine event timestamp: prefer lastTimestamp, then firstTimestamp (v1 events), + # fall back to eventTime (events.k8s.io/v1), then creationTimestamp + # lastTimestamp -> firstTimestamp -> eventTime -> creationTimestamp + ts = ._internal.kubernetes.event.lastTimestamp + if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" { + ts = ._internal.kubernetes.event.firstTimestamp + } + if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" { + ts = ._internal.kubernetes.event.eventTime + } + if ts == null || ts == "" { + ts = ._internal.kubernetes.event.metadata.creationTimestamp + } + ._internal."@timestamp" = ts + ._internal.timestamp = ts } else { log("Unable to merge EventRouter log message into record: " + err, level: "info") } diff --git a/internal/generator/vector/conf/container.toml b/internal/generator/vector/conf/container.toml index deff4128a9..7df673adfb 100644 --- a/internal/generator/vector/conf/container.toml +++ b/internal/generator/vector/conf/container.toml @@ -310,7 +310,21 @@ source = ''' # escape 'new line' symbol see: LOG-8090 msg = to_string!(del(._internal.kubernetes.event.message)) ._internal.message = replace(msg, "\n", s'\n') - ._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp + # Determine event timestamp: prefer lastTimestamp, then firstTimestamp (v1 events), + # fall back to eventTime (events.k8s.io/v1), then creationTimestamp + # lastTimestamp -> firstTimestamp -> eventTime -> creationTimestamp + ts = ._internal.kubernetes.event.lastTimestamp + if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" { + ts = ._internal.kubernetes.event.firstTimestamp + } + if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" { + ts = ._internal.kubernetes.event.eventTime + } + if ts == null || ts == "" { + ts = ._internal.kubernetes.event.metadata.creationTimestamp + } + ._internal."@timestamp" = ts + ._internal.timestamp = ts } else { log("Unable to merge EventRouter log message into record: " + err, level: "info") } diff --git a/internal/generator/vector/filter/openshift/viaq/v1/normalize.go b/internal/generator/vector/filter/openshift/viaq/v1/normalize.go index 6a8c3215e6..55c86ecd74 100644 --- a/internal/generator/vector/filter/openshift/viaq/v1/normalize.go +++ b/internal/generator/vector/filter/openshift/viaq/v1/normalize.go @@ -4,19 +4,33 @@ const ( HandleEventRouterLog = ` if exists(._internal.kubernetes.pod_name) && starts_with(string!(._internal.kubernetes.pod_name), "eventrouter-") { - + parsed, err = parse_json(._internal.message) if err != null { log("Unable to process EventRouter log: " + err, level: "info") } else { ._internal.event = parsed if exists(._internal.event.event) && is_object(._internal.event.event) { - ._internal.kubernetes.event = del(._internal.event.event) - ._internal.kubernetes.event.verb = del(._internal.event.verb) - # escape 'new line' symbol see: LOG-8090 - msg = to_string!(del(._internal.kubernetes.event.message)) - ._internal.message = replace(msg, "\n", s'\n') - ._internal."@timestamp" = .kubernetes.event.metadata.creationTimestamp + ._internal.kubernetes.event = del(._internal.event.event) + ._internal.kubernetes.event.verb = del(._internal.event.verb) + # escape 'new line' symbol see: LOG-8090 + msg = to_string!(del(._internal.kubernetes.event.message)) + ._internal.message = replace(msg, "\n", s'\n') + # Determine event timestamp: prefer lastTimestamp, then firstTimestamp (v1 events), + # fall back to eventTime (events.k8s.io/v1), then creationTimestamp + # lastTimestamp -> firstTimestamp -> eventTime -> creationTimestamp + ts = ._internal.kubernetes.event.lastTimestamp + if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" { + ts = ._internal.kubernetes.event.firstTimestamp + } + if ts == null || ts == "" || ts == "0001-01-01T00:00:00Z" { + ts = ._internal.kubernetes.event.eventTime + } + if ts == null || ts == "" { + ts = ._internal.kubernetes.event.metadata.creationTimestamp + } + ._internal."@timestamp" = ts + ._internal.timestamp = ts } else { log("Unable to merge EventRouter log message into record: " + err, level: "info") } @@ -34,22 +48,20 @@ if !exists(._internal.level) { message = ._internal.message # attempt 1: parse as logfmt (e.g. level=error msg="Failed to connect") - - parsed_logfmt, err = parse_logfmt(message) + parsed_logfmt, err = parse_logfmt(message) if err == null && is_string(parsed_logfmt.level) { level = downcase!(parsed_logfmt.level) } # attempt 2: parse as klog (e.g. I0920 14:22:00.089385 1 scheduler.go:592] "Successfully bound pod to node") if level == null { - parsed_klog, err = parse_klog(message) + parsed_klog, err = parse_klog(message) if err == null && is_string(parsed_klog.level) { level = parsed_klog.level } } - # attempt 3: parse with groks template (if previous attempts failed) for classic text logs like Logback, Log4j etc. - + # attempt 3: parse with groks template (if previous attempts failed) for classic text logs like Logback, Log4j etc. if level == null { parsed_grok, err = parse_groks(message, patterns: [ @@ -62,18 +74,17 @@ if !exists(._internal.level) { "_message": "%{GREEDYDATA:message}" } ) - if err == null && is_string(parsed_grok.level) { - level = downcase!(parsed_grok.level) + level = downcase!(parsed_grok.level) } } if level == null { level = "default" - + # attempt 4: Match on well known structured patterns # Order: emergency, alert, critical, error, warn, notice, info, debug, trace - + if match!(message, r'^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"') { level = "emergency" } else if match!(message, r'^A[0-9]+|level=alert|Value:alert|"level":"alert"') { @@ -93,7 +104,7 @@ if !exists(._internal.level) { } else if match!(message, r'^T[0-9]+|level=trace|Value:trace|"level":"trace"') { level = "trace" } - + # attempt 5: Match on the keyword that appears earliest in the message if level == "default" { level_patterns = r'(?i)(?emergency|)|(?alert|)|(?critical|)|(?error|)|(?warn(?:ing)?|)|(?notice|)|(?:\b(?info)\b|)|(?debug|)|(?trace|)' diff --git a/test/functional/normalization/eventrouter_test.go b/test/functional/normalization/eventrouter_test.go index d99c1ecbb8..8229adaa52 100644 --- a/test/functional/normalization/eventrouter_test.go +++ b/test/functional/normalization/eventrouter_test.go @@ -2,12 +2,15 @@ package normalization import ( "encoding/json" + "fmt" "strings" "time" "github.com/openshift/cluster-logging-operator/test/framework/functional" + "github.com/openshift/cluster-logging-operator/test/helpers/loki" "github.com/openshift/cluster-logging-operator/test/helpers/syslog" testruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -23,24 +26,30 @@ import ( var _ = Describe("[Functional][Normalization] Messages from EventRouter", func() { - const timestamp string = "1985-10-21T09:00:00.00000+00:00" var ( - framework *functional.CollectorFunctionalFramework - writeMsg func(msg string) error + framework *functional.CollectorFunctionalFramework + l *loki.Receiver + ts = time.Now().UTC() + templateForAnyKubernetesWithEvents = types.KubernetesWithEvent{ Kubernetes: functional.TemplateForAnyKubernetes, } + NewEventDataBuilder = func(verb, message string, podRef *corev1.ObjectReference) types.EventData { newEvent := types.NewEvent(podRef, corev1.EventTypeNormal, "reason", message) if verb == "UPDATED" { oldEvent := types.NewEvent(podRef, corev1.EventTypeWarning, "old_reason", "old_"+message) return types.EventData{Verb: "UPDATED", Event: newEvent, OldEvent: oldEvent} - } else { - return types.EventData{Verb: "ADDED", Event: newEvent} } + return types.EventData{Verb: "ADDED", Event: newEvent} } - ExpectedLogTemplateBuilder = func(event, oldEvent *corev1.Event) types.EventRouterLog { + ExpectedLogTemplateBuilder = func(event, oldEvent *corev1.Event, outputType obs.OutputType) types.EventRouterLog { + tsTruncated := ts.Truncate(time.Second) + timestamp := tsTruncated + if outputType == obs.OutputTypeLoki { + timestamp = time.Time{} + } tmpl := types.EventRouterLog{ Kubernetes: templateForAnyKubernetesWithEvents, ViaQCommon: types.ViaQCommon{ @@ -48,8 +57,8 @@ var _ = Describe("[Functional][Normalization] Messages from EventRouter", func() Level: types.AnyString, Hostname: types.AnyString, PipelineMetadata: types.PipelineMetadata{}, - Timestamp: time.Time{}, - TimestampLegacy: time.Time{}, + Timestamp: timestamp, + TimestampLegacy: tsTruncated, LogSource: string(obs.InfrastructureSourceContainer), LogType: string(obs.InputTypeApplication), Openshift: types.OpenshiftMeta{ @@ -77,7 +86,7 @@ var _ = Describe("[Functional][Normalization] Messages from EventRouter", func() parseLogs = func(raw []string, outputType obs.OutputType) ([]types.EventRouterLog, error) { var logs []types.EventRouterLog switch outputType { - case obs.OutputTypeHTTP: + case obs.OutputTypeHTTP, obs.OutputTypeLoki: err := types.StrictlyParseLogs(utils.ToJsonLogs(raw), &logs) return logs, err case obs.OutputTypeSyslog: @@ -94,19 +103,23 @@ var _ = Describe("[Functional][Normalization] Messages from EventRouter", func() ) DescribeTable("should be normalized to the ViaQ data model when sinking to different outputs", func(outputType obs.OutputType, verb, message, expectedMessage string) { + crioTimestamp := functional.CRIOTime(ts) framework = functional.NewCollectorFunctionalFramework() + if outputType == obs.OutputTypeLoki { + l = loki.NewReceiver(framework.Namespace, "loki-server") + Expect(l.Create(framework.Test.Client)).To(Succeed()) + } builder := testruntime.NewClusterLogForwarderBuilder(framework.Forwarder). FromInput(obs.InputTypeApplication) - if outputType == obs.OutputTypeHTTP { + switch outputType { + case obs.OutputTypeHTTP: builder.ToHttpOutput() - } - if outputType == obs.OutputTypeSyslog { + case obs.OutputTypeSyslog: builder.ToSyslogOutput(obs.SyslogRFC5424) + case obs.OutputTypeLoki: + builder.ToLokiOutput(*l.InternalURL("")) } - writeMsg = func(msg string) error { - return framework.WriteMessagesToApplicationLog(msg, 1) - } framework.VisitConfig = func(conf string) string { return strings.Replace(conf, `"eventrouter-"`, `"functional"`, 1) } @@ -114,19 +127,38 @@ var _ = Describe("[Functional][Normalization] Messages from EventRouter", func() podRef, err := reference.GetReference(scheme.Scheme, types.NewMockPod()) Expect(err).To(BeNil()) newEventData := NewEventDataBuilder(verb, message, podRef) + tsSec := ts.Truncate(time.Second) + newEventData.Event.FirstTimestamp = metav1.NewTime(tsSec) + newEventData.Event.LastTimestamp = metav1.NewTime(tsSec) + if verb == "UPDATED" { + tsOld := tsSec.Add(-10 * time.Minute) + newEventData.Event.FirstTimestamp = metav1.NewTime(tsOld) + newEventData.OldEvent.FirstTimestamp = metav1.NewTime(tsOld) + newEventData.OldEvent.LastTimestamp = metav1.NewTime(tsSec) + } jsonBytes, _ := json.Marshal(newEventData) jsonStr := string(jsonBytes) - msg := functional.NewCRIOLogMessage(timestamp, jsonStr, false) - err = writeMsg(msg) + msg := functional.NewCRIOLogMessage(crioTimestamp, jsonStr, false) + err = framework.WriteMessagesToApplicationLog(msg, 1) Expect(err).To(BeNil()) - raw, err := framework.ReadRawApplicationLogsFrom(string(outputType)) - Expect(err).To(BeNil(), "Expected no errors reading the logs") + var raw []string + if outputType == obs.OutputTypeLoki { + query := fmt.Sprintf(`{kubernetes_namespace_name=%q}`, framework.Namespace) + result, err := l.QueryUntil(query, "", 1) + Expect(err).To(BeNil()) + Expect(result).To(HaveLen(1)) + raw = result[0].Lines() + } else { + raw, err = framework.ReadRawApplicationLogsFrom(string(outputType)) + Expect(err).To(BeNil(), "Expected no errors reading the logs") + } + logs, err := parseLogs(raw, outputType) Expect(err).To(BeNil(), "Expected no errors parsing the logs") expectedEventData := newEventData expectedEventData.Event.Message = expectedMessage - var expectedLogTemplate = ExpectedLogTemplateBuilder(expectedEventData.Event, expectedEventData.OldEvent) + expectedLogTemplate := ExpectedLogTemplateBuilder(expectedEventData.Event, expectedEventData.OldEvent, outputType) Expect(logs[0]).To(matchers.FitLogFormatTemplate(expectedLogTemplate)) }, Entry("with HTTP output for ADDED events", obs.OutputTypeHTTP, "ADDED", "simple syslog message", "simple syslog message"), @@ -135,6 +167,49 @@ var _ = Describe("[Functional][Normalization] Messages from EventRouter", func() Entry("with Syslog output for UPDATED events", obs.OutputTypeSyslog, "UPDATED", "simple syslog message", "simple syslog message"), Entry("with Syslog output for ADDED events and new line symbol", obs.OutputTypeSyslog, "ADDED", "syslog message\n with new line", "syslog message\\n with new line"), Entry("with Syslog output for UPDATED events and new line symbol", obs.OutputTypeSyslog, "UPDATED", "syslog message\n with new line", "syslog message\\n with new line"), + Entry("with Loki output for ADDED events", obs.OutputTypeLoki, "ADDED", "simple syslog message", "simple syslog message"), + Entry("with Loki output for UPDATED events", obs.OutputTypeLoki, "UPDATED", "simple syslog message", "simple syslog message"), + ) + + DescribeTable("should use the correct timestamp fallback for @timestamp", func(setTimestamps func(event *corev1.Event, ts time.Time)) { + crioTimestamp := functional.CRIOTime(ts) + framework = functional.NewCollectorFunctionalFramework() + builder := testruntime.NewClusterLogForwarderBuilder(framework.Forwarder). + FromInput(obs.InputTypeApplication) + builder.ToHttpOutput() + + framework.VisitConfig = func(conf string) string { + return strings.Replace(conf, `"eventrouter-"`, `"functional"`, 1) + } + Expect(framework.Deploy()).To(BeNil()) + podRef, err := reference.GetReference(scheme.Scheme, types.NewMockPod()) + Expect(err).To(BeNil()) + newEventData := NewEventDataBuilder("ADDED", "test message", podRef) + setTimestamps(newEventData.Event, ts.Truncate(time.Second)) + jsonBytes, _ := json.Marshal(newEventData) + msg := functional.NewCRIOLogMessage(crioTimestamp, string(jsonBytes), false) + err = framework.WriteMessagesToApplicationLog(msg, 1) + Expect(err).To(BeNil()) + + raw, err := framework.ReadRawApplicationLogsFrom(string(obs.OutputTypeHTTP)) + Expect(err).To(BeNil(), "Expected no errors reading the logs") + logs, err := parseLogs(raw, obs.OutputTypeHTTP) + Expect(err).To(BeNil(), "Expected no errors parsing the logs") + Expect(logs[0].TimestampLegacy.IsZero()).To(BeFalse(), "Expected @timestamp to be set via timestamp fallback chain") + }, + Entry("lastTimestamp is set", func(event *corev1.Event, ts time.Time) { + event.LastTimestamp = metav1.NewTime(ts) + event.FirstTimestamp = metav1.NewTime(ts.Add(-5 * time.Minute)) + }), + Entry("lastTimestamp is zero, falls back to firstTimestamp", func(event *corev1.Event, ts time.Time) { + event.FirstTimestamp = metav1.NewTime(ts) + }), + Entry("lastTimestamp and firstTimestamp are zero, falls back to eventTime", func(event *corev1.Event, ts time.Time) { + event.EventTime = metav1.NewMicroTime(ts) + }), + Entry("all timestamps zero, falls back to creationTimestamp", func(event *corev1.Event, ts time.Time) { + event.CreationTimestamp = metav1.NewTime(ts) + }), ) AfterEach(func() { diff --git a/test/helpers/loki/receiver.go b/test/helpers/loki/receiver.go index 6f7a489e83..5a89929b2d 100644 --- a/test/helpers/loki/receiver.go +++ b/test/helpers/loki/receiver.go @@ -157,6 +157,9 @@ func (r *Receiver) Query(logQL string, orgID string, limit int) ([]StreamValues, q.Add("query", logQL) q.Add("limit", strconv.Itoa(limit)) q.Add("direction", "FORWARD") + now := time.Now() + q.Add("start", strconv.FormatInt(now.Add(-24*time.Hour).UnixNano(), 10)) + q.Add("end", strconv.FormatInt(now.Add(24*time.Hour).UnixNano(), 10)) u.RawQuery = q.Encode() log.V(3).Info("Loki Query", "url", u.String(), "org-id", orgID) header := http.Header{}