From 1daa4749e1b6589bd0b834953f9cb20175bb5686 Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Tue, 15 Jul 2025 13:47:27 +0530 Subject: [PATCH 1/7] improving executemany() performance --- mssql_python/cursor.py | 135 ++++++++++++++++---- mssql_python/pybind/ddbc_bindings.cpp | 174 ++++++++++++++++++++++++++ 2 files changed, 284 insertions(+), 25 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 6e2efc9e7..bdadbe3d1 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -18,6 +18,8 @@ from mssql_python.logging_config import get_logger, ENABLE_LOGGING from mssql_python import ddbc_bindings from .row import Row +from typing import Sequence, Any + logger = get_logger() @@ -539,7 +541,7 @@ def _map_data_type(self, sql_type): # Add more mappings as needed } return sql_to_python_type.get(sql_type, str) - + def execute( self, operation: str, @@ -617,38 +619,121 @@ def execute( # Initialize description after execution self._initialize_description() + + + # def executemany(self, operation: str, seq_of_parameters: list) -> None: + # self._check_closed() + # self._reset_cursor() + + # if not seq_of_parameters: + # return + + # # Transpose to column-major format + # columns = list(zip(*seq_of_parameters)) # Each column: tuple of values + # sample_params = seq_of_parameters[0] + # param_info = ddbc_bindings.ParamInfo + + # parameters_type = [] + # for i, sample_val in enumerate(sample_params): + # paraminfo = self._create_parameter_types_list(sample_val, param_info, sample_params, i) + + # # Fix: Adjust string column sizes based on actual max length across all rows + # if isinstance(sample_val, str): + # max_len = max( + # (len(v) for v in columns[i] if isinstance(v, str)), + # default=1 # fallback if all values are None + # ) + # paraminfo.columnSize = max_len + + # parameters_type.append(paraminfo) + + # # Now execute with adjusted parameter types + # ret = ddbc_bindings.SQLExecuteMany( + # self.hstmt, operation, columns, parameters_type, len(seq_of_parameters) + # ) + # check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, ret) + + # self.rowcount = ddbc_bindings.DDBCSQLRowCount(self.hstmt) + # self._initialize_description() + + def _transpose_rowwise_to_columnwise(self, seq_of_parameters: list) -> list: + """ + Convert list of rows (row-wise) into list of columns (column-wise), + for array binding via ODBC. + + Example: + Input: [(1, "a"), (2, "b")] + Output: [[1, 2], ["a", "b"]] + """ + if not seq_of_parameters: + return [] + + num_params = len(seq_of_parameters[0]) + columnwise = [[] for _ in range(num_params)] + for row in seq_of_parameters: + if len(row) != num_params: + raise ValueError("Inconsistent parameter row size in executemany()") + for i, val in enumerate(row): + columnwise[i].append(val) + return columnwise def executemany(self, operation: str, seq_of_parameters: list) -> None: """ Prepare a database operation and execute it against all parameter sequences. + This version uses column-wise parameter binding and a single batched SQLExecute(). + """ + self._check_closed() + self._reset_cursor() - Args: - operation: SQL query or command. - seq_of_parameters: Sequence of sequences or mappings of parameters. + if not seq_of_parameters: + self.rowcount = 0 + return + + # # Infer types from the first row + # first_row = list(seq_of_parameters[0]) + # param_info = ddbc_bindings.ParamInfo + # parameters_type = [ + # self._create_parameter_types_list(param, param_info, first_row, i) + # for i, param in enumerate(first_row) + # ] + param_info = ddbc_bindings.ParamInfo + param_count = len(seq_of_parameters[0]) + parameters_type = [] - Raises: - Error: If the operation fails. - """ - self._check_closed() # Check if the cursor is closed + for col_index in range(param_count): + # Use the longest string (or most precise value) in that column for inference + column = [row[col_index] for row in seq_of_parameters] + sample_value = column[0] - self._reset_cursor() + # For strings, pick the value with max len + if isinstance(sample_value, str): + sample_value = max(column, key=lambda s: len(str(s)) if s is not None else 0) - first_execution = True - total_rowcount = 0 - for parameters in seq_of_parameters: - parameters = list(parameters) - if ENABLE_LOGGING: - logger.info("Executing query with parameters: %s", parameters) - prepare_stmt = first_execution - first_execution = False - self.execute( - operation, parameters, use_prepare=prepare_stmt, reset_cursor=False - ) - if self.rowcount != -1: - total_rowcount += self.rowcount - else: - total_rowcount = -1 - self.rowcount = total_rowcount + # For decimals, use the one with highest precision + elif isinstance(sample_value, decimal.Decimal): + sample_value = max(column, key=lambda d: len(d.as_tuple().digits) if d is not None else 0) + + param = sample_value + dummy_row = list(seq_of_parameters[0]) # to pass for `_get_numeric_data()` mutation + parameters_type.append(self._create_parameter_types_list(param, param_info, dummy_row, col_index)) + + + # Transpose to column-wise format for array binding + columnwise_params = self._transpose_rowwise_to_columnwise(seq_of_parameters) + + # Execute batched statement + ret = ddbc_bindings.SQLExecuteMany( + self.hstmt, + operation, + columnwise_params, + parameters_type, + len(seq_of_parameters) + ) + check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, ret) + + self.rowcount = len(seq_of_parameters) + self.last_executed_stmt = operation + self._initialize_description() def fetchone(self) -> Union[None, Row]: """ diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index f030fdb06..629b7488b 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -185,6 +185,15 @@ ParamType* AllocateParamBuffer(std::vector>& paramBuffers, return static_cast(paramBuffers.back().get()); } +template +ParamType* AllocateParamBufferArray(std::vector>& paramBuffers, + size_t count) { + std::shared_ptr buffer(new ParamType[count], std::default_delete()); + ParamType* raw = buffer.get(); + paramBuffers.push_back(buffer); + return raw; +} + std::string DescribeChar(unsigned char ch) { if (ch >= 32 && ch <= 126) { return std::string("'") + static_cast(ch) + "'"; @@ -933,6 +942,170 @@ SQLRETURN SQLExecute_wrap(const SqlHandlePtr statementHandle, } } +SQLRETURN BindParameterArray(SQLHANDLE hStmt, + const py::list& columnwise_params, + const std::vector& paramInfos, + size_t paramSetSize, + std::vector>& paramBuffers) { + LOG("Starting column-wise parameter array binding. paramSetSize: {}, paramCount: {}", paramSetSize, columnwise_params.size()); + + for (int paramIndex = 0; paramIndex < columnwise_params.size(); ++paramIndex) { + const py::list& columnValues = columnwise_params[paramIndex].cast(); + const ParamInfo& info = paramInfos[paramIndex]; + + if (columnValues.size() != paramSetSize) { + ThrowStdException("Column " + std::to_string(paramIndex) + " has mismatched size."); + } + + void* dataPtr = nullptr; + SQLLEN* strLenOrIndArray = nullptr; + SQLLEN bufferLength = 0; + + switch (info.paramCType) { + case SQL_C_LONG: { + int* dataArray = AllocateParamBufferArray(paramBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + if (!strLenOrIndArray) + strLenOrIndArray = AllocateParamBufferArray(paramBuffers, paramSetSize); + dataArray[i] = 0; + strLenOrIndArray[i] = SQL_NULL_DATA; + } else { + dataArray[i] = columnValues[i].cast(); + if (strLenOrIndArray) strLenOrIndArray[i] = 0; + } + } + dataPtr = dataArray; + break; + } + case SQL_C_DOUBLE: { + double* dataArray = AllocateParamBufferArray(paramBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + if (!strLenOrIndArray) + strLenOrIndArray = AllocateParamBufferArray(paramBuffers, paramSetSize); + dataArray[i] = 0; + strLenOrIndArray[i] = SQL_NULL_DATA; + } else { + dataArray[i] = columnValues[i].cast(); + if (strLenOrIndArray) strLenOrIndArray[i] = 0; + } + } + dataPtr = dataArray; + break; + } + case SQL_C_WCHAR: { + SQLWCHAR* wcharArray = AllocateParamBufferArray(paramBuffers, paramSetSize * (info.columnSize + 1)); + strLenOrIndArray = AllocateParamBufferArray(paramBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + strLenOrIndArray[i] = SQL_NULL_DATA; + std::memset(wcharArray + i * (info.columnSize + 1), 0, (info.columnSize + 1) * sizeof(SQLWCHAR)); + continue; + } + + std::wstring wstr = columnValues[i].cast(); + if (wstr.length() > info.columnSize) { + std::string offending = WideToUTF8(wstr); + ThrowStdException("String too long at param " + std::to_string(paramIndex) + + ", value: " + offending + + ", len: " + std::to_string(wstr.length()) + + " > columnSize: " + std::to_string(info.columnSize)); + } + std::memcpy(wcharArray + i * (info.columnSize + 1), wstr.c_str(), (wstr.length() + 1) * sizeof(SQLWCHAR)); + strLenOrIndArray[i] = SQL_NTS; + } + dataPtr = wcharArray; + bufferLength = (info.columnSize + 1) * sizeof(SQLWCHAR); + break; + } + case SQL_C_TINYINT: + case SQL_C_UTINYINT: { + unsigned char* dataArray = AllocateParamBufferArray(paramBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + const py::handle& value = columnValues[i]; + if (!py::isinstance(value)) { + ThrowStdException(MakeParamMismatchErrorStr(info.paramCType, paramIndex)); + } + int intVal = value.cast(); + if (intVal < 0 || intVal > 255) { + ThrowStdException("UTINYINT value out of range at rowIndex " + std::to_string(i)); + } + dataArray[i] = static_cast(intVal); + } + dataPtr = dataArray; + bufferLength = sizeof(unsigned char); + break; + } + case SQL_C_SHORT: { + short* dataArray = AllocateParamBufferArray(paramBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + const py::handle& value = columnValues[i]; + if (!py::isinstance(value)) { + ThrowStdException(MakeParamMismatchErrorStr(info.paramCType, paramIndex)); + } + int intVal = value.cast(); + if (intVal < std::numeric_limits::min() || + intVal > std::numeric_limits::max()) { + ThrowStdException("SHORT value out of range at rowIndex " + std::to_string(i)); + } + dataArray[i] = static_cast(intVal); + } + dataPtr = dataArray; + bufferLength = sizeof(short); + break; + } + + default: { + ThrowStdException("BindParameterArray: Unsupported C type: " + std::to_string(info.paramCType)); + } + } + + RETCODE rc = SQLBindParameter_ptr( + hStmt, + static_cast(paramIndex + 1), + static_cast(info.inputOutputType), + static_cast(info.paramCType), + static_cast(info.paramSQLType), + info.columnSize, + info.decimalDigits, + dataPtr, + bufferLength, + strLenOrIndArray + ); + if (!SQL_SUCCEEDED(rc)) { + LOG("Failed to bind array param {}", paramIndex); + return rc; + } + } + LOG("Finished column-wise parameter array binding."); + return SQL_SUCCESS; +} + +SQLRETURN SQLExecuteMany_wrap(const SqlHandlePtr statementHandle, + const std::wstring& query, + const py::list& columnwise_params, + const std::vector& paramInfos, + size_t paramSetSize) { + SQLHANDLE hStmt = statementHandle->get(); + SQLWCHAR* queryPtr; +#if defined(__APPLE__) || defined(__linux__) + std::vector queryBuffer = WStringToSQLWCHAR(query); + queryPtr = queryBuffer.data(); +#else + queryPtr = const_cast(query.c_str()); +#endif + RETCODE rc = SQLPrepare_ptr(hStmt, queryPtr, SQL_NTS); + if (!SQL_SUCCEEDED(rc)) return rc; + std::vector> paramBuffers; + rc = BindParameterArray(hStmt, columnwise_params, paramInfos, paramSetSize, paramBuffers); + if (!SQL_SUCCEEDED(rc)) return rc; + rc = SQLSetStmtAttr_ptr(hStmt, SQL_ATTR_PARAMSET_SIZE, (SQLPOINTER)paramSetSize, 0); + if (!SQL_SUCCEEDED(rc)) return rc; + rc = SQLExecute_ptr(hStmt); + return rc; +} + // Wrap SQLNumResultCols SQLSMALLINT SQLNumResultCols_wrap(SqlHandlePtr statementHandle) { LOG("Get number of columns in result set"); @@ -2112,6 +2285,7 @@ PYBIND11_MODULE(ddbc_bindings, m) { m.def("close_pooling", []() {ConnectionPoolManager::getInstance().closePools();}); m.def("DDBCSQLExecDirect", &SQLExecDirect_wrap, "Execute a SQL query directly"); m.def("DDBCSQLExecute", &SQLExecute_wrap, "Prepare and execute T-SQL statements"); + m.def("SQLExecuteMany", &SQLExecuteMany_wrap, "Execute statement with multiple parameter sets"); m.def("DDBCSQLRowCount", &SQLRowCount_wrap, "Get the number of rows affected by the last statement"); m.def("DDBCSQLFetch", &SQLFetch_wrap, "Fetch the next row from the result set"); From 4ea32c7ad8c484ab920304aa48fcfb3618ec88dd Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Tue, 15 Jul 2025 14:23:56 +0530 Subject: [PATCH 2/7] cleanup --- mssql_python/cursor.py | 64 ++------------------------- mssql_python/pybind/ddbc_bindings.cpp | 2 +- 2 files changed, 4 insertions(+), 62 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index bdadbe3d1..b976b67be 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -619,51 +619,11 @@ def execute( # Initialize description after execution self._initialize_description() - - - # def executemany(self, operation: str, seq_of_parameters: list) -> None: - # self._check_closed() - # self._reset_cursor() - - # if not seq_of_parameters: - # return - - # # Transpose to column-major format - # columns = list(zip(*seq_of_parameters)) # Each column: tuple of values - # sample_params = seq_of_parameters[0] - # param_info = ddbc_bindings.ParamInfo - - # parameters_type = [] - # for i, sample_val in enumerate(sample_params): - # paraminfo = self._create_parameter_types_list(sample_val, param_info, sample_params, i) - - # # Fix: Adjust string column sizes based on actual max length across all rows - # if isinstance(sample_val, str): - # max_len = max( - # (len(v) for v in columns[i] if isinstance(v, str)), - # default=1 # fallback if all values are None - # ) - # paraminfo.columnSize = max_len - - # parameters_type.append(paraminfo) - - # # Now execute with adjusted parameter types - # ret = ddbc_bindings.SQLExecuteMany( - # self.hstmt, operation, columns, parameters_type, len(seq_of_parameters) - # ) - # check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, ret) - - # self.rowcount = ddbc_bindings.DDBCSQLRowCount(self.hstmt) - # self._initialize_description() def _transpose_rowwise_to_columnwise(self, seq_of_parameters: list) -> list: """ Convert list of rows (row-wise) into list of columns (column-wise), - for array binding via ODBC. - - Example: - Input: [(1, "a"), (2, "b")] - Output: [[1, 2], ["a", "b"]] + for array binding. """ if not seq_of_parameters: return [] @@ -688,39 +648,23 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None: if not seq_of_parameters: self.rowcount = 0 return - - # # Infer types from the first row - # first_row = list(seq_of_parameters[0]) - # param_info = ddbc_bindings.ParamInfo - # parameters_type = [ - # self._create_parameter_types_list(param, param_info, first_row, i) - # for i, param in enumerate(first_row) - # ] param_info = ddbc_bindings.ParamInfo param_count = len(seq_of_parameters[0]) parameters_type = [] for col_index in range(param_count): - # Use the longest string (or most precise value) in that column for inference column = [row[col_index] for row in seq_of_parameters] sample_value = column[0] - - # For strings, pick the value with max len if isinstance(sample_value, str): sample_value = max(column, key=lambda s: len(str(s)) if s is not None else 0) - - # For decimals, use the one with highest precision elif isinstance(sample_value, decimal.Decimal): sample_value = max(column, key=lambda d: len(d.as_tuple().digits) if d is not None else 0) param = sample_value - dummy_row = list(seq_of_parameters[0]) # to pass for `_get_numeric_data()` mutation + dummy_row = list(seq_of_parameters[0]) parameters_type.append(self._create_parameter_types_list(param, param_info, dummy_row, col_index)) - - # Transpose to column-wise format for array binding columnwise_params = self._transpose_rowwise_to_columnwise(seq_of_parameters) - # Execute batched statement ret = ddbc_bindings.SQLExecuteMany( self.hstmt, @@ -729,9 +673,7 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None: parameters_type, len(seq_of_parameters) ) - check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, ret) - - self.rowcount = len(seq_of_parameters) + self.rowcount = ddbc_bindings.DDBCSQLRowCount(self.hstmt) self.last_executed_stmt = operation self._initialize_description() diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 629b7488b..cf203ee60 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -1012,7 +1012,7 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, ", len: " + std::to_string(wstr.length()) + " > columnSize: " + std::to_string(info.columnSize)); } - std::memcpy(wcharArray + i * (info.columnSize + 1), wstr.c_str(), (wstr.length() + 1) * sizeof(SQLWCHAR)); + std::wcsncpy(wcharArray + i * (info.columnSize + 1), wstr.c_str(), info.columnSize + 1); strLenOrIndArray[i] = SQL_NTS; } dataPtr = wcharArray; From ffd36d4404c2808b638fefb27a40a163f191fb12 Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Tue, 15 Jul 2025 20:43:28 +0530 Subject: [PATCH 3/7] Revert "cleanup" This reverts commit 4ea32c7ad8c484ab920304aa48fcfb3618ec88dd. --- mssql_python/cursor.py | 64 +++++++++++++++++++++++++-- mssql_python/pybind/ddbc_bindings.cpp | 2 +- 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index b976b67be..bdadbe3d1 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -619,11 +619,51 @@ def execute( # Initialize description after execution self._initialize_description() + + + # def executemany(self, operation: str, seq_of_parameters: list) -> None: + # self._check_closed() + # self._reset_cursor() + + # if not seq_of_parameters: + # return + + # # Transpose to column-major format + # columns = list(zip(*seq_of_parameters)) # Each column: tuple of values + # sample_params = seq_of_parameters[0] + # param_info = ddbc_bindings.ParamInfo + + # parameters_type = [] + # for i, sample_val in enumerate(sample_params): + # paraminfo = self._create_parameter_types_list(sample_val, param_info, sample_params, i) + + # # Fix: Adjust string column sizes based on actual max length across all rows + # if isinstance(sample_val, str): + # max_len = max( + # (len(v) for v in columns[i] if isinstance(v, str)), + # default=1 # fallback if all values are None + # ) + # paraminfo.columnSize = max_len + + # parameters_type.append(paraminfo) + + # # Now execute with adjusted parameter types + # ret = ddbc_bindings.SQLExecuteMany( + # self.hstmt, operation, columns, parameters_type, len(seq_of_parameters) + # ) + # check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, ret) + + # self.rowcount = ddbc_bindings.DDBCSQLRowCount(self.hstmt) + # self._initialize_description() def _transpose_rowwise_to_columnwise(self, seq_of_parameters: list) -> list: """ Convert list of rows (row-wise) into list of columns (column-wise), - for array binding. + for array binding via ODBC. + + Example: + Input: [(1, "a"), (2, "b")] + Output: [[1, 2], ["a", "b"]] """ if not seq_of_parameters: return [] @@ -648,23 +688,39 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None: if not seq_of_parameters: self.rowcount = 0 return + + # # Infer types from the first row + # first_row = list(seq_of_parameters[0]) + # param_info = ddbc_bindings.ParamInfo + # parameters_type = [ + # self._create_parameter_types_list(param, param_info, first_row, i) + # for i, param in enumerate(first_row) + # ] param_info = ddbc_bindings.ParamInfo param_count = len(seq_of_parameters[0]) parameters_type = [] for col_index in range(param_count): + # Use the longest string (or most precise value) in that column for inference column = [row[col_index] for row in seq_of_parameters] sample_value = column[0] + + # For strings, pick the value with max len if isinstance(sample_value, str): sample_value = max(column, key=lambda s: len(str(s)) if s is not None else 0) + + # For decimals, use the one with highest precision elif isinstance(sample_value, decimal.Decimal): sample_value = max(column, key=lambda d: len(d.as_tuple().digits) if d is not None else 0) param = sample_value - dummy_row = list(seq_of_parameters[0]) + dummy_row = list(seq_of_parameters[0]) # to pass for `_get_numeric_data()` mutation parameters_type.append(self._create_parameter_types_list(param, param_info, dummy_row, col_index)) + + # Transpose to column-wise format for array binding columnwise_params = self._transpose_rowwise_to_columnwise(seq_of_parameters) + # Execute batched statement ret = ddbc_bindings.SQLExecuteMany( self.hstmt, @@ -673,7 +729,9 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None: parameters_type, len(seq_of_parameters) ) - self.rowcount = ddbc_bindings.DDBCSQLRowCount(self.hstmt) + check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, ret) + + self.rowcount = len(seq_of_parameters) self.last_executed_stmt = operation self._initialize_description() diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index cf203ee60..629b7488b 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -1012,7 +1012,7 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, ", len: " + std::to_string(wstr.length()) + " > columnSize: " + std::to_string(info.columnSize)); } - std::wcsncpy(wcharArray + i * (info.columnSize + 1), wstr.c_str(), info.columnSize + 1); + std::memcpy(wcharArray + i * (info.columnSize + 1), wstr.c_str(), (wstr.length() + 1) * sizeof(SQLWCHAR)); strLenOrIndArray[i] = SQL_NTS; } dataPtr = wcharArray; From b47655bb3e99df97b2195984f9134645e4fc9f73 Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Tue, 15 Jul 2025 21:34:56 +0530 Subject: [PATCH 4/7] working --- mssql_python/cursor.py | 54 +----------------------------------------- 1 file changed, 1 insertion(+), 53 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index bdadbe3d1..ff308c756 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -619,51 +619,11 @@ def execute( # Initialize description after execution self._initialize_description() - - - # def executemany(self, operation: str, seq_of_parameters: list) -> None: - # self._check_closed() - # self._reset_cursor() - - # if not seq_of_parameters: - # return - - # # Transpose to column-major format - # columns = list(zip(*seq_of_parameters)) # Each column: tuple of values - # sample_params = seq_of_parameters[0] - # param_info = ddbc_bindings.ParamInfo - - # parameters_type = [] - # for i, sample_val in enumerate(sample_params): - # paraminfo = self._create_parameter_types_list(sample_val, param_info, sample_params, i) - - # # Fix: Adjust string column sizes based on actual max length across all rows - # if isinstance(sample_val, str): - # max_len = max( - # (len(v) for v in columns[i] if isinstance(v, str)), - # default=1 # fallback if all values are None - # ) - # paraminfo.columnSize = max_len - - # parameters_type.append(paraminfo) - - # # Now execute with adjusted parameter types - # ret = ddbc_bindings.SQLExecuteMany( - # self.hstmt, operation, columns, parameters_type, len(seq_of_parameters) - # ) - # check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, ret) - - # self.rowcount = ddbc_bindings.DDBCSQLRowCount(self.hstmt) - # self._initialize_description() def _transpose_rowwise_to_columnwise(self, seq_of_parameters: list) -> list: """ Convert list of rows (row-wise) into list of columns (column-wise), for array binding via ODBC. - - Example: - Input: [(1, "a"), (2, "b")] - Output: [[1, 2], ["a", "b"]] """ if not seq_of_parameters: return [] @@ -689,36 +649,24 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None: self.rowcount = 0 return - # # Infer types from the first row - # first_row = list(seq_of_parameters[0]) - # param_info = ddbc_bindings.ParamInfo - # parameters_type = [ - # self._create_parameter_types_list(param, param_info, first_row, i) - # for i, param in enumerate(first_row) - # ] param_info = ddbc_bindings.ParamInfo param_count = len(seq_of_parameters[0]) parameters_type = [] for col_index in range(param_count): - # Use the longest string (or most precise value) in that column for inference column = [row[col_index] for row in seq_of_parameters] sample_value = column[0] - # For strings, pick the value with max len if isinstance(sample_value, str): sample_value = max(column, key=lambda s: len(str(s)) if s is not None else 0) - # For decimals, use the one with highest precision elif isinstance(sample_value, decimal.Decimal): sample_value = max(column, key=lambda d: len(d.as_tuple().digits) if d is not None else 0) param = sample_value - dummy_row = list(seq_of_parameters[0]) # to pass for `_get_numeric_data()` mutation + dummy_row = list(seq_of_parameters[0]) parameters_type.append(self._create_parameter_types_list(param, param_info, dummy_row, col_index)) - - # Transpose to column-wise format for array binding columnwise_params = self._transpose_rowwise_to_columnwise(seq_of_parameters) # Execute batched statement From 42824f190f74fc724016546fa9d0b8b60b2d2dcf Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Tue, 15 Jul 2025 22:45:45 +0530 Subject: [PATCH 5/7] cleaning up --- mssql_python/cursor.py | 10 ++---- mssql_python/pybind/ddbc_bindings.cpp | 47 ++++++++++++++------------- 2 files changed, 27 insertions(+), 30 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index ff308c756..8607180ad 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -18,12 +18,9 @@ from mssql_python.logging_config import get_logger, ENABLE_LOGGING from mssql_python import ddbc_bindings from .row import Row -from typing import Sequence, Any - logger = get_logger() - class Cursor: """ Represents a database cursor, which is used to manage the context of a fetch operation. @@ -541,7 +538,7 @@ def _map_data_type(self, sql_type): # Add more mappings as needed } return sql_to_python_type.get(sql_type, str) - + def execute( self, operation: str, @@ -656,13 +653,10 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None: for col_index in range(param_count): column = [row[col_index] for row in seq_of_parameters] sample_value = column[0] - if isinstance(sample_value, str): sample_value = max(column, key=lambda s: len(str(s)) if s is not None else 0) - elif isinstance(sample_value, decimal.Decimal): sample_value = max(column, key=lambda d: len(d.as_tuple().digits) if d is not None else 0) - param = sample_value dummy_row = list(seq_of_parameters[0]) parameters_type.append(self._create_parameter_types_list(param, param_info, dummy_row, col_index)) @@ -679,7 +673,7 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None: ) check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, ret) - self.rowcount = len(seq_of_parameters) + self.rowcount = ddbc_bindings.DDBCSQLRowCount(self.hstmt) self.last_executed_stmt = operation self._initialize_description() diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 629b7488b..b2d4c28ca 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -948,15 +948,12 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, size_t paramSetSize, std::vector>& paramBuffers) { LOG("Starting column-wise parameter array binding. paramSetSize: {}, paramCount: {}", paramSetSize, columnwise_params.size()); - for (int paramIndex = 0; paramIndex < columnwise_params.size(); ++paramIndex) { const py::list& columnValues = columnwise_params[paramIndex].cast(); const ParamInfo& info = paramInfos[paramIndex]; - if (columnValues.size() != paramSetSize) { ThrowStdException("Column " + std::to_string(paramIndex) + " has mismatched size."); } - void* dataPtr = nullptr; SQLLEN* strLenOrIndArray = nullptr; SQLLEN bufferLength = 0; @@ -1003,7 +1000,6 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, std::memset(wcharArray + i * (info.columnSize + 1), 0, (info.columnSize + 1) * sizeof(SQLWCHAR)); continue; } - std::wstring wstr = columnValues[i].cast(); if (wstr.length() > info.columnSize) { std::string offending = WideToUTF8(wstr); @@ -1023,15 +1019,19 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, case SQL_C_UTINYINT: { unsigned char* dataArray = AllocateParamBufferArray(paramBuffers, paramSetSize); for (size_t i = 0; i < paramSetSize; ++i) { - const py::handle& value = columnValues[i]; - if (!py::isinstance(value)) { - ThrowStdException(MakeParamMismatchErrorStr(info.paramCType, paramIndex)); - } - int intVal = value.cast(); - if (intVal < 0 || intVal > 255) { - ThrowStdException("UTINYINT value out of range at rowIndex " + std::to_string(i)); + if (columnValues[i].is_none()) { + if (!strLenOrIndArray) + strLenOrIndArray = AllocateParamBufferArray(paramBuffers, paramSetSize); + dataArray[i] = 0; + strLenOrIndArray[i] = SQL_NULL_DATA; + } else { + int intVal = columnValues[i].cast(); + if (intVal < 0 || intVal > 255) { + ThrowStdException("UTINYINT value out of range at rowIndex " + std::to_string(i)); + } + dataArray[i] = static_cast(intVal); + if (strLenOrIndArray) strLenOrIndArray[i] = 0; } - dataArray[i] = static_cast(intVal); } dataPtr = dataArray; bufferLength = sizeof(unsigned char); @@ -1040,22 +1040,25 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, case SQL_C_SHORT: { short* dataArray = AllocateParamBufferArray(paramBuffers, paramSetSize); for (size_t i = 0; i < paramSetSize; ++i) { - const py::handle& value = columnValues[i]; - if (!py::isinstance(value)) { - ThrowStdException(MakeParamMismatchErrorStr(info.paramCType, paramIndex)); - } - int intVal = value.cast(); - if (intVal < std::numeric_limits::min() || - intVal > std::numeric_limits::max()) { - ThrowStdException("SHORT value out of range at rowIndex " + std::to_string(i)); + if (columnValues[i].is_none()) { + if (!strLenOrIndArray) + strLenOrIndArray = AllocateParamBufferArray(paramBuffers, paramSetSize); + dataArray[i] = 0; + strLenOrIndArray[i] = SQL_NULL_DATA; + } else { + int intVal = columnValues[i].cast(); + if (intVal < std::numeric_limits::min() || + intVal > std::numeric_limits::max()) { + ThrowStdException("SHORT value out of range at rowIndex " + std::to_string(i)); + } + dataArray[i] = static_cast(intVal); + if (strLenOrIndArray) strLenOrIndArray[i] = 0; } - dataArray[i] = static_cast(intVal); } dataPtr = dataArray; bufferLength = sizeof(short); break; } - default: { ThrowStdException("BindParameterArray: Unsupported C type: " + std::to_string(info.paramCType)); } From 46a16ffaff4d1cf50577950ec6d0a4fa034a89e2 Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Wed, 16 Jul 2025 12:25:40 +0530 Subject: [PATCH 6/7] resolve comments --- mssql_python/cursor.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 8607180ad..d44e10ccb 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -638,6 +638,12 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None: """ Prepare a database operation and execute it against all parameter sequences. This version uses column-wise parameter binding and a single batched SQLExecute(). + Args: + operation: SQL query or command. + seq_of_parameters: Sequence of sequences or mappings of parameters. + + Raises: + Error: If the operation fails. """ self._check_closed() self._reset_cursor() From c860e5d3ec1e51b154c11973ea90a03d2e829fcd Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Thu, 17 Jul 2025 11:56:35 +0530 Subject: [PATCH 7/7] resolve review comments --- mssql_python/cursor.py | 46 ++- mssql_python/pybind/ddbc_bindings.cpp | 412 +++++++++++++++++++------- 2 files changed, 336 insertions(+), 122 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index d44e10ccb..ba75a190b 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -617,10 +617,41 @@ def execute( # Initialize description after execution self._initialize_description() + @staticmethod + def _select_best_sample_value(column): + """ + Selects the most representative non-null value from a column for type inference. + + This is used during executemany() to infer SQL/C types based on actual data, + preferring a non-null value that is not the first row to avoid bias from placeholder defaults. + + Args: + column: List of values in the column. + """ + non_nulls = [v for v in column if v is not None] + if not non_nulls: + return None + if all(isinstance(v, int) for v in non_nulls): + # Pick the value with the widest range (min/max) + return max(non_nulls, key=lambda v: abs(v)) + if all(isinstance(v, float) for v in non_nulls): + return 0.0 + if all(isinstance(v, decimal.Decimal) for v in non_nulls): + return max(non_nulls, key=lambda d: len(d.as_tuple().digits)) + if all(isinstance(v, str) for v in non_nulls): + return max(non_nulls, key=lambda s: len(str(s))) + if all(isinstance(v, datetime.datetime) for v in non_nulls): + return datetime.datetime.now() + if all(isinstance(v, datetime.date) for v in non_nulls): + return datetime.date.today() + return non_nulls[0] # fallback + def _transpose_rowwise_to_columnwise(self, seq_of_parameters: list) -> list: """ Convert list of rows (row-wise) into list of columns (column-wise), for array binding via ODBC. + Args: + seq_of_parameters: Sequence of sequences or mappings of parameters. """ if not seq_of_parameters: return [] @@ -658,16 +689,17 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None: for col_index in range(param_count): column = [row[col_index] for row in seq_of_parameters] - sample_value = column[0] - if isinstance(sample_value, str): - sample_value = max(column, key=lambda s: len(str(s)) if s is not None else 0) - elif isinstance(sample_value, decimal.Decimal): - sample_value = max(column, key=lambda d: len(d.as_tuple().digits) if d is not None else 0) - param = sample_value + sample_value = self._select_best_sample_value(column) dummy_row = list(seq_of_parameters[0]) - parameters_type.append(self._create_parameter_types_list(param, param_info, dummy_row, col_index)) + parameters_type.append( + self._create_parameter_types_list(sample_value, param_info, dummy_row, col_index) + ) columnwise_params = self._transpose_rowwise_to_columnwise(seq_of_parameters) + if ENABLE_LOGGING: + logger.info("Executing batch query with %d parameter sets:\n%s", + len(seq_of_parameters),"\n".join(f" {i+1}: {tuple(p) if isinstance(p, (list, tuple)) else p}" for i, p in enumerate(seq_of_parameters)) + ) # Execute batched statement ret = ddbc_bindings.SQLExecuteMany( diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index b2d4c28ca..5568ded00 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -948,139 +948,321 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, size_t paramSetSize, std::vector>& paramBuffers) { LOG("Starting column-wise parameter array binding. paramSetSize: {}, paramCount: {}", paramSetSize, columnwise_params.size()); - for (int paramIndex = 0; paramIndex < columnwise_params.size(); ++paramIndex) { - const py::list& columnValues = columnwise_params[paramIndex].cast(); - const ParamInfo& info = paramInfos[paramIndex]; - if (columnValues.size() != paramSetSize) { - ThrowStdException("Column " + std::to_string(paramIndex) + " has mismatched size."); - } - void* dataPtr = nullptr; - SQLLEN* strLenOrIndArray = nullptr; - SQLLEN bufferLength = 0; - switch (info.paramCType) { - case SQL_C_LONG: { - int* dataArray = AllocateParamBufferArray(paramBuffers, paramSetSize); - for (size_t i = 0; i < paramSetSize; ++i) { - if (columnValues[i].is_none()) { - if (!strLenOrIndArray) - strLenOrIndArray = AllocateParamBufferArray(paramBuffers, paramSetSize); - dataArray[i] = 0; - strLenOrIndArray[i] = SQL_NULL_DATA; - } else { - dataArray[i] = columnValues[i].cast(); - if (strLenOrIndArray) strLenOrIndArray[i] = 0; + std::vector> tempBuffers; + + try { + for (int paramIndex = 0; paramIndex < columnwise_params.size(); ++paramIndex) { + const py::list& columnValues = columnwise_params[paramIndex].cast(); + const ParamInfo& info = paramInfos[paramIndex]; + if (columnValues.size() != paramSetSize) { + ThrowStdException("Column " + std::to_string(paramIndex) + " has mismatched size."); + } + void* dataPtr = nullptr; + SQLLEN* strLenOrIndArray = nullptr; + SQLLEN bufferLength = 0; + switch (info.paramCType) { + case SQL_C_LONG: { + int* dataArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + if (!strLenOrIndArray) + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + dataArray[i] = 0; + strLenOrIndArray[i] = SQL_NULL_DATA; + } else { + dataArray[i] = columnValues[i].cast(); + if (strLenOrIndArray) strLenOrIndArray[i] = 0; + } } + dataPtr = dataArray; + break; } - dataPtr = dataArray; - break; - } - case SQL_C_DOUBLE: { - double* dataArray = AllocateParamBufferArray(paramBuffers, paramSetSize); - for (size_t i = 0; i < paramSetSize; ++i) { - if (columnValues[i].is_none()) { - if (!strLenOrIndArray) - strLenOrIndArray = AllocateParamBufferArray(paramBuffers, paramSetSize); - dataArray[i] = 0; - strLenOrIndArray[i] = SQL_NULL_DATA; - } else { - dataArray[i] = columnValues[i].cast(); - if (strLenOrIndArray) strLenOrIndArray[i] = 0; + case SQL_C_DOUBLE: { + double* dataArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + if (!strLenOrIndArray) + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + dataArray[i] = 0; + strLenOrIndArray[i] = SQL_NULL_DATA; + } else { + dataArray[i] = columnValues[i].cast(); + if (strLenOrIndArray) strLenOrIndArray[i] = 0; + } } + dataPtr = dataArray; + break; } - dataPtr = dataArray; - break; - } - case SQL_C_WCHAR: { - SQLWCHAR* wcharArray = AllocateParamBufferArray(paramBuffers, paramSetSize * (info.columnSize + 1)); - strLenOrIndArray = AllocateParamBufferArray(paramBuffers, paramSetSize); - for (size_t i = 0; i < paramSetSize; ++i) { - if (columnValues[i].is_none()) { - strLenOrIndArray[i] = SQL_NULL_DATA; - std::memset(wcharArray + i * (info.columnSize + 1), 0, (info.columnSize + 1) * sizeof(SQLWCHAR)); - continue; + case SQL_C_WCHAR: { + SQLWCHAR* wcharArray = AllocateParamBufferArray(tempBuffers, paramSetSize * (info.columnSize + 1)); + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + strLenOrIndArray[i] = SQL_NULL_DATA; + std::memset(wcharArray + i * (info.columnSize + 1), 0, (info.columnSize + 1) * sizeof(SQLWCHAR)); + } else { + std::wstring wstr = columnValues[i].cast(); + if (wstr.length() > info.columnSize) { + std::string offending = WideToUTF8(wstr); + ThrowStdException("Input string exceeds allowed column size at parameter index " + std::to_string(paramIndex)); + } + std::memcpy(wcharArray + i * (info.columnSize + 1), wstr.c_str(), (wstr.length() + 1) * sizeof(SQLWCHAR)); + strLenOrIndArray[i] = SQL_NTS; + } } - std::wstring wstr = columnValues[i].cast(); - if (wstr.length() > info.columnSize) { - std::string offending = WideToUTF8(wstr); - ThrowStdException("String too long at param " + std::to_string(paramIndex) + - ", value: " + offending + - ", len: " + std::to_string(wstr.length()) + - " > columnSize: " + std::to_string(info.columnSize)); + dataPtr = wcharArray; + bufferLength = (info.columnSize + 1) * sizeof(SQLWCHAR); + break; + } + case SQL_C_TINYINT: + case SQL_C_UTINYINT: { + unsigned char* dataArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + if (!strLenOrIndArray) + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + dataArray[i] = 0; + strLenOrIndArray[i] = SQL_NULL_DATA; + } else { + int intVal = columnValues[i].cast(); + if (intVal < 0 || intVal > 255) { + ThrowStdException("UTINYINT value out of range at rowIndex " + std::to_string(i)); + } + dataArray[i] = static_cast(intVal); + if (strLenOrIndArray) strLenOrIndArray[i] = 0; + } } - std::memcpy(wcharArray + i * (info.columnSize + 1), wstr.c_str(), (wstr.length() + 1) * sizeof(SQLWCHAR)); - strLenOrIndArray[i] = SQL_NTS; + dataPtr = dataArray; + bufferLength = sizeof(unsigned char); + break; } - dataPtr = wcharArray; - bufferLength = (info.columnSize + 1) * sizeof(SQLWCHAR); - break; - } - case SQL_C_TINYINT: - case SQL_C_UTINYINT: { - unsigned char* dataArray = AllocateParamBufferArray(paramBuffers, paramSetSize); - for (size_t i = 0; i < paramSetSize; ++i) { - if (columnValues[i].is_none()) { - if (!strLenOrIndArray) - strLenOrIndArray = AllocateParamBufferArray(paramBuffers, paramSetSize); - dataArray[i] = 0; - strLenOrIndArray[i] = SQL_NULL_DATA; - } else { - int intVal = columnValues[i].cast(); - if (intVal < 0 || intVal > 255) { - ThrowStdException("UTINYINT value out of range at rowIndex " + std::to_string(i)); + case SQL_C_SHORT: { + short* dataArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + if (!strLenOrIndArray) + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + dataArray[i] = 0; + strLenOrIndArray[i] = SQL_NULL_DATA; + } else { + int intVal = columnValues[i].cast(); + if (intVal < std::numeric_limits::min() || + intVal > std::numeric_limits::max()) { + ThrowStdException("SHORT value out of range at rowIndex " + std::to_string(i)); + } + dataArray[i] = static_cast(intVal); + if (strLenOrIndArray) strLenOrIndArray[i] = 0; } - dataArray[i] = static_cast(intVal); - if (strLenOrIndArray) strLenOrIndArray[i] = 0; } + dataPtr = dataArray; + bufferLength = sizeof(short); + break; } - dataPtr = dataArray; - bufferLength = sizeof(unsigned char); - break; - } - case SQL_C_SHORT: { - short* dataArray = AllocateParamBufferArray(paramBuffers, paramSetSize); - for (size_t i = 0; i < paramSetSize; ++i) { - if (columnValues[i].is_none()) { - if (!strLenOrIndArray) - strLenOrIndArray = AllocateParamBufferArray(paramBuffers, paramSetSize); - dataArray[i] = 0; - strLenOrIndArray[i] = SQL_NULL_DATA; - } else { - int intVal = columnValues[i].cast(); - if (intVal < std::numeric_limits::min() || - intVal > std::numeric_limits::max()) { - ThrowStdException("SHORT value out of range at rowIndex " + std::to_string(i)); + case SQL_C_CHAR: + case SQL_C_BINARY: { + char* charArray = AllocateParamBufferArray(tempBuffers, paramSetSize * (info.columnSize + 1)); + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + strLenOrIndArray[i] = SQL_NULL_DATA; + std::memset(charArray + i * (info.columnSize + 1), 0, info.columnSize + 1); + } else { + std::string str = columnValues[i].cast(); + if (str.size() > info.columnSize) + ThrowStdException("Input exceeds column size at index " + std::to_string(i)); + std::memcpy(charArray + i * (info.columnSize + 1), str.c_str(), str.size()); + strLenOrIndArray[i] = static_cast(str.size()); } - dataArray[i] = static_cast(intVal); - if (strLenOrIndArray) strLenOrIndArray[i] = 0; } + dataPtr = charArray; + bufferLength = info.columnSize + 1; + break; + } + case SQL_C_BIT: { + char* boolArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + boolArray[i] = 0; + strLenOrIndArray[i] = SQL_NULL_DATA; + } else { + boolArray[i] = columnValues[i].cast() ? 1 : 0; + strLenOrIndArray[i] = 0; + } + } + dataPtr = boolArray; + bufferLength = sizeof(char); + break; + } + case SQL_C_STINYINT: + case SQL_C_USHORT: { + unsigned short* dataArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + strLenOrIndArray[i] = SQL_NULL_DATA; + dataArray[i] = 0; + } else { + dataArray[i] = columnValues[i].cast(); + strLenOrIndArray[i] = 0; + } + } + dataPtr = dataArray; + bufferLength = sizeof(unsigned short); + break; + } + case SQL_C_SBIGINT: + case SQL_C_SLONG: + case SQL_C_UBIGINT: + case SQL_C_ULONG: { + int64_t* dataArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + strLenOrIndArray[i] = SQL_NULL_DATA; + dataArray[i] = 0; + } else { + dataArray[i] = columnValues[i].cast(); + strLenOrIndArray[i] = 0; + } + } + dataPtr = dataArray; + bufferLength = sizeof(int64_t); + break; + } + case SQL_C_FLOAT: { + float* dataArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + strLenOrIndArray[i] = SQL_NULL_DATA; + dataArray[i] = 0.0f; + } else { + dataArray[i] = columnValues[i].cast(); + strLenOrIndArray[i] = 0; + } + } + dataPtr = dataArray; + bufferLength = sizeof(float); + break; + } + case SQL_C_TYPE_DATE: { + SQL_DATE_STRUCT* dateArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + strLenOrIndArray[i] = SQL_NULL_DATA; + std::memset(&dateArray[i], 0, sizeof(SQL_DATE_STRUCT)); + } else { + py::object dateObj = columnValues[i]; + dateArray[i].year = dateObj.attr("year").cast(); + dateArray[i].month = dateObj.attr("month").cast(); + dateArray[i].day = dateObj.attr("day").cast(); + strLenOrIndArray[i] = 0; + } + } + dataPtr = dateArray; + bufferLength = sizeof(SQL_DATE_STRUCT); + break; + } + case SQL_C_TYPE_TIME: { + SQL_TIME_STRUCT* timeArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + strLenOrIndArray[i] = SQL_NULL_DATA; + std::memset(&timeArray[i], 0, sizeof(SQL_TIME_STRUCT)); + } else { + py::object timeObj = columnValues[i]; + timeArray[i].hour = timeObj.attr("hour").cast(); + timeArray[i].minute = timeObj.attr("minute").cast(); + timeArray[i].second = timeObj.attr("second").cast(); + strLenOrIndArray[i] = 0; + } + } + dataPtr = timeArray; + bufferLength = sizeof(SQL_TIME_STRUCT); + break; + } + case SQL_C_TYPE_TIMESTAMP: { + SQL_TIMESTAMP_STRUCT* tsArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + if (columnValues[i].is_none()) { + strLenOrIndArray[i] = SQL_NULL_DATA; + std::memset(&tsArray[i], 0, sizeof(SQL_TIMESTAMP_STRUCT)); + } else { + py::object dtObj = columnValues[i]; + tsArray[i].year = dtObj.attr("year").cast(); + tsArray[i].month = dtObj.attr("month").cast(); + tsArray[i].day = dtObj.attr("day").cast(); + tsArray[i].hour = dtObj.attr("hour").cast(); + tsArray[i].minute = dtObj.attr("minute").cast(); + tsArray[i].second = dtObj.attr("second").cast(); + tsArray[i].fraction = static_cast(dtObj.attr("microsecond").cast() * 1000); // µs to ns + strLenOrIndArray[i] = 0; + } + } + dataPtr = tsArray; + bufferLength = sizeof(SQL_TIMESTAMP_STRUCT); + break; + } + case SQL_C_NUMERIC: { + SQL_NUMERIC_STRUCT* numericArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + for (size_t i = 0; i < paramSetSize; ++i) { + const py::handle& element = columnValues[i]; + if (element.is_none()) { + strLenOrIndArray[i] = SQL_NULL_DATA; + std::memset(&numericArray[i], 0, sizeof(SQL_NUMERIC_STRUCT)); + continue; + } + if (!py::isinstance(element)) { + throw std::runtime_error(MakeParamMismatchErrorStr(info.paramCType, paramIndex)); + } + NumericData decimalParam = element.cast(); + LOG("Received numeric parameter at [%zu]: precision=%d, scale=%d, sign=%d, val=%lld", + i, decimalParam.precision, decimalParam.scale, decimalParam.sign, decimalParam.val); + numericArray[i].precision = decimalParam.precision; + numericArray[i].scale = decimalParam.scale; + numericArray[i].sign = decimalParam.sign; + std::memset(numericArray[i].val, 0, sizeof(numericArray[i].val)); + std::memcpy(numericArray[i].val, + reinterpret_cast(&decimalParam.val), + std::min(sizeof(decimalParam.val), sizeof(numericArray[i].val))); + strLenOrIndArray[i] = sizeof(SQL_NUMERIC_STRUCT); + } + dataPtr = numericArray; + bufferLength = sizeof(SQL_NUMERIC_STRUCT); + break; + } + default: { + ThrowStdException("BindParameterArray: Unsupported C type: " + std::to_string(info.paramCType)); } - dataPtr = dataArray; - bufferLength = sizeof(short); - break; } - default: { - ThrowStdException("BindParameterArray: Unsupported C type: " + std::to_string(info.paramCType)); + RETCODE rc = SQLBindParameter_ptr( + hStmt, + static_cast(paramIndex + 1), + static_cast(info.inputOutputType), + static_cast(info.paramCType), + static_cast(info.paramSQLType), + info.columnSize, + info.decimalDigits, + dataPtr, + bufferLength, + strLenOrIndArray + ); + if (!SQL_SUCCEEDED(rc)) { + LOG("Failed to bind array param {}", paramIndex); + return rc; } } - - RETCODE rc = SQLBindParameter_ptr( - hStmt, - static_cast(paramIndex + 1), - static_cast(info.inputOutputType), - static_cast(info.paramCType), - static_cast(info.paramSQLType), - info.columnSize, - info.decimalDigits, - dataPtr, - bufferLength, - strLenOrIndArray - ); - if (!SQL_SUCCEEDED(rc)) { - LOG("Failed to bind array param {}", paramIndex); - return rc; - } + } catch (...) { + LOG("Exception occurred during parameter array binding. Cleaning up."); + throw; } + paramBuffers.insert(paramBuffers.end(), tempBuffers.begin(), tempBuffers.end()); LOG("Finished column-wise parameter array binding."); return SQL_SUCCESS; }