Skip to content

Commit a06f4b9

Browse files
authored
Merge pull request #2 from riptano/python-1098
PYTHON-1098: Add a new Endpoint type to support unix sockets
2 parents 3b3adb3 + 0f8c991 commit a06f4b9

6 files changed

Lines changed: 170 additions & 6 deletions

File tree

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Not Released
55
Features
66
--------
77
* Add SNIEndPoint support (PYTHON-1084)
8+
* Add a new Endpoint type to support unix sockets (PYTHON-1098)
89

910
3.19.0
1011
======

build.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ build:
150150
export PATH=$JAVA_HOME/bin:$PATH
151151
export PYTHONPATH=""
152152
153+
# Required for unix socket tests
154+
sudo apt-get install socat
155+
153156
# Install latest setuptools
154157
pip install --upgrade pip
155158
pip install -U setuptools

cassandra/connection.py

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,13 @@ def ssl_options(self):
144144
"""
145145
return None
146146

147+
@property
148+
def socket_family(self):
149+
"""
150+
The socket family of the endpoint.
151+
"""
152+
return socket.AF_UNSPEC
153+
147154
def resolve(self):
148155
"""
149156
Resolve the endpoint to an address/port. This is called
@@ -299,6 +306,47 @@ def create_from_sni(self, sni):
299306
return SniEndPoint(self._proxy_address, sni, self._port)
300307

301308

309+
@total_ordering
310+
class UnixSocketEndPoint(EndPoint):
311+
"""
312+
Unix Socket EndPoint implementation.
313+
"""
314+
315+
def __init__(self, unix_socket_path):
316+
self._unix_socket_path = unix_socket_path
317+
318+
@property
319+
def address(self):
320+
return self._unix_socket_path
321+
322+
@property
323+
def port(self):
324+
return None
325+
326+
@property
327+
def socket_family(self):
328+
return socket.AF_UNIX
329+
330+
def resolve(self):
331+
return self.address, None
332+
333+
def __eq__(self, other):
334+
return (isinstance(other, UnixSocketEndPoint) and
335+
self._unix_socket_path == other._unix_socket_path)
336+
337+
def __hash__(self):
338+
return hash(self._unix_socket_path)
339+
340+
def __lt__(self, other):
341+
return self._unix_socket_path < other._unix_socket_path
342+
343+
def __str__(self):
344+
return str("%s" % (self._unix_socket_path,))
345+
346+
def __repr__(self):
347+
return "<%s: %s>" % (self.__class__.__name__, self._unix_socket_path)
348+
349+
302350
class _Frame(object):
303351
def __init__(self, version, flags, stream, opcode, body_offset, end_pos):
304352
self.version = version
@@ -557,13 +605,22 @@ def factory(cls, endpoint, timeout, *args, **kwargs):
557605
else:
558606
return conn
559607

560-
def _connect_socket(self):
561-
sockerr = None
562-
inet_address, port = self.endpoint.resolve()
563-
addresses = socket.getaddrinfo(inet_address, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
608+
def _get_socket_addresses(self):
609+
address, port = self.endpoint.resolve()
610+
611+
if self.endpoint.socket_family == socket.AF_UNIX:
612+
return [(socket.AF_UNIX, socket.SOCK_STREAM, 0, None, address)]
613+
614+
addresses = socket.getaddrinfo(address, port, self.endpoint.socket_family, socket.SOCK_STREAM)
564615
if not addresses:
565616
raise ConnectionException("getaddrinfo returned empty list for %s" % (self.endpoint,))
566-
for (af, socktype, proto, canonname, sockaddr) in addresses:
617+
618+
return addresses
619+
620+
def _connect_socket(self):
621+
sockerr = None
622+
addresses = self._get_socket_addresses()
623+
for (af, socktype, proto, _, sockaddr) in addresses:
567624
try:
568625
self._socket = self._socket_impl.socket(af, socktype, proto)
569626
if self.ssl_context:
@@ -587,7 +644,8 @@ def _connect_socket(self):
587644
sockerr = err
588645

589646
if sockerr:
590-
raise socket.error(sockerr.errno, "Tried connecting to %s. Last error: %s" % ([a[4] for a in addresses], sockerr.strerror or sockerr))
647+
raise socket.error(sockerr.errno, "Tried connecting to %s. Last error: %s" %
648+
([a[4] for a in addresses], sockerr.strerror or sockerr))
591649

592650
if self.sockopts:
593651
for args in self.sockopts:

docs/api/cassandra/connection.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,5 @@
1717
.. autoclass:: SniEndPoint
1818

1919
.. autoclass:: SniEndPointFactory
20+
21+
.. autoclass:: UnixSocketEndPoint
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License
14+
15+
try:
16+
import unittest2 as unittest
17+
except ImportError:
18+
import unittest # noqa
19+
20+
try:
21+
from ccmlib import common
22+
except ImportError as e:
23+
raise unittest.SkipTest('ccm is a dependency for integration tests:', e)
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Copyright DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License
14+
try:
15+
import unittest2 as unittest
16+
except ImportError:
17+
import unittest # noqa
18+
19+
import time
20+
import subprocess
21+
import logging
22+
23+
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
24+
from cassandra.connection import UnixSocketEndPoint
25+
from cassandra.policies import WhiteListRoundRobinPolicy, RoundRobinPolicy
26+
27+
from tests import notwindows
28+
from tests.integration import use_single_node
29+
30+
log = logging.getLogger()
31+
log.setLevel('DEBUG')
32+
33+
UNIX_SOCKET_PATH = '/tmp/cass.sock'
34+
35+
36+
def setup_module():
37+
use_single_node()
38+
39+
40+
class UnixSocketWhiteListRoundRobinPolicy(WhiteListRoundRobinPolicy):
41+
def __init__(self, hosts):
42+
self._allowed_hosts = self._allowed_hosts_resolved = tuple(hosts)
43+
RoundRobinPolicy.__init__(self)
44+
45+
46+
@notwindows
47+
class UnixSocketTest(unittest.TestCase):
48+
49+
@classmethod
50+
def setUpClass(cls):
51+
log.debug("Starting socat...")
52+
cls.proc = subprocess.Popen(
53+
['socat',
54+
'UNIX-LISTEN:%s,fork' % UNIX_SOCKET_PATH,
55+
'TCP:localhost:9042'],
56+
stdout=subprocess.PIPE,
57+
stderr=subprocess.STDOUT)
58+
59+
time.sleep(1)
60+
if cls.proc.poll() is not None:
61+
for line in cls.proc.stdout.readlines():
62+
log.debug("socat: " + line.decode('utf-8'))
63+
raise Exception("Error while starting socat. Return code: %d" % cls.proc.returncode)
64+
65+
lbp = UnixSocketWhiteListRoundRobinPolicy([UNIX_SOCKET_PATH])
66+
ep = ExecutionProfile(load_balancing_policy=lbp)
67+
endpoint = UnixSocketEndPoint(UNIX_SOCKET_PATH)
68+
cls.cluster = Cluster([endpoint], execution_profiles={EXEC_PROFILE_DEFAULT: ep})
69+
70+
@classmethod
71+
def tearDownClass(cls):
72+
cls.cluster.shutdown()
73+
cls.proc.terminate()
74+
75+
def test_unix_socket_connection(self):
76+
s = self.cluster.connect()
77+
s.execute('select * from system.local')

0 commit comments

Comments
 (0)