Skip to content

Commit fddb5c2

Browse files
author
James William Pye
committed
Be sure to close sockets and connections.
1 parent 845c443 commit fddb5c2

1 file changed

Lines changed: 50 additions & 35 deletions

File tree

postgresql/temporal.py

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"""
77
import os
88
import atexit
9+
from collections import deque
910
from .cluster import Cluster, ClusterError
1011
from . import installation
1112
from .python.socket import find_available_port
@@ -43,26 +44,28 @@ class Temporal(object):
4344
}
4445

4546
def __init__(self):
46-
self.builtins_stack = []
47+
self.builtins_stack = deque()
4748
self.sandbox_id = 0
49+
# identifier for keeping temporary instances unique.
4850
self.__class__._local_id_ = self.local_id = (self.__class__._local_id_ + 1)
4951

5052
def __call__(self, callable):
51-
def incontext(*args, **kw):
53+
def in_pg_temporal_context(*args, **kw):
5254
with self:
5355
return callable(*args, **kw)
5456
n = getattr(callable, '__name__', None)
5557
if n:
56-
incontext.__name__ = n
57-
return incontext
58+
in_pg_temporal_context.__name__ = n
59+
return in_pg_temporal_context
5860

5961
def destroy(self):
6062
# Don't destroy if it's not the initializing process.
6163
if os.getpid() == self._init_pid_:
6264
# Kill all the open connections.
6365
try:
64-
with self:
65-
db.sys.terminate_backends()
66+
c = cluster.connection(user = 'test', database = 'template1',)
67+
with c:
68+
c.sys.terminate_backends()
6669
except Exception:
6770
# Doesn't matter much if it fails.
6871
pass
@@ -71,7 +74,7 @@ def destroy(self):
7174
self._init_pid_ = None
7275
if cluster is not None:
7376
cluster.stop()
74-
cluster.wait_until_stopped(timeout = 10)
77+
cluster.wait_until_stopped(timeout = 5)
7578
cluster.drop()
7679

7780
def init(self,
@@ -101,10 +104,7 @@ def init(self,
101104
'could not find the default pg_config', details = inshint
102105
)
103106

104-
cluster = Cluster(
105-
installation,
106-
self.cluster_path,
107-
)
107+
cluster = Cluster(installation, self.cluster_path,)
108108

109109
# If it exists already, destroy it.
110110
if cluster.initialized():
@@ -138,23 +138,23 @@ def init(self,
138138
))
139139

140140
# Start it up.
141-
cluster.start(logfile = open(self.logfile, 'w'))
141+
with open(self.logfile, 'w') as lfo:
142+
cluster.start(logfile = lfo)
142143
cluster.wait_until_started()
143144

144145
# Initialize template1 and the test user database.
145-
c = cluster.connection(
146-
user = 'test', database = 'template1',
147-
)
146+
c = cluster.connection(user = 'test', database = 'template1',)
148147
with c:
149148
c.execute('create database test')
150149
# It's ready.
151150
self.cluster = cluster
152151

153152
def push(self):
154153
c = self.cluster.connection(user = 'test')
154+
c.connect()
155155
extras = []
156156

157-
def newdb(l = extras, c = c, sbid = 'sandbox' + str(self.sandbox_id + 1)):
157+
def new_pg_tmp_connection(l = extras, c = c, sbid = 'sandbox' + str(self.sandbox_id + 1)):
158158
# Used to create a new connection that will be closed
159159
# when the context stack is popped along with 'db'.
160160
l.append(c.clone())
@@ -171,7 +171,7 @@ def newdb(l = extras, c = c, sbid = 'sandbox' + str(self.sandbox_id + 1)):
171171
'settings' : c.settings,
172172
'proc' : c.proc,
173173
'connector' : c.connector,
174-
'new' : newdb,
174+
'new' : new_pg_tmp_connection,
175175
}
176176
if not self.builtins_stack:
177177
# Store any of those set or not set.
@@ -186,32 +186,45 @@ def newdb(l = extras, c = c, sbid = 'sandbox' + str(self.sandbox_id + 1)):
186186
__builtins__.update(builtins)
187187
self.sandbox_id += 1
188188

189-
def pop(self,
190-
interrupt = False,
191-
drop_schema = 'DROP SCHEMA sandbox{0} CASCADE'.format
192-
):
193-
builtins, extras = self.builtins_stack.pop(-1)
189+
def pop(self, exc, drop_schema = 'DROP SCHEMA sandbox{0} CASCADE'.format):
190+
builtins, extras = self.builtins_stack.pop()
194191
self.sandbox_id -= 1
195-
# restore
192+
193+
# restore __builtins__
196194
if len(self.builtins_stack) > 1:
197195
__builtins__.update(self.builtins_stack[-1][0])
198196
else:
199-
previous = self.builtins_stack.pop(0)
197+
previous = self.builtins_stack.popleft()
200198
for x in self.builtins_keys:
201199
if x in previous:
202200
__builtins__[x] = previous[x]
203201
else:
204202
# Wasn't set before.
205203
__builtins__.pop(x, None)
206-
if not interrupt:
204+
205+
# close popped connection, but only if we're not in an interrupt.
206+
# However, temporal will always terminate all backends atexit.
207+
if exc is None or isinstance(exc, Exception):
207208
# Interrupt then close. Just in case something is lingering.
208-
for x in [builtins['db']] + list(extras):
209-
if x.closed is False:
210-
x.interrupt()
211-
x.close()
212-
# Interrupted and closed all the other connections.
213-
with builtins['new']() as dropdb:
214-
dropdb.execute(drop_schema(self.sandbox_id+1))
209+
for xdb in [builtins['db']] + list(extras):
210+
if xdb.closed is False:
211+
# In order for a clean close of the connection,
212+
# interrupt before closing. It is still
213+
# possible for the close to block, but less likely.
214+
xdb.interrupt()
215+
xdb.close()
216+
217+
# Interrupted and closed all the other connections at this level;
218+
# now remove the sandbox schema.
219+
c = self.cluster.connection(user = 'test')
220+
with c:
221+
# Use a new connection so that the state of
222+
# the context connection will not have to be
223+
# contended with.
224+
c.execute(drop_schema(self.sandbox_id+1))
225+
else:
226+
# interrupt
227+
pass
215228

216229
def __enter__(self):
217230
if self.cluster is None:
@@ -221,12 +234,14 @@ def __enter__(self):
221234
db.connect()
222235
db.execute('CREATE SCHEMA sandbox' + str(self.sandbox_id))
223236
db.settings['search_path'] = 'sandbox' + str(self.sandbox_id) + ',' + db.settings['search_path']
224-
except:
225-
self.pop()
237+
except Exception as e:
238+
# failed to initialize sandbox schema; pop it.
239+
self.pop(e)
226240
raise
227241

228242
def __exit__(self, exc, val, tb):
229-
self.pop(exc and not issubclass(exc, Exception))
243+
if self.cluster is not None:
244+
self.pop(val)
230245

231246
#: The process' temporary cluster.
232247
pg_tmp = Temporal()

0 commit comments

Comments
 (0)