fix(stream): handle notify output for nested diff external window#35146
JinqingKuang wants to merge 2 commits into main
Pull request overview
Prevents stream runner crashes when handling notify output for external-window queries where the per-window row-index metadata is cleared before a nested diff returns its final output block.
Changes:
- Add an early-return condition in the projection/indefinite-function path to avoid consuming additional downstream blocks after producing a result block for external-window streams.
- Harden stream-runner external-window output handling to tolerate a missing `pStreamBlkWinIdx` in a specific "final window" scenario.
- Add a regression test case (`Basic17`) to reproduce the nested `diff` + notify crash scenario.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| test/cases/18-StreamProcessing/05-Notify/test_notify.py | Adds Basic17 regression case to exercise external-window notify with nested diff. |
| source/libs/new-stream/src/streamRunner.c | Handles output blocks even when external-window row-index array is missing/cleared, avoiding runner crashes. |
| source/libs/executor/src/projectoperator.c | Breaks out as soon as a result block is produced for external-window streams to keep stream runtime state aligned with the returned block. |
Code Review
This pull request introduces early exit logic for external-window outputs in the project operator and refactors the stream runner to handle scenarios where external-window row indices are missing. Additionally, a new test case has been added to verify the fix and prevent future regressions. A review comment suggests refactoring duplicated logic in the stream runner to improve maintainability and readability.
Force-pushed from 75c7087 to 12e413e.
Force-pushed from 12e413e to 784de72.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
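Code review report
| # | Issue | Severity | Confidence | Postable |
|---|---|---|---|---|
| 1 | `rowsBefore` type narrowing (int32_t → int64_t) | Medium | High | inline |
| 2 | `allProcessByRowCtxShareFuncId` checks the full outer array; the mixed-function-type scenario is not fixed | Medium | High | summary |
| 3 | On the error path, the half-rebuilt state of `pStreamBlkWinIdx` is not restored | Low | Medium | summary |
The core fix logic is verified as correct: `extWinIndefRowsDo` now passes the actual number of output rows rather than the number of input rows, and `stRunnerTopTaskHandleExternalWinOutputBlock` handles an empty index safely. Test case Basic17 covers the crash path described.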
Apply -Wa,--noexecstack to fast-lzma2 in both the external Makefile patch and the Conan build path. Co-authored-by: OpenAI Codex (GPT-5) <noreply@openai.com>
Force-pushed from 784de72 to 99432ee.
Keep the stream runner from crashing when external-window output row indexes are cleared before nested diff returns its final block. Co-authored-by: GPT-5 Codex <noreply@openai.com>
Force-pushed from 99432ee to 4220ccf.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.
    int32_t* pCurrPair = (int32_t*)pCurr;
    int32_t winIdx = pCurrPair[0];
    int32_t rowStart = pCurrPair[1];
This packs/unpacks an int64_t via int32_t* type-punning, which is undefined behavior under C strict-aliasing rules and also bakes in little-endian layout assumptions. To make this safe and portable, encode/decode the (winIdx,rowStart) pair using bitwise operations (mask/shift) on a uint64_t (like the existing idx & 0xFFFFFFFF / idx >> 32 pattern in the runner), or use memcpy into/from a packed struct.
    int64_t val = 0;
    int32_t* pOutPair = (int32_t*)&val;
    pOutPair[0] = winIdx;
    pOutPair[1] = totalRows;
This packs/unpacks an int64_t via int32_t* type-punning, which is undefined behavior under C strict-aliasing rules and also bakes in little-endian layout assumptions. To make this safe and portable, encode/decode the (winIdx,rowStart) pair using bitwise operations (mask/shift) on a uint64_t (like the existing idx & 0xFFFFFFFF / idx >> 32 pattern in the runner), or use memcpy into/from a packed struct.
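The mask/shift encoding suggested in this comment could look roughly like the sketch below. The helper names `packWinIdx`, `unpackWinIdx`, and `unpackRowStart` are hypothetical and do not come from the PR; the layout (winIdx in the low 32 bits, rowStart in the high 32 bits) is an assumption chosen to match what the pointer cast produces on a little-endian host.

```c
#include <stdint.h>

/* Hypothetical helpers (not from the PR): store a (winIdx, rowStart) pair in
 * one 64-bit value without pointer casts. Assumed layout: winIdx occupies the
 * low 32 bits, rowStart the high 32 bits. */
static inline uint64_t packWinIdx(int32_t winIdx, int32_t rowStart) {
  /* Go through uint32_t first so a negative value is not sign-extended
   * into the other half of the word. */
  return ((uint64_t)(uint32_t)rowStart << 32) | (uint64_t)(uint32_t)winIdx;
}

static inline int32_t unpackWinIdx(uint64_t packed) {
  /* Converting an out-of-range uint32_t back to int32_t is
   * implementation-defined before C23; mainstream compilers wrap. */
  return (int32_t)(uint32_t)(packed & 0xFFFFFFFFu);
}

static inline int32_t unpackRowStart(uint64_t packed) {
  return (int32_t)(uint32_t)(packed >> 32);
}
```

Because the value is built and read with arithmetic on the integer itself, the result does not depend on host byte order and no object is accessed through an incompatible pointer type.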
    SExternalWindowOperator* pExtW = pOperator->info;
    SSDataBlock* pResBlock = NULL;
    SArray* pIdx = NULL;
    int64_t rowsBefore = 0;
The new rowsBefore delta is computed as an int64_t and then cast to int32_t without validation. If pResBlock->info.rows ever decreases (buffer reuse/reset) or the delta exceeds INT32_MAX, this can become negative/truncated and corrupt window-row index accounting. Recommend computing an int64_t delta = pResBlock->info.rows - rowsBefore; and explicitly guarding delta >= 0 && delta <= INT32_MAX before casting/passing into extWinAppendWinIdx, otherwise return an error.
    int32_t code = TSDB_CODE_SUCCESS, lino = 0;

    TAOS_CHECK_EXIT(extWinGetSetWinResBlockBuf(pOperator, rows, pWin, &pResBlock, &pIdx));
    rowsBefore = pResBlock->info.rows;
The new rowsBefore delta is computed as an int64_t and then cast to int32_t without validation. If pResBlock->info.rows ever decreases (buffer reuse/reset) or the delta exceeds INT32_MAX, this can become negative/truncated and corrupt window-row index accounting. Recommend computing an int64_t delta = pResBlock->info.rows - rowsBefore; and explicitly guarding delta >= 0 && delta <= INT32_MAX before casting/passing into extWinAppendWinIdx, otherwise return an error.
    TAOS_CHECK_EXIT(extWinAppendWinIdx(pOperator->pTaskInfo, pIdx, pResBlock,
                                       extWinGetCurWinIdx(pOperator->pTaskInfo),
                                       (int32_t)(pResBlock->info.rows - rowsBefore)));
The new rowsBefore delta is computed as an int64_t and then cast to int32_t without validation. If pResBlock->info.rows ever decreases (buffer reuse/reset) or the delta exceeds INT32_MAX, this can become negative/truncated and corrupt window-row index accounting. Recommend computing an int64_t delta = pResBlock->info.rows - rowsBefore; and explicitly guarding delta >= 0 && delta <= INT32_MAX before casting/passing into extWinAppendWinIdx, otherwise return an error.
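A minimal sketch of the guard recommended above, assuming a standalone helper. The name `rowDeltaChecked` and the `-1` failure value are placeholders and not part of the PR; real code would presumably return one of the project's own error codes.

```c
#include <stdint.h>

/* Hypothetical helper (not from the PR): compute rowsAfter - rowsBefore and
 * store it in *pDelta only when the result fits in a non-negative int32_t.
 * Returns 0 on success and -1 (placeholder error value) otherwise. */
static int32_t rowDeltaChecked(int64_t rowsAfter, int64_t rowsBefore, int32_t *pDelta) {
  /* Row counts are expected to be non-negative and non-decreasing; rejecting
   * anything else up front also rules out overflow in the subtraction. */
  if (rowsBefore < 0 || rowsAfter < rowsBefore) return -1;

  int64_t delta = rowsAfter - rowsBefore;
  if (delta > INT32_MAX) return -1; /* would be truncated by the cast */

  *pDelta = (int32_t)delta;
  return 0;
}
```

On failure `*pDelta` is left untouched, so a caller can bail out before anything is appended to the window index array.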
    # Build make command
    cflags = "-Wall -O2 -pthread"
    asflags = "-Wa,--noexecstack"
-Wa,--noexecstack is generally ELF/GNU-as specific; unconditionally adding it in the Conan recipe (and in cmake/in/lzma2.Makefile) can break builds on non-Linux platforms or toolchains that don't recognize the flag. Consider applying this flag conditionally (e.g., only for Linux/ELF targets and compatible compilers/assemblers), or detecting support before setting ASFLAGS.
      # Execute make compilation
      self.run(
    -     f'make CFLAGS="{cflags}" CC={self.settings.get_safe("compiler", default="gcc")} libfast-lzma2',
    +     f'make CFLAGS="{cflags}" ASFLAGS="{asflags}" CC={self.settings.get_safe("compiler", default="gcc")} libfast-lzma2',
-Wa,--noexecstack is generally ELF/GNU-as specific; unconditionally adding it in the Conan recipe (and in cmake/in/lzma2.Makefile) can break builds on non-Linux platforms or toolchains that don't recognize the flag. Consider applying this flag conditionally (e.g., only for Linux/ELF targets and compatible compilers/assemblers), or detecting support before setting ASFLAGS.
Description
Keep the stream runner from crashing when external-window output row indexes are cleared before nested diff returns its final block.