Skip to content
This repository was archived by the owner on Apr 27, 2021. It is now read-only.

Commit d51f71c

Browse files
authored
Merge pull request apache#594 from datastax/569
PYTHON-569 - optional execution profiles in core driver
2 parents 7c9a4e0 + a4199dc commit d51f71c

9 files changed

Lines changed: 841 additions & 132 deletions

File tree

cassandra/cluster.py

Lines changed: 372 additions & 97 deletions
Large diffs are not rendered by default.

cassandra/policies.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,11 @@ class RoundRobinPolicy(LoadBalancingPolicy):
150150
This load balancing policy is used by default.
151151
"""
152152
_live_hosts = frozenset(())
153+
_position = 0
153154

154155
def populate(self, cluster, hosts):
155156
self._live_hosts = frozenset(hosts)
156-
if len(hosts) <= 1:
157-
self._position = 0
158-
else:
157+
if len(hosts) > 1:
159158
self._position = randint(0, len(hosts) - 1)
160159

161160
def distance(self, host):

docs/api/cassandra/cluster.rst

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
.. autoattribute:: reconnection_policy
2323

2424
.. autoattribute:: default_retry_policy
25+
:annotation: = <cassandra.policies.RetryPolicy object>
2526

2627
.. autoattribute:: conviction_policy_factory
2728

@@ -73,6 +74,8 @@
7374

7475
.. automethod:: unregister_listener
7576

77+
.. automethod:: add_execution_profile
78+
7679
.. automethod:: set_max_requests_per_connection
7780

7881
.. automethod:: get_max_requests_per_connection
@@ -105,17 +108,25 @@
105108

106109
.. automethod:: set_meta_refresh_enabled
107110

111+
.. autoclass:: ExecutionProfile
112+
:members:
113+
114+
.. autodata:: EXEC_PROFILE_DEFAULT
115+
:annotation:
108116

109117
.. autoclass:: Session ()
110118

111119
.. autoattribute:: default_timeout
120+
:annotation: = 10.0
112121

113122
.. autoattribute:: default_consistency_level
114123
:annotation: = LOCAL_ONE
115124

116125
.. autoattribute:: default_serial_consistency_level
126+
:annotation: = None
117127

118128
.. autoattribute:: row_factory
129+
:annotation: = <function named_tuple_factory>
119130

120131
.. autoattribute:: default_fetch_size
121132

@@ -135,6 +146,8 @@
135146

136147
.. automethod:: set_keyspace(keyspace)
137148

149+
.. automethod:: execution_profile_clone_update
150+
138151
.. autoclass:: ResponseFuture ()
139152

140153
.. autoattribute:: query

docs/execution_profiles.rst

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
Execution Profiles (experimental)
2+
=================================
3+
4+
Execution profiles are an experimental API aimed at making it easier to execute requests in different ways within
5+
a single connected ``Session``. Execution profiles are being introduced to deal with the exploding number of
6+
configuration options, especially as the database platform evolves to support more complex workloads.
7+
8+
The Execution Profile API is being introduced now, in an experimental capacity, in order to take advantage of it in
9+
existing projects, and to gauge interest and feedback in the community. For now, the legacy configuration remains
10+
intact, but legacy and Execution Profile APIs cannot be used simultaneously on the same client ``Cluster``.
11+
12+
This document explains how Execution Profiles relate to existing settings, and shows how to use the new profiles for
13+
request execution.
14+
15+
Mapping Legacy Parameters to Profiles
16+
-------------------------------------
17+
18+
Execution profiles can inherit from :class:`.cluster.ExecutionProfile`, and currently provide the following options,
19+
previously input from the noted attributes:
20+
21+
- load_balancing_policy - :attr:`.Cluster.load_balancing_policy`
22+
- request_timeout - :attr:`.Session.default_timeout`, optional :meth:`.Session.execute` parameter
23+
- retry_policy - :attr:`.Cluster.default_retry_policy`, optional :attr:`.Statement.retry_policy` attribute
24+
- consistency_level - :attr:`.Session.default_consistency_level`, optional :attr:`.Statement.consistency_level` attribute
25+
- serial_consistency_level - :attr:`.Session.default_serial_consistency_level`, optional :attr:`.Statement.serial_consistency_level` attribute
26+
- row_factory - :attr:`.Session.row_factory` attribute
27+
28+
When using the new API, these parameters can be defined by instances of :class:`.cluster.ExecutionProfile`.
29+
30+
Using Execution Profiles
31+
------------------------
32+
Default
33+
~~~~~~~
34+
35+
.. code:: python
36+
37+
from cassandra.cluster import Cluster
38+
cluster = Cluster()
39+
session = cluster.connect()
40+
local_query = 'SELECT rpc_address FROM system.local'
41+
for _ in cluster.metadata.all_hosts():
42+
print session.execute(local_query)[0]
43+
44+
45+
.. parsed-literal::
46+
47+
Row(rpc_address='127.0.0.2')
48+
Row(rpc_address='127.0.0.1')
49+
50+
51+
The default execution profile is built from Cluster parameters and default Session attributes. This profile matches existing default
52+
parameters.
53+
54+
Initializing cluster with profiles
55+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
56+
57+
.. code:: python
58+
59+
from cassandra.cluster import ExecutionProfile
60+
from cassandra.policies import WhiteListRoundRobinPolicy
61+
62+
node1_profile = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1']))
63+
node2_profile = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.2']))
64+
65+
profiles = {'node1': node1_profile, 'node2': node2_profile}
66+
session = Cluster(execution_profiles=profiles).connect()
67+
for _ in cluster.metadata.all_hosts():
68+
print session.execute(local_query, execution_profile='node1')[0]
69+
70+
71+
.. parsed-literal::
72+
73+
Row(rpc_address='127.0.0.1')
74+
Row(rpc_address='127.0.0.1')
75+
76+
77+
.. code:: python
78+
79+
for _ in cluster.metadata.all_hosts():
80+
print session.execute(local_query, execution_profile='node2')[0]
81+
82+
83+
.. parsed-literal::
84+
85+
Row(rpc_address='127.0.0.2')
86+
Row(rpc_address='127.0.0.2')
87+
88+
89+
.. code:: python
90+
91+
for _ in cluster.metadata.all_hosts():
92+
print session.execute(local_query)[0]
93+
94+
95+
.. parsed-literal::
96+
97+
Row(rpc_address='127.0.0.2')
98+
Row(rpc_address='127.0.0.1')
99+
100+
Note that, even when custom profiles are injected, the default ``TokenAwarePolicy(DCAwareRoundRobinPolicy())`` is still
101+
present. To override the default, specify a profile with the :data:`~.cluster.EXEC_PROFILE_DEFAULT` key.
102+
103+
.. code:: python
104+
105+
from cassandra.cluster import EXEC_PROFILE_DEFAULT
106+
profile = ExecutionProfile(request_timeout=30)
107+
cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile})
108+
109+
110+
Adding named profiles
111+
~~~~~~~~~~~~~~~~~~~~~
112+
113+
New profiles can be added by constructing them from scratch, or by deriving them from the default:
114+
115+
.. code:: python
116+
117+
locked_execution = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1']))
118+
node1_profile = 'node1_whitelist'
119+
cluster.add_execution_profile(node1_profile, locked_execution)
120+
121+
for _ in cluster.metadata.all_hosts():
122+
print session.execute(local_query, execution_profile=node1_profile)[0]
123+
124+
125+
.. parsed-literal::
126+
127+
Row(rpc_address='127.0.0.1')
128+
Row(rpc_address='127.0.0.1')
129+
130+
See :meth:`.Cluster.add_execution_profile` for details and optional parameters.
131+
132+
Passing a profile instance without mapping
133+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
134+
135+
We also have the ability to pass profile instances to be used for execution, but not added to the mapping:
136+
137+
.. code:: python
138+
139+
from cassandra.query import tuple_factory
140+
141+
tmp = session.execution_profile_clone_update('node1', request_timeout=100, row_factory=tuple_factory)
142+
143+
print session.execute(local_query, execution_profile=tmp)[0]
144+
print session.execute(local_query, execution_profile='node1')[0]
145+
146+
.. parsed-literal::
147+
148+
('127.0.0.1',)
149+
Row(rpc_address='127.0.0.1')
150+
151+
The new profile is a shallow copy, so the ``tmp`` profile shares a load balancing policy with the one managed by the cluster.
152+
If reference objects are to be updated in the clone, one would typically set those attributes to a new instance.

docs/index.rst

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ Contents
1616
How to install the driver.
1717

1818
:doc:`getting_started`
19-
A guide through the first steps of connecting to Cassandra and executing queries.
19+
A guide through the first steps of connecting to Cassandra and executing queries
2020

2121
:doc:`object_mapper`
2222
Introduction to the integrated object mapper, cqlengine
@@ -25,22 +25,25 @@ Contents
2525
The API documentation.
2626

2727
:doc:`upgrading`
28-
A guide to upgrading versions of the driver.
28+
A guide to upgrading versions of the driver
29+
30+
:doc:`execution_profiles`
31+
An introduction to a more flexible way of configuring request execution
2932

3033
:doc:`performance`
3134
Tips for getting good performance.
3235

3336
:doc:`query_paging`
34-
Notes on paging large query results.
37+
Notes on paging large query results
3538

3639
:doc:`lwt`
3740
Working with results of conditional requests
3841

3942
:doc:`user_defined_types`
40-
Working with Cassandra 2.1's user-defined types.
43+
Working with Cassandra 2.1's user-defined types
4144

4245
:doc:`security`
43-
An overview of the security features of the driver.
46+
An overview of the security features of the driver
4447

4548
:doc:`dates_and_times`
4649
Some discussion on the driver's approach to working with timestamp, date, time types
@@ -55,6 +58,7 @@ Contents
5558
installation
5659
getting_started
5760
upgrading
61+
execution_profiles
5862
performance
5963
query_paging
6064
lwt

tests/integration/standard/test_cluster.py

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,20 @@
1818
import unittest # noqa
1919

2020
from collections import deque
21+
from copy import copy
2122
from mock import patch
2223
import time
2324
from uuid import uuid4
2425
import logging
2526

2627
import cassandra
27-
from cassandra.cluster import Cluster, NoHostAvailable
28+
from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT
2829
from cassandra.concurrent import execute_concurrent
2930
from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy,
3031
RetryPolicy, SimpleConvictionPolicy, HostDistance,
3132
WhiteListRoundRobinPolicy, AddressTranslator)
3233
from cassandra.protocol import MAX_SUPPORTED_VERSION
33-
from cassandra.query import SimpleStatement, TraceUnavailable
34+
from cassandra.query import SimpleStatement, TraceUnavailable, tuple_factory
3435

3536
from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, get_node, CASSANDRA_VERSION, execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler
3637
from tests.integration.util import assert_quiescent_pool_state
@@ -618,6 +619,76 @@ def test_pool_management(self):
618619

619620
cluster.shutdown()
620621

622+
def test_profile_load_balancing(self):
623+
query = "select release_version from system.local"
624+
node1 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1']))
625+
with Cluster(execution_profiles={'node1': node1}) as cluster:
626+
session = cluster.connect()
627+
628+
# default is DCA RR for all hosts
629+
expected_hosts = set(cluster.metadata.all_hosts())
630+
queried_hosts = set()
631+
for _ in expected_hosts:
632+
rs = session.execute(query)
633+
queried_hosts.add(rs.response_future._current_host)
634+
self.assertEqual(queried_hosts, expected_hosts)
635+
636+
# by name we should only hit the one
637+
expected_hosts = set(h for h in cluster.metadata.all_hosts() if h.address == '127.0.0.1')
638+
queried_hosts = set()
639+
for _ in cluster.metadata.all_hosts():
640+
rs = session.execute(query, execution_profile='node1')
641+
queried_hosts.add(rs.response_future._current_host)
642+
self.assertEqual(queried_hosts, expected_hosts)
643+
644+
# use a copied instance and override the row factory
645+
# assert last returned value can be accessed as a namedtuple so we can prove something different
646+
named_tuple_row = rs[0]
647+
self.assertIsInstance(named_tuple_row, tuple)
648+
self.assertTrue(named_tuple_row.release_version)
649+
650+
tmp_profile = copy(node1)
651+
tmp_profile.row_factory = tuple_factory
652+
queried_hosts = set()
653+
for _ in cluster.metadata.all_hosts():
654+
rs = session.execute(query, execution_profile=tmp_profile)
655+
queried_hosts.add(rs.response_future._current_host)
656+
self.assertEqual(queried_hosts, expected_hosts)
657+
tuple_row = rs[0]
658+
self.assertIsInstance(tuple_row, tuple)
659+
with self.assertRaises(AttributeError):
660+
tuple_row.release_version
661+
662+
# make sure original profile is not impacted
663+
self.assertTrue(session.execute(query, execution_profile='node1')[0].release_version)
664+
665+
def test_profile_pool_management(self):
666+
node1 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1']))
667+
node2 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.2']))
668+
with Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: node1, 'node2': node2}) as cluster:
669+
session = cluster.connect()
670+
pools = session.get_pool_state()
671+
# there are more hosts, but we connected to the ones in the lbp aggregate
672+
self.assertGreater(len(cluster.metadata.all_hosts()), 2)
673+
self.assertEqual(set(h.address for h in pools), set(('127.0.0.1', '127.0.0.2')))
674+
675+
# dynamically update pools on add
676+
node3 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.3']))
677+
cluster.add_execution_profile('node3', node3)
678+
pools = session.get_pool_state()
679+
self.assertEqual(set(h.address for h in pools), set(('127.0.0.1', '127.0.0.2', '127.0.0.3')))
680+
681+
def test_add_profile_timeout(self):
682+
node1 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1']))
683+
with Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: node1}) as cluster:
684+
session = cluster.connect()
685+
pools = session.get_pool_state()
686+
self.assertGreater(len(cluster.metadata.all_hosts()), 2)
687+
self.assertEqual(set(h.address for h in pools), set(('127.0.0.1',)))
688+
689+
node2 = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.2']))
690+
self.assertRaises(cassandra.OperationTimedOut, cluster.add_execution_profile, 'node2', node2, pool_wait_timeout=0.0000001)
691+
621692

622693
class LocalHostAdressTranslator(AddressTranslator):
623694

0 commit comments

Comments
 (0)