Skip to content

Commit 5a76e8f

Browse files
committed
feat: [6598098782] Support indefrows funcs in window query.
1 parent 88dcf28 commit 5a76e8f

26 files changed

+75912
-293
lines changed

docs/en/14-reference/09-error-code.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,7 @@ Below are the business error codes for each module.
380380
| 0x8000073D | Alter minReservedMemorySize failed since no enough system available memory | Failed to update minReservedMemorySize | Check current system memory: 1. Total available system memory should not be less than 5G; 2. Available system memory after deducting reserved portion should not be less than 4G |
381381
| 0x8000073E | Duplicate timestamp not allowed in count/event/state window | Duplicate timestamps in the window's input primary key column. When querying supertables with count/event/state window, all subtable data will be sorted by timestamp and merged into one timeline for calculation, which may result in duplicate timestamps, causing errors in some calculations. | Ensure there are no duplicate timestamp data in subtables when querying supertables using count/event/state window. |
382382
| 0x80000741 | VSTB slotId not found for column | Failed to map a source column to a virtual table slotId during query execution | Preserve the scene and logs, report issue on GitHub |
383+
| 0x80000742 | Window state not found | Query execution failed to find the expected window state by its window key, which usually indicates an inconsistency in executor-side window state management. | Preserve the scene and logs, report issue on GitHub |
383384

384385
#### grant
385386

docs/zh/14-reference/09-error-code.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ TSDB 错误码包括 taosc 客户端和服务端,所有语言的连接器无
379379
| 0x8000073D | Alter minReservedMemorySize failed since no enough system available memory | 更新 minReservedMemorySize 失败 | 确认当前的系统内存:1. 系统的可用内存总量不低于 5G;2. 扣除预留部分后系统的可用内存不低于 4G |
380380
| 0x8000073E | Duplicate timestamp not allowed in count/event/state window | 窗口输入主键列有重复时间戳。对状态窗口、事件窗口、计数窗口做超级表查询时,所有子表数据会按照时间戳进行排序后合并为一条时间线进行计算,因此子表合并后的时间戳可能会出现重复,导致某些计算没有意义而报错。 | 如果需要对超级表查询并且使用这些窗口时,确保子表中不存在重复时间戳数据。 |
381381
| 0x80000741 | VSTB slotId not found for column | 查询执行时未能将源列映射到虚拟表的 slotId | 保留现场和日志,github 上报 issue |
382+
| 0x80000742 | Window state not found | 查询执行时未能按窗口键找到对应的窗口状态,通常表示执行器内部窗口状态管理出现异常。 | 保留现场和日志,github 上报 issue |
382383

383384
#### grant
384385

include/libs/function/functionMgt.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ bool fmIsPseudoColumnFunc(int32_t funcId);
322322
bool fmIsScanPseudoColumnFunc(int32_t funcId);
323323
bool fmIsWindowPseudoColumnFunc(int32_t funcId);
324324
bool fmIsWindowClauseFunc(int32_t funcId);
325+
bool fmIsWindowIndefRowsFunc(int32_t funcId);
325326
bool fmIsStreamWindowClauseFunc(int32_t funcId);
326327
bool fmIsSpecialDataRequiredFunc(int32_t funcId);
327328
bool fmIsDynamicScanOptimizedFunc(int32_t funcId);

include/util/taoserror.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,7 @@ int32_t taosGetErrSize();
766766
#define TSDB_CODE_QRY_SUBQ_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x073F)
767767
#define TSDB_CODE_QRY_SUBQ_EXEC_ERROR TAOS_DEF_ERROR_CODE(0, 0x0740)
768768
#define TSDB_CODE_QRY_VSTB_SLOTID_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0741)
769+
#define TSDB_CODE_QRY_WINDOW_STATE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0742)
769770

770771
// grant
771772
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800)

source/libs/executor/inc/executorInt.h

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,28 @@ typedef struct SOptrBasicInfo {
523523
int32_t outputTsOrder;
524524
} SOptrBasicInfo;
525525

526+
typedef struct SIndefRowsWindowState {
527+
STimeWindow win; // logical window range for this state
528+
uint64_t groupId; // source group id of this logical window
529+
SResultRow* pRow; // persistent function state for this logical window
530+
SList* pSealedBlocks; // SList<SSDataBlock*> - completed full blocks
531+
SSDataBlock* pCurBlock; // block currently being filled
532+
} SIndefRowsWindowState;
533+
534+
typedef struct SIndefRowsStateKey {
535+
uint64_t groupId;
536+
TSKEY skey;
537+
} SIndefRowsStateKey;
538+
539+
typedef struct SIndefRowsRuntime {
540+
SSHashObj* pOpenStatesMap; // key: SIndefRowsStateKey -> val: SIndefRowsWindowState*
541+
SList* pReadyBlocks; // SList<SSDataBlock*> - blocks ready to return to upstream
542+
SSDataBlock* pReturnedBlock; // last returned block, destroyed on next fetch
543+
SArray* pPseudoColInfo; // pseudo-column slot mapping for direct project
544+
SSDataBlock* pTmpBlock; // reusable temp block for one segment copy
545+
int32_t blockCapacity; // max rows per output block
546+
} SIndefRowsRuntime;
547+
526548
typedef struct SIntervalAggOperatorInfo {
527549
SOptrBasicInfo binfo; // basic info
528550
SAggSupporter aggSup; // aggregate supporter
@@ -537,6 +559,8 @@ typedef struct SIntervalAggOperatorInfo {
537559
STimeWindowAggSupp twAggSup;
538560
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
539561
bool cleanGroupResInfo;
562+
bool indefRowsMode;
563+
SIndefRowsRuntime indefRows;
540564
struct SOperatorInfo* pOperator;
541565
// for limit optimization
542566
bool limited;
@@ -614,6 +638,8 @@ typedef struct SSessionAggOperatorInfo {
614638
int64_t gap; // session window gap
615639
int32_t tsSlotId; // primary timestamp slot id
616640
STimeWindowAggSupp twAggSup;
641+
bool indefRowsMode;
642+
SIndefRowsRuntime indefRows;
617643
struct SOperatorInfo* pOperator;
618644
bool cleanGroupResInfo;
619645
} SSessionAggOperatorInfo;
@@ -629,6 +655,8 @@ typedef struct SStateWindowOperatorInfo {
629655
SStateKeys stateKey;
630656
int32_t tsSlotId; // primary timestamp column slot id
631657
STimeWindowAggSupp twAggSup;
658+
bool indefRowsMode;
659+
SIndefRowsRuntime indefRows;
632660
struct SOperatorInfo* pOperator;
633661
bool cleanGroupResInfo;
634662
STrueForInfo trueForInfo;
@@ -649,6 +677,8 @@ typedef struct SEventWindowOperatorInfo {
649677
bool inWindow;
650678
SResultRow* pRow;
651679
SSDataBlock* pPreDataBlock;
680+
bool indefRowsMode;
681+
SIndefRowsRuntime indefRows;
652682
struct SOperatorInfo* pOperator;
653683
STrueForInfo trueForInfo;
654684
} SEventWindowOperatorInfo;
@@ -668,6 +698,22 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo);
668698

669699
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore);
670700
void checkIndefRowsFuncs(SExprSupp* pSup);
701+
int32_t initIndefRowsRuntime(SIndefRowsRuntime* pRuntime, SqlFunctionCtx* pCtx, int32_t numOfExprs, int32_t blockCapacity);
702+
void resetIndefRowsRuntime(SIndefRowsRuntime* pRuntime, struct SOperatorInfo* pOperator);
703+
void cleanupIndefRowsRuntime(SIndefRowsRuntime* pRuntime, struct SOperatorInfo* pOperator);
704+
SIndefRowsWindowState* findIndefRowsWindowState(const SIndefRowsRuntime* pRuntime, uint64_t groupId, TSKEY winSKey);
705+
int32_t applyIndefRowsFuncOnWindowState(struct SOperatorInfo* pOperator, SIndefRowsRuntime* pRuntime,
706+
SIndefRowsWindowState** ppState, SSDataBlock* pResultTemplate,
707+
uint64_t groupId, const STimeWindow* pWin, SSDataBlock* pInputBlock,
708+
int32_t startRow, int32_t numRows, int32_t inputTsOrder,
709+
int32_t resultRowSize);
710+
int32_t closeIndefRowsWindowState(struct SOperatorInfo* pOperator, SIndefRowsRuntime* pRuntime,
711+
SIndefRowsWindowState* pState);
712+
int32_t closeAllIndefRowsWindowStates(struct SOperatorInfo* pOperator, SIndefRowsRuntime* pRuntime);
713+
void dropIndefRowsWindowState(struct SOperatorInfo* pOperator, SIndefRowsRuntime* pRuntime,
714+
SIndefRowsWindowState* pState);
715+
void dropAllIndefRowsWindowStates(struct SOperatorInfo* pOperator, SIndefRowsRuntime* pRuntime);
716+
SSDataBlock* getNextIndefRowsResultBlock(SIndefRowsRuntime* pRuntime, struct SOperatorInfo* pOperator);
671717
void cleanupExprSupp(SExprSupp* pSup);
672718
void cleanupExprSuppWithoutFilter(SExprSupp* pSupp);
673719

0 commit comments

Comments
 (0)