Skip to content

Commit e3fd262

Browse files
committed
DataStax Cloud enablement
1 parent a06f4b9 commit e3fd262

13 files changed

Lines changed: 595 additions & 2 deletions

File tree

CHANGELOG.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ Features
66
--------
77
* Add SNIEndPoint support (PYTHON-1084)
88
* Add a new Endpoint type to support unix sockets (PYTHON-1098)
9+
* DataStax Cloud enablement (PYTHON-1075)
10+
* Add creds.zip support (PYTHON-1097)
911

1012
3.19.0
1113
======

build.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,9 @@ build:
164164
pip install nose-exclude
165165
pip install service_identity
166166
167+
# Required for DBaaS and https+ssl cert
168+
pip install pyopenssl ndg-httpsclient pyasn1
169+
167170
FORCE_CYTHON=False
168171
if [[ $CYTHON == 'CYTHON' ]]; then
169172
FORCE_CYTHON=True
@@ -211,9 +214,13 @@ build:
211214
echo "==========RUNNING INTEGRATION TESTS=========="
212215
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER CCM_ARGS="$CCM_ARGS" CASSANDRA_VERSION=$CCM_CASSANDRA_VERSION MAPPED_CASSANDRA_VERSION=$MAPPED_CASSANDRA_VERSION VERIFY_CYTHON=$FORCE_CYTHON nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=standard_results.xml tests/integration/standard/ || true
213216
217+
echo "==========RUNNING ADVANCED AND CLOUD TESTS=========="
218+
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER CLOUD_PROXY_PATH="$HOME/proxy/" CASSANDRA_VERSION=$CCM_CASSANDRA_VERSION MAPPED_CASSANDRA_VERSION=$MAPPED_CASSANDRA_VERSION VERIFY_CYTHON=$FORCE_CYTHON nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=advanced_results.xml tests/integration/advanced/ || true
219+
214220
if [ -z "$EXCLUDE_LONG" ]; then
215221
echo "==========RUNNING LONG INTEGRATION TESTS=========="
216222
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER CCM_ARGS="$CCM_ARGS" CASSANDRA_VERSION=$CCM_CASSANDRA_VERSION MAPPED_CASSANDRA_VERSION=$MAPPED_CASSANDRA_VERSION VERIFY_CYTHON=$FORCE_CYTHON nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --exclude-dir=tests/integration/long/upgrade --with-ignore-docstrings --with-xunit --xunit-file=long_results.xml tests/integration/long/ || true
217223
fi
224+
218225
- xunit:
219226
- "*_results.xml"

cassandra/cloud/__init__.py

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
# Copyright DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import os
16+
import logging
17+
import json
18+
import tempfile
19+
import shutil
20+
from six.moves.urllib.request import urlopen
21+
from ssl import SSLContext, PROTOCOL_TLSv1, CERT_REQUIRED
22+
from zipfile import ZipFile
23+
24+
# 2.7 vs 3.x
25+
try:
26+
from zipfile import BadZipFile
27+
except:
28+
from zipfile import BadZipfile as BadZipFile
29+
30+
from cassandra import DriverException
31+
32+
log = logging.getLogger(__name__)
33+
34+
__all__ = ['get_cloud_config']
35+
36+
37+
class CloudConfig(object):
38+
39+
username = None
40+
password = None
41+
host = None
42+
port = None
43+
keyspace = None
44+
local_dc = None
45+
ssl_context = None
46+
47+
sni_host = None
48+
sni_port = None
49+
host_ids = None
50+
51+
@classmethod
52+
def from_dict(cls, d):
53+
c = cls()
54+
55+
c.port = d.get('port', None)
56+
try:
57+
c.port = int(d['port'])
58+
except:
59+
pass
60+
61+
c.username = d.get('username', None)
62+
c.password = d.get('password', None)
63+
c.host = d.get('host', None)
64+
c.keyspace = d.get('keyspace', None)
65+
c.local_dc = d.get('localDC', None)
66+
67+
return c
68+
69+
70+
def get_cloud_config(cloud_config):
71+
if 'secure_connect_bundle' not in cloud_config:
72+
raise ValueError("The cloud config doesn't have a secure_connect_bundle specified.")
73+
74+
try:
75+
config = read_cloud_config_from_zip(cloud_config)
76+
except BadZipFile:
77+
raise ValueError("Unable to open the zip file for the cloud config. Check your secure connect bundle.")
78+
79+
return read_metadata_info(config, cloud_config)
80+
81+
82+
def read_cloud_config_from_zip(cloud_config):
83+
secure_bundle = cloud_config['secure_connect_bundle']
84+
with ZipFile(secure_bundle) as zipfile:
85+
base_dir = os.path.dirname(secure_bundle)
86+
tmp_dir = tempfile.mkdtemp(dir=base_dir)
87+
try:
88+
zipfile.extractall(path=tmp_dir)
89+
return parse_cloud_config(os.path.join(tmp_dir, 'config.json'), cloud_config)
90+
finally:
91+
shutil.rmtree(tmp_dir)
92+
93+
94+
def parse_cloud_config(path, cloud_config):
95+
with open(path, 'r') as stream:
96+
data = json.load(stream)
97+
98+
config = CloudConfig.from_dict(data)
99+
config_dir = os.path.dirname(path)
100+
101+
if 'ssl_context' in cloud_config:
102+
config.ssl_context = cloud_config['ssl_context']
103+
else:
104+
# Load the ssl_context before we delete the temporary directory
105+
ca_cert_location = os.path.join(config_dir, 'ca.crt')
106+
cert_location = os.path.join(config_dir, 'cert')
107+
key_location = os.path.join(config_dir, 'key')
108+
config.ssl_context = _ssl_context_from_cert(ca_cert_location, cert_location, key_location)
109+
110+
return config
111+
112+
113+
def read_metadata_info(config, cloud_config):
114+
url = "https://{}:{}/metadata".format(config.host, config.port)
115+
timeout = cloud_config['connect_timeout'] if 'connect_timeout' in cloud_config else 5
116+
try:
117+
response = urlopen(url, context=config.ssl_context, timeout=timeout)
118+
except Exception as e:
119+
log.exception(e)
120+
raise DriverException("Unable to connect to the metadata service at %s" % url)
121+
122+
if response.code != 200:
123+
raise DriverException(("Error while fetching the metadata at: %s. "
124+
"The service returned error code %d." % (url, response.code)))
125+
return parse_metadata_info(config, response.read().decode('utf-8'))
126+
127+
128+
def parse_metadata_info(config, http_data):
129+
try:
130+
data = json.loads(http_data)
131+
except:
132+
msg = "Failed to load cluster metadata"
133+
raise DriverException(msg)
134+
135+
contact_info = data['contact_info']
136+
config.local_dc = contact_info['local_dc']
137+
138+
proxy_info = contact_info['sni_proxy_address'].split(':')
139+
config.sni_host = proxy_info[0]
140+
try:
141+
config.sni_port = int(proxy_info[1])
142+
except:
143+
config.sni_port = 9042
144+
145+
config.host_ids = [host_id for host_id in contact_info['contact_points']]
146+
147+
return config
148+
149+
150+
def _ssl_context_from_cert(ca_cert_location, cert_location, key_location):
151+
ssl_context = SSLContext(PROTOCOL_TLSv1)
152+
ssl_context.load_verify_locations(ca_cert_location)
153+
ssl_context.verify_mode = CERT_REQUIRED
154+
ssl_context.load_cert_chain(certfile=cert_location, keyfile=key_location)
155+
156+
return ssl_context

cassandra/cluster.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"""
1919
from __future__ import absolute_import
2020

21+
import os
2122
import atexit
2223
from collections import defaultdict
2324
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait as wait_futures
@@ -45,9 +46,11 @@
4546
OperationTimedOut, UnsupportedOperation,
4647
SchemaTargetType, DriverException, ProtocolVersion,
4748
UnresolvableContactPoints)
49+
from cassandra.auth import PlainTextAuthProvider
4850
from cassandra.connection import (ConnectionException, ConnectionShutdown,
4951
ConnectionHeartbeat, ProtocolVersionUnsupported,
50-
EndPoint, DefaultEndPoint, DefaultEndPointFactory)
52+
EndPoint, DefaultEndPoint, DefaultEndPointFactory,
53+
SniEndPointFactory)
5154
from cassandra.cqltypes import UserType
5255
from cassandra.encoder import Encoder
5356
from cassandra.protocol import (QueryMessage, ResultMessage,
@@ -75,6 +78,7 @@
7578
named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET)
7679
from cassandra.timestamps import MonotonicTimestampGenerator
7780
from cassandra.compat import Mapping
81+
from cassandra import cloud as dscloud
7882

7983

8084
def _is_eventlet_monkey_patched():
@@ -779,6 +783,18 @@ def default_retry_policy(self, policy):
779783
documentation for :meth:`Session.timestamp_generator`.
780784
"""
781785

786+
cloud = None
787+
"""
788+
A dict of the cloud configuration. Example::
789+
{
790+
# path to the secure connect bundle
791+
'secure_connect_bundle': '/path/to/secure-connect-dbname.zip'
792+
}
793+
794+
The zip file will be temporarily extracted in the same directory to
795+
load the configuration and certificates.
796+
"""
797+
782798
@property
783799
def schema_metadata_enabled(self):
784800
"""
@@ -874,13 +890,36 @@ def __init__(self,
874890
idle_heartbeat_timeout=30,
875891
no_compact=False,
876892
ssl_context=None,
877-
endpoint_factory=None):
893+
endpoint_factory=None,
894+
cloud=None):
878895
"""
879896
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
880897
extablishing connection pools or refreshing metadata.
881898
882899
Any of the mutable Cluster attributes may be set as keyword arguments to the constructor.
883900
"""
901+
902+
if cloud is not None:
903+
if contact_points is not _NOT_SET or endpoint_factory:
904+
raise ValueError(("contact_points and endpoint_factory"
905+
"cannot be specified with a cloud configuration"))
906+
907+
if ssl_context:
908+
cloud['ssl_context'] = ssl_context
909+
910+
cloud_config = dscloud.get_cloud_config(cloud)
911+
912+
ssl_context = cloud_config.ssl_context
913+
if (auth_provider is None and cloud_config.username
914+
and cloud_config.password):
915+
auth_provider = PlainTextAuthProvider(cloud_config.username, cloud_config.password)
916+
917+
endpoint_factory = SniEndPointFactory(cloud_config.sni_host, cloud_config.sni_port)
918+
contact_points = [
919+
endpoint_factory.create_from_sni(host_id)
920+
for host_id in cloud_config.host_ids
921+
]
922+
884923
if contact_points is not None:
885924
if contact_points is _NOT_SET:
886925
self._contact_points_explicit = False

docs/api/cassandra/cluster.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@
7272

7373
.. autoattribute:: endpoint_factory
7474

75+
.. autoattribute:: cloud
76+
7577
.. automethod:: connect
7678

7779
.. automethod:: shutdown

docs/getting_started.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,3 +403,13 @@ level on that:
403403
user3_lookup = user_lookup_stmt.bind([user_id3])
404404
user3_lookup.consistency_level = ConsistencyLevel.ALL
405405
user3 = session.execute(user3_lookup)
406+
407+
Connecting to DataStax Cloud
408+
----------------------------
409+
1. Download the secure connect bundle from your DataStax Constellation account.
410+
2. Connect to your cloud cluster with::
411+
cloud_config = {
412+
'secure_connect_bundle': '/path/to/secure-connect-dbname.zip'
413+
}
414+
cluster = Cluster(cloud=cloud_config)
415+
session = cluster.connect()

tests/integration/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ def _get_dse_version_from_cass(cass_version):
122122
USE_CASS_EXTERNAL = bool(os.getenv('USE_CASS_EXTERNAL', False))
123123
KEEP_TEST_CLUSTER = bool(os.getenv('KEEP_TEST_CLUSTER', False))
124124
SIMULACRON_JAR = os.getenv('SIMULACRON_JAR', None)
125+
CLOUD_PROXY_PATH = os.getenv('CLOUD_PROXY_PATH', None)
125126

126127
CASSANDRA_IP = os.getenv('CASSANDRA_IP', '127.0.0.1')
127128
CASSANDRA_DIR = os.getenv('CASSANDRA_DIR', None)
@@ -275,6 +276,7 @@ def _id_and_mark(f):
275276
requiresmallclockgranularity = unittest.skipIf("Windows" in platform.system() or "asyncore" in EVENT_LOOP_MANAGER,
276277
"This test is not suitible for environments with large clock granularity")
277278
requiressimulacron = unittest.skipIf(SIMULACRON_JAR is None or CASSANDRA_VERSION < Version("2.1"), "Simulacron jar hasn't been specified or C* version is 2.0")
279+
requirescloudproxy = unittest.skipIf(CLOUD_PROXY_PATH is None, "Cloud Proxy path hasn't been specified")
278280

279281

280282
def wait_for_node_socket(node, timeout):

0 commit comments

Comments
 (0)