
enh(stream): optimize the logic of history data scan in stream#35196

Open
wangmm0220 wants to merge 14 commits into3.0from
feat/6490635370

Conversation

@wangmm0220
Contributor

Description

Issue(s)

  • Close/close/Fix/fix/Resolve/resolve: Issue Link

Checklist

Please check the items in the checklist if applicable.

  • Is the user manual updated?
  • Are the test cases passed and automated?
  • Is there no significant decrease in test coverage?

wangmm0220 and others added 5 commits April 20, 2026 18:52
- Add 8 new enums under ESTriggerPullType for DiffRange/SameRange × trigger/calc × first/next
- Mark all legacy STRIGGER_PULL_TSDB_DATA* enums as DEPRECATED (kept for compatibility)
- Add SSTriggerTableTimeRange, SSTriggerTsdbDataDiffRangeRequest, SSTriggerTsdbDataSameRangeRequest
- Add isHistory flag to SSTriggerSetTableRequest for vSetTableListHistory routing
- Extend SSTriggerPullRequestUnion with new request members

Refs: #6490635370 stream history pull optimization (design v0.3)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Add serialize/deserialize cases for STRIGGER_PULL_TSDB_DATA_DIFF_RANGE family (4 variants) and SAME_RANGE family (4 variants)
- Add isHistory serialization to SET_TABLE with backward-compatible tDecodeIsEnd guard
- Add destroy branch for DIFF_RANGE ranges SArray
- Add streamMsgTest gtest covering 6 round-trip scenarios (isHistory flag, 4 DiffRange variants, 4 SameRange variants, empty/large ranges, double-destroy safety)

Refs: #6490635370 stream history pull optimization

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…nctions

Introduce a third StreamTableListInfo data source (vSetTableListHistory) to
isolate the TSDB historical path of virtual-table streams from the realtime
WAL path (which keeps using vSetTableList).

- streamReader.h: add vSetTableListHistory field + 4 helper declarations
  (qStreamCopyTableInfoHistory, qStreamGetTableListNumHistory,
  qStreamGetTableListGroupNumHistory, qStreamGetTableListHistory)
- streamReader.c: implement 4 *History helpers (mirror of existing helpers
  but unconditionally point at vSetTableListHistory; no isVtableStream branching)
- streamReader.c: destroy vSetTableListHistory alongside other lists
- qStreamSetTableList already takes a list pointer directly, so no *History
  variant is needed; callers will pass &reader->vSetTableListHistory

Existing isVtableStream ternaries and realtime vSetTableList init/clear are
intentionally unchanged. Routing of new TSDB-history requests to *History
helpers will land in Batch 4.

Refs: #6490635370 stream history pull optimization

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…History

- qBuildVTableListHistory mirrors qBuildVTableList but writes to
  vSetTableListHistory, isolating TSDB historical metadata from the
  realtime WAL path.
- vnodeProcessStreamTsdbDataDiffRangeReq scans contiguous-by-uid
  SSTriggerTableTimeRange list, accumulates per-uid blocks, truncates
  at STREAM_RETURN_ROWS_NUM, caches SDiffRangeIter in streamTaskMap.
- vnodeProcessStreamTsdbDataSameRangeReq guards against virtual-table
  streams and reuses the createStreamTask / getTableData / buildArrayRsp
  pattern of the trigger-data handler.
- pickSchemasHistory + scanOneTableForRange thin helpers extract the
  per-table-per-range scan boilerplate.
- Dispatch switch routes 8 new STRIGGER_PULL_TSDB_DATA_(DIFF|SAME)_RANGE
  enums to the two handlers.
- vnodeProcessStreamSetTableReq now writes to vSetTableListHistory
  (via qBuildVTableListHistory) when SET_TABLE.isHistory is true,
  preserving current behavior otherwise.

Refs: stream history pull optimization (design v0.3)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…y TSWAP isolation

Updates per spec revision (history-pull section):

* Add uidHashTriggerHistory and uidHashCalcHistory to SStreamTriggerReaderInfo
  (same shape as uidHashTrigger/uidHashCalc, dedicated to the TSDB historical
  path of virtual-table streams).
* SetTableRequest with isHistory=true now TSWAPs into the *History fields
  instead of overwriting the realtime uidHashTrigger/uidHashCalc — fixes the
  cross-contamination where a STRIGGER_PULL_SET_TABLE_HISTORY request would
  clobber the realtime mapping.
* Cleanup releases both *History hashes alongside the realtime ones.
* pickSchemasHistory comment now points to the *History hashes as the source
  of per-uid slotId->colId mapping; per-uid column projection in the DiffRange
  vtable handler will land in a follow-up commit.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings April 22, 2026 03:09
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces optimizations for history-data pulls in stream processing by adding new message types and dedicated history state management. The changes include new request structures, serialization logic, and vnode-side processing. The review feedback identifies several critical issues, including potential OOM vulnerabilities from unvalidated input, thread-safety concerns in the history task map, and logic errors in group ID lookups for history data. Additionally, suggestions were provided to reduce memory pressure during large table scans and to fix a bug in the range iterator initialization.

Comment on lines +3532 to +3535
int32_t nRanges = 0;
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nRanges));
if (nRanges > 0) {
pRequest->ranges = taosArrayInit_s(sizeof(SSTriggerTableTimeRange), nRanges);
high

The nRanges value is read directly from the network buffer without validation. A very large value could lead to an excessive memory allocation in taosArrayInit_s, potentially causing an Out-Of-Memory (OOM) condition. Please add a check to enforce a reasonable upper limit on the number of ranges.
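One way to address this, sketched below under assumed names (`STREAM_MAX_PULL_RANGES` and `validate_nranges` are illustrative, not part of the TDengine codebase), is to reject the decoded count before any allocation happens:

```c
#include <stdint.h>

/* Hypothetical upper bound; the real limit would be tuned to the protocol. */
#define STREAM_MAX_PULL_RANGES 100000

/* Returns 0 when the decoded range count is sane, -1 otherwise.
 * Would be called right after tDecodeI32 and before taosArrayInit_s. */
static int validate_nranges(int32_t nRanges) {
  if (nRanges < 0 || nRanges > STREAM_MAX_PULL_RANGES) return -1;
  return 0;
}
```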

Comment on lines +3234 to 3256
if (sStreamReaderInfo->streamTaskMapHistory == NULL) {
sStreamReaderInfo->streamTaskMapHistory =
taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->streamTaskMapHistory, terrno);
taosHashSetFreeFp(sStreamReaderInfo->streamTaskMapHistory, releaseDiffRangeIterFp);
}
/* If a stale iter exists for this key, freeFp will destroy it on overwrite. */
STREAM_CHECK_RET_GOTO(newDiffRangeIter(req->tsdbDataDiffRangeReq.ranges,
req->tsdbDataDiffRangeReq.ver,
req->tsdbDataDiffRangeReq.order,
isCalc, &iter));
code = taosHashPut(sStreamReaderInfo->streamTaskMapHistory, &key, LONG_BYTES, &iter, sizeof(iter));
if (code != 0) {
destroyDiffRangeIter(iter);
iter = NULL;
STREAM_CHECK_RET_GOTO(code);
}
} else {
- void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
+ void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMapHistory, &key, LONG_BYTES);
STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
- pTaskInner = *(SStreamReaderTaskInner**)tmp;
- STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
+ iter = *(SDiffRangeIter**)tmp;
+ STREAM_CHECK_NULL_GOTO(iter, TSDB_CODE_INTERNAL_ERROR);
}
high

Access to streamTaskMapHistory and its initialization are not thread-safe. The sStreamReaderInfo structure is shared across threads, and other members are protected by sStreamReaderInfo->lock. Here, streamTaskMapHistory is initialized with HASH_NO_LOCK and accessed without holding the latch, which is a race condition. All operations on this map should be protected by the reader's lock, and the map should be initialized with appropriate locking (e.g., HASH_LOCK) or the initialization itself should be synchronized.
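A minimal sketch of the locking the reviewer asks for, using simplified stand-in types (the real code would use `sStreamReaderInfo->lock` and `taosHashInit`, so all names here are illustrative):

```c
#include <pthread.h>
#include <stddef.h>

typedef struct { int unused; } SHashObj;        /* stand-in for the hash map */
static SHashObj* hashInit(void) { static SHashObj h; return &h; }

typedef struct {
  pthread_mutex_t lock;          /* mirrors the reader's existing latch */
  SHashObj*       taskMapHistory;
} ReaderInfo;

/* Lazy initialization done entirely under the lock, so two threads can
 * never both observe NULL and double-initialize the map. */
static SHashObj* getTaskMapHistory(ReaderInfo* r) {
  pthread_mutex_lock(&r->lock);
  if (r->taskMapHistory == NULL) r->taskMapHistory = hashInit();
  SHashObj* m = r->taskMapHistory;
  pthread_mutex_unlock(&r->lock);
  return m;
}
```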

}

STableKeyInfo pList = {.uid = r->uid,
.groupId = qStreamGetGroupIdFromSet(reader, r->uid)};
high

For history data pulls on virtual table streams, the table-to-group mapping is stored in vSetTableListHistory. qStreamGetGroupIdFromSet likely only searches vSetTableList (the realtime mapping), which will fail to find tables that are only present in the history set. A history-specific lookup or an updated lookup function that considers the history list should be used.

Comment on lines 3117 to 3134
while (1) {
bool hasNext = false;
STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
- if (!hasNext) {
- break;
- }
+ if (!hasNext) break;

SSDataBlock* pBlock = NULL;
STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRes, pBlock));
ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
TD_VID(pVnode), __func__, pBlockRes->info.window.skey, pBlockRes->info.window.ekey,
pBlockRes->info.id.uid, pBlockRes->info.id.groupId, pBlockRes->info.rows);
if (pBlock != NULL && pBlock->info.rows > 0) {
STREAM_CHECK_RET_GOTO(processTag(reader, false, pBlock->info.id.uid, pBlock,
0, pBlock->info.rows, 1));
}
STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, reader->pFilterInfo, NULL));

if (pBlock->info.rows == 0) {
continue;
}
STREAM_CHECK_RET_GOTO(blockDataMerge(pResult, pBlock));
}
medium

This loop scans the entire time range for a single table into memory (pResult) before returning. If a single table range contains a massive amount of data, this can lead to high memory pressure. Since the caller checks for STREAM_RETURN_ROWS_NUM, it would be more efficient to check the accumulated row count inside this loop and support returning a partial scan of the table range.
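The suggestion can be sketched as a loop that carries the accumulated row count and returns a resume position; `STREAM_RETURN_ROWS_NUM`'s value and all names below are illustrative, not the actual TDengine APIs:

```c
#define STREAM_RETURN_ROWS_NUM 4096   /* illustrative batch limit */

typedef struct { long rows; } Block;  /* stand-in for SSDataBlock */

/* Accumulates block row counts until the batch limit is reached.
 * *resumeIdx receives the first unconsumed block so a later pull can
 * continue the scan instead of buffering the whole table range. */
static long accumulateBlocks(const Block* blocks, int n, int* resumeIdx) {
  long total = 0;
  int  i = 0;
  for (; i < n; i++) {
    if (total >= STREAM_RETURN_ROWS_NUM) break;  /* partial return */
    total += blocks[i].rows;
  }
  *resumeIdx = i;
  return total;
}
```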

Comment on lines +3192 to +3197
p->ranges = taosArrayDup(ranges, NULL);
if (p->ranges == NULL) {
taosMemoryFree(p);
return terrno;
}
taosArraySort(p->ranges, diffRangeUidCmp);
medium

If ranges is NULL (which happens when nRanges is 0 during deserialization), taosArrayDup will return NULL, and the function will return terrno. This causes the pull request to fail with an error instead of returning an empty result. The function should handle NULL ranges gracefully by initializing an empty array.

  p->ranges = (ranges != NULL) ? taosArrayDup(ranges, NULL) : taosArrayInit(0, sizeof(SSTriggerTableTimeRange));
  if (p->ranges == NULL) {
    taosMemoryFree(p);
    return terrno;
  }
  taosArraySort(p->ranges, diffRangeUidCmp);

}

STableKeyInfo pList = {.uid = r->uid,
.groupId = qStreamGetGroupIdFromSet(reader, r->uid)};
medium

The return value of qStreamGetGroupIdFromSet is not checked. If the table is not found (e.g., returns -1), passing an invalid group ID to createStreamTask may lead to internal errors or crashes.
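A hedged sketch of the guard (the lookup here is a toy stub; the real `qStreamGetGroupIdFromSet` signature and error convention may differ):

```c
#include <stdint.h>

/* Toy lookup: only uid 100 is known; -1 signals "not found",
 * matching the failure mode the reviewer describes. */
static int64_t lookupGroupId(int64_t uid) { return (uid == 100) ? 7 : -1; }

/* Fails fast instead of forwarding an invalid group id downstream. */
static int buildKeyInfo(int64_t uid, int64_t* outGid) {
  int64_t gid = lookupGroupId(uid);
  if (gid < 0) return -1;
  *outGid = gid;
  return 0;
}
```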


Copilot AI left a comment


Pull request overview

This PR introduces new stream trigger pull message types and reader-side logic intended to optimize history-data scans, especially for virtual-table streams, by switching from legacy TSDB pull messages to new “diff-range/same-range” request patterns and adding separate History state in the stream reader.

Changes:

  • Added new trigger pull types and message structs for TSDB_DATA_DIFF_RANGE / TSDB_DATA_SAME_RANGE plus SET_TABLE_HISTORY, including serialization/deserialization and partial destroy support.
  • Added “history” variants of virtual-table table-list bookkeeping in the stream reader (*History hashes and table lists).
  • Added a new GTest unit test target validating stream message round-trips for the new message types.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.

  • source/libs/new-stream/test/streamMsgTest.cpp: Adds GTest coverage for new stream pull message serialization/deserialization.
  • source/libs/new-stream/test/CMakeLists.txt: Builds and registers the new streamMsgTest binary.
  • source/libs/new-stream/src/streamReader.c: Adds History table-list helpers and virtual-table list building for the history path.
  • source/dnode/vnode/src/vnd/vnodeStream.c: Adds new DIFF_RANGE/SAME_RANGE handlers and history SetTable routing; removes legacy TSDB handlers from dispatch.
  • source/common/src/msg/streamMsg.c: Implements encode/decode + destroy updates for new pull request types.
  • include/libs/new-stream/streamReader.h: Extends SStreamTriggerReaderInfo and exposes new history helper APIs.
  • include/common/streamMsg.h: Defines new pull types and request structs for history scan optimization.


Comment on lines 3389 to 3393
// add uid to the end
SColumnInfoData* pColData = taosArrayGetLast(pResult->pDataBlock);
STREAM_CHECK_NULL_GOTO(pColData, terrno);
STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, 0, (const char*)&pBlock->info.id.uid, pResult->info.rows, 1, false));


Copilot AI Apr 22, 2026


vnodeProcessStreamTsdbDataSameRangeReq() also writes uid into the last column of the response block. As with DiffRange, the trigger-side timestamp sorter treats the last column as the per-row version column, so this will break sorting/merging. Use the version column for version, and introduce a separate uid column if per-row uid is required.

Suggested change
- // add uid to the end
- SColumnInfoData* pColData = taosArrayGetLast(pResult->pDataBlock);
- STREAM_CHECK_NULL_GOTO(pColData, terrno);
- STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, 0, (const char*)&pBlock->info.id.uid, pResult->info.rows, 1, false));
+ // the last column is treated as the per-row version column by downstream sorting/merging
+ SColumnInfoData* pColData = taosArrayGetLast(pResult->pDataBlock);
+ STREAM_CHECK_NULL_GOTO(pColData, terrno);
+ STREAM_CHECK_RET_GOTO(
+     colDataSetNItems(pColData, 0, (const char*)&pBlock->info.version, pResult->info.rows, 1, false));

Comment on lines +3331 to +3358
ESTriggerPullType type = req->base.type;
bool isCalc = (type == STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC ||
type == STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC_NEXT);
bool isFirst = (type == STRIGGER_PULL_TSDB_DATA_SAME_RANGE ||
type == STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC);
ESTriggerPullType baseType = isCalc ? STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC
: STRIGGER_PULL_TSDB_DATA_SAME_RANGE;
int64_t key = getSessionKey(req->base.sessionId, (int64_t)baseType);

if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
int32_t pNum = 0;
STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, req->tsdbCalcDataReq.gid, &pList, &pNum));
BUILD_OPTION(options, getSuid(sStreamReaderInfo, pList), req->tsdbCalcDataReq.ver, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
sStreamReaderInfo->triggerCols, false, NULL);
STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, pList, pNum, &sStreamReaderInfo->storageApi));
ST_TASK_DLOG("vgId:%d %s start, type:%d, isCalc:%d, isFirst:%d, gid:%"PRId64
", skey:%"PRId64", ekey:%"PRId64, TD_VID(pVnode), __func__,
type, isCalc, isFirst,
req->tsdbDataSameRangeReq.gid,
req->tsdbDataSameRangeReq.skey,
req->tsdbDataSameRangeReq.ekey);

STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
SStreamReaderTaskInner* pTaskInner = NULL;

if (isFirst) {
int32_t pNum = 0;
STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, req->tsdbDataSameRangeReq.gid, &pList, &pNum));
void* schemas = sStreamReaderInfo->triggerCols;
BUILD_OPTION(options, getSuid(sStreamReaderInfo, pList), req->tsdbDataSameRangeReq.ver,
req->tsdbDataSameRangeReq.order,
req->tsdbDataSameRangeReq.skey, req->tsdbDataSameRangeReq.ekey,
schemas, false, NULL);
STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock,
pList, pNum, &sStreamReaderInfo->storageApi));

Copilot AI Apr 22, 2026


vnodeProcessStreamTsdbDataSameRangeReq() computes isCalc/isFirst/baseType, but the scan path always uses triggerCols/triggerResBlock/triggerBlock and never switches to calc columns/blocks for the _CALC/_CALC_NEXT variants. This likely returns the wrong schema for calc pulls and may break callers expecting calc output. Branch on isCalc to use the calc plan’s schema/blocks (or split into two handlers) and ensure the session keying still matches the chosen baseType.

Comment on lines +280 to +284

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

Copilot AI Apr 22, 2026


This test defines its own main(), but CMake uses DEP_ext_gtest(streamMsgTest) which links against GTest::gtest_main. That will cause a duplicate symbol link error (multiple definitions of main). Remove the custom main() or switch to linking only GTest::gtest (without gtest_main) for this target.

Suggested change
- int main(int argc, char** argv) {
-   ::testing::InitGoogleTest(&argc, argv);
-   return RUN_ALL_TESTS();
- }

Comment on lines 4198 to +4249
@@ -4166,25 +4211,6 @@ int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *p
case STRIGGER_PULL_TSDB_META_NEXT:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
case STRIGGER_PULL_TSDB_TS_DATA:
if (sStreamReaderInfo->isVtableStream) {
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqVTable(pVnode, pMsg, &req, sStreamReaderInfo));
} else {
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqNonVTable(pVnode, pMsg, &req, sStreamReaderInfo));
}
break;
case STRIGGER_PULL_TSDB_TRIGGER_DATA:
case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
case STRIGGER_PULL_TSDB_CALC_DATA:
case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
case STRIGGER_PULL_TSDB_DATA:
case STRIGGER_PULL_TSDB_DATA_NEXT:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
case STRIGGER_PULL_GROUP_COL_VALUE:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
@@ -4209,6 +4235,18 @@ int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *p
case STRIGGER_PULL_WAL_CALC_DATA_NEW:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalCalcDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE:
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_NEXT:
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC:
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC_NEXT:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbDataDiffRangeReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE:
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_NEXT:
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC:
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC_NEXT:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbDataSameRangeReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;

Copilot AI Apr 22, 2026


The reader-side dispatch removed handling for legacy pull types (STRIGGER_PULL_TSDB_TS_DATA / _TRIGGER_DATA / _CALC_DATA / _TSDB_DATA and their _NEXT variants). The trigger task code still sends these types (e.g., streamTriggerTask.c uses STRIGGER_PULL_TSDB_TS_DATA / STRIGGER_PULL_TSDB_TRIGGER_DATA / STRIGGER_PULL_TSDB_CALC_DATA / STRIGGER_PULL_TSDB_DATA), so this change will cause runtime “unknown inner msg type” errors and break history scans until the trigger side is migrated. Either keep backward-compatible handlers here (even if deprecated) or update the trigger side in the same PR to send the new DIFF_RANGE/SAME_RANGE requests.
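One backward-compatible option is to normalize legacy types onto the new handlers before dispatch; the enum values below are stand-ins for illustration, not the real `ESTriggerPullType` numbering:

```c
/* Stand-in pull-type values for illustration only. */
enum {
  PULL_TSDB_TRIGGER_DATA = 1,   /* legacy */
  PULL_TSDB_CALC_DATA    = 2,   /* legacy */
  PULL_SAME_RANGE        = 10,  /* new */
  PULL_SAME_RANGE_CALC   = 11,  /* new */
};

/* Map deprecated types to their new equivalents so an un-migrated
 * trigger task never hits the "unknown inner msg type" path. */
static int normalizePullType(int type) {
  switch (type) {
    case PULL_TSDB_TRIGGER_DATA: return PULL_SAME_RANGE;
    case PULL_TSDB_CALC_DATA:    return PULL_SAME_RANGE_CALC;
    default:                     return type;  /* already a new type */
  }
}
```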

Comment on lines +3188 to +3198
static int32_t newDiffRangeIter(SArray* ranges, int64_t ver, int8_t order, bool isCalc,
SDiffRangeIter** out) {
SDiffRangeIter* p = taosMemoryCalloc(1, sizeof(SDiffRangeIter));
if (p == NULL) return terrno;
p->ranges = taosArrayDup(ranges, NULL);
if (p->ranges == NULL) {
taosMemoryFree(p);
return terrno;
}
taosArraySort(p->ranges, diffRangeUidCmp);
p->pos = 0;

Copilot AI Apr 22, 2026


newDiffRangeIter() calls taosArrayDup(ranges, ...) without handling ranges==NULL. taosArrayDup(NULL, ...) returns NULL with TSDB_CODE_INVALID_PARA, so a DiffRange request with NULL/empty ranges (which is valid per the serializer/deserializer and covered by streamMsgTest) will fail at runtime. Treat NULL as an empty range list (no-op) or allocate an empty SArray when ranges is NULL.

Comment on lines +3144 to +3147
// add uid to the end
SColumnInfoData* pColData = taosArrayGetLast(pBlockRes->pDataBlock);
STREAM_CHECK_NULL_GOTO(pColData, terrno);
STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, 0, (const char*)&r->uid, pBlockRes->info.rows, 1, false));

Copilot AI Apr 22, 2026


scanOneTableForRange() fills the last column with r->uid ("add uid to the end"), but downstream history sorting logic treats the last column as the per-row version column (see stNewTimestampSorterSetData: it reads taosArrayGetLast(pDataBlock->pDataBlock) as ver, with optional bias). Writing uid here will corrupt version ordering/merge semantics. Populate the version column with the actual row/block version, and if you need a per-row uid, add a dedicated uid column and adjust the verColBias accordingly.

Suggested change
- // add uid to the end
- SColumnInfoData* pColData = taosArrayGetLast(pBlockRes->pDataBlock);
- STREAM_CHECK_NULL_GOTO(pColData, terrno);
- STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, 0, (const char*)&r->uid, pBlockRes->info.rows, 1, false));
+ // populate the trailing version column used by downstream history sorting
+ SColumnInfoData* pColData = taosArrayGetLast(pBlockRes->pDataBlock);
+ STREAM_CHECK_NULL_GOTO(pColData, terrno);
+ int64_t ver = options.ver;
+ STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, 0, (const char*)&ver, pBlockRes->info.rows, 1, false));

Copilot and others added 6 commits April 22, 2026 11:26
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Adds SSelectStmt::pWhereInjectedFromPreFilter and a parser-side helper
injectPreFilterIntoCalcQuery that clones trigger pre_filter into the
calc query's WHERE for every %%trows SELECT. translateWhere bypasses
the pre-existing '%%trows + WHERE' error when the WHERE is the
injected one. This is a behavior-neutral scaffold; the next commit
removes the wrong column-append block on the trigger side.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The block at parTranslater.c:19264-19275 wrongly appended every column
referenced by the calc SELECT (collected via COLLECT_COL_TYPE_ALL into
triggerScanList) to the trigger SELECT's projection. This forced the
trigger side to scan calc-only columns (e.g. c3, t2 in 'select sum(c3),
avg(t2) from %%trows' with state_window(c1) + pre_filter(c2>2)).

With the prior commit's pre_filter injection making the calc side
self-contained, the trigger projection now stays exactly what
createStreamReqBuildTriggerSelect computed (trigger window cols +
pre_filter cols + tbname()).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
After the previous commit removed its only reader, the field is
unreferenced. Delete the field declaration, the COLLECT_COL_TYPE_ALL
filler in planLogicCreater.c that was specifically populating it for
SP_PARTITION_ROWS, and the now-useless = NULL initializer in
parTranslater.c.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Adds a regression test mirroring the design-spec example:
  trigger: state_window(c1) + pre_filter(c2>2)
  calc:    select _c0, sum(c3), avg(t2) from %%trows
The aggregate values verify pre_filter is actually applied on the calc
side (rows with c2<=2 must not contribute to sum(c3)/avg(t2)). A
negative case re-confirms that user-written WHERE on %%trows is still
rejected.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@wangmm0220 wangmm0220 requested review from a team and zitsen as code owners April 22, 2026 06:26
…r path

Adds MND_STREAM_VER_NUMBER=9 with MND_STREAM_OLD_TRIGGER_COLS=8 so that
streams persisted by older dnodes keep the legacy trigger-scan/calc-scan
behavior while freshly created streams use the new optimized path.
isOldPlan is wired through:
  - mndDef.c deserialization (sver == 8 -> isOldPlan = true)
  - SCMCreateStreamReq / streamMsg encode+decode (symmetric)
  - SStreamTriggerReaderInfo at reader runtime

Reader side (vnodeStream.c + streamReader.c):
  - Split pFilterInfo into pFilterInfoTrigger / pFilterInfoCalc; calc
  - Merge duplicated history/realtime table-list helpers behind an
    isHistory parameter
  - Rewrite slotIdList build to walk the hash and assign each element,
    eliminating the uninitialized-slot risk
  - Add TRANSFORM_DATA_TO_CALC macro for legacy-plan calc block reshape
  - Drop unused getSuid / sortCid / sortSSchema helpers
  - Clean up taosHashPut failure paths to release the task on error

Parser (parTranslater.c):
  - Remove the hard rejection of %%trows + virtual-table + pre_filter;
    the combination is now gated by the version bump above

Test:
  - Adjust test_pre_filter_trows_scan_cols to exercise vtable-style
    pre_filter (c2>2 and t1>1) with partition by tbname and select *

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings April 23, 2026 05:51

Copilot AI left a comment


Pull request overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 5 comments.



Comment on lines 4264 to +4322
@@ -4164,27 +4275,12 @@ int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *p
break;
case STRIGGER_PULL_TSDB_META:
case STRIGGER_PULL_TSDB_META_NEXT:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
case STRIGGER_PULL_TSDB_TS_DATA:
if (sStreamReaderInfo->isVtableStream) {
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqVTable(pVnode, pMsg, &req, sStreamReaderInfo));
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaVtableReq(pVnode, pMsg, &req, sStreamReaderInfo));
} else {
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqNonVTable(pVnode, pMsg, &req, sStreamReaderInfo));
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo));
}
break;
case STRIGGER_PULL_TSDB_TRIGGER_DATA:
case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
case STRIGGER_PULL_TSDB_CALC_DATA:
case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
case STRIGGER_PULL_TSDB_DATA:
case STRIGGER_PULL_TSDB_DATA_NEXT:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
case STRIGGER_PULL_GROUP_COL_VALUE:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
@@ -4207,7 +4303,23 @@ int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *p
STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
case STRIGGER_PULL_WAL_CALC_DATA_NEW:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalCalcDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
if (sStreamReaderInfo->isOldPlan) {
STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalCalcDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
} else {
STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
}
break;
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE:
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_NEXT:
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC:
case STRIGGER_PULL_TSDB_DATA_DIFF_RANGE_CALC_NEXT:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbDataDiffRangeReq(pVnode, pMsg, &req, sStreamReaderInfo));
break;
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE:
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_NEXT:
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC:
case STRIGGER_PULL_TSDB_DATA_SAME_RANGE_CALC_NEXT:
STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbDataSameRangeReq(pVnode, pMsg, &req, sStreamReaderInfo));

Copilot AI Apr 23, 2026


In vnodeProcessStreamReaderMsg, the switch no longer handles the legacy history pull request types (e.g. STRIGGER_PULL_TSDB_TS_DATA, STRIGGER_PULL_TSDB_TRIGGER_DATA(_NEXT), STRIGGER_PULL_TSDB_CALC_DATA(_NEXT), STRIGGER_PULL_TSDB_DATA(_NEXT)). The trigger-side history logic in streamTriggerTask.c still sends these types, so the reader will now hit the default branch and return an error for history scans. Either keep backward-compatible handling for these deprecated types (map them to the new diff/same-range handlers) or update the sender to stop emitting the legacy types in the same change-set/version gate (e.g. keyed off isOldPlan).

Comment on lines +54 to +59
# calc: select _c0, sum(c3), avg(t2) from %%trows
tdSql.execute(
f"create stream s_pf state_window(c1) from stb "
f"partition by tbname stream_options(pre_filter(c2 > 2 and t1 > 1)) "
f"into res_stb as "
f"select * from %%trows;"

Copilot AI Apr 23, 2026


The stream created here uses into res_stb as select * from %%trows;, but check1() later queries sum_c3/avg_t2 columns and expects aggregated results. As written, res_stb will not contain those columns, so the test will fail (or validate the wrong behavior). Align the stream definition/output table schema with the assertions in check1() (or change the assertions to match select *).

Suggested change — removed:
# calc: select _c0, sum(c3), avg(t2) from %%trows
tdSql.execute(
f"create stream s_pf state_window(c1) from stb "
f"partition by tbname stream_options(pre_filter(c2 > 2 and t1 > 1)) "
f"into res_stb as "
f"select * from %%trows;"
Added:
# calc: select first(_c0), sum(c3), avg(t2) from %%trows
tdSql.execute(
f"create stream s_pf state_window(c1) from stb "
f"partition by tbname stream_options(pre_filter(c2 > 2 and t1 > 1)) "
f"into res_stb (firstts, sum_c3, avg_t2) as "
f"select first(_c0), sum(c3), avg(t2) from %%trows;"

TSDB_CHECK_CODE(code, lino, _over);

if (sver != MND_STREAM_VER_NUMBER && sver != MND_STREAM_COMPATIBLE_VER_NUMBER) {
if (sver < MND_STREAM_COMPATIBLE_VER_NUMBER) {
Copilot AI · Apr 23, 2026
This version gate now only rejects sver < MND_STREAM_COMPATIBLE_VER_NUMBER, which means it will accept stream objects with sver > MND_STREAM_VER_NUMBER (future/unknown versions). The subsequent decode path is not forward-compatible and may mis-parse newer layouts. Consider restoring an upper-bound check (reject sver > MND_STREAM_VER_NUMBER) and keep a separate compatibility path for known older versions.

Suggested change — removed:
if (sver < MND_STREAM_COMPATIBLE_VER_NUMBER) {
Added:
if (sver < MND_STREAM_COMPATIBLE_VER_NUMBER || sver > MND_STREAM_VER_NUMBER) {

Comment on lines +17 to +24
Only the client-side parser/planner is involved:

- `source/libs/parser/src/parTranslater.c`
- `source/libs/planner/src/planLogicCreater.c`
- `include/libs/planner/planner.h` (if the `SPlanContext.streamCxt.triggerScanList` field is defined there)

The mnode, snode, and vnode execution paths are not touched; no RPC protocol fields are modified.

Copilot AI · Apr 23, 2026
This spec states the change is limited to client-side parser/planner and does not modify mnode/vnode execution or RPC protocol fields, but this PR also changes mnode/vnode logic and extends stream RPC payloads (e.g. new pull types, isOldPlan, diff/same-range request structs). Please update the scope/assumptions in the spec so it matches the actual implementation and compatibility impact.

Comment on lines 706 to 711
int32_t timeLineFromOrderBy;
bool isEmptyResult;
bool isSubquery;
bool pWhereInjectedFromPreFilter; // true if pWhere was cloned from stream pre_filter
bool hasAggFuncs;
bool hasRepeatScanFuncs;
Copilot AI · Apr 23, 2026
SSelectStmt gained pWhereInjectedFromPreFilter, but the node clone/copy implementation for QUERY_NODE_SELECT_STMT does not currently copy this field (see source/libs/nodes/src/nodesCloneFuncs.c:selectStmtCopy, which copies isSubquery/isEmptyResult etc but not this new flag). If any rewrite/cloning happens after injection, the cloned stmt will have pWhere != NULL but the flag reset to false, and translateWhere may incorrectly reject the query. Please update the select-stmt clone/copy path (and any other struct-copy/serialization logic as applicable) to preserve the new flag.

… proper functions

- Convert NEW_CALC macro to static inline isNewCalc() function, eliminating
  implicit dependency on sStreamReaderInfo/isCalc variables
- Convert TRANSFORM_DATA_TO_CALC macro to static transformDataToCalc() function
  with proper error handling and its own end: label
- Fix misc issues: taosMemoryCalloc arg order, stDebug→ST_TASK_DLOG,
  indentation of pResBlockDst->info.rows++, remove dead groupId assignment
Copilot AI review requested due to automatic review settings April 23, 2026 11:22
Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 19 out of 19 changed files in this pull request and generated 1 comment.



Comment on lines +64 to +85
# ct1 (t1=1, t2=100): c1 alternates to drive state windows;
# c2 values include some <=2 (must be filtered out by pre_filter).
"insert into ct1 values ('2025-01-01 00:00:00', 1, 1, 10);", # c2<=2 -> filtered
"insert into ct1 values ('2025-01-01 00:00:01', 1, 5, 20);",
"insert into ct1 values ('2025-01-01 00:00:02', 1, 7, 30);",
"insert into ct1 values ('2025-01-01 00:00:03', 2, 8, 40);", # state change closes window
"insert into ct1 values ('2025-01-01 00:00:04', 2, 2, 50);", # c2<=2 -> filtered
"insert into ct1 values ('2025-01-01 00:00:05', 2, 9, 60);",
"insert into ct1 values ('2025-01-01 00:00:06', 1, 4, 70);", # state change
]
tdSql.executes(sqls)

def check1(self):
# Expect at least one closed window for ct1 with c1==1 spanning ts 1..2
# (the ts 0 row is dropped by pre_filter).
# sum(c3) over rows kept = 20+30 = 50; avg(t2) = 100.
tdSql.checkResultsByFunc(
sql=f"select sum_c3, avg_t2 from {self.db}.res_stb "
f"where firstts = '2025-01-01 00:00:01';",
func=lambda: tdSql.getRows() == 1
and tdSql.getData(0, 0) == 50
and abs(tdSql.getData(0, 1) - 100.0) < 1e-9,
Copilot AI · Apr 23, 2026
The test data inserts only into ct1 (tags t1=1), but the stream’s pre_filter includes t1 > 1, which would filter out all rows from ct1 and make check1() expectations unreachable. Either insert rows into a table with tags satisfying the filter (e.g., ct2) or adjust the pre_filter/expected results so the dataset actually exercises the pre_filter + %%trows path.

Suggested change — removed:
# ct1 (t1=1, t2=100): c1 alternates to drive state windows;
# c2 values include some <=2 (must be filtered out by pre_filter).
"insert into ct1 values ('2025-01-01 00:00:00', 1, 1, 10);", # c2<=2 -> filtered
"insert into ct1 values ('2025-01-01 00:00:01', 1, 5, 20);",
"insert into ct1 values ('2025-01-01 00:00:02', 1, 7, 30);",
"insert into ct1 values ('2025-01-01 00:00:03', 2, 8, 40);", # state change closes window
"insert into ct1 values ('2025-01-01 00:00:04', 2, 2, 50);", # c2<=2 -> filtered
"insert into ct1 values ('2025-01-01 00:00:05', 2, 9, 60);",
"insert into ct1 values ('2025-01-01 00:00:06', 1, 4, 70);", # state change
]
tdSql.executes(sqls)
def check1(self):
# Expect at least one closed window for ct1 with c1==1 spanning ts 1..2
# (the ts 0 row is dropped by pre_filter).
# sum(c3) over rows kept = 20+30 = 50; avg(t2) = 100.
tdSql.checkResultsByFunc(
sql=f"select sum_c3, avg_t2 from {self.db}.res_stb "
f"where firstts = '2025-01-01 00:00:01';",
func=lambda: tdSql.getRows() == 1
and tdSql.getData(0, 0) == 50
and abs(tdSql.getData(0, 1) - 100.0) < 1e-9,
Added:
# ct2 (t1=2, t2=200): satisfies pre_filter tag predicate t1>1;
# c1 alternates to drive state windows; c2 values include some <=2
# (must be filtered out by pre_filter).
"insert into ct2 values ('2025-01-01 00:00:00', 1, 1, 10);", # c2<=2 -> filtered
"insert into ct2 values ('2025-01-01 00:00:01', 1, 5, 20);",
"insert into ct2 values ('2025-01-01 00:00:02', 1, 7, 30);",
"insert into ct2 values ('2025-01-01 00:00:03', 2, 8, 40);", # state change closes window
"insert into ct2 values ('2025-01-01 00:00:04', 2, 2, 50);", # c2<=2 -> filtered
"insert into ct2 values ('2025-01-01 00:00:05', 2, 9, 60);",
"insert into ct2 values ('2025-01-01 00:00:06', 1, 4, 70);", # state change
]
tdSql.executes(sqls)
def check1(self):
# Expect at least one closed window for ct2 with c1==1 spanning ts 1..2
# (the ts 0 row is dropped by pre_filter).
# sum(c3) over rows kept = 20+30 = 50; avg(t2) = 200.
tdSql.checkResultsByFunc(
sql=f"select sum_c3, avg_t2 from {self.db}.res_stb "
f"where firstts = '2025-01-01 00:00:01';",
func=lambda: tdSql.getRows() == 1
and tdSql.getData(0, 0) == 50
and abs(tdSql.getData(0, 1) - 200.0) < 1e-9,
