-
Notifications
You must be signed in to change notification settings - Fork 46
Expand file tree
/
Copy pathpooling.py
More file actions
139 lines (118 loc) · 4.8 KB
/
pooling.py
File metadata and controls
139 lines (118 loc) · 4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
This module provides connection pooling functionality for the mssql_python package.
"""
import atexit
import threading
from typing import Dict
from mssql_python import ddbc_bindings
from mssql_python.logging import logger
class PoolingManager:
    """
    Manages connection pooling for the mssql_python package.

    All state lives on the class itself and is used through classmethods;
    the class is not meant to be instantiated. Every state change
    (enable, disable, test reset) happens while holding a class-level lock,
    and the actual pool work is delegated to ``ddbc_bindings``.
    """

    # True while pooling is switched on.
    _enabled: bool = False
    # True once enable() or disable() has run at least once.
    _initialized: bool = False
    # True once the pools have been closed, so they are not closed twice.
    _pools_closed: bool = False
    # Guards every read-modify-write of the flags above and of _config.
    _lock: threading.Lock = threading.Lock()
    # Settings most recently applied by enable(); starts at the defaults.
    _config: Dict[str, int] = {"max_size": 100, "idle_timeout": 600}

    @classmethod
    def enable(cls, max_size: int = 100, idle_timeout: int = 600) -> None:
        """
        Enable connection pooling with specified parameters.

        If pooling is already enabled the call is a no-op: the arguments are
        neither validated nor applied.

        Args:
            max_size: Maximum number of connections in the pool (default: 100)
            idle_timeout: Timeout in seconds for idle connections (default: 600)

        Raises:
            ValueError: If parameters are invalid (max_size <= 0 or idle_timeout < 0)
        """
        logger.debug(
            "PoolingManager.enable: Attempting to enable pooling - max_size=%d, idle_timeout=%d",
            max_size,
            idle_timeout,
        )
        with cls._lock:
            if cls._enabled:
                logger.debug("PoolingManager.enable: Pooling already enabled, skipping")
                return

            if max_size <= 0 or idle_timeout < 0:
                logger.error(
                    "PoolingManager.enable: Invalid parameters - max_size=%d, idle_timeout=%d",
                    max_size,
                    idle_timeout,
                )
                raise ValueError("Invalid pooling parameters")

            logger.info(
                "PoolingManager.enable: Enabling connection pooling - max_size=%d, idle_timeout=%d seconds",
                max_size,
                idle_timeout,
            )
            ddbc_bindings.enable_pooling(max_size, idle_timeout)

            # State is only updated after the bindings call returned, so a
            # failure there leaves the manager untouched.
            cls._config["max_size"] = max_size
            cls._config["idle_timeout"] = idle_timeout
            cls._enabled = True
            cls._initialized = True
            # Pooling has just been (re-)enabled, so there is something to
            # close again. Without this, enabling after an earlier disable()
            # left the flag set and both disable() and the atexit hook would
            # skip close_pooling() for the new pools.
            # NOTE(review): assumes the bindings support enable_pooling()
            # after close_pooling() - confirm against the native layer.
            cls._pools_closed = False
            logger.info("PoolingManager.enable: Connection pooling enabled successfully")

    @classmethod
    def disable(cls) -> None:
        """
        Disable connection pooling and clean up resources.

        The pools are closed through the bindings only when pooling is
        currently enabled and has not been closed yet, so the method can be
        called multiple times safely. Afterwards the manager is marked as
        disabled, closed and initialized.
        """
        logger.debug("PoolingManager.disable: Attempting to disable pooling")
        with cls._lock:
            if cls._enabled and not cls._pools_closed:
                logger.info("PoolingManager.disable: Closing connection pools")
                ddbc_bindings.close_pooling()
                logger.info("PoolingManager.disable: Connection pools closed successfully")
            else:
                logger.debug("PoolingManager.disable: Pooling already disabled or closed")

            # Recorded on both paths: an explicit disable() counts as
            # initialization even if pooling was never enabled.
            cls._pools_closed = True
            cls._enabled = False
            cls._initialized = True

    @classmethod
    def is_enabled(cls) -> bool:
        """
        Check if connection pooling is currently enabled.

        Returns:
            bool: True if pooling is enabled, False otherwise
        """
        return cls._enabled

    @classmethod
    def is_initialized(cls) -> bool:
        """
        Check if the pooling manager has been initialized.

        Returns:
            bool: True once enable() or disable() has completed, False otherwise
        """
        return cls._initialized

    @classmethod
    def _reset_for_testing(cls) -> None:
        """
        Reset pooling state - for testing purposes only.

        Clears the enabled/initialized/closed flags and restores the stored
        configuration to its defaults so that settings from one test do not
        leak into the next. The bindings are not called.
        """
        with cls._lock:
            cls._enabled = False
            cls._initialized = False
            cls._pools_closed = False
            # Mutate in place so existing references to _config stay valid.
            cls._config.clear()
            cls._config.update({"max_size": 100, "idle_timeout": 600})
@atexit.register
def shutdown_pooling():
    """
    Close any open connection pools when the interpreter exits.

    Registered with atexit. While holding the manager's lock, the pools are
    closed through the bindings only if pooling is enabled and has not been
    closed already; otherwise the function just logs that there is nothing
    to do.
    """
    logger.debug("shutdown_pooling: atexit cleanup triggered")
    manager = PoolingManager
    with manager._lock:
        # Guard clause: nothing to clean up when pooling is off or the pools
        # were already closed (for example by an explicit disable()).
        if not manager._enabled or manager._pools_closed:
            logger.debug("shutdown_pooling: No active pools to close")
            return

        logger.info("shutdown_pooling: Closing connection pools during application exit")
        ddbc_bindings.close_pooling()
        manager._pools_closed = True
        logger.info("shutdown_pooling: Connection pools closed successfully")