fix(stream): handle notify output for nested diff external window #35146

Open
JinqingKuang wants to merge 2 commits into main from fix/stream-notify-crash

Conversation

@JinqingKuang

Description

Keep the stream runner from crashing when external-window output row indexes are cleared before nested diff returns its final block.

Issue(s)

Checklist

Please check the items in the checklist if applicable.

  • Is the user manual updated?
  • Are the test cases passed and automated?
  • Is there no significant decrease in test coverage?

Copilot AI review requested due to automatic review settings April 15, 2026 05:53
@JinqingKuang JinqingKuang requested a review from a team as a code owner April 15, 2026 05:53

Copilot AI left a comment

Pull request overview

Prevents stream runner crashes when handling notify output for external-window queries where the per-window row-index metadata is cleared before a nested diff returns its final output block.

Changes:

  • Add an early-return condition in the projection/indefinite-function path to avoid consuming additional downstream blocks after producing a result block for external-window streams.
  • Harden stream-runner external-window output handling to tolerate missing pStreamBlkWinIdx in a specific “final window” scenario.
  • Add a regression test case (Basic17) to reproduce the nested diff + notify crash scenario.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| test/cases/18-StreamProcessing/05-Notify/test_notify.py | Adds Basic17 regression case to exercise external-window notify with nested diff. |
| source/libs/new-stream/src/streamRunner.c | Handles output blocks even when external-window row-index array is missing/cleared, avoiding runner crashes. |
| source/libs/executor/src/projectoperator.c | Breaks out as soon as a result block is produced for external-window streams to keep stream runtime state aligned with the returned block. |

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces early exit logic for external-window outputs in the project operator and refactors the stream runner to handle scenarios where external-window row indices are missing. Additionally, a new test case has been added to verify the fix and prevent future regressions. A review comment suggests refactoring duplicated logic in the stream runner to improve maintainability and readability.

Comment thread source/libs/new-stream/src/streamRunner.c
@JinqingKuang JinqingKuang force-pushed the fix/stream-notify-crash branch from 75c7087 to 12e413e Compare April 15, 2026 11:44
Copilot AI review requested due to automatic review settings April 16, 2026 03:03
@JinqingKuang JinqingKuang force-pushed the fix/stream-notify-crash branch from 12e413e to 784de72 Compare April 16, 2026 03:03

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.


Comment thread source/libs/executor/src/streamexternalwindowoperator.c Outdated
Comment thread source/libs/new-stream/src/streamRunner.c
@JinqingKuang

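Code review report

Goal of the fix: when a nested diff emits external-window output, pStreamBlkWinIdx is cleared too early and the stream runner crashes.
The overall direction is correct and the core logic is sound. The issues below have been verified.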


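[Medium] Type narrowing: rowsBefore should be declared as int64_t

Location: source/libs/executor/src/streamexternalwindowoperator.c, extWinIndefRowsDo

int32_t rowsBefore receives pResBlock->info.rows (type int64_t, see tcommon.h:212), so the value is implicitly truncated.

The call after the fix:

extWinAppendWinIdx(..., pResBlock->info.rows - rowsBefore);

pResBlock->info.rows - rowsBefore yields an int64_t, but the last parameter of extWinAppendWinIdx is int32_t, so the value is truncated again. Internally, extWinAppendWinIdx computes *pRowIdx = pBlock->info.rows - rows; if the delta overflows, the recorded start-row offset of the window is wrong.

In normal stream scenarios a block never holds more than INT32_MAX rows, so this does not trigger in practice, but it is still an error at the type level.

Suggestion: change the declaration to int64_t rowsBefore = 0; and add an explicit cast at the extWinAppendWinIdx call site.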


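[Medium] allProcessByRowCtxShareFuncId checks the wrong scope

Location: source/libs/executor/src/projectoperator.c, projectApplyFunctionsWithSelect

bool splitByExternalWindow = ... &&
    allProcessByRowCtxShareFuncId(processByRowFunctionCtx);  // ← checks the outer, full array

processByRowFunctionCtx is the full array of ctx entries for all processByRow functions in the current projection (it may span several functionIds). Execution, however, proceeds group by group per funcId: each time processByRowInExternalWindows is entered, pGroupedCtxArray is already the subset for a single kind of function.

Problem: if an external-window query contains two kinds of processByRow functions (for example diff(a) and lead(b, 1)), allProcessByRowCtxShareFuncId returns false for the full array, so both groups fall back to the old path (no per-window reset). In this scenario the old path is triggered the same way as this crash, so in theory it can still crash.

The reproduction described in this PR is a pure nested diff, for which allProcessByRowCtxShareFuncId returns true, so the fix is effective. Still, consider documenting this limitation in a comment, or moving the check down to the pGroupedCtxArray level.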
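
To illustrate the granularity point, here is a minimal sketch of a "same function id" check applied once to a full array and once to each per-function group. The helper allShareFuncId and the plain int32_t ids are hypothetical stand-ins, not the real allProcessByRowCtxShareFuncId signature or ctx types.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the real check: returns true when every
 * entry in funcIds carries the same function id (or the array is empty). */
static bool allShareFuncId(const int32_t* funcIds, size_t n) {
  for (size_t i = 1; i < n; ++i) {
    if (funcIds[i] != funcIds[0]) {
      return false;
    }
  }
  return true;
}
```

With ids {1, 1, 2} the check fails on the full array but passes on the sub-arrays {1, 1} and {2}, which is why evaluating it per group would keep the per-window path enabled for mixed queries.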


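[Low] pStreamBlkWinIdx is left half-rebuilt on the error path

Location: source/libs/executor/src/projectoperator.c, processByRowInExternalWindows

The function first calls taosArrayClear(pStreamInfo->pStreamBlkWinIdx) and then rebuilds the array window by window. If resetProcessByRowCtx or processFuncByRow fails in some iteration and jumps to _exit, pStreamBlkWinIdx has already lost its original contents and holds only the new entries for the first i windows. The _exit section does not restore the original array from pInputWinIdx.

TDengine's error-handling convention is to abandon the task on failure; the upper layer receives the error code and usually triggers a task rebuild, so this does not produce silent errors. If the upper layer later adds retry logic, however, the half-corrupted index could cause problems that are hard to diagnose.

Suggestion: in the _exit section, when code != TSDB_CODE_SUCCESS, restore the original contents of pStreamBlkWinIdx from pInputWinIdx.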
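
The restore-on-failure idea can be sketched as follows. This is only an illustration under stated assumptions: IdxArray, IDX_CAP, rebuildIdxOrRestore and the failAt parameter are hypothetical, a fixed-size struct stands in for the real SArray, and the local snapshot plays the role that pInputWinIdx has in the PR.

```c
#include <stddef.h>
#include <stdint.h>

#define IDX_CAP 16

/* Hypothetical fixed-capacity stand-in for the window-index array. */
typedef struct {
  int64_t items[IDX_CAP];
  size_t  size;
} IdxArray;

/* Clears pIdx and rebuilds it from newItems. A failure is simulated at
 * window failAt (pass a value >= n for no failure). On failure the
 * original contents are restored before returning -1. */
static int rebuildIdxOrRestore(IdxArray* pIdx, const int64_t* newItems,
                               size_t n, size_t failAt) {
  IdxArray saved = *pIdx; /* snapshot taken before clearing */
  int      code = 0;

  pIdx->size = 0;
  for (size_t i = 0; i < n; ++i) {
    if (i == failAt || pIdx->size >= IDX_CAP) {
      code = -1;
      goto _exit;
    }
    pIdx->items[pIdx->size++] = newItems[i];
  }

_exit:
  if (code != 0) {
    *pIdx = saved; /* undo the partial rebuild */
  }
  return code;
}
```

A caller that retries after a failure then sees the same index it started with rather than a prefix of the new one.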


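Overall conclusion

| # | Issue | Severity | Confidence | Publish as |
| --- | --- | --- | --- | --- |
| 1 | rowsBefore type narrowing (int32_t vs int64_t) | Medium | | inline |
| 2 | allProcessByRowCtxShareFuncId checks the outer, full array; the mixed-function scenario is not fixed | Medium | | summary |
| 3 | Half-rebuilt pStreamBlkWinIdx is not restored on the error path | Low | | summary |

The core fix logic (extWinIndefRowsDo passes the actual output row count instead of the input row count, and stRunnerTopTaskHandleExternalWinOutputBlock handles an empty index safely) has been verified as correct, and the Basic17 test case covers the crash path described.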

Comment thread source/libs/executor/src/streamexternalwindowoperator.c
Apply -Wa,--noexecstack to fast-lzma2 in both the external Makefile patch and the Conan build path.

Co-authored-by: OpenAI Codex (GPT-5) <noreply@openai.com>
@JinqingKuang JinqingKuang force-pushed the fix/stream-notify-crash branch from 784de72 to 99432ee Compare April 17, 2026 06:35
Keep the stream runner from crashing when external-window output row
indexes are cleared before nested diff returns its final block.

Co-authored-by: GPT-5 Codex <noreply@openai.com>
Copilot AI review requested due to automatic review settings April 17, 2026 09:36
@JinqingKuang JinqingKuang force-pushed the fix/stream-notify-crash branch from 99432ee to 4220ccf Compare April 17, 2026 09:36

Copilot AI left a comment

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.


Comment on lines +181 to +183
int32_t* pCurrPair = (int32_t*)pCurr;
int32_t winIdx = pCurrPair[0];
int32_t rowStart = pCurrPair[1];

Copilot AI Apr 17, 2026

This packs/unpacks an int64_t via int32_t* type-punning, which is undefined behavior under C strict-aliasing rules and also bakes in little-endian layout assumptions. To make this safe and portable, encode/decode the (winIdx,rowStart) pair using bitwise operations (mask/shift) on a uint64_t (like the existing idx & 0xFFFFFFFF / idx >> 32 pattern in the runner), or use memcpy into/from a packed struct.
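
The mask/shift encoding suggested above could look roughly like this. The helper names are hypothetical, and the layout (winIdx in the low 32 bits, second value in the high 32 bits) is an assumption chosen to match what the pointer cast produces on a little-endian machine; the PR may settle on a different layout.

```c
#include <stdint.h>

/* Hypothetical helpers. Assumed layout: winIdx in the low 32 bits,
 * rowStart in the high 32 bits. All arithmetic is done on unsigned
 * values, so there is no aliasing through int32_t* and no dependence
 * on host byte order. */
static inline int64_t packWinIdxPair(int32_t winIdx, int32_t rowStart) {
  uint64_t lo = (uint32_t)winIdx;
  uint64_t hi = (uint32_t)rowStart;
  return (int64_t)((hi << 32) | lo);
}

static inline int32_t unpackWinIdx(int64_t packed) {
  return (int32_t)(uint32_t)((uint64_t)packed & 0xFFFFFFFFu);
}

static inline int32_t unpackRowStart(int64_t packed) {
  return (int32_t)(uint32_t)((uint64_t)packed >> 32);
}
```

Converting an out-of-range unsigned value back to a signed type is implementation-defined in C rather than undefined; mainstream compilers wrap it as two's complement, so negative indexes round-trip.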

Comment on lines +210 to +213
int64_t val = 0;
int32_t* pOutPair = (int32_t*)&val;
pOutPair[0] = winIdx;
pOutPair[1] = totalRows;

Copilot AI Apr 17, 2026

This packs/unpacks an int64_t via int32_t* type-punning, which is undefined behavior under C strict-aliasing rules and also bakes in little-endian layout assumptions. To make this safe and portable, encode/decode the (winIdx,rowStart) pair using bitwise operations (mask/shift) on a uint64_t (like the existing idx & 0xFFFFFFFF / idx >> 32 pattern in the runner), or use memcpy into/from a packed struct.

SExternalWindowOperator* pExtW = pOperator->info;
SSDataBlock* pResBlock = NULL;
SArray* pIdx = NULL;
int64_t rowsBefore = 0;

Copilot AI Apr 17, 2026

The new rowsBefore delta is computed as an int64_t and then cast to int32_t without validation. If pResBlock->info.rows ever decreases (buffer reuse/reset) or the delta exceeds INT32_MAX, this can become negative/truncated and corrupt window-row index accounting. Recommend computing an int64_t delta = pResBlock->info.rows - rowsBefore; and explicitly guarding delta >= 0 && delta <= INT32_MAX before casting/passing into extWinAppendWinIdx, otherwise return an error.
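
A guarded version of the delta computation might be sketched as below. checkedRowDelta is a hypothetical helper, not an existing function in the code base; a real change would presumably return a TSDB error code through TAOS_CHECK_EXIT instead of a bool.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper: computes rowsAfter - rowsBefore and stores it in
 * *pDelta only when the result is non-negative and fits in int32_t.
 * Returns false (leaving *pDelta untouched) otherwise. */
static bool checkedRowDelta(int64_t rowsAfter, int64_t rowsBefore,
                            int32_t* pDelta) {
  /* Row counts are expected to be non-negative and non-decreasing;
   * rejecting anything else also rules out int64_t overflow below. */
  if (rowsBefore < 0 || rowsAfter < rowsBefore) {
    return false;
  }
  int64_t delta = rowsAfter - rowsBefore;
  if (delta > INT32_MAX) {
    return false;
  }
  *pDelta = (int32_t)delta;
  return true;
}
```

The caller would pass the validated delta to extWinAppendWinIdx and take the error path when the helper returns false.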

int32_t code = TSDB_CODE_SUCCESS, lino = 0;

TAOS_CHECK_EXIT(extWinGetSetWinResBlockBuf(pOperator, rows, pWin, &pResBlock, &pIdx));
rowsBefore = pResBlock->info.rows;

Copilot AI Apr 17, 2026

The new rowsBefore delta is computed as an int64_t and then cast to int32_t without validation. If pResBlock->info.rows ever decreases (buffer reuse/reset) or the delta exceeds INT32_MAX, this can become negative/truncated and corrupt window-row index accounting. Recommend computing an int64_t delta = pResBlock->info.rows - rowsBefore; and explicitly guarding delta >= 0 && delta <= INT32_MAX before casting/passing into extWinAppendWinIdx, otherwise return an error.

Comment on lines +1696 to +1698
TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pIdx, pResBlock,
extWinGetCurWinIdx(pOperator->pTaskInfo),
(int32_t)(pResBlock->info.rows - rowsBefore)));

Copilot AI Apr 17, 2026

The new rowsBefore delta is computed as an int64_t and then cast to int32_t without validation. If pResBlock->info.rows ever decreases (buffer reuse/reset) or the delta exceeds INT32_MAX, this can become negative/truncated and corrupt window-row index accounting. Recommend computing an int64_t delta = pResBlock->info.rows - rowsBefore; and explicitly guarding delta >= 0 && delta <= INT32_MAX before casting/passing into extWinAppendWinIdx, otherwise return an error.


# Build make command
cflags = "-Wall -O2 -pthread"
asflags = "-Wa,--noexecstack"

Copilot AI Apr 17, 2026

-Wa,--noexecstack is generally ELF/GNU-as specific; unconditionally adding it in the Conan recipe (and in cmake/in/lzma2.Makefile) can break builds on non-Linux platforms or toolchains that don't recognize the flag. Consider applying this flag conditionally (e.g., only for Linux/ELF targets and compatible compilers/assemblers), or detecting support before setting ASFLAGS.

# Execute make compilation
self.run(
- f'make CFLAGS="{cflags}" CC={self.settings.get_safe("compiler", default="gcc")} libfast-lzma2',
+ f'make CFLAGS="{cflags}" ASFLAGS="{asflags}" CC={self.settings.get_safe("compiler", default="gcc")} libfast-lzma2',

Copilot AI Apr 17, 2026

-Wa,--noexecstack is generally ELF/GNU-as specific; unconditionally adding it in the Conan recipe (and in cmake/in/lzma2.Makefile) can break builds on non-Linux platforms or toolchains that don't recognize the flag. Consider applying this flag conditionally (e.g., only for Linux/ELF targets and compatible compilers/assemblers), or detecting support before setting ASFLAGS.
