-
Notifications
You must be signed in to change notification settings - Fork 46
Expand file tree
/
Copy pathtest_009_pooling.py
More file actions
553 lines (443 loc) · 19.4 KB
/
test_009_pooling.py
File metadata and controls
553 lines (443 loc) · 19.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
# tests/test_009_pooling.py
"""
Connection Pooling Tests
This module contains all tests related to connection pooling functionality.
Tests cover basic pooling operations, pool management, cleanup, performance,
and edge cases including the pooling disable bug fix.
Test Categories:
- Basic pooling functionality and configuration
- Pool resource management (size limits, timeouts)
- Connection reuse and lifecycle
- Performance benefits verification
- Cleanup and disable operations (bug fix tests)
- Error handling and recovery scenarios
"""
import pytest
import time
import threading
import statistics
from mssql_python import connect, pooling
from mssql_python.pooling import PoolingManager
import mssql_python
@pytest.fixture(autouse=True)
def reset_pooling_state():
    """Guarantee each test starts from a pristine pooling state.

    Runs automatically after every test: disables pooling and resets the
    PoolingManager singleton so state cannot leak between tests.
    """
    yield
    # Best-effort teardown: a failure here must not mask the test's own
    # result, so any cleanup error is deliberately swallowed.
    try:
        pooling(enabled=False)
        PoolingManager._reset_for_testing()
    except Exception:
        pass
# =============================================================================
# Basic Pooling Functionality Tests
# =============================================================================
def test_connection_pooling_basic(conn_str):
    """Open several connections against a deliberately tiny pool."""
    # Small pool so any max_size enforcement is exercised immediately.
    pooling(max_size=2, idle_timeout=5)

    first = connect(conn_str)
    second = connect(conn_str)
    assert first is not None
    assert second is not None

    # A third request may succeed (pool grows or reuses) or be rejected,
    # depending on how strictly max_size is enforced — both are acceptable.
    try:
        third = connect(conn_str)
        assert (
            third is not None
        ), "Third connection failed — pooling is not working or limit is too strict"
        third.close()
    except Exception as e:
        print(f"Expected: Could not open third connection due to max_size=2: {e}")

    first.close()
    second.close()
def test_connection_pooling_reuse_spid(conn_str):
    """Prove physical connection reuse by comparing SQL Server SPIDs."""
    # A single-slot pool forces the second connect() to receive the
    # connection returned by the first close().
    pooling(max_size=1, idle_timeout=30)

    first_conn = connect(conn_str)
    first_cur = first_conn.cursor()
    first_cur.execute("SELECT @@SPID")  # server-side process id of this session
    spid_before = first_cur.fetchone()[0]
    first_conn.close()

    second_conn = connect(conn_str)
    second_cur = second_conn.cursor()
    second_cur.execute("SELECT @@SPID")
    spid_after = second_cur.fetchone()[0]
    second_conn.close()

    # Identical SPIDs mean the same server session — i.e. the pooled
    # connection was recycled rather than a new one opened.
    assert spid_before == spid_after, "Connections not reused - different SPIDs"
def _current_isolation_level(cursor):
    """Return the current session's isolation level name (e.g. 'Serializable').

    Queries sys.dm_exec_sessions for this session's transaction_isolation_level
    and decodes the numeric code into its display name. Extracted because the
    original test duplicated this multi-line query verbatim in two places.
    """
    cursor.execute(
        "SELECT CASE transaction_isolation_level "
        "WHEN 0 THEN 'Unspecified' "
        "WHEN 1 THEN 'ReadUncommitted' "
        "WHEN 2 THEN 'ReadCommitted' "
        "WHEN 3 THEN 'RepeatableRead' "
        "WHEN 4 THEN 'Serializable' "
        "WHEN 5 THEN 'Snapshot' END AS isolation_level "
        "FROM sys.dm_exec_sessions WHERE session_id = @@SPID"
    )
    return cursor.fetchone()[0]


def test_connection_pooling_isolation_level_reset(conn_str):
    """Test that pooling correctly resets session state for isolation level.

    This test verifies that when a connection is returned to the pool and then
    reused, the isolation level setting is reset to the default (READ COMMITTED)
    to prevent session state from leaking between connection usages.

    Bug Fix: Previously, SQL_ATTR_RESET_CONNECTION was used which does NOT reset
    the isolation level. Now we explicitly reset it to prevent state leakage.
    """
    # Single-slot pool guarantees the second connect() reuses the connection.
    pooling(enabled=True, max_size=1, idle_timeout=30)

    # First usage: set a non-default isolation level (SERIALIZABLE).
    conn1 = connect(conn_str)
    conn1.set_attr(mssql_python.SQL_ATTR_TXN_ISOLATION, mssql_python.SQL_TXN_SERIALIZABLE)

    cursor1 = conn1.cursor()
    isolation_level_1 = _current_isolation_level(cursor1)
    assert isolation_level_1 == "Serializable", f"Expected Serializable, got {isolation_level_1}"

    # Record the SPID so we can later prove the same connection was reused.
    cursor1.execute("SELECT @@SPID")
    spid1 = cursor1.fetchone()[0]

    # Return the connection to the pool.
    cursor1.close()
    conn1.close()

    # Second usage: should receive the same physical connection from the pool.
    conn2 = connect(conn_str)
    cursor2 = conn2.cursor()
    cursor2.execute("SELECT @@SPID")
    spid2 = cursor2.fetchone()[0]
    assert spid1 == spid2, "Connection was not reused from pool"

    # Verify isolation level is reset to default (READ COMMITTED).
    # This is the CORRECT behavior for connection pooling - we should reset
    # session state to prevent settings from one usage affecting the next.
    isolation_level_2 = _current_isolation_level(cursor2)
    assert isolation_level_2 == "ReadCommitted", (
        f"Isolation level was not reset! Expected 'ReadCommitted', got '{isolation_level_2}'. "
        f"This indicates session state leaked from the previous connection usage."
    )

    # Clean up
    cursor2.close()
    conn2.close()
def test_connection_pooling_speed(conn_str):
    """Time connect/close cycles with and without pooling; pooled must win."""
    # Warm-up: absorb one-time costs (driver load, DNS, auth caches) so
    # they don't bias the first timed samples.
    for _ in range(3):
        connect(conn_str).close()

    def timed_cycles(count):
        # Time `count` full connect/close round-trips.
        samples = []
        for _ in range(count):
            t0 = time.perf_counter()
            c = connect(conn_str)
            c.close()
            samples.append(time.perf_counter() - t0)
        return samples

    # Baseline: pooling off.
    pooling(enabled=False)
    no_pool_times = timed_cycles(10)

    # Candidate: pooling on.
    pooling(max_size=5, idle_timeout=30)
    pool_times = timed_cycles(10)

    # Medians are robust against the occasional slow outlier.
    median_no_pool = statistics.median(no_pool_times)
    median_pool = statistics.median(pool_times)

    # Require pooled time to be at most 70% of the unpooled baseline.
    improvement_threshold = 0.7
    print(f"No pool median: {median_no_pool:.6f}s")
    print(f"Pool median: {median_pool:.6f}s")
    print(f"Improvement ratio: {median_pool/median_no_pool:.2f}")
    assert (
        median_pool <= median_no_pool * improvement_threshold
    ), f"Expected pooling to be at least 30% faster. No-pool: {median_no_pool:.6f}s, Pool: {median_pool:.6f}s"
# =============================================================================
# Pool Resource Management Tests
# =============================================================================
def test_pool_exhaustion_max_size_1(conn_str):
    """Test pool exhaustion when max_size=1 and multiple concurrent connections are requested.

    Bug fix: a blocking pool implementation keeps the worker thread waiting
    until conn1 is returned, so the results list could still be empty when the
    assertion ran immediately after conn1.close(). We now re-join the thread
    after releasing the connection before inspecting the outcome.
    """
    pooling(max_size=1, idle_timeout=30)
    conn1 = connect(conn_str)
    results = []

    def try_connect():
        # Record either success or the error text so the main thread can
        # accept any of the valid pool-exhaustion outcomes.
        try:
            conn2 = connect(conn_str)
            results.append("success")
            conn2.close()
        except Exception as e:
            results.append(str(e))

    # Attempt a second connection while the first is still checked out.
    t = threading.Thread(target=try_connect)
    t.start()
    t.join(timeout=2)
    conn1.close()
    # If the pool blocked the worker, closing conn1 unblocks it; wait for the
    # thread to record an outcome before asserting (eliminates the race where
    # the assertion ran while the worker was still connecting).
    t.join(timeout=5)
    # Depending on implementation, either blocks, raises, or times out
    assert results, "Second connection attempt did not complete"
    # If pool blocks, the thread may not finish until conn1 is closed, so allow both outcomes
    assert (
        results[0] == "success" or "pool" in results[0].lower() or "timeout" in results[0].lower()
    ), f"Unexpected pool exhaustion result: {results[0]}"
def test_pool_capacity_limit_and_overflow(conn_str):
    """Test that pool does not grow beyond max_size and handles overflow gracefully.

    Bug fix: if the pool blocks overflow requests, the worker thread can never
    finish while both pooled connections are still checked out, so the original
    assertion could fire on an empty result list. When the first timed join
    yields nothing, we now release one connection to unblock the worker and
    join again before asserting.
    """
    pooling(max_size=2, idle_timeout=30)
    conns = []
    try:
        # Check out the full capacity of the pool.
        conns.append(connect(conn_str))
        conns.append(connect(conn_str))

        overflow_result = []

        def try_overflow():
            # Record success or the error text for the main thread to inspect.
            try:
                c = connect(conn_str)
                overflow_result.append("success")
                c.close()
            except Exception as e:
                overflow_result.append(str(e))

        t = threading.Thread(target=try_overflow)
        t.start()
        t.join(timeout=2)
        if not overflow_result:
            # The pool is blocking the overflow request: free one slot so the
            # worker can proceed, then wait for it to record an outcome.
            conns.pop().close()
            t.join(timeout=5)
        assert overflow_result, "Overflow connection attempt did not complete"
        # Accept either block, error, or success if pool implementation allows overflow
        assert (
            overflow_result[0] == "success"
            or "pool" in overflow_result[0].lower()
            or "timeout" in overflow_result[0].lower()
        ), f"Unexpected pool overflow result: {overflow_result[0]}"
    finally:
        for c in conns:
            c.close()
def test_pool_release_overflow_disconnects_outside_mutex(conn_str):
    """Exercise the overflow branch of ConnectionPool::release().

    Two connections are checked out with max_size=2, then the pool is shrunk
    to max_size=1 before they are returned. The first close() fills the only
    idle slot; the second close() therefore hits the overflow path in
    connection_pool.cpp where should_disconnect is set and the physical
    disconnect happens outside the mutex.
    """
    pooling(max_size=2, idle_timeout=30)
    kept = connect(conn_str)
    extra = connect(conn_str)

    # Shrink idle capacity so only one returned connection can be pooled.
    pooling(max_size=1, idle_timeout=30)

    kept.close()   # pooled: fills the single idle slot
    extra.close()  # overflow: must be physically disconnected, not pooled

    # The pool must remain fully functional after the overflow disconnect.
    probe = connect(conn_str)
    probe_cur = probe.cursor()
    probe_cur.execute("SELECT 1")
    assert probe_cur.fetchone()[0] == 1
    probe.close()
@pytest.mark.skip("Flaky test - idle timeout behavior needs investigation")
def test_pool_idle_timeout_removes_connections(conn_str):
    """Idle connections should be evicted once idle_timeout elapses."""
    pooling(max_size=2, idle_timeout=1)

    observed_spids = []
    first = connect(conn_str)
    first_cur = first.cursor()
    first_cur.execute("SELECT @@SPID")
    old_spid = first_cur.fetchone()[0]
    observed_spids.append(old_spid)
    first.close()

    # Sleep well past the 1s idle_timeout so the pooled connection is evicted.
    time.sleep(3)

    # A fresh connection should now be a brand-new session, not the old one.
    second = connect(conn_str)
    second_cur = second.cursor()
    second_cur.execute("SELECT @@SPID")
    new_spid = second_cur.fetchone()[0]
    observed_spids.append(new_spid)
    second.close()

    assert old_spid != new_spid, "Idle timeout did not remove connection from pool"
# =============================================================================
# Error Handling and Recovery Tests
# =============================================================================
@pytest.mark.skip(
    "Test causes fatal crash - forcibly closing underlying connection leads to undefined behavior"
)
def test_pool_removes_invalid_connections(conn_str):
    """Invalidated pooled connections must be replaced, not handed back out."""
    pooling(max_size=1, idle_timeout=30)
    victim = connect(conn_str)
    victim_cur = victim.cursor()
    victim_cur.execute("SELECT 1")

    # Invalidate the connection by closing it at the driver level. This pokes
    # at private internals; skip if this driver doesn't expose them.
    try:
        if hasattr(victim, "_conn") and hasattr(victim._conn, "close"):
            victim._conn.close()
        else:
            pytest.skip("Cannot forcibly close underlying connection for this driver")
    except Exception:
        pass

    # Closing the wrapper may now fail; tolerate only the expected error.
    try:
        victim.close()
    except RuntimeError as e:
        if "not initialized" not in str(e):
            raise

    # A fresh connection from the pool must still be usable.
    replacement = connect(conn_str)
    replacement_cur = replacement.cursor()
    try:
        replacement_cur.execute("SELECT 1")
        row = replacement_cur.fetchone()
        assert row is not None and row[0] == 1, "Pool did not remove invalid connection"
    finally:
        replacement.close()
def test_pool_recovery_after_failed_connection(conn_str):
    """A failed connect attempt must not wedge the pool for later connects."""
    pooling(max_size=1, idle_timeout=30)

    # Corrupt the password so the first connect attempt is guaranteed to fail.
    if "Pwd=" in conn_str:
        bad_conn_str = conn_str.replace("Pwd=", "Pwd=wrongpassword")
    elif "Password=" in conn_str:
        bad_conn_str = conn_str.replace("Password=", "Password=wrongpassword")
    else:
        pytest.skip("No password found in connection string to modify")

    with pytest.raises(Exception):
        connect(bad_conn_str)

    # The failure above must leave the pool healthy: a valid connect works.
    good = connect(conn_str)
    good_cur = good.cursor()
    good_cur.execute("SELECT 1")
    row = good_cur.fetchone()
    assert row is not None and row[0] == 1, "Pool did not recover after failed connection"
    good.close()
# =============================================================================
# Pooling Disable Bug Fix Tests
# =============================================================================
def test_pooling_disable_without_hang(conn_str):
    """Regression test: pooling(enabled=False) must not hang after use."""
    print("Testing pooling disable without hang...")
    pooling(enabled=True)

    # Exercise the pool once so there is real state to tear down.
    conn = connect(conn_str)
    cur = conn.cursor()
    cur.execute("SELECT 1")
    assert cur.fetchone()[0] == 1, "Basic query failed"
    conn.close()

    # The original bug: this call hung once a pooled connection existed.
    started = time.time()
    pooling(enabled=False)
    elapsed = time.time() - started

    # Teardown should be near-instant; 2s is a generous ceiling.
    assert elapsed < 2.0, f"pooling(enabled=False) took too long: {elapsed:.2f}s"
    print(f"pooling(enabled=False) completed in {elapsed:.3f}s")
def test_pooling_disable_without_closing_connection(conn_str):
    """Disabling pooling must not hang even with a connection left open."""
    print("Testing pooling disable with unclosed connection...")
    pooling(enabled=True)

    # Deliberately leave this connection open: the disable path must cope
    # with connections that were never returned to the pool.
    conn = connect(conn_str)
    cur = conn.cursor()
    cur.execute("SELECT 1")
    assert cur.fetchone()[0] == 1, "Basic query failed"

    started = time.time()
    pooling(enabled=False)
    elapsed = time.time() - started

    # Teardown should be near-instant; 2s is a generous ceiling.
    assert elapsed < 2.0, f"pooling(enabled=False) took too long: {elapsed:.2f}s"
    print(f"pooling(enabled=False) with unclosed connection completed in {elapsed:.3f}s")
def test_multiple_pooling_disable_calls(conn_str):
    """Repeated pooling(enabled=False) calls must be idempotent and fast."""
    print("Testing multiple pooling disable calls...")
    pooling(enabled=True)
    connect(conn_str).close()

    # Three back-to-back disables: guards against double-cleanup bugs.
    started = time.time()
    for _ in range(3):
        pooling(enabled=False)
    elapsed = time.time() - started

    assert elapsed < 2.0, f"Multiple pooling disable calls took too long: {elapsed:.2f}s"
    print(f"Multiple disable calls completed in {elapsed:.3f}s")
def test_pooling_disable_without_enable(conn_str):
    """Edge case: disabling pooling that was never enabled must be safe."""
    print("Testing pooling disable without enable...")
    # Start from a pristine, never-enabled state.
    PoolingManager._reset_for_testing()

    started = time.time()
    pooling(enabled=False)
    pooling(enabled=False)  # a repeated call must be equally harmless
    elapsed = time.time() - started

    assert elapsed < 1.0, f"Disable without enable took too long: {elapsed:.2f}s"
    print(f"Disable without enable completed in {elapsed:.3f}s")
def test_pooling_enable_disable_cycle(conn_str):
    """Repeated enable → use → disable cycles must all behave correctly."""
    print("Testing enable/disable cycles...")
    for cycle in range(3):
        print(f"  Cycle {cycle + 1}...")
        pooling(enabled=True)
        assert PoolingManager.is_enabled(), f"Pooling not enabled in cycle {cycle + 1}"

        # Run one query through the pool during this cycle.
        conn = connect(conn_str)
        cur = conn.cursor()
        cur.execute("SELECT 1")
        assert cur.fetchone()[0] == 1, f"Query failed in cycle {cycle + 1}"
        conn.close()

        # Disabling must stay fast in every cycle, not just the first.
        started = time.time()
        pooling(enabled=False)
        elapsed = time.time() - started
        assert not PoolingManager.is_enabled(), f"Pooling not disabled in cycle {cycle + 1}"
        assert elapsed < 2.0, f"Disable took too long in cycle {cycle + 1}: {elapsed:.2f}s"
    print("All enable/disable cycles completed successfully")
def test_pooling_state_consistency(conn_str):
    """Verify PoolingManager's enabled/initialized flags track operations."""
    print("Testing pooling state consistency...")
    # Known-clean baseline: neither enabled nor initialized.
    PoolingManager._reset_for_testing()
    assert not PoolingManager.is_enabled(), "Initial state should be disabled"
    assert not PoolingManager.is_initialized(), "Initial state should be uninitialized"

    # Enabling flips both flags on.
    pooling(enabled=True)
    assert PoolingManager.is_enabled(), "Should be enabled after enable call"
    assert PoolingManager.is_initialized(), "Should be initialized after enable call"

    # Using a connection must not perturb the state.
    connect(conn_str).close()
    assert PoolingManager.is_enabled(), "Should remain enabled after connection usage"

    # Disabling clears 'enabled' but initialization is sticky.
    pooling(enabled=False)
    assert not PoolingManager.is_enabled(), "Should be disabled after disable call"
    assert PoolingManager.is_initialized(), "Should remain initialized after disable call"
    print("Pooling state consistency verified")