-
Notifications
You must be signed in to change notification settings - Fork 46
Expand file tree
/
Copy pathtest_019_bulkcopy.py
More file actions
307 lines (244 loc) · 10.8 KB
/
test_019_bulkcopy.py
File metadata and controls
307 lines (244 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Basic integration tests for bulkcopy via the mssql_python driver."""
import pytest
# Skip the entire module when mssql_py_core can't be loaded at runtime
# (e.g. manylinux_2_28 build containers where glibc is too old for the .so).
# The import result is bound to a module-level name; the tests visible in this
# file do not reference it directly, so the call acts purely as a skip guard.
# NOTE(review): pytest >= 8.2 warns when importorskip swallows an ImportError
# that is not ModuleNotFoundError unless exc_type=ImportError is passed --
# confirm against the pytest version pinned for this project.
mssql_py_core = pytest.importorskip(
"mssql_py_core", reason="mssql_py_core not loadable (glibc too old?)"
)
def test_connection_and_cursor(cursor):
    """Smoke test: a trivial SELECT must round-trip through the cursor fixture."""
    cursor.execute("SELECT 1 AS connected")
    row = cursor.fetchone()
    # The query selects a single literal column, so the flag is at index 0.
    connected = row[0]
    assert connected == 1
def test_insert_and_fetch(cursor):
    """Insert two rows with parameterized statements and read them back in order.

    Args:
        cursor: Database cursor fixture; the test uses its ``execute``,
            ``fetchall`` and ``connection.commit`` members.

    The scratch table is dropped in a ``finally`` block so that a failing
    assertion (or a failing statement) does not leave it behind for later runs.
    """
    table_name = "mssql_python_test_basic"
    # Conditional drop: safe both before creation and during cleanup, even if
    # CREATE TABLE itself failed.
    drop_if_exists = f"IF OBJECT_ID('{table_name}', 'U') IS NOT NULL DROP TABLE {table_name}"
    expected = [(1, "Alice"), (2, "Bob")]

    cursor.execute(drop_if_exists)
    try:
        # Create table
        cursor.execute(f"CREATE TABLE {table_name} (id INT, name NVARCHAR(50))")
        cursor.connection.commit()

        # Insert data
        insert_sql = f"INSERT INTO {table_name} (id, name) VALUES (?, ?)"
        for record in expected:
            cursor.execute(insert_sql, record)

        # Fetch and verify
        cursor.execute(f"SELECT id, name FROM {table_name} ORDER BY id")
        rows = cursor.fetchall()

        assert len(rows) == len(expected)
        for row, (expected_id, expected_name) in zip(rows, expected):
            assert row[0] == expected_id and row[1] == expected_name
    finally:
        # Cleanup runs regardless of the test outcome.
        cursor.execute(drop_if_exists)
def test_bulkcopy_basic(cursor):
    """Test basic bulkcopy operation via mssql_python driver with auto-mapping.

    Uses automatic column mapping (columns mapped by ordinal position): no
    ``column_mappings`` argument is passed, and the tuples in ``data`` follow
    the table's column order (id, name, value).

    Args:
        cursor: Database cursor fixture; the test uses its ``execute``,
            ``fetchall``, ``bulkcopy`` and ``connection.commit`` members.

    The scratch table is dropped in a ``finally`` block so that a failing
    bulkcopy or assertion does not leave it behind for later runs.
    """
    table_name = "mssql_python_bulkcopy_test"
    # Conditional drop: safe both before creation and during cleanup, even if
    # CREATE TABLE itself failed.
    drop_if_exists = f"IF OBJECT_ID('{table_name}', 'U') IS NOT NULL DROP TABLE {table_name}"

    # Prepare test data - columns match table order (id, name, value)
    data = [
        (1, "Alice", 100.5),
        (2, "Bob", 200.75),
        (3, "Charlie", 300.25),
    ]

    cursor.execute(drop_if_exists)
    try:
        # Create table
        cursor.execute(f"CREATE TABLE {table_name} (id INT, name VARCHAR(50), value FLOAT)")
        cursor.connection.commit()

        # Perform bulkcopy with auto-mapping (no column_mappings specified)
        # Using explicit timeout parameter instead of kwargs
        result = cursor.bulkcopy(table_name, data, timeout=60)

        # Verify result
        assert result is not None
        assert result["rows_copied"] == len(data)

        # Verify data was inserted correctly
        cursor.execute(f"SELECT id, name, value FROM {table_name} ORDER BY id")
        rows = cursor.fetchall()
        assert len(rows) == len(data)
        for row, (expected_id, expected_name, expected_value) in zip(rows, data):
            assert row[0] == expected_id
            assert row[1] == expected_name
            # FLOAT column: compare with the same 0.01 tolerance as before.
            assert abs(row[2] - expected_value) < 0.01
    finally:
        # Cleanup runs regardless of the test outcome.
        cursor.execute(drop_if_exists)
def test_bulkcopy_without_database_parameter(conn_str):
    """Test bulkcopy operation works when DATABASE is not specified in connection string.

    The database keyword in connection string is optional. In its absence,
    the client sends an empty database name and the server responds with
    the default database the client was connected to.

    Args:
        conn_str: Connection string fixture; it is re-parsed and rebuilt
            without its ``database`` entry before connecting.

    The test skips (rather than fails) on Azure SQL Database and when the
    default database does not allow creating the scratch table. Once the table
    exists it is dropped in a ``finally`` block, so a failing bulkcopy or
    assertion does not leave it behind.
    """
    from mssql_python import connect
    from mssql_python.connection_string_parser import _ConnectionStringParser
    from mssql_python.connection_string_builder import _ConnectionStringBuilder

    # Parse the connection string using the proper parser
    parser = _ConnectionStringParser(validate_keywords=False)
    params = parser._parse(conn_str)

    # Remove DATABASE parameter if present.
    # NOTE(review): only the lowercase "database" key is popped; this presumes
    # the parser normalizes case and synonyms to that key -- confirm.
    params.pop("database", None)

    # Rebuild the connection string using the builder to preserve braced values
    conn_str_no_db = _ConnectionStringBuilder(params).build()

    # Use unqualified table names -- the table is created in whatever
    # database we connected to. Three-part names ([db].[schema].[table])
    # are NOT supported on Azure SQL, and the default database (often
    # master) may deny CREATE TABLE on other CI environments, so we
    # skip gracefully when the current database doesn't allow DDL.
    table_name = "mssql_python_bulkcopy_no_db_test"
    drop_if_exists = f"IF OBJECT_ID('{table_name}', 'U') IS NOT NULL DROP TABLE {table_name}"

    # Prepare test data
    data = [
        (1, "Alice", 100.5),
        (2, "Bob", 200.75),
        (3, "Charlie", 300.25),
    ]

    # Create connection without DATABASE parameter
    conn = connect(conn_str_no_db)
    try:
        cursor = conn.cursor()

        # Verify we're connected to a database (should be the default)
        cursor.execute("SELECT DB_NAME() AS current_db")
        current_db = cursor.fetchone()[0]
        assert current_db is not None, "Should be connected to a database"

        # Skip on Azure SQL Database (EngineEdition 5): bulkcopy internally
        # opens a second connection via mssql_py_core using the stored
        # connection string. Without a DATABASE keyword that second
        # connection cannot resolve the target table on Azure SQL.
        cursor.execute("SELECT CAST(SERVERPROPERTY('EngineEdition') AS INT)")
        engine_edition = cursor.fetchone()[0]
        if engine_edition == 5:
            pytest.skip(
                "bulkcopy uses a separate internal connection; without "
                "DATABASE in the connection string it cannot resolve "
                "table metadata on Azure SQL"
            )

        # Deliberately broad: any failure to set up the table in the default
        # database is treated as "environment cannot run this test".
        try:
            cursor.execute(drop_if_exists)
            cursor.execute(f"CREATE TABLE {table_name} (id INT, name VARCHAR(50), value FLOAT)")
            conn.commit()
        except Exception as e:
            pytest.skip(f"Cannot create table in default database '{current_db}': {e}")

        try:
            # Perform bulkcopy - this should NOT raise ValueError about missing DATABASE
            result = cursor.bulkcopy(table_name, data, timeout=60)

            # Verify result
            assert result is not None
            assert result["rows_copied"] == len(data)

            # Verify data was inserted correctly
            cursor.execute(f"SELECT id, name, value FROM {table_name} ORDER BY id")
            rows = cursor.fetchall()
            assert len(rows) == len(data)
            for row, (expected_id, expected_name, expected_value) in zip(rows, data):
                assert row[0] == expected_id
                assert row[1] == expected_name
                # FLOAT column: compare with the same 0.01 tolerance as before.
                assert abs(row[2] - expected_value) < 0.01
        finally:
            # Cleanup runs regardless of the bulkcopy/assertion outcome.
            cursor.execute(drop_if_exists)

        cursor.close()
    finally:
        conn.close()
def _conn_str_with_server_keyword(conn_str, keyword):
    """Rebuild ``conn_str`` with the server given only under ``keyword``.

    Every server synonym (``server``, ``addr``, ``address``) is removed from
    the parsed parameters first, so the rebuilt string cannot fall back to a
    different synonym than the one under test. When ``keyword`` is ``None``
    the server is omitted entirely.
    """
    from mssql_python.connection_string_parser import _ConnectionStringParser
    from mssql_python.connection_string_builder import _ConnectionStringBuilder

    params = _ConnectionStringParser(validate_keywords=False)._parse(conn_str)
    # Pop all synonyms unconditionally (no short-circuit) and keep the first
    # non-empty value, preferring server, then addr, then address.
    found = [params.pop(name, None) for name in ("server", "addr", "address")]
    if keyword is not None:
        params[keyword] = next((value for value in found if value), None)
    return _ConnectionStringBuilder(params).build()


def _assert_bulkcopy_roundtrip(conn_string, table_name):
    """Connect with ``conn_string``, bulkcopy three rows into ``table_name`` and verify the count.

    The scratch table is dropped in a ``finally`` block and the connection is
    always closed, so a failure does not leak either.
    """
    from mssql_python import connect

    test_data = [(1, "Test1", 1.5), (2, "Test2", 2.5), (3, "Test3", 3.5)]
    drop_if_exists = f"DROP TABLE IF EXISTS {table_name}"

    conn = connect(conn_string)
    try:
        cursor = conn.cursor()

        # Create table
        cursor.execute(drop_if_exists)
        try:
            cursor.execute(f"CREATE TABLE {table_name} (id INT, name NVARCHAR(50), value FLOAT)")
            conn.commit()

            # Perform bulkcopy over the connection built from the synonym
            result = cursor.bulkcopy(table_name, test_data)

            # Verify result
            assert result is not None
            assert "rows_copied" in result
            assert result["rows_copied"] == len(test_data)

            # Verify data
            cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
            count = cursor.fetchone()[0]
            assert count == len(test_data)
        finally:
            # Cleanup runs regardless of the bulkcopy/assertion outcome.
            cursor.execute(drop_if_exists)

        cursor.close()
    finally:
        conn.close()


def test_bulkcopy_with_server_synonyms(conn_str):
    """Test that bulkcopy works with the SERVER synonyms ``addr`` and ``address``.

    For each synonym the fixture's connection string is rebuilt so the server
    is supplied only under that keyword, and a three-row bulkcopy is verified.
    Finally, the connection's stored connection string is replaced with one
    that has no server keyword at all, and bulkcopy is expected to raise
    ``ValueError`` mentioning that the SERVER parameter is required.

    Args:
        conn_str: Connection string fixture used as the baseline.
    """
    from mssql_python import connect

    # Test with the 'addr' and 'address' synonyms in place of 'server'.
    for keyword in ("addr", "address"):
        _assert_bulkcopy_roundtrip(
            _conn_str_with_server_keyword(conn_str, keyword),
            f"test_bulkcopy_{keyword}_synonym",
        )

    # Test that bulkcopy fails when SERVER parameter is missing entirely
    conn_string_no_server = _conn_str_with_server_keyword(conn_str, None)

    # Ensure we have a valid connection string for the main connection
    conn = connect(conn_str)
    try:
        cursor = conn.cursor()

        # Manually override the connection string to one without server
        cursor.connection.connection_str = conn_string_no_server

        table_name = "test_bulkcopy_no_server"
        test_data = [(1, "Test1", 1.5)]

        # This should raise ValueError due to missing SERVER parameter
        with pytest.raises(ValueError, match="SERVER parameter is required"):
            cursor.bulkcopy(table_name, test_data)

        cursor.close()
    finally:
        conn.close()