Skip to content

Commit c9a86be

Browse files
committed
feat: add STMT/STMT2 support for external_window queries
- Remove stmtBindVersion guard in translateWindow() to allow STMT mode - Add externalWindowNodeCopy() in nodesCloneFuncs.c for correct clone - Defer SELECT translation in analyseSemantic() when pStmtCb != NULL - Allow STMT_EXECUTE from STMT_PREPARE status for query type - Fix use-after-free: set pResSchema/pResExtSchema to NULL after ownership transfer - Fix direct return while holding error path: use goto _return pattern - Fix stale prevStatus: capture after async-bind wait completes - Add stmt_external_window_regression() and stmt2_external_window_regression()
1 parent b428cf5 commit c9a86be

9 files changed

Lines changed: 393 additions & 55 deletions

File tree

source/client/src/clientStmt.c

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
154154
case STMT_EXECUTE:
155155
if (STMT_TYPE_QUERY == pStmt->sql.type) {
156156
if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
157-
STMT_STATUS_NE(BIND_COL)) {
157+
STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(PREPARE)) {
158158
code = TSDB_CODE_TSC_STMT_API_ERROR;
159159
}
160160
} else {
@@ -1656,9 +1656,63 @@ int stmtExec(TAOS_STMT* stmt) {
16561656
return pStmt->errCode;
16571657
}
16581658

1659+
STMT_STATUS prevStatus = pStmt->sql.status;
1660+
// If the user calls execute directly after prepare (no '?' params query),
1661+
// we need to parse the SQL first to determine the query type.
1662+
if (prevStatus == STMT_PREPARE) {
1663+
// Ensure pRequest exists even when the statement was served from cache
1664+
// (needParse == false). Without a live pRequest, launchQueryImpl would
1665+
// receive a NULL pointer and crash.
1666+
if (pStmt->exec.pRequest == NULL) {
1667+
STMT_ERR_RET(stmtCreateRequest(pStmt));
1668+
}
1669+
if (pStmt->bInfo.needParse) {
1670+
STMT_ERR_RET(stmtParseSql(pStmt));
1671+
}
1672+
}
1673+
16591674
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
16601675

16611676
if (STMT_TYPE_QUERY == pStmt->sql.type) {
1677+
// If execute is called directly after prepare without binding params (no '?' in query),
1678+
// perform the full parse sequence to complete query planning.
1679+
if (prevStatus == STMT_PREPARE && pStmt->sql.pQuery && pStmt->sql.pQuery->pPrepareRoot) {
1680+
if (pStmt->sql.pQuery->placeholderNum > 0) {
1681+
// User has unbound '?' parameters — this is misuse.
1682+
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
1683+
}
1684+
// No placeholders: use qStmtBindParams with colIdx=-1 (loop skipped, just clones pPrepareRoot).
1685+
STMT_ERR_RET(qStmtBindParams(pStmt->sql.pQuery, NULL, -1, pStmt->taos->optionInfo.charsetCxt));
1686+
1687+
SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
1688+
.acctId = pStmt->taos->acctId,
1689+
.db = pStmt->exec.pRequest->pDb,
1690+
.topicQuery = false,
1691+
.pSql = pStmt->sql.sqlStr,
1692+
.sqlLen = pStmt->sql.sqlLen,
1693+
.pMsg = pStmt->exec.pRequest->msgBuf,
1694+
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1695+
.pTransporter = pStmt->taos->pAppInfo->pTransporter,
1696+
.pStmtCb = NULL,
1697+
.pUser = pStmt->taos->user,
1698+
.setQueryFp = setQueryRequest,
1699+
.stmtBindVersion = pStmt->exec.pRequest->stmtBindVersion};
1700+
ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
1701+
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog));
1702+
STMT_ERR_RET(qStmtParseQuerySql(&ctx, pStmt->sql.pQuery, NULL));
1703+
1704+
if (pStmt->sql.pQuery->haveResultSet) {
1705+
STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
1706+
pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true));
1707+
// setResSchemaInfo copies schema data into resInfo->fields; pQuery still owns these pointers.
1708+
taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
1709+
taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
1710+
setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
1711+
}
1712+
TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
1713+
TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
1714+
TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);
1715+
}
16621716
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
16631717
} else {
16641718
if (pStmt->sql.stbInterlaceMode) {

source/client/src/clientStmt2.c

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ static int32_t stmtSwitchStatus(STscStmt2* pStmt, STMT_STATUS newStatus) {
195195
case STMT_EXECUTE:
196196
if (STMT_TYPE_QUERY == pStmt->sql.type) {
197197
if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
198-
STMT_STATUS_NE(BIND_COL)) {
198+
STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(PREPARE)) {
199199
code = TSDB_CODE_TSC_STMT_API_ERROR;
200200
}
201201
} else {
@@ -2441,13 +2441,32 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
24412441
while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
24422442
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
24432443
}
2444+
// Capture prevStatus after waiting so we see the status after any async bind completes.
2445+
STMT_STATUS prevStatus = pStmt->sql.status;
24442446
STMT_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));
24452447

24462448
if (pStmt->sql.stbInterlaceMode) {
24472449
STMT_ERR_RET(stmtAddBatch2(pStmt));
24482450
}
24492451

2450-
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
2452+
// For the prepare → execute shortcut (no bind_param calls), parse the SQL
2453+
// now so that sql.type is set before stmtSwitchStatus inspects it.
2454+
if (prevStatus == STMT_PREPARE) {
2455+
// Ensure pRequest exists even when the statement was served from cache
2456+
// (needParse == false). Without a live pRequest, launchQueryImpl would
2457+
// receive a NULL pointer and crash.
2458+
if (pStmt->exec.pRequest == NULL) {
2459+
code = stmtCreateRequest(pStmt);
2460+
if (code != TSDB_CODE_SUCCESS) goto _return;
2461+
}
2462+
if (pStmt->bInfo.needParse) {
2463+
code = stmtParseSql(pStmt);
2464+
if (code != TSDB_CODE_SUCCESS) goto _return;
2465+
}
2466+
}
2467+
2468+
code = stmtSwitchStatus(pStmt, STMT_EXECUTE);
2469+
if (code != TSDB_CODE_SUCCESS) goto _return;
24512470

24522471
if (STMT_TYPE_QUERY != pStmt->sql.type) {
24532472
if (pStmt->sql.stbInterlaceMode) {
@@ -2476,6 +2495,63 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
24762495
}
24772496
}
24782497

2498+
// For the prepare→execute shortcut with no '?' params, complete the
2499+
// bind/translate step that is normally done inside stmtBindv2.
2500+
if (STMT_TYPE_QUERY == pStmt->sql.type && prevStatus == STMT_PREPARE &&
2501+
pStmt->sql.pQuery && pStmt->sql.pQuery->pPrepareRoot) {
2502+
if (pStmt->sql.pQuery->placeholderNum > 0) {
2503+
// User left '?' parameters unbound — this is API misuse.
2504+
code = TSDB_CODE_TSC_STMT_API_ERROR;
2505+
goto _return;
2506+
}
2507+
// No placeholders: clone pPrepareRoot → pRoot (loop inside qStmtBindParams2 is skipped).
2508+
code = qStmtBindParams2(pStmt->sql.pQuery, NULL, -1, pStmt->taos->optionInfo.charsetCxt);
2509+
if (code != TSDB_CODE_SUCCESS) goto _return;
2510+
2511+
SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
2512+
.acctId = pStmt->taos->acctId,
2513+
.db = pStmt->exec.pRequest->pDb,
2514+
.topicQuery = false,
2515+
.pSql = pStmt->sql.sqlStr,
2516+
.sqlLen = pStmt->sql.sqlLen,
2517+
.pMsg = pStmt->exec.pRequest->msgBuf,
2518+
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
2519+
.pTransporter = pStmt->taos->pAppInfo->pTransporter,
2520+
.pStmtCb = NULL,
2521+
.pUser = pStmt->taos->user,
2522+
.stmtBindVersion = pStmt->exec.pRequest->stmtBindVersion};
2523+
ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
2524+
code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog);
2525+
if (code != TSDB_CODE_SUCCESS) goto _return;
2526+
2527+
SMetaData metaData = {0};
2528+
code = stmtFetchMetadataForQuery(pStmt, &ctx, &metaData);
2529+
if (code != TSDB_CODE_SUCCESS) goto _return;
2530+
2531+
code = qStmtParseQuerySql(&ctx, pStmt->sql.pQuery, &metaData);
2532+
if (code == TSDB_CODE_SUCCESS) {
2533+
(void)memcpy(&pStmt->exec.pRequest->parseMeta, &metaData, sizeof(SMetaData));
2534+
(void)memset(&metaData, 0, sizeof(SMetaData));
2535+
} else {
2536+
catalogFreeMetaData(&metaData);
2537+
(void)memset(&metaData, 0, sizeof(SMetaData));
2538+
goto _return;
2539+
}
2540+
2541+
if (pStmt->sql.pQuery->haveResultSet) {
2542+
code = setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
2543+
pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true);
2544+
if (code != TSDB_CODE_SUCCESS) goto _return;
2545+
// setResSchemaInfo copies schema data into resInfo->fields; pQuery still owns these pointers.
2546+
taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
2547+
taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
2548+
setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
2549+
}
2550+
TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
2551+
TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
2552+
TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);
2553+
}
2554+
24792555
pStmt->asyncResultAvailable = false;
24802556
SRequestObj* pRequest = pStmt->exec.pRequest;
24812557
__taos_async_fn_t fp = pStmt->options.asyncExecFn;

source/libs/nodes/src/nodesCloneFuncs.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,19 @@ static int32_t anomalyWindowNodeCopy(const SAnomalyWindowNode* pSrc, SAnomalyWin
425425
return TSDB_CODE_SUCCESS;
426426
}
427427

428+
static int32_t externalWindowNodeCopy(const SExternalWindowNode* pSrc, SExternalWindowNode* pDst) {
429+
CLONE_NODE_FIELD(pCol);
430+
CLONE_NODE_LIST_FIELD(pProjectionList);
431+
CLONE_NODE_LIST_FIELD(pAggFuncList);
432+
COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow));
433+
CLONE_NODE_FIELD(pTimeRange);
434+
pDst->timezone = pSrc->timezone; // shared pointer, not owned
435+
CLONE_NODE_FIELD(pSubquery);
436+
CLONE_NODE_FIELD(pFill);
437+
COPY_CHAR_ARRAY_FIELD(aliasName);
438+
return TSDB_CODE_SUCCESS;
439+
}
440+
428441
static int32_t sessionWindowNodeCopy(const SSessionWindowNode* pSrc, SSessionWindowNode* pDst) {
429442
CLONE_NODE_FIELD_EX(pCol, SColumnNode*);
430443
CLONE_NODE_FIELD_EX(pGap, SValueNode*);
@@ -1311,6 +1324,9 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) {
13111324
case QUERY_NODE_ANOMALY_WINDOW:
13121325
code = anomalyWindowNodeCopy((const SAnomalyWindowNode*)pNode, (SAnomalyWindowNode*)pDst);
13131326
break;
1327+
case QUERY_NODE_EXTERNAL_WINDOW:
1328+
code = externalWindowNodeCopy((const SExternalWindowNode*)pNode, (SExternalWindowNode*)pDst);
1329+
break;
13141330
case QUERY_NODE_SESSION_WINDOW:
13151331
code = sessionWindowNodeCopy((const SSessionWindowNode*)pNode, (SSessionWindowNode*)pDst);
13161332
break;

source/libs/nodes/src/nodesUtilFuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2310,6 +2310,7 @@ void nodesDestroyNode(SNode* pNode) {
23102310
nodesDestroyList(pLogicNode->pColList);
23112311
nodesDestroyList(pLogicNode->pProjs);
23122312
destroyExtWindowFillInfo(&pLogicNode->extFill);
2313+
nodesDestroyNode(pLogicNode->pSubquery);
23132314
break;
23142315
}
23152316
case QUERY_NODE_LOGIC_PLAN_FILL: {

source/libs/parser/src/parTranslater.c

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9823,12 +9823,7 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
98239823
if (NULL == pSelect->pWindow) {
98249824
return TSDB_CODE_SUCCESS;
98259825
}
9826-
if (QUERY_NODE_EXTERNAL_WINDOW == nodeType(pSelect->pWindow)) {
9827-
if (pCxt->pParseCxt->stmtBindVersion > 0) {
9828-
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_WINDOW_PC,
9829-
"External window query can not be used in stmt query");
9830-
}
9831-
}
9826+
98329827
if (pSelect->pFromTable->type == QUERY_NODE_REAL_TABLE &&
98339828
((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType == TSDB_SYSTEM_TABLE) {
98349829
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, "WINDOW");

source/libs/parser/src/parser.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,20 @@ static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery, SParseMetaCa
441441
return TSDB_CODE_SUCCESS;
442442
}
443443

444+
// In STMT mode (pStmtCb set), defer translation for all non-INSERT queries,
445+
// even when there are no '?' placeholders. This is required for correctness:
446+
// stmtParseSql() identifies the query type by checking pPrepareRoot (→ QUERY)
447+
// vs pRoot (→ INSERT). If a 0-param SELECT were translated immediately, pRoot
448+
// would be non-NULL and pPrepareRoot NULL, causing stmtParseSql to wrongly
449+
// classify the query as INSERT. Deferring all non-INSERT queries (i.e. those
450+
// whose root is not QUERY_NODE_VNODE_MODIFY_STMT) ensures consistent handling
451+
// and allows qStmtParseQuerySql to perform the actual translation later.
452+
if (TSDB_CODE_SUCCESS == code && pCxt->pStmtCb && pQuery->pRoot &&
453+
nodeType(pQuery->pRoot) != QUERY_NODE_VNODE_MODIFY_STMT) {
454+
TSWAP(pQuery->pPrepareRoot, pQuery->pRoot);
455+
return TSDB_CODE_SUCCESS;
456+
}
457+
444458
if (TSDB_CODE_SUCCESS == code) {
445459
code = translate(pCxt, pQuery, pMetaCache);
446460
}

source/libs/planner/src/planLogicCreater.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2520,7 +2520,7 @@ static int32_t createWindowLogicNodeByExternal(SLogicPlanContext* pCxt, SExterna
25202520
PLAN_ERR_JRET(nodesCloneNode(pFill->pValues, &pWindow->extFill.pFillValues));
25212521
}
25222522

2523-
pWindow->pSubquery = pExternal->pSubquery;
2523+
PLAN_ERR_JRET(nodesCloneNode(pExternal->pSubquery, &pWindow->pSubquery));
25242524
return createExternalWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
25252525

25262526
_return:
@@ -2581,7 +2581,11 @@ static int32_t createWindowLogicNodeByStreamExternal(SLogicPlanContext* pCxt, SE
25812581
return TSDB_CODE_PLAN_INTERNAL_ERROR;
25822582
}
25832583

2584-
pWindow->pSubquery = pExternal->pSubquery;
2584+
code = nodesCloneNode(pExternal->pSubquery, &pWindow->pSubquery);
2585+
if (code != TSDB_CODE_SUCCESS) {
2586+
nodesDestroyNode((SNode*)pWindow);
2587+
return code;
2588+
}
25852589
return createExternalWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
25862590
}
25872591

source/libs/planner/src/planPhysiCreater.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3171,7 +3171,7 @@ static int32_t createExternalWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
31713171
pExternal->extFill.mode = pWindowLogicNode->extFill.mode;
31723172
pExternal->orgTableUid = pWindowLogicNode->orgTableUid;
31733173
pExternal->orgTableVgId = pWindowLogicNode->orgTableVgId;
3174-
pExternal->pSubquery = pWindowLogicNode->pSubquery;
3174+
PLAN_ERR_JRET(nodesCloneNode(pWindowLogicNode->pSubquery, &pExternal->pSubquery));
31753175
PLAN_ERR_JRET(nodesCloneNode(pWindowLogicNode->pTimeRange, &pExternal->pTimeRange));
31763176

31773177
PLAN_ERR_JRET(createWindowPhysiNodeFinalize(pCxt, pChildren, &pExternal->window, pWindowLogicNode));

0 commit comments

Comments
 (0)