Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion source/client/src/clientStmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
case STMT_EXECUTE:
if (STMT_TYPE_QUERY == pStmt->sql.type) {
if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
STMT_STATUS_NE(BIND_COL)) {
STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(PREPARE)) {
code = TSDB_CODE_TSC_STMT_API_ERROR;
}
} else {
Expand Down Expand Up @@ -1656,9 +1656,63 @@ int stmtExec(TAOS_STMT* stmt) {
return pStmt->errCode;
}

STMT_STATUS prevStatus = pStmt->sql.status;
// If the user calls execute directly after prepare (no '?' params query),
// we need to parse the SQL first to determine the query type.
if (prevStatus == STMT_PREPARE) {
// Ensure pRequest exists even when the statement was served from cache
// (needParse == false). Without a live pRequest, launchQueryImpl would
// receive a NULL pointer and crash.
if (pStmt->exec.pRequest == NULL) {
STMT_ERR_RET(stmtCreateRequest(pStmt));
}
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
}

STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));

if (STMT_TYPE_QUERY == pStmt->sql.type) {
// If execute is called directly after prepare without binding params (no '?' in query),
// perform the full parse sequence to complete query planning.
if (prevStatus == STMT_PREPARE && pStmt->sql.pQuery && pStmt->sql.pQuery->pPrepareRoot) {
if (pStmt->sql.pQuery->placeholderNum > 0) {
// User has unbound '?' parameters — this is misuse.
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
// No placeholders: use qStmtBindParams with colIdx=-1 (loop skipped, just clones pPrepareRoot).
STMT_ERR_RET(qStmtBindParams(pStmt->sql.pQuery, NULL, -1, pStmt->taos->optionInfo.charsetCxt));

SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
.acctId = pStmt->taos->acctId,
.db = pStmt->exec.pRequest->pDb,
.topicQuery = false,
.pSql = pStmt->sql.sqlStr,
.sqlLen = pStmt->sql.sqlLen,
.pMsg = pStmt->exec.pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pStmt->taos->pAppInfo->pTransporter,
.pStmtCb = NULL,
.pUser = pStmt->taos->user,
.setQueryFp = setQueryRequest,
.stmtBindVersion = pStmt->exec.pRequest->stmtBindVersion};
ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog));
STMT_ERR_RET(qStmtParseQuerySql(&ctx, pStmt->sql.pQuery, NULL));
Comment thread
facetosea marked this conversation as resolved.

if (pStmt->sql.pQuery->haveResultSet) {
STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true));
// setResSchemaInfo copies schema data into resInfo->fields; pQuery still owns these pointers.
taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
}
TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);
}
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
} else {
if (pStmt->sql.stbInterlaceMode) {
Expand Down
82 changes: 80 additions & 2 deletions source/client/src/clientStmt2.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ static int32_t stmtSwitchStatus(STscStmt2* pStmt, STMT_STATUS newStatus) {
case STMT_EXECUTE:
if (STMT_TYPE_QUERY == pStmt->sql.type) {
if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
STMT_STATUS_NE(BIND_COL)) {
STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(PREPARE)) {
code = TSDB_CODE_TSC_STMT_API_ERROR;
}
} else {
Expand Down Expand Up @@ -2441,13 +2441,32 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
}
// Capture prevStatus after waiting so we see the status after any async bind completes.
STMT_STATUS prevStatus = pStmt->sql.status;
STMT_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));

if (pStmt->sql.stbInterlaceMode) {
STMT_ERR_RET(stmtAddBatch2(pStmt));
}

STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
// For the prepare → execute shortcut (no bind_param calls), parse the SQL
// now so that sql.type is set before stmtSwitchStatus inspects it.
if (prevStatus == STMT_PREPARE) {
// Ensure pRequest exists even when the statement was served from cache
// (needParse == false). Without a live pRequest, launchQueryImpl would
// receive a NULL pointer and crash.
if (pStmt->exec.pRequest == NULL) {
code = stmtCreateRequest(pStmt);
if (code != TSDB_CODE_SUCCESS) goto _return;
}
if (pStmt->bInfo.needParse) {
code = stmtParseSql(pStmt);
if (code != TSDB_CODE_SUCCESS) goto _return;
}
}

code = stmtSwitchStatus(pStmt, STMT_EXECUTE);
if (code != TSDB_CODE_SUCCESS) goto _return;

if (STMT_TYPE_QUERY != pStmt->sql.type) {
if (pStmt->sql.stbInterlaceMode) {
Expand Down Expand Up @@ -2476,6 +2495,65 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
}
}

// For the prepare→execute shortcut with no '?' params, complete the
// bind/translate step that is normally done inside stmtBindv2.
if (STMT_TYPE_QUERY == pStmt->sql.type && prevStatus == STMT_PREPARE &&
pStmt->sql.pQuery && pStmt->sql.pQuery->pPrepareRoot) {
if (pStmt->sql.pQuery->placeholderNum > 0) {
// User left '?' parameters unbound — this is API misuse.
code = TSDB_CODE_TSC_STMT_API_ERROR;
goto _return;
}
// No placeholders: clone pPrepareRoot → pRoot (loop inside qStmtBindParams2 is skipped).
code = qStmtBindParams2(pStmt->sql.pQuery, NULL, -1, pStmt->taos->optionInfo.charsetCxt);
if (code != TSDB_CODE_SUCCESS) goto _return;

SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
.requestRid = pStmt->exec.pRequest->self,
.acctId = pStmt->taos->acctId,
.db = pStmt->exec.pRequest->pDb,
.topicQuery = false,
.pSql = pStmt->sql.sqlStr,
.sqlLen = pStmt->sql.sqlLen,
.pMsg = pStmt->exec.pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pStmt->taos->pAppInfo->pTransporter,
.pStmtCb = NULL,
.pUser = pStmt->taos->user,
.setQueryFp = setQueryRequest,
.stmtBindVersion = pStmt->exec.pRequest->stmtBindVersion};
Comment thread
facetosea marked this conversation as resolved.
ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog);
if (code != TSDB_CODE_SUCCESS) goto _return;

SMetaData metaData = {0};
code = stmtFetchMetadataForQuery(pStmt, &ctx, &metaData);
if (code != TSDB_CODE_SUCCESS) goto _return;
Comment thread
facetosea marked this conversation as resolved.

code = qStmtParseQuerySql(&ctx, pStmt->sql.pQuery, &metaData);
if (code == TSDB_CODE_SUCCESS) {
(void)memcpy(&pStmt->exec.pRequest->parseMeta, &metaData, sizeof(SMetaData));
(void)memset(&metaData, 0, sizeof(SMetaData));
Comment thread
facetosea marked this conversation as resolved.
} else {
catalogFreeMetaData(&metaData);
(void)memset(&metaData, 0, sizeof(SMetaData));
goto _return;
}
Comment thread
facetosea marked this conversation as resolved.

if (pStmt->sql.pQuery->haveResultSet) {
code = setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true);
if (code != TSDB_CODE_SUCCESS) goto _return;
// setResSchemaInfo copies schema data into resInfo->fields; pQuery still owns these pointers.
taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
}
TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);
}

pStmt->asyncResultAvailable = false;
SRequestObj* pRequest = pStmt->exec.pRequest;
__taos_async_fn_t fp = pStmt->options.asyncExecFn;
Expand Down
16 changes: 16 additions & 0 deletions source/libs/nodes/src/nodesCloneFuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,19 @@ static int32_t anomalyWindowNodeCopy(const SAnomalyWindowNode* pSrc, SAnomalyWin
return TSDB_CODE_SUCCESS;
}

static int32_t externalWindowNodeCopy(const SExternalWindowNode* pSrc, SExternalWindowNode* pDst) {
CLONE_NODE_FIELD(pCol);
CLONE_NODE_LIST_FIELD(pProjectionList);
CLONE_NODE_LIST_FIELD(pAggFuncList);
COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow));
CLONE_NODE_FIELD(pTimeRange);
pDst->timezone = pSrc->timezone; // shared pointer, not owned
CLONE_NODE_FIELD(pSubquery);
CLONE_NODE_FIELD(pFill);
COPY_CHAR_ARRAY_FIELD(aliasName);
return TSDB_CODE_SUCCESS;
}

static int32_t sessionWindowNodeCopy(const SSessionWindowNode* pSrc, SSessionWindowNode* pDst) {
CLONE_NODE_FIELD_EX(pCol, SColumnNode*);
CLONE_NODE_FIELD_EX(pGap, SValueNode*);
Expand Down Expand Up @@ -1311,6 +1324,9 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) {
case QUERY_NODE_ANOMALY_WINDOW:
code = anomalyWindowNodeCopy((const SAnomalyWindowNode*)pNode, (SAnomalyWindowNode*)pDst);
break;
case QUERY_NODE_EXTERNAL_WINDOW:
code = externalWindowNodeCopy((const SExternalWindowNode*)pNode, (SExternalWindowNode*)pDst);
break;
case QUERY_NODE_SESSION_WINDOW:
code = sessionWindowNodeCopy((const SSessionWindowNode*)pNode, (SSessionWindowNode*)pDst);
break;
Expand Down
1 change: 1 addition & 0 deletions source/libs/nodes/src/nodesUtilFuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2310,6 +2310,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pLogicNode->pColList);
nodesDestroyList(pLogicNode->pProjs);
destroyExtWindowFillInfo(&pLogicNode->extFill);
nodesDestroyNode(pLogicNode->pSubquery);
break;
}
case QUERY_NODE_LOGIC_PLAN_FILL: {
Expand Down
7 changes: 1 addition & 6 deletions source/libs/parser/src/parTranslater.c
Original file line number Diff line number Diff line change
Expand Up @@ -9823,12 +9823,7 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pWindow) {
return TSDB_CODE_SUCCESS;
}
if (QUERY_NODE_EXTERNAL_WINDOW == nodeType(pSelect->pWindow)) {
if (pCxt->pParseCxt->stmtBindVersion > 0) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_WINDOW_PC,
"External window query can not be used in stmt query");
}
}

if (pSelect->pFromTable->type == QUERY_NODE_REAL_TABLE &&
((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType == TSDB_SYSTEM_TABLE) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, "WINDOW");
Expand Down
15 changes: 15 additions & 0 deletions source/libs/parser/src/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,21 @@ static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery, SParseMetaCa
return TSDB_CODE_SUCCESS;
}

// In STMT mode (pStmtCb set), defer translation for all non-INSERT queries,
// even when there are no '?' placeholders. This is required for correctness:
// stmtParseSql() identifies the query type by checking pPrepareRoot (→ QUERY)
// vs pRoot (→ INSERT). If a 0-param SELECT were translated immediately, pRoot
// would be non-NULL and pPrepareRoot NULL, causing stmtParseSql to wrongly
// classify the query as INSERT. Deferring all non-INSERT queries (i.e. those
// whose root is not QUERY_NODE_VNODE_MODIFY_STMT) ensures consistent handling
// and allows qStmtParseQuerySql to perform the actual translation later.
if (TSDB_CODE_SUCCESS == code && pCxt->pStmtCb && pQuery->pRoot &&
nodeType(pQuery->pRoot) != QUERY_NODE_VNODE_MODIFY_STMT &&
nodeType(pQuery->pRoot) != QUERY_NODE_INSERT_STMT) {
TSWAP(pQuery->pPrepareRoot, pQuery->pRoot);
return TSDB_CODE_SUCCESS;
}

if (TSDB_CODE_SUCCESS == code) {
code = translate(pCxt, pQuery, pMetaCache);
}
Expand Down
8 changes: 6 additions & 2 deletions source/libs/planner/src/planLogicCreater.c
Original file line number Diff line number Diff line change
Expand Up @@ -2520,7 +2520,7 @@ static int32_t createWindowLogicNodeByExternal(SLogicPlanContext* pCxt, SExterna
PLAN_ERR_JRET(nodesCloneNode(pFill->pValues, &pWindow->extFill.pFillValues));
}

pWindow->pSubquery = pExternal->pSubquery;
PLAN_ERR_JRET(nodesCloneNode(pExternal->pSubquery, &pWindow->pSubquery));
return createExternalWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);

_return:
Expand Down Expand Up @@ -2581,7 +2581,11 @@ static int32_t createWindowLogicNodeByStreamExternal(SLogicPlanContext* pCxt, SE
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

pWindow->pSubquery = pExternal->pSubquery;
code = nodesCloneNode(pExternal->pSubquery, &pWindow->pSubquery);
if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pWindow);
return code;
}
return createExternalWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}

Expand Down
2 changes: 1 addition & 1 deletion source/libs/planner/src/planPhysiCreater.c
Original file line number Diff line number Diff line change
Expand Up @@ -3171,7 +3171,7 @@ static int32_t createExternalWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
pExternal->extFill.mode = pWindowLogicNode->extFill.mode;
pExternal->orgTableUid = pWindowLogicNode->orgTableUid;
pExternal->orgTableVgId = pWindowLogicNode->orgTableVgId;
pExternal->pSubquery = pWindowLogicNode->pSubquery;
PLAN_ERR_JRET(nodesCloneNode(pWindowLogicNode->pSubquery, &pExternal->pSubquery));
PLAN_ERR_JRET(nodesCloneNode(pWindowLogicNode->pTimeRange, &pExternal->pTimeRange));

PLAN_ERR_JRET(createWindowPhysiNodeFinalize(pCxt, pChildren, &pExternal->window, pWindowLogicNode));
Expand Down
Loading
Loading