From 0599a1fcd2f89c80fa02dab5b927f94fb3e30ba9 Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Fri, 17 Apr 2026 14:12:19 +0800 Subject: [PATCH 1/2] fix(build): mark fast-lzma2 asm objects noexecstack Apply -Wa,--noexecstack to fast-lzma2 in both the external Makefile patch and the Conan build path. Co-authored-by: OpenAI Codex (GPT-5) --- cmake/in/lzma2.Makefile | 2 +- conan/fast-lzma2/conanfile.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cmake/in/lzma2.Makefile b/cmake/in/lzma2.Makefile index 8ff6b34771b9..c949d5a00e6c 100644 --- a/cmake/in/lzma2.Makefile +++ b/cmake/in/lzma2.Makefile @@ -7,7 +7,7 @@ CC:=gcc AR:=ar -rcs RM:=rm -rf -ASFLAGS := +ASFLAGS := -Wa,--noexecstack SONAME:=libfast-lzma2.so.1 REAL_NAME:=libfast-lzma2.so.1.0 diff --git a/conan/fast-lzma2/conanfile.py b/conan/fast-lzma2/conanfile.py index f0e9c6d4324d..962a55b6b247 100644 --- a/conan/fast-lzma2/conanfile.py +++ b/conan/fast-lzma2/conanfile.py @@ -50,6 +50,7 @@ def build(self): # Build make command cflags = "-Wall -O2 -pthread" + asflags = "-Wa,--noexecstack" if self.options.get_safe("fPIC"): cflags += " -fPIC" @@ -59,7 +60,7 @@ def build(self): # Execute make compilation self.run( - f'make CFLAGS="{cflags}" CC={self.settings.get_safe("compiler", default="gcc")} libfast-lzma2', + f'make CFLAGS="{cflags}" ASFLAGS="{asflags}" CC={self.settings.get_safe("compiler", default="gcc")} libfast-lzma2', cwd=source_folder, ) From 4554448a834e05a0e5115e2034b784b8377ebe8d Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Wed, 15 Apr 2026 17:35:27 +0800 Subject: [PATCH 2/2] feat(stream): support nested event window sub-events Add recursive EVENT_WINDOW START WITH parsing, runtime conditionPath propagation, and _event_condition_path placeholder support. Co-authored-by: OpenAI Codex GPT-5 --- docs/en/06-advanced/03-stream.md | 6 +- docs/en/14-reference/03-taos-sql/41-stream.md | 63 +- docs/zh/06-advanced/03-stream.md | 22 +- docs/zh/14-reference/03-taos-sql/41-stream.md | 67 +- include/common/streamMsg.h | 2 + include/libs/function/functionMgt.h | 5 +- source/common/src/msg/streamMsg.c | 10 +- source/libs/function/src/builtins.c | 19 + source/libs/function/src/functionMgt.c | 21 +- source/libs/new-stream/inc/streamInt.h | 2 +- .../libs/new-stream/inc/streamTriggerTask.h | 17 + .../libs/new-stream/src/streamTriggerTask.c | 846 +++++++++++++++--- source/libs/new-stream/src/streamUtil.c | 6 +- source/libs/parser/inc/sql.y | 21 +- source/libs/parser/src/parTokenizer.c | 1 + source/libs/parser/src/parTranslater.c | 29 +- source/libs/parser/test/parStreamTest.cpp | 81 ++ source/libs/scalar/src/scalar.c | 48 + .../03-TriggerMode/test_event_new.py | 38 + 19 files changed, 1112 insertions(+), 192 deletions(-) diff --git a/docs/en/06-advanced/03-stream.md b/docs/en/06-advanced/03-stream.md index 8142eb1aa804..06282e3e506a 100644 --- a/docs/en/06-advanced/03-stream.md +++ b/docs/en/06-advanced/03-stream.md @@ -33,10 +33,12 @@ trigger_type: { | INTERVAL(interval_val[, interval_offset]) SLIDING(sliding_val[, offset_time]) | SESSION(ts_col, session_val) | STATE_WINDOW(expr [, extend[, zeroth_state]]) [TRUE_FOR(true_for_expr)] - | EVENT_WINDOW(START WITH start_condition END WITH end_condition) [TRUE_FOR(true_for_expr)] + | EVENT_WINDOW(START WITH start_event_item [END WITH end_condition]) [TRUE_FOR(true_for_expr)] | COUNT_WINDOW(count_val[, sliding_val][, col1[, ...]]) } +start_event_item: start_condition | (start_event_item, start_event_item [, ...]) + true_for_expr: { duration_time | COUNT count_val @@ -60,6 +62,8 @@ tag_definition: tag_name type_name [COMMENT 'string_value'] AS expr ``` +For `EVENT_WINDOW`, `start_event_item` supports multi-level nested sub-events. For example, `START WITH ((cond_1a, cond_1b), cond_2)`. When a stream calculation uses a sub-event structure, the `_event_condition_path` placeholder can be used in the query part to obtain the static path of the currently matched node in the start-condition tree. + ### Trigger Methods - Periodic trigger: drives execution by fixed intervals of system time. The baseline is midnight of the day the stream is created, and subsequent trigger times are determined by the specified interval. A time offset can be applied to adjust the baseline. diff --git a/docs/en/14-reference/03-taos-sql/41-stream.md b/docs/en/14-reference/03-taos-sql/41-stream.md index 54567e091b2e..58b64cae91d4 100755 --- a/docs/en/14-reference/03-taos-sql/41-stream.md +++ b/docs/en/14-reference/03-taos-sql/41-stream.md @@ -30,11 +30,12 @@ trigger_type: { | INTERVAL(interval_val[, interval_offset]) SLIDING(sliding_val[, offset_time]) | SESSION(ts_col, session_val) | STATE_WINDOW(expr[, extend[, zeroth_state]]) [TRUE_FOR(true_for_expr)] - | EVENT_WINDOW(START WITH start_condition END WITH end_condition) [TRUE_FOR(true_for_expr)] - | EVENT_WINDOW(START WITH (start_condition_1, start_condition_2 [,...]) [END WITH end_condition]) [TRUE_FOR(true_for_expr)] + | EVENT_WINDOW(START WITH start_event_item [END WITH end_condition]) [TRUE_FOR(true_for_expr)] | COUNT_WINDOW(count_val[, sliding_val][, col1[, ...]]) } +start_event_item: start_condition | (start_event_item, start_event_item [, ...]) + true_for_expr: { duration_time | COUNT count_val @@ -192,13 +193,13 @@ Applicable Scenarios: Suitable for use cases where computations and/or notificat ##### Event Window Trigger ```sql -EVENT_WINDOW(START WITH start_condition END WITH end_condition) [TRUE_FOR(true_for_expr)] +EVENT_WINDOW(START WITH start_condition [END WITH end_condition]) [TRUE_FOR(true_for_expr)] ``` An event window trigger partitions the incoming data of the trigger table into windows based on defined event start and end conditions, and triggers when the window opens and/or closes. Parameter definitions are as follows: - start_condition: Definition of the event start condition. It can be any valid conditional expression. -- end_condition: Definition of the event end condition. It can be any valid conditional expression. +- end_condition: Optional. Definition of the event end condition. It can be any valid conditional expression. - true_for_expr (optional): Specifies the filtering condition for windows. Only windows that meet the condition will generate a trigger. Supports the following four modes: - `TRUE_FOR(duration_time)`: Filters based on duration only. The window duration must be greater than or equal to `duration_time`. - `TRUE_FOR(COUNT n)`: Filters based on row count only. The window row count must be greater than or equal to `n`. @@ -225,16 +226,18 @@ CREATE STREAM s_tag_event Applicable Scenarios: Suitable for use cases where computations and/or notifications need to be driven by event windows. -##### Event Window Trigger (with Sub-Event Window Support) +##### Event Window Trigger (with Multi-Level Sub-Event Support) ```sql -EVENT_WINDOW(START WITH (start_condition_1, start_condition_2 [,...] [END WITH end_condition]) [TRUE_FOR(true_for_expr)] +EVENT_WINDOW(START WITH start_event_item [END WITH end_condition]) [TRUE_FOR(true_for_expr)] + +start_event_item: start_condition | (start_event_item, start_event_item [, ...]) ``` -An event window trigger partitions the incoming data of the trigger table into windows based on event windows. It now supports specifying multiple start conditions and can further subdivide and manage sub-event windows within the original event window based on changes in the effective trigger condition, while introducing the concept of a parent event window to aggregate related sub-event windows. Parameter definitions are as follows: +An event window trigger now supports recursively nested start-event groups. A start item can be a single valid condition expression or a group made of multiple `start_event_item` entries, and groups can be nested further. The engine evaluates the entire start-condition tree in SQL order and preserves the hierarchy of parent groups and leaf conditions in notifications and stream calculations. -- start_condition_1, start_condition_2 [, ...]: Defines multiple event start conditions. The event window opens when any one of these conditions is satisfied. The system evaluates these conditions in order from first to last, and the first satisfied condition becomes the "effective trigger condition". When all start_conditions are not satisfied, both the parent window and the last sub-window close. -- end_condition: Definition of the event end condition. When this condition is satisfied, both the current parent window and the last sub-window close. This parameter is now optional. +- start_event_item: A start-event item. It can be a single start condition or a grouped list. Group items can themselves contain nested groups. +- end_condition: Optional. When it is satisfied, the active leaf window and all affected ancestor group windows are closed in hierarchical order. - true_for_expr (optional): Specifies the filtering condition for windows. Only windows that meet the condition will generate a trigger. Supports the following four modes: - `TRUE_FOR(duration_time)`: Filters based on duration only. The window duration must be greater than or equal to `duration_time`. - `TRUE_FOR(COUNT n)`: Filters based on row count only. The window row count must be greater than or equal to `n`. @@ -248,17 +251,23 @@ Usage Notes: - A trigger table must be specified. When the trigger table is a supertable, grouping by tags or subtables is supported, as well as no grouping. - When used with a supertable, it must be combined with PARTITION BY tbname. - Supports conditional window triggering after filtering the written data. -- The multiple `start_condition` expressions and the optional `end_condition` can also reference tag columns visible in the trigger-table context. -- Parent and sub-window behavior: - - No parent/sub-windows: During the event window opening period, if the effective trigger condition does not change, only one window is produced. The system treats it as a regular event window, without generating the concept of parent/sub-windows. - - Sub-windows: When a specific start_condition becomes the effective trigger condition, a sub-window opens. If the effective trigger condition changes, or when the end_condition is satisfied, the current sub-window closes. Sub-windows do not overlap with each other. - - Parent window: A parent window only opens when the second sub-window opens. The parent window's start time is the start time of the first sub-window, and its end time is the end time of the last sub-window. It closes when all start_conditions are not satisfied, or when the end_condition is satisfied. -- Notification message extensions: In the window open (WINDOW_OPEN) notification message, two new fields are added: - - conditionIndex: The index number of the start condition that triggered the current window opening, counting from 0. For a parent window, its value is the same as the first sub-window's value. - - windowIndex: The index number of the sub-event window within the parent window, counting from 0. If it is not a sub-window (i.e., a regular event window or parent window), this field value is -1. +- `start_event_item` and the optional `end_condition` can reference tag columns visible in the trigger-table context. +- Each node in the start-condition tree is assigned a stable static path named `conditionPath`. Paths are generated in SQL order, and sibling nodes are numbered from `0`. For `START WITH ((a, b, c), d)`: + - Group `(a, b, c)` has `conditionPath = "0"` + - `a` has `conditionPath = "0.0"` + - `b` has `conditionPath = "0.1"` + - `c` has `conditionPath = "0.2"` + - `d` has `conditionPath = "1"` +- `conditionIndex` is redefined as the local index of the current node under its parent, which is always the last segment of `conditionPath`. +- When the active branch switches from one subtree to another, the engine closes the current leaf window first, then closes the affected ancestor group windows, and finally opens the windows for the new branch. +- Notification payloads no longer include `windowIndex`. Event-window node identification is now expressed with `conditionPath + conditionIndex`. +- A new placeholder, `_event_condition_path`, is available in stream calculations: + - It is only valid in `EVENT_WINDOW` stream calculations. + - `START WITH` must use a sub-event structure, either single-level or nested. + - The value is the static path string of the currently triggered node, such as `0` or `0.1`. - The TRUE_FOR option applies to both sub-windows and parent windows, meaning windows (whether sub-windows or parent windows) shorter than the duration limit will be directly ignored. When some sub-windows under a parent window do not meet the TRUE_FOR condition, the valid sub-windows may not be consecutive. If only 1 sub-window under a parent window meets the TRUE_FOR condition, the parent/sub-window structure is still retained and triggers notifications and computations. -Applicable Scenarios: Suitable for use cases where computations and/or notifications need to be driven by event windows, especially in IoT and industrial data management fields where fine-grained monitoring and analysis of events based on multiple dynamically changing conditions is required. For example, in equipment fault alarms, multiple alarm level conditions (such as "load above 90" and "load above 60") can be defined, and when alarm levels change, the escalation or de-escalation of alarm states can be clearly tracked. +Applicable Scenarios: Suitable for use cases where computations and/or notifications need to be driven by event windows, especially when multiple dynamic conditions must be modeled with explicit hierarchy. For example, an alarm stream can first define a high-level alarm group and then subdivide it into nested severity levels, while `conditionPath` or `_event_condition_path` identifies the exact branch that matched. ##### Count Window Trigger @@ -373,6 +382,7 @@ When performing calculations, you may need to use contextual information from th | Window Trigger | _twrownum | Number of rows in currently open window. Used only with WINDOW_CLOSE trigger. | | Idle Trigger | _tidlestart | The time (processing time) of the last data received by the group before it entered idle state. Nanosecond precision Unix epoch. Applicable only for IDLE/RESUME triggers. Cannot be mixed with `_twstart/_twend`. Since output tables are usually millisecond-precision, use `cast(_tidlestart/1000000 as timestamp)` to convert. | | Idle Trigger | _tidleend | The trigger time of the IDLE or RESUME event. Nanosecond precision Unix epoch. Applicable only for IDLE/RESUME triggers. Cannot be mixed with `_twstart/_twend`. Since output tables are usually millisecond-precision, use `cast(_tidleend/1000000 as timestamp)` to convert.| +| Event Window Trigger | _event_condition_path | Static path of the currently triggered node in the event-window start-condition tree. Valid only for `EVENT_WINDOW` stream calculations that use a sub-event structure. Returns a string such as `0` or `0.1`. | | All | _tgrpid | ID of trigger group (data type BIGINT) | | All | _tlocaltime | System time of current trigger (nanosecond precision) | | All | %%n | Reference to trigger group column
n is the column number in `[PARTITION BY col1[, ...]]`, starting with 1 | @@ -384,6 +394,7 @@ Usage Restrictions: - %%trows: Can only be used in the FROM clause. Queries that use %%trows do not support WHERE condition filtering or join operations on %%trows. - %%tbname: Can be used in the FROM, SELECT, and WHERE clauses. - Other placeholders: Can only be used in the SELECT and WHERE clauses. +- `_event_condition_path`: Only valid in `EVENT_WINDOW` stream calculations where `START WITH` uses a sub-event structure. It is illegal for a single plain start condition. ### Stream Processing Control Options @@ -504,8 +515,9 @@ An example structure of a notification message is shown below: "groupId": "7533998559487590581", "windowStart": 1733284800000, "triggerCondition": { + "conditionPath": "0.0", "conditionIndex": 0, - "fieldValue": { + "fieldValues": { "c1": 10, "c2": 15 } @@ -521,8 +533,9 @@ An example structure of a notification message is shown below: "windowStart": 1733284800000, "windowEnd": 1733284810000, "triggerCondition": { + "conditionPath": "0.1", "conditionIndex": 1, - "fieldValue": { + "fieldValues": { "c1": 20, "c2": 3 } @@ -621,14 +634,16 @@ These fields apply only when triggerType is Event. - If eventType = WINDOW_OPEN, the event object includes: - windowStart: Long integer timestamp indicating the window’s start time. Precision matches the time precision of the result table. - triggerCondition: Information about the condition that opened the window, including: - - conditionIndex: Integer. The index of the condition that triggered the window open, starting from 0. - - fieldValue: Key–value pairs containing the condition column names and their corresponding values. + - conditionPath: String. The static path of the current node in the start-condition tree. + - conditionIndex: Integer. The local index of the current node under its parent, equal to the last segment of `conditionPath`. + - fieldValues: Key–value pairs containing the columns referenced by the event-window start-condition tree and their values. - If eventType = WINDOW_CLOSE, the event object includes: - windowStart: Long integer timestamp indicating the window’s start time. Precision matches the time precision of the result table. - windowEnd: Long integer timestamp indicating the window’s end time. Precision matches the time precision of the result table. - triggerCondition: Information about the condition that closed the window, including: - - conditionIndex: Integer. The index of the condition that triggered the window close, starting from 0. - - fieldValue: Key–value pairs containing the condition column names and their corresponding values. + - conditionPath: String. The static path of the current node in the start-condition tree. + - conditionIndex: Integer. The local index of the current node under its parent, equal to the last segment of `conditionPath`. + - fieldValues: Key–value pairs containing the condition-related column values carried by the close notification. - result: The computation result, expressed as key–value pairs containing the names of the result columns and their corresponding values. ##### Fields for Count Windows diff --git a/docs/zh/06-advanced/03-stream.md b/docs/zh/06-advanced/03-stream.md index 37a1213ebfa0..50c525c58ef6 100644 --- a/docs/zh/06-advanced/03-stream.md +++ b/docs/zh/06-advanced/03-stream.md @@ -32,12 +32,14 @@ options: { trigger_type: { PERIOD(period_time[, offset_time]) | SLIDING(sliding_val[, offset_time]) - | INTERVAL(interval_val[, interval_offset]) SLIDING(sliding_val[, offset_time]) - | SESSION(ts_col, session_val) - | STATE_WINDOW(expr [, extend[, zeroth_state]]) [TRUE_FOR(true_for_expr)] - | EVENT_WINDOW(START WITH start_condition END WITH end_condition) [TRUE_FOR(true_for_expr)] - | COUNT_WINDOW(count_val[, sliding_val][, col1[, ...]]) -} + | INTERVAL(interval_val[, interval_offset]) SLIDING(sliding_val[, offset_time]) + | SESSION(ts_col, session_val) + | STATE_WINDOW(expr [, extend[, zeroth_state]]) [TRUE_FOR(true_for_expr)] + | EVENT_WINDOW(START WITH start_event_item [END WITH end_condition]) [TRUE_FOR(true_for_expr)] + | COUNT_WINDOW(count_val[, sliding_val][, col1[, ...]]) +} + +start_event_item: start_condition | (start_event_item, start_event_item [, ...]) true_for_expr: { duration_time @@ -56,11 +58,13 @@ notify_option: [NOTIFY_HISTORY | ON_FAILURE_PAUSE] event_types: event_type [|event_type] -event_type: {WINDOW_OPEN | WINDOW_CLOSE | IDLE | RESUME} +event_type: {WINDOW_OPEN | WINDOW_CLOSE | IDLE | RESUME} tag_definition: - tag_name type_name [COMMENT 'string_value'] AS expr -``` + tag_name type_name [COMMENT 'string_value'] AS expr +``` + +其中 `EVENT_WINDOW` 的 `start_event_item` 支持多级子事件嵌套。例如 `START WITH ((cond_1a, cond_1b), cond_2)`。当流计算使用了子事件结构时,可在查询部分使用 `_event_condition_path` 获取当前命中节点在开始条件树中的静态路径。 ### 触发方式 diff --git a/docs/zh/14-reference/03-taos-sql/41-stream.md b/docs/zh/14-reference/03-taos-sql/41-stream.md index 9e2365cac8ea..e22ed3b618e6 100755 --- a/docs/zh/14-reference/03-taos-sql/41-stream.md +++ b/docs/zh/14-reference/03-taos-sql/41-stream.md @@ -27,14 +27,15 @@ options: { trigger_type: { PERIOD(period_time[, offset_time]) | SLIDING(sliding_val[, offset_time]) - | INTERVAL(interval_val[, interval_offset]) SLIDING(sliding_val[, offset_time]) + | INTERVAL(interval_val[, interval_offset]) SLIDING(sliding_val[, offset_time]) | SESSION(ts_col, session_val) | STATE_WINDOW(expr [, extend[, zeroth_state]]) [TRUE_FOR(true_for_expr)] - | EVENT_WINDOW(START WITH start_condition END WITH end_condition) [TRUE_FOR(true_for_expr)] - | EVENT_WINDOW(START WITH (start_condition_1, start_condition_2 [,...]) [END WITH end_condition]) [TRUE_FOR(true_for_expr)] - | COUNT_WINDOW(count_val[, sliding_val][, col1[, ...]]) + | EVENT_WINDOW(START WITH start_event_item [END WITH end_condition]) [TRUE_FOR(true_for_expr)] + | COUNT_WINDOW(count_val[, sliding_val][, col1[, ...]]) } +start_event_item: start_condition | (start_event_item, start_event_item [, ...]) + true_for_expr: { duration_time | COUNT count_val @@ -192,13 +193,13 @@ CREATE STREAM s_tag_state ##### 事件窗口触发 ```sql -EVENT_WINDOW(START WITH start_condition END WITH end_condition) [TRUE_FOR(true_for_expr)] +EVENT_WINDOW(START WITH start_condition [END WITH end_condition]) [TRUE_FOR(true_for_expr)] ``` 事件窗口触发是指对触发表的写入数据按照事件窗口的方式进行窗口划分,当窗口启动和(或)关闭时进行的触发。各参数含义如下: - start_condition:事件开始条件的定义,可以是任意合法条件表达式。 -- end_condition:事件结束条件的定义,可以是任意合法条件表达式。 +- end_condition:可选,事件结束条件的定义,可以是任意合法条件表达式。 - true_for_expr:可选,指定窗口的过滤条件,只有满足条件的窗口才会产生触发。支持以下四种模式: - `TRUE_FOR(duration_time)`:仅基于持续时长过滤,窗口持续时长必须大于等于 `duration_time`。 - `TRUE_FOR(COUNT n)`:仅基于数据行数过滤,窗口数据行数必须大于等于 `n`。 @@ -225,16 +226,18 @@ CREATE STREAM s_tag_event 适用场景:需要通过事件窗口驱动计算和(或)通知的场景。 -##### 事件窗口触发 (支持子事件窗口) +##### 事件窗口触发(支持多级子事件) ```sql -EVENT_WINDOW(START WITH (start_condition_1, start_condition_2 [,...] [END WITH end_condition]) [TRUE_FOR(true_for_expr)] +EVENT_WINDOW(START WITH start_event_item [END WITH end_condition]) [TRUE_FOR(true_for_expr)] + +start_event_item: start_condition | (start_event_item, start_event_item [, ...]) ``` -事件窗口触发是指对触发表的写入数据按照事件窗口的方式进行窗口划分,它现在支持指定多个开始条件,并能根据有效触发条件的变化,在原有的事件窗口内进一步划分和管理子事件窗口,同时引入父事件窗口的概念来聚合相关的子事件窗口。各参数含义如下: +事件窗口触发支持将开始事件递归拆分为多级子事件。开始条件既可以是单个合法条件表达式,也可以是由多个 `start_event_item` 组成的分组;分组内部仍然可以继续嵌套分组。系统会按 SQL 书写顺序递归评估整棵开始条件树,并在窗口通知与流计算中保留父分组和叶子条件的层级语义。各参数含义如下: -- start_condition_1, start_condition_2 [,...]:定义多个事件开始条件。当任何一个条件满足时,事件窗口开启。系统会从前往后依次评估这些条件,第一个满足的条件即为“有效触发条件”。当所有 start_condition 都不满足时,父窗口和最后一个子窗口关闭。 -- end_condition:事件结束条件的定义。当该条件满足时,当前父窗口和最后一个子窗口均关闭。该参数现在是可选的。 +- start_event_item:开始事件项。可以是单个开始条件,也可以是分组。分组中的子项支持继续嵌套。 +- end_condition:可选,事件结束条件的定义。当该条件满足时,当前活动叶子窗口和受影响的父分组窗口会按层级顺序关闭。 - true_for_expr:可选,指定窗口的过滤条件,只有满足条件的窗口才会产生触发。支持以下四种模式: - `TRUE_FOR(duration_time)`:仅基于持续时长过滤,窗口持续时长必须大于等于 `duration_time`。 - `TRUE_FOR(COUNT n)`:仅基于数据行数过滤,窗口数据行数必须大于等于 `n`。 @@ -248,17 +251,23 @@ EVENT_WINDOW(START WITH (start_condition_1, start_condition_2 [,...] [END WITH e - 必须指定触发表,触发表为超级表时支持按标签、子表分组,支持不分组。 - 搭配超级表时,必须与 `partition by tbname` 一起使用。 - 支持对写入数据进行处理过滤后(有条件)的窗口触发。 -- 多个 `start_condition` 以及可选的 `end_condition` 同样可以引用触发表上下文中可见的 tag 列。 -- 父子窗口行为: - - 没有父/子窗口:在事件窗口开启期间,如果有效触发条件没有变化,则只产生一个窗口,系统将其视为常规事件窗口,不产生父/子窗口的概念。 - - 子窗口:当某一个具体的 start_condition 成为有效触发条件时,会开启一个子窗口。如果有效触发条件发生变化,或者 end_condition 满足时,当前子窗口关闭。子窗口之间不重叠。 - - 父窗口:仅当第二个子窗口开启时,才会开启父窗口。父窗口的起始时间为第一个子窗口的起始时间,结束时间为最后一个子窗口的结束时间,当所有 start_condition 都不满足,或者 end_condition 满足时关闭。 -- 通知消息扩展:在窗口开启(WINDOW_OPEN)的通知消息中,新增两个字段: - - conditionIndex:触发当前窗口开启的开始条件的序号,从 0 开始计数。对于父窗口,其值与第一个子窗口的值相同。 - - windowIndex:子事件窗口在父窗口中的序号,从 0 开始计数。如果不是子窗口(即常规事件窗口或父窗口),该字段值为 -1。 +- `start_event_item` 与可选的 `end_condition` 同样可以引用触发表上下文中可见的 tag 列。 +- 系统会为开始条件树中的每个节点分配一个稳定的静态路径 `conditionPath`。路径按 SQL 书写顺序生成,兄弟节点从 `0` 开始编号。例如 `START WITH ((a, b, c), d)` 中: + - 分组 `(a, b, c)` 的 `conditionPath` 为 `0` + - `a` 的 `conditionPath` 为 `0.0` + - `b` 的 `conditionPath` 为 `0.1` + - `c` 的 `conditionPath` 为 `0.2` + - `d` 的 `conditionPath` 为 `1` +- `conditionIndex` 的含义调整为“当前节点在父节点下的本地索引”,其值恒等于 `conditionPath` 最后一段。 +- 当活动分支从一棵子树切换到另一棵子树时,会先关闭当前叶子窗口,再关闭受影响的祖先分组窗口,最后打开新分支对应的分组和叶子窗口。 +- 通知消息不再输出 `windowIndex`。事件窗口相关的节点定位统一使用 `conditionPath + conditionIndex`。 +- 在流计算 SQL 中新增占位符 `_event_condition_path`: + - 仅可用于 `EVENT_WINDOW` 流计算。 + - 且要求 `START WITH` 使用了子事件结构(无论是一层还是多级嵌套)。 + - 其值为当前触发节点的静态路径字符串,例如 `0`、`0.1`。 - TRUE_FOR 选项对子窗口和父窗口均生效,即小于该时长限制的窗口(无论是子窗口还是父窗口)将直接被忽略。当父窗口下有部分子窗口不满足 TRUE_FOR 条件时,有效的子窗口可能不是连续的。如果父窗口下仅有 1 个子窗口满足 TRUE_FOR 条件,父/子窗口仍保留并触发通知和计算。 -适用场景:需要通过事件窗口驱动计算和(或)通知的场景,尤其适用于需要根据多个动态变化的条件来精细化监控和分析事件的物联网、工业数据管理等领域。例如,设备故障告警,可以定义多个告警级别条件(如“负载高于 90”、“负载高于 60”),并在告警级别变化时,清晰地追踪告警状态的升级或降级。 +适用场景:需要通过事件窗口驱动计算和(或)通知的场景,尤其适用于需要根据多个动态变化条件、并且需要保留条件层级关系来精细化监控和分析事件的物联网、工业数据管理等领域。例如,设备告警可以先定义“高优先级告警组”,再在组内细分不同告警等级,并通过 `conditionPath` 或 `_event_condition_path` 明确区分当前命中的告警分支。 ##### 计数窗口触发 @@ -373,6 +382,7 @@ tag_definition: | 窗口触发 | _twrownum | 本次触发窗口的记录条数,只适用于 `WINDOW_CLOSE` 触发使用 | | 空闲触发 | _tidlestart | 分组进入空闲前最后一次收到数据的时间(processing time,精度:ns)。只适用于 `IDLE`/`RESUME` 触发使用,不可与 `_twstart/_twend` 混用。由于输出表通常为 ms 精度,建议使用 `cast(_tidlestart/1000000 as timestamp)` 进行转换。 | | 空闲触发 | _tidleend | IDLE 或 RESUME 事件的触发时间(精度:ns)。只适用于 `IDLE`/`RESUME` 触发使用,不可与 `_twstart/_twend` 混用。由于输出表通常为 ms 精度,建议使用 `cast(_tidleend/1000000 as timestamp)` 进行转换。 | +| 事件窗口触发 | _event_condition_path | 当前事件窗口触发节点在开始条件树中的静态路径。只适用于使用了子事件结构的 `EVENT_WINDOW` 流计算,返回值类型为字符串,例如 `0`、`0.1`。 | | 通用 | _tgrpid | 触发分组的 ID 值,类型为 BIGINT | | 通用 | _tlocaltime | 本次触发时刻的系统时间(精度:ns) | | 通用 | %%n | 触发分组列的引用
n 为分组列(来自 `[PARTITION BY col1[, ...]]`)的下标(从 1 开始) | @@ -384,6 +394,7 @@ tag_definition: - %%trows:只能用于 FROM 子句,在使用 %%trows 的语句中不支持 where 条件过滤,不支持对 %%trows 进行关联查询。 - %%tbname:可以用于 FROM、SELECT 和 WHERE 子句。 - 其他占位符:只能用于 SELECT 和 WHERE 子句。 +- `_event_condition_path`:仅可用于 `EVENT_WINDOW` 流计算,且 `START WITH` 必须使用子事件结构。对于单个普通开始条件的事件窗口,该占位符非法。 ### 流式计算的控制选项 @@ -504,8 +515,9 @@ event_type: {WINDOW_OPEN | WINDOW_CLOSE | ON_TIME | IDLE | RESUME} "groupId": "7533998559487590581", "windowStart": 1733284800000, "triggerCondition": { + "conditionPath": "0.0", "conditionIndex": 0, - "fieldValue": { + "fieldValues": { "c1": 10, "c2": 15 } @@ -521,8 +533,9 @@ event_type: {WINDOW_OPEN | WINDOW_CLOSE | ON_TIME | IDLE | RESUME} "windowStart": 1733284800000, "windowEnd": 1733284810000, "triggerCondition": { + "conditionPath": "0.1", "conditionIndex": 1, - "fieldValue": { + "fieldValues": { "c1": 20, "c2": 3 } @@ -621,14 +634,16 @@ event_type: {WINDOW_OPEN | WINDOW_CLOSE | ON_TIME | IDLE | RESUME} - 如果 eventType 为 WINDOW_OPEN,则包含如下字段: - windowStart:长整型时间戳,表示窗口的开始时间,精度与结果表的时间精度一致。 - triggerCondition:触发窗口开始的条件信息,包括以下字段: - - conditionIndex:整型,表示满足的触发窗口开始的条件的索引,从 0 开始编号。 - - fieldValue:键值对形式,包含条件列列名及其对应的值。 + - conditionPath:字符串,表示当前触发节点在开始条件树中的静态路径。 + - conditionIndex:整型,表示当前节点在父节点下的本地索引,等于 `conditionPath` 的最后一段。 + - fieldValues:键值对形式,包含事件窗口开始条件树引用到的列及其对应的值。 - 如果 eventType 为 WINDOW_CLOSE,则包含如下字段: - windowStart:长整型时间戳,表示窗口的开始时间,精度与结果表的时间精度一致。 - windowEnd:长整型时间戳,表示窗口的结束时间,精度与结果表的时间精度一致。 - triggerCondition:触发窗口关闭的条件信息,包括以下字段: - - conditionIndex:整型,表示满足的触发窗口关闭的条件的索引,从 0 开始编号。 - - fieldValue:键值对形式,包含条件列列名及其对应的值。 + - conditionPath:字符串,表示当前关闭节点在开始条件树中的静态路径。 + - conditionIndex:整型,表示当前节点在父节点下的本地索引,等于 `conditionPath` 的最后一段。 + - fieldValues:键值对形式,包含事件窗口结束时通知所携带的条件列值。 - result:计算结果,为键值对形式,包含窗口计算的结果列列名及其对应的值。 ###### 计数窗口相关字段 diff --git a/include/common/streamMsg.h b/include/common/streamMsg.h index 49af02aa5251..251a52e92d11 100644 --- a/include/common/streamMsg.h +++ b/include/common/streamMsg.h @@ -60,6 +60,7 @@ typedef struct STokenBucket STokenBucket; #define PLACE_HOLDER_GRPID BIT_FLAG_MASK(13) #define PLACE_HOLDER_IDLE_START BIT_FLAG_MASK(14) #define PLACE_HOLDER_IDLE_END BIT_FLAG_MASK(15) +#define PLACE_HOLDER_EVENT_CONDITION_PATH BIT_FLAG_MASK(16) #define CREATE_STREAM_FLAG_NONE 0 #define CREATE_STREAM_FLAG_TRIGGER_VIRTUAL_STB BIT_FLAG_MASK(0) @@ -904,6 +905,7 @@ typedef struct SSTriggerCalcParam { int64_t triggerTime; // _tlocaltime int32_t notifyType; // See also: ESTriggerEventType + char* conditionPath; // _event_condition_path char* extraNotifyContent; // NULL if not available char* resultNotifyContent; // does not serialize SArray* pExternalWindowData; diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 6987ee1b2c35..e4f663f349b2 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -208,8 +208,9 @@ typedef enum EFunctionType { FUNCTION_TYPE_IMPUTATION_ROWTS, FUNCTION_TYPE_IMPUTATION_MARK, FUNCTION_TYPE_ANOMALY_MARK, - FUNCTION_TYPE_TIDLESTART, // _tidlestart - FUNCTION_TYPE_TIDLEEND, // _tidleend + FUNCTION_TYPE_TIDLESTART, // _tidlestart + FUNCTION_TYPE_TIDLEEND, // _tidleend + FUNCTION_TYPE_EVENT_CONDITION_PATH, // _event_condition_path // internal function FUNCTION_TYPE_SELECT_VALUE = 3750, diff --git a/source/common/src/msg/streamMsg.c b/source/common/src/msg/streamMsg.c index df18075298dc..f32cafa630a5 100644 --- a/source/common/src/msg/streamMsg.c +++ b/source/common/src/msg/streamMsg.c @@ -3592,10 +3592,12 @@ static int32_t tSerializeSTriggerCalcParam(SEncoder* pEncoder, SArray* pParams, TAOS_MEMCPY(pEncoder->data + pEncoder->pos, param, plainFieldSize); } pEncoder->pos += plainFieldSize; + uint64_t len = (param->conditionPath != NULL) ? strlen(param->conditionPath) + 1 : 0; + TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (uint8_t*)param->conditionPath, len)); if (!ignoreNotificationInfo) { TAOS_CHECK_EXIT(tEncodeI32(pEncoder, param->notifyType)); - uint64_t len = (param->extraNotifyContent != NULL) ? strlen(param->extraNotifyContent) + 1 : 0; + len = (param->extraNotifyContent != NULL) ? strlen(param->extraNotifyContent) + 1 : 0; TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (uint8_t*)param->extraNotifyContent, len)); } } @@ -3605,6 +3607,9 @@ static int32_t tSerializeSTriggerCalcParam(SEncoder* pEncoder, SArray* pParams, void tDestroySSTriggerCalcParam(void* ptr) { SSTriggerCalcParam* pParam = ptr; + if (pParam && pParam->conditionPath != NULL) { + taosMemoryFreeClear(pParam->conditionPath); + } if (pParam && pParam->extraNotifyContent != NULL) { taosMemoryFreeClear(pParam->extraNotifyContent); } @@ -3672,10 +3677,11 @@ static int32_t tDeserializeSTriggerCalcParam(SDecoder* pDecoder, SArray**ppParam int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType); TAOS_MEMCPY(param, pDecoder->data + pDecoder->pos, plainFieldSize); pDecoder->pos += plainFieldSize; + uint64_t len = 0; + TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)¶m->conditionPath, &len)); if (!ignoreNotificationInfo) { TAOS_CHECK_EXIT(tDecodeI32(pDecoder, ¶m->notifyType)); - uint64_t len = 0; TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)¶m->extraNotifyContent, &len)); } } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 58897d99bf67..d18053686c48 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1145,6 +1145,11 @@ static int32_t translatePlaceHolderPseudoColumn(SFunctionNode* pFunc, char* pErr (SDataType){.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; break; } + case FUNCTION_TYPE_EVENT_CONDITION_PATH: { + pFunc->node.resType = + (SDataType){.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + break; + } case FUNCTION_TYPE_PLACEHOLDER_COLUMN: { break; } @@ -6360,6 +6365,20 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .sprocessFunc = streamPseudoScalarFunction, .finalizeFunc = NULL, }, + { + .name = "_event_condition_path", + .type = FUNCTION_TYPE_EVENT_CONDITION_PATH, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_PLACE_HOLDER_FUNC | FUNC_MGT_SKIP_SCAN_CHECK_FUNC, + .parameters = {.minParamNum = 0, + .maxParamNum = 0, + .paramInfoPattern = 0, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE}}, + .translateFunc = translatePlaceHolderPseudoColumn, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = streamPseudoScalarFunction, + .finalizeFunc = NULL, + }, { .name = "_tgrpid", .type = FUNCTION_TYPE_TGRPID, diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index b02b0e0d51f2..6cbf4b9d9919 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -872,6 +872,8 @@ const void* fmGetStreamPesudoFuncVal(int32_t funcId, const SStreamRuntimeFuncInf return pStreamRuntimeFuncInfo->pStreamPartColVals; case FUNCTION_TYPE_PLACEHOLDER_TBNAME: return pStreamRuntimeFuncInfo->pStreamPartColVals; + case FUNCTION_TYPE_EVENT_CONDITION_PATH: + return pParams->conditionPath; case FUNCTION_TYPE_TIDLESTART: return &pParams->idlestart; case FUNCTION_TYPE_TIDLEEND: @@ -884,7 +886,8 @@ const void* fmGetStreamPesudoFuncVal(int32_t funcId, const SStreamRuntimeFuncInf bool fmIsStreamPesudoColVal(int32_t funcId) { return funcMgtBuiltins[funcId].type == FUNCTION_TYPE_PLACEHOLDER_COLUMN - || funcMgtBuiltins[funcId].type == FUNCTION_TYPE_PLACEHOLDER_TBNAME; + || funcMgtBuiltins[funcId].type == FUNCTION_TYPE_PLACEHOLDER_TBNAME + || funcMgtBuiltins[funcId].type == FUNCTION_TYPE_EVENT_CONDITION_PATH; } int32_t fmGetStreamPesudoFuncEnv(int32_t funcId, SNodeList* pParamNodes, SFuncExecEnv *pEnv) { @@ -983,6 +986,21 @@ int32_t fmSetStreamPseudoFuncParamVal(int32_t funcId, SNodeList* pParamNodes, co break; } } + } else if (FUNCTION_TYPE_EVENT_CONDITION_PATH == t) { + const char* pVal = (const char*)fmGetStreamPesudoFuncVal(funcId, pStreamRuntimeInfo); + taosMemoryFreeClear(((SValueNode*)pFirstParam)->datum.p); + if (pVal == NULL) { + ((SValueNode*)pFirstParam)->isNull = true; + } else { + size_t len = strlen(pVal); + ((SValueNode*)pFirstParam)->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE, 1); + if (NULL == ((SValueNode*)pFirstParam)->datum.p) { + return terrno; + } + (void)memcpy(varDataVal(((SValueNode*)pFirstParam)->datum.p), pVal, len); + varDataLen(((SValueNode*)pFirstParam)->datum.p) = len; + ((SValueNode*)pFirstParam)->isNull = false; + } } else if(FUNCTION_TYPE_EXTERNAL_WINDOW_COLUMN == t) { if (NULL == pParamNodes || LIST_LENGTH(pParamNodes) < 2) { uError("invalid stream external window column param list %p, len: %d", pParamNodes, @@ -1044,4 +1062,3 @@ int32_t fmSetStreamPseudoFuncParamVal(int32_t funcId, SNodeList* pParamNodes, co return code; } - diff --git a/source/libs/new-stream/inc/streamInt.h b/source/libs/new-stream/inc/streamInt.h index b92e21796004..7f5a4e939a27 100755 --- a/source/libs/new-stream/inc/streamInt.h +++ b/source/libs/new-stream/inc/streamInt.h @@ -120,7 +120,7 @@ int32_t streamBuildStateNotifyContent(ESTriggerEventType eventType, SColumnInfo* const char* pToState, char** ppContent); int32_t streamBuildIdleNotifyContent(ESTriggerEventType eventType, int64_t idleDurationMs, char** ppContent); int32_t streamBuildEventNotifyContent(const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t rowIdx, - int32_t condIdx, int32_t winIdx, char** ppContent); + const char* conditionPath, int32_t conditionIndex, char** ppContent); int32_t streamBuildBlockResultNotifyContent(const SStreamRunnerTask* pTask, const SSDataBlock* pBlock, char** ppContent, const SArray* pFields, const int32_t startRow, const int32_t endRow); int32_t streamSendNotifyContent(SStreamTask* pTask, const char* streamName, const char* tableName, int32_t triggerType, diff --git a/source/libs/new-stream/inc/streamTriggerTask.h b/source/libs/new-stream/inc/streamTriggerTask.h index 272f7dbf160b..de144d08599f 100644 --- a/source/libs/new-stream/inc/streamTriggerTask.h +++ b/source/libs/new-stream/inc/streamTriggerTask.h @@ -51,16 +51,27 @@ typedef struct SSTriggerOrigTableInfo { typedef struct SSTriggerWindow { STimeWindow range; int64_t wrownum; + int32_t eventNodeId; // event window start-condition node id int64_t prevProcTime; // only used in realtime group for max_delay check } SSTriggerWindow; typedef struct SSTriggerNotifyWindow { STimeWindow range; int64_t wrownum; + int32_t eventNodeId; // event window start-condition node id + bool openEmitted; // whether WINDOW_OPEN has already been emitted char *pWinOpenNotify; char *pWinCloseNotify; } SSTriggerNotifyWindow; +typedef struct SSTriggerEventNodeMeta { + int32_t nodeId; + int32_t parentNodeId; + int32_t localIndex; + bool isLeaf; + char* path; +} SSTriggerEventNodeMeta; + typedef TRINGBUF(SSTriggerWindow) TriggerWindowBuf; typedef struct SSTriggerRealtimeGroup { @@ -86,6 +97,8 @@ typedef struct SSTriggerRealtimeGroup { SSTriggerNotifyWindow parentWindow; int32_t numSubWindows; int32_t conditionIdx; + SArray *pEventParents; // SArray, active non-leaf path windows + int32_t activeLeafNodeId; // current active leaf node id, -1 if none }; int64_t totalCount; // for count window trigger }; @@ -129,6 +142,8 @@ typedef struct SSTriggerHistoryGroup { SSTriggerNotifyWindow parentWindow; int32_t numSubWindows; int32_t conditionIdx; + SArray *pEventParents; // SArray, active non-leaf path windows + int32_t activeLeafNodeId; // current active leaf node id, -1 if none }; }; @@ -396,6 +411,8 @@ typedef struct SStreamTriggerTask { SNode *pEndCond; SNodeList *pStartCondCols; SNodeList *pEndCondCols; + SArray *pStartCondMeta; // SArray + bool startCondHasSubEvents; STrueForInfo eventTrueForInfo; }; }; diff --git a/source/libs/new-stream/src/streamTriggerTask.c b/source/libs/new-stream/src/streamTriggerTask.c index 41fb3bbce7cb..7e237f7567b6 100644 --- a/source/libs/new-stream/src/streamTriggerTask.c +++ b/source/libs/new-stream/src/streamTriggerTask.c @@ -139,6 +139,180 @@ static SRWLatch gStreamTriggerWaitLatch; static SList gStreamTriggerWaitList; static tmr_h gStreamTriggerTimerId = NULL; +static void stTriggerTaskDestroyEventNodeMeta(void *ptr) { + SSTriggerEventNodeMeta *pMeta = ptr; + if (pMeta != NULL && pMeta->path != NULL) { + taosMemoryFreeClear(pMeta->path); + } +} + +static bool stTriggerTaskHasNestedEventStartCond(const SNode *pNode) { + if (pNode == NULL || nodeType((SNode *)pNode) != QUERY_NODE_NODE_LIST) { + return false; + } + + SNodeList *pNodeList = ((SNodeListNode *)pNode)->pNodeList; + SNode *pChild = NULL; + FOREACH(pChild, pNodeList) { + if (nodeType(pChild) == QUERY_NODE_NODE_LIST) { + return true; + } + } + return false; +} + +static int32_t stTriggerTaskBuildEventMetaItem(SNode *pNode, int32_t parentNodeId, int32_t localIndex, + const char *pParentPath, SArray *pMetas) { + int32_t code = TSDB_CODE_SUCCESS; + char idxBuf[32] = {0}; + int32_t pathLen = snprintf(idxBuf, sizeof(idxBuf), "%d", localIndex); + if (pathLen <= 0) { + return TSDB_CODE_INTERNAL_ERROR; + } + + int32_t parentLen = (pParentPath != NULL) ? (int32_t)strlen(pParentPath) : 0; + int32_t totalLen = parentLen + (parentLen > 0 ? 1 : 0) + pathLen; + char *pPath = taosMemoryCalloc(totalLen + 1, 1); + if (pPath == NULL) { + return terrno; + } + if (parentLen > 0) { + (void)memcpy(pPath, pParentPath, parentLen); + pPath[parentLen] = '.'; + (void)memcpy(pPath + parentLen + 1, idxBuf, pathLen); + } else { + (void)memcpy(pPath, idxBuf, pathLen); + } + + SSTriggerEventNodeMeta meta = { + .nodeId = taosArrayGetSize(pMetas), + .parentNodeId = parentNodeId, + .localIndex = localIndex, + .isLeaf = (nodeType(pNode) != QUERY_NODE_NODE_LIST), + .path = pPath, + }; + void *px = taosArrayPush(pMetas, &meta); + if (px == NULL) { + code = terrno; + goto _exit; + } + pPath = NULL; + + if (!meta.isLeaf) { + SNodeList *pNodeList = ((SNodeListNode *)pNode)->pNodeList; + SNode *pChild = NULL; + int32_t childIndex = 0; + FOREACH(pChild, pNodeList) { + code = stTriggerTaskBuildEventMetaItem(pChild, meta.nodeId, childIndex++, meta.path, pMetas); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + } + } + +_exit: + if (code != TSDB_CODE_SUCCESS && pPath != NULL) { + taosMemoryFreeClear(pPath); + } + return code; +} + +static int32_t stTriggerTaskBuildEventMetas(SNode *pStartCond, SArray **ppMetas) { + int32_t code = TSDB_CODE_SUCCESS; + *ppMetas = taosArrayInit(0, sizeof(SSTriggerEventNodeMeta)); + if (*ppMetas == NULL) { + return terrno; + } + + if (pStartCond == NULL) { + return TSDB_CODE_SUCCESS; + } + + if (nodeType(pStartCond) == QUERY_NODE_NODE_LIST) { + SNodeList *pNodeList = ((SNodeListNode *)pStartCond)->pNodeList; + SNode *pChild = NULL; + int32_t childIndex = 0; + FOREACH(pChild, pNodeList) { + code = stTriggerTaskBuildEventMetaItem(pChild, -1, childIndex++, NULL, *ppMetas); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + } + } else { + code = stTriggerTaskBuildEventMetaItem(pStartCond, -1, 0, NULL, *ppMetas); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + } + + return code; + +_exit: + if (code != TSDB_CODE_SUCCESS && *ppMetas != NULL) { + taosArrayDestroyEx(*ppMetas, stTriggerTaskDestroyEventNodeMeta); + *ppMetas = NULL; + } + return code; +} + +static SSTriggerEventNodeMeta *stTriggerTaskGetEventMeta(const SStreamTriggerTask *pTask, int32_t nodeId) { + if (pTask == NULL || pTask->pStartCondMeta == NULL || nodeId < 0 || + nodeId >= taosArrayGetSize(pTask->pStartCondMeta)) { + return NULL; + } + return taosArrayGet(pTask->pStartCondMeta, nodeId); +} + +static int32_t stTriggerTaskGetEventPathNodeIds(const SStreamTriggerTask *pTask, int32_t leafNodeId, int32_t **ppIds, + int32_t *pLen) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t maxNodes = (pTask != NULL && pTask->pStartCondMeta != NULL) ? taosArrayGetSize(pTask->pStartCondMeta) : 0; + if (maxNodes <= 0) { + return TSDB_CODE_INTERNAL_ERROR; + } + + int32_t *pIds = taosMemoryMalloc(sizeof(int32_t) * maxNodes); + if (pIds == NULL) { + return terrno; + } + + int32_t len = 0; + while (leafNodeId >= 0) { + SSTriggerEventNodeMeta *pMeta = stTriggerTaskGetEventMeta(pTask, leafNodeId); + if (pMeta == NULL) { + taosMemoryFreeClear(pIds); + return TSDB_CODE_INTERNAL_ERROR; + } + pIds[len++] = leafNodeId; + leafNodeId = pMeta->parentNodeId; + } + + for (int32_t i = 0; i < len / 2; ++i) { + int32_t tmp = pIds[i]; + pIds[i] = pIds[len - i - 1]; + pIds[len - i - 1] = tmp; + } + + *ppIds = pIds; + *pLen = len; + return code; + +_exit: + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pIds); + } + return code; +} + +static int32_t stTriggerTaskBuildEventNotify(const SStreamTriggerTask *pTask, const SSDataBlock *pInputBlock, + const SNodeList *pCondCols, int32_t rowIdx, int32_t nodeId, + char **ppContent) { + SSTriggerEventNodeMeta *pMeta = stTriggerTaskGetEventMeta(pTask, nodeId); + const char *pPath = (pMeta != NULL) ? pMeta->path : "0"; + int32_t localIndex = (pMeta != NULL) ? pMeta->localIndex : 0; + return streamBuildEventNotifyContent(pInputBlock, pCondCols, rowIdx, pPath, localIndex, ppContent); +} + static int32_t stTriggerTaskAddWaitSession(SStreamTriggerTask *pTask, int64_t sessionId, int64_t resumeTime) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -2495,17 +2669,15 @@ int32_t stTriggerTaskDeploy(SStreamTriggerTask *pTask, SStreamTriggerDeployMsg * pTask->eventTrueForInfo.trueForType = pEvent->trueForType; pTask->eventTrueForInfo.count = pEvent->trueForCount; pTask->eventTrueForInfo.duration = pEvent->trueForDuration; + pTask->startCondHasSubEvents = stTriggerTaskHasNestedEventStartCond(pTask->pStartCond); code = nodesCollectColumnsFromNode(pTask->pStartCond, NULL, COLLECT_COL_TYPE_ALL, &pTask->pStartCondCols); QUERY_CHECK_CODE(code, lino, _end); - if (nodeType(pTask->pStartCond) == QUERY_NODE_NODE_LIST) { - SNode *nodes[2] = {pTask->pStartCond, pTask->pEndCond}; - code = nodesCollectColumnsFromMultiNodes(nodes, ARRAY_SIZE(nodes), NULL, COLLECT_COL_TYPE_ALL, - &pTask->pEndCondCols); - QUERY_CHECK_CODE(code, lino, _end); - } else { + if (pTask->pEndCond != NULL) { code = nodesCollectColumnsFromNode(pTask->pEndCond, NULL, COLLECT_COL_TYPE_ALL, &pTask->pEndCondCols); QUERY_CHECK_CODE(code, lino, _end); } + code = stTriggerTaskBuildEventMetas(pTask->pStartCond, &pTask->pStartCondMeta); + QUERY_CHECK_CODE(code, lino, _end); break; } case WINDOW_TYPE_COUNT: { @@ -2576,7 +2748,8 @@ int32_t stTriggerTaskDeploy(SStreamTriggerTask *pTask, SStreamTriggerDeployMsg * pTask->ignoreNoDataTrigger = pMsg->igNoDataTrigger; pTask->hasTriggerFilter = pMsg->triggerHasPF; pTask->multiGroupBatch = pMsg->enableMultiGroupCalc; - QUERY_CHECK_CONDITION(!pTask->multiGroupBatch, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); // todo(kjq): enable multi group calc + QUERY_CHECK_CONDITION(!pTask->multiGroupBatch, code, lino, _end, + TSDB_CODE_INTERNAL_ERROR); // todo(kjq): enable multi group calc if (pTask->multiGroupBatch) { QUERY_CHECK_CONDITION((pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_ROWS) == 0, code, lino, _end, TSDB_CODE_INVALID_PARA); @@ -2768,6 +2941,10 @@ int32_t stTriggerTaskUndeployImpl(SStreamTriggerTask **ppTask, const SStreamUnde nodesDestroyList(pTask->pEndCondCols); pTask->pEndCondCols = NULL; } + if (pTask->pStartCondMeta != NULL) { + taosArrayDestroyEx(pTask->pStartCondMeta, stTriggerTaskDestroyEventNodeMeta); + pTask->pStartCondMeta = NULL; + } } if (pTask->triggerFilter != NULL) { @@ -3433,6 +3610,103 @@ static int32_t stRealtimeContextFinishVtablePatch(SSTriggerRealtimeContext *pCon // pGroup->newThreshold = pGroup->oldThreshold; // } +static int32_t stCalcNestedEventStartItem(SArray *pList, SNode *pNode, int32_t *pNextNodeId, int32_t nrows, + SColumnInfoData *pResCol) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t nodeId = (*pNextNodeId)++; + + if (nodeType(pNode) == QUERY_NODE_NODE_LIST) { + SNodeList *pNodeList = ((SNodeListNode *)pNode)->pNodeList; + SNode *pChild = NULL; + FOREACH(pChild, pNodeList) { + code = stCalcNestedEventStartItem(pList, pChild, pNextNodeId, nrows, pResCol); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + return TSDB_CODE_SUCCESS; + } + + SColumnInfoData tmpCol = {0}; + SDataType *pType = &((SExprNode *)pNode)->resType; + tmpCol.info.type = pType->type; + tmpCol.info.bytes = pType->bytes; + tmpCol.info.scale = pType->scale; + tmpCol.info.precision = pType->precision; + + SScalarParam output = {.columnData = &tmpCol}; + code = scalarCalculate(pNode, pList, &output, NULL); + if (code != TSDB_CODE_SUCCESS) { + colDataDestroy(&tmpCol); + return code; + } + + uint8_t *pTmpData = (uint8_t *)tmpCol.pData; + int32_t *pResData = (int32_t *)pResCol->pData; + for (int32_t i = 0; i < nrows; ++i) { + if (pResData[i] == 0 && pTmpData[i]) { + pResData[i] = nodeId + 1; + } + } + colDataDestroy(&tmpCol); + return TSDB_CODE_SUCCESS; +} + +static int32_t stCalcNestedEventStartExpr(SSDataBlock *pDataBlock, SNode *pExpr, SColumnInfoData *pResCol) { + int32_t code = TSDB_CODE_SUCCESS; + SArray *pList = taosArrayInit(1, POINTER_BYTES); + int32_t nextNodeId = 0; + int32_t nrows = blockDataGetNumOfRows(pDataBlock); + void *px = NULL; + SNode *pChild = NULL; + + if (pList == NULL) { + return terrno; + } + px = taosArrayPush(pList, &pDataBlock); + if (px == NULL) { + taosArrayDestroy(pList); + return terrno; + } + + pResCol->info.type = TSDB_DATA_TYPE_INT; + pResCol->info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes; + pResCol->info.scale = 0; + pResCol->info.precision = 0; + code = colInfoDataEnsureCapacity(pResCol, pDataBlock->info.capacity, false); + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pList); + return code; + } + TAOS_MEMSET(pResCol->nullbitmap, 0, BitmapLen(pDataBlock->info.capacity)); + TAOS_MEMSET(pResCol->pData, 0, pDataBlock->info.capacity * pResCol->info.bytes); + + if (pExpr == NULL) { + taosArrayDestroy(pList); + return TSDB_CODE_SUCCESS; + } + + if (nodeType(pExpr) == QUERY_NODE_NODE_LIST) { + SNodeList *pNodeList = ((SNodeListNode *)pExpr)->pNodeList; + FOREACH(pChild, pNodeList) { + code = stCalcNestedEventStartItem(pList, pChild, &nextNodeId, nrows, pResCol); + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pList); + return code; + } + } + } else { + code = stCalcNestedEventStartItem(pList, pExpr, &nextNodeId, nrows, pResCol); + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pList); + return code; + } + } + + taosArrayDestroy(pList); + return TSDB_CODE_SUCCESS; +} + static int32_t stRealtimeContextCalcExpr(SSTriggerRealtimeContext *pContext, SSDataBlock *pDataBlock, SNode *pExpr, SColumnInfoData *pResCol) { int32_t code = TSDB_CODE_SUCCESS; @@ -3446,6 +3720,12 @@ static int32_t stRealtimeContextCalcExpr(SSTriggerRealtimeContext *pContext, SSD void *px = taosArrayPush(pList, &pDataBlock); QUERY_CHECK_NULL(px, code, lino, _end, terrno); + if (pContext->pTask->triggerType == STREAM_TRIGGER_EVENT && pExpr != NULL && + nodeType(pExpr) == QUERY_NODE_NODE_LIST && stTriggerTaskHasNestedEventStartCond(pExpr)) { + taosArrayDestroy(pList); + return stCalcNestedEventStartExpr(pDataBlock, pExpr, pResCol); + } + if (pExpr == NULL || nodeType(pExpr) == QUERY_NODE_NODE_LIST) { pResCol->info.type = TSDB_DATA_TYPE_UINT; pResCol->info.bytes = 1; @@ -4344,12 +4624,12 @@ static int32_t stRealtimeContextSendPullReq(SSTriggerRealtimeContext *pContext, // send STRIGGER_PULL_GROUP_COL_VALUE for given (pProgress, gid); used when pulling groupInfo for pending create-table static int32_t stRealtimeContextSendPullReqForGid(SSTriggerRealtimeContext *pContext, SSTriggerWalProgress *pProgress, int64_t gid) { - SStreamTriggerTask *pTask = pContext->pTask; - SSTriggerPullRequest *pReq = &pProgress->pullReq.base; - SStreamTaskAddr *pReader = pProgress->pTaskAddr; - SRpcMsg msg = {.msgType = TDMT_STREAM_TRIGGER_PULL}; - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; + SStreamTriggerTask *pTask = pContext->pTask; + SSTriggerPullRequest *pReq = &pProgress->pullReq.base; + SStreamTaskAddr *pReader = pProgress->pTaskAddr; + SRpcMsg msg = {.msgType = TDMT_STREAM_TRIGGER_PULL}; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; pReq->type = STRIGGER_PULL_GROUP_COL_VALUE; pReq->readerTaskId = pReader->taskId; @@ -4410,7 +4690,8 @@ static int32_t stTriggerTaskSendCreateTableReq(SStreamTriggerTask *pTask, SSTrig (pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_TBNAME)) { needTagValue = true; } - if (needTagValue && pContext != NULL && pContext->pGroupColVals != NULL && taosArrayGetSize(pCalcReq->groupColVals) == 0) { + if (needTagValue && pContext != NULL && pContext->pGroupColVals != NULL && + taosArrayGetSize(pCalcReq->groupColVals) == 0) { void *px = tSimpleHashGet(pContext->pGroupColVals, &gid, sizeof(int64_t)); if (px != NULL) { SArray *pGroupColVals = *(SArray **)px; @@ -4512,7 +4793,6 @@ static int32_t stTriggerTaskSendCreateTableReq(SStreamTriggerTask *pTask, SSTrig } static int32_t stRealtimeContextSendCalcReq(SSTriggerRealtimeContext *pContext, bool sendOnly) { - int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamTriggerTask *pTask = pContext->pTask; @@ -4747,8 +5027,8 @@ static int32_t stRealtimeContextSendCalcReq(SSTriggerRealtimeContext *pContext, TAOS_MEMCPY(&readInfo.lastParam, pLastParam, plainFieldSize); // todo(kjq): fill in ptables in readInfo SSTriggerRealtimeGroup *pGroup = stRealtimeContextGetCurrentGroup(pContext); - SArray *pInfos = NULL; - void *px = tSimpleHashGet(pCalcReq->pGroupReadInfos, &pGroup->vgId, sizeof(int32_t)); + SArray *pInfos = NULL; + void *px = tSimpleHashGet(pCalcReq->pGroupReadInfos, &pGroup->vgId, sizeof(int32_t)); if (px == NULL) { pInfos = taosArrayInit(0, sizeof(SSTriggerGroupReadInfo)); QUERY_CHECK_NULL(pInfos, code, lino, _end, terrno); @@ -5992,9 +6272,9 @@ static int32_t stRealtimeContextProcPullRsp(SSTriggerRealtimeContext *pContext, pProgress->doneVer = pProgress->startVer; pProgress->lastScanVer = pProgress->startVer; - int32_t nrows = blockDataGetNumOfRows(pDataBlock); - int64_t *pGidData = NULL; - int64_t *pTsData = NULL; + int32_t nrows = blockDataGetNumOfRows(pDataBlock); + int64_t *pGidData = NULL; + int64_t *pTsData = NULL; if (nrows > 0) { int32_t iCol = 0; @@ -6052,8 +6332,7 @@ static int32_t stRealtimeContextProcPullRsp(SSTriggerRealtimeContext *pContext, if (pTask->nodelayCreateSubtable && nrows > 0 && pGidData != NULL) { if (pContext->pPendingCreateTableGids == NULL) { - pContext->pPendingCreateTableGids = - taosArrayInit(0, sizeof(SSTriggerPendingCreateTableEntry)); + pContext->pPendingCreateTableGids = taosArrayInit(0, sizeof(SSTriggerPendingCreateTableEntry)); QUERY_CHECK_NULL(pContext->pPendingCreateTableGids, code, lino, _end, terrno); } if (pTask->isVirtualTable) { @@ -6063,8 +6342,7 @@ static int32_t stRealtimeContextProcPullRsp(SSTriggerRealtimeContext *pContext, if (pOrigTableInfo == NULL) continue; for (int32_t j = 0; j < TARRAY_SIZE(pOrigTableInfo->pVtbUids); j++) { int64_t vtbUid = *(int64_t *)TARRAY_GET_ELEM(pOrigTableInfo->pVtbUids, j); - SSTriggerVirtTableInfo *pVirtTableInfo = - tSimpleHashGet(pTask->pVirtTableInfos, &vtbUid, sizeof(int64_t)); + SSTriggerVirtTableInfo *pVirtTableInfo = tSimpleHashGet(pTask->pVirtTableInfos, &vtbUid, sizeof(int64_t)); if (pVirtTableInfo == NULL) continue; SSTriggerPendingCreateTableEntry entry = { .gid = pVirtTableInfo->tbGid, .pProgress = pProgress, .attemptCount = 1}; @@ -6075,7 +6353,7 @@ static int32_t stRealtimeContextProcPullRsp(SSTriggerRealtimeContext *pContext, } else { for (int32_t i = 0; i < nrows; i++) { SSTriggerPendingCreateTableEntry entry = {.gid = pGidData[i], .pProgress = pProgress, .attemptCount = 1}; - void *px = taosArrayPush(pContext->pPendingCreateTableGids, &entry); + void *px = taosArrayPush(pContext->pPendingCreateTableGids, &entry); QUERY_CHECK_NULL(px, code, lino, _end, terrno); } } @@ -6087,8 +6365,7 @@ static int32_t stRealtimeContextProcPullRsp(SSTriggerRealtimeContext *pContext, } // all readers responded; send first GROUP_COL_VALUE pull for nodelay create-table if any - if (pContext->pPendingCreateTableGids != NULL && - taosArrayGetSize(pContext->pPendingCreateTableGids) > 0) { + if (pContext->pPendingCreateTableGids != NULL && taosArrayGetSize(pContext->pPendingCreateTableGids) > 0) { SSTriggerPendingCreateTableEntry *pFirst = (SSTriggerPendingCreateTableEntry *)taosArrayGet(pContext->pPendingCreateTableGids, 0); QUERY_CHECK_NULL(pFirst, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); @@ -6531,13 +6808,15 @@ static int32_t stRealtimeContextProcPullRsp(SSTriggerRealtimeContext *pContext, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); SSTriggerGroupColValueRequest *pRequest = (SSTriggerGroupColValueRequest *)pReq; if (pContext->status == STRIGGER_CONTEXT_DETERMINE_BOUND && pContext->pPendingCreateTableGids != NULL) { - // pending create-table: LAST_TS needed tag, we pulled groupInfo for one gid; create table then continue or finish + // pending create-table: LAST_TS needed tag, we pulled groupInfo for one gid; create table then continue or + // finish SStreamGroupInfo groupInfo = {0}; if (pRsp->contLen > 0) { code = tDeserializeSStreamGroupInfo(pRsp->pCont, pRsp->contLen, &groupInfo); QUERY_CHECK_CODE(code, lino, _end); } - code = tSimpleHashPut(pContext->pGroupColVals, &pRequest->gid, sizeof(int64_t), &groupInfo.gInfo, POINTER_BYTES); + code = + tSimpleHashPut(pContext->pGroupColVals, &pRequest->gid, sizeof(int64_t), &groupInfo.gInfo, POINTER_BYTES); if (code != TSDB_CODE_SUCCESS) { taosArrayClearEx(groupInfo.gInfo, tDestroySStreamGroupValue); QUERY_CHECK_CODE(code, lino, _end); @@ -6560,8 +6839,7 @@ static int32_t stRealtimeContextProcPullRsp(SSTriggerRealtimeContext *pContext, SSTriggerPendingCreateTableEntry *pNext = (SSTriggerPendingCreateTableEntry *)taosArrayGet(pContext->pPendingCreateTableGids, 0); QUERY_CHECK_NULL(pNext, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); - code = - stRealtimeContextSendPullReqForGid(pContext, pNext->pProgress, pNext->gid); + code = stRealtimeContextSendPullReqForGid(pContext, pNext->pProgress, pNext->gid); QUERY_CHECK_CODE(code, lino, _end); } else { taosArrayDestroy(pContext->pPendingCreateTableGids); @@ -7067,6 +7345,12 @@ static int32_t stHistoryContextCalcExpr(SSTriggerHistoryContext *pContext, SSDat void *px = taosArrayPush(pList, &pDataBlock); QUERY_CHECK_NULL(px, code, lino, _end, terrno); + if (pContext->pTask->triggerType == STREAM_TRIGGER_EVENT && pExpr != NULL && + nodeType(pExpr) == QUERY_NODE_NODE_LIST && stTriggerTaskHasNestedEventStartCond(pExpr)) { + taosArrayDestroy(pList); + return stCalcNestedEventStartExpr(pDataBlock, pExpr, pResCol); + } + if (pExpr == NULL || nodeType(pExpr) == QUERY_NODE_NODE_LIST) { pResCol->info.type = TSDB_DATA_TYPE_UINT; pResCol->info.bytes = 1; @@ -8999,6 +9283,10 @@ static int32_t stRealtimeGroupInit(SSTriggerRealtimeGroup *pGroup, SSTriggerReal if (pTask->triggerType == STREAM_TRIGGER_STATE) { pGroup->pendingNullStart = INT64_MIN; + } else if (pTask->triggerType == STREAM_TRIGGER_EVENT) { + pGroup->activeLeafNodeId = -1; + pGroup->pEventParents = taosArrayInit(0, sizeof(SSTriggerNotifyWindow)); + QUERY_CHECK_NULL(pGroup->pEventParents, code, lino, _end, terrno); } pGroup->prevWindow = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MIN}; code = taosObjListInit(&pGroup->windows, &pContext->windowPool); @@ -9039,6 +9327,10 @@ static void stRealtimeGroupDestroy(void *ptr) { taosMemoryFreeClear(pGroup->stateVal.pData); } else if (pGroup->pContext->pTask->triggerType == STREAM_TRIGGER_EVENT) { stRealtimeContextDestroyWindow(&pGroup->parentWindow); + if (pGroup->pEventParents != NULL) { + taosArrayDestroyEx(pGroup->pEventParents, stRealtimeContextDestroyWindow); + pGroup->pEventParents = NULL; + } } taosObjListClear(&pGroup->windows); taosObjListClearEx(&pGroup->pPendingParWinCalcParams, tDestroySSTriggerCalcParam); @@ -9469,6 +9761,7 @@ static int32_t stRealtimeGroupDoSlidingCheck(SSTriggerRealtimeGroup *pGroup) { taosObjListInitIter(&pGroup->windows, &iter, TOBJLIST_ITER_FORWARD); while ((pWin = taosObjListIterNext(&iter)) != NULL) { newWin.range = pWin->range; + newWin.eventNodeId = pWin->eventNodeId; void *px = taosArrayPush(pContext->pWindows, &newWin); QUERY_CHECK_NULL(px, code, lino, _end, terrno); } @@ -9679,6 +9972,7 @@ static int32_t stRealtimeGroupDoStateCheck(SSTriggerRealtimeGroup *pGroup) { SSTriggerWindow *pWin = taosObjListGetHead(&pGroup->windows); SSTriggerNotifyWindow *pNewWin = TARRAY_DATA(pContext->pWindows); pNewWin->wrownum = pWin->wrownum; + pNewWin->eventNodeId = pWin->eventNodeId; pNewWin->pWinOpenNotify = pGroup->pPendWinOpenNotify; pGroup->pendingWinOpen = false; pGroup->pPendWinOpenNotify = NULL; @@ -9791,6 +10085,234 @@ static int32_t stRealtimeGroupDoStateCheck(SSTriggerRealtimeGroup *pGroup) { return code; } +static int32_t stRealtimeGroupCloseNestedEventParents(SSTriggerRealtimeGroup *pGroup, const SSDataBlock *pDataBlock, + int32_t rowIdx, int32_t keepCount) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SSTriggerRealtimeContext *pContext = pGroup->pContext; + SStreamTriggerTask *pTask = pContext->pTask; + + while (taosArrayGetSize(pGroup->pEventParents) > keepCount) { + int32_t lastIdx = taosArrayGetSize(pGroup->pEventParents) - 1; + SSTriggerNotifyWindow *pParent = taosArrayGet(pGroup->pEventParents, lastIdx); + QUERY_CHECK_NULL(pParent, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + pParent->range.ekey &= (~TRIGGER_GROUP_UNCLOSED_WINDOW_MASK); + if (pTask->notifyEventType & STRIGGER_EVENT_WINDOW_CLOSE) { + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->pEndCondCols, rowIdx, pParent->eventNodeId, + &pParent->pWinCloseNotify); + QUERY_CHECK_CODE(code, lino, _end); + } + SSTriggerNotifyWindow parent = *pParent; + void *px = taosArrayPush(pContext->pParentWindows, &parent); + QUERY_CHECK_NULL(px, code, lino, _end, terrno); + TARRAY_SIZE(pGroup->pEventParents) = lastIdx; + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t stRealtimeGroupOpenNestedEventPath(SSTriggerRealtimeGroup *pGroup, const SSDataBlock *pDataBlock, + int32_t rowIdx, int64_t ts, const int32_t *pNodeIds, int32_t pathLen, + int32_t startParentIdx, SSTriggerNotifyWindow **ppLeafWin) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SSTriggerRealtimeContext *pContext = pGroup->pContext; + SStreamTriggerTask *pTask = pContext->pTask; + + for (int32_t i = startParentIdx; i < pathLen - 1; ++i) { + SSTriggerNotifyWindow parent = {.range = {.skey = ts, .ekey = INT64_MAX}, .eventNodeId = pNodeIds[i]}; + if (pTask->notifyEventType & STRIGGER_EVENT_WINDOW_OPEN) { + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->pStartCondCols, rowIdx, pNodeIds[i], + &parent.pWinOpenNotify); + QUERY_CHECK_CODE(code, lino, _end); + } + void *px = taosArrayPush(pGroup->pEventParents, &parent); + QUERY_CHECK_NULL(px, code, lino, _end, terrno); + } + + SSTriggerNotifyWindow leaf = {.range = {.skey = ts, .ekey = INT64_MAX}, .eventNodeId = pNodeIds[pathLen - 1]}; + void *px = taosArrayPush(pContext->pWindows, &leaf); + QUERY_CHECK_NULL(px, code, lino, _end, terrno); + *ppLeafWin = px; + if (pTask->notifyEventType & STRIGGER_EVENT_WINDOW_OPEN) { + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->pStartCondCols, rowIdx, leaf.eventNodeId, + &(*ppLeafWin)->pWinOpenNotify); + QUERY_CHECK_CODE(code, lino, _end); + } + pGroup->activeLeafNodeId = leaf.eventNodeId; + +_end: + if (code != TSDB_CODE_SUCCESS) { + ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t stRealtimeGroupSwitchNestedEventPath(SSTriggerRealtimeGroup *pGroup, const SSDataBlock *pDataBlock, + int32_t rowIdx, int64_t ts, int32_t newLeafNodeId, + SSTriggerNotifyWindow **ppLeafWin) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamTriggerTask *pTask = pGroup->pContext->pTask; + int32_t *pNewIds = NULL; + int32_t newLen = 0; + int32_t oldLen = taosArrayGetSize(pGroup->pEventParents) + ((*ppLeafWin != NULL) ? 1 : 0); + int32_t lcaLen = 0; + + code = stTriggerTaskGetEventPathNodeIds(pTask, newLeafNodeId, &pNewIds, &newLen); + QUERY_CHECK_CODE(code, lino, _end); + + while (lcaLen < oldLen && lcaLen < newLen) { + int32_t oldNodeId = 0; + if (lcaLen < taosArrayGetSize(pGroup->pEventParents)) { + SSTriggerNotifyWindow *pParent = taosArrayGet(pGroup->pEventParents, lcaLen); + oldNodeId = pParent->eventNodeId; + } else { + oldNodeId = pGroup->activeLeafNodeId; + } + if (oldNodeId != pNewIds[lcaLen]) { + break; + } + ++lcaLen; + } + + if (*ppLeafWin != NULL && pGroup->activeLeafNodeId >= 0) { + (*ppLeafWin)->range.ekey &= (~TRIGGER_GROUP_UNCLOSED_WINDOW_MASK); + if (pTask->notifyEventType & STRIGGER_EVENT_WINDOW_CLOSE) { + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->pEndCondCols, rowIdx, (*ppLeafWin)->eventNodeId, + &(*ppLeafWin)->pWinCloseNotify); + QUERY_CHECK_CODE(code, lino, _end); + } + pGroup->activeLeafNodeId = -1; + *ppLeafWin = NULL; + } + + code = stRealtimeGroupCloseNestedEventParents(pGroup, pDataBlock, rowIdx, lcaLen); + QUERY_CHECK_CODE(code, lino, _end); + code = stRealtimeGroupOpenNestedEventPath(pGroup, pDataBlock, rowIdx, ts, pNewIds, newLen, lcaLen, ppLeafWin); + QUERY_CHECK_CODE(code, lino, _end); + +_end: + taosMemoryFreeClear(pNewIds); + if (code != TSDB_CODE_SUCCESS) { + ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t stRealtimeGroupDoNestedEventCheck(SSTriggerRealtimeGroup *pGroup) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SSTriggerRealtimeContext *pContext = pGroup->pContext; + SStreamTriggerTask *pTask = pContext->pTask; + SSDataBlock *pDataBlock = NULL; + int32_t startIdx = 0; + int32_t endIdx = 0; + + if (TARRAY_SIZE(pContext->pWindows) == 0) { + SSTriggerNotifyWindow newWin = {0}; + SSTriggerWindow *pOldWin = NULL; + SObjListIter iter = {0}; + taosObjListInitIter(&pGroup->windows, &iter, TOBJLIST_ITER_FORWARD); + while ((pOldWin = taosObjListIterNext(&iter)) != NULL) { + newWin.range = pOldWin->range; + newWin.wrownum = pOldWin->wrownum; + newWin.eventNodeId = pOldWin->eventNodeId; + void *px = taosArrayPush(pContext->pWindows, &newWin); + QUERY_CHECK_NULL(px, code, lino, _end, terrno); + } + if (pGroup->pendingWinOpen) { + QUERY_CHECK_CONDITION(TARRAY_SIZE(pContext->pWindows) == 1, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + SSTriggerWindow *pOld = taosObjListGetHead(&pGroup->windows); + SSTriggerNotifyWindow *pLeafWin = TARRAY_DATA(pContext->pWindows); + pLeafWin->wrownum = pOld->wrownum; + pLeafWin->eventNodeId = pOld->eventNodeId; + pLeafWin->pWinOpenNotify = pGroup->pPendWinOpenNotify; + pGroup->pendingWinOpen = false; + pGroup->pPendWinOpenNotify = NULL; + taosObjListClear(&pGroup->windows); + } + } + + SSTriggerNotifyWindow *pLeafWin = taosArrayGetLast(pContext->pWindows); + + while (true) { + code = stRealtimeGroupNextDataBlock(pGroup, &pDataBlock, &startIdx, &endIdx); + QUERY_CHECK_CODE(code, lino, _end); + if (pContext->needPseudoCols || pDataBlock == NULL || startIdx >= endIdx) { + break; + } + + SColumnInfoData *pTsCol = taosArrayGet(pDataBlock->pDataBlock, pTask->trigTsIndex); + QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno); + int64_t *pTsData = (int64_t *)pTsCol->pData; + + SColumnInfoData *psCol = NULL, *peCol = NULL; + if (pTask->isVirtualTable) { + code = stRealtimeContextCalcExpr(pContext, pDataBlock, pTask->pStartCond, &pContext->eventStartCol); + QUERY_CHECK_CODE(code, lino, _end); + code = stRealtimeContextCalcExpr(pContext, pDataBlock, pTask->pEndCond, &pContext->eventEndCol); + QUERY_CHECK_CODE(code, lino, _end); + psCol = &pContext->eventStartCol; + peCol = &pContext->eventEndCol; + } else { + peCol = taosArrayGetLast(pDataBlock->pDataBlock); + QUERY_CHECK_NULL(peCol, code, lino, _end, terrno); + psCol = peCol - 1; + } + + int32_t *ps = (int32_t *)psCol->pData; + uint8_t *pe = (uint8_t *)peCol->pData; + for (int32_t i = startIdx; i < endIdx; ++i) { + int32_t matchNodeId = (ps[i] > 0) ? (ps[i] - 1) : -1; + bool hasMatch = (matchNodeId >= 0); + + if (pLeafWin == NULL && hasMatch) { + code = stRealtimeGroupSwitchNestedEventPath(pGroup, pDataBlock, i, pTsData[i], matchNodeId, &pLeafWin); + QUERY_CHECK_CODE(code, lino, _end); + } else if (pLeafWin != NULL && hasMatch && matchNodeId != pGroup->activeLeafNodeId) { + code = stRealtimeGroupSwitchNestedEventPath(pGroup, pDataBlock, i, pTsData[i], matchNodeId, &pLeafWin); + QUERY_CHECK_CODE(code, lino, _end); + } + + if (pLeafWin == NULL) { + continue; + } + + pLeafWin->wrownum++; + pLeafWin->range.ekey = (pTsData[i] | TRIGGER_GROUP_UNCLOSED_WINDOW_MASK); + for (int32_t j = 0; j < taosArrayGetSize(pGroup->pEventParents); ++j) { + SSTriggerNotifyWindow *pParent = taosArrayGet(pGroup->pEventParents, j); + pParent->wrownum++; + pParent->range.ekey = (pTsData[i] | TRIGGER_GROUP_UNCLOSED_WINDOW_MASK); + } + + if (pe[i] || !hasMatch) { + pLeafWin->range.ekey &= (~TRIGGER_GROUP_UNCLOSED_WINDOW_MASK); + if (pTask->notifyEventType & STRIGGER_EVENT_WINDOW_CLOSE) { + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->pEndCondCols, i, pLeafWin->eventNodeId, + &pLeafWin->pWinCloseNotify); + QUERY_CHECK_CODE(code, lino, _end); + } + pLeafWin = NULL; + pGroup->activeLeafNodeId = -1; + code = stRealtimeGroupCloseNestedEventParents(pGroup, pDataBlock, i, 0); + QUERY_CHECK_CODE(code, lino, _end); + } + } + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + static int32_t stRealtimeGroupDoEventCheck(SSTriggerRealtimeGroup *pGroup) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -9800,6 +10322,10 @@ static int32_t stRealtimeGroupDoEventCheck(SSTriggerRealtimeGroup *pGroup) { int32_t startIdx = 0; int32_t endIdx = 0; + if (pTask->triggerType == STREAM_TRIGGER_EVENT && pTask->startCondHasSubEvents) { + return stRealtimeGroupDoNestedEventCheck(pGroup); + } + if (TARRAY_SIZE(pContext->pWindows) == 0) { SSTriggerNotifyWindow newWin = {0}; SSTriggerWindow *pWin = NULL; @@ -9856,7 +10382,7 @@ static int32_t stRealtimeGroupDoEventCheck(SSTriggerRealtimeGroup *pGroup) { if (pGroup->numSubWindows == 0) { pGroup->parentWindow = (SSTriggerNotifyWindow){.range.skey = pTsData[i], .range.ekey = INT64_MAX}; if (pTask->notifyEventType & STRIGGER_EVENT_WINDOW_OPEN) { - code = streamBuildEventNotifyContent(pDataBlock, pTask->pStartCondCols, i, ps[i] - 1, -1, + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->pStartCondCols, i, ps[i] - 1, &pGroup->parentWindow.pWinOpenNotify); QUERY_CHECK_CODE(code, lino, _end); } @@ -9868,6 +10394,7 @@ static int32_t stRealtimeGroupDoEventCheck(SSTriggerRealtimeGroup *pGroup) { SSTriggerNotifyWindow newWin = {0}; newWin.range.skey = pTsData[i]; newWin.range.ekey = INT64_MAX; + newWin.eventNodeId = ps[i] - 1; pWin = taosArrayPush(pContext->pWindows, &newWin); QUERY_CHECK_NULL(pWin, code, lino, _end, terrno); if (pTask->notifyEventType & STRIGGER_EVENT_WINDOW_OPEN) { @@ -9875,7 +10402,7 @@ static int32_t stRealtimeGroupDoEventCheck(SSTriggerRealtimeGroup *pGroup) { if (checkSubEvent && pGroup->numSubWindows > 1) { winIdx = pGroup->numSubWindows - 1; } - code = streamBuildEventNotifyContent(pDataBlock, pTask->pStartCondCols, i, ps[i] - 1, winIdx, + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->pStartCondCols, i, ps[i] - 1, &pWin->pWinOpenNotify); QUERY_CHECK_CODE(code, lino, _end); } @@ -9890,7 +10417,8 @@ static int32_t stRealtimeGroupDoEventCheck(SSTriggerRealtimeGroup *pGroup) { pWin->range.ekey &= (~TRIGGER_GROUP_UNCLOSED_WINDOW_MASK); if (pTask->notifyEventType & STRIGGER_EVENT_WINDOW_CLOSE) { int32_t winIdx = pGroup->numSubWindows - 1; - code = streamBuildEventNotifyContent(pDataBlock, pTask->pEndCondCols, i, 0, winIdx, &pWin->pWinCloseNotify); + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->pEndCondCols, i, pGroup->conditionIdx - 1, + &pWin->pWinCloseNotify); QUERY_CHECK_CODE(code, lino, _end); } pWin = NULL; @@ -9912,14 +10440,15 @@ static int32_t stRealtimeGroupDoEventCheck(SSTriggerRealtimeGroup *pGroup) { if (checkSubEvent && pGroup->numSubWindows > 1) { winIdx = pGroup->numSubWindows - 1; } - code = streamBuildEventNotifyContent(pDataBlock, pTask->pEndCondCols, i, 0, winIdx, &pWin->pWinCloseNotify); + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->pEndCondCols, i, pGroup->conditionIdx - 1, + &pWin->pWinCloseNotify); QUERY_CHECK_CODE(code, lino, _end); } pWin = NULL; if (checkSubEvent) { pGroup->parentWindow.range.ekey &= (~TRIGGER_GROUP_UNCLOSED_WINDOW_MASK); if (pTask->notifyEventType & STRIGGER_EVENT_WINDOW_CLOSE) { - code = streamBuildEventNotifyContent(pDataBlock, pTask->pEndCondCols, i, 0, -1, + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->pEndCondCols, i, pGroup->conditionIdx - 1, &pGroup->parentWindow.pWinCloseNotify); QUERY_CHECK_CODE(code, lino, _end); } @@ -10040,6 +10569,7 @@ static int32_t stRealtimeGroupMergeWindows(SSTriggerRealtimeGroup *pGroup) { QUERY_CHECK_CONDITION(pTmpWin->range.skey == pWin->range.skey, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); pTmpWin->range.ekey = pWin->range.ekey; pTmpWin->wrownum = pWin->wrownum; + pTmpWin->eventNodeId = pWin->eventNodeId; pWin++; } } @@ -10051,6 +10581,7 @@ static int32_t stRealtimeGroupMergeWindows(SSTriggerRealtimeGroup *pGroup) { SSTriggerWindow win = {0}; win.range = pWin->range; win.wrownum = pWin->wrownum; + win.eventNodeId = pWin->eventNodeId; win.prevProcTime = now; code = taosObjListAppend(&pGroup->windows, &win); QUERY_CHECK_CODE(code, lino, _end); @@ -10139,6 +10670,14 @@ static int32_t stRealtimeGroupFillParam(SSTriggerRealtimeGroup *pGroup, SSTrigge } } + if (code == TSDB_CODE_SUCCESS && pTask->triggerType == STREAM_TRIGGER_EVENT && pWin->eventNodeId >= 0) { + SSTriggerEventNodeMeta *pMeta = stTriggerTaskGetEventMeta(pTask, pWin->eventNodeId); + if (pMeta != NULL && pMeta->path != NULL) { + pParam->conditionPath = taosStrdup(pMeta->path); + QUERY_CHECK_NULL(pParam->conditionPath, code, lino, _end, terrno); + } + } + _end: if (code != TSDB_CODE_SUCCESS) { ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); @@ -10245,9 +10784,9 @@ static int32_t stRealtimeGroupGenCalcParams(SSTriggerRealtimeGroup *pGroup, int3 pWin->range.ekey & (~TRIGGER_GROUP_UNCLOSED_WINDOW_MASK), pWin->wrownum); bool ignore = (i < nInitWins) || !meetTrueFor; if ((calcOpen || notifyOpen) && !ignore && !pContext->recovering) { - SSTriggerCalcParam param = {.triggerTime = now, - .notifyType = (notifyOpen ? STRIGGER_EVENT_WINDOW_OPEN : STRIGGER_EVENT_WINDOW_NONE), - .extraNotifyContent = pWin->pWinOpenNotify}; + SSTriggerCalcParam param = {.triggerTime = now, + .notifyType = (notifyOpen ? STRIGGER_EVENT_WINDOW_OPEN : STRIGGER_EVENT_WINDOW_NONE), + .extraNotifyContent = pWin->pWinOpenNotify}; SSTriggerNotifyWindow win = *pWin; if (pTask->triggerType != STREAM_TRIGGER_SLIDING) { win.range.ekey = win.range.skey; @@ -10285,85 +10824,161 @@ static int32_t stRealtimeGroupGenCalcParams(SSTriggerRealtimeGroup *pGroup, int3 } } // trigger parent window open/close events - for (int32_t i = 0; i < TARRAY_SIZE(pContext->pParentWindows); i++) { - SSTriggerNotifyWindow *pWin = TARRAY_GET_ELEM(pContext->pParentWindows, i); - // check TRUE FOR condition - bool meetTrueFor = (pTrueForInfo == NULL) || (pTrueForInfo->duration == 0 && pTrueForInfo->count == 0) || - isTrueForSatisfied(pTrueForInfo, pWin->range.skey, - pWin->range.ekey & (~TRIGGER_GROUP_UNCLOSED_WINDOW_MASK), pWin->wrownum); - bool ignore = (pWin->range.skey <= pGroup->prevParentWinStart) || !meetTrueFor; - if ((calcOpen || notifyOpen) && !ignore && !pContext->recovering) { - SSTriggerCalcParam param = {.triggerTime = now, - .notifyType = (notifyOpen ? STRIGGER_EVENT_WINDOW_OPEN : STRIGGER_EVENT_WINDOW_NONE), - .extraNotifyContent = pWin->pWinOpenNotify}; - SSTriggerNotifyWindow win = *pWin; - if (pTask->triggerType != STREAM_TRIGGER_SLIDING) { + if (pTask->triggerType == STREAM_TRIGGER_EVENT && pTask->startCondHasSubEvents) { + for (int32_t i = 0; i < TARRAY_SIZE(pContext->pParentWindows); i++) { + SSTriggerNotifyWindow *pWin = TARRAY_GET_ELEM(pContext->pParentWindows, i); + bool meetTrueFor = (pTrueForInfo == NULL) || (pTrueForInfo->duration == 0 && pTrueForInfo->count == 0) || + isTrueForSatisfied(pTrueForInfo, pWin->range.skey, + pWin->range.ekey & (~TRIGGER_GROUP_UNCLOSED_WINDOW_MASK), pWin->wrownum); + if ((calcOpen || notifyOpen) && !pWin->openEmitted && meetTrueFor && !pContext->recovering) { + SSTriggerCalcParam param = { + .triggerTime = now, + .notifyType = (notifyOpen ? STRIGGER_EVENT_WINDOW_OPEN : STRIGGER_EVENT_WINDOW_NONE), + .extraNotifyContent = pWin->pWinOpenNotify}; + SSTriggerNotifyWindow win = *pWin; win.range.ekey = win.range.skey; + code = stRealtimeGroupFillParam(pGroup, ¶m, &win); + QUERY_CHECK_CODE(code, lino, _end); + if (calcOpen) { + code = taosObjListAppend(&pGroup->pPendingParWinCalcParams, ¶m); + QUERY_CHECK_CODE(code, lino, _end); + pWin->pWinOpenNotify = NULL; + } else if (notifyOpen) { + void *px = taosArrayPush(pContext->pNotifyParams, ¶m); + QUERY_CHECK_NULL(px, code, lino, _end, terrno); + pWin->pWinOpenNotify = NULL; + } } - code = stRealtimeGroupFillParam(pGroup, ¶m, &win); - QUERY_CHECK_CODE(code, lino, _end); - if (calcOpen) { - code = taosObjListAppend(&pGroup->pPendingParWinCalcParams, ¶m); + + bool ignore = (!meetTrueFor) || (pTask->ignoreNoDataTrigger && pWin->wrownum == 0); + if ((calcClose || notifyClose) && !ignore && !pContext->recovering) { + SSTriggerCalcParam param = { + .triggerTime = now, + .notifyType = (notifyClose ? STRIGGER_EVENT_WINDOW_CLOSE : STRIGGER_EVENT_WINDOW_NONE), + .extraNotifyContent = pWin->pWinCloseNotify}; + code = stRealtimeGroupFillParam(pGroup, ¶m, pWin); QUERY_CHECK_CODE(code, lino, _end); - pWin->pWinOpenNotify = NULL; - } else if (notifyOpen) { - void *px = taosArrayPush(pContext->pNotifyParams, ¶m); - QUERY_CHECK_NULL(px, code, lino, _end, terrno); - pWin->pWinOpenNotify = NULL; + if (calcClose) { + code = taosObjListAppend(&pGroup->pPendingParWinCalcParams, ¶m); + QUERY_CHECK_CODE(code, lino, _end); + pWin->pWinCloseNotify = NULL; + } else if (notifyClose) { + void *px = taosArrayPush(pContext->pNotifyParams, ¶m); + QUERY_CHECK_NULL(px, code, lino, _end, terrno); + pWin->pWinCloseNotify = NULL; + } } } - if (!ignore) { - pGroup->prevParentWinStart = pWin->range.skey; - } - ignore = (!meetTrueFor) || (pTask->ignoreNoDataTrigger && pWin->wrownum == 0); - if ((calcClose || notifyClose) && !ignore && !pContext->recovering) { - SSTriggerCalcParam param = { - .triggerTime = now, - .notifyType = (notifyClose ? STRIGGER_EVENT_WINDOW_CLOSE : STRIGGER_EVENT_WINDOW_NONE), - .extraNotifyContent = pWin->pWinCloseNotify}; - code = stRealtimeGroupFillParam(pGroup, ¶m, pWin); - QUERY_CHECK_CODE(code, lino, _end); - if (calcClose) { - code = taosObjListAppend(&pGroup->pPendingParWinCalcParams, ¶m); + for (int32_t i = 0; i < taosArrayGetSize(pGroup->pEventParents); ++i) { + SSTriggerNotifyWindow *pWin = taosArrayGet(pGroup->pEventParents, i); + bool meetTrueFor = (pTrueForInfo == NULL) || (pTrueForInfo->duration == 0 && pTrueForInfo->count == 0) || + isTrueForSatisfied(pTrueForInfo, pWin->range.skey, + pWin->range.ekey & (~TRIGGER_GROUP_UNCLOSED_WINDOW_MASK), pWin->wrownum); + if ((calcOpen || notifyOpen) && !pWin->openEmitted && meetTrueFor && !pContext->recovering) { + SSTriggerCalcParam param = { + .triggerTime = now, + .notifyType = (notifyOpen ? STRIGGER_EVENT_WINDOW_OPEN : STRIGGER_EVENT_WINDOW_NONE), + .extraNotifyContent = pWin->pWinOpenNotify}; + SSTriggerNotifyWindow win = *pWin; + win.range.ekey = win.range.skey; + code = stRealtimeGroupFillParam(pGroup, ¶m, &win); QUERY_CHECK_CODE(code, lino, _end); - pWin->pWinCloseNotify = NULL; - } else if (notifyClose) { - void *px = taosArrayPush(pContext->pNotifyParams, ¶m); - QUERY_CHECK_NULL(px, code, lino, _end, terrno); - pWin->pWinCloseNotify = NULL; + if (calcOpen) { + code = taosObjListAppend(&pGroup->pPendingParWinCalcParams, ¶m); + QUERY_CHECK_CODE(code, lino, _end); + pWin->pWinOpenNotify = NULL; + } else if (notifyOpen) { + void *px = taosArrayPush(pContext->pNotifyParams, ¶m); + QUERY_CHECK_NULL(px, code, lino, _end, terrno); + pWin->pWinOpenNotify = NULL; + } + pWin->openEmitted = true; } } - } - if (pTask->triggerType == STREAM_TRIGGER_EVENT && pGroup->numSubWindows > 0) { - SSTriggerNotifyWindow *pWin = &pGroup->parentWindow; - // check TRUE FOR condition - bool meetTrueFor = (pTrueForInfo == NULL) || (pTrueForInfo->duration == 0 && pTrueForInfo->count == 0) || - isTrueForSatisfied(pTrueForInfo, pWin->range.skey, - pWin->range.ekey & (~TRIGGER_GROUP_UNCLOSED_WINDOW_MASK), pWin->wrownum); - bool ignore = (pWin->range.skey <= pGroup->prevParentWinStart) || !meetTrueFor; - if ((calcOpen || notifyOpen) && !ignore && !pContext->recovering) { - SSTriggerCalcParam param = {.triggerTime = now, - .notifyType = (notifyOpen ? STRIGGER_EVENT_WINDOW_OPEN : STRIGGER_EVENT_WINDOW_NONE), - .extraNotifyContent = pWin->pWinOpenNotify}; - SSTriggerNotifyWindow win = *pWin; - if (pTask->triggerType != STREAM_TRIGGER_SLIDING) { - win.range.ekey = win.range.skey; + } else { + for (int32_t i = 0; i < TARRAY_SIZE(pContext->pParentWindows); i++) { + SSTriggerNotifyWindow *pWin = TARRAY_GET_ELEM(pContext->pParentWindows, i); + // check TRUE FOR condition + bool meetTrueFor = (pTrueForInfo == NULL) || (pTrueForInfo->duration == 0 && pTrueForInfo->count == 0) || + isTrueForSatisfied(pTrueForInfo, pWin->range.skey, + pWin->range.ekey & (~TRIGGER_GROUP_UNCLOSED_WINDOW_MASK), pWin->wrownum); + bool ignore = (pWin->range.skey <= pGroup->prevParentWinStart) || !meetTrueFor; + if ((calcOpen || notifyOpen) && !ignore && !pContext->recovering) { + SSTriggerCalcParam param = { + .triggerTime = now, + .notifyType = (notifyOpen ? STRIGGER_EVENT_WINDOW_OPEN : STRIGGER_EVENT_WINDOW_NONE), + .extraNotifyContent = pWin->pWinOpenNotify}; + SSTriggerNotifyWindow win = *pWin; + if (pTask->triggerType != STREAM_TRIGGER_SLIDING) { + win.range.ekey = win.range.skey; + } + code = stRealtimeGroupFillParam(pGroup, ¶m, &win); + QUERY_CHECK_CODE(code, lino, _end); + if (calcOpen) { + code = taosObjListAppend(&pGroup->pPendingParWinCalcParams, ¶m); + QUERY_CHECK_CODE(code, lino, _end); + pWin->pWinOpenNotify = NULL; + } else if (notifyOpen) { + void *px = taosArrayPush(pContext->pNotifyParams, ¶m); + QUERY_CHECK_NULL(px, code, lino, _end, terrno); + pWin->pWinOpenNotify = NULL; + } } - code = stRealtimeGroupFillParam(pGroup, ¶m, &win); - QUERY_CHECK_CODE(code, lino, _end); - if (calcOpen) { - code = taosObjListAppend(&pGroup->pPendingParWinCalcParams, ¶m); + if (!ignore) { + pGroup->prevParentWinStart = pWin->range.skey; + } + + ignore = (!meetTrueFor) || (pTask->ignoreNoDataTrigger && pWin->wrownum == 0); + if ((calcClose || notifyClose) && !ignore && !pContext->recovering) { + SSTriggerCalcParam param = { + .triggerTime = now, + .notifyType = (notifyClose ? STRIGGER_EVENT_WINDOW_CLOSE : STRIGGER_EVENT_WINDOW_NONE), + .extraNotifyContent = pWin->pWinCloseNotify}; + code = stRealtimeGroupFillParam(pGroup, ¶m, pWin); QUERY_CHECK_CODE(code, lino, _end); - pWin->pWinOpenNotify = NULL; - } else if (notifyOpen) { - void *px = taosArrayPush(pContext->pNotifyParams, ¶m); - QUERY_CHECK_NULL(px, code, lino, _end, terrno); - pWin->pWinOpenNotify = NULL; + if (calcClose) { + code = taosObjListAppend(&pGroup->pPendingParWinCalcParams, ¶m); + QUERY_CHECK_CODE(code, lino, _end); + pWin->pWinCloseNotify = NULL; + } else if (notifyClose) { + void *px = taosArrayPush(pContext->pNotifyParams, ¶m); + QUERY_CHECK_NULL(px, code, lino, _end, terrno); + pWin->pWinCloseNotify = NULL; + } } } - if (!ignore) { - pGroup->prevParentWinStart = pWin->range.skey; + if (pTask->triggerType == STREAM_TRIGGER_EVENT && pGroup->numSubWindows > 0) { + SSTriggerNotifyWindow *pWin = &pGroup->parentWindow; + // check TRUE FOR condition + bool meetTrueFor = (pTrueForInfo == NULL) || (pTrueForInfo->duration == 0 && pTrueForInfo->count == 0) || + isTrueForSatisfied(pTrueForInfo, pWin->range.skey, + pWin->range.ekey & (~TRIGGER_GROUP_UNCLOSED_WINDOW_MASK), pWin->wrownum); + bool ignore = (pWin->range.skey <= pGroup->prevParentWinStart) || !meetTrueFor; + if ((calcOpen || notifyOpen) && !ignore && !pContext->recovering) { + SSTriggerCalcParam param = { + .triggerTime = now, + .notifyType = (notifyOpen ? STRIGGER_EVENT_WINDOW_OPEN : STRIGGER_EVENT_WINDOW_NONE), + .extraNotifyContent = pWin->pWinOpenNotify}; + SSTriggerNotifyWindow win = *pWin; + if (pTask->triggerType != STREAM_TRIGGER_SLIDING) { + win.range.ekey = win.range.skey; + } + code = stRealtimeGroupFillParam(pGroup, ¶m, &win); + QUERY_CHECK_CODE(code, lino, _end); + if (calcOpen) { + code = taosObjListAppend(&pGroup->pPendingParWinCalcParams, ¶m); + QUERY_CHECK_CODE(code, lino, _end); + pWin->pWinOpenNotify = NULL; + } else if (notifyOpen) { + void *px = taosArrayPush(pContext->pNotifyParams, ¶m); + QUERY_CHECK_NULL(px, code, lino, _end, terrno); + pWin->pWinOpenNotify = NULL; + } + } + if (!ignore) { + pGroup->prevParentWinStart = pWin->range.skey; + } } } @@ -10667,6 +11282,10 @@ static int32_t stHistoryGroupInit(SSTriggerHistoryGroup *pGroup, SSTriggerHistor if (pTask->triggerType == STREAM_TRIGGER_STATE) { pGroup->pendingNullStart = INT64_MIN; + } else if (pTask->triggerType == STREAM_TRIGGER_EVENT) { + pGroup->activeLeafNodeId = -1; + pGroup->pEventParents = taosArrayInit(0, sizeof(SSTriggerNotifyWindow)); + QUERY_CHECK_NULL(pGroup->pEventParents, code, lino, _end, terrno); } code = taosObjListInit(&pGroup->pPendingParWinCalcParams, &pContext->calcParamPool); QUERY_CHECK_CODE(code, lino, _end); @@ -10700,6 +11319,10 @@ static void stHistoryGroupDestroy(void *ptr) { taosMemoryFreeClear(pGroup->stateVal.pData); } else if (pGroup->pContext->pTask->triggerType == STREAM_TRIGGER_EVENT) { stRealtimeContextDestroyWindow(&pGroup->parentWindow); + if (pGroup->pEventParents != NULL) { + taosArrayDestroyEx(pGroup->pEventParents, stRealtimeContextDestroyWindow); + pGroup->pEventParents = NULL; + } } taosObjListClearEx(&pGroup->pPendingParWinCalcParams, tDestroySSTriggerCalcParam); taosObjListClearEx(&pGroup->pPendingCalcParams, tDestroySSTriggerCalcParam); @@ -12075,8 +12698,8 @@ static int32_t stHistoryGroupDoEventCheck(SSTriggerHistoryGroup *pGroup) { // close previous sub-window since start condition index is changed if (pTask->notifyHistory && (pTask->notifyEventType & STRIGGER_EVENT_WINDOW_CLOSE)) { int32_t winIdx = pGroup->numSubWindows - 1; - code = - streamBuildEventNotifyContent(pDataBlock, pTask->histEndCondCols, r, 0, winIdx, &pExtraNotifyContent); + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->histEndCondCols, r, pGroup->conditionIdx - 1, + &pExtraNotifyContent); QUERY_CHECK_CODE(code, lino, _end); } code = stHistoryGroupCloseWindow(pGroup, &pExtraNotifyContent, false, false); @@ -12096,7 +12719,7 @@ static int32_t stHistoryGroupDoEventCheck(SSTriggerHistoryGroup *pGroup) { if (pGroup->numSubWindows == 0) { pGroup->parentWindow = (SSTriggerNotifyWindow){.range.skey = pTsData[r], .range.ekey = pTsData[r]}; if (pTask->notifyHistory && pTask->notifyEventType & STRIGGER_EVENT_WINDOW_OPEN) { - code = streamBuildEventNotifyContent(pDataBlock, pTask->histStartCondCols, r, ps[r] - 1, -1, + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->histStartCondCols, r, ps[r] - 1, &pGroup->parentWindow.pWinOpenNotify); QUERY_CHECK_CODE(code, lino, _end); } @@ -12110,7 +12733,7 @@ static int32_t stHistoryGroupDoEventCheck(SSTriggerHistoryGroup *pGroup) { if (checkSubEvent && pGroup->numSubWindows > 1) { winIdx = pGroup->numSubWindows - 1; } - code = streamBuildEventNotifyContent(pDataBlock, pTask->histStartCondCols, r, ps[r] - 1, winIdx, + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->histStartCondCols, r, ps[r] - 1, &pExtraNotifyContent); QUERY_CHECK_CODE(code, lino, _end); } @@ -12123,14 +12746,15 @@ static int32_t stHistoryGroupDoEventCheck(SSTriggerHistoryGroup *pGroup) { if (checkSubEvent && pGroup->numSubWindows > 1) { winIdx = pGroup->numSubWindows - 1; } - code = streamBuildEventNotifyContent(pDataBlock, pTask->histEndCondCols, r, 0, winIdx, &pExtraNotifyContent); + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->histEndCondCols, r, pGroup->conditionIdx - 1, + &pExtraNotifyContent); QUERY_CHECK_CODE(code, lino, _end); } code = stHistoryGroupCloseWindow(pGroup, &pExtraNotifyContent, false, false); QUERY_CHECK_CODE(code, lino, _end); if (checkSubEvent) { if (pTask->notifyHistory && (pTask->notifyEventType & STRIGGER_EVENT_WINDOW_CLOSE)) { - code = streamBuildEventNotifyContent(pDataBlock, pTask->histEndCondCols, r, 0, -1, + code = stTriggerTaskBuildEventNotify(pTask, pDataBlock, pTask->histEndCondCols, r, pGroup->conditionIdx - 1, &pGroup->parentWindow.pWinCloseNotify); QUERY_CHECK_CODE(code, lino, _end); } diff --git a/source/libs/new-stream/src/streamUtil.c b/source/libs/new-stream/src/streamUtil.c index 7c1410d9de2e..acc3bab3eca1 100755 --- a/source/libs/new-stream/src/streamUtil.c +++ b/source/libs/new-stream/src/streamUtil.c @@ -488,7 +488,7 @@ int32_t streamBuildIdleNotifyContent(ESTriggerEventType eventType, int64_t idleD } int32_t streamBuildEventNotifyContent(const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t rowIdx, - int32_t condIdx, int32_t winIdx, char** ppContent) { + const char* conditionPath, int32_t conditionIndex, char** ppContent) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; const SNode* pNode = NULL; @@ -510,14 +510,14 @@ int32_t streamBuildEventNotifyContent(const SSDataBlock* pInputBlock, const SNod cond = cJSON_CreateObject(); QUERY_CHECK_NULL(cond, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); - JSON_CHECK_ADD_ITEM(cond, "conditionIndex", cJSON_CreateNumber(condIdx)); + JSON_CHECK_ADD_ITEM(cond, "conditionPath", cJSON_CreateString(conditionPath != NULL ? conditionPath : "0")); + JSON_CHECK_ADD_ITEM(cond, "conditionIndex", cJSON_CreateNumber(conditionIndex)); JSON_CHECK_ADD_ITEM(cond, "fieldValues", fields); fields = NULL; obj = cJSON_CreateObject(); QUERY_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); JSON_CHECK_ADD_ITEM(obj, "triggerCondition", cond); - JSON_CHECK_ADD_ITEM(obj, "windowIndex", cJSON_CreateNumber(winIdx)); cond = NULL; *ppContent = cJSON_PrintUnformatted(obj); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index d8864b8e7466..7b5784512f06 100755 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -1598,17 +1598,19 @@ stream_trigger(A) ::= trigger_type(B) trigger_table_opt(C) stream_partition_by_o trigger_type(A) ::= SESSION NK_LP column_reference(B) NK_COMMA interval_sliding_duration_literal(C) NK_RP. { A = createSessionWindowNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)); } trigger_type(A) ::= STATE_WINDOW NK_LP expr_or_subquery(B) state_window_opt(C) NK_RP true_for_opt(D). { A = createStateWindowNode(pCxt, releaseRawExprNode(pCxt, B), C, D); } trigger_type(A) ::= interval_opt(B) SLIDING NK_LP sliding_expr(C) NK_RP. { A = createIntervalWindowNodeExt(pCxt, B, C); } -trigger_type(A) ::= EVENT_WINDOW NK_LP START WITH search_condition(B) END WITH search_condition(C) NK_RP true_for_opt(D). { A = createEventWindowNode(pCxt, B, C, D); } trigger_type(A) ::= COUNT_WINDOW NK_LP count_window_args(B) NK_RP. { A = createCountWindowNodeFromArgs(pCxt, B); } trigger_type(A) ::= PERIOD NK_LP interval_sliding_duration_literal(B) offset_opt(C) NK_RP. { A = createPeriodWindowNode(pCxt, releaseRawExprNode(pCxt, B), C); } -trigger_type(A) ::= EVENT_WINDOW NK_LP START WITH NK_LP search_condition_list(B) NK_RP - END WITH search_condition(C) NK_RP true_for_opt(D). { A = createEventWindowNode(pCxt, createNodeListNode(pCxt, B), C, D); } -trigger_type(A) ::= EVENT_WINDOW NK_LP START WITH NK_LP search_condition_list(B) NK_RP NK_RP true_for_opt(D). { A = createEventWindowNode(pCxt, createNodeListNode(pCxt, B), NULL, D); } - -%type search_condition_list { SNodeList* } -%destructor search_condition_list { nodesDestroyList($$); } -search_condition_list(A) ::= search_condition(B) NK_COMMA search_condition(C). { A = addNodeToList(pCxt, createNodeList(pCxt, B), C); } -search_condition_list(A) ::= search_condition_list(B) NK_COMMA search_condition(C). { A = addNodeToList(pCxt, B, C); } +trigger_type(A) ::= EVENT_WINDOW NK_LP START WITH start_event_item(B) END WITH search_condition(C) NK_RP true_for_opt(D). { A = createEventWindowNode(pCxt, B, C, D); } +trigger_type(A) ::= EVENT_WINDOW NK_LP START WITH start_event_item(B) NK_RP true_for_opt(D). { A = createEventWindowNode(pCxt, B, NULL, D); } + +%type start_event_item { SNode* } +%destructor start_event_item { nodesDestroyNode($$); } +%type start_event_item_group { SNodeList* } +%destructor start_event_item_group { nodesDestroyList($$); } +start_event_item(A) ::= search_condition(B). { A = B; } +start_event_item(A) ::= NK_LP start_event_item_group(B) NK_RP. { A = createNodeListNode(pCxt, B); } +start_event_item_group(A) ::= start_event_item(B) NK_COMMA start_event_item(C). { A = addNodeToList(pCxt, createNodeList(pCxt, B), C); } +start_event_item_group(A) ::= start_event_item_group(B) NK_COMMA start_event_item(C). { A = addNodeToList(pCxt, B, C); } interval_opt(A) ::= . { A = NULL; } interval_opt(A) ::= INTERVAL NK_LP interval_sliding_duration_literal(C) NK_RP. { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, C), NULL, NULL, NULL); } @@ -2246,6 +2248,7 @@ pseudo_column(A) ::= TWROWNUM(B). pseudo_column(A) ::= TPREV_LOCALTIME(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); } pseudo_column(A) ::= TNEXT_LOCALTIME(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); } pseudo_column(A) ::= TLOCALTIME(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); } +pseudo_column(A) ::= EVENT_CONDITION_PATH(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); } pseudo_column(A) ::= TGRPID(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); } pseudo_column(A) ::= NK_PH NK_INTEGER(B). { A = createRawExprNode(pCxt, &B, createPlaceHolderColumnNode(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B))); } pseudo_column(A) ::= NK_PH TBNAME(B). { A = createRawExprNode(pCxt, &B, createPHTbnameFunctionNode(pCxt, &B, NULL)); } diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 1b0c6b5cbf03..f41ff477cc54 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -417,6 +417,7 @@ static SKeyword keywordTable[] = { {"_TPREV_LOCALTIME", TK_TPREV_LOCALTIME}, {"_TNEXT_LOCALTIME", TK_TNEXT_LOCALTIME}, {"_TLOCALTIME", TK_TLOCALTIME}, + {"_EVENT_CONDITION_PATH", TK_EVENT_CONDITION_PATH}, {"_TGRPID", TK_TGRPID}, {"ALIVE", TK_ALIVE}, {"VARBINARY", TK_VARBINARY}, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 94833ed3d2fd..09e0d403a793 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3963,6 +3963,12 @@ static EDealRes translatePlaceHolderFunc(STranslateContext* pCxt, SNode** pFunc) PAR_ERR_JRET(nodesMakeValueNodeFromTimestamp(0, &extraValue)); break; } + case FUNCTION_TYPE_EVENT_CONDITION_PATH: { + BIT_FLAG_SET_MASK(pCxt->streamInfo.placeHolderBitmap, PLACE_HOLDER_EVENT_CONDITION_PATH); + PAR_ERR_JRET(nodesMakeValueNodeFromString("", (SValueNode**)&extraValue)); + ((SValueNode*)extraValue)->node.resType.bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; + break; + } case FUNCTION_TYPE_TGRPID: { BIT_FLAG_SET_MASK(pCxt->streamInfo.placeHolderBitmap, PLACE_HOLDER_GRPID); PAR_ERR_JRET(nodesMakeValueNodeFromInt64(0, &extraValue)); @@ -9560,6 +9566,11 @@ static int32_t checkWindowsConditonValid(SEventWindowNode* pEventWindowNode) { return code; } +static bool eventWindowHasSubEvents(const SEventWindowNode* pEventWindowNode) { + return pEventWindowNode != NULL && pEventWindowNode->pStartCond != NULL && + nodeType(pEventWindowNode->pStartCond) == QUERY_NODE_NODE_LIST; +} + static int32_t checkEventWindow(STranslateContext* pCxt, SEventWindowNode* pEvent) { PAR_ERR_RET(checkTrueForLimit(pCxt, pEvent->pTrueForLimit)); PAR_RET(checkWindowsConditonValid(pEvent)); @@ -18015,7 +18026,8 @@ static int32_t createStreamReqBuildTriggerAst(STranslateContext* pCxt, SCreateSt } static int32_t createStreamReqCheckPlaceHolder(STranslateContext* pCxt, SCMCreateStreamReq* pReq, - int32_t placeHolderBitmap, SNodeList* pTriggerPartition) { + int32_t placeHolderBitmap, SNodeList* pTriggerPartition, + SNode* pTriggerWindow) { int32_t code = TSDB_CODE_SUCCESS; bool hasIdleResumeEvent = (pReq->eventTypes & (EVENT_IDLE | EVENT_RESUME)) != 0; if (BIT_FLAG_TEST_MASK(pReq->placeHolderBitmap, PLACE_HOLDER_CURRENT_TS) || @@ -18086,6 +18098,18 @@ static int32_t createStreamReqCheckPlaceHolder(STranslateContext* pCxt, SCMCreat } } + if (BIT_FLAG_TEST_MASK(pReq->placeHolderBitmap, PLACE_HOLDER_EVENT_CONDITION_PATH)) { + if (pReq->triggerType != WINDOW_TYPE_EVENT) { + PAR_ERR_JRET(generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_STREAM_INVALID_PLACE_HOLDER, + "_event_condition_path can only be used in event window")); + } + if (pTriggerWindow == NULL || nodeType(pTriggerWindow) != QUERY_NODE_EVENT_WINDOW || + !eventWindowHasSubEvents((SEventWindowNode*)pTriggerWindow)) { + PAR_ERR_JRET(generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_STREAM_INVALID_PLACE_HOLDER, + "_event_condition_path requires event window sub-events")); + } + } + return code; _return: parserError("createStreamReqCheckPlaceHolder failed, code:%d", code); @@ -18112,7 +18136,8 @@ static int32_t createStreamReqBuildTriggerPlan(STranslateContext* pCxt, SCreateS PAR_ERR_JRET(createStreamReqBuildTriggerBuildPlan(pCxt, *pTriggerSelect, pReq, pTriggerSlotHash, pTriggerWindow, pTriggerPartition)); PAR_ERR_JRET(createStreamReqBuildTriggerBuildWindowInfo(pCxt, pTriggerWindow, pReq)); - PAR_ERR_JRET(createStreamReqCheckPlaceHolder(pCxt, pReq, pReq->placeHolderBitmap, pTriggerPartition)); + PAR_ERR_JRET(createStreamReqCheckPlaceHolder(pCxt, pReq, pReq->placeHolderBitmap, pTriggerPartition, + pTriggerWindow)); PAR_ERR_JRET(createStreamReqSetDefaultTag(pCxt, pStmt, pTriggerPartition, pReq)); _return: diff --git a/source/libs/parser/test/parStreamTest.cpp b/source/libs/parser/test/parStreamTest.cpp index c884f58a4b3f..274c22a8b681 100644 --- a/source/libs/parser/test/parStreamTest.cpp +++ b/source/libs/parser/test/parStreamTest.cpp @@ -2202,4 +2202,85 @@ TEST_F(ParserStreamTest, TestIdleTimeoutValidation) { TSDB_CODE_STREAM_INVALID_PLACE_HOLDER); } +TEST_F(ParserStreamTest, TestNestedEventWindowStartCondition) { + setAsyncFlag("-1"); + useDb("root", "stream_streamdb"); + + setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) { + ASSERT_EQ(stage, PARSER_STAGE_TRANSLATE); + ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_STREAM_STMT); + + SCMCreateStreamReq req = {0}; + ASSERT_EQ(TSDB_CODE_SUCCESS, + tDeserializeSCMCreateStreamReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req)); + ASSERT_EQ(req.triggerType, WINDOW_TYPE_EVENT); + ASSERT_NE(req.trigger.event.startCond, nullptr); + + SNode* pStartCond = nullptr; + ASSERT_EQ(TSDB_CODE_SUCCESS, nodesStringToNode((char*)req.trigger.event.startCond, &pStartCond)); + ASSERT_EQ(nodeType(pStartCond), QUERY_NODE_NODE_LIST); + + SNodeList* pRootList = ((SNodeListNode*)pStartCond)->pNodeList; + ASSERT_EQ(LIST_LENGTH(pRootList), 2); + + SNode* pFirst = nodesListGetNode(pRootList, 0); + SNode* pSecond = nodesListGetNode(pRootList, 1); + ASSERT_EQ(nodeType(pFirst), QUERY_NODE_NODE_LIST); + ASSERT_NE(nodeType(pSecond), QUERY_NODE_NODE_LIST); + + SNodeList* pNestedList = ((SNodeListNode*)pFirst)->pNodeList; + ASSERT_EQ(LIST_LENGTH(pNestedList), 2); + ASSERT_NE(nodeType(nodesListGetNode(pNestedList, 0)), QUERY_NODE_NODE_LIST); + ASSERT_NE(nodeType(nodesListGetNode(pNestedList, 1)), QUERY_NODE_NODE_LIST); + + nodesDestroyNode(pStartCond); + tFreeSCMCreateStreamReq(&req); + }); + + run("create stream stream_streamdb.s_nested " + "event_window(start with ((c1 > 1, c1 > 2), c2 < 3) end with c2 < 1) " + "from stream_triggerdb.stream_t1 into stream_outdb.stream_out " + "as select _tlocaltime, avg(c1) from stream_querydb.stream_t2"); + + run("create stream stream_streamdb.s_nested_bad " + "event_window(start with ((c1 > 1,), c2 < 3) end with c2 < 1) " + "from stream_triggerdb.stream_t1 into stream_outdb.stream_out " + "as select _tlocaltime, avg(c1) from stream_querydb.stream_t2", + TSDB_CODE_PAR_SYNTAX_ERROR, PARSER_STAGE_PARSE); +} + +TEST_F(ParserStreamTest, TestEventConditionPathPlaceholder) { + setAsyncFlag("-1"); + useDb("root", "stream_streamdb"); + + setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) { + ASSERT_EQ(stage, PARSER_STAGE_TRANSLATE); + ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_STREAM_STMT); + + SCMCreateStreamReq req = {0}; + ASSERT_EQ(TSDB_CODE_SUCCESS, + tDeserializeSCMCreateStreamReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req)); + ASSERT_EQ(req.triggerType, WINDOW_TYPE_EVENT); + ASSERT_NE(req.calcPlan, nullptr); + ASSERT_NE(std::string((char*)req.calcPlan).find("_event_condition_path"), std::string::npos); + tFreeSCMCreateStreamReq(&req); + }); + + run("create stream stream_streamdb.s_path_ok " + "event_window(start with ((c1 > 1, c1 > 2), c2 < 3) end with c2 < 1) " + "from stream_triggerdb.stream_t1 into stream_outdb.stream_out " + "as select _tlocaltime, _event_condition_path from %%trows"); + + run("create stream stream_streamdb.s_path_bad_single " + "event_window(start with c1 > 1 end with c2 < 1) " + "from stream_triggerdb.stream_t1 into stream_outdb.stream_out " + "as select _tlocaltime, _event_condition_path from %%trows", + TSDB_CODE_STREAM_INVALID_PLACE_HOLDER); + + run("create stream stream_streamdb.s_path_bad_interval " + "interval(1s) sliding(1s) from stream_triggerdb.stream_t1 into stream_outdb.stream_out " + "as select _tlocaltime, _event_condition_path from stream_querydb.stream_t2", + TSDB_CODE_STREAM_INVALID_PLACE_HOLDER); +} + } // namespace ParserTest diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 0e010a93b877..8a9413bea145 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -801,6 +801,9 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t } +static int32_t sclSetVarDataCString(SColumnInfoData* pResColData, uint32_t rowIndex, const char* pVal); +static int32_t sclSetVarDataCStringN(SColumnInfoData* pResColData, uint32_t currentRow, uint32_t numOfRows, + const char* pVal); int32_t sclSetStreamExtWinParam(int32_t funcId, SNodeList* pParamNodes, SScalarParam* res, SScalarCtx *pCtx) { int32_t code = 0; @@ -857,6 +860,9 @@ int32_t sclSetStreamExtWinParam(int32_t funcId, SNodeList* pParamNodes, SScalarP case FUNCTION_TYPE_TGRPID: ((int64_t*)res->columnData->pData)[i] = pInfo->groupId; break; + case FUNCTION_TYPE_EVENT_CONDITION_PATH: + SCL_ERR_RET(sclSetVarDataCString(res->columnData, i, pParams->conditionPath)); + break; case FUNCTION_TYPE_TIDLESTART: ((int64_t*)res->columnData->pData)[i] = pParams->idlestart; break; @@ -872,6 +878,46 @@ int32_t sclSetStreamExtWinParam(int32_t funcId, SNodeList* pParamNodes, SScalarP return code; } +static int32_t sclSetVarDataCString(SColumnInfoData* pResColData, uint32_t rowIndex, const char* pVal) { + if (pVal == NULL) { + return colDataSetVal(pResColData, rowIndex, NULL, true); + } + + size_t len = strlen(pVal); + char* buf = taosMemoryCalloc(len + VARSTR_HEADER_SIZE, 1); + if (buf == NULL) { + return terrno; + } + *(VarDataLenT*)buf = (VarDataLenT)len; + if (len > 0) { + (void)memcpy(varDataVal(buf), pVal, len); + } + int32_t code = colDataSetVal(pResColData, rowIndex, buf, false); + taosMemoryFreeClear(buf); + return code; +} + +static int32_t sclSetVarDataCStringN(SColumnInfoData* pResColData, uint32_t currentRow, uint32_t numOfRows, + const char* pVal) { + if (pVal == NULL) { + colDataSetNItemsNull(pResColData, currentRow, numOfRows); + return TSDB_CODE_SUCCESS; + } + + size_t len = strlen(pVal); + char* buf = taosMemoryCalloc(len + VARSTR_HEADER_SIZE, 1); + if (buf == NULL) { + return terrno; + } + *(VarDataLenT*)buf = (VarDataLenT)len; + if (len > 0) { + (void)memcpy(varDataVal(buf), pVal, len); + } + int32_t code = colDataSetNItems(pResColData, currentRow, buf, numOfRows, 1, false); + taosMemoryFreeClear(buf); + return code; +} + static int32_t sclAssignExternalWindowColumnRes(SColumnInfoData* pResColData, int64_t offset, int64_t rows, SSTriggerCalcParam *pParams, SNode* pParamNode) { if (pParamNode == NULL || nodeType(pParamNode) != QUERY_NODE_VALUE) { @@ -949,6 +995,8 @@ int32_t scalarAssignPlaceHolderRes(SColumnInfoData* pResColData, int64_t offset, case FUNCTION_TYPE_TGRPID: pData = &pInfo->groupId; break; + case FUNCTION_TYPE_EVENT_CONDITION_PATH: + return sclSetVarDataCStringN(pResColData, offset, rows, pParams->conditionPath); case FUNCTION_TYPE_PLACEHOLDER_TBNAME: { // find tbname from stream part col vals char buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; diff --git a/test/cases/18-StreamProcessing/03-TriggerMode/test_event_new.py b/test/cases/18-StreamProcessing/03-TriggerMode/test_event_new.py index 8c4bc60c70d4..4b15c6db7e43 100644 --- a/test/cases/18-StreamProcessing/03-TriggerMode/test_event_new.py +++ b/test/cases/18-StreamProcessing/03-TriggerMode/test_event_new.py @@ -44,6 +44,7 @@ def test_stream_event_trigger(self): streams.append(self.Basic8()) streams.append(self.Basic9()) streams.append(self.Basic10()) + streams.append(self.Basic11()) tdStream.checkAll(streams) @@ -2999,3 +3000,40 @@ def check3(self): and tdSql.compareData(9, 1, "2025-01-01 00:00:38") and tdSql.compareData(9, 2, 4) ) + + class Basic11(StreamCheckItem): + def __init__(self): + self.db = "sdb11" + self.stbName = "stb" + + def create(self): + tdSql.execute(f"create database {self.db} vgroups 1 buffer 8") + tdSql.execute(f"use {self.db}") + tdSql.execute( + f"create table if not exists {self.stbName} (cts timestamp, c1 int, c2 int) tags (tint int)" + ) + tdSql.execute("create table ct1 using stb tags(1)") + tdSql.execute( + "create stream s_path " + "event_window(start with ((c1 >= 90, c1 >= 60), c2 < 60) end with c2 < 50) " + "from ct1 into res_path(startts, path, cnt) " + "as select _twstart, _event_condition_path, count(*) from %%trows;" + ) + + def insert1(self): + tdSql.executes([ + "insert into ct1 values ('2025-01-01 00:00:00', 95, 95);", + "insert into ct1 values ('2025-01-01 00:00:01', 65, 65);", + "insert into ct1 values ('2025-01-01 00:00:02', 40, 40);", + "insert into ct1 values ('2025-01-01 00:00:03', 70, 70);", + "insert into ct1 values ('2025-01-01 00:00:04', 45, 45);", + ]) + + def check1(self): + tdSql.checkResultsByFunc( + sql=f"select distinct path from {self.db}.res_path order by path", + func=lambda: tdSql.getRows() == 3 + and tdSql.compareData(0, 0, "0") + and tdSql.compareData(1, 0, "0.1") + and tdSql.compareData(2, 0, "1"), + )