66"""
77import os
88import atexit
9+ from collections import deque
910from .cluster import Cluster , ClusterError
1011from . import installation
1112from .python .socket import find_available_port
@@ -43,26 +44,28 @@ class Temporal(object):
4344 }
4445
4546 def __init__ (self ):
46- self .builtins_stack = []
47+ self .builtins_stack = deque ()
4748 self .sandbox_id = 0
49+ # identifier for keeping temporary instances unique.
4850 self .__class__ ._local_id_ = self .local_id = (self .__class__ ._local_id_ + 1 )
4951
5052 def __call__ (self , callable ):
51- def incontext (* args , ** kw ):
53+ def in_pg_temporal_context (* args , ** kw ):
5254 with self :
5355 return callable (* args , ** kw )
5456 n = getattr (callable , '__name__' , None )
5557 if n :
56- incontext .__name__ = n
57- return incontext
58+ in_pg_temporal_context .__name__ = n
59+ return in_pg_temporal_context
5860
5961 def destroy (self ):
6062 # Don't destroy if it's not the initializing process.
6163 if os .getpid () == self ._init_pid_ :
6264 # Kill all the open connections.
6365 try :
64- with self :
65- db .sys .terminate_backends ()
66+ c = cluster .connection (user = 'test' , database = 'template1' ,)
67+ with c :
68+ c .sys .terminate_backends ()
6669 except Exception :
6770 # Doesn't matter much if it fails.
6871 pass
@@ -71,7 +74,7 @@ def destroy(self):
7174 self ._init_pid_ = None
7275 if cluster is not None :
7376 cluster .stop ()
74- cluster .wait_until_stopped (timeout = 10 )
77+ cluster .wait_until_stopped (timeout = 5 )
7578 cluster .drop ()
7679
7780 def init (self ,
@@ -101,10 +104,7 @@ def init(self,
101104 'could not find the default pg_config' , details = inshint
102105 )
103106
104- cluster = Cluster (
105- installation ,
106- self .cluster_path ,
107- )
107+ cluster = Cluster (installation , self .cluster_path ,)
108108
109109 # If it exists already, destroy it.
110110 if cluster .initialized ():
@@ -138,23 +138,23 @@ def init(self,
138138 ))
139139
140140 # Start it up.
141- cluster .start (logfile = open (self .logfile , 'w' ))
141+ with open (self .logfile , 'w' ) as lfo :
142+ cluster .start (logfile = lfo )
142143 cluster .wait_until_started ()
143144
144145 # Initialize template1 and the test user database.
145- c = cluster .connection (
146- user = 'test' , database = 'template1' ,
147- )
146+ c = cluster .connection (user = 'test' , database = 'template1' ,)
148147 with c :
149148 c .execute ('create database test' )
150149 # It's ready.
151150 self .cluster = cluster
152151
153152 def push (self ):
154153 c = self .cluster .connection (user = 'test' )
154+ c .connect ()
155155 extras = []
156156
157- def newdb (l = extras , c = c , sbid = 'sandbox' + str (self .sandbox_id + 1 )):
157+ def new_pg_tmp_connection (l = extras , c = c , sbid = 'sandbox' + str (self .sandbox_id + 1 )):
158158 # Used to create a new connection that will be closed
159159 # when the context stack is popped along with 'db'.
160160 l .append (c .clone ())
@@ -171,7 +171,7 @@ def newdb(l = extras, c = c, sbid = 'sandbox' + str(self.sandbox_id + 1)):
171171 'settings' : c .settings ,
172172 'proc' : c .proc ,
173173 'connector' : c .connector ,
174- 'new' : newdb ,
174+ 'new' : new_pg_tmp_connection ,
175175 }
176176 if not self .builtins_stack :
177177 # Store any of those set or not set.
@@ -186,32 +186,45 @@ def newdb(l = extras, c = c, sbid = 'sandbox' + str(self.sandbox_id + 1)):
186186 __builtins__ .update (builtins )
187187 self .sandbox_id += 1
188188
189- def pop (self ,
190- interrupt = False ,
191- drop_schema = 'DROP SCHEMA sandbox{0} CASCADE' .format
192- ):
193- builtins , extras = self .builtins_stack .pop (- 1 )
189+ def pop (self , exc , drop_schema = 'DROP SCHEMA sandbox{0} CASCADE' .format ):
190+ builtins , extras = self .builtins_stack .pop ()
194191 self .sandbox_id -= 1
195- # restore
192+
193+ # restore __builtins__
196194 if len (self .builtins_stack ) > 1 :
197195 __builtins__ .update (self .builtins_stack [- 1 ][0 ])
198196 else :
199- previous = self .builtins_stack .pop ( 0 )
197+ previous = self .builtins_stack .popleft ( )
200198 for x in self .builtins_keys :
201199 if x in previous :
202200 __builtins__ [x ] = previous [x ]
203201 else :
204202 # Wasn't set before.
205203 __builtins__ .pop (x , None )
206- if not interrupt :
204+
205+ # close popped connection, but only if we're not in an interrupt.
206+ # However, temporal will always terminate all backends atexit.
207+ if exc is None or isinstance (exc , Exception ):
207208 # Interrupt then close. Just in case something is lingering.
208- for x in [builtins ['db' ]] + list (extras ):
209- if x .closed is False :
210- x .interrupt ()
211- x .close ()
212- # Interrupted and closed all the other connections.
213- with builtins ['new' ]() as dropdb :
214- dropdb .execute (drop_schema (self .sandbox_id + 1 ))
209+ for xdb in [builtins ['db' ]] + list (extras ):
210+ if xdb .closed is False :
211+ # In order for a clean close of the connection,
212+ # interrupt before closing. It is still
213+ # possible for the close to block, but less likely.
214+ xdb .interrupt ()
215+ xdb .close ()
216+
217+ # Interrupted and closed all the other connections at this level;
218+ # now remove the sandbox schema.
219+ c = self .cluster .connection (user = 'test' )
220+ with c :
221+ # Use a new connection so that the state of
222+ # the context connection will not have to be
223+ # contended with.
224+ c .execute (drop_schema (self .sandbox_id + 1 ))
225+ else :
226+ # interrupt
227+ pass
215228
216229 def __enter__ (self ):
217230 if self .cluster is None :
@@ -221,12 +234,14 @@ def __enter__(self):
221234 db .connect ()
222235 db .execute ('CREATE SCHEMA sandbox' + str (self .sandbox_id ))
223236 db .settings ['search_path' ] = 'sandbox' + str (self .sandbox_id ) + ',' + db .settings ['search_path' ]
224- except :
225- self .pop ()
237+ except Exception as e :
238+ # failed to initialize sandbox schema; pop it.
239+ self .pop (e )
226240 raise
227241
228242 def __exit__ (self , exc , val , tb ):
229- self .pop (exc and not issubclass (exc , Exception ))
243+ if self .cluster is not None :
244+ self .pop (val )
230245
231246#: The process' temporary cluster.
232247pg_tmp = Temporal ()
0 commit comments