|
| 1 | +# Copyright DataStax, Inc. |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +import os |
| 16 | +import logging |
| 17 | +import json |
| 18 | +import tempfile |
| 19 | +import shutil |
| 20 | +from six.moves.urllib.request import urlopen |
| 21 | +from ssl import SSLContext, PROTOCOL_TLSv1, CERT_REQUIRED |
| 22 | +from zipfile import ZipFile |
| 23 | + |
| 24 | +# 2.7 vs 3.x |
| 25 | +try: |
| 26 | + from zipfile import BadZipFile |
| 27 | +except: |
| 28 | + from zipfile import BadZipfile as BadZipFile |
| 29 | + |
| 30 | +from cassandra import DriverException |
| 31 | + |
| 32 | +log = logging.getLogger(__name__) |
| 33 | + |
| 34 | +__all__ = ['get_cloud_config'] |
| 35 | + |
| 36 | + |
| 37 | +class CloudConfig(object): |
| 38 | + |
| 39 | + username = None |
| 40 | + password = None |
| 41 | + host = None |
| 42 | + port = None |
| 43 | + keyspace = None |
| 44 | + local_dc = None |
| 45 | + ssl_context = None |
| 46 | + |
| 47 | + sni_host = None |
| 48 | + sni_port = None |
| 49 | + host_ids = None |
| 50 | + |
| 51 | + @classmethod |
| 52 | + def from_dict(cls, d): |
| 53 | + c = cls() |
| 54 | + |
| 55 | + c.port = d.get('port', None) |
| 56 | + try: |
| 57 | + c.port = int(d['port']) |
| 58 | + except: |
| 59 | + pass |
| 60 | + |
| 61 | + c.username = d.get('username', None) |
| 62 | + c.password = d.get('password', None) |
| 63 | + c.host = d.get('host', None) |
| 64 | + c.keyspace = d.get('keyspace', None) |
| 65 | + c.local_dc = d.get('localDC', None) |
| 66 | + |
| 67 | + return c |
| 68 | + |
| 69 | + |
| 70 | +def get_cloud_config(cloud_config): |
| 71 | + if 'secure_connect_bundle' not in cloud_config: |
| 72 | + raise ValueError("The cloud config doesn't have a secure_connect_bundle specified.") |
| 73 | + |
| 74 | + try: |
| 75 | + config = read_cloud_config_from_zip(cloud_config) |
| 76 | + except BadZipFile: |
| 77 | + raise ValueError("Unable to open the zip file for the cloud config. Check your secure connect bundle.") |
| 78 | + |
| 79 | + return read_metadata_info(config, cloud_config) |
| 80 | + |
| 81 | + |
| 82 | +def read_cloud_config_from_zip(cloud_config): |
| 83 | + secure_bundle = cloud_config['secure_connect_bundle'] |
| 84 | + with ZipFile(secure_bundle) as zipfile: |
| 85 | + base_dir = os.path.dirname(secure_bundle) |
| 86 | + tmp_dir = tempfile.mkdtemp(dir=base_dir) |
| 87 | + try: |
| 88 | + zipfile.extractall(path=tmp_dir) |
| 89 | + return parse_cloud_config(os.path.join(tmp_dir, 'config.json'), cloud_config) |
| 90 | + finally: |
| 91 | + shutil.rmtree(tmp_dir) |
| 92 | + |
| 93 | + |
| 94 | +def parse_cloud_config(path, cloud_config): |
| 95 | + with open(path, 'r') as stream: |
| 96 | + data = json.load(stream) |
| 97 | + |
| 98 | + config = CloudConfig.from_dict(data) |
| 99 | + config_dir = os.path.dirname(path) |
| 100 | + |
| 101 | + if 'ssl_context' in cloud_config: |
| 102 | + config.ssl_context = cloud_config['ssl_context'] |
| 103 | + else: |
| 104 | + # Load the ssl_context before we delete the temporary directory |
| 105 | + ca_cert_location = os.path.join(config_dir, 'ca.crt') |
| 106 | + cert_location = os.path.join(config_dir, 'cert') |
| 107 | + key_location = os.path.join(config_dir, 'key') |
| 108 | + config.ssl_context = _ssl_context_from_cert(ca_cert_location, cert_location, key_location) |
| 109 | + |
| 110 | + return config |
| 111 | + |
| 112 | + |
| 113 | +def read_metadata_info(config, cloud_config): |
| 114 | + url = "https://{}:{}/metadata".format(config.host, config.port) |
| 115 | + timeout = cloud_config['connect_timeout'] if 'connect_timeout' in cloud_config else 5 |
| 116 | + try: |
| 117 | + response = urlopen(url, context=config.ssl_context, timeout=timeout) |
| 118 | + except Exception as e: |
| 119 | + log.exception(e) |
| 120 | + raise DriverException("Unable to connect to the metadata service at %s" % url) |
| 121 | + |
| 122 | + if response.code != 200: |
| 123 | + raise DriverException(("Error while fetching the metadata at: %s. " |
| 124 | + "The service returned error code %d." % (url, response.code))) |
| 125 | + return parse_metadata_info(config, response.read().decode('utf-8')) |
| 126 | + |
| 127 | + |
| 128 | +def parse_metadata_info(config, http_data): |
| 129 | + try: |
| 130 | + data = json.loads(http_data) |
| 131 | + except: |
| 132 | + msg = "Failed to load cluster metadata" |
| 133 | + raise DriverException(msg) |
| 134 | + |
| 135 | + contact_info = data['contact_info'] |
| 136 | + config.local_dc = contact_info['local_dc'] |
| 137 | + |
| 138 | + proxy_info = contact_info['sni_proxy_address'].split(':') |
| 139 | + config.sni_host = proxy_info[0] |
| 140 | + try: |
| 141 | + config.sni_port = int(proxy_info[1]) |
| 142 | + except: |
| 143 | + config.sni_port = 9042 |
| 144 | + |
| 145 | + config.host_ids = [host_id for host_id in contact_info['contact_points']] |
| 146 | + |
| 147 | + return config |
| 148 | + |
| 149 | + |
| 150 | +def _ssl_context_from_cert(ca_cert_location, cert_location, key_location): |
| 151 | + ssl_context = SSLContext(PROTOCOL_TLSv1) |
| 152 | + ssl_context.load_verify_locations(ca_cert_location) |
| 153 | + ssl_context.verify_mode = CERT_REQUIRED |
| 154 | + ssl_context.load_cert_chain(certfile=cert_location, keyfile=key_location) |
| 155 | + |
| 156 | + return ssl_context |
0 commit comments