Skip to content
Prev Previous commit
Next Next commit
test improvement
  • Loading branch information
subrata-ms committed Apr 1, 2026
commit 78c51fd652f50d1de37f5efbc4a6159f9262d31b
77 changes: 33 additions & 44 deletions tests/test_009_pooling.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,16 @@ def test_connection_pooling_isolation_level_reset(conn_str):
# Set isolation level to SERIALIZABLE (non-default)
conn1.set_attr(mssql_python.SQL_ATTR_TXN_ISOLATION, mssql_python.SQL_TXN_SERIALIZABLE)

# Verify the isolation level was set
# Verify the isolation level was set (use DBCC USEROPTIONS to avoid
# requiring VIEW SERVER PERFORMANCE STATE permission for sys.dm_exec_sessions)
cursor1 = conn1.cursor()
cursor1.execute(
"SELECT CASE transaction_isolation_level "
"WHEN 0 THEN 'Unspecified' "
"WHEN 1 THEN 'ReadUncommitted' "
"WHEN 2 THEN 'ReadCommitted' "
"WHEN 3 THEN 'RepeatableRead' "
"WHEN 4 THEN 'Serializable' "
"WHEN 5 THEN 'Snapshot' END AS isolation_level "
"FROM sys.dm_exec_sessions WHERE session_id = @@SPID"
)
isolation_level_1 = cursor1.fetchone()[0]
assert isolation_level_1 == "Serializable", f"Expected Serializable, got {isolation_level_1}"
cursor1.execute("DBCC USEROPTIONS WITH NO_INFOMSGS")
isolation_level_1 = None
for row in cursor1.fetchall():
if row[0] == "isolation level":
isolation_level_1 = row[1]
break
assert isolation_level_1 == "serializable", f"Expected serializable, got {isolation_level_1}"

# Get SPID for verification of connection reuse
cursor1.execute("SELECT @@SPID")
Expand All @@ -138,24 +134,20 @@ def test_connection_pooling_isolation_level_reset(conn_str):
# Verify connection was reused
assert spid1 == spid2, "Connection was not reused from pool"

# Check if isolation level is reset to default
cursor2.execute(
"SELECT CASE transaction_isolation_level "
"WHEN 0 THEN 'Unspecified' "
"WHEN 1 THEN 'ReadUncommitted' "
"WHEN 2 THEN 'ReadCommitted' "
"WHEN 3 THEN 'RepeatableRead' "
"WHEN 4 THEN 'Serializable' "
"WHEN 5 THEN 'Snapshot' END AS isolation_level "
"FROM sys.dm_exec_sessions WHERE session_id = @@SPID"
)
isolation_level_2 = cursor2.fetchone()[0]
# Check if isolation level is reset to default (use DBCC USEROPTIONS to avoid
# requiring VIEW SERVER PERFORMANCE STATE permission for sys.dm_exec_sessions)
cursor2.execute("DBCC USEROPTIONS WITH NO_INFOMSGS")
isolation_level_2 = None
for row in cursor2.fetchall():
if row[0] == "isolation level":
isolation_level_2 = row[1]
break

# Verify isolation level is reset to default (READ COMMITTED)
# This is the CORRECT behavior for connection pooling - we should reset
# session state to prevent settings from one usage affecting the next
assert isolation_level_2 == "ReadCommitted", (
f"Isolation level was not reset! Expected 'ReadCommitted', got '{isolation_level_2}'. "
assert isolation_level_2 == "read committed", (
f"Isolation level was not reset! Expected 'read committed', got '{isolation_level_2}'. "
f"This indicates session state leaked from the previous connection usage."
)

Expand Down Expand Up @@ -283,10 +275,10 @@ def test_pool_idle_timeout_removes_connections(conn_str):
pooling(max_size=2, idle_timeout=1)
conn1 = connect(conn_str)
cursor1 = conn1.cursor()
# Use connection_id (a GUID unique per physical connection) instead of @@SPID,
# because SQL Server can reassign the same SPID to a new connection.
cursor1.execute("SELECT connection_id FROM sys.dm_exec_connections WHERE session_id = @@SPID")
conn_id1 = cursor1.fetchone()[0]
# Use @@SPID to identify the connection without requiring
# VIEW SERVER PERFORMANCE STATE permission for sys.dm_exec_connections.
cursor1.execute("SELECT @@SPID")
spid1 = cursor1.fetchone()[0]
conn1.close()

# Wait well beyond the idle_timeout to account for slow CI and integer-second granularity
Expand All @@ -295,13 +287,11 @@ def test_pool_idle_timeout_removes_connections(conn_str):
# Get a new connection — the idle one should have been evicted during acquire()
conn2 = connect(conn_str)
cursor2 = conn2.cursor()
cursor2.execute("SELECT connection_id FROM sys.dm_exec_connections WHERE session_id = @@SPID")
conn_id2 = cursor2.fetchone()[0]
cursor2.execute("SELECT @@SPID")
spid2 = cursor2.fetchone()[0]
conn2.close()

assert (
conn_id1 != conn_id2
), "Idle timeout did not remove connection from pool — same connection_id reused"
assert spid1 != spid2, "Idle timeout did not remove connection from pool — same SPID reused"


# =============================================================================
Expand All @@ -323,9 +313,10 @@ def test_pool_removes_invalid_connections(conn_str):
cursor.execute("SELECT 1")
cursor.fetchone()

# Record the connection_id of the original connection
cursor.execute("SELECT connection_id FROM sys.dm_exec_connections WHERE session_id = @@SPID")
original_conn_id = cursor.fetchone()[0]
# Record the SPID of the original connection (avoids requiring
# VIEW SERVER PERFORMANCE STATE permission for sys.dm_exec_connections)
cursor.execute("SELECT @@SPID")
original_spid = cursor.fetchone()[0]

# Force-return the connection to the pool WITHOUT rollback.
# This leaves the pooled connection in a dirty state (open implicit transaction)
Expand All @@ -346,12 +337,10 @@ def test_pool_removes_invalid_connections(conn_str):
assert result is not None and result[0] == 1, "Pool did not recover from invalid connection"

# Verify it's a different physical connection
new_cursor.execute(
"SELECT connection_id FROM sys.dm_exec_connections WHERE session_id = @@SPID"
)
new_conn_id = new_cursor.fetchone()[0]
new_cursor.execute("SELECT @@SPID")
new_spid = new_cursor.fetchone()[0]
assert (
original_conn_id != new_conn_id
original_spid != new_spid
), "Expected a new physical connection after pool discarded the dirty one"
Comment thread
subrata-ms marked this conversation as resolved.

new_conn.close()
Expand Down
Loading