diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 92cc39377e..af8e3f4b8c 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -70,6 +70,7 @@ from cassandra.query import (SimpleStatement, PreparedStatement, BoundStatement, BatchStatement, bind_params, QueryTrace, named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET) +from cassandra.timestamps import MonotonicTimestampGenerator def _is_eventlet_monkey_patched(): @@ -771,7 +772,8 @@ def __init__(self, prepare_on_all_hosts=True, reprepare_on_up=True, execution_profiles=None, - allow_beta_protocol_version=False): + allow_beta_protocol_version=False, + timestamp_generator=None): """ ``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as extablishing connection pools or refreshing metadata. @@ -830,6 +832,13 @@ def __init__(self, if connection_class is not None: self.connection_class = connection_class + if timestamp_generator is not None: + if not callable(timestamp_generator): + raise ValueError("timestamp_generator must be callable") + self.timestamp_generator = timestamp_generator + else: + self.timestamp_generator = MonotonicTimestampGenerator() + self.profile_manager = ProfileManager() self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(self.load_balancing_policy, self.default_retry_policy, @@ -1893,6 +1902,27 @@ def default_serial_consistency_level(self, cl): .. versionadded:: 2.1.0 """ + timestamp_generator = None + """ + When :attr:`use_client_timestamp` is set, sessions call this object and use + the result as the timestamp. (Note that timestamps specified within a CQL + query will override this timestamp.) By default, a new + :class:`~.MonotonicTimestampGenerator` is created for + each :class:`Cluster` instance. + + Applications can set this value for custom timestamp behavior. For + example, an application could share a timestamp generator across + :class:`Cluster` objects to guarantee that the application will use unique, + increasing timestamps across clusters, or set it to to ``lambda: + int(time.time() * 1e6)`` if losing records over clock inconsistencies is + acceptable for the application. Custom :attr:`timestamp_generator` s should + be callable, and calling them should return an integer representing seconds + since some point in time, typically UNIX epoch. + + .. versionadded:: 3.8.0 + """ + + encoder = None """ A :class:`~cassandra.encoder.Encoder` instance that will be used when @@ -2085,7 +2115,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload, time start_time = time.time() if self._protocol_version >= 3 and self.use_client_timestamp: - timestamp = int(start_time * 1e6) + timestamp = self.cluster.timestamp_generator() else: timestamp = None diff --git a/cassandra/timestamps.py b/cassandra/timestamps.py new file mode 100644 index 0000000000..385b8c501b --- /dev/null +++ b/cassandra/timestamps.py @@ -0,0 +1,107 @@ +# Copyright 2013-2016 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module contains utilities for generating timestamps for client-side +timestamp specification. +""" + +import logging +import time +from threading import Lock + +log = logging.getLogger(__name__) + +class MonotonicTimestampGenerator(object): + """ + An object that, when called, returns ``int(time.time() * 1e6)`` when + possible, but, if the value returned by ``time.time`` doesn't increase, + drifts into the future and logs warnings. + Exposed configuration attributes can be configured with arguments to + ``__init__`` or by changing attributes on an initialized object. + + .. versionadded:: 3.8.0 + """ + + warn_on_drift = True + """ + If true, log warnings when timestamps drift into the future as allowed by + :attr:`warning_threshold` and :attr:`warning_interval`. + """ + + warning_threshold = 0 + """ + This object will only issue warnings when the returned timestamp drifts + more than ``warning_threshold`` seconds into the future. + """ + + warning_interval = 0 + """ + This object will only issue warnings every ``warning_interval`` seconds. + """ + + def __init__(self, warn_on_drift=True, warning_threshold=0, warning_interval=0): + self.lock = Lock() + with self.lock: + self.last = 0 + self._last_warn = 0 + self.warn_on_drift = warn_on_drift + self.warning_threshold = warning_threshold + self.warning_interval = warning_interval + + def _next_timestamp(self, now, last): + """ + Returns the timestamp that should be used if ``now`` is the current + time and ``last`` is the last timestamp returned by this object. + Intended for internal and testing use only; to generate timestamps, + call an instantiated ``MonotonicTimestampGenerator`` object. + + :param int now: an integer to be used as the current time, typically + representing the current time in seconds since the UNIX epoch + :param int last: an integer representing the last timestamp returned by + this object + """ + if now > last: + self.last = now + return now + else: + self._maybe_warn(now=now) + self.last = last + 1 + return self.last + + def __call__(self): + """ + Makes ``MonotonicTimestampGenerator`` objects callable; defers + internally to _next_timestamp. + """ + with self.lock: + return self._next_timestamp(now=int(time.time() * 1e6), + last=self.last) + + def _maybe_warn(self, now): + # should be called from inside the self.lock. + diff = self.last - now + since_last_warn = now - self._last_warn + + warn = (self.warn_on_drift and + (diff > self.warning_threshold * 1e6) and + (since_last_warn >= self.warning_interval * 1e6)) + if warn: + log.warn( + "Clock skew detected: current tick ({now}) was {diff} " + "microseconds behind the last generated timestamp " + "({last}), returned timestamps will be artificially " + "incremented to guarantee monotonicity.".format( + now=now, diff=diff, last=self.last)) + self._last_warn = now diff --git a/docs/api/cassandra/cluster.rst b/docs/api/cassandra/cluster.rst index 05d66278d0..3d8917784f 100644 --- a/docs/api/cassandra/cluster.rst +++ b/docs/api/cassandra/cluster.rst @@ -134,6 +134,8 @@ .. autoattribute:: use_client_timestamp + .. autoattribute:: timestamp_generator + .. autoattribute:: encoder .. autoattribute:: client_protocol_handler diff --git a/docs/api/cassandra/timestamps.rst b/docs/api/cassandra/timestamps.rst new file mode 100644 index 0000000000..7c7f534aea --- /dev/null +++ b/docs/api/cassandra/timestamps.rst @@ -0,0 +1,14 @@ +``cassandra.timestamps`` - Timestamp Generation +============================================= + +.. module:: cassandra.timestamps + +.. autoclass:: MonotonicTimestampGenerator (warn_on_drift=True, warning_threshold=0, warning_interval=0) + + .. autoattribute:: warn_on_drift + + .. autoattribute:: warning_threshold + + .. autoattribute:: warning_interval + + .. automethod:: _next_timestamp diff --git a/tests/unit/test_timestamps.py b/tests/unit/test_timestamps.py new file mode 100644 index 0000000000..c2f8b93da8 --- /dev/null +++ b/tests/unit/test_timestamps.py @@ -0,0 +1,272 @@ +# Copyright 2013-2016 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +try: + import unittest2 as unittest +except ImportError: + import unittest # noqa + +import mock + +from cassandra import timestamps +import time +from threading import Thread, Lock + +class _TimestampTestMixin(object): + + @mock.patch('cassandra.timestamps.time') + def _call_and_check_results(self, + patched_time_module, + system_time_expected_stamp_pairs, + timestamp_generator=None): + """ + For each element in an iterable of (system_time, expected_timestamp) + pairs, call a :class:`cassandra.timestamps.MonotonicTimestampGenerator` + with system_times as the underlying time.time() result, then assert + that the result is expected_timestamp. Skips the check if + expected_timestamp is None. + """ + patched_time_module.time = mock.Mock() + system_times, expected_timestamps = zip(*system_time_expected_stamp_pairs) + + patched_time_module.time.side_effect = system_times + tsg = timestamp_generator or timestamps.MonotonicTimestampGenerator() + + for expected in expected_timestamps: + actual = tsg() + if expected is not None: + self.assertEqual(actual, expected) + + # assert we patched timestamps.time.time correctly + with self.assertRaises(StopIteration): + tsg() + + +class TestTimestampGeneratorOutput(unittest.TestCase, _TimestampTestMixin): + """ + Mock time.time and test the output of MonotonicTimestampGenerator.__call__ + given different patterns of changing results. + """ + + def test_timestamps_during_and_after_same_system_time(self): + """ + Test that MonotonicTimestampGenerator's output increases by 1 when the + underlying system time is the same, then returns to normal when the + system time increases again. + + @since 3.8.0 + @expected_result Timestamps should increase monotonically over repeated system time. + @test_category timing + """ + self._call_and_check_results( + system_time_expected_stamp_pairs=( + (15.0, 15 * 1e6), + (15.0, 15 * 1e6 + 1), + (15.0, 15 * 1e6 + 2), + (15.01, 15.01 * 1e6)) + ) + + def test_timestamps_during_and_after_backwards_system_time(self): + """ + Test that MonotonicTimestampGenerator's output increases by 1 when the + underlying system time goes backward, then returns to normal when the + system time increases again. + + @since 3.8.0 + @expected_result Timestamps should increase monotonically over system time going backwards. + @test_category timing + """ + self._call_and_check_results( + system_time_expected_stamp_pairs=( + (15.0, 15 * 1e6), + (13.0, 15 * 1e6 + 1), + (14.0, 15 * 1e6 + 2), + (13.5, 15 * 1e6 + 3), + (15.01, 15.01 * 1e6)) + ) + + +class TestTimestampGeneratorLogging(unittest.TestCase): + + def setUp(self): + self.log_patcher = mock.patch('cassandra.timestamps.log') + self.addCleanup(self.log_patcher.stop) + self.patched_timestamp_log = self.log_patcher.start() + + def assertLastCallArgRegex(self, call, pattern): + last_warn_args, last_warn_kwargs = call + self.assertEqual(len(last_warn_args), 1) + self.assertEqual(len(last_warn_kwargs), 0) + self.assertRegexpMatches( + last_warn_args[0], + pattern, + ) + + def test_basic_log_content(self): + """ + Tests there are logs + + @since 3.8.0 + @jira_ticket PYTHON-676 + @expected_result logs + + @test_category timing + """ + tsg = timestamps.MonotonicTimestampGenerator() + #The units of _last_warn is seconds + tsg._last_warn = 12 + + tsg._next_timestamp(20, tsg.last) + self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 0) + tsg._next_timestamp(16, tsg.last) + + self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 1) + self.assertLastCallArgRegex( + self.patched_timestamp_log.warn.call_args, + r'Clock skew detected:.*\b16\b.*\b4\b.*\b20\b' + ) + + def test_disable_logging(self): + """ + Tests there are no logs when there is a clock skew if logging is disabled + + @since 3.8.0 + @jira_ticket PYTHON-676 + @expected_result no logs + + @test_category timing + """ + no_warn_tsg = timestamps.MonotonicTimestampGenerator(warn_on_drift=False) + + no_warn_tsg.last = 100 + no_warn_tsg._next_timestamp(99, no_warn_tsg.last) + self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 0) + + def test_warning_threshold_respected_no_logging(self): + """ + Tests there are no logs if `warning_threshold` is not exceeded + + @since 3.8.0 + @jira_ticket PYTHON-676 + @expected_result no logs + + @test_category timing + """ + tsg = timestamps.MonotonicTimestampGenerator( + warning_threshold=2e-6, + ) + tsg.last, tsg._last_warn = 100, 97 + tsg._next_timestamp(98, tsg.last) + self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 0) + + def test_warning_threshold_respected_logs(self): + """ + Tests there are logs if `warning_threshold` is exceeded + + @since 3.8.0 + @jira_ticket PYTHON-676 + @expected_result logs + + @test_category timing + """ + tsg = timestamps.MonotonicTimestampGenerator( + warning_threshold=1e-6 + ) + tsg.last, tsg._last_warn = 100, 97 + tsg._next_timestamp(98, tsg.last) + self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 1) + + def test_warning_interval_respected_no_logging(self): + """ + Tests there is only one log in the interval `warning_interval` + + @since 3.8.0 + @jira_ticket PYTHON-676 + @expected_result one log + + @test_category timing + """ + tsg = timestamps.MonotonicTimestampGenerator( + warning_interval=2e-6 + ) + tsg.last = 100 + tsg._next_timestamp(70, tsg.last) + self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 1) + + tsg._next_timestamp(71, tsg.last) + self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 1) + + def test_warning_interval_respected_logs(self): + """ + Tests there are logs again if the + clock skew happens after`warning_interval` + + @since 3.8.0 + @jira_ticket PYTHON-676 + @expected_result logs + + @test_category timing + """ + tsg = timestamps.MonotonicTimestampGenerator( + warning_interval=1e-6 + ) + tsg.last = 100 + tsg._next_timestamp(70, tsg.last) + self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 1) + + tsg._next_timestamp(72, tsg.last) + self.assertEqual(len(self.patched_timestamp_log.warn.call_args_list), 2) + + +class TestTimestampGeneratorMultipleThreads(unittest.TestCase): + + def test_should_generate_incrementing_timestamps_for_all_threads(self): + """ + Tests when time is "stopped", values are assigned incrementally + + @since 3.8.0 + @jira_ticket PYTHON-676 + @expected_result the returned values increase + + @test_category timing + """ + lock = Lock() + + def request_time(): + for _ in range(timestamp_to_generate): + timestamp = tsg() + with lock: + generated_timestamps.append(timestamp) + + tsg = timestamps.MonotonicTimestampGenerator(warning_threshold=1) + fixed_time = 1 + num_threads = 5 + + timestamp_to_generate = 1000 + generated_timestamps = [] + + with mock.patch('time.time', new=mock.Mock(return_value=fixed_time)): + threads = [] + for _ in range(num_threads): + threads.append(Thread(target=request_time)) + + for t in threads: + t.start() + + for t in threads: + t.join() + + self.assertEqual(len(generated_timestamps), num_threads * timestamp_to_generate) + for i, timestamp in enumerate(sorted(generated_timestamps)): + self.assertEqual(int(i + 1e6), timestamp)