enh(stream): optimize the logic of history data scan in stream #35196
wangmm0220 wants to merge 14 commits into 3.0 from
Conversation
- Add 8 new enums under ESTriggerPullType for DiffRange/SameRange × trigger/calc × first/next
- Mark all legacy STRIGGER_PULL_TSDB_DATA* enums as DEPRECATED (kept for compatibility)
- Add SSTriggerTableTimeRange, SSTriggerTsdbDataDiffRangeRequest, SSTriggerTsdbDataSameRangeRequest
- Add isHistory flag to SSTriggerSetTableRequest for vSetTableListHistory routing
- Extend SSTriggerPullRequestUnion with new request members

Refs: #6490635370 stream history pull optimization (design v0.3)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Add serialize/deserialize cases for STRIGGER_PULL_TSDB_DATA_DIFF_RANGE family (4 variants) and SAME_RANGE family (4 variants)
- Add isHistory serialization to SET_TABLE with backward-compatible tDecodeIsEnd guard
- Add destroy branch for DIFF_RANGE ranges SArray
- Add streamMsgTest gtest covering 6 round-trip scenarios (isHistory flag, 4 DiffRange variants, 4 SameRange variants, empty/large ranges, double-destroy safety)

Refs: #6490635370 stream history pull optimization
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…nctions

Introduce a third StreamTableListInfo data source (vSetTableListHistory) to isolate the TSDB historical path of virtual-table streams from the realtime WAL path (which keeps using vSetTableList).

- streamReader.h: add vSetTableListHistory field + 4 helper declarations (qStreamCopyTableInfoHistory, qStreamGetTableListNumHistory, qStreamGetTableListGroupNumHistory, qStreamGetTableListHistory)
- streamReader.c: implement 4 *History helpers (mirror of existing helpers but unconditionally point at vSetTableListHistory; no isVtableStream branching)
- streamReader.c: destroy vSetTableListHistory alongside other lists
- qStreamSetTableList already takes a list pointer directly, so no *History variant is needed; callers will pass &reader->vSetTableListHistory

Existing isVtableStream ternaries and realtime vSetTableList init/clear are intentionally unchanged. Routing of new TSDB-history requests to *History helpers will land in Batch 4.

Refs: #6490635370 stream history pull optimization
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…History

- qBuildVTableListHistory mirrors qBuildVTableList but writes to vSetTableListHistory, isolating TSDB historical metadata from the realtime WAL path.
- vnodeProcessStreamTsdbDataDiffRangeReq scans the contiguous-by-uid SSTriggerTableTimeRange list, accumulates per-uid blocks, truncates at STREAM_RETURN_ROWS_NUM, caches SDiffRangeIter in streamTaskMap.
- vnodeProcessStreamTsdbDataSameRangeReq guards against virtual-table streams and reuses the createStreamTask / getTableData / buildArrayRsp pattern of the trigger-data handler.
- pickSchemasHistory + scanOneTableForRange thin helpers extract the per-table-per-range scan boilerplate.
- Dispatch switch routes 8 new STRIGGER_PULL_TSDB_DATA_(DIFF|SAME)_RANGE enums to the two handlers.
- vnodeProcessStreamSetTableReq now writes to vSetTableListHistory (via qBuildVTableListHistory) when SET_TABLE.isHistory is true, preserving current behavior otherwise.

Refs: stream history pull optimization (design v0.3)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…y TSWAP isolation

Updates per spec revision (history-pull section):

* Add uidHashTriggerHistory and uidHashCalcHistory to SStreamTriggerReaderInfo (same shape as uidHashTrigger/uidHashCalc, dedicated to the TSDB historical path of virtual-table streams).
* SetTableRequest with isHistory=true now TSWAPs into the *History fields instead of overwriting the realtime uidHashTrigger/uidHashCalc; fixes the cross-contamination where a STRIGGER_PULL_SET_TABLE_HISTORY request would clobber the realtime mapping.
* Cleanup releases both *History hashes alongside the realtime ones.
* pickSchemasHistory comment now points to the *History hashes as the source of per-uid slotId->colId mapping; per-uid column projection in the DiffRange vtable handler will land in a follow-up commit.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Code Review
This pull request introduces optimizations for history-data pulls in stream processing by adding new message types and dedicated history state management. The changes include new request structures, serialization logic, and vnode-side processing. The review feedback identifies several critical issues, including potential OOM vulnerabilities from unvalidated input, thread-safety concerns in the history task map, and logic errors in group ID lookups for history data. Additionally, suggestions were provided to reduce memory pressure during large table scans and to fix a bug in the range iterator initialization.
int32_t nRanges = 0;
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nRanges));
if (nRanges > 0) {
  pRequest->ranges = taosArrayInit_s(sizeof(SSTriggerTableTimeRange), nRanges);
  if (sStreamReaderInfo->streamTaskMapHistory == NULL) {
    sStreamReaderInfo->streamTaskMapHistory =
        taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->streamTaskMapHistory, terrno);
    taosHashSetFreeFp(sStreamReaderInfo->streamTaskMapHistory, releaseDiffRangeIterFp);
  }
  /* If a stale iter exists for this key, freeFp will destroy it on overwrite. */
  STREAM_CHECK_RET_GOTO(newDiffRangeIter(req->tsdbDataDiffRangeReq.ranges,
                                         req->tsdbDataDiffRangeReq.ver,
                                         req->tsdbDataDiffRangeReq.order,
                                         isCalc, &iter));
  code = taosHashPut(sStreamReaderInfo->streamTaskMapHistory, &key, LONG_BYTES, &iter, sizeof(iter));
  if (code != 0) {
    destroyDiffRangeIter(iter);
    iter = NULL;
    STREAM_CHECK_RET_GOTO(code);
  }
} else {
-  void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
+  void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMapHistory, &key, LONG_BYTES);
  STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
-  pTaskInner = *(SStreamReaderTaskInner**)tmp;
-  STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
+  iter = *(SDiffRangeIter**)tmp;
+  STREAM_CHECK_NULL_GOTO(iter, TSDB_CODE_INTERNAL_ERROR);
}
Access to streamTaskMapHistory and its initialization are not thread-safe. The sStreamReaderInfo structure is shared across threads, and other members are protected by sStreamReaderInfo->lock. Here, streamTaskMapHistory is initialized with HASH_NO_LOCK and accessed without holding the latch, which is a race condition. All operations on this map should be protected by the reader's lock, and the map should be initialized with appropriate locking (e.g., HASH_LOCK) or the initialization itself should be synchronized.
}

STableKeyInfo pList = {.uid = r->uid,
                       .groupId = qStreamGetGroupIdFromSet(reader, r->uid)};
For history data pulls on virtual table streams, the table-to-group mapping is stored in vSetTableListHistory. qStreamGetGroupIdFromSet likely only searches vSetTableList (the realtime mapping), which will fail to find tables that are only present in the history set. A history-specific lookup or an updated lookup function that considers the history list should be used.
while (1) {
  bool hasNext = false;
  STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
-  if (!hasNext) {
-    break;
-  }
+  if (!hasNext) break;

  SSDataBlock* pBlock = NULL;
  STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
  STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRes, pBlock));
  ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
               TD_VID(pVnode), __func__, pBlockRes->info.window.skey, pBlockRes->info.window.ekey,
               pBlockRes->info.id.uid, pBlockRes->info.id.groupId, pBlockRes->info.rows);
  if (pBlock != NULL && pBlock->info.rows > 0) {
    STREAM_CHECK_RET_GOTO(processTag(reader, false, pBlock->info.id.uid, pBlock,
                                     0, pBlock->info.rows, 1));
  }
  STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, reader->pFilterInfo, NULL));

  if (pBlock->info.rows == 0) {
    continue;
  }
  STREAM_CHECK_RET_GOTO(blockDataMerge(pResult, pBlock));
}
This loop scans the entire time range for a single table into memory (pResult) before returning. If a single table range contains a massive amount of data, this can lead to high memory pressure. Since the caller checks for STREAM_RETURN_ROWS_NUM, it would be more efficient to check the accumulated row count inside this loop and support returning a partial scan of the table range.
p->ranges = taosArrayDup(ranges, NULL);
if (p->ranges == NULL) {
  taosMemoryFree(p);
  return terrno;
}
taosArraySort(p->ranges, diffRangeUidCmp);
If ranges is NULL (which happens when nRanges is 0 during deserialization), taosArrayDup will return NULL, and the function will return terrno. This causes the pull request to fail with an error instead of returning an empty result. The function should handle NULL ranges gracefully by initializing an empty array.
p->ranges = (ranges != NULL) ? taosArrayDup(ranges, NULL) : taosArrayInit(0, sizeof(SSTriggerTableTimeRange));
if (p->ranges == NULL) {
taosMemoryFree(p);
return terrno;
}
taosArraySort(p->ranges, diffRangeUidCmp);
Pull request overview
This PR introduces new stream trigger pull message types and reader-side logic intended to optimize history-data scans, especially for virtual-table streams, by switching from legacy TSDB pull messages to new “diff-range/same-range” request patterns and adding separate History state in the stream reader.
Changes:
- Added new trigger pull types and message structs for TSDB_DATA_DIFF_RANGE / TSDB_DATA_SAME_RANGE plus SET_TABLE_HISTORY, including serialization/deserialization and partial destroy support.
- Added "history" variants of virtual-table table-list bookkeeping in the stream reader (*History hashes and table lists).
- Added a new GTest unit test target validating stream message round-trips for the new message types.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| source/libs/new-stream/test/streamMsgTest.cpp | Adds GTest coverage for new stream pull message serialization/deserialization. |
| source/libs/new-stream/test/CMakeLists.txt | Builds and registers the new streamMsgTest binary. |
| source/libs/new-stream/src/streamReader.c | Adds History table-list helpers and virtual-table list building for history path. |
| source/dnode/vnode/src/vnd/vnodeStream.c | Adds new DIFF_RANGE/SAME_RANGE handlers and history SetTable routing; removes legacy TSDB handlers from dispatch. |
| source/common/src/msg/streamMsg.c | Implements encode/decode + destroy updates for new pull request types. |
| include/libs/new-stream/streamReader.h | Extends SStreamTriggerReaderInfo and exposes new history helper APIs. |
| include/common/streamMsg.h | Defines new pull types and request structs for history scan optimization. |
// add uid to the end
SColumnInfoData* pColData = taosArrayGetLast(pResult->pDataBlock);
STREAM_CHECK_NULL_GOTO(pColData, terrno);
STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, 0, (const char*)&pBlock->info.id.uid, pResult->info.rows, 1, false));
vnodeProcessStreamTsdbDataSameRangeReq() also writes uid into the last column of the response block. As with DiffRange, the trigger-side timestamp sorter treats the last column as the per-row version column, so this will break sorting/merging. Use the version column for version, and introduce a separate uid column if per-row uid is required.
Suggested change:
-// add uid to the end
-SColumnInfoData* pColData = taosArrayGetLast(pResult->pDataBlock);
-STREAM_CHECK_NULL_GOTO(pColData, terrno);
-STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, 0, (const char*)&pBlock->info.id.uid, pResult->info.rows, 1, false));
+// the last column is treated as the per-row version column by downstream sorting/merging
+SColumnInfoData* pColData = taosArrayGetLast(pResult->pDataBlock);
+STREAM_CHECK_NULL_GOTO(pColData, terrno);
+STREAM_CHECK_RET_GOTO(
+    colDataSetNItems(pColData, 0, (const char*)&pBlock->info.version, pResult->info.rows, 1, false));
ESTriggerPullType type = req->base.type;
bool isCalc = (type == STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC ||
               type == STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC_NEXT);
bool isFirst = (type == STRIGGER_PULL_TSDB_DATA_SAME_RANGE ||
                type == STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC);
ESTriggerPullType baseType = isCalc ? STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC
                                    : STRIGGER_PULL_TSDB_DATA_SAME_RANGE;
int64_t key = getSessionKey(req->base.sessionId, (int64_t)baseType);

if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
  int32_t pNum = 0;
  STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, req->tsdbCalcDataReq.gid, &pList, &pNum));
  BUILD_OPTION(options, getSuid(sStreamReaderInfo, pList), req->tsdbCalcDataReq.ver, TSDB_ORDER_ASC,
               req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
               sStreamReaderInfo->triggerCols, false, NULL);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, pList, pNum, &sStreamReaderInfo->storageApi));
ST_TASK_DLOG("vgId:%d %s start, type:%d, isCalc:%d, isFirst:%d, gid:%"PRId64
             ", skey:%"PRId64", ekey:%"PRId64, TD_VID(pVnode), __func__,
             type, isCalc, isFirst,
             req->tsdbDataSameRangeReq.gid,
             req->tsdbDataSameRangeReq.skey,
             req->tsdbDataSameRangeReq.ekey);

  STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
SStreamReaderTaskInner* pTaskInner = NULL;

if (isFirst) {
  int32_t pNum = 0;
  STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, req->tsdbDataSameRangeReq.gid, &pList, &pNum));
  void* schemas = sStreamReaderInfo->triggerCols;
  BUILD_OPTION(options, getSuid(sStreamReaderInfo, pList), req->tsdbDataSameRangeReq.ver,
               req->tsdbDataSameRangeReq.order,
               req->tsdbDataSameRangeReq.skey, req->tsdbDataSameRangeReq.ekey,
               schemas, false, NULL);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock,
                                         pList, pNum, &sStreamReaderInfo->storageApi));
vnodeProcessStreamTsdbDataSameRangeReq() computes isCalc/isFirst/baseType, but the scan path always uses triggerCols/triggerResBlock/triggerBlock and never switches to calc columns/blocks for the _CALC/_CALC_NEXT variants. This likely returns the wrong schema for calc pulls and may break callers expecting calc output. Branch on isCalc to use the calc plan’s schema/blocks (or split into two handlers) and ensure the session keying still matches the chosen baseType.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
This test defines its own main(), but CMake uses DEP_ext_gtest(streamMsgTest) which links against GTest::gtest_main. That will cause a duplicate symbol link error (multiple definitions of main). Remove the custom main() or switch to linking only GTest::gtest (without gtest_main) for this target.
Suggested change (delete):
-int main(int argc, char** argv) {
-  ::testing::InitGoogleTest(&argc, argv);
-  return RUN_ALL_TESTS();
-}
@@ -4166,25 +4211,6 @@ int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *p
     case STRIGGER_PULL_TSDB_META_NEXT:
       STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo));
       break;
-    case STRIGGER_PULL_TSDB_TS_DATA:
-      if (sStreamReaderInfo->isVtableStream) {
-        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqVTable(pVnode, pMsg, &req, sStreamReaderInfo));
-      } else {
-        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqNonVTable(pVnode, pMsg, &req, sStreamReaderInfo));
-      }
-      break;
-    case STRIGGER_PULL_TSDB_TRIGGER_DATA:
-    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
-      STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
-      break;
-    case STRIGGER_PULL_TSDB_CALC_DATA:
-    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
-      STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
-      break;
-    case STRIGGER_PULL_TSDB_DATA:
-    case STRIGGER_PULL_TSDB_DATA_NEXT:
-      STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
-      break;
     case STRIGGER_PULL_GROUP_COL_VALUE:
       STREAM_CHECK_RET_GOTO(vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo));
       break;
@@ -4209,6 +4235,18 @@ int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *p
     case STRIGGER_PULL_WAL_CALC_DATA_NEW:
       STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalCalcDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
       break;
+    case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE:
+    case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_NEXT:
+    case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC:
+    case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC_NEXT:
+      STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbDataDiffRangeReq(pVnode, pMsg, &req, sStreamReaderInfo));
+      break;
+    case STRIGGER_PULL_TSDB_DATA_SAME_RANGE:
+    case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_NEXT:
+    case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC:
+    case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC_NEXT:
+      STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbDataSameRangeReq(pVnode, pMsg, &req, sStreamReaderInfo));
+      break;
The reader-side dispatch removed handling for legacy pull types (STRIGGER_PULL_TSDB_TS_DATA / _TRIGGER_DATA / _CALC_DATA / _TSDB_DATA and their _NEXT variants). The trigger task code still sends these types (e.g., streamTriggerTask.c uses STRIGGER_PULL_TSDB_TS_DATA / STRIGGER_PULL_TSDB_TRIGGER_DATA / STRIGGER_PULL_TSDB_CALC_DATA / STRIGGER_PULL_TSDB_DATA), so this change will cause runtime “unknown inner msg type” errors and break history scans until the trigger side is migrated. Either keep backward-compatible handlers here (even if deprecated) or update the trigger side in the same PR to send the new DIFF_RANGE/SAME_RANGE requests.
static int32_t newDiffRangeIter(SArray* ranges, int64_t ver, int8_t order, bool isCalc,
                                SDiffRangeIter** out) {
  SDiffRangeIter* p = taosMemoryCalloc(1, sizeof(SDiffRangeIter));
  if (p == NULL) return terrno;
  p->ranges = taosArrayDup(ranges, NULL);
  if (p->ranges == NULL) {
    taosMemoryFree(p);
    return terrno;
  }
  taosArraySort(p->ranges, diffRangeUidCmp);
  p->pos = 0;
newDiffRangeIter() calls taosArrayDup(ranges, ...) without handling ranges==NULL. taosArrayDup(NULL, ...) returns NULL with TSDB_CODE_INVALID_PARA, so a DiffRange request with NULL/empty ranges (which is valid per the serializer/deserializer and covered by streamMsgTest) will fail at runtime. Treat NULL as an empty range list (no-op) or allocate an empty SArray when ranges is NULL.
// add uid to the end
SColumnInfoData* pColData = taosArrayGetLast(pBlockRes->pDataBlock);
STREAM_CHECK_NULL_GOTO(pColData, terrno);
STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, 0, (const char*)&r->uid, pBlockRes->info.rows, 1, false));
scanOneTableForRange() fills the last column with r->uid ("add uid to the end"), but downstream history sorting logic treats the last column as the per-row version column (see stNewTimestampSorterSetData: it reads taosArrayGetLast(pDataBlock->pDataBlock) as ver, with optional bias). Writing uid here will corrupt version ordering/merge semantics. Populate the version column with the actual row/block version, and if you need a per-row uid, add a dedicated uid column and adjust the verColBias accordingly.
Suggested change:
-// add uid to the end
-SColumnInfoData* pColData = taosArrayGetLast(pBlockRes->pDataBlock);
-STREAM_CHECK_NULL_GOTO(pColData, terrno);
-STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, 0, (const char*)&r->uid, pBlockRes->info.rows, 1, false));
+// populate the trailing version column used by downstream history sorting
+SColumnInfoData* pColData = taosArrayGetLast(pBlockRes->pDataBlock);
+STREAM_CHECK_NULL_GOTO(pColData, terrno);
+int64_t ver = options.ver;
+STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, 0, (const char*)&ver, pBlockRes->info.rows, 1, false));
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Adds SSelectStmt::pWhereInjectedFromPreFilter and a parser-side helper injectPreFilterIntoCalcQuery that clones trigger pre_filter into the calc query's WHERE for every %%trows SELECT. translateWhere bypasses the pre-existing '%%trows + WHERE' error when the WHERE is the injected one. This is a behavior-neutral scaffold; the next commit removes the wrong column-append block on the trigger side. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The block at parTranslater.c:19264-19275 wrongly appended every column referenced by the calc SELECT (collected via COLLECT_COL_TYPE_ALL into triggerScanList) to the trigger SELECT's projection. This forced the trigger side to scan calc-only columns (e.g. c3, t2 in 'select sum(c3), avg(t2) from %%trows' with state_window(c1) + pre_filter(c2>2)). With the prior commit's pre_filter injection making the calc side self-contained, the trigger projection now stays exactly what createStreamReqBuildTriggerSelect computed (trigger window cols + pre_filter cols + tbname()). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
After the previous commit removed its only reader, the field is unreferenced. Delete the field declaration, the COLLECT_COL_TYPE_ALL filler in planLogicCreater.c that was specifically populating it for SP_PARTITION_ROWS, and the now-useless = NULL initializer in parTranslater.c. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Adds a regression test mirroring the design-spec example: trigger: state_window(c1) + pre_filter(c2>2) calc: select _c0, sum(c3), avg(t2) from %%trows The aggregate values verify pre_filter is actually applied on the calc side (rows with c2<=2 must not contribute to sum(c3)/avg(t2)). A negative case re-confirms that user-written WHERE on %%trows is still rejected. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…r path
Adds MND_STREAM_VER_NUMBER=9 with MND_STREAM_OLD_TRIGGER_COLS=8 so that
streams persisted by older dnodes keep the legacy trigger-scan/calc-scan
behavior while freshly created streams use the new optimized path.
isOldPlan is wired through:
- mndDef.c deserialization (sver == 8 -> isOldPlan = true)
- SCMCreateStreamReq / streamMsg encode+decode (symmetric)
- SStreamTriggerReaderInfo at reader runtime
Reader side (vnodeStream.c + streamReader.c):
- Split pFilterInfo into pFilterInfoTrigger / pFilterInfoCalc; calc
- Merge duplicated history/realtime table-list helpers behind an
isHistory parameter
- Rewrite slotIdList build to walk the hash and assign each element,
eliminating the uninitialized-slot risk
- Add TRANSFORM_DATA_TO_CALC macro for legacy-plan calc block reshape
- Drop unused getSuid / sortCid / sortSSchema helpers
- Clean up taosHashPut failure paths to release the task on error
Parser (parTranslater.c):
- Remove the hard rejection of %%trows + virtual-table + pre_filter;
the combination is now gated by the version bump above
Test:
- Adjust test_pre_filter_trows_scan_cols to exercise vtable-style
pre_filter (c2>2 and t1>1) with partition by tbname and select *
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 5 comments.
@@ -4164,27 +4275,12 @@ int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *p
       break;
     case STRIGGER_PULL_TSDB_META:
     case STRIGGER_PULL_TSDB_META_NEXT:
-      STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo));
-      break;
-    case STRIGGER_PULL_TSDB_TS_DATA:
       if (sStreamReaderInfo->isVtableStream) {
-        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqVTable(pVnode, pMsg, &req, sStreamReaderInfo));
+        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaVtableReq(pVnode, pMsg, &req, sStreamReaderInfo));
       } else {
-        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqNonVTable(pVnode, pMsg, &req, sStreamReaderInfo));
+        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo));
       }
       break;
-    case STRIGGER_PULL_TSDB_TRIGGER_DATA:
-    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
-      STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
-      break;
-    case STRIGGER_PULL_TSDB_CALC_DATA:
-    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
-      STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
-      break;
-    case STRIGGER_PULL_TSDB_DATA:
-    case STRIGGER_PULL_TSDB_DATA_NEXT:
-      STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
-      break;
     case STRIGGER_PULL_GROUP_COL_VALUE:
       STREAM_CHECK_RET_GOTO(vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo));
       break;
@@ -4207,7 +4303,23 @@ int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *p
       STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
       break;
     case STRIGGER_PULL_WAL_CALC_DATA_NEW:
-      STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalCalcDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
+      if (sStreamReaderInfo->isOldPlan) {
+        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalCalcDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
+      } else {
+        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
+      }
       break;
+    case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE:
+    case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_NEXT:
+    case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC:
+    case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC_NEXT:
+      STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbDataDiffRangeReq(pVnode, pMsg, &req, sStreamReaderInfo));
+      break;
+    case STRIGGER_PULL_TSDB_DATA_SAME_RANGE:
+    case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_NEXT:
+    case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC:
+    case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC_NEXT:
+      STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbDataSameRangeReq(pVnode, pMsg, &req, sStreamReaderInfo));
In vnodeProcessStreamReaderMsg, the switch no longer handles the legacy history pull request types (e.g. STRIGGER_PULL_TSDB_TS_DATA, STRIGGER_PULL_TSDB_TRIGGER_DATA(_NEXT), STRIGGER_PULL_TSDB_CALC_DATA(_NEXT), STRIGGER_PULL_TSDB_DATA(_NEXT)). The trigger-side history logic in streamTriggerTask.c still sends these types, so the reader will now hit the default branch and return an error for history scans. Either keep backward-compatible handling for these deprecated types (map them to the new diff/same-range handlers) or update the sender to stop emitting the legacy types in the same change-set/version gate (e.g. keyed off isOldPlan).
# calc: select _c0, sum(c3), avg(t2) from %%trows
tdSql.execute(
    f"create stream s_pf state_window(c1) from stb "
    f"partition by tbname stream_options(pre_filter(c2 > 2 and t1 > 1)) "
    f"into res_stb as "
    f"select * from %%trows;"
The stream created here uses into res_stb as select * from %%trows;, but check1() later queries sum_c3/avg_t2 columns and expects aggregated results. As written, res_stb will not contain those columns, so the test will fail (or validate the wrong behavior). Align the stream definition/output table schema with the assertions in check1() (or change the assertions to match select *).
Suggested change:
-# calc: select _c0, sum(c3), avg(t2) from %%trows
-tdSql.execute(
-    f"create stream s_pf state_window(c1) from stb "
-    f"partition by tbname stream_options(pre_filter(c2 > 2 and t1 > 1)) "
-    f"into res_stb as "
-    f"select * from %%trows;"
+# calc: select first(_c0), sum(c3), avg(t2) from %%trows
+tdSql.execute(
+    f"create stream s_pf state_window(c1) from stb "
+    f"partition by tbname stream_options(pre_filter(c2 > 2 and t1 > 1)) "
+    f"into res_stb (firstts, sum_c3, avg_t2) as "
+    f"select first(_c0), sum(c3), avg(t2) from %%trows;"
TSDB_CHECK_CODE(code, lino, _over);

-if (sver != MND_STREAM_VER_NUMBER && sver != MND_STREAM_COMPATIBLE_VER_NUMBER) {
+if (sver < MND_STREAM_COMPATIBLE_VER_NUMBER) {
This version gate now only rejects sver < MND_STREAM_COMPATIBLE_VER_NUMBER, which means it will accept stream objects with sver > MND_STREAM_VER_NUMBER (future/unknown versions). The subsequent decode path is not forward-compatible and may mis-parse newer layouts. Consider restoring an upper-bound check (reject sver > MND_STREAM_VER_NUMBER) and keep a separate compatibility path for known older versions.
Suggested change:
-if (sver < MND_STREAM_COMPATIBLE_VER_NUMBER) {
+if (sver < MND_STREAM_COMPATIBLE_VER_NUMBER || sver > MND_STREAM_VER_NUMBER) {
Only the client-side parser/planner is involved:

- `source/libs/parser/src/parTranslater.c`
- `source/libs/planner/src/planLogicCreater.c`
- `include/libs/planner/planner.h` (if the `SPlanContext.streamCxt.triggerScanList` field is defined there)

The mnode, snode, and vnode execution paths are not involved; no RPC protocol fields are modified.
This spec states the change is limited to client-side parser/planner and does not modify mnode/vnode execution or RPC protocol fields, but this PR also changes mnode/vnode logic and extends stream RPC payloads (e.g. new pull types, isOldPlan, diff/same-range request structs). Please update the scope/assumptions in the spec so it matches the actual implementation and compatibility impact.
int32_t timeLineFromOrderBy;
bool    isEmptyResult;
bool    isSubquery;
bool    pWhereInjectedFromPreFilter;  // true if pWhere was cloned from stream pre_filter
bool    hasAggFuncs;
bool    hasRepeatScanFuncs;
SSelectStmt gained pWhereInjectedFromPreFilter, but the node clone/copy implementation for QUERY_NODE_SELECT_STMT does not currently copy this field (see source/libs/nodes/src/nodesCloneFuncs.c:selectStmtCopy, which copies isSubquery/isEmptyResult etc but not this new flag). If any rewrite/cloning happens after injection, the cloned stmt will have pWhere != NULL but the flag reset to false, and translateWhere may incorrectly reject the query. Please update the select-stmt clone/copy path (and any other struct-copy/serialization logic as applicable) to preserve the new flag.
… proper functions

- Convert NEW_CALC macro to static inline isNewCalc() function, eliminating implicit dependency on sStreamReaderInfo/isCalc variables
- Convert TRANSFORM_DATA_TO_CALC macro to static transformDataToCalc() function with proper error handling and its own end: label
- Fix misc issues: taosMemoryCalloc arg order, stDebug→ST_TASK_DLOG, indentation of pResBlockDst->info.rows++, remove dead groupId assignment
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 1 comment.
```python
            # ct1 (t1=1, t2=100): c1 alternates to drive state windows;
            # c2 values include some <=2 (must be filtered out by pre_filter).
            "insert into ct1 values ('2025-01-01 00:00:00', 1, 1, 10);",  # c2<=2 -> filtered
            "insert into ct1 values ('2025-01-01 00:00:01', 1, 5, 20);",
            "insert into ct1 values ('2025-01-01 00:00:02', 1, 7, 30);",
            "insert into ct1 values ('2025-01-01 00:00:03', 2, 8, 40);",  # state change closes window
            "insert into ct1 values ('2025-01-01 00:00:04', 2, 2, 50);",  # c2<=2 -> filtered
            "insert into ct1 values ('2025-01-01 00:00:05', 2, 9, 60);",
            "insert into ct1 values ('2025-01-01 00:00:06', 1, 4, 70);",  # state change
        ]
        tdSql.executes(sqls)

    def check1(self):
        # Expect at least one closed window for ct1 with c1==1 spanning ts 1..2
        # (the ts 0 row is dropped by pre_filter).
        # sum(c3) over rows kept = 20+30 = 50; avg(t2) = 100.
        tdSql.checkResultsByFunc(
            sql=f"select sum_c3, avg_t2 from {self.db}.res_stb "
            f"where firstts = '2025-01-01 00:00:01';",
            func=lambda: tdSql.getRows() == 1
            and tdSql.getData(0, 0) == 50
            and abs(tdSql.getData(0, 1) - 100.0) < 1e-9,
```
The test data inserts only into ct1 (tags t1=1), but the stream’s pre_filter includes t1 > 1, which would filter out all rows from ct1 and make check1() expectations unreachable. Either insert rows into a table with tags satisfying the filter (e.g., ct2) or adjust the pre_filter/expected results so the dataset actually exercises the pre_filter + %%trows path.
Suggested change:
```diff
-            # ct1 (t1=1, t2=100): c1 alternates to drive state windows;
-            # c2 values include some <=2 (must be filtered out by pre_filter).
-            "insert into ct1 values ('2025-01-01 00:00:00', 1, 1, 10);",  # c2<=2 -> filtered
-            "insert into ct1 values ('2025-01-01 00:00:01', 1, 5, 20);",
-            "insert into ct1 values ('2025-01-01 00:00:02', 1, 7, 30);",
-            "insert into ct1 values ('2025-01-01 00:00:03', 2, 8, 40);",  # state change closes window
-            "insert into ct1 values ('2025-01-01 00:00:04', 2, 2, 50);",  # c2<=2 -> filtered
-            "insert into ct1 values ('2025-01-01 00:00:05', 2, 9, 60);",
-            "insert into ct1 values ('2025-01-01 00:00:06', 1, 4, 70);",  # state change
-        ]
-        tdSql.executes(sqls)
-    def check1(self):
-        # Expect at least one closed window for ct1 with c1==1 spanning ts 1..2
-        # (the ts 0 row is dropped by pre_filter).
-        # sum(c3) over rows kept = 20+30 = 50; avg(t2) = 100.
-        tdSql.checkResultsByFunc(
-            sql=f"select sum_c3, avg_t2 from {self.db}.res_stb "
-            f"where firstts = '2025-01-01 00:00:01';",
-            func=lambda: tdSql.getRows() == 1
-            and tdSql.getData(0, 0) == 50
-            and abs(tdSql.getData(0, 1) - 100.0) < 1e-9,
+            # ct2 (t1=2, t2=200): satisfies pre_filter tag predicate t1>1;
+            # c1 alternates to drive state windows; c2 values include some <=2
+            # (must be filtered out by pre_filter).
+            "insert into ct2 values ('2025-01-01 00:00:00', 1, 1, 10);",  # c2<=2 -> filtered
+            "insert into ct2 values ('2025-01-01 00:00:01', 1, 5, 20);",
+            "insert into ct2 values ('2025-01-01 00:00:02', 1, 7, 30);",
+            "insert into ct2 values ('2025-01-01 00:00:03', 2, 8, 40);",  # state change closes window
+            "insert into ct2 values ('2025-01-01 00:00:04', 2, 2, 50);",  # c2<=2 -> filtered
+            "insert into ct2 values ('2025-01-01 00:00:05', 2, 9, 60);",
+            "insert into ct2 values ('2025-01-01 00:00:06', 1, 4, 70);",  # state change
+        ]
+        tdSql.executes(sqls)
+    def check1(self):
+        # Expect at least one closed window for ct2 with c1==1 spanning ts 1..2
+        # (the ts 0 row is dropped by pre_filter).
+        # sum(c3) over rows kept = 20+30 = 50; avg(t2) = 200.
+        tdSql.checkResultsByFunc(
+            sql=f"select sum_c3, avg_t2 from {self.db}.res_stb "
+            f"where firstts = '2025-01-01 00:00:01';",
+            func=lambda: tdSql.getRows() == 1
+            and tdSql.getData(0, 0) == 50
+            and abs(tdSql.getData(0, 1) - 200.0) < 1e-9,
```