Skip to content

Commit 6c1d4f6

Browse files
authored
Merge pull request apache#681 from mambocab/timestamp-generator
Timestamp generator
2 parents 14093fc + 0c25970 commit 6c1d4f6

5 files changed

Lines changed: 427 additions & 2 deletions

File tree

cassandra/cluster.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
from cassandra.query import (SimpleStatement, PreparedStatement, BoundStatement,
7070
BatchStatement, bind_params, QueryTrace,
7171
named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET)
72+
from cassandra.timestamps import MonotonicTimestampGenerator
7273

7374

7475
def _is_eventlet_monkey_patched():
@@ -743,7 +744,8 @@ def __init__(self,
743744
prepare_on_all_hosts=True,
744745
reprepare_on_up=True,
745746
execution_profiles=None,
746-
allow_beta_protocol_version=False):
747+
allow_beta_protocol_version=False,
748+
timestamp_generator=None):
747749
"""
748750
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
749751
extablishing connection pools or refreshing metadata.
@@ -802,6 +804,13 @@ def __init__(self,
802804
if connection_class is not None:
803805
self.connection_class = connection_class
804806

807+
if timestamp_generator is not None:
808+
if not callable(timestamp_generator):
809+
raise ValueError("timestamp_generator must be callable")
810+
self.timestamp_generator = timestamp_generator
811+
else:
812+
self.timestamp_generator = MonotonicTimestampGenerator()
813+
805814
self.profile_manager = ProfileManager()
806815
self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(self.load_balancing_policy,
807816
self.default_retry_policy,
@@ -1866,6 +1875,27 @@ def default_serial_consistency_level(self, cl):
18661875
.. versionadded:: 2.1.0
18671876
"""
18681877

1878+
timestamp_generator = None
1879+
"""
1880+
When :attr:`use_client_timestamp` is set, sessions call this object and use
1881+
the result as the timestamp. (Note that timestamps specified within a CQL
1882+
query will override this timestamp.) By default, a new
1883+
:class:`~.MonotonicTimestampGenerator` is created for
1884+
each :class:`Cluster` instance.
1885+
1886+
Applications can set this value for custom timestamp behavior. For
1887+
example, an application could share a timestamp generator across
1888+
:class:`Cluster` objects to guarantee that the application will use unique,
1889+
increasing timestamps across clusters, or set it to to ``lambda:
1890+
int(time.time() * 1e6)`` if losing records over clock inconsistencies is
1891+
acceptable for the application. Custom :attr:`timestamp_generator` s should
1892+
be callable, and calling them should return an integer representing seconds
1893+
since some point in time, typically UNIX epoch.
1894+
1895+
.. versionadded:: 3.8.0
1896+
"""
1897+
1898+
18691899
encoder = None
18701900
"""
18711901
A :class:`~cassandra.encoder.Encoder` instance that will be used when
@@ -2058,7 +2088,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload, time
20582088

20592089
start_time = time.time()
20602090
if self._protocol_version >= 3 and self.use_client_timestamp:
2061-
timestamp = int(start_time * 1e6)
2091+
timestamp = self.cluster.timestamp_generator()
20622092
else:
20632093
timestamp = None
20642094

cassandra/timestamps.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# Copyright 2013-2016 DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
This module contains utilities for generating timestamps for client-side
17+
timestamp specification.
18+
"""
19+
20+
import logging
21+
import time
22+
from threading import Lock
23+
24+
log = logging.getLogger(__name__)
25+
26+
class MonotonicTimestampGenerator(object):
27+
"""
28+
An object that, when called, returns ``int(time.time() * 1e6)`` when
29+
possible, but, if the value returned by ``time.time`` doesn't increase,
30+
drifts into the future and logs warnings.
31+
Exposed configuration attributes can be configured with arguments to
32+
``__init__`` or by changing attributes on an initialized object.
33+
34+
.. versionadded:: 3.8.0
35+
"""
36+
37+
warn_on_drift = True
38+
"""
39+
If true, log warnings when timestamps drift into the future as allowed by
40+
:attr:`warning_threshold` and :attr:`warning_interval`.
41+
"""
42+
43+
warning_threshold = 0
44+
"""
45+
This object will only issue warnings when the returned timestamp drifts
46+
more than ``warning_threshold`` seconds into the future.
47+
"""
48+
49+
warning_interval = 0
50+
"""
51+
This object will only issue warnings every ``warning_interval`` seconds.
52+
"""
53+
54+
def __init__(self, warn_on_drift=True, warning_threshold=0, warning_interval=0):
55+
self.lock = Lock()
56+
with self.lock:
57+
self.last = 0
58+
self._last_warn = 0
59+
self.warn_on_drift = warn_on_drift
60+
self.warning_threshold = warning_threshold
61+
self.warning_interval = warning_interval
62+
63+
def _next_timestamp(self, now, last):
64+
"""
65+
Returns the timestamp that should be used if ``now`` is the current
66+
time and ``last`` is the last timestamp returned by this object.
67+
Intended for internal and testing use only; to generate timestamps,
68+
call an instantiated ``MonotonicTimestampGenerator`` object.
69+
70+
:param int now: an integer to be used as the current time, typically
71+
representing the current time in seconds since the UNIX epoch
72+
:param int last: an integer representing the last timestamp returned by
73+
this object
74+
"""
75+
if now > last:
76+
self.last = now
77+
return now
78+
else:
79+
self._maybe_warn(now=now)
80+
self.last = last + 1
81+
return self.last
82+
83+
def __call__(self):
84+
"""
85+
Makes ``MonotonicTimestampGenerator`` objects callable; defers
86+
internally to _next_timestamp.
87+
"""
88+
with self.lock:
89+
return self._next_timestamp(now=int(time.time() * 1e6),
90+
last=self.last)
91+
92+
def _maybe_warn(self, now):
93+
# should be called from inside the self.lock.
94+
diff = self.last - now
95+
since_last_warn = now - self._last_warn
96+
97+
warn = (self.warn_on_drift and
98+
(diff > self.warning_threshold * 1e6) and
99+
(since_last_warn >= self.warning_interval * 1e6))
100+
if warn:
101+
log.warn(
102+
"Clock skew detected: current tick ({now}) was {diff} "
103+
"microseconds behind the last generated timestamp "
104+
"({last}), returned timestamps will be artificially "
105+
"incremented to guarantee monotonicity.".format(
106+
now=now, diff=diff, last=self.last))
107+
self._last_warn = now

docs/api/cassandra/cluster.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@
134134

135135
.. autoattribute:: use_client_timestamp
136136

137+
.. autoattribute:: timestamp_generator
138+
137139
.. autoattribute:: encoder
138140

139141
.. autoattribute:: client_protocol_handler

docs/api/cassandra/timestamps.rst

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
``cassandra.timestamps`` - Timestamp Generation
2+
=============================================
3+
4+
.. module:: cassandra.timestamps
5+
6+
.. autoclass:: MonotonicTimestampGenerator (warn_on_drift=True, warning_threshold=0, warning_interval=0)
7+
8+
.. autoattribute:: warn_on_drift
9+
10+
.. autoattribute:: warning_threshold
11+
12+
.. autoattribute:: warning_interval
13+
14+
.. automethod:: _next_timestamp

0 commit comments

Comments
 (0)