Skip to content

Commit 74a5492

Browse files
author
Kiall Mac Innes
committed
Add a TCP transport
1 parent 2cb8fc6 commit 74a5492

4 files changed

Lines changed: 90 additions & 10 deletions

File tree

README.rst

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ usage::
3232
beaver [-h] [-c CONFIG] [-d] [-D] [-f FILES [FILES ...]]
3333
[-F {json,msgpack,raw,rawjson,string}] [-H HOSTNAME] [-m {bind,connect}]
3434
[-l OUTPUT] [-p PATH] [-P PID]
35-
[-t {mqtt,rabbitmq,redis,sqs,stdout,udp,zmq}] [-v] [--fqdn]
35+
[-t {mqtt,rabbitmq,redis,sqs,stdout,tcp,udp,zmq}] [-v] [--fqdn]
3636

3737
optional arguments::
3838

@@ -54,15 +54,15 @@ optional arguments::
5454
file to pipe output to (in addition to stdout)
5555
-p PATH, --path PATH path to log files
5656
-P PID, --pid PID path to pid file
57-
-t {mqtt,rabbitmq,redis,stdout,udp,zmq}, --transport {mqtt,rabbitmq,redis,sqs,stdout,udp,zmq}
57+
-t {mqtt,rabbitmq,redis,stdout,tcp,udp,zmq}, --transport {mqtt,rabbitmq,redis,sqs,stdout,tcp,udp,zmq}
5858
log transport method
5959
-v, --version output version and quit
6060
--fqdn use the machine's FQDN for source_host
6161

6262
Background
6363
==========
6464

65-
Beaver provides an lightweight method for shipping local log files to Logstash. It does this using redis, zeromq, udp, rabbit or stdout as the transport. This means you'll need a redis, zeromq, udp, amqp or stdin input somewhere down the road to get the events.
65+
Beaver provides an lightweight method for shipping local log files to Logstash. It does this using redis, zeromq, tcp, udp, rabbit or stdout as the transport. This means you'll need a redis, zeromq, tcp, udp, amqp or stdin input somewhere down the road to get the events.
6666

6767
Events are sent in logstash's ``json_event`` format. Options can also be set as environment variables.
6868

@@ -94,6 +94,8 @@ Beaver can optionally get data from a ``configfile`` using the ``-c`` flag. This
9494
* sqs_aws_secret_key: Can be left blank to use IAM Roles or AWS_SECRET_ACCESS_KEY environment variable (see: https://github.com/boto/boto#getting-started-with-boto)
9595
* sqs_aws_region: Default ``us-east-1``. AWS Region
9696
* sqs_aws_queue: SQS queue (must exist)
97+
* tcp_host: Default ``127.0.0.1``. TCP Host
98+
* tcp_port: Default ``9999``. TCP Port
9799
* udp_host: Default ``127.0.0.1``. UDP Host
98100
* udp_port: Default ``9999``. UDP Port
99101
* zeromq_address: Default ``tcp://localhost:2120``. Zeromq URL
@@ -263,7 +265,27 @@ Example 9: Read config from config.ini and put to stdout::
263265
# From the commandline
264266
beaver -c /etc/beaver/conf -t stdout
265267

266-
Example 10: UDP transport::
268+
Example 10: TCP transport::
269+
270+
# /etc/beaver/conf
271+
[beaver]
272+
tcp_host: 127.0.0.1
273+
tcp_port: 9999
274+
275+
# logstash indexer config:
276+
input {
277+
tcp {
278+
type => 'shipper-input'
279+
host => '127.0.0.1'
280+
port => '9999'
281+
}
282+
}
283+
output { stdout { debug => true } }
284+
285+
# From the commandline
286+
beaver -c /etc/beaver/conf -t tcp
287+
288+
Example 11: UDP transport::
267289

268290
# /etc/beaver/conf
269291
[beaver]
@@ -283,7 +305,7 @@ Example 10: UDP transport::
283305
# From the commandline
284306
beaver -c /etc/beaver/conf -t udp
285307

286-
Example 11: SQS Transport::
308+
Example 12: SQS Transport::
287309

288310
# /etc/beaver/conf
289311
[beaver]
@@ -307,11 +329,11 @@ Example 11: SQS Transport::
307329
# From the commandline
308330
beaver -c /etc/beaver/conf -t sqs
309331

310-
Example 12: [Raw Json Support](http://blog.pkhamre.com/2012/08/23/logging-to-logstash-json-format-in-nginx/::
332+
Example 13: [Raw Json Support](http://blog.pkhamre.com/2012/08/23/logging-to-logstash-json-format-in-nginx/::
311333

312334
beaver --format rawjson
313335

314-
Example 13: Mqtt transport using Mosquitto::
336+
Example 14: Mqtt transport using Mosquitto::
315337

316338
# /etc/beaver/conf
317339
[beaver]
@@ -335,7 +357,7 @@ Example 13: Mqtt transport using Mosquitto::
335357
# From the commandline
336358
beaver -c /etc/beaver/conf -f /var/log/unmappable.log -t mqtt
337359

338-
Example 14: Sincedb support using and sqlite3 db
360+
Example 15: Sincedb support using and sqlite3 db
339361

340362
Note that this will require R/W permissions on the file at sincedb path, as Beaver will store the current line for a given filename/file id.::
341363

@@ -351,7 +373,7 @@ Note that this will require R/W permissions on the file at sincedb path, as Beav
351373
# From the commandline
352374
beaver -c /etc/beaver/conf
353375

354-
Example 15: Loading stanzas from /etc/beaver/conf.d/* support::
376+
Example 16: Loading stanzas from /etc/beaver/conf.d/* support::
355377

356378
# /etc/beaver/conf
357379
[beaver]

beaver/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ def __init__(self, args, logger=None):
6868
'sqs_aws_secret_key': '',
6969
'sqs_aws_region': 'us-east-1',
7070
'sqs_aws_queue': '',
71+
'tcp_host': '127.0.0.1',
72+
'tcp_port': '9999',
7173
'udp_host': os.environ.get('UDP_HOST', '127.0.0.1'),
7274
'udp_port': os.environ.get('UDP_PORT', '9999'),
7375
'zeromq_address': os.environ.get('ZEROMQ_ADDRESS', 'tcp://localhost:2120'),
@@ -253,6 +255,7 @@ def _main_parser(config):
253255
'rabbitmq_port',
254256
'respawn_delay',
255257
'subprocess_poll_sleep',
258+
'tcp_port',
256259
'udp_port',
257260
'wait_timeout',
258261
'zeromq_hwm',

beaver/transports/tcp_transport.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# -*- coding: utf-8 -*-
2+
import socket
3+
import time
4+
5+
from beaver.transports.base_transport import BaseTransport
6+
7+
8+
class TcpTransport(BaseTransport):
9+
10+
def __init__(self, beaver_config, logger=None):
11+
super(TcpTransport, self).__init__(beaver_config, logger=logger)
12+
13+
self._is_valid = False
14+
self._tcp_host = beaver_config.get('tcp_host')
15+
self._tcp_port = beaver_config.get('tcp_port')
16+
17+
self._connect()
18+
19+
def _connect(self):
20+
wait = -1
21+
while True:
22+
wait += 1
23+
time.sleep(wait)
24+
25+
if wait == 20:
26+
return False
27+
28+
if wait > 0:
29+
self._logger.info("Retrying connection, attempt {0}".format(wait + 1))
30+
31+
try:
32+
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP
33+
self._sock.connect((self._tcp_host, int(self._tcp_port)))
34+
except Exception:
35+
pass
36+
else:
37+
self._logger.info("Connected")
38+
self._is_valid = True
39+
return True
40+
41+
def reconnect(self):
42+
self._connect()
43+
44+
def invalidate(self):
45+
"""Invalidates the current transport"""
46+
super(TcpTransport, self).invalidate()
47+
self._sock.close()
48+
49+
def callback(self, filename, lines, **kwargs):
50+
timestamp = self.get_timestamp(**kwargs)
51+
if kwargs.get('timestamp', False):
52+
del kwargs['timestamp']
53+
54+
for line in lines:
55+
self._sock.send(self.format(filename, line, timestamp, **kwargs) + "\n")

beaver/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def parse_args():
4444
parser.add_argument('-l', '--logfile', '-o', '--output', help='file to pipe output to (in addition to stdout)', default=None, dest='output')
4545
parser.add_argument('-p', '--path', help='path to log files', default=None, dest='path')
4646
parser.add_argument('-P', '--pid', help='path to pid file', default=None, dest='pid')
47-
parser.add_argument('-t', '--transport', help='log transport method', dest='transport', default=None, choices=['mqtt', 'rabbitmq', 'redis', 'sqs', 'stdout', 'udp', 'zmq'])
47+
parser.add_argument('-t', '--transport', help='log transport method', dest='transport', default=None, choices=['mqtt', 'rabbitmq', 'redis', 'sqs', 'stdout', 'tcp', 'udp', 'zmq'])
4848
parser.add_argument('-e', '--experimental', help='use experimental version of beaver', dest='experimental', default=False, action='store_true')
4949
parser.add_argument('-v', '--version', help='output version and quit', dest='version', default=False, action='store_true')
5050
parser.add_argument('--fqdn', help='use the machine\'s FQDN for source_host', dest='fqdn', default=False, action='store_true')

0 commit comments

Comments
 (0)