Skip to content

Commit 59b8ab2

Browse files
author
James William Pye
committed
Add the object signalling mechanism to InterfaceElement for passing Messages.
Also, get element tracebacks working. :)
1 parent 03703af commit 59b8ab2

3 files changed

Lines changed: 173 additions & 58 deletions

File tree

postgresql/api.py

Lines changed: 123 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,30 @@
1515
This module is used to define the PG-API. It creates a set of ABCs
1616
that makes up the basic interfaces used to work with a PostgreSQL.
1717
"""
18-
from abc import ABCMeta, abstractproperty, abstractmethod
18+
import os
19+
import warnings
1920
import collections
20-
from operator import attrgetter, methodcaller
21+
from abc import ABCMeta, abstractproperty, abstractmethod
22+
from operator import attrgetter, methodcaller, itemgetter
23+
24+
class Receptor(collections.Callable):
25+
"""
26+
A receptor is a type of callable used by `InterfaceElement`'s `ife_emit`
27+
method.
28+
"""
29+
30+
@abstractmethod
31+
def __call__(self,
32+
source_ife : "The element whose `ife_emit` method was called.",
33+
receiving_ife : "The element that included the `Receptor`('self').",
34+
obj : "The object that was given to `ife_emit`"
35+
) -> bool:
36+
"""
37+
This is the type signature of receptor capable functions.
38+
39+
If the receptor returns `True`, further propagation will be halted if the
40+
`allow_consumption` parameter given to `ife_emit` is `True`(default).
41+
"""
2142

2243
class InterfaceElement(metaclass = ABCMeta):
2344
"""
@@ -59,6 +80,14 @@ class InterfaceElement(metaclass = ABCMeta):
5980
CURSOR: <cursor_id>
6081
<parameters>
6182
ERROR: <message>
83+
84+
85+
Receptors
86+
---------
87+
88+
Reception is a faculty created to support PostgreSQL message and warning
89+
propagation in a context specific way. For instance, the NOTICE emitted by
90+
PostgreSQL when creating a table with a PRIMARY KEY might be
6291
"""
6392
@abstractproperty
6493
def ife_ancestor(self):
@@ -106,7 +135,7 @@ def ife_ancestry(self) -> "Sequence of IFE ancestors":
106135
if ife in ancestors or ife is self:
107136
raise TypeError("recursive element ancestry detected")
108137
if isinstance(ife, InterfaceElement):
109-
stack.append(ife)
138+
ancestors.append(ife)
110139
else:
111140
break
112141
ife = getattr(ife, 'ife_ancestor', None)
@@ -141,6 +170,62 @@ def ife_descend(self,
141170
for x in args:
142171
x.ife_ancestor = self
143172

173+
def ife_emit(self,
174+
obj : "object to emit",
175+
allow_consumption : "whether or not receptors are allowed to stop further propagation" = True,
176+
) -> (False, (collections.Callable)):
177+
"""
178+
Send an arbitrary object through the ancestry.
179+
180+
This is used in situations where the effects of an element result in an
181+
object that is not returned by the element's interaction(method call,
182+
property get, etc).
183+
184+
To handle these additional results, the object is passed up through the
185+
ancestry. Any ancestor that has receptors will see the object
186+
187+
If `obj` was consumed by a receptor, the receptor that consumed it will be
188+
returned
189+
"""
190+
a = self.ifa_ancestry()
191+
a.insert(0, self)
192+
for ife in a:
193+
for recep in getattr(ife, '_ife_receptors', ()):
194+
# (emit source element, reception element, object)
195+
r = recep(self, ife, obj)
196+
if r is True and allow_consumption:
197+
# receptor indicated halt
198+
return (recep, ife)
199+
return False
200+
201+
def ife_connect(self,
202+
*args : (Receptor,)
203+
) -> None:
204+
"""
205+
Add the `Receptor`s to the element. "Connecting" a receptor allows it to
206+
receive objects "emitted" by descendent elements.
207+
208+
Whenever an object is given to `ife_emit`, the given `Receptor`s will be
209+
called with the `obj`.
210+
"""
211+
if not hasattr(self, '_ife_receptors'):
212+
recept = self._ife_receptors = list(args)
213+
return
214+
# Prepend the list. Newer receptors are given priority.
215+
new = list(recept)
216+
self._ife_receptors = new.extend(self._ife_receptors)
217+
218+
def ife_sever(self,
219+
*args : (Receptor,)
220+
) -> None:
221+
"""
222+
Remove the `Receptor`s from the element.
223+
"""
224+
if hasattr(self, '_ife_receptors'):
225+
for x in args:
226+
if x in self._ife_receptors:
227+
self._ife_receptors.remove(x)
228+
144229
class Message(InterfaceElement):
145230
"A message emitted by PostgreSQL"
146231
ife_label = 'MESSAGE'
@@ -185,17 +270,20 @@ def __str__(self):
185270
]
186271
locstr = (
187272
"" if tuple(loc) == ('?', '?', '?')
188-
else "LOCATION: File {0!r}, line {1!s}, in {2!s}".format(*loc) + os.linesep
273+
else os.linesep + "LOCATION: File {0!r}, line {1!s}, in {2!s}".format(*loc)
189274
)
190275

191276
sev = details.get('severity')
277+
sevmsg = os.linesep
192278
if sev:
193-
sevmsg = "SEVERITY: " + sev.upper() + os.linesep
194-
return self.message + os.linesep + sevmsg + \
195-
os.linesep.join((
196-
': '.join((k.upper(), v)) for k, v in sorted(details.items(), key = itemgetter(0))
197-
if k not in ('message', 'severity', 'file', 'function', 'line')
198-
)) + locstr
279+
sevmsg = os.linesep + "SEVERITY: " + sev.upper()
280+
detailstr = os.linesep.join((
281+
': '.join((k.upper(), v)) for k, v in sorted(details.items(), key = itemgetter(0))
282+
if k not in ('message', 'severity', 'file', 'function', 'line')
283+
))
284+
if detailstr:
285+
detailstr = os.linesep + detailstr
286+
return self.message + sevmsg + detailstr + locstr
199287

200288
class Cursor(
201289
InterfaceElement,
@@ -933,29 +1021,39 @@ class Driver(InterfaceElement):
9331021
ife_label = "DRIVER"
9341022
ife_ancestor = None
9351023

936-
@abstractproperty
937-
def Connector(self):
938-
"""
939-
The `Connector` implementation for the driver.
940-
"""
941-
1024+
@abstractmethod
9421025
def connect(**kw):
9431026
"""
9441027
Create a connection using the given parameters for the Connector.
9451028
946-
This caches the `Connector` instance for re-use when the same parameters
1029+
This should cache the `Connector` instance for re-use when the same parameters
9471030
are given again.
9481031
"""
949-
id = set(kw.items())
950-
cr = self._connectors.get(id)
951-
if cr is None:
952-
cr = self.Connector(**kw)
953-
c = cr()
954-
self._connectors[id] = cr
955-
return c
1032+
1033+
def print_message(self, msg, file = None):
1034+
"""
1035+
Standard message printer.
1036+
"""
1037+
file = sys.stderr if not file else file
1038+
if file and not file.closed:
1039+
file.write(str(msg))
1040+
else:
1041+
warnings.warn("sys.stderr unavailable for printing messages")
1042+
1043+
def handle_warnings_and_messages(self, source, this, obj):
1044+
"""
1045+
Send warnings to `warnings.warn` and print `Message`s to standard error.
1046+
"""
1047+
if isinstance(obj, Message):
1048+
self.print_message(obj)
1049+
elif isinstance(obj, warnings.Warning):
1050+
warnings.warn(obj)
9561051

9571052
def __init__(self):
958-
self._connectors = {}
1053+
"""
1054+
The driver, by default will emit warnings and messages.
1055+
"""
1056+
self.ife_connect(self.handle_warnings_and_messages)
9591057

9601058
class Cluster(InterfaceElement):
9611059
"""

postgresql/driver/pq3.py

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,7 @@ def prime(self):
10311031
pq.element.DescribeStatement(self.statement_id),
10321032
pq.element.SynchronizeMessage,
10331033
))
1034+
self.ife_descend(self._init_xact)
10341035
self.connection._push(self._init_xact)
10351036

10361037
class StoredProcedure(pg_api.StoredProcedure):
@@ -1520,26 +1521,37 @@ def fdel(self):
15201521
version = None
15211522
backend_id = None
15221523

1523-
def _decode_message(self, msg):
1524-
"[internal] decode a Notice/Error message's information"
1524+
def _decode_pq_message(self, msg):
1525+
'[internal] decode the values in a message(E,N)'
1526+
dmsg = {}
15251527
for k, v in msg.items():
15261528
try:
1527-
dv = self.typio.decode(v)
1529+
# code should always be ascii..
1530+
if k == "code":
1531+
v = v.decode('ascii')
1532+
else:
1533+
v = self.typio._decode(v)[0]
15281534
except UnicodeDecodeError:
1529-
dv = repr(v)
1530-
yield k, v
1535+
# Fallback to the bytes representation.
1536+
# This should be sufficiently informative in most cases,
1537+
# and in the cases where it isn't, an element traceback should
1538+
# yield the pertinent information
1539+
v = repr(v)[2:-1]
1540+
dmsg[k] = v
1541+
return dmsg
15311542

15321543
def _N(self, msg, xact):
15331544
'[internal] print a notice message using the notifier attribute'
1534-
decoded = dict(self._decode_message(msg))
1545+
dmsg = self._decode_pq_message(msg)
1546+
m = dmsg.pop('message')
1547+
c = dmsg.pop('code')
15351548
if decoded['severity'].upper() == 'WARNING':
1536-
w = pg_exc.WarningLookup(decoded['code'])
1537-
w = w(msg['message'], code = msg['code'], details = decoded)
1538-
w.connection = self
1539-
w.creator = getattr(xact, 'creator', None)
1540-
warnings.warn(w)
1549+
mo = pg_exc.WarningLookup(c)(m, code = c, details = dmsg)
15411550
else:
1542-
self.notifier(decoded)
1551+
mo = pg_api.Message(m, code = c, details = dmsg)
1552+
mo.connection = self
1553+
xact.ife_descend(mo)
1554+
xact.ife_emit(mo)
15431555

15441556
def _A(self, msg, xact):
15451557
'[internal] Send notification to any listeners; NOTIFY messages'
@@ -1624,29 +1636,37 @@ def interrupt(self):
16241636
finally:
16251637
s.close()
16261638

1627-
def execute(self, query):
1639+
def execute(self, query : str) -> None:
16281640
'Execute an arbitrary block of SQL'
16291641
q = pq.Transaction((
16301642
pq.element.Query(self.typio.encode(query)),
16311643
))
1644+
self.ife_descend(q)
16321645
self._push(q)
16331646
self._complete()
16341647

16351648
def query(*args, **kw):
16361649
'Create a query object using the given query string'
1637-
return ClientPreparedStatement(*args, **kw)
1650+
q = ClientPreparedStatement(*args, **kw)
1651+
return q
16381652

16391653
def statement(*args, **kw):
16401654
'Create a query object using the statement identifier'
1641-
return PreparedStatement(*args, **kw)
1655+
ps = PreparedStatement(*args, **kw)
1656+
args[0].ife_descend(ps)
1657+
return ps
16421658

16431659
def proc(*args, **kw):
16441660
'Create a StoredProcedure object using the given procedure identity'
1645-
return StoredProcedure(*args, **kw)
1661+
sp = StoredProcedure(*args, **kw)
1662+
args[0].ife_descend(sp)
1663+
return sp
16461664

16471665
def cursor(*args, **kw):
16481666
'Create a Portal object that references an already existing cursor'
1649-
return Cursor(*args, **kw)
1667+
c = Cursor(*args, **kw)
1668+
args[0].ife_descend(c)
1669+
return c
16501670

16511671
def _xp_query(self, string, args = (), rformats = ()):
16521672
'[internal] create an extended protocol transaction for the query'
@@ -1870,7 +1890,6 @@ def _standard_write_messages(self, messages):
18701890
def _traced_write_messages(self, messages):
18711891
'[internal] _message_writer used when tracing'
18721892
for msg in messages:
1873-
print(msg)
18741893
t = getattr(msg, 'type', None)
18751894
if t is not None:
18761895
data_out = msg.bytes()
@@ -1915,7 +1934,7 @@ def _backend_gc(self):
19151934
del self._closeportals[:portals], self._closestatements[:statements]
19161935
self._complete()
19171936

1918-
def _push(self, xact):
1937+
def _push(self, xact : pq.ProtocolState):
19191938
'[internal] setup the given transaction to be processed'
19201939
# Push any queued closures onto the transaction or a new transaction.
19211940
if xact.state is pq.Complete:
@@ -1928,19 +1947,17 @@ def _push(self, xact):
19281947
self._xact = xact
19291948
self._step()
19301949

1931-
def _postgres_error(self, em):
1950+
def _postgres_error(self, em : pq.element.Error) -> pg_exc.Error:
19321951
'[internal] lookup a PostgreSQL error and instantiate it'
1933-
c = em["code"].decode('ascii')
1952+
m = self._decode_pq_message(em)
1953+
c = m.pop('code')
1954+
ms = m.pop('message')
19341955
err = pg_exc.ErrorLookup(c)
1935-
err = err(
1936-
em['message'],
1937-
code = c,
1938-
details = em
1939-
)
1940-
err.error_message = em
1956+
1957+
err = err(ms, code = c, details = m)
19411958
err.connection = self
19421959
return err
1943-
1960+
19441961
def _procasyncs(self):
19451962
'[internal] process the async messages in self._asyncs'
19461963
if self._n_procasyncs:
@@ -2013,6 +2030,7 @@ def _pop(self):
20132030
em = getattr(self._xact, 'error_message', None)
20142031
if em is not None:
20152032
xact_error = self._postgres_error(em)
2033+
self._xact.ife_descend(xact_error)
20162034
if self._xact.fatal is not True:
20172035
# only remove the transaction if it's *not* fatal
20182036
xact_error.fatal = False
@@ -2274,6 +2292,9 @@ class Driver(pg_api.Driver):
22742292
Connector = Connector
22752293
ife_ancestor = None
22762294

2295+
def connect(self):
2296+
pass
2297+
22772298
def __str__(self):
22782299
return 'postgresql.driver.pq3'
22792300

postgresql/exceptions.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@
3030
"""
3131
import sys
3232
import os
33-
from operator import itemgetter
3433
from functools import partial
35-
from string import Formatter
3634
from . import api as pg_api
3735

3836
severities = (
@@ -45,8 +43,6 @@
4543
'PANIC',
4644
)
4745

48-
message_ife_lineage_filter = set([])
49-
5046
class Exception(Exception):
5147
'Base PostgreSQL exception class'
5248
pass
@@ -81,7 +77,7 @@ def __str__(self):
8177
': '.join((x.ife_label, str(x))) for x in self.ife_ancestry()
8278
]
8379
l.reverse()
84-
return super().__str__(self) + os.linesep.join(l)
80+
return super().__str__() + (os.linesep + os.linesep.join(l) if l else "")
8581

8682
class Warning(PythonMessage, Warning):
8783
code = '01000'

0 commit comments

Comments
 (0)