diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 00000000..812fc3b1 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,4 @@ +[report] +omit = + */python?.?/* + */site-packages/nose/* diff --git a/.gitchangelog.rc b/.gitchangelog.rc deleted file mode 100755 index 24c11cfb..00000000 --- a/.gitchangelog.rc +++ /dev/null @@ -1,118 +0,0 @@ -## -## Format -## -## ACTION: [AUDIENCE:] COMMIT_MSG [@TAG ...] -## -## Description -## -## ACTION is one of 'chg', 'fix', 'new' -## -## Is WHAT the change is about. -## -## 'chg' is for refactor, small improvement, cosmetic changes... -## 'fix' is for bug fixes -## 'new' is for new features, big improvement -## -## SUBJECT is optional and one of 'dev', 'usr', 'pkg', 'test', 'doc' -## -## Is WHO is concerned by the change. -## -## 'dev' is for developpers (API changes, refactors...) -## 'usr' is for final users (UI changes) -## 'pkg' is for packagers (packaging changes) -## 'test' is for testers (test only related changes) -## 'doc' is for doc guys (doc only changes) -## -## COMMIT_MSG is ... well ... the commit message itself. -## -## TAGs are additionnal adjective as 'refactor' 'minor' 'cosmetic' -## -## 'refactor' is obviously for refactoring code only -## 'minor' is for a very meaningless change (a typo, adding a comment) -## 'cosmetic' is for cosmetic driven change (re-indentation, 80-col...) -## -## Example: -## -## new: usr: support of bazaar implemented -## chg: re-indentend some lines @cosmetic -## new: dev: updated code to be compatible with last version of killer lib. -## fix: pkg: updated year of licence coverage. -## new: test: added a bunch of test around user usability of feature X. -## fix: typo in spelling my name in comment. @minor -## -## Please note that multi-line commit message are supported, and only the -## first line will be considered as the "summary" of the commit message. So -## tags, and other rules only applies to the summary. 
The body of the commit -## message will be displayed in the changelog with minor reformating. - -## -## ``ignore_regexps`` is a line of regexps -## -## Any commit having its full commit message matching any regexp listed here -## will be ignored and won't be reported in the changelog. -## -ignore_regexps = [ - r'(?i)^(Merge branch|Release|Update)', - ] - - -## -## ``replace_regexps`` is a dict associating a regexp pattern and its replacement -## -## It will be applied to get the summary line from the full commit message. -## -## Note that you can provide multiple replacement patterns, they will be all -## tried. If None matches, the summary line will be the full commit message. -## -replace_regexps = { - ## current format (ie: 'chg: dev: my commit msg @tag1 @tag2') - - r'^([cC]hg|[fF]ix|[nN]ew)\s*:\s*((dev|use?r|pkg|test|doc)\s*:\s*)?([^\n@]*)(@[a-z]+\s+)*$': - r'\4', -} - - -## ``section_regexps`` is a list of 2-tuples associating a string label and a -## list of regexp -## -## Commit messages will be classified in sections thanks to this. Section -## titles are the label, and a commit is classified under this section if any -## of the regexps associated is matching. -## -section_regexps = [ - ('New', [ - r'^[nN]ew\s*:\s*((dev|use?r|pkg|test|doc)\s*:\s*)?([^\n]*)$', - ]), - ('Changes', [ - r'^[cC]hg\s*:\s*((dev|use?r|pkg|test|doc)\s*:\s*)?([^\n]*)$', - ]), - ('Fix', [ - r'^[fF]ix\s*:\s*((dev|use?r|pkg|test|doc)\s*:\s*)?([^\n]*)$', - ]), - - ('Other', None ## Match all lines - ), - -] - -## ``body_split_regexp`` is a regexp -## -## Commit message body (not the summary) if existing will be split -## (new line) on this regexp -## -body_split_regexp = r'[\n-]' - - -## ``tag_filter_regexp`` is a regexp -## -## Tags that will be used for the changelog must match this regexp. 
-## -## tag_filter_regexp = r'^[0-9]+$' -tag_filter_regexp = r'^[0-9\.]+$' - - -## ``unreleased_version_label`` is a string -## -## This label will be used as the changelog Title of the last set of changes -## between last valid tag and HEAD if any. -unreleased_version_label = "%%version%% (unreleased)" diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 00000000..7d9a31c7 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,24 @@ +version: 2 +updates: +- package-ecosystem: pip + directory: "/" + schedule: + interval: daily + open-pull-requests-limit: 10 + ignore: + - dependency-name: moto + versions: + - 1.3.16 + - 2.0.0 + - 2.0.1 + - 2.0.2 + - 2.0.3 + - 2.0.4 + - dependency-name: pyzmq + versions: + - 21.0.2 + - 22.0.0 + - 22.0.2 + - dependency-name: python-daemon + versions: + - 2.2.4 diff --git a/.travis.yml b/.travis.yml index ab9a0ec6..045120e6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,17 @@ +dist: trusty language: python +branches: + except: + - gh-pages python: - "2.7" -before_install: - - sudo apt-get remove -qq libzmq3 || true - - sudo apt-get install -qq build-essential libzmq-dev -install: "pip install -r requirements/zeromq.txt -r requirements/tests.txt --use-mirrors" -script: nosetests +addons: + apt: + packages: + - build-essential + - libzmq-dev +install: ./install-dependencies.sh +script: + nosetests --with-coverage --cover-package=beaver +after_success: + coveralls diff --git a/CHANGES.rst b/CHANGES.rst index 12cd9d26..8ff4c31e 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,71 +1,215 @@ Changelog ========= -34.1.0 (2015-08-10) +36.3.1 (2019-04-03) +------------------- +------------------------ +- Require pika version below 1.0.0. [Michael Contento] + + +36.3.0 (2018-10-14) +------------------- + +Fix +~~~ +- Correct release process. [Jose Diaz-Gonzalez] + +Other +~~~~~ +- Fix typo in RABBITMQ_ARGUMENT system property. 
[eroberts] +- Add arbitrary RabbitMQ arguments - Updated transport to parse + arguments -Updated config.py to add system property - Updated user + usage documentation - Confirmed no change in functionality if args are + not passed. [eroberts] +- Usage fix. [Scott Brenner] + + Fixed missing/incorrect TCP option. +- Remove truncating of logline after 32766 chars (#422) [Robert + Wunderer] + + * Remove truncating of logline after 32766 chars + + It is not necessary to limit unicode()'s input to 32766 chars, while + doing puts an arbitrary limit on the length of lines beaver can + handle. + + * Pin moto version to fix tests + + * multiline_regex_before won't work when logfile ends with empty line + + * Revert "multiline_regex_before won't work when logfile ends with empty line" + + This reverts commit 5196789916afa184beb71a0ef86344d7580d9136. + + * Fix README link + + * Remove truncating of logline after 32766 chars + + It is not necessary to limit unicode()'s input to 32766 chars, while + doing puts an arbitrary limit on the length of lines beaver can + handle. +- Fix README link. [Jamie Cressey] +- Revert "multiline_regex_before won't work when logfile ends with empty + line" [Jamie Cressey] + + This reverts commit 5196789916afa184beb71a0ef86344d7580d9136. +- Multiline_regex_before won't work when logfile ends with empty line. + [jeroenmaelbrancke] +- Pin moto version to fix tests. [Jamie Cressey] +- Fixing primary key reuse issues. [Javier Villar] +- Limiting number of records in batch to 500 as this is the kinesis + limit. [Greg Sterin] +- Updating docs for number_of_consumer_processes config. [Greg Sterin] +- Avoid unreference variable 'st' error if file was removed. Raise + exception if unrecognized environment error from os.stat. [Greg + Sterin] +- Fix GitHub links. [Robin Baumgartner] + + The project was renamed to 'python-beaver', but the links do not reflect that change. 
+ + +36.2.0 (2016-04-12) +------------------- +- Replaced Mosquitto with Paho-mqtt for mqtt transport. [Justin van + Heerde] +- Add log file rotate in limit size. [soarpenguin] +- Fix README.rst docs link. [Andrew Grigorev] + + +36.0.1 (2016-01-13) +------------------- +- Fix README.rst formatting for rst-lint. [Jose Diaz-Gonzalez] +- Remove tabs and use "except .. as .." [EYJ] +- Try to fix hanging test by joining leftover thread. [EYJ] +- Fix Queue timeout occurring after successful reconnect. [EYJ] +- Fix rabbitmq reconnection behaviour to use the beaver reconnect + mechanism. [EYJ] +- Migrating repo locations. [Jamie Cressey] +- Fixups for CONTRIBUTING. [Jamie Cressey] +- Fixing formatting. [Jamie Cressey] +- Changes to guidelines and adding reference to README. [Jamie Cressey] +- Adding contributing guidelines. [Jamie Cressey] + + +36.0.0 (2015-12-15) +------------------- +- Adding max_queue_size to docs. [Jamie Cressey] +- Pinning kafka-python version. [Jamie Cressey] +- Ensure we test against the latest version of kafka-python. [Jamie + Cressey] +- Attempt to reconnect to Kafka on failure. [Jamie Cressey] +- Adding SQS tests. [Jamie Cressey] +- Exclude gh-pages from TravisCI runs. [Jamie Cressey] +- Adding coverage results to README. [Jamie Cressey] +- Adding coverage tests. [Jamie Cressey] +- We say Py2.6+ is a requirement, but do the tests actually pass on 2.6? + [Jamie Cressey] +- Dont test py3, yet... [Jamie Cressey] +- Testing python 3.x. [Jamie Cressey] +- Using new travis config. [Jamie Cressey] +- Added requests as a dependency. [Jose Diaz-Gonzalez] + + Closes #304 +- Bump debian version on release. [David Moravek] +- Support both older and newer pika. [Tim Stoop] +- Make reconnecting to a lost RabbitMQ work. [Tim Stoop] +- Remove old worker code in favor of the - now non-experimental - + TailManager. [Jose Diaz-Gonzalez] + + +35.0.2 (2015-12-03) ------------------- +- Write to the SQS object not the dict when using sqs_bulk_lines flag. 
+ [Jamie Cressey] + + +35.0.1 (2015-11-26) +------------------- +- Remove autospec attribute. [Jose Diaz-Gonzalez] + + For some reason, this broke attribute setting on the mock SelectConnection. +- Fix pika version to version with all named parameters. [Jose Diaz- + Gonzalez] +- Peg kafka to a known-good version. [Jose Diaz-Gonzalez] + +35.0.0 (2015-11-26) +------------------- +- Remove gitchangelog.rc. [Jose Diaz-Gonzalez] +- Merging changes. [Jamie Cressey] +- Added configuration option ignore_old_files. [Ryan Steele] + + Files older then n days are ignored +- Support writes into multiple redis namespaces. [Andrei Vaduva] +- Adding support for multiple SQS queues. [Jamie Cressey] +- Ensure log lines confirm to utf-8 standard. [Jamie Cressey] + + We've come across cases when certain characters break Beaver transmitting log lines. This PR ensures all log lines correctly conform to UTF-8 when they're formatted for transmission. +- Set timeout to 1 second. [Tim Stoop] + + Apparantly, it needs to be an integer, so we cannot use pika's default + of .25. +- Revert "Lower the default to .25, which is pika's default." [Tim + Stoop] + + This reverts commit 17157990a272e458cc9253666f01c6002b84bda8. +- Lower the default to .25, which is pika's default. [Tim Stoop] + + As suggested by @kitchen. +- Pieter's patch for rabbitmq timeout. [Tim Stoop] +- Typo in config variable default value. [Jamie Cressey] +- Fix regressed change. [Jamie Cressey] +- Ability to send multiple log entries per single SQS message. [Jamie + Cressey] +- Adding AWS profile authentication to SQS transport. [Jamie Cressey] + + +34.1.0 (2015-08-10) +------------------- - Adding AWS SNS as a transport option. [Jamie Cressey] + 34.0.1 (2015-08-07) ------------------- - - Revert some breakages caused by d159ec579c01b8fab532b3814c64b0ff8b2063ff. [Jose Diaz-Gonzalez] Closes #331 - - Set default for command. [Jose Diaz-Gonzalez] - - #323 - fix tests to run with pika SelectConnection. 
[Tom Kregenbild] - - #323 - fix RabbitMQ transport _on_open_connection_error function to print connection errors. [Tom Kregenbild] - - #323 1. Add clear debug prints with queue size (one print every 1000 items in order not to hurt performance) 2. If main queue is empty keep running and do nothing 3. In case of a timeout from main queue restart queue. [Tom Kregenbild] - - #323 - Change RabbitMQ pika to use Asynchronous SelectConnection instead of BlockingConnection for better performance. [Tom Kregenbild] - - #323 - add the ability to increase the number of Queue consumers by creating additional processes while running with --experimental flag. [Tom Kregenbild] - - #323 - add the ability to increase the number of Queue consumers by creating additional processes. [Tom Kregenbild] - - #323 - print current queue size and number of total number transports in debug mode in order to find problem in transport rate. [Tom Kregenbild] + 34.0.0 (2015-07-24) ------------------- - - Added ssl-tcp key file support. [babbleshack] - - Rename configuration dir and debian bin to python-beaver. [David Moravek] - - Rename debian package back to python-beaver. [David Moravek] - - Debian packaging code review; thx @mnicky. [David Moravek] - - Improves debian packaging. [David Moravek] - - Fix tests when ZMQ is not installed. [David Moravek] - - Fix tests for python 2.7 (add funcsigs test dependency) [David Moravek] - - Move badge to below header. [Jose Diaz-Gonzalez] - - Add constants for data types, validate in init, use callback map. [Hector Castro] - - Move data type method conditional outside of loop. [Hector Castro] - - Add channel support to Redis transport. [Hector Castro] This changeset adds support for publishing log entries to a Redis @@ -76,122 +220,90 @@ Changelog is `list`. Attempts to resolve #266. - - - Introduced a stomp transport for beaver using stomp.py. [Peter Lenderyou] - - Fix references to ConfigParser error classes. 
[Jose Diaz-Gonzalez] - - Redis transport: handle multiple connections and use them in round robin style. [musil] - - Fixes GELF format according to specs. [Marvin Frick] GELF formatted messages need to be \0 ended. At least for sending over TCP. - - - Kafka round robin partitioner. [David Moravek] - - Solve error: cannot convert argument to integer. [Theofilis George- Nektarios] See at #312 + +33.3.0 (2015-04-08) +------------------- - Basic docs for GELF formatter. [Oleg Rekutin] Also fixes formatting issues with the immediately-preceding HTTP transport example section. - - - Adds a GELF formatter. [Oleg Rekutin] short_message is truncated to 250 characters and only the first line is retained. Pair with the HTTP POST output to write directly to graylog2. - - - Issue #305, accept any 2xx code for http_transport. [Oleg Rekutin] + 33.2.0 (2015-03-11) ------------------- - - Improved kafka test. [Marcel Casado] - - Added example of kafka transport usage in the user docs. [Marcel Casado] - - Added placeholder "dist" directory to download kafka binaries. [Marcel Casado] - - Added integration test support for Kafka transport. [Marcel Casado] - - Wrapped kafka client init in a try catch. [Marcel Casado] - - Initial kafka transport impl. [Marcel Casado] - - Updating config examples and docs. [Jonathan Sabo] - - Adding support for sqs queues in different accounts. [Jonathan Sabo] + 33.1.0 (2015-02-04) ------------------- - - Improved error message for missing logstash_version. [Florian Hopf] Added a comment that the version needs to be set in the config - - Specify stricter dependency on python-daemon, fixes #286. [Graham Floyd] - - Add message_batch size checking since SQS can only handle 256KiB in a batch. Flush queue if message_batch is 10 messages or >= 250KiB. [Lance O'Connor] - - Explained valid values and meaning for rabbitmq_delivery_mode. [Fabio Coatti] - - Added documentation for rabbitmq_delivery_mode configuration parameter. 
[Fabio Coatti] - - A small change in except syntax. This should make happy python3 and work also in 2.6 and later. [Fabio Coatti] - - When sending a message, now we can tell rabbitmq which delivery mode we want, according to main configuration option rabbitmq_delivery_mode. [Fabio Coatti] - - Added configuration option for rabbitmq deliveryMode. Basically it works like a boolean, but having 1 and 2 as allowed values, we consider it integer and validate it as such. [Fabio Coatti] - - Newline removed. [Fabio Coatti] - - Added stanzas specific redis_namespace key to documentation. [Fabio Coatti] - - Added a space after comma, more compliant with python style guide. [Fabio Coatti] - - Revert "ignored eric files" [Fabio Coatti] This reverts commit ea2a6b27437570aeda3ee53b6c6ebd7ebb1f4f2a. as suggested, leave alone .gitignore :) - - - This small commit allows to specify a redis namespace in file section of configuration file (stanzas). Basically, beaver checks if a redis_namespace is defined for the current file. If yes, it is used for the redis payload. If not (or null), beaver uses the redis_namespace value specified in global section. [Fabio Coatti] - - Added a section (stanza) configuration option in order to be able to specify a redis namespace. If set, it will override the namespace set in main section. Default is null. [Fabio Coatti] - - Ignored eric files. [Fabio Coatti] - - Remove `python-daemon` from requirements on win32. [Ryan Davis] If we're installing on windows, don't require `python-daemon`. This @@ -199,62 +311,48 @@ Changelog trying to install `python-daemon`. refs #141 - - - Use new repository name for travis-ci badge. [Jose Diaz-Gonzalez] + 33.0.0 (2014-10-14) ------------------- - - Extend release script to support new, semver-tagged releases. [Jose Diaz-Gonzalez] - - Add gitchangelog.rc to fix changelog generation. [Jose Diaz-Gonzalez] + 32 (2014-10-14) --------------- - - Allow for the config file to override the logfile's setting. 
[Aaron France] - - Force update of sincedb when beaver stop. [Pierre Fersing] - - Fixed sincedb_write_interval (Bugs #229). [Pierre Fersing] - - Fix config.get('ssh_options') [svengerlach] ssh_options could never be returned due to a wrong type check - - Add debian packaging based on dh-virtualenv. [Jose Diaz-Gonzalez] - - Zmq3 split HWM into SNDHWM/RCVHWM. Closes #246. [Pete Fritchman] - - Fix typo in usage.rst. [Hugo Lopes Tavares] s/logstash_verion/logstash_version/ - - Fixed badge to point to master branch only. [Jose Diaz-Gonzalez] + 31 (2014-01-25) --------------- Fix ~~~ - - Beaver user can't write its pid nor its log. [Mathieu Lecarme] - Using a folder is the tactic used by Redis on Debian. - + Using a folder is the tactic used by Redis on Debian. Other ~~~~~ - - Add required spacing to readme for proper pypi doc support. [Jose Diaz-Gonzalez] - - Change release process to include processing of documentation. [Jose Diaz-Gonzalez] - - Use GlobSafeConfigParser to parse config files. [Clay Pence] In order to support all of the kinds of globs, pass GlobSafeConfigParser @@ -269,16 +367,11 @@ Other This should fix the unit test to run properly when run from the main directory. - - - Fix redis_transport.py redis exception handling. Fixes #238. [Hugo Lopes Tavares] - - Attempt to fix memory leaks. Closes #186. [Jose Diaz-Gonzalez] - - Allow for newer versions of boto to be used. Closes #236. [Jose Diaz- Gonzalez] - - When shipping logs, use millisecond-precision timestamps. [Ryan Park] Logstash 1.3.2 has a problem with microsecond-precision timestamps in the @@ -293,90 +386,58 @@ Other This patch reduces @timestamp to millisecond precision, which should correct the problem with Beaver 1.3.2. - - - Improve compatibility with case-sensitive filesystems. [Jose Diaz- Gonzalez] - - Modify test cases to support logstash_version. [Jose Diaz-Gonzalez] - - Document usage of logstash_version. 
[Peter Burkholder] - - Add add_field_env option to the config file to allow fields to be added using values from the environment. [Lance O'Connor] Closes #214 - - - Add SSL/TLS support to the RabbitMQ transport. Closes #217. [Jonathan Harker] - - Added http transport option. Closes #218. [Jeff Bryner] - - Adding missing config file option 'rabbitmq_queue_durable'. [Daniel Whelan] - - `StrictRedis.from_url` is better than DIY-ing it. [Kristian Glass] Note currently `fakeredis` doesn't support `from_url` - this is blocking on https://github.com/jamesls/fakeredis/pull/29 being merged in (I've bumped version requirement in `tests.txt` accordingly) - - - Python 2.6 ConfigParser does not handle non-string Fixed typo. [tommyulfsparre] - - Dont add empty object to input list. [tommyulfsparre] - - Import threading library in tail manager since we want to use it. [Chris Roberts] - - Add SSL to the TCP Transport. [Simon McCartney] - - Redirect all docs to readthedocs. Refs #150. [Jose Diaz-Gonzalez] - - Readthedocs support. Closes #150. [Jose Diaz-Gonzalez] - - Convert producer to process. Allow timed producer culling. [Chris Roberts] - - Make consumer check threaded to prevent wedge state. [Chris Roberts] - - Don't crash on a string decoding exception. [Adam Twardowski] - - Set transport as valid on connect (properly resets for reconnect) [Chris Roberts] - - Handle publication failures in the TCP transport correctly. [Kiall Mac Innes] - - Add config option to manipulate ssh_options. [Andreas Lappe] This option allows to pass all ssh options to the tunnel. - - - Fix version lookup. [Jose Diaz-Gonzalez] - - Moved multiline_merge function to utils.py. [Pierre Fersing] - - Support for multi-line and tail_lines options. [Pierre Fersing] - - Support for multi-line events in tail-version. [Pierre Fersing] - - Support for multi-line events. [Pierre Fersing] - - Ignore invalid rawjson log. 
[Tomoyuki Sakurai] this ensures beaver keeps running even when other application logged logs in invalid json format. - - - Removed duplicate self._current_host from @source field. Fixes #180. [Alexander Papaspyrou] + 30 (2013-08-22) --------------- - - Use os._exit over sys.exit in signal handlers to quit cleanly. [Kristian Glass] @@ -387,8 +448,6 @@ Other (http://docs.python.org/2/library/sys.html#sys.exit) which can be caught by try/except blocks that might have been executing at time of signal handling, resulting in beaver failing to quit - - - Allow string escapes in delimiter. [Michael Mittelstadt] As far as I can tell, there is no way for me to represent a newline as @@ -410,88 +469,57 @@ Other By doing this sort of multiline parsing with beaver, it allows one to run logstash without the multiline filter, which due to its lack of thread-safety, forces you to run logstash with only one worker thread. - - - CONFIG_DIR to CONFD_PATH. [iyingchi] - - Added doc for -C option for config directory. [iyingchi] - - Fixed example in Readme.rst for sqs_aws_secret_key. [Jonathan Quail] - - Allow path to be None. [Lars Hansson] Allow path to be set empty (None) in the configuration filer. This way all files and globs can be configured in files in confd_path. - - - Fix zmq transport tests. [Scott Smith] - - Move zmq address config parsing into _main_parser. [Scott Smith] - - Allow specifying multiple zmq addresses to bind/connect. [Scott Smith] - - Redis 2.4.11 is no longer available on Pypi. [Andrew Gross] Fixes issue #167 - - Add a TCP transport. [Kiall Mac Innes] - - Isolate connection logic. Provide proper reconnect support. [Chris Roberts] - - Corrected documentation for exclude tag. Closes #157. [Jose Diaz- Gonzalez] - - Add missing sqlite3 module to documentation. [Andreas Lappe] - - Tests status. [Denis Orlikhin] - - Travis integration. [Denis Orlikhin] - - Tests fix (conf_d does work without existing file) [Denis Orlikhin] - - Implicit broken zmq error handling. 
[Denis Orlikhin] + 29 (2013-05-24) --------------- - - Do not harcode path in TailManager. Closes #143. [Jose Diaz-Gonzalez] - - Use /etc/beaver/conf for path and provide conf.d example. Closes #149. [Jose Diaz-Gonzalez] - - Added mqtt as option in argparse configuration for the transport flag. [Jose Diaz-Gonzalez] - - Fixed broken MqttTransport naming. [Jose Diaz-Gonzalez] - - Refactored BeaverSubprocess to maintain the running command as an attribute. [Jose Diaz-Gonzalez] - - Properly parse the beaver conf.d path for new sections. Closes #144. Closes #145. Refs #107. [Jose Diaz-Gonzalez] - - Use a Buffered Tokenizer to read large/fast incoming log input. Refs #135. Refs #105. [Jose Diaz-Gonzalez] - - Close queue after worker has been stopped. Refs #135. [Jose Diaz- Gonzalez] - - Wrap manager.close() call in try/except to mimic the worker dispatcher. [Jose Diaz-Gonzalez] - - Properly parse out the port from the `ssh_tunnel` option. Closes #142. [Jose Diaz-Gonzalez] - - Subclass the BaseLog class in BeaverSubprocess. Refs #142. [Jose Diaz- Gonzalez] - - Move base_log module higher up in hierarchy. Refs #142. [Jose Diaz- Gonzalez] - - Disable daemonization on the windows platform. Closes #141. [Jose Diaz-Gonzalez] - - Move file unwatching in old-style worker out of for-loop. Refs #139. [Jose Diaz-Gonzalez] @@ -502,52 +530,35 @@ Other We should also take care to catch `RuntimeError` which may arise when closing the Worker out of band - such as in the `cleanup` step of the worker dispatcher - but nowhere else. This should fix issues where logrotate suddenly causes files to disappear for a time and beaver tries to tail the file at the exact time it is being recreated. - - - Typo in SQS docs. [Jonathan Quail] - - Remove ujson requirement. [Jose Diaz-Gonzalez] This allows users that do not have a compiler in their deployment area to install beaver. Closes #137 - - Turn on logfile output when running in non-daemon contexts. Closes #131. 
[Jose Diaz-Gonzalez] - - Expand logging output path. Closes #133. [Jose Diaz-Gonzalez] - - Ensure logging to a file does not destroy regular logging. Closes #132. [Jose Diaz-Gonzalez] - - Properly handle unreadable files by logging a warning instead of crashing. Closes #130. [Jose Diaz-Gonzalez] - - Rename null_formatter to raw_formatter in BaseTransport class. [Jose Diaz-Gonzalez] - - Ensure that the RedisTransport calls the super invalidate method. Refs #93. [Jose Diaz-Gonzalez] - - Fix issue where input type was not being detected properly. [Jose Diaz-Gonzalez] - - Use logfile flag for sending all output to a file in daemon contexts. [Jose Diaz-Gonzalez] - - Expand path for pidfile creation. [Jose Diaz-Gonzalez] - - Properly handle redis reconnects when the datastore becomes unreacheable. Refs #93. [Jose Diaz-Gonzalez] - - 'type' instead of 'exchange_type' in recent pika vers. [Pravir Chandra] - - Adding options to make queues durable and HA. [Pravir Chandra] - - Respect stat_interval file configuration in stable worker. [Jose Diaz- Gonzalez] - - Unified configuration file using conf_d module. [Jose Diaz-Gonzalez] This change adds support for a conf.d directory - configured only via the `--confd-path` flag - which allows beaver to read configuration from multiple files. @@ -559,299 +570,201 @@ Other Please note that this commit BREAKS custom transport classes, as the interface for creating a transport class has changed. If you are referencing a `file_config.get(field, filename)` anywhere, please omit this and refer to `beaver_config.get_field(field, filename)`. Closes #107 - - - Hack to prevent stupid TypeError: 'NoneType' when running tests via setup.py. [Jose Diaz-Gonzalez] - - Properly handle rotated files on Darwin architectures. [Jose Diaz- Gonzalez] - - Log to debug instead of warning for file reloading on Darwin architectures. [Jose Diaz-Gonzalez] - - Speed up experimental worker. 
[Jose Diaz-Gonzalez] - Removed inline sleep call, which slowed down passes n*0.1 seconds, where n is the number of files being tailed - Inline methods that update data structures which should speed up larger installations - Make self.active() an attribute lookup instead of a method call - - - Use latest version of message pack interface (0.3.0). Closes #128. [Jose Diaz-Gonzalez] - - Alternative for reading python requirements. [Justin Lambert] - - Fix options sent from original worker to queue. Refs #119. [Jose Diaz- Gonzalez] - - Allow users to ignore the results of a copytruncate from logrotate. Refs #105. [Jose Diaz-Gonzalez] - - Fix rpm package building. Closes #123. [Jose Diaz-Gonzalez] - - Added experimental tail-version of beaver. [Jose Diaz-Gonzalez] - - Beginning work to move from an omniscient worker to individual tail objects. [Jose Diaz-Gonzalez] - - Fix kwargs call. [Jose Diaz-Gonzalez] - - Add formatting to mqtt transport. Closes #115. [Jose Diaz-Gonzalez] - - Retrieve more data from callback to minimize dictionary lookups. [Jose Diaz-Gonzalez] - - Prefer single quotes to double quotes where possible. [Jose Diaz- Gonzalez] - - Ensure stat_interval and tail_lines are both integer values. [Jose Diaz-Gonzalez] - - Alphabetize config variables for file_config. [Jose Diaz-Gonzalez] - - Ensure that debug flag is a boolean. [Jose Diaz-Gonzalez] - - Follow logstash covention for 'format' instead of 'message_format' [Jose Diaz-Gonzalez] - - Use passed in 'ignore_empty' field instead of a file_config lookup in queue module. [Jose Diaz-Gonzalez] - - Prefer discover_interval over update_file_mapping_time. [Jose Diaz- Gonzalez] - - Fix TransportException import. Closes #122. [Jose Diaz-Gonzalez] - - Auto-reconnect mechanism for the SSH tunnel. [Michael Franz Aigner] - - Use an alternative method of reading in requirements. Refs #120. [Jose Diaz-Gonzalez] - - Fix import of REOPEN_FILES constant in dispatcher.py. [Jose Diaz- Gonzalez] - - Fix a PEP8 violation. 
[Jose Diaz-Gonzalez] - - Ensure all files are utf-8 encoded. [Jose Diaz-Gonzalez] - - Namespace transport classes in the transport module. [Jose Diaz- Gonzalez] - - Allow specifying debug mode via argument. [Jose Diaz-Gonzalez] - - Added thread-safety to datetime calls. [Jose Diaz-Gonzalez] - - Added support for message_format. Closes #91. [Jose Diaz-Gonzalez] - - Add msgpack_pure as fallback for C-Based msgpack package. [Jose Diaz- Gonzalez] - - Fix issues in sincedb implementation. Refs #116. [Jose Diaz-Gonzalez] - - Fix casting issue when checking start_position. [Jose Diaz-Gonzalez] - - Properly handle Queue.Full exceptions. [Jose Diaz-Gonzalez] - - More logging. [Jose Diaz-Gonzalez] - - Expand the sincedb path on configuration parse. [Jose Diaz-Gonzalez] - - Ignore since.db files. [Jose Diaz-Gonzalez] - - Simplified sincedb support to handle an edge case. Refs #116. [Jose Diaz-Gonzalez] - - Remove errant print. [Jose Diaz-Gonzalez] - - Added support for file exclusion in config stanzas. Closes #106. [Jose Diaz-Gonzalez] - - Added python regex exclusion support to eglob. Refs #106. [Jose Diaz- Gonzalez] - - PEP8. [Jose Diaz-Gonzalez] - - Added a tests directory with some sample tests from users. [Jose Diaz- Gonzalez] - - Convert the 'sincedb_write_interval' option to an integer. Refs #116. [Jose Diaz-Gonzalez] - - Moved logger call to a more intelligent spot. [Jose Diaz-Gonzalez] - - Ensure that we use the proper encoding when opening a file. Closes #104. [Jose Diaz-Gonzalez] - - Centralize file-reading using classmethod open() [Jose Diaz-Gonzalez] - - Fixed issue where tailed lines were not being properly sent to the callback. [Jose Diaz-Gonzalez] - - Remove unnecessary argument from Worke.__init__() [Jose Diaz-Gonzalez] - - Force-parse non-unicode files using unicode_dammit. [Jose Diaz- Gonzalez] - - Set utf-8 as default encoding on all python files. [Jose Diaz- Gonzalez] - - Fixed pyflakes issues. [rtoma] - - Syntax fix of list. 
[rtoma] - - Raise an AssertionError when run in daemon without a pid path specified. Closes #112. [Jose Diaz-Gonzalez] - - Add support for ignoring empty lines. [Jose Diaz-Gonzalez] - - Properly cast boolean values from strings. [Jose Diaz-Gonzalez] - - Ensure all sections have the proper values on start. [Jose Diaz- Gonzalez] - - Ensure internal file_config state is updated. [Jose Diaz-Gonzalez] - - Pass in timestamp from worker class for more accurate timestamps at the cost of speed of sending. [Jose Diaz-Gonzalez] - - Centralize timestamp retrieval to base transport class. [Jose Diaz- Gonzalez] - - Added support for gzipped files. refs #39. [Jose Diaz-Gonzalez] - - Added support for sqlite3-based sincedb. Refs #6 and #39. [Jose Diaz- Gonzalez] - - Refactored worker so as to allow further data to be added to the file_map. [Jose Diaz-Gonzalez] - - Refactor seek_to_end to properly support file tailing. [Jose Diaz- Gonzalez] - - Added support for pubsub zmq. [Jose Diaz-Gonzalez] - - Added support for mosquitto transport. [Jose Diaz-Gonzalez] - - Added support for specifying file encoding, using io.open vs os.open. [Jose Diaz-Gonzalez] - - Fix issue where a field may not exist in the data. [Jose Diaz- Gonzalez] - - Added support for rawjson format. [Jose Diaz-Gonzalez] - - Fixed zeromq tests. [Jose Diaz-Gonzalez] - - Added SQS transport. [Jonathan Quail] - - Fixing outdated transport docs. [Morgan Delagrange] + 28 (2013-03-05) --------------- - - BeaverSubprocess is now a new-style class. Fixes ssh_tunneling. [Jose Diaz-Gonzalez] + 27 (2013-03-05) --------------- - - Fix issue where super method was not called in BeaverSshTunnel. [Jose Diaz-Gonzalez] + 26 (2013-03-05) --------------- - - Add optional reconnect support for transports. Refs #93. [Jose Diaz- Gonzalez] - - Add a method for checking the validity of a Transport. Refs #93. [Jose Diaz-Gonzalez] - - Added a configurable subprocess poll sleep. 
[Jose Diaz-Gonzalez] - - Add a deafult sleep timeout to BeaverSubprocess polling. [Jose Diaz- Gonzalez] - - Use a larger sleep time to get around redis over ssh connection issues. [Jose Diaz-Gonzalez] + 25 (2013-03-05) --------------- - - Use True instead of 1 for while check. [Jose Diaz-Gonzalez] - - Fix orphan child processes. Closes #103. [Jose Diaz-Gonzalez] + 24 (2013-02-26) --------------- - - Ensure new files are added to a transports configuration. Closes #96. Closes #101. [Jose Diaz-Gonzalez] - - Allow float numbers for update_file_mapping_time. [Jose Diaz-Gonzalez] - - Fix invalid casting of boolean values. [Jose Diaz-Gonzalez] - - Perform all conversions in config.py. Closes #99. [Jose Diaz-Gonzalez] + 23 (2013-02-20) --------------- - - Worker: pretty format debug message "Iteration took %.6f" [Sergey Shepelev] - - Zeromq_hwm int() conversion moved to config. [Denis Orlikhin] - - Zeromq_hwm config entry. [Denis Orlikhin] - - Zeromq_hwm support. [Denis Orlikhin] - - Add test requirements to setup. [Paul Garner] - - Allow beaver to accept custom transport classes. [Paul Garner] - - Rabbitmq_exchange_type option fixed in the README. [Xabier de Zuazo] - - Make beaver slightly more amenable to test mocking and sort of fix the broken zmq test. [Paul Garner] + 22 (2013-01-15) --------------- - - Handle sigterm properly. Refs #87. [Jose Diaz-Gonzalez] - - Add --loglevel as alias for --output. Closes #92. [Jose Diaz-Gonzalez] - - Added logging on connection exception. [Thomas Morse] - - Adding exception when redis connection can't be confirmed. [William Jimenez] - - Add '--format raw' to pass through input unchanged. [Stephen Sugden] - - Fix string & null formatters in beaver.transport. [Stephen Sugden] the inline definitions were expecting a self parameter, which is *not* passed when you assign a function to an attribute on an object instance. - - Call file.readlines() with sizehint in a loop to avoid reading in massive files all at once. 
[Jose Diaz-Gonzalez] + 21 (2013-01-04) --------------- - - Move runner into a dispatcher class to solve installation issues. [Jose Diaz-Gonzalez] - - Added note for Python 2.6+ support. [Jose Diaz-Gonzalez] + 20 (2013-01-03) --------------- - - Copy the readme over to avoid pypi packaging warnings. [Jose Diaz- Gonzalez] - - Implement fully recursive file globing. [Brian L. Troutwine] Python's base glob.iglob does not operate as if globstar were in effect. To @@ -868,39 +781,31 @@ Other webmachine logs would be ignored by beaver. This is no longer the case, to an arbitrary depth. - Signed-off-by: Brian L. Troutwine - 19 (2013-01-01) --------------- - - Fix issue with supporting command line args. [Jose Diaz-Gonzalez] + 18 (2012-12-31) --------------- - - Add timing debug information to the worker loop. [Jose Diaz-Gonzalez] - - Use redis pipelining when sending events. [Jose Diaz-Gonzalez] - - Formatting. [Jose Diaz-Gonzalez] - - Do not output debug statement for file_config.get call. [Jose Diaz- Gonzalez] - - Pass in logger object to create_ssh_tunnel() [Jose Diaz-Gonzalez] + 17 (2012-12-28) --------------- - - Added missing python-daemon requirement. [Jose Diaz-Gonzalez] + 16 (2012-12-27) --------------- - - Specify a max queue size of 100 to limit overrunning memory. [Jose Diaz-Gonzalez] - - Use multiprocessing for handling larger queue sizes. [Jose Diaz- Gonzalez] @@ -909,63 +814,41 @@ Other This patch adds the ability to use an internal work queue for log lines. Whenever file.readlines() is called, the lines are placed in the queue, which is shared with a child process. The child process creates its own transport, allowing us to potentially create a Process Pool in the future to handle a larger queue size. Note that the limitation of file.readlines() reading in too many lines is still in existence, and may continue to cause issues for certain log files. - - - Add default redis_password to BeaverConfig class. 
[Jose Diaz-Gonzalez] - - Fix missing underscore causing transport to break. [Norman Joyner] - - Implement redis auth support. [Norman Joyner] - - Add beaver init script for daemonization mode. [Jose Diaz-Gonzalez] - - Use python logger when using StdoutTransport. [Jose Diaz-Gonzalez] - - Add short arg flags for hostname and format. [Jose Diaz-Gonzalez] - - Add the ability to daemonize. Closes #79. [Jose Diaz-Gonzalez] - - Pass around a logger instance to all transports. [Jose Diaz-Gonzalez] - - Revert "Added a lightweight Event class" [Jose Diaz-Gonzalez] After deliberation, beaver is meant to be "light-weight". Lets leave the heavy-hitting to the big-boys. This reverts commit 1619d33ef4803c3fe910cf4ff197d0dd0039d2eb. - - - Added a lightweight Event class. [Jose Diaz-Gonzalez] This class's sole responsibility will be the processing of a given line as an event. It's future goal will be to act as a lightweight implementation of the filter system within Logstash - - - Remove argparse requirement for python 2.7 and above. [Jose Diaz- Gonzalez] + 15 (2012-12-25) --------------- - - Pull argument parsing out of beaver __init__.py. [Jose Diaz-Gonzalez] - - Move app-running into __init__.py. [Jose Diaz-Gonzalez] - - Standardize on _parse() as method for parsing config. [Jose Diaz- Gonzalez] - - Automatically parse the path config option. [Jose Diaz-Gonzalez] - - Remove extensions argument on Worker class. [Jose Diaz-Gonzalez] This argument was only used when no globs were specified in a config file. Since it is not configurable, there is no sense leaving around the extra logic. - - - Remove extra callback invocation on readlines. [Jose Diaz-Gonzalez] - - Remove extra file_config module. [Jose Diaz-Gonzalez] - - General code reorganization. [Jose Diaz-Gonzalez] Move both BeaverConfig and FileConfig into a single class @@ -977,27 +860,18 @@ Other Moved extra configuration and setup code to beaver.utils module. In many cases, code was added hastily before. 
Made many logger calls debug as opposed to info. The info level should be generally reserved for instances where files are watched, unwatched, or some change in the file state has occurred. - - - Remove duplicative and old beaver instructions from binary. [Jose Diaz-Gonzalez] - - Remove unnecessary passing of ssh_tunnel subprocess. [Jose Diaz- Gonzalez] - - Added docstrings to ssh_tunnel module. [Jose Diaz-Gonzalez] - - Follow convention of underscore for object properties. [Jose Diaz- Gonzalez] - - Follow convention of underscore for object properties. [Jose Diaz- Gonzalez] - - Added a NullFormatter. [Jose Diaz-Gonzalez] Useful for cases where we do not want any extra overhead on message formatting - - - Refactored message formatting in base Transport class. [Jose Diaz- Gonzalez] @@ -1006,61 +880,44 @@ Other In the case of string output, we define a custom formatter using an anonymous function and specify that as the formatter. - - - Moved create_transport to transport module. [Jose Diaz-Gonzalez] - - Moved create_ssh_tunnel to ssh_tunnel module. [Jose Diaz-Gonzalez] - - Fixed order of beaver_config and file_config in args. [Jose Diaz- Gonzalez] - - Reduce overhead of parsing configuration for globs and files. [Jose Diaz-Gonzalez] - - Removed ordereddict dependency. [Jose Diaz-Gonzalez] - - Do not output info level when outputing version. [Jose Diaz-Gonzalez] - - Allow usage of ujson >= 1.19. Closes #76. [Jose Diaz-Gonzalez] + 14 (2012-12-18) --------------- - - Removed erroneous redundant code. [Jose Diaz-Gonzalez] - - Workaround for differing iteration implementation in Python 2.6. [Jose Diaz-Gonzalez] - - Properly detect non-linux platforms. [Jose Diaz-Gonzalez] - - Improve Python 2.6 support. [Jose Diaz-Gonzalez] - - Fix broken python readme. [Jose Diaz-Gonzalez] + 13 (2012-12-17) --------------- - - Fixed certain environment variables. [Jose Diaz-Gonzalez] - - SSH Tunnel Support. 
[Jose Diaz-Gonzalez] This code should allow us to create an ssh tunnel between two distinct servers for the purposes of sending and receiving data. This is useful in certain cases where you would otherwise need to whitelist in your Firewall or iptables setup, such as when running in two different regions on AWS. - - - Allow for initial connection lag. Helpful when waiting for an SSH proxy to connect. [Jose Diaz-Gonzalez] - - Fix issue where certain config defaults were of an improper value. [Jose Diaz-Gonzalez] - - Allow specifying host via flag. Closes #70. [Jose Diaz-Gonzalez] + 12 (2012-12-17) --------------- - - Reload tailed files on non-linux platforms. [Jose Diaz-Gonzalez] Python has an issue on OS X were the underlying C implementation of @@ -1072,8 +929,6 @@ Other Note that this also causes debug mode to be very noisy on OS X. We all have to make sacrifices... - - - Deprecate all environment variables. [Jose Diaz-Gonzalez] This shifts configuration management into the BeaverConfig class. @@ -1082,16 +937,13 @@ Other Refs #72 Closes #60 - - - Warn when using deprecated ENV variables for configuration. Refs #72. [Jose Diaz-Gonzalez] - - Minor changes for PEP8 conformance. [Jose Diaz-Gonzalez] + 11 (2012-12-16) --------------- - - Add optional support for socket.getfqdn. [Jeremy Kitchen] For my setup I need to have the fqdn used at all times since my @@ -1103,8 +955,6 @@ Other FQDN, it's now an option to explicitly always use the fqdn. Fixes #68 - - - Check for log file truncation fixes #55. [Jeremy Kitchen] This adds a simple check for log file truncation and resets the watch @@ -1121,183 +971,126 @@ Other Additionally, the files beaver will most likely be called upon to watch which may be truncated are generally going to be large enough and slow-filling enough that this won't crop up in the wild. - - - Add a version number to beaver. [Jose Diaz-Gonzalez] + 10 (2012-12-15) --------------- - - Fixed package name. 
[Jose Diaz-Gonzalez] - - Regenerate CHANGES.rst on release. [Jose Diaz-Gonzalez] - - Adding support for /path/{foo,bar}.log. [Josh Braegger] - - Consistency. [Chris Faulkner] - - Stating the obvious. [Chris Faulkner] - - Grist for the mill. [Chris Faulkner] - - Drop redundant README.txt. [Chris Faulkner] - - Ignore file errors in unwatch method -- the file might not exists. [Josh Braegger] - - Unwatch file when encountering a stale NFS handle. When an NFS file handle becomes stale (ie, file was removed), it was crashing beaver. Need to just unwatch file. [Josh Braegger] - - Consistency. [Chris Faulkner] - - Pull install requirements from requirements/base.txt so they don't get out of sync. [Chris Faulkner] - - Include changelog in setup. [Chris Faulkner] - - Convert changelog to RST. [Chris Faulkner] - - Actually show the license. [Chris Faulkner] - - Consistent casing. [Chris Faulkner] - - Don't use empty string for tag when no tags configured in config file. [Stylianos Modes] - - Making 'mode' option work for zmqtransport. Adding setuptools and tests (use ./setup.py nosetests). Adding .gitignore. [Josh Braegger] + 9 (2012-11-28) -------------- - - More release changes. [Jose Diaz-Gonzalez] - - Fixed deprecated warning when declaring exchange type. [Rafael Fonseca] + 8 (2012-11-28) -------------- - - Removed deprecated usage of e.message. [Rafael Fonseca] - - Fixed exception trapping code. [Rafael Fonseca] - - Added some resiliency code to rabbitmq transport. [Rafael Fonseca] + 7 (2012-11-28) -------------- - - Added a helper script for creating releases. [Jose Diaz-Gonzalez] - - Partial fix for crashes caused by globbed files. [Jose Diaz-Gonzalez] -6 (2012-11-26) --------------- +6 (2012-12-15) +-------------- - Fix issue where polling for files was done incorrectly. [Jose Diaz- Gonzalez] - - Added ubuntu init.d example config. 
[Jose Diaz-Gonzalez] -5 (2012-11-26) --------------- +5 (2012-12-15) +-------------- - Try to poll for files on startup instead of throwing exceptions. Closes #45. [Jose Diaz-Gonzalez] - - Added python 2.6 to classifiers. [Jose Diaz-Gonzalez] -4 (2012-11-26) --------------- +4 (2012-12-15) +-------------- - Remove unused local vars. [Jose Diaz-Gonzalez] - - Allow rabbitmq exchange type and durability to be configured. [Jose Diaz-Gonzalez] - - Remove unused import. [Jose Diaz-Gonzalez] - - Formatted code to fix PEP8 violations. [Jose Diaz-Gonzalez] - - Use alternate dict syntax for Python 2.6 support. Closes #43. [Jose Diaz-Gonzalez] - - Fixed release date for version 3. [Jose Diaz-Gonzalez] + 3 (2012-11-25) -------------- - - Added requirements files to manifest. [Jose Diaz-Gonzalez] - - Include all contrib files in release. [Jose Diaz-Gonzalez] - - Revert "removed redundant README.txt" to follow pypi standards. [Jose Diaz-Gonzalez] This reverts commit e667f63706e0af8bc82c0eac6eac43318144e107. - - - Added bash startup script. Closes #35. [Jose Diaz-Gonzalez] - - Added an example supervisor config for redis. closes #34. [Jose Diaz- Gonzalez] - - Removed redundant README.txt. [Jose Diaz-Gonzalez] - - Added classifiers to package. [Jose Diaz-Gonzalez] - - Re-order workers. [Jose Diaz-Gonzalez] - - Re-require pika. [Jose Diaz-Gonzalez] - - Make zeromq installation optional. [Morgan Delagrange] - - Formatting. [Jose Diaz-Gonzalez] - - Added changes to changelog for version 3. [Jose Diaz-Gonzalez] - - Timestamp in ISO 8601 format with the "Z" sufix to express UTC. [Xabier de Zuazo] - - Adding udp support. [Morgan Delagrange] - - Lpush changed to rpush on redis transport. This is required to always read the events in the correct order on the logstash side. See: https: //github.com/logstash/logstash/blob/6f745110671b5d9d66bf082fbfed99d145 af4620/lib/logstash/outputs/redis.rb#L4. 
[Xabier de Zuazo] -2 (2012-10-25) --------------- +2 (2012-11-25) +-------------- - Example upstart script. [Michael D'Auria] - - Fixed a few more import statements. [Jose Diaz-Gonzalez] - - Fixed binary call. [Jose Diaz-Gonzalez] - - Refactored logging. [Jose Diaz-Gonzalez] - - Improve logging. [Michael D'Auria] - - Removed unnecessary print statements. [Jose Diaz-Gonzalez] - - Add default stream handler when transport is stdout. Closes #26. [bear (Mike Taylor)] - - Better exception handling for unhandled exceptions. [Michael D'Auria] - - Handle the case where the config file is not present. [Michael D'Auria] - - Fix wrong addfield values. [Alexander Fortin] - - Add add_field to config example. [Alexander Fortin] - - Add support for add_field into config file. [Alexander Fortin] - - Minor readme updates. [Jose Diaz-Gonzalez] - - Add support for type reading from INI config file. [Alexander Fortin] Add support for symlinks in config file @@ -1317,116 +1110,70 @@ Other Conflicts: README.rst - - - Support globs in file paths. [Darren Worrall] - - When sending data over the wire, use UTC timestamps. [Darren Worrall] - - Added msgpack support. [Jose Diaz-Gonzalez] - - Use the python logging framework. [Jose Diaz-Gonzalez] - - Fixed Transport.format() method. [Jose Diaz-Gonzalez] - - Properly parse BEAVER_FILES env var. [Jose Diaz-Gonzalez] - - Refactor transports. [Jose Diaz-Gonzalez] - Fix the json import to use the fastest json module available - Move formatting into Transport class - - - Attempt to fix defaults from env variables. [Jose Diaz-Gonzalez] - - Fix README and beaver CLI help to reference correct RABBITMQ_HOST environment variable. [jdutton] - - Add RabbitMQ support. [Alexander Fortin] - - Added real-world example of beaver usage for tailing a file. [Jose Diaz-Gonzalez] - - Removed unused argument. [Jose Diaz-Gonzalez] - - Ensure that python-compatible readme is included in package. 
[Jose Diaz-Gonzalez] - - Fix variable naming and timeout for redis transport. [Jose Diaz- Gonzalez] - - Installation instructions. [Jose Diaz-Gonzalez] - - Use restructured text for readme instead of markdown. [Jose Diaz- Gonzalez] - - Removed unnecessary .gitignore. [Jose Diaz-Gonzalez] -1 (2012-08-06) --------------- +1 (2012-11-25) +-------------- - Moved app into python package format. [Jose Diaz-Gonzalez] - - Moved binary beaver.py to bin/beaver, as per python packaging. [Jose Diaz-Gonzalez] - - Moved around transports to be independent of each other. [Jose Diaz- Gonzalez] - - Reorder transports. [Jose Diaz-Gonzalez] - - Rewrote run_worker to throw exception if all transport options have been exhausted. [Jose Diaz-Gonzalez] - - Rename Amqp -> Zmq to avoid confusion with RabbitMQ. [Alexander Fortin] - - Added choices to the --transport argument. [Jose Diaz-Gonzalez] - - Fixed derpy formatting. [Jose Diaz-Gonzalez] - - Added usage to the readme. [Jose Diaz-Gonzalez] - - Support usage of environment variables instead of arguments. [Jose Diaz-Gonzalez] - - Fixed files argument parsing. [Jose Diaz-Gonzalez] - - One does not simply license all the things. [Jose Diaz-Gonzalez] - - Add todo to readme. [Jose Diaz-Gonzalez] - - Added version to pyzmq. [Jose Diaz-Gonzalez] - - Added license. [Jose Diaz-Gonzalez] - - Reordered imports. [Jose Diaz-Gonzalez] - - Moved all transports to beaver/transports.py. [Jose Diaz-Gonzalez] - - Calculate current timestamp at most once per callback fired. [Jose Diaz-Gonzalez] - - Modified transports to include proper information for ingestion in logstash. [Jose Diaz-Gonzalez] - - Fixed package imports. [Jose Diaz-Gonzalez] - - Removed another compiled python file. [Jose Diaz-Gonzalez] - - Use ujson instead of simplejson. [Jose Diaz-Gonzalez] - - Ignore compiled python files. [Jose Diaz-Gonzalez] - - Fixed imports. [Jose Diaz-Gonzalez] - - Fixed up readme instructions. 
[Jose Diaz-Gonzalez] - - Refactor transports so that connections are no longer global. [Jose Diaz-Gonzalez] - - Readme and License. [Jose Diaz-Gonzalez] - - First commit. [Jose Diaz-Gonzalez] diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 00000000..a5e4a466 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,145 @@ +# Contributing to Python-Beaver + +♥ [Beaver](https://github.com/python-beaver/python-beaver) and want to get involved? +Thanks! There are plenty of ways you can help! + +Please take a moment to review this document in order to make the contribution +process easy and effective for everyone involved. + +Following these guidelines helps to communicate that you respect the time of +the developers managing and developing this open source project. In return, +they should reciprocate that respect in addressing your issue or assessing +patches and features. + +## Using the issue tracker + +The [issue tracker](https://github.com/python-beaver/python-beaver/issues) is +the preferred channel for [bug reports](#bugs), [features requests](#features) +and [submitting pull requests](#pull-requests). + + +## Bug reports + +A bug is a _demonstrable problem_ that is caused by the code in the repository. +Good bug reports are extremely helpful - thank you! + +Guidelines for bug reports: + +1. **Use the GitHub issue search** — check if the issue has already been + reported. + +2. **Check if the issue has been fixed** — try to reproduce it using the + latest `master` branch in the repository. + +3. **Reproduce the problem** — providing as much detail as possible so the + bug can be identified and replicated locally. + +A good bug report shouldn't leave others needing to chase you up for more +information. Please try to be as detailed as possible in your report. What is +your environment? What steps will reproduce the issue? What OS and Beaver +versions were running when you experienced the outcome? What would you expect to +be the outcome? 
All these details will help people to fix any potential bugs. + +Example: + +> Short and descriptive example bug report title +> +> A summary of the issue and the OS\application environment in which it occurs. +> If suitable, include the steps required to reproduce the bug. +> +> 1. This is the first step +> 2. This is the second step +> 3. Further steps, etc. +> +> Config and (if applicable) log file examples in code tags, or via a [Gist](https://gist.github.com). +> +> Any other information you want to share that is relevant to the issue being +> reported. This might include the lines of code that you have identified as +> causing the bug, and potential solutions (and your opinions on their +> merits). + + + +## Feature requests + +Feature requests are welcome. But take a moment to find out whether your idea +fits with the scope and aims of the project. It's up to *you* to make a strong +case to convince the project's developers of the merits of this feature. Please +provide as much detail and context as possible. + + + +## Pull requests + +Good pull requests - patches, improvements, new features - are a fantastic +help. They should remain focused in scope and avoid containing unrelated +commits. + +**Please ask first** before embarking on any significant pull request (e.g. +implementing features, refactoring code, porting to a different language), +otherwise you risk spending a lot of time working on something that the +project's developers might not want to merge into the project. + +Please adhere to the coding conventions used throughout a project (indentation, +accurate comments, etc.) and any other requirements (such as test coverage). + +Adhering to the following process is the best way to get your work +included in the project: + +1. 
[Fork](https://help.github.com/articles/fork-a-repo/) the project, clone your + fork, and configure the remotes: + + ```bash + # Clone your fork of the repo into the current directory + git clone https://github.com/<your-username>/python-beaver.git + # Navigate to the newly cloned directory + cd python-beaver + # Assign the original repo to a remote called "upstream" + git remote add upstream https://github.com/python-beaver/python-beaver.git + ``` + +2. If you cloned a while ago, get the latest changes from upstream: + + ```bash + git checkout master + git pull upstream master + ``` + +3. Create a new topic branch (off the main project development branch) to + contain your feature, change, or fix: + + ```bash + git checkout -b <topic-branch-name> + ``` + +4. Commit your changes in logical chunks. Please adhere to these [git commit + message guidelines](http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html) + or your code is unlikely to be merged into the main project. Use Git's + [interactive rebase](https://help.github.com/articles/about-git-rebase/) + feature to tidy up your commits before making them public. + - Test cases should accompany any change in functionality to the code-base + - Any change to end-user functionality should include corresponding documentation + +5. Locally merge (or rebase) the upstream development branch into your topic branch: + + ```bash + git pull [--rebase] upstream master + ``` + +6. Run the test suite locally, ensuring it passes: + + ```bash + nosetests #./install-dependencies.sh to install the pre-requirements + ``` + +7. Push your topic branch up to your fork: + + ```bash + git push origin <topic-branch-name> + ``` + +8. [Open a Pull Request](https://help.github.com/articles/using-pull-requests/) + with a clear title and description. + +**IMPORTANT**: By submitting a patch, you agree to allow the project +owners to license your work under the terms of the [License](LICENSE.txt). 
diff --git a/README.rst b/README.rst index 720acdc3..ed89ca02 100644 --- a/README.rst +++ b/README.rst @@ -2,8 +2,11 @@ Beaver ====== -.. image:: https://travis-ci.org/josegonzalez/python-beaver.svg?branch=master - :target: https://travis-ci.org/josegonzalez/python-beaver +.. image:: https://travis-ci.org/python-beaver/python-beaver.svg?branch=master + :target: https://travis-ci.org/python-beaver/python-beaver + +.. image:: https://coveralls.io/repos/python-beaver/python-beaver/badge.svg?branch=master&service=github + :target: https://coveralls.io/github/python-beaver/python-beaver?branch=master python daemon that munches on logs and sends their contents to logstash @@ -20,16 +23,16 @@ Using PIP: From Github:: - pip install git+git://github.com/josegonzalez/python-beaver.git@34.1.0#egg=beaver + pip install git+git://github.com/python-beaver/python-beaver.git@36.3.1#egg=beaver From PyPI:: - pip install beaver==34.1.0 + pip install beaver==36.3.1 Documentation ============= -Full documentation is available online at http://beaver.readthedocs.org/ +Full documentation is available online at http://python-beaver.readthedocs.org/ You can also build the docs locally:: @@ -37,7 +40,7 @@ You can also build the docs locally:: pip install sphinx # retrieve the repository - git clone git://github.com/josegonzalez/beaver.git + git clone git://github.com/python-beaver/beaver.git # build the html output cd beaver/docs @@ -45,6 +48,12 @@ You can also build the docs locally:: HTML docs will be available in `beaver/docs/_build/html`. +Contributing +============ + +When contributing to Beaver, please review the full guidelines here: https://github.com/python-beaver/python-beaver/blob/master/CONTRIBUTING.md. +If you would like, you can open an issue to let others know about your work in progress. Documentation must be included and tests must pass on Python 2.6 and 2.7 for pull requests to be accepted. 
+ Credits ======= diff --git a/beaver/__init__.py b/beaver/__init__.py index 2d1b336a..f42ae5bf 100644 --- a/beaver/__init__.py +++ b/beaver/__init__.py @@ -1,2 +1,2 @@ # -*- coding: utf-8 -*- -__version__ = '34.1.0' +__version__ = '36.3.1' diff --git a/beaver/config.py b/beaver/config.py index ed75cbe2..14092c53 100644 --- a/beaver/config.py +++ b/beaver/config.py @@ -63,7 +63,7 @@ def __init__(self, args, logger=None): 'kafka_batch_n': os.environ.get('KAFKA_BATCH_N', 10), 'kafka_batch_t': os.environ.get('KAFKA_BATCH_T', 10), 'kafka_round_robin': os.environ.get('KAFKA_ROUND_ROBIN', False), - 'mqtt_clientid': 'mosquitto', + 'mqtt_clientid': 'paho', 'mqtt_host': 'localhost', 'mqtt_port': '1883', 'mqtt_topic': '/logstash', @@ -82,8 +82,10 @@ def __init__(self, args, logger=None): 'rabbitmq_exchange_durable': os.environ.get('RABBITMQ_EXCHANGE_DURABLE', '0'), 'rabbitmq_queue_durable': os.environ.get('RABBITMQ_QUEUE_DURABLE', '0'), 'rabbitmq_ha_queue': os.environ.get('RABBITMQ_HA_QUEUE', '0'), + 'rabbitmq_arguments': os.environ.get('RABBITMQ_ARGUMENTS', {}), 'rabbitmq_key': os.environ.get('RABBITMQ_KEY', 'logstash-key'), 'rabbitmq_exchange': os.environ.get('RABBITMQ_EXCHANGE', 'logstash-exchange'), + 'rabbitmq_timeout': '1', 'rabbitmq_delivery_mode': 1, 'redis_url': os.environ.get('REDIS_URL', 'redis://localhost:6379/0'), 'redis_namespace': os.environ.get('REDIS_NAMESPACE', 'logstash:beaver'), @@ -96,9 +98,11 @@ def __init__(self, args, logger=None): 'sns_aws_topic_arn': '', 'sqs_aws_access_key': '', 'sqs_aws_secret_key': '', + 'sqs_aws_profile_name': '', 'sqs_aws_region': 'us-east-1', 'sqs_aws_queue': '', 'sqs_aws_queue_owner_acct_id': '', + 'sqs_bulk_lines': False, 'kinesis_aws_access_key': '', 'kinesis_aws_secret_key': '', 'kinesis_aws_region': 'us-east-1', @@ -179,6 +183,9 @@ def __init__(self, args, logger=None): 'debug': '0', 'daemonize': '0', 'pid': '', + + # Ignore files older then n days, use 0 to disable + 'ignore_old_files': 0 } self._configfile = args.config 
@@ -242,6 +249,7 @@ def use_ssh_tunnel(self): def _check_for_deprecated_usage(self): env_vars = [ + 'RABBITMQ_ARGUMENTS' 'RABBITMQ_HOST', 'RABBITMQ_PORT', 'RABBITMQ_VHOST', @@ -311,6 +319,7 @@ def _main_parser(config): 'max_queue_size', 'queue_timeout', 'rabbitmq_port', + 'rabbitmq_timeout', 'rabbitmq_delivery_mode', 'respawn_delay', 'subprocess_poll_sleep', @@ -323,7 +332,8 @@ def _main_parser(config): 'kafka_batch_n', 'kafka_batch_t', 'kafka_ack_timeout', - 'number_of_consumer_processes' + 'number_of_consumer_processes', + 'ignore_old_files' ] for key in require_int: if config[key] is not None: diff --git a/beaver/dispatcher/worker.py b/beaver/dispatcher/worker.py deleted file mode 100644 index ed7b6848..00000000 --- a/beaver/dispatcher/worker.py +++ /dev/null @@ -1,95 +0,0 @@ -# -*- coding: utf-8 -*- -import multiprocessing -import Queue -import signal -import os -import time - -from beaver.config import BeaverConfig -from beaver.run_queue import run_queue -from beaver.ssh_tunnel import create_ssh_tunnel -from beaver.utils import setup_custom_logger, REOPEN_FILES -from beaver.worker.worker import Worker - -def run(args=None): - - logger = setup_custom_logger('beaver', args) - beaver_config = BeaverConfig(args, logger=logger) - # so the config file can override the logger - logger = setup_custom_logger('beaver', args, config=beaver_config) - - if beaver_config.get('logstash_version') not in [0, 1]: - raise LookupError("Invalid logstash_version. Set it to 0 or 1 in your config.") - - queue = multiprocessing.Queue(beaver_config.get('max_queue_size')) - - worker_proc = None - ssh_tunnel = create_ssh_tunnel(beaver_config, logger=logger) - - def cleanup(signalnum, frame): - if signalnum is not None: - sig_name = tuple((v) for v, k in signal.__dict__.iteritems() if k == signalnum)[0] - logger.info('{0} detected'.format(sig_name)) - logger.info('Shutting down. 
Please wait...') - else: - logger.info('Worker process cleanup in progress...') - - try: - queue.put_nowait(('exit', ())) - except Queue.Full: - pass - - if worker_proc is not None: - try: - worker_proc.terminate() - worker_proc.join() - except RuntimeError: - pass - - if ssh_tunnel is not None: - logger.info('Closing ssh tunnel...') - ssh_tunnel.close() - - if signalnum is not None: - logger.info('Shutdown complete.') - return os._exit(signalnum) - - signal.signal(signal.SIGTERM, cleanup) - signal.signal(signal.SIGINT, cleanup) - signal.signal(signal.SIGQUIT, cleanup) - - def create_queue_consumer(): - process_args = (queue, beaver_config, logger) - proc = multiprocessing.Process(target=run_queue, args=process_args) - - logger.info('Starting queue consumer') - proc.start() - return proc - - def create_queue_producer(): - worker = Worker(beaver_config, queue_consumer_function=create_queue_consumer, callback=queue.put, logger=logger) - worker.loop() - - while 1: - - try: - if REOPEN_FILES: - logger.debug('Detected non-linux platform. Files will be reopened for tailing') - - t = time.time() - while True: - if worker_proc is None or not worker_proc.is_alive(): - logger.info('Starting worker...') - t = time.time() - worker_proc = multiprocessing.Process(target=create_queue_producer) - worker_proc.start() - logger.info('Working...') - worker_proc.join(10) - - if beaver_config.get('refresh_worker_process'): - if beaver_config.get('refresh_worker_process') < time.time() - t: - logger.info('Worker has exceeded refresh limit. 
Terminating process...') - cleanup(None, None) - - except KeyboardInterrupt: - pass diff --git a/beaver/run_queue.py b/beaver/run_queue.py index 479d03f2..a32abd26 100644 --- a/beaver/run_queue.py +++ b/beaver/run_queue.py @@ -30,10 +30,6 @@ def run_queue(queue, beaver_config, logger=None): logger.info('Transport connection issues, stopping queue') break - if int(time.time()) - last_update_time > queue_timeout: - logger.info('Queue timeout of "{0}" seconds exceeded, stopping queue'.format(queue_timeout)) - break - command = None try: if queue.full(): @@ -54,6 +50,10 @@ def run_queue(queue, beaver_config, logger=None): else: logger.debug('No data') + if int(time.time()) - last_update_time > queue_timeout: + logger.info('Queue timeout of "{0}" seconds exceeded, stopping queue'.format(queue_timeout)) + break + if command == 'callback': if data.get('ignore_empty', False): logger.debug('removing empty lines') @@ -76,13 +76,14 @@ def run_queue(queue, beaver_config, logger=None): count += 1 logger.debug("Number of transports: " + str(count)) break - except TransportException: + except TransportException as e: failure_count = failure_count + 1 if failure_count > beaver_config.get('max_failure'): failure_count = beaver_config.get('max_failure') sleep_time = beaver_config.get('respawn_delay') ** failure_count - logger.info('Caught transport exception, reconnecting in %d seconds' % sleep_time) + logger.info('Caught transport exception: %s', e) + logger.info('Reconnecting in %d seconds' % sleep_time) try: transport.invalidate() diff --git a/beaver/tests/test_glob_sections.py b/beaver/tests/test_glob_sections.py index c7fa9ad2..d5a98457 100644 --- a/beaver/tests/test_glob_sections.py +++ b/beaver/tests/test_glob_sections.py @@ -1,5 +1,10 @@ # -*- coding: utf-8 -*- -import unittest +import sys +if sys.version_info < (2, 7): + import unittest2 as unittest +else: + import unittest + import os import glob from beaver.config import BeaverConfig diff --git 
a/beaver/tests/test_kafka_transport.py b/beaver/tests/test_kafka_transport.py index 7eed8bfe..c9cf6eb7 100644 --- a/beaver/tests/test_kafka_transport.py +++ b/beaver/tests/test_kafka_transport.py @@ -1,6 +1,11 @@ # -*- coding: utf-8 -*- +import sys +if sys.version_info < (2, 7): + import unittest2 as unittest +else: + import unittest + import mock -import unittest import tempfile import logging diff --git a/beaver/tests/test_kinesis_transport.py b/beaver/tests/test_kinesis_transport.py new file mode 100644 index 00000000..dbea972e --- /dev/null +++ b/beaver/tests/test_kinesis_transport.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +import sys +if sys.version_info < (2, 7): + import unittest2 as unittest +else: + import unittest + +import mock +import tempfile +import logging + +import beaver +from beaver.config import BeaverConfig +from beaver.transports import create_transport +from beaver.unicode_dammit import unicode_dammit + +from fixtures import Fixture + +from moto import mock_kinesis +import boto.kinesis + +class KinesisTests(unittest.TestCase): + + @mock_kinesis + def _create_streams(self): + conn = boto.kinesis.connect_to_region("us-east-1") + conn.create_stream("stream1", 1) + conn.create_stream("stream2", 1) + + @classmethod + def setUpClass(cls): + cls.logger = logging.getLogger(__name__) + + empty_conf = tempfile.NamedTemporaryFile(delete=True) + cls.beaver_config = BeaverConfig(mock.Mock(config=empty_conf.name)) + cls.beaver_config.set('transport', 'kinesis') + cls.beaver_config.set('logstash_version', 1) + + output_file = Fixture.download_official_distribution() + Fixture.extract_distribution(output_file) + + @mock_kinesis + def test_kinesis_default_auth_profile(self): + self._create_streams() + self.beaver_config.set('kinesis_aws_profile_name', None) + self.beaver_config.set('kinesis_aws_access_key', None) + self.beaver_config.set('kinesis_aws_secret_key', None) + self.beaver_config.set('kinesis_aws_stream', 'stream1') + + transport = 
create_transport(self.beaver_config, logger=self.logger) + + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) + transport.interrupt() + + @mock_kinesis + def test_kinesis_auth_profile(self): + self._create_streams() + self.beaver_config.set('kinesis_aws_profile_name', 'beaver_stream') + self.beaver_config.set('kinesis_aws_access_key', None) + self.beaver_config.set('kinesis_aws_secret_key', None) + self.beaver_config.set('kinesis_aws_stream', 'stream1') + + transport = create_transport(self.beaver_config, logger=self.logger) + + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) + + @mock_kinesis + def test_kinesis_auth_key(self): + self._create_streams() + self.beaver_config.set('kinesis_aws_profile_name', None) + self.beaver_config.set('kinesis_aws_access_key', 'beaver_test_key') + self.beaver_config.set('kinesis_aws_secret_key', 'beaver_test_secret') + self.beaver_config.set('kinesis_aws_stream', 'stream1') + + transport = create_transport(self.beaver_config, logger=self.logger) + + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) + transport.interrupt() + + @mock_kinesis + def test_kinesis_auth_account_id(self): + self._create_streams() + self.beaver_config.set('kinesis_aws_stream_owner_acct_id', 'abc123') + self.beaver_config.set('kinesis_aws_profile_name', None) + self.beaver_config.set('kinesis_aws_access_key', 'beaver_test_key') + self.beaver_config.set('kinesis_aws_secret_key', 'beaver_test_secret') + self.beaver_config.set('kinesis_aws_stream', 'stream1') + + transport = create_transport(self.beaver_config, logger=self.logger) + + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) + transport.interrupt() + + @mock_kinesis + def test_kinesis_send_stream(self): + self._create_streams() + self.beaver_config.set('kinesis_aws_stream', 'stream1') + self.beaver_config.set('kinesis_aws_profile_name', None) + 
self.beaver_config.set('kinesis_aws_access_key', None) + self.beaver_config.set('kinesis_aws_secret_key', None) + self.beaver_config.set('kinesis_bulk_lines', False) + + transport = create_transport(self.beaver_config, logger=self.logger) + mock_send_batch = mock.Mock() + transport._send_message_batch = mock_send_batch + + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) + + data = {} + lines = [] + n=500 + for i in range(n): + lines.append('log' + str(i) + '\n') + new_lines = [] + for line in lines: + message = unicode_dammit(line) + if len(message) == 0: + continue + new_lines.append(message) + data['lines'] = new_lines + data['fields'] = [] + self.assertTrue(transport.callback("test.log", **data)) + self.assertEqual(1, mock_send_batch.call_count) + + + @mock_kinesis + def test_kinesis_send_stream_with_record_count_cutoff(self): + self._create_streams() + self.beaver_config.set('kinesis_aws_stream', 'stream1') + self.beaver_config.set('kinesis_aws_profile_name', None) + self.beaver_config.set('kinesis_aws_access_key', None) + self.beaver_config.set('kinesis_aws_secret_key', None) + self.beaver_config.set('kinesis_bulk_lines', False) + + transport = create_transport(self.beaver_config, logger=self.logger) + mock_send_batch = mock.Mock() + transport._send_message_batch = mock_send_batch + + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) + + data = {} + lines = [] + n = 501 + for i in range(n): + lines.append('log' + str(i) + '\n') + new_lines = [] + for line in lines: + message = unicode_dammit(line) + if len(message) == 0: + continue + new_lines.append(message) + data['lines'] = new_lines + data['fields'] = [] + self.assertTrue(transport.callback("test.log", **data)) + self.assertEqual(2, mock_send_batch.call_count) diff --git a/beaver/tests/test_sqs_transport.py b/beaver/tests/test_sqs_transport.py new file mode 100644 index 00000000..6b9521c1 --- /dev/null +++ 
b/beaver/tests/test_sqs_transport.py @@ -0,0 +1,259 @@ +# -*- coding: utf-8 -*- +import sys +if sys.version_info < (2, 7): + import unittest2 as unittest +else: + import unittest + +import mock +import tempfile +import logging + +import beaver +from beaver.config import BeaverConfig +from beaver.transports import create_transport +from beaver.unicode_dammit import unicode_dammit + +from fixtures import Fixture + +from moto import mock_sqs +import boto.sqs + +class SqsTests(unittest.TestCase): + + @mock_sqs + def _create_queues(cls): + conn = boto.sqs.connect_to_region("us-east-1") + conn.create_queue("queue1") + conn.create_queue("queue2") + + @classmethod + def setUpClass(cls): + cls.logger = logging.getLogger(__name__) + + empty_conf = tempfile.NamedTemporaryFile(delete=True) + cls.beaver_config = BeaverConfig(mock.Mock(config=empty_conf.name)) + cls.beaver_config.set('transport', 'sqs') + cls.beaver_config.set('logstash_version', 1) + + output_file = Fixture.download_official_distribution() + Fixture.extract_distribution(output_file) + + @mock_sqs + def test_sqs_default_auth_profile(cls): + cls._create_queues() + cls.beaver_config.set('sqs_aws_profile_name', None) + cls.beaver_config.set('sqs_aws_access_key', None) + cls.beaver_config.set('sqs_aws_secret_key', None) + cls.beaver_config.set('sqs_aws_queue', 'queue1') + + transport = create_transport(cls.beaver_config, logger=cls.logger) + + cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport) + transport.interrupt() + + @mock_sqs + def test_sqs_auth_profile(cls): + cls._create_queues() + cls.beaver_config.set('sqs_aws_profile_name', 'beaver_queue') + cls.beaver_config.set('sqs_aws_access_key', None) + cls.beaver_config.set('sqs_aws_secret_key', None) + cls.beaver_config.set('sqs_aws_queue', 'queue1') + + transport = create_transport(cls.beaver_config, logger=cls.logger) + + cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport) + + @mock_sqs + def 
test_sqs_auth_key(cls): + cls._create_queues() + cls.beaver_config.set('sqs_aws_profile_name', None) + cls.beaver_config.set('sqs_aws_access_key', 'beaver_test_key') + cls.beaver_config.set('sqs_aws_secret_key', 'beaver_test_secret') + cls.beaver_config.set('sqs_aws_queue', 'queue1') + + transport = create_transport(cls.beaver_config, logger=cls.logger) + + cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport) + transport.interrupt() + + @mock_sqs + def test_sqs_auth_account_id(cls): + cls._create_queues() + cls.beaver_config.set('sqs_aws_queue_owner_acct_id', 'abc123') + cls.beaver_config.set('sqs_aws_profile_name', None) + cls.beaver_config.set('sqs_aws_access_key', 'beaver_test_key') + cls.beaver_config.set('sqs_aws_secret_key', 'beaver_test_secret') + cls.beaver_config.set('sqs_aws_queue', 'queue1') + + transport = create_transport(cls.beaver_config, logger=cls.logger) + + cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport) + transport.interrupt() + + @mock_sqs + def test_sqs_single_queue(cls): + cls._create_queues() + cls.beaver_config.set('sqs_aws_queue', 'queue1') + cls.beaver_config.set('sqs_aws_profile_name', None) + cls.beaver_config.set('sqs_aws_access_key', None) + cls.beaver_config.set('sqs_aws_secret_key', None) + + transport = create_transport(cls.beaver_config, logger=cls.logger) + + cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport) + transport.interrupt() + + @mock_sqs + def test_sqs_single_queue_bulklines(cls): + cls._create_queues() + cls.beaver_config.set('sqs_aws_queue', 'queue1') + cls.beaver_config.set('sqs_aws_profile_name', None) + cls.beaver_config.set('sqs_aws_access_key', None) + cls.beaver_config.set('sqs_aws_secret_key', None) + cls.beaver_config.set('sqs_bulk_lines', True) + + transport = create_transport(cls.beaver_config, logger=cls.logger) + + cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport) + transport.interrupt() + + 
@mock_sqs + def test_sqs_multi_queue(cls): + cls._create_queues() + cls.beaver_config.set('sqs_aws_queue', 'queue1,queue2') + cls.beaver_config.set('sqs_aws_profile_name', None) + cls.beaver_config.set('sqs_aws_access_key', None) + cls.beaver_config.set('sqs_aws_secret_key', None) + cls.beaver_config.set('sqs_bulk_lines', False) + + transport = create_transport(cls.beaver_config, logger=cls.logger) + + cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport) + transport.interrupt() + + @mock_sqs + def test_sqs_multi_queue_bulklines(cls): + cls._create_queues() + cls.beaver_config.set('sqs_aws_queue', 'queue1,queue2') + cls.beaver_config.set('sqs_aws_profile_name', None) + cls.beaver_config.set('sqs_aws_access_key', None) + cls.beaver_config.set('sqs_aws_secret_key', None) + cls.beaver_config.set('sqs_bulk_lines', True) + + transport = create_transport(cls.beaver_config, logger=cls.logger) + + cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport) + transport.interrupt() + + @mock_sqs + def test_sqs_send_single_queue(cls): + cls._create_queues() + cls.beaver_config.set('sqs_aws_queue', 'queue1') + cls.beaver_config.set('sqs_aws_profile_name', None) + cls.beaver_config.set('sqs_aws_access_key', None) + cls.beaver_config.set('sqs_aws_secret_key', None) + cls.beaver_config.set('sqs_bulk_lines', False) + + transport = create_transport(cls.beaver_config, logger=cls.logger) + + cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport) + + data = {} + lines = [] + n=100 + for i in range(n): + lines.append('log' + str(i) + '\n') + new_lines = [] + for line in lines: + message = unicode_dammit(line) + if len(message) == 0: + continue + new_lines.append(message) + data['lines'] = new_lines + data['fields'] = [] + transport.callback("test.log", **data) + + @mock_sqs + def test_sqs_send_multi_queue(cls): + cls._create_queues() + cls.beaver_config.set('sqs_aws_queue', 'queue1,queue2') + 
cls.beaver_config.set('sqs_aws_profile_name', None) + cls.beaver_config.set('sqs_aws_access_key', None) + cls.beaver_config.set('sqs_aws_secret_key', None) + cls.beaver_config.set('sqs_bulk_lines', False) + + transport = create_transport(cls.beaver_config, logger=cls.logger) + + cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport) + + data = {} + lines = [] + n=100 + for i in range(n): + lines.append('log' + str(i) + '\n') + new_lines = [] + for line in lines: + message = unicode_dammit(line) + if len(message) == 0: + continue + new_lines.append(message) + data['lines'] = new_lines + data['fields'] = [] + transport.callback("test.log", **data) + + @mock_sqs + def test_sqs_send_single_queue_bulklines(cls): + cls._create_queues() + cls.beaver_config.set('sqs_aws_queue', 'queue1') + cls.beaver_config.set('sqs_aws_profile_name', None) + cls.beaver_config.set('sqs_aws_access_key', None) + cls.beaver_config.set('sqs_aws_secret_key', None) + cls.beaver_config.set('sqs_bulk_lines', True) + + transport = create_transport(cls.beaver_config, logger=cls.logger) + + cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport) + + data = {} + lines = [] + n=100 + for i in range(n): + lines.append('log' + str(i) + '\n') + new_lines = [] + for line in lines: + message = unicode_dammit(line) + if len(message) == 0: + continue + new_lines.append(message) + data['lines'] = new_lines + data['fields'] = [] + transport.callback("test.log", **data) + + @mock_sqs + def test_sqs_send_multi_queue_bulklines(cls): + cls._create_queues() + cls.beaver_config.set('sqs_aws_queue', 'queue1,queue2') + cls.beaver_config.set('sqs_aws_profile_name', None) + cls.beaver_config.set('sqs_aws_access_key', None) + cls.beaver_config.set('sqs_aws_secret_key', None) + cls.beaver_config.set('sqs_bulk_lines', True) + + transport = create_transport(cls.beaver_config, logger=cls.logger) + + cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport) + + 
data = {} + lines = [] + n=100 + for i in range(n): + lines.append('log' + str(i) + '\n') + new_lines = [] + for line in lines: + message = unicode_dammit(line) + if len(message) == 0: + continue + new_lines.append(message) + data['lines'] = new_lines + data['fields'] = [] + transport.callback("test.log", **data) diff --git a/beaver/tests/test_transport_config.py b/beaver/tests/test_transport_config.py index 99461292..da58c1a2 100644 --- a/beaver/tests/test_transport_config.py +++ b/beaver/tests/test_transport_config.py @@ -1,9 +1,14 @@ # -*- coding: utf-8 -*- +import sys +if sys.version_info < (2, 7): + import unittest2 as unittest +else: + import unittest + import fakeredis import logging import mock import tempfile -import unittest import beaver from beaver.config import BeaverConfig @@ -19,11 +24,12 @@ else: raise + class DummyTransport(BaseTransport): pass -with mock.patch('pika.adapters.SelectConnection', autospec=True) as mock_pika: +with mock.patch('pika.adapters.SelectConnection') as mock_pika: class TransportConfigTests(unittest.TestCase): def setUp(self): @@ -38,6 +44,7 @@ def test_builtin_rabbitmq(self): beaver_config = self._get_config(transport='rabbitmq') transport = create_transport(beaver_config, logger=self.logger) self.assertIsInstance(transport, beaver.transports.rabbitmq_transport.RabbitmqTransport) + transport.interrupt() @mock.patch('redis.StrictRedis', fakeredis.FakeStrictRedis) def test_builtin_redis(self): diff --git a/beaver/tests/test_zmq_transport.py b/beaver/tests/test_zmq_transport.py index 62348907..fb567ec8 100644 --- a/beaver/tests/test_zmq_transport.py +++ b/beaver/tests/test_zmq_transport.py @@ -1,6 +1,11 @@ # -*- coding: utf-8 -*- +import sys +if sys.version_info < (2, 7): + import unittest2 as unittest +else: + import unittest + import mock -import unittest import tempfile from beaver.config import BeaverConfig diff --git a/beaver/transports/base_transport.py b/beaver/transports/base_transport.py index 24cf69a7..cecba2bd 100644 
--- a/beaver/transports/base_transport.py +++ b/beaver/transports/base_transport.py @@ -62,8 +62,8 @@ def rawjson_formatter(data): except ValueError: self._logger.warning("cannot parse as rawjson: {0}".format(self._fields.get('message'))) json_data = json.loads("{}") - - del data[self._fields.get('message')] + if json_data: + del data[self._fields.get('message')] for field in json_data: data[field] = json_data[field] @@ -116,6 +116,7 @@ def callback(self, filename, lines): def format(self, filename, line, timestamp, **kwargs): """Returns a formatted log line""" + line = unicode(line.encode("utf-8"), "utf-8", errors="ignore") formatter = self._beaver_config.get_field('format', filename) if formatter not in self._formatters: formatter = self._default_formatter diff --git a/beaver/transports/kafka_transport.py b/beaver/transports/kafka_transport.py index 68d0fa99..cfac47fa 100644 --- a/beaver/transports/kafka_transport.py +++ b/beaver/transports/kafka_transport.py @@ -19,6 +19,7 @@ def __init__(self, beaver_config, logger=None): self._kafka_config[key] = beaver_config.get('kafka_' + key) try: + self._connect() self._client = KafkaClient(self._kafka_config['hosts'], self._kafka_config['client_id']) self._client.ensure_topic_exists(self._kafka_config['topic']) self._key = self._kafka_config['key'] @@ -48,7 +49,6 @@ def __init__(self, beaver_config, logger=None): except Exception, e: raise TransportException(e.message) - def callback(self, filename, lines, **kwargs): """publishes lines one by one to the given topic""" timestamp = self.get_timestamp(**kwargs) @@ -78,6 +78,16 @@ def callback(self, filename, lines, **kwargs): except AttributeError: raise TransportException('Unspecified exception encountered') # TRAP ALL THE THINGS! 
+ def _connect(self): + try: + self._client = KafkaClient(self._kafka_config['hosts'], self._kafka_config['client_id']) + self._client.ensure_topic_exists(self._kafka_config['topic']) + except Exception, e: + raise TransportException(e.message) + + def reconnect(self): + self._connect() + def interrupt(self): if self._prod: self._prod.stop() diff --git a/beaver/transports/kinesis_transport.py b/beaver/transports/kinesis_transport.py index 46aff6ba..e790a40c 100644 --- a/beaver/transports/kinesis_transport.py +++ b/beaver/transports/kinesis_transport.py @@ -20,6 +20,9 @@ def __init__(self, beaver_config, logger=None): # self-imposed max batch size to minimize the number of records in a given call to Kinesis self._batch_size_max = beaver_config.get('kinesis_aws_batch_size_max', '512000') + # Kinesis Limit http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html#API_PutRecords_RequestSyntax + self._max_records_per_batch = 500 + try: if self._access_key is None and self._secret_key is None: self._connection = boto.kinesis.connect_to_region(self._region) @@ -55,7 +58,7 @@ def callback(self, filename, lines, **kwargs): continue # Check the self-enforced/declared batch size and flush before moving forward if we've eclipsed the max - if (len(message_batch) > 0) and ((message_batch_size + message_size) >= self._batch_size_max): + if len(message_batch) > 0 and ((message_batch_size + message_size) >= self._batch_size_max or len(message_batch) == self._max_records_per_batch): self._logger.debug('Flushing {0} messages to Kinesis stream {1} bytes'.format( len(message_batch), message_batch_size)) self._send_message_batch(message_batch) diff --git a/beaver/transports/mqtt_transport.py b/beaver/transports/mqtt_transport.py index d1d2b97d..484114c1 100644 --- a/beaver/transports/mqtt_transport.py +++ b/beaver/transports/mqtt_transport.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -from mosquitto import Mosquitto +import paho.mqtt.client as paho from 
beaver.transports.base_transport import BaseTransport from beaver.transports.exception import TransportException @@ -9,12 +9,12 @@ class MqttTransport(BaseTransport): def __init__(self, beaver_config, logger=None): """ - Mosquitto client initilization. Once this this transport is initialized + Paho client initialization. Once this this transport is initialized it has invoked a connection to the server """ super(MqttTransport, self).__init__(beaver_config, logger=logger) - self._client = Mosquitto(beaver_config.get('mqtt_clientid'), clean_session=True) + self._client = paho.Client(beaver_config.get('mqtt_clientid'), clean_session=True) self._topic = beaver_config.get('mqtt_topic') self._client.connect( host=beaver_config.get('mqtt_host'), @@ -24,9 +24,9 @@ def __init__(self, beaver_config, logger=None): def on_disconnect(mosq, obj, rc): if rc == 0: - logger.debug('Mosquitto has successfully disconnected') + logger.debug('Paho has successfully disconnected') else: - logger.debug('Mosquitto unexpectedly disconnected') + logger.debug('Paho unexpectedly disconnected') self._client.on_disconnect = on_disconnect diff --git a/beaver/transports/rabbitmq_transport.py b/beaver/transports/rabbitmq_transport.py index 20ff4af6..8992fb81 100644 --- a/beaver/transports/rabbitmq_transport.py +++ b/beaver/transports/rabbitmq_transport.py @@ -18,111 +18,134 @@ def __init__(self, beaver_config, logger=None): config_to_store = [ 'key', 'exchange', 'username', 'password', 'host', 'port', 'vhost', 'queue', 'queue_durable', 'ha_queue', 'exchange_type', 'exchange_durable', - 'ssl', 'ssl_key', 'ssl_cert', 'ssl_cacert', 'delivery_mode' + 'ssl', 'ssl_key', 'ssl_cert', 'ssl_cacert', 'timeout', 'delivery_mode', 'arguments' ] for key in config_to_store: self._rabbitmq_config[key] = beaver_config.get('rabbitmq_' + key) + if self._rabbitmq_config['arguments']: + self._rabbitmq_config['arguments'] = self.get_rabbitmq_args() + if self._rabbitmq_config['ha_queue']: + 
self._rabbitmq_config['arguments']['x-ha-policy'] = 'all' + + self._connection = None self._channel = None self._count = 0 self._lines = Queue() + self._connection_ok = False + self._thread = None + self._is_valid = True self._connect() + def get_rabbitmq_args(self): + res = {} + args = self._rabbitmq_config['arguments'].split(',') + for x in args: + k, v = x.split(':') + try: + # convert str to int if not a str + v = int(v) + except ValueError: + pass # is a str, not an int + res[k] = v + return res + + def _on_connection_open(self,connection): - self._logger.debug("connection created") + self._logger.debug("RabbitMQ: Connection Created") self._channel = connection.channel(self._on_channel_open) def _on_channel_open(self,unused): - self._logger.debug("Channel Created") + self._logger.debug("RabbitMQ: Channel Created") self._channel.exchange_declare(self._on_exchange_declareok, exchange=self._rabbitmq_config['exchange'], exchange_type=self._rabbitmq_config['exchange_type'], durable=self._rabbitmq_config['exchange_durable']) def _on_exchange_declareok(self,unused): - self._logger.debug("Exchange Declared") + self._logger.debug("RabbitMQ: Exchange Declared") self._channel.queue_declare(self._on_queue_declareok, queue=self._rabbitmq_config['queue'], durable=self._rabbitmq_config['queue_durable'], - arguments={'x-ha-policy': 'all'} if self._rabbitmq_config['ha_queue'] else {}) + arguments=self._rabbitmq_config['arguments']) def _on_queue_declareok(self,unused): - self._logger.debug("Queue Declared") + self._logger.debug("RabbitMQ: Queue Declared") self._channel.queue_bind(self._on_bindok, exchange=self._rabbitmq_config['exchange'], queue=self._rabbitmq_config['queue'], routing_key=self._rabbitmq_config['key']) def _on_bindok(self,unused): - self._logger.debug("Exchange to Queue Bind OK") - self._is_valid = True; - self._logger.debug("Scheduling next message for %0.1f seconds",1) - self._connection.add_timeout(1,self._publish_message) - + self._logger.info("RabbitMQ: 
Connection OK.") + self._connection_ok = True + self._logger.debug("RabbitMQ: Scheduling regular message transport.") + self._connection.add_timeout(1, self._publish_message) def _publish_message(self): - while True: - self._count += 0 - if self._lines.not_empty: - line = self._lines.get() - if self._count == 10000: - self._logger.debug("RabbitMQ transport queue size: %s" % (self._lines.qsize(), )) - self._count = 0 - else: - self._count += 1 - self._channel.basic_publish( - exchange=self._rabbitmq_config['exchange'], - routing_key=self._rabbitmq_config['key'], - body=line, - properties=pika.BasicProperties( - content_type='text/json', - delivery_mode=self._rabbitmq_config['delivery_mode'] - )) + self._logger.debug("RabbitMQ: Looking for messages to transport...") + while self._connection_ok and not self._lines.empty(): + line = self._lines.get() + if self._count == 10000: + self._logger.debug("RabbitMQ: Transport queue size: %s", self._lines.qsize()) + self._count = 0 else: - self._logger.debug("RabbitMQ transport queue is empty, sleeping for 1 second.") - time.sleep(1) - - + self._count += 1 + self._channel.basic_publish( + exchange=self._rabbitmq_config['exchange'], + routing_key=self._rabbitmq_config['key'], + body=line, + properties=pika.BasicProperties( + content_type='text/json', + delivery_mode=self._rabbitmq_config['delivery_mode'] + )) + if self._connection_ok: + self._logger.debug("RabbitMQ: No messages to transport. 
Sleeping.") + self._connection.add_timeout(1, self._publish_message) + else: + self._logger.info('RabbitMQ: Message publisher stopped.') - def _on_connection_open_error(self,non_used_connection=None,error=None): - self._logger.debug("connection open error") - if not error==None: - self._logger.error(error) + def _on_connection_open_error(self, non_used_connection=None, error=None): + self._connection_ok = False + self._logger.error('RabbitMQ: Could not open connection: %s', error) def _on_connection_closed(self, connection, reply_code, reply_text): - self._channel = None - if self._connection._closing: - try: - self._connection.ioloop.stop() - except: - pass - else: - self._logger.warning('RabbitMQ Connection closed, reopening in 1 seconds: (%s) %s', - reply_code, reply_text) - self._connection.add_timeout(1, self.reconnect) + self._connection_ok = False + self._logger.warning('RabbitMQ: Connection closed: %s %s', reply_code, reply_text) def reconnect(self): - try: - self._connection.ioloop.stop() - except: - pass - self._connection_start() + self._logger.debug("RabbitMQ: Reconnecting...") + self.interrupt() + + self._thread = Thread(target=self._connection_start) + self._thread.start() + while self._thread.is_alive() and not self._connection_ok: + time.sleep(1) + if self._connection_ok: + self._is_valid = True + self._logger.info('RabbitMQ: Reconnect successful.') + else: + self._logger.warning('RabbitMQ: Reconnect failed!') + self.interrupt() def _connection_start(self): - self._logger.debug("Creating Connection") + self._logger.debug("RabbitMQ: Connecting...") try: - self._connection = pika.adapters.SelectConnection(parameters=self._parameters,on_open_callback=self._on_connection_open,on_open_error_callback=self._on_connection_open_error,on_close_callback=self._on_connection_closed,stop_ioloop_on_close=False) - except Exception,e: - self._logger.error("Failed Creating RabbitMQ connection") - self._logger.error(e) - self._logger.debug("Starting ioloop") - 
self._connection.ioloop.start() + self._connection_ok = False + self._connection = pika.adapters.SelectConnection( + parameters=self._parameters, + on_open_callback=self._on_connection_open, + on_open_error_callback=self._on_connection_open_error, + on_close_callback=self._on_connection_closed + ) + if not self._connection.is_closed: + self._connection.ioloop.start() + except Exception as e: + self._logger.error('RabbitMQ: Failed to connect: %s', e) def _connect(self): - - # Setup RabbitMQ connection credentials = pika.PlainCredentials( self._rabbitmq_config['username'], self._rabbitmq_config['password'] @@ -139,11 +162,15 @@ def _connect(self): port=self._rabbitmq_config['port'], ssl=self._rabbitmq_config['ssl'], ssl_options=ssl_options, - virtual_host=self._rabbitmq_config['vhost'] + virtual_host=self._rabbitmq_config['vhost'], + socket_timeout=self._rabbitmq_config['timeout'] ) - Thread(target=self._connection_start).start() + self._thread = Thread(target=self._connection_start) + self._thread.start() def callback(self, filename, lines, **kwargs): + if not self._connection_ok: + raise TransportException('RabbitMQ: Not connected or connection not OK') timestamp = self.get_timestamp(**kwargs) if kwargs.get('timestamp', False): del kwargs['timestamp'] @@ -155,18 +182,21 @@ def callback(self, filename, lines, **kwargs): body = self.format(filename, line, timestamp, **kwargs) self._lines.put(body) except UserWarning: - self._is_valid = False raise TransportException('Connection appears to have been lost') except Exception as e: - self._is_valid = False try: raise TransportException(e.strerror) except AttributeError: - raise TransportException('Unspecified exception encountered') # TRAP ALL THE THINGS! 
+ raise TransportException('Unspecified exception encountered') def interrupt(self): + self._connection_ok = False if self._connection: self._connection.close() + self._connection = None + if self._thread: + self._thread.join() + self._thread = None def unhandled(self): return True diff --git a/beaver/transports/redis_transport.py b/beaver/transports/redis_transport.py index 47ae0f49..d9aa309f 100644 --- a/beaver/transports/redis_transport.py +++ b/beaver/transports/redis_transport.py @@ -77,10 +77,12 @@ def callback(self, filename, lines, **kwargs): if kwargs.get('timestamp', False): del kwargs['timestamp'] - namespace = self._beaver_config.get_field('redis_namespace', filename) - if not namespace: - namespace = self._namespace - self._logger.debug('Got namespace: ' + namespace) + namespaces = self._beaver_config.get_field('redis_namespace', filename) + if not namespaces: + namespaces = self._namespace + namespaces = namespaces.split(",") + + self._logger.debug('Got namespaces: '.join(namespaces)) data_type = self._data_type self._logger.debug('Got data type: ' + data_type) @@ -97,10 +99,11 @@ def callback(self, filename, lines, **kwargs): callback_method = callback_map[data_type] for line in lines: - callback_method( - namespace, - self.format(filename, line, timestamp, **kwargs) - ) + for namespace in namespaces: + callback_method( + namespace.strip(), + self.format(filename, line, timestamp, **kwargs) + ) try: pipeline.execute() diff --git a/beaver/transports/sqs_transport.py b/beaver/transports/sqs_transport.py index 77ad9d2c..986025de 100644 --- a/beaver/transports/sqs_transport.py +++ b/beaver/transports/sqs_transport.py @@ -2,10 +2,10 @@ import boto.sqs import uuid -from boto.sqs.message import Message +from boto.sqs.message import Message, RawMessage from beaver.transports.base_transport import BaseTransport from beaver.transports.exception import TransportException - +from sys import getsizeof class SqsTransport(BaseTransport): @@ -14,12 +14,17 @@ def 
__init__(self, beaver_config, logger=None): self._access_key = beaver_config.get('sqs_aws_access_key') self._secret_key = beaver_config.get('sqs_aws_secret_key') + self._profile = beaver_config.get('sqs_aws_profile_name') self._region = beaver_config.get('sqs_aws_region') - self._queue_name = beaver_config.get('sqs_aws_queue') self._queue_owner_acct_id = beaver_config.get('sqs_aws_queue_owner_acct_id') + self._queue = beaver_config.get('sqs_aws_queue').split(',') + self._bulk_lines = beaver_config.get('sqs_bulk_lines') try: - if self._access_key is None and self._secret_key is None: + if self._profile: + self._connection = boto.sqs.connect_to_region(self._region, + profile_name=self._profile) + elif self._access_key is None and self._secret_key is None: self._connection = boto.sqs.connect_to_region(self._region) else: self._connection = boto.sqs.connect_to_region(self._region, @@ -30,14 +35,19 @@ def __init__(self, beaver_config, logger=None): self._logger.warn('Unable to connect to AWS - check your AWS credentials') raise TransportException('Unable to connect to AWS - check your AWS credentials') - if self._queue_owner_acct_id is None: - self._queue = self._connection.get_queue(self._queue_name) - else: - self._queue = self._connection.get_queue(self._queue_name, - owner_acct_id=self._queue_owner_acct_id) + self._queues = {} + for queue in self._queue: + self._logger.debug('Attempting to load SQS queue: {0}'.format(queue)) + if self._queue_owner_acct_id is None: + self._queues[queue] = self._connection.get_queue(queue) + else: + self._queues[queue] = self._connection.get_queue(queue, + owner_acct_id=self._queue_owner_acct_id) - if self._queue is None: - raise TransportException('Unable to access queue with name {0}'.format(self._queue_name)) + if self._queues[queue] is None: + raise TransportException('Unable to access queue with name {0}'.format(queue)) + + self._logger.debug('Successfully loaded SQS queue: {0}'.format(queue)) except Exception, e: raise 
TransportException(e.message) @@ -46,46 +56,89 @@ def callback(self, filename, lines, **kwargs): if kwargs.get('timestamp', False): del kwargs['timestamp'] - message_batch = [] + if self._bulk_lines: + message_batch = '' + message_count = 0 + else: + message_batch = [] + message_batch_size = 0 message_batch_size_max = 250000 # Max 256KiB but leave some headroom for line in lines: - m = Message() - m.set_body(self.format(filename, line, timestamp, **kwargs)) - message_size = len(m) + if self._bulk_lines: + m = self.format(filename, line, timestamp, **kwargs) + message_size = getsizeof(m) + else: + m = Message() + m.set_body(self.format(filename, line, timestamp, **kwargs)) + message_size = len(m) if (message_size > message_batch_size_max): self._logger.debug('Dropping the message as it is too large to send ({0} bytes)'.format(message_size)) continue - # SQS can only handle up to 10 messages in batch send and it can not exceed 256KiB (see above) # Check the new total size before adding a new message and don't try to send an empty batch - if (len(message_batch) > 0) and (((message_batch_size + message_size) >= message_batch_size_max) or (len(message_batch) == 10)): + if self._bulk_lines and (len(message_batch) > 0) and (((message_batch_size + message_size) >= message_batch_size_max)): + self._logger.debug('Flushing {0} messages to SQS queue {1} bytes'.format(message_count, message_batch_size)) + self._send_message(message_batch) + message_batch = '' + message_count = 0 + message_batch_size = 0 + + # SQS can only handle up to 10 messages in batch send and it can not exceed 256KiB (see above) + elif (len(message_batch) > 0) and (((message_batch_size + message_size) >= message_batch_size_max) or (len(message_batch) == 10)): self._logger.debug('Flushing {0} messages to SQS queue {1} bytes'.format(len(message_batch), message_batch_size)) self._send_message_batch(message_batch) message_batch = [] message_batch_size = 0 message_batch_size = message_batch_size + message_size 
def _send_message(self, msg):
    """Send one bulk message (a JSON array of lines) to every configured queue.

    *msg* is a comma-terminated concatenation of formatted lines built by
    callback() in bulk mode.

    Raises TransportException if any queue rejects the write.
    """
    # Wrap once, OUTSIDE the per-queue loop: the previous code re-wrapped
    # msg on every iteration, so the second and later queues received a
    # nested array ('[[...]]').
    body = '[{0}]'.format(msg.rstrip(','))
    for queue in self._queues:
        try:
            m = RawMessage()
            m.set_body(body)
            result = self._queues[queue].write(m)
            if not result:
                # Fixed: previous code referenced self._queue_name, an
                # attribute removed from __init__, so reporting a failed
                # send would itself raise AttributeError.
                self._logger.error('Error occurred sending message to SQS queue {0}. result: {1}'.format(
                    queue, result))
                raise TransportException('Error occurred sending message to queue {0}'.format(queue))
        except Exception as e:
            self._logger.exception('Exception occurred sending message to SQS queue')
            raise TransportException(e.message)

def _send_message_batch(self, message_batch):
    """Send a batch of up to 10 boto Messages to every configured queue.

    Raises TransportException if any queue rejects the batch.
    """
    for queue in self._queues:
        try:
            self._logger.debug('Attempting to push batch message to SQS queue: {0}'.format(queue))
            result = self._queues[queue].write_batch(message_batch)
            if not result:
                self._logger.error('Error occurred sending messages to SQS queue {0}. result: {1}'.format(
                    queue, result))
                raise TransportException('Error occurred sending message to queue {0}'.format(queue))
            self._logger.debug('Successfully pushed batch message to SQS queue: {0}'.format(queue))
        except Exception as e:
            self._logger.exception('Exception occurred sending batch to SQS queue')
            raise TransportException(e.message)

def interrupt(self):
    """Nothing persistent to tear down for SQS; always succeeds."""
    return True
Once this this transport is initialized it has invoked a connection to the server """ super(StompTransport, self).__init__(beaver_config, logger=logger) diff --git a/beaver/utils.py b/beaver/utils.py index 4f06a22e..bf7fb20e 100644 --- a/beaver/utils.py +++ b/beaver/utils.py @@ -3,6 +3,7 @@ import glob2 import itertools import logging +from logging.handlers import RotatingFileHandler import platform import re import os @@ -45,14 +46,20 @@ def parse_args(): parser.add_argument('-p', '--path', help='path to log files', default=None, dest='path') parser.add_argument('-P', '--pid', help='path to pid file', default=None, dest='pid') parser.add_argument('-t', '--transport', help='log transport method', dest='transport', default=None, choices=['kafka', 'mqtt', 'rabbitmq', 'redis', 'sns', 'sqs', 'kinesis', 'stdout', 'tcp', 'udp', 'zmq', 'http']) - parser.add_argument('-e', '--experimental', help='use experimental version of beaver', dest='experimental', default=False, action='store_true') parser.add_argument('-v', '--version', help='output version and quit', dest='version', default=False, action='store_true') parser.add_argument('--fqdn', help='use the machine\'s FQDN for source_host', dest='fqdn', default=False, action='store_true') + parser.add_argument('--max-bytes', action='store', dest='max_bytes', type=int, default=64 * 1024 * 1024, help='Maximum bytes per a logfile.') + parser.add_argument('--backup-count', action='store', dest='backup_count', type=int, default=1, help='Maximum number of logfiles to backup.') - return parser.parse_args() + args = parser.parse_args() + if args.config != "/dev/null": + args.config = os.path.realpath(args.config) -def setup_custom_logger(name, args=None, output=None, formatter=None, debug=None, config=None): + return args + + +def setup_custom_logger(name, args=None, output=None, formatter=None, debug=None, config=None, max_bytes=None, backup_count=None): logger = logging.getLogger(name) logger.propagate = False if logger.handlers: @@ 
-66,7 +73,6 @@ def setup_custom_logger(name, args=None, output=None, formatter=None, debug=None if formatter is None: formatter = logging.Formatter('[%(asctime)s] %(levelname)-7s %(message)s') - handler = logging.StreamHandler() if output is None and has_args: if config and config.get('output'): output = config.get('output') @@ -77,15 +83,31 @@ def setup_custom_logger(name, args=None, output=None, formatter=None, debug=None output = os.path.realpath(output) if output is not None: - file_handler = logging.FileHandler(output) - if formatter is not False: - file_handler.setFormatter(formatter) - logger.addHandler(file_handler) + if has_args and backup_count is None: + backup_count = args.backup_count + + if has_args and max_bytes is None: + max_bytes = args.max_bytes + + if backup_count is not None and max_bytes is not None: + assert backup_count > 0 + assert max_bytes > 0 + ch = RotatingFileHandler(output, 'a', max_bytes, backup_count) + if formatter is not False: + ch.setFormatter(formatter) + logger.addHandler(ch) + else: + file_handler = logging.FileHandler(output) + if formatter is not False: + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + else: + handler = logging.StreamHandler() - if formatter is not False: - handler.setFormatter(formatter) + if formatter is not False: + handler.setFormatter(formatter) - logger.addHandler(handler) + logger.addHandler(handler) if debug: logger.setLevel(logging.DEBUG) @@ -158,7 +180,7 @@ def _replace_all(path, replacements): def multiline_merge(lines, current_event, re_after, re_before): """ Merge multi-line events based. - Some event (like Python trackback or Java stracktrace) spawn + Some event (like Python traceback or Java stacktrace) spawn on multiple line. This method will merge them using two regular expression: regex_after and regex_before. @@ -166,8 +188,8 @@ def multiline_merge(lines, current_event, re_after, re_before): If a line match re_before, it will be merged with previous line. 
- This function return a list of complet event. Note that because - we don't know if an event is complet before another new event + This function return a list of complete event. Note that because + we don't know if an event is complete before another new event start, the last event will not be returned but stored in current_event. You should pass the same current_event to successive call to multiline_merge. current_event is a list diff --git a/beaver/worker/tail.py b/beaver/worker/tail.py index 2c1a52e1..b5f81a9a 100644 --- a/beaver/worker/tail.py +++ b/beaver/worker/tail.py @@ -207,6 +207,8 @@ def _ensure_file_is_good(self, current_time): if err.errno == errno.ENOENT: self._log_info('file removed') self.close() + return + raise fid = self.get_file_id(st) if fid != self._fid: @@ -420,7 +422,7 @@ def _sincedb_update_position(self, lines=0, force_update=False): conn = sqlite3.connect(self._sincedb_path, isolation_level=None) cursor = conn.cursor() - query = 'insert or ignore into sincedb (fid, filename) values (:fid, :filename);' + query = 'insert or replace into sincedb (fid, filename) values (:fid, :filename);' cursor.execute(query, { 'fid': self._fid, 'filename': self._filename diff --git a/beaver/worker/worker.py b/beaver/worker/worker.py deleted file mode 100644 index cd91b796..00000000 --- a/beaver/worker/worker.py +++ /dev/null @@ -1,687 +0,0 @@ -# -*- coding: utf-8 -*- -import collections -import datetime -import errno -import gzip -import io -import os -import signal -import sqlite3 -import stat -import time -import threading - -from beaver.utils import IS_GZIPPED_FILE, REOPEN_FILES, eglob, multiline_merge -from beaver.unicode_dammit import ENCODINGS - - -class Worker(object): - """Looks for changes in all files of a directory. - This is useful for watching log file changes in real-time. - It also supports files rotation. - - Example: - - >>> def callback(filename, lines): - ... print filename, lines - ... 
- >>> l = Worker(args, callback, ["log", "txt"]) - >>> l.loop() - """ - - def __init__(self, beaver_config, queue_consumer_function, callback, logger=None): - """Arguments: - - (FileConfig) @file_config: - object containing file-related configuration - - (BeaverConfig) @beaver_config: - object containing global configuration - - (Logger) @logger - object containing a python logger - - (callable) @callback: - a function which is called every time a new line in a - file being watched is found; - this is called with "filename" and "lines" arguments. - """ - self._beaver_config = beaver_config - self._callback = callback - self._create_queue_consumer = queue_consumer_function - self._file_map = {} - self._folder = self._beaver_config.get('path') - self._last_file_mapping_update = {} - self._logger = logger - self._number_of_consumer_processes = int(self._beaver_config.get('number_of_consumer_processes')) - self._proc = [None] * self._number_of_consumer_processes - self._sincedb_path = self._beaver_config.get('sincedb_path') - self._update_time = None - self._running = True - - if not callable(self._callback): - raise RuntimeError("Callback for worker is not callable") - - self.update_files() - self._seek_to_end() - signal.signal(signal.SIGTERM, self.close) - - def __del__(self): - """Closes all files""" - self.close() - - def close(self, signalnum=None, frame=None): - self._running = False - """Closes all currently open file pointers""" - for id, data in self._file_map.iteritems(): - data['file'].close() - self._sincedb_update_position(data['file'], fid=id, force_update=True) - self._file_map.clear() - for n in range(0,self._number_of_consumer_processes): - if self._proc[n] is not None and self._proc[n].is_alive(): - self._logger.debug("Terminate Process: " + str(n)) - self._proc[n].terminate() - self._proc[n].join() - - - def listdir(self): - """List directory and filter files by extension. - You may want to override this to add extra logic or - globbling support. 
- """ - if self._folder is not None: - ls = os.listdir(self._folder) - return [x for x in ls if os.path.splitext(x)[1][1:] == "log"] - else: - return [] - - def create_queue_consumer_if_required(self, interval=5.0): - - for n in range(0,self._number_of_consumer_processes): - if not (self._proc[n] and self._proc[n].is_alive()): - self._logger.debug("creating consumer process: " + str(n)) - self._proc[n] = self._create_queue_consumer() - timer = threading.Timer(interval, self.create_queue_consumer_if_required) - timer.start() - - def loop(self, interval=0.1, async=False): - """Start the loop. - If async is True make one loop then return. - """ - self.create_queue_consumer_if_required() - - while self._running: - - t = time.time() - - if int(time.time()) - self._update_time > self._beaver_config.get('discover_interval'): - self.update_files() - - self._ensure_files_are_good(current_time=t) - - unwatch_list = [] - - for fid, data in self._file_map.iteritems(): - try: - self._run_pass(fid, data['file']) - except IOError, e: - if e.errno == errno.ESTALE: - unwatch_list.append(fid) - - self.unwatch_list(unwatch_list) - - if async: - return - - self._logger.debug("Iteration took {0:.6f}".format(time.time() - t)) - time.sleep(interval) - - def _run_pass(self, fid, file): - """Read lines from a file and performs a callback against them""" - while True: - try: - data = file.read(4096) - except IOError, e: - if e.errno == errno.ESTALE: - self.active = False - return False - - lines = self._buffer_extract(data=data, fid=fid) - - if not lines: - # Before returning, check if an event (maybe partial) is waiting for too long. 
- if self._file_map[fid]['current_event'] and time.time() - self._file_map[fid]['last_activity'] > 1: - event = '\n'.join(self._file_map[fid]['current_event']) - self._file_map[fid]['current_event'].clear() - self._callback_wrapper(filename=file.name, lines=[event]) - break - - self._file_map[fid]['last_activity'] = time.time() - - if self._file_map[fid]['multiline_regex_after'] or self._file_map[fid]['multiline_regex_before']: - # Multiline is enabled for this file. - events = multiline_merge( - lines, - self._file_map[fid]['current_event'], - self._file_map[fid]['multiline_regex_after'], - self._file_map[fid]['multiline_regex_before']) - else: - events = lines - - if events: - self._callback_wrapper(filename=file.name, lines=events) - - if self._sincedb_path: - current_line_count = len(lines) - self._sincedb_update_position(file, fid=fid, lines=current_line_count) - - self._sincedb_update_position(file, fid=fid) - - def _buffer_extract(self, data, fid): - """ - Extract takes an arbitrary string of input data and returns an array of - tokenized entities, provided there were any available to extract. This - makes for easy processing of datagrams using a pattern like: - - tokenizer.extract(data).map { |entity| Decode(entity) }.each do ...""" - # Extract token-delimited entities from the input string with the split command. - # There's a bit of craftiness here with the -1 parameter. Normally split would - # behave no differently regardless of if the token lies at the very end of the - # input buffer or not (i.e. 
a literal edge case) Specifying -1 forces split to - # return "" in this case, meaning that the last entry in the list represents a - # new segment of data where the token has not been encountered - entities = collections.deque(data.split(self._file_map[fid]['delimiter'], -1)) - - # Check to see if the buffer has exceeded capacity, if we're imposing a limit - if self._file_map[fid]['size_limit']: - if self._file_map[fid]['input_size'] + len(entities[0]) > self._file_map[fid]['size_limit']: - raise Exception('input buffer full') - self._file_map[fid]['input_size'] += len(entities[0]) - - # Move the first entry in the resulting array into the input buffer. It represents - # the last segment of a token-delimited entity unless it's the only entry in the list. - first_entry = entities.popleft() - if len(first_entry) > 0: - self._file_map[fid]['input'].append(first_entry) - - # If the resulting array from the split is empty, the token was not encountered - # (not even at the end of the buffer). Since we've encountered no token-delimited - # entities this go-around, return an empty array. - if len(entities) == 0: - return [] - - # At this point, we've hit a token, or potentially multiple tokens. Now we can bring - # together all the data we've buffered from earlier calls without hitting a token, - # and add it to our list of discovered entities. - entities.appendleft(''.join(self._file_map[fid]['input'])) - - # Now that we've hit a token, joined the input buffer and added it to the entities - # list, we can go ahead and clear the input buffer. All of the segments that were - # stored before the join can now be garbage collected. - self._file_map[fid]['input'].clear() - - # The last entity in the list is not token delimited, however, thanks to the -1 - # passed to split. It represents the beginning of a new list of as-yet-untokenized - # data, so we add it to the start of the list. 
- self._file_map[fid]['input'].append(entities.pop()) - - # Set the new input buffer size, provided we're keeping track - if self._file_map[fid]['size_limit']: - self._file_map[fid]['input_size'] = len(self._file_map[fid]['input'][0]) - - # Now we're left with the list of extracted token-delimited entities we wanted - # in the first place. Hooray! - return entities - - # Flush the contents of the input buffer, i.e. return the input buffer even though - # a token has not yet been encountered - def _buffer_flush(self, fid): - buf = ''.join(self._file_map[fid]['input']) - self._file_map[fid]['input'].clear - return buf - - # Is the buffer empty? - def _buffer_empty(self, fid): - return len(self._file_map[fid]['input']) > 0 - - def _seek_to_end(self): - unwatch_list = [] - - # The first time we run the script we move all file markers at EOF. - # In case of files created afterwards we don't do this. - for fid, data in self._file_map.iteritems(): - self._logger.debug("[{0}] - getting start position {1}".format(fid, data['file'].name)) - start_position = self._beaver_config.get_field('start_position', data['file'].name) - is_active = data['active'] - - if self._sincedb_path: - sincedb_start_position = self._sincedb_start_position(data['file'], fid=fid) - if sincedb_start_position: - start_position = sincedb_start_position - - if start_position == "beginning": - continue - - line_count = 0 - - if str(start_position).isdigit(): - self._logger.debug("[{0}] - going to start position {1} for {2}".format(fid, start_position, data['file'].name)) - start_position = int(start_position) - for encoding in ENCODINGS: - try: - line_count = 0 - while data['file'].readline(): - line_count += 1 - if line_count == start_position: - break - except UnicodeDecodeError: - self._logger.debug("[{0}] - UnicodeDecodeError raised for {1} with encoding {2}".format(fid, data['file'].name, data['encoding'])) - data['file'] = self.open(data['file'].name, encoding=encoding) - if not data['file']: - 
unwatch_list.append(fid) - is_active = False - break - - data['encoding'] = encoding - - if line_count != start_position: - self._logger.debug("[{0}] - file at different position than {1}, assuming manual truncate for {2}".format(fid, start_position, data['file'].name)) - data['file'].seek(0, os.SEEK_SET) - start_position == "beginning" - - if not is_active: - continue - - if start_position == "beginning": - continue - - if start_position == "end": - self._logger.debug("[{0}] - getting end position for {1}".format(fid, data['file'].name)) - for encoding in ENCODINGS: - try: - line_count = 0 - while data['file'].readline(): - line_count += 1 - break - except UnicodeDecodeError: - self._logger.debug("[{0}] - UnicodeDecodeError raised for {1} with encoding {2}".format(fid, data['file'].name, data['encoding'])) - data['file'] = self.open(data['file'].name, encoding=encoding) - if not data['file']: - unwatch_list.append(fid) - is_active = False - break - - data['encoding'] = encoding - - if not is_active: - continue - - current_position = data['file'].tell() - self._logger.debug("[{0}] - line count {1} for {2}".format(fid, line_count, data['file'].name)) - self._sincedb_update_position(data['file'], fid=fid, lines=line_count, force_update=True) - # Reset this, so line added processed just after this initialization - # will update the sincedb. Without this, if beaver run for less than - # sincedb_write_interval it will always re-process the last lines. - data['update_time'] = 0 - - tail_lines = self._beaver_config.get_field('tail_lines', data['file'].name) - tail_lines = int(tail_lines) - if tail_lines: - encoding = data['encoding'] - - lines = self.tail(data['file'].name, encoding=encoding, window=tail_lines, position=current_position) - if lines: - if self._file_map[fid]['multiline_regex_after'] or self._file_map[fid]['multiline_regex_before']: - # Multiline is enabled for this file. 
- events = multiline_merge( - lines, - self._file_map[fid]['current_event'], - self._file_map[fid]['multiline_regex_after'], - self._file_map[fid]['multiline_regex_before']) - else: - events = lines - self._callback_wrapper(filename=data['file'].name, lines=events) - - self.unwatch_list(unwatch_list) - - def _callback_wrapper(self, filename, lines): - now = datetime.datetime.utcnow() - timestamp = now.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (now.microsecond / 1000) + "Z" - self._callback(('callback', { - 'fields': self._beaver_config.get_field('fields', filename), - 'filename': filename, - 'format': self._beaver_config.get_field('format', filename), - 'ignore_empty': self._beaver_config.get_field('ignore_empty', filename), - 'lines': lines, - 'timestamp': timestamp, - 'tags': self._beaver_config.get_field('tags', filename), - 'type': self._beaver_config.get_field('type', filename), - })) - - def _sincedb_init(self): - """Initializes the sincedb schema in an sqlite db""" - if not self._sincedb_path: - return - - if not os.path.exists(self._sincedb_path): - self._logger.debug('Initializing sincedb sqlite schema') - conn = sqlite3.connect(self._sincedb_path, isolation_level=None) - conn.execute(""" - create table sincedb ( - fid text primary key, - filename text, - position integer default 1 - ); - """) - conn.close() - - def _sincedb_update_position(self, file, fid=None, lines=0, force_update=False): - """Retrieves the starting position from the sincedb sql db for a given file - Returns a boolean representing whether or not it updated the record - """ - if not self._sincedb_path: - return False - - if not fid: - fid = self.get_file_id(os.stat(file.name)) - - self._file_map[fid]['line'] = self._file_map[fid]['line'] + lines - old_count = self._file_map[fid]['line_in_sincedb'] - lines = self._file_map[fid]['line'] - - current_time = int(time.time()) - update_time = self._file_map[fid]['update_time'] - if not force_update: - sincedb_write_interval = 
self._beaver_config.get_field('sincedb_write_interval', file.name) - if update_time and current_time - update_time <= sincedb_write_interval: - return False - - if old_count == lines: - return False - - self._sincedb_init() - - self._file_map[fid]['update_time'] = current_time - - self._logger.debug("[{0}] - updating sincedb for logfile {1} from {2} to {3}".format(fid, file.name, old_count, lines)) - - conn = sqlite3.connect(self._sincedb_path, isolation_level=None) - cursor = conn.cursor() - query = "insert or ignore into sincedb (fid, filename) values (:fid, :filename);" - cursor.execute(query, { - 'fid': fid, - 'filename': file.name - }) - - query = "update sincedb set position = :position where fid = :fid and filename = :filename" - cursor.execute(query, { - 'fid': fid, - 'filename': file.name, - 'position': int(lines), - }) - conn.close() - - self._file_map[fid]['line_in_sincedb'] = lines - - return True - - def _sincedb_start_position(self, file, fid=None): - """Retrieves the starting position from the sincedb sql db - for a given file - """ - if not self._sincedb_path: - return None - - if not fid: - fid = self.get_file_id(os.stat(file.name)) - - self._sincedb_init() - conn = sqlite3.connect(self._sincedb_path, isolation_level=None) - cursor = conn.cursor() - cursor.execute("select position from sincedb where fid = :fid and filename = :filename", { - 'fid': fid, - 'filename': file.name - }) - - start_position = None - for row in cursor.fetchall(): - start_position, = row - - return start_position - - def update_files(self): - """Ensures all files are properly loaded. - Detects new files, file removals, file rotation, and truncation. - On non-linux platforms, it will also manually reload the file for tailing. - Note that this hack is necessary because EOF is cached on BSD systems. 
- """ - self._update_time = int(time.time()) - - ls = [] - files = [] - if len(self._beaver_config.get('globs')) > 0: - for name, exclude in self._beaver_config.get('globs').items(): - globbed = [os.path.realpath(filename) for filename in eglob(name, exclude)] - files.extend(globbed) - self._beaver_config.addglob(name, globbed) - self._callback(("addglob", (name, globbed))) - else: - for name in self.listdir(): - files.append(os.path.realpath(os.path.join(self._folder, name))) - - for absname in files: - try: - st = os.stat(absname) - except EnvironmentError, err: - if err.errno != errno.ENOENT: - raise - else: - if not stat.S_ISREG(st.st_mode): - continue - fid = self.get_file_id(st) - ls.append((fid, absname)) - - # add new ones - for fid, fname in ls: - if fid not in self._file_map: - self.watch(fname) - - def _ensure_files_are_good(self, current_time): - """Every N seconds, ensures that the file we are tailing is the file we expect to be tailing""" - - # We cannot watch/unwatch in a single iteration - rewatch_list = [] - unwatch_list = [] - - # check existent files - for fid, data in self._file_map.iteritems(): - filename = data['file'].name - stat_interval = self._beaver_config.get_field('stat_interval', filename) - if filename in self._last_file_mapping_update and current_time - self._last_file_mapping_update[filename] <= stat_interval: - continue - - self._last_file_mapping_update[filename] = time.time() - - try: - st = os.stat(data['file'].name) - except EnvironmentError, err: - if err.errno == errno.ENOENT: - unwatch_list.append(fid) - else: - raise - else: - if fid != self.get_file_id(st): - self._logger.info("[{0}] - file rotated {1}".format(fid, data['file'].name)) - rewatch_list.append(fid) - elif data['file'].tell() > st.st_size: - if st.st_size == 0 and self._beaver_config.get_field('ignore_truncate', data['file'].name): - self._logger.info("[{0}] - file size is 0 {1}. ".format(fid, data['file'].name) + - "If you use another tool (i.e. 
logrotate) to truncate " + - "the file, your application may continue to write to " + - "the offset it last wrote later. In such a case, we'd " + - "better do nothing here") - continue - self._logger.info("[{0}] - file truncated {1}".format(fid, data['file'].name)) - rewatch_list.append(fid) - elif REOPEN_FILES: - self._logger.debug("[{0}] - file reloaded (non-linux) {1}".format(fid, data['file'].name)) - position = data['file'].tell() - fname = data['file'].name - data['file'].close() - file = self.open(fname, encoding=data['encoding']) - if file: - file.seek(position) - self._file_map[fid]['file'] = file - - self.unwatch_list(unwatch_list) - self.rewatch_list(rewatch_list) - - def rewatch_list(self, rewatch_list): - for fid in rewatch_list: - if fid not in self._file_map: - continue - - f = self._file_map[fid]['file'] - filename = f.name - self.unwatch(f, fid) - self.watch(filename) - - def unwatch_list(self, unwatch_list): - for fid in unwatch_list: - if fid not in self._file_map: - continue - - f = self._file_map[fid]['file'] - self.unwatch(f, fid) - - def unwatch(self, file, fid): - """file no longer exists; if it has been renamed - try to read it for the last time in case the - log rotator has written something in it. 
- """ - try: - if file: - self._run_pass(fid, file) - if self._file_map[fid]['current_event']: - event = '\n'.join(self._file_map[fid]['current_event']) - self._file_map[fid]['current_event'].clear() - self._callback_wrapper(filename=file.name, lines=[event]) - except IOError: - # Silently ignore any IOErrors -- file is gone - pass - - if file: - self._logger.info("[{0}] - un-watching logfile {1}".format(fid, file.name)) - else: - self._logger.info("[{0}] - un-watching logfile".format(fid)) - - self._file_map[fid]['file'].close() - del self._file_map[fid] - - def watch(self, fname): - """Opens a file for log tailing""" - try: - file = self.open(fname, encoding=self._beaver_config.get_field('encoding', fname)) - if file: - fid = self.get_file_id(os.stat(fname)) - except EnvironmentError, err: - if err.errno != errno.ENOENT: - raise - else: - if file: - self._logger.info("[{0}] - watching logfile {1}".format(fid, fname)) - self._file_map[fid] = { - 'current_event': collections.deque([]), - 'delimiter': self._beaver_config.get_field('delimiter', fname), - 'encoding': self._beaver_config.get_field('encoding', fname), - 'file': file, - 'input': collections.deque([]), - 'input_size': 0, - 'last_activity': time.time(), - 'line': 0, - 'line_in_sincedb': 0, - 'multiline_regex_after': self._beaver_config.get_field('multiline_regex_after', fname), - 'multiline_regex_before': self._beaver_config.get_field('multiline_regex_before', fname), - 'size_limit': self._beaver_config.get_field('size_limit', fname), - 'update_time': None, - 'active': True, - } - - def open(self, filename, encoding=None): - """Opens a file with the appropriate call""" - try: - if IS_GZIPPED_FILE.search(filename): - _file = gzip.open(filename, "rb") - else: - file_encoding = self._beaver_config.get_field('encoding', filename) - if encoding: - _file = io.open(filename, "r", encoding=encoding, errors='replace') - elif file_encoding: - _file = io.open(filename, "r", encoding=file_encoding, errors='replace') - 
else: - _file = io.open(filename, "r", errors='replace') - except IOError, e: - self._logger.warning(str(e)) - _file = None - - return _file - - def tail(self, fname, encoding, window, position=None): - """Read last N lines from file fname.""" - if window <= 0: - raise ValueError('invalid window %r' % window) - - encodings = ENCODINGS - if encoding: - encodings = [encoding] + ENCODINGS - - for enc in encodings: - try: - f = self.open(fname, encoding=enc) - if not f: - return [] - return self.tail_read(f, window, position=position) - except IOError, err: - if err.errno == errno.ENOENT: - return [] - raise - except UnicodeDecodeError: - pass - - @staticmethod - def get_file_id(st): - return "%xg%x" % (st.st_dev, st.st_ino) - - @classmethod - def tail_read(cls, f, window, position=None): - BUFSIZ = 1024 - # open() was overridden and file was opened in text - # mode; read() will return a string instead bytes. - encoded = getattr(f, 'encoding', False) - CR = '\n' if encoded else b'\n' - data = '' if encoded else b'' - f.seek(0, os.SEEK_END) - if position is None: - position = f.tell() - - block = -1 - exit = False - read = BUFSIZ - - while not exit: - step = (block * BUFSIZ) + position - if step < 0: - step = 0 - read = ((block + 1) * BUFSIZ) + position - exit = True - - f.seek(step, os.SEEK_SET) - newdata = f.read(read) - - data = newdata + data - if data.count(CR) > window: - break - else: - block -= 1 - - return data.splitlines()[-window:] diff --git a/bin/beaver b/bin/beaver index afd2511a..1eab3db4 100755 --- a/bin/beaver +++ b/bin/beaver @@ -1,7 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- from beaver.dispatcher.tail import run as tail_run -from beaver.dispatcher.worker import run as worker_run from beaver.pidfile import PidFile from beaver.utils import CAN_DAEMONIZE, parse_args, version @@ -20,12 +19,6 @@ if args.daemonize: context = daemon.DaemonContext(pidfile=PidFile(args.pid)) with context: - if args.experimental: - tail_run(args) - else: - 
worker_run(args) -else: - if args.experimental: tail_run(args) - else: - worker_run(args) +else: + tail_run(args) diff --git a/debian/changelog b/debian/changelog index a76c1cff..935126ee 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,45 @@ +python-beaver (36.3.1) stable; urgency=low + + * Release version 36.3.1 + + -- Jose Diaz-Gonzalez Wed, 03 Apr 2019 16:48:10 +0000 + +python-beaver (36.3.0) stable; urgency=low + + * Release version 36.3.0 + + -- Jose Diaz-Gonzalez Sun, 14 Oct 2018 23:42:32 +0000 + +python-beaver (36.2.1) stable; urgency=low + + * Release version 36.2.1 + + -- Jose Diaz-Gonzalez Tue, 20 Sep 2016 18:50:12 +0000 + +python-beaver (36.2.0) stable; urgency=low + + * Release version 36.2.0 + + -- Jose Diaz-Gonzalez Tue, 12 Apr 2016 17:18:32 +0000 + +python-beaver (36.1.0) stable; urgency=low + + * Release version 36.1.0 + + -- Jose Diaz-Gonzalez Sun, 14 Feb 2016 21:02:59 +0000 + +python-beaver (36.0.1) stable; urgency=low + + * Release version 36.0.1 + + -- Jose Diaz-Gonzalez Wed, 13 Jan 2016 19:33:51 +0000 + +python-beaver (36.0.0) stable; urgency=low + + * Release version 36.0.0 + + -- Jose Diaz-Gonzalez Tue, 15 Dec 2015 22:12:18 +0000 + python-beaver (31) stable; urgency=low * Initial public debian release diff --git a/debian/control b/debian/control index 4ac0eaa3..f9b019f6 100644 --- a/debian/control +++ b/debian/control @@ -8,5 +8,6 @@ Standards-Version: 3.9.5 Package: python-beaver Architecture: all Depends: ${python:Depends}, ${misc:Depends} -Homepage: http://github.com/josegonzalez/python-beaver -Description: Outputs a valid hostname for a given autoscale group instance +Homepage: http://github.com/python-beaver/python-beaver +Description: Python log forwarder + Python daemon that munches on logs and sends their contents to logstash diff --git a/docs/conf.py b/docs/conf.py index 74cd2f91..9d4bc5e9 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -53,9 +53,9 @@ # built documents. # # The short X.Y version. 
-version = '34.1.0' +version = '36.3.1' # The full version, including alpha/beta/rc tags. -release = '34.1.0' +release = '36.3.1' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/docs/user/install.rst b/docs/user/install.rst index cd31d868..a4e3acb4 100644 --- a/docs/user/install.rst +++ b/docs/user/install.rst @@ -38,19 +38,19 @@ Get the Code ------------ Beaver is actively developed on GitHub, where the code is -`always available `_. +`always available `_. You can either clone the public repository:: - git clone git://github.com/josegonzalez/beaver.git + git clone git://github.com/python-beaver/python-beaver.git -Download the `tarball `_:: +Download the `tarball `_:: - $ curl -OL https://github.com/josegonzalez/beaver/tarball/master + $ curl -OL https://github.com/python-beaver/python-beaver/tarball/master -Or, download the `zipball `_:: +Or, download the `zipball `_:: - $ curl -OL https://github.com/josegonzalez/beaver/zipball/master + $ curl -OL https://github.com/python-beaver/python-beaver/zipball/master Once you have a copy of the source, you can embed it in your Python package, diff --git a/docs/user/usage.rst b/docs/user/usage.rst index 8b1c4408..86792ed6 100644 --- a/docs/user/usage.rst +++ b/docs/user/usage.rst @@ -54,9 +54,11 @@ Beaver can optionally get data from a ``configfile`` using the ``-c`` flag. This * kafka_batch_t: Default ``10``. Batch log message timeout * mqtt_host: Default ``localhost``. Host for mosquitto * mqtt_port: Default ``1883``. Port for mosquitto -* mqtt_clientid: Default ``mosquitto``. Mosquitto client id +* mqtt_clientid: Default ``paho``. Paho client id * mqtt_keepalive: Default ``60``. mqtt keepalive ping * mqtt_topic: Default ``/logstash``. Topic to publish to +* number_of_consumer_processes: Default ``1``. Number of parallel consumer processes that read and process messages from the beaver queue. +* rabbitmq_arguments: Defaults ``{}``. 
RabbitMQ arguments comma separated, colon separated key value pairs. i.e ``rabbitmq_arguments: x-max-length:750000,x-max-length-bytes:1073741824`` * rabbitmq_host: Defaults ``localhost``. Host for RabbitMQ * rabbitmq_port: Defaults ``5672``. Port for RabbitMQ * rabbitmq_ssl: Defaults ``0``. Connect using SSL/TLS @@ -72,6 +74,7 @@ Beaver can optionally get data from a ``configfile`` using the ``-c`` flag. This * rabbitmq_exchange_durable: Default ``0``. * rabbitmq_key: Default ``logstash-key``. * rabbitmq_exchange: Default ``logstash-exchange``. +* rabbitmq_timeout: Default ``1``. The timeout in seconds for the connection to the RabbitMQ broker * rabbitmq_delivery_mode: Default ``1``. Message deliveryMode. 1: non persistent 2: persistent * redis_url: Default ``redis://localhost:6379/0``. Comma separated redis URLs * redis_namespace: Default ``logstash:beaver``. Redis key namespace @@ -83,9 +86,11 @@ Beaver can optionally get data from a ``configfile`` using the ``-c`` flag. This * sns_aws_topic_arn: Topic ARN (must exist) * sqs_aws_access_key: Can be left blank to use IAM Roles or AWS_ACCESS_KEY_ID environment variable (see: https://github.com/boto/boto#getting-started-with-boto) * sqs_aws_secret_key: Can be left blank to use IAM Roles or AWS_SECRET_ACCESS_KEY environment variable (see: https://github.com/boto/boto#getting-started-with-boto) +* sqs_aws_profile_name: Can be left blank to use IAM Roles AWS_SECRET_ACCESS_KEY environment variable, or fixed keypair (above) (see: https://github.com/boto/boto#getting-started-with-boto) * sqs_aws_region: Default ``us-east-1``. AWS Region -* sqs_aws_queue: SQS queue (must exist) +* sqs_aws_queue: SQS queue, or comma delimited list of queues (must exist) * sqs_aws_queue_owner_acct_id: Optional. Defaults ``None``. Account ID or Principal allowed to write to queue +* sqs_bulk_lines: Optional. 
Send multiple log entries in a single SQS message (cost saving benefit on larger environments) * kinesis_aws_access_key: Can be left blank to use IAM roles or AWS_ACCESS_KEY_ID environment variable (see: https://github.com/boto/boto#getting-started-with-boto) * kinesis_aws_secret_key: Can be left blank to use IAM Roles or AWS_SECRET_ACCESS_KEY environment variable (see: https://github.com/boto/boto#getting-started-with-boto) * kinesis_aws_region: Default ``us-east-1``. AWS Region @@ -93,7 +98,7 @@ Beaver can optionally get data from a ``configfile`` using the ``-c`` flag. This * kinesis_aws_batch_size_max: Default ``512000``. Arbitrary flush size to limit size of logs in transit. * tcp_host: Default ``127.0.0.1``. TCP Host * tcp_port: Default ``9999``. TCP Port -* tcp_ssl Defaults ``0``. Connect using SSL/TLS +* tcp_ssl_enabled: Defaults ``0``. Connect using SSL/TLS * tcp_ssl_key Optional. Defaults ``None``. Path to client private key for SSL/TLS * tcp_ssl_cert Optional. Defaults ``None``. Path to client certificate for SSL/TLS * tcp_ssl_cacert Optional. Defaults ``None``. Path to CA certificate for SSL/TLS @@ -114,6 +119,7 @@ The following are used for instances when a TransportException is thrown - Trans * respawn_delay: Default ``3``. Initial respawn delay for exponential backoff * max_failure: Default ``7``. Max failures before exponential backoff terminates +* max_queue_size: Default ``100``. Max log entries Beaver can store in it's queue before backing off until they have been transmitted The following configuration keys are for SinceDB support. Specifying these will enable saving the current line number in an sqlite database. This is useful for cases where you may be restarting the Beaver process, such as during a logrotate. @@ -218,7 +224,7 @@ Read config from config.ini and put to stdout:: tags: tag1,tag2 add_field: fieldname1,fieldvalue1[,fieldname2,fieldvalue2, ...] - ; follow all logs in /var/log except those with `messages` or `secure` in the name. 
+ ; follow all logs in /var/log except those with `messages` or `secure` in the name *of the file*. Currently it is not possible to exclude certain lines of a file. ; The exclude tag must be a valid python regular expression. [/var/log/*log] type: syslog @@ -494,7 +500,7 @@ SQS Transport:: # /etc/beaver/conf [beaver] sqs_aws_region: us-east-1 - sqs_aws_queue: logstash-input + sqs_aws_queue: logstash-input1,logstash-input2 sqs_aws_access_key: sqs_aws_secret_key: @@ -527,7 +533,7 @@ Kinesis Transport:: # From the commandline beaver -c /etc/beaver/conf -t kinesis -Mqtt transport using Mosquitto:: +Mqtt transport using Paho:: # /etc/beaver/conf [beaver] diff --git a/install-dependencies.sh b/install-dependencies.sh new file mode 100755 index 00000000..50e4d652 --- /dev/null +++ b/install-dependencies.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +pip install -r requirements/zeromq.txt -r requirements/tests.txt + +mkdir ~/.aws/ +cat > ~/.aws/credentials << EOL +[beaver_queue] +aws_access_key_id = 111 +aws_secret_access_key = 1111 +EOL diff --git a/release b/release index ee14ebe3..b48a67ef 100755 --- a/release +++ b/release @@ -1,6 +1,8 @@ #!/usr/bin/env bash +set -eo pipefail; [[ $RELEASE_TRACE ]] && set -x PACKAGE_NAME='beaver' +INIT_PACKAGE_NAME='beaver' PUBLIC="true" # Colors @@ -11,6 +13,22 @@ YELLOW="\033[0;33m" # yellow MAGENTA="\033[0;35m" # magenta CYAN="\033[0;36m" # cyan +# ensure wheel is available +pip install wheel > /dev/null + +# ensure Pygment is available +pip install Pygments > /dev/null + +command -v gitchangelog >/dev/null 2>&1 || { + echo -e "${RED}WARNING: Missing gitchangelog binary, please run: pip install gitchangelog==2.2.0${COLOR_OFF}\n" + exit 1 +} + +command -v rst-lint > /dev/null || { + echo -e "${RED}WARNING: Missing rst-lint binary, please run: pip install restructuredtext_lint${COLOR_OFF}\n" + exit 1 +} + if [[ "$@" != "major" ]] && [[ "$@" != "minor" ]] && [[ "$@" != "patch" ]]; then echo -e "${RED}WARNING: Invalid release type, must 
specify 'major', 'minor', or 'patch'${COLOR_OFF}\n" exit 1 @@ -18,21 +36,23 @@ fi echo -e "\n${GREEN}STARTING RELEASE PROCESS${COLOR_OFF}\n" -git status | grep "working directory clean" &> /dev/null +set +e; +git status | grep -Eo "working (directory|tree) clean" &> /dev/null if [ ! $? -eq 0 ]; then # working directory is NOT clean echo -e "${RED}WARNING: You have uncomitted changes, you may have forgotten something${COLOR_OFF}\n" exit 1 fi +set -e; echo -e "${YELLOW}--->${COLOR_OFF} Updating local copy" git pull -q origin master echo -e "${YELLOW}--->${COLOR_OFF} Retrieving release versions" -current_version=`cat ${PACKAGE_NAME}/__init__.py |grep '__version__ ='|sed 's/[^0-9.]//g'` -major=`echo $current_version | awk '{split($0,a,"."); print a[1]}'` -minor=`echo $current_version | awk '{split($0,a,"."); print a[2]}'` -patch=`echo $current_version | awk '{split($0,a,"."); print a[3]}'` +current_version=$(cat ${INIT_PACKAGE_NAME}/__init__.py |grep '__version__ ='|sed 's/[^0-9.]//g') +major=$(echo $current_version | awk '{split($0,a,"."); print a[1]}') +minor=$(echo $current_version | awk '{split($0,a,"."); print a[2]}') +patch=$(echo $current_version | awk '{split($0,a,"."); print a[3]}') if [[ "$@" == "major" ]]; then major=$(($major + 1)); @@ -49,9 +69,12 @@ next_version="${major}.${minor}.${patch}" echo -e "${YELLOW} >${COLOR_OFF} ${MAGENTA}${current_version}${COLOR_OFF} -> ${MAGENTA}${next_version}${COLOR_OFF}" +echo -e "${YELLOW}--->${COLOR_OFF} Ensuring readme passes lint checks (if this fails, run rst-lint)" +rst-lint README.rst > /dev/null + echo -e "${YELLOW}--->${COLOR_OFF} Creating necessary temp file" -tempfoo=`basename $0` -TMPFILE=`mktemp /tmp/${tempfoo}.XXXXXX` || { +tempfoo=$(basename $0) +TMPFILE=$(mktemp /tmp/${tempfoo}.XXXXXX) || { echo -e "${RED}WARNING: Cannot create temp file using mktemp in /tmp dir ${COLOR_OFF}\n" exit 1 } @@ -59,8 +82,8 @@ TMPFILE=`mktemp /tmp/${tempfoo}.XXXXXX` || { find_this="__version__ = '$current_version'" 
replace_with="__version__ = '$next_version'" -echo -e "${YELLOW}--->${COLOR_OFF} Updating ${PACKAGE_NAME}/__init__.py" -sed "s/$find_this/$replace_with/" ${PACKAGE_NAME}/__init__.py > $TMPFILE && mv $TMPFILE ${PACKAGE_NAME}/__init__.py +echo -e "${YELLOW}--->${COLOR_OFF} Updating ${INIT_PACKAGE_NAME}/__init__.py" +sed "s/$find_this/$replace_with/" ${INIT_PACKAGE_NAME}/__init__.py > $TMPFILE && mv $TMPFILE ${INIT_PACKAGE_NAME}/__init__.py echo -e "${YELLOW}--->${COLOR_OFF} Updating README.rst" find_this="${PACKAGE_NAME}.git@$current_version" @@ -83,11 +106,15 @@ fi echo -e "${YELLOW}--->${COLOR_OFF} Updating CHANGES.rst for new release" version_header="$next_version ($(date +%F))" -dashes=`yes '-'|head -n ${#version_header}|tr -d '\n'` +set +e; dashes=$(yes '-'|head -n ${#version_header}|tr -d '\n') ; set -e gitchangelog |sed "4s/.*/$version_header/"|sed "5s/.*/$dashes/" > $TMPFILE && mv $TMPFILE CHANGES.rst +echo -e "${YELLOW}--->${COLOR_OFF} Updating debian/changelog" +dch -v $next_version "Release version $next_version" +dch -r stable + echo -e "${YELLOW}--->${COLOR_OFF} Adding changed files to git" -git add CHANGES.rst README.rst ${PACKAGE_NAME}/__init__.py +git add CHANGES.rst README.rst debian/changelog ${INIT_PACKAGE_NAME}/__init__.py if [ -f docs/conf.py ]; then git add docs/conf.py; fi echo -e "${YELLOW}--->${COLOR_OFF} Creating release" @@ -102,7 +129,7 @@ git push -q origin master && git push -q --tags if [[ "$PUBLIC" == "true" ]]; then echo -e "${YELLOW}--->${COLOR_OFF} Creating python release" cp README.rst README - python setup.py sdist upload > /dev/null + python setup.py sdist bdist_wheel upload > /dev/null rm README fi diff --git a/requirements/base.txt b/requirements/base.txt index 7a056748..33e02ce6 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,9 +1,10 @@ boto>=2.18.0 conf_d>=0.0.4 -glob2==0.3 -kafka-python>=0.9.3 -mosquitto>=1.1 +glob2==0.7 +kafka-python==0.9.5 +paho-mqtt==1.1 msgpack-pure>=0.1.3 -pika>=0.9.12 
+pika>=0.9.14,<1.0.0 python-daemon>=1.5.2,<=1.6.1 redis>=2.7.5 +requests diff --git a/requirements/tests.txt b/requirements/tests.txt index 99f5e0f8..e087027f 100644 --- a/requirements/tests.txt +++ b/requirements/tests.txt @@ -3,3 +3,6 @@ funcsigs mock nose six +unittest2 +coveralls +moto==0.4.31 diff --git a/setup.py b/setup.py index 1d795d4d..d943d495 100755 --- a/setup.py +++ b/setup.py @@ -64,7 +64,7 @@ 'beaver.worker' ], scripts=['bin/beaver'], - url='http://github.com/josegonzalez/beaver', + url='http://github.com/python-beaver/python-beaver', license='LICENSE.txt', classifiers=[ 'Development Status :: 5 - Production/Stable',