|
3 | 3 | import pika |
4 | 4 |
|
5 | 5 | import beaver.transport |
| 6 | +from beaver.transport import TransportException |
6 | 7 |
|
7 | 8 |
|
8 | 9 | class RabbitmqTransport(beaver.transport.Transport): |
@@ -52,18 +53,30 @@ def __init__(self, configfile): |
52 | 53 | def callback(self, filename, lines): |
53 | 54 | timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ") |
54 | 55 | for line in lines: |
55 | | - self.channel.basic_publish( |
56 | | - exchange=self.rabbitmq_exchange, |
57 | | - routing_key=self.rabbitmq_key, |
58 | | - body=self.format(filename, timestamp, line), |
59 | | - properties=pika.BasicProperties( |
60 | | - content_type="text/json", |
61 | | - delivery_mode=1 |
62 | | - ) |
63 | | - ) |
| 56 | + try: |
| 57 | + import warnings |
| 58 | + with warnings.catch_warnings(): |
| 59 | + warnings.simplefilter("error") |
| 60 | + self.channel.basic_publish( |
| 61 | + exchange=self.rabbitmq_exchange, |
| 62 | + routing_key=self.rabbitmq_key, |
| 63 | + body=self.format(filename, timestamp, line), |
| 64 | + properties=pika.BasicProperties( |
| 65 | + content_type="text/json", |
| 66 | + delivery_mode=1 |
| 67 | + ) |
| 68 | + ) |
| 69 | + except UserWarning: |
| 70 | + raise TransportException("Connection appears to have been lost") |
| 71 | + except Exception, e: |
| 72 | + try: |
| 73 | + raise TransportException(e.strerror) |
| 74 | + except AttributeError: |
| 75 | + raise TransportException("Unspecified exception encountered") # TRAP ALL THE THINGS! |
64 | 76 |
|
65 | 77 | def interrupt(self): |
66 | | - self.connection.close() |
| 78 | + if self.connection: |
| 79 | + self.connection.close() |
67 | 80 |
|
68 | 81 | def unhandled(self): |
69 | 82 | return True |
0 commit comments