Skip to content

Commit 01f080d

Browse files
author
Jose Diaz-Gonzalez
committed
Merge pull request #52 from rafaelmagu/master
Added resiliency to RabbitMQ transport
2 parents f5c5804 + 5de877c commit 01f080d

4 files changed

Lines changed: 57 additions & 12 deletions

File tree

beaver/rabbitmq_transport.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import pika
44

55
import beaver.transport
6+
from beaver.transport import TransportException
67

78

89
class RabbitmqTransport(beaver.transport.Transport):
@@ -52,18 +53,30 @@ def __init__(self, configfile):
5253
def callback(self, filename, lines):
5354
timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
5455
for line in lines:
55-
self.channel.basic_publish(
56-
exchange=self.rabbitmq_exchange,
57-
routing_key=self.rabbitmq_key,
58-
body=self.format(filename, timestamp, line),
59-
properties=pika.BasicProperties(
60-
content_type="text/json",
61-
delivery_mode=1
62-
)
63-
)
56+
try:
57+
import warnings
58+
with warnings.catch_warnings():
59+
warnings.simplefilter("error")
60+
self.channel.basic_publish(
61+
exchange=self.rabbitmq_exchange,
62+
routing_key=self.rabbitmq_key,
63+
body=self.format(filename, timestamp, line),
64+
properties=pika.BasicProperties(
65+
content_type="text/json",
66+
delivery_mode=1
67+
)
68+
)
69+
except UserWarning:
70+
raise TransportException("Connection appears to have been lost")
71+
except Exception, e:
72+
try:
73+
raise TransportException(e.strerror)
74+
except AttributeError:
75+
raise TransportException("Unspecified exception encountered") # TRAP ALL THE THINGS!
6476

6577
def interrupt(self):
66-
self.connection.close()
78+
if self.connection:
79+
self.connection.close()
6780

6881
def unhandled(self):
6982
return True

beaver/transport.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,7 @@ def format(self, filename, timestamp, line):
6161
})
6262

6363
return "[{0}] [{1}] {2}".format(self.current_host, timestamp, line)
64+
65+
66+
class TransportException(Exception):
67+
pass

beaver/worker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import stat
66
import sys
77
import time
8+
from transport import TransportException
89

910

1011
class Worker(object):
@@ -225,6 +226,8 @@ def run_worker(configfile, args):
225226
l = Worker(configfile, args, transport.callback)
226227
logger.info("Working...")
227228
l.loop()
229+
except TransportException, e:
230+
raise TransportException(e.message)
228231
except KeyboardInterrupt:
229232
logger.info("Shutting down. Please wait.")
230233
transport.interrupt()

bin/beaver

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@
22

33
import argparse
44
import os
5+
import sys
56

67
import beaver.config
78
import beaver.utils
89
import beaver.worker
910

11+
from beaver.transport import TransportException
12+
from time import sleep
13+
1014
epilog_example = """
1115
Beaver provides an lightweight method for shipping local log
1216
files to Logstash. It does this using either redis, stdin,
@@ -62,7 +66,6 @@ parser.set_defaults(
6266
debug=False
6367
)
6468

65-
6669
args = parser.parse_args()
6770
args.globs = files
6871

@@ -76,4 +79,26 @@ except AttributeError:
7679
args.files = configfile.getfilepaths()
7780
args.globs = configfile.getglobs()
7881

79-
beaver.worker.run_worker(configfile, args)
82+
failure_count = 0
83+
respawn_delay = 3
84+
max_failure = 7
85+
86+
while 1:
87+
try:
88+
worker = beaver.worker.run_worker(configfile, args)
89+
except TransportException, e:
90+
failure_count = failure_count + 1
91+
if failure_count > max_failure:
92+
failure_count = max_failure
93+
sleep_time = respawn_delay ** failure_count
94+
logger.info("Caught transport exception, respawning in %d seconds" % sleep_time)
95+
try:
96+
sleep(sleep_time)
97+
except KeyboardInterrupt:
98+
logger.info("User cancelled respawn.")
99+
sys.exit(0)
100+
except KeyboardInterrupt:
101+
logger.info("Shutting down. Please wait.")
102+
worker.close()
103+
logger.info("Shutdown complete.")
104+
sys.exit(0)

0 commit comments

Comments
 (0)