Skip to content

Commit 51fdc52

Browse files
committed
add unit tests
1 parent 09ebee8 commit 51fdc52

File tree

11 files changed

+649
-5
lines changed

11 files changed

+649
-5
lines changed

splitio/api/client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ def get(self, server, path, apikey, query=None, extra_headers=None): #pylint: d
9595
:rtype: HttpResponse
9696
"""
9797
headers = self._build_basic_headers(apikey)
98-
9998
if extra_headers is not None:
10099
headers.update(extra_headers)
101100

splitio/push/__init__.py

Whitespace-only changes.

splitio/push/splitsse.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
"""An SSE client wrapper to be used with split endpoint."""
2+
import logging
3+
import threading
4+
from enum import Enum
5+
import six
6+
from splitio.push.sse import SSEClient, SSE_EVENT_ERROR
7+
from splitio.util.threading import EventGroup
8+
9+
10+
_LOGGER = logging.getLogger(__name__)
11+
12+
13+
class SplitSSEClient(object):
14+
"""Split streaming endpoint SSE client."""
15+
16+
class _Status(Enum):
17+
IDLE = 0
18+
CONNECTING = 1
19+
ERRORED = 2
20+
CONNECTED = 3
21+
22+
def __init__(self, callback, base_url='https://streaming.split.io'):
23+
"""
24+
Construct a split sse client.
25+
26+
:param callback: fuction to call when an event is received.
27+
:type callback: callable
28+
29+
:param base_url: scheme + :// + host
30+
:type base_url: str
31+
"""
32+
self._client = SSEClient(self._raw_event_handler)
33+
self._callback = callback
34+
self._base_url = base_url
35+
self._status = SplitSSEClient._Status.IDLE
36+
self._sse_first_event = None
37+
self._sse_connection_closed = None
38+
39+
def _raw_event_handler(self, event):
40+
"""
41+
Handle incoming raw sse event.
42+
43+
:param event: Incoming raw sse event.
44+
:type event: splitio.push.sse.SSEEvent
45+
"""
46+
if self._status == SplitSSEClient._Status.CONNECTING:
47+
self._status = SplitSSEClient._Status.CONNECTED if event.event != SSE_EVENT_ERROR \
48+
else SplitSSEClient._Status.ERRORED
49+
self._sse_first_event.set()
50+
51+
if event.data is not None:
52+
self._callback(event)
53+
54+
@staticmethod
55+
def _format_channels(channels):
56+
"""
57+
Format channels into a list from the raw object retrieved in the token.
58+
59+
:param channels: object as extracted from the JWT capabilities.
60+
:type channels: dict[str,list[str]]
61+
62+
:returns: channels as a list of strings.
63+
:rtype: list[str]
64+
"""
65+
regular = [k for (k, v) in six.iteritems(channels) if v == ['subscribe']]
66+
occupancy = ['[?occupancy=metrics.publishers]' + k
67+
for (k, v) in six.iteritems(channels)
68+
if 'channel-metadata:publishers' in v]
69+
return regular + occupancy
70+
71+
def _build_url(self, token):
72+
"""
73+
Build the url to connect to and return it as a string.
74+
75+
:param token: (parsed) JWT
76+
:type token: splitio.models.token.Token
77+
78+
:returns: true if the connection was successful. False otherwise.
79+
:rtype: bool
80+
"""
81+
return '{base}/event-stream?v=1.1&accessToken={token}&channels={channels}'.format(
82+
base=self._base_url,
83+
token=token.token,
84+
channels=','.join(self._format_channels(token.channels)))
85+
86+
def start(self, token):
87+
"""
88+
Open a connection to start listening for events.
89+
90+
:param token: (parsed) JWT
91+
:type token: splitio.models.token.Token
92+
93+
:returns: true if the connection was successful. False otherwise.
94+
:rtype: bool
95+
"""
96+
if self._status != SplitSSEClient._Status.IDLE:
97+
raise Exception('SseClient already started.')
98+
99+
self._status = SplitSSEClient._Status.CONNECTING
100+
101+
event_group = EventGroup()
102+
self._sse_first_event = event_group.make_event()
103+
self._sse_connection_closed = event_group.make_event()
104+
105+
def connect(url):
106+
"""Connect to sse in a blocking manner."""
107+
self._client.start(url)
108+
self._sse_connection_closed.set()
109+
self._status = SplitSSEClient._Status.IDLE
110+
111+
url = self._build_url(token)
112+
task = threading.Thread(target=connect, args=(url,))
113+
task.setDaemon(True)
114+
task.start()
115+
event_group.wait()
116+
return self._status == SplitSSEClient._Status.CONNECTED
117+
118+
def stop(self, blocking=False, timeout=None):
119+
"""Abort the ongoing connection."""
120+
if self._status == SplitSSEClient._Status.IDLE:
121+
raise Exception('SseClient not running')
122+
self._client.shutdown()
123+
if blocking:
124+
self._sse_connection_closed.wait(timeout)

splitio/push/sse.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
"""Low-level SSE Client."""
2+
import logging
3+
import socket
4+
from collections import namedtuple
5+
6+
try: # try to import python3 names. fallback to python2
7+
from http.client import HTTPConnection, HTTPSConnection
8+
from urllib.parse import urlparse
9+
except ImportError:
10+
import urlparse
11+
from httplib import HTTPConnection, HTTPSConnection
12+
13+
14+
_LOGGER = logging.getLogger(__name__)
15+
16+
17+
SSE_EVENT_ERROR = 'error'
18+
SSE_EVENT_MESSAGE = 'message'
19+
20+
21+
SSEEvent = namedtuple('SSEEvent', ['event_id', 'event', 'retry', 'data'])
22+
23+
24+
class EventBuilder(object):
25+
"""Event builder class."""
26+
27+
_SEPARATOR = b':'
28+
29+
def __init__(self):
30+
"""Construct a builder."""
31+
self._lines = {}
32+
33+
def process_line(self, line):
34+
"""
35+
Process a new line.
36+
37+
:param line: Line to process
38+
:type line: bytes
39+
"""
40+
try:
41+
key, val = line.split(self._SEPARATOR, 1)
42+
self._lines[key.decode('utf8').strip()] = val.decode('utf8').strip()
43+
except ValueError: # key without a value
44+
self._lines[line.decode('utf8').strip()] = None
45+
46+
def build(self):
47+
"""Construct an event with relevant fields."""
48+
return SSEEvent(self._lines.get('id'), self._lines.get('event'),
49+
self._lines.get('retry'), self._lines.get('data'))
50+
51+
52+
class SSEClient(object):
53+
"""SSE Client implementation."""
54+
55+
_DEFAULT_HEADERS = {'Accept': 'text/event-stream'}
56+
_EVENT_SEPARATORS = set([b'\n', b'\r\n'])
57+
58+
def __init__(self, callback):
59+
"""
60+
Construct an SSE client.
61+
62+
:param callback: function to call when an event is received
63+
:type callback: callable
64+
"""
65+
self._connection = None
66+
self._event_callback = callback
67+
self._shutdown_requested = False
68+
69+
def _read_events(self):
70+
"""
71+
Read events from the supplied connection.
72+
73+
:returns: True if the connection was ended by us. False if it was closed by the serve.
74+
:rtype: bool
75+
"""
76+
try:
77+
response = self._connection.getresponse()
78+
event_builder = EventBuilder()
79+
while True:
80+
line = response.readline()
81+
if line is None or len(line) <= 0: # connection ended
82+
_LOGGER.info("sse connection has ended.")
83+
break
84+
elif line.startswith(b':'): # comment. Skip
85+
_LOGGER.debug("skipping sse comment")
86+
continue
87+
elif line in self._EVENT_SEPARATORS:
88+
event = event_builder.build()
89+
_LOGGER.debug("dispatching event: %s", event)
90+
self._event_callback(event)
91+
event_builder = EventBuilder()
92+
else:
93+
event_builder.process_line(line)
94+
except Exception: #pylint:disable=broad-except
95+
_LOGGER.info('sse connection ended.')
96+
_LOGGER.debug(exc_info=True)
97+
finally:
98+
self._connection.close()
99+
self._connection = None # clear so it can be started again
100+
101+
return self._shutdown_requested
102+
103+
def start(self, url, headers=None): #pylint:disable=dangerous-default-value
104+
"""
105+
Connect and start listening for events.
106+
107+
:param url: url to connect to
108+
:type url: str
109+
110+
:param headers: additional headers
111+
:type headers: dict[str, str]
112+
113+
:returns: True if the connection was ended by us. False if it was closed by the serve.
114+
:rtype: bool
115+
"""
116+
if self._connection is not None:
117+
raise RuntimeError('Client already started.')
118+
119+
url = urlparse(url)
120+
headers = self._DEFAULT_HEADERS.copy()
121+
headers.update(headers if headers is not None else {})
122+
self._connection = HTTPSConnection(url.hostname, url.port) if url.scheme == 'https' \
123+
else HTTPConnection(url.hostname, port=url.port)
124+
125+
self._connection.request('GET', '%s?%s' % (url.path, url.query), headers=headers)
126+
return self._read_events()
127+
128+
def shutdown(self):
129+
"""Shutdown the current connection."""
130+
if self._connection is None:
131+
_LOGGER.warn("no sse connection has been started on this SSEClient instance. Ignoring")
132+
return
133+
134+
if self._shutdown_requested:
135+
_LOGGER.warn("shutdown already requested")
136+
return
137+
138+
self._shutdown_requested = True
139+
self._connection.sock.shutdown(socket.SHUT_RDWR)

splitio/util/__init__.py

Whitespace-only changes.

splitio/util/threading.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"""Threading utilities."""
2+
from inspect import isclass
3+
import threading
4+
5+
6+
# python2 workaround
7+
_EventClass = threading.Event if isclass(threading.Event) else threading._Event #pylint:disable=protected-access,invalid-name
8+
9+
10+
class EventGroup(object):
11+
"""EventGroup that can be waited with an OR condition."""
12+
13+
class Event(_EventClass): #pylint:disable=too-few-public-methods
14+
"""Threading event meant to be used in an group."""
15+
16+
def __init__(self, shared_condition):
17+
"""
18+
Construct an event.
19+
20+
:param shared_condition: shared condition varaible.
21+
:type shared_condition: threading.Condition
22+
"""
23+
_EventClass.__init__(self)
24+
self._shared_cond = shared_condition
25+
26+
def set(self):
27+
"""Set the event."""
28+
_EventClass.set(self)
29+
with self._shared_cond:
30+
self._shared_cond.notify()
31+
32+
def __init__(self):
33+
"""Construct an event group."""
34+
self._cond = threading.Condition()
35+
36+
def make_event(self):
37+
"""
38+
Make a new event associated to this waitable group.
39+
40+
:returns: an event that can be awaited as part of a group
41+
:rtype: EventGroup.Event
42+
"""
43+
return EventGroup.Event(self._cond)
44+
45+
def wait(self, timeout=None):
46+
"""
47+
Wait until one of the events is triggered.
48+
49+
:param timeout: how many seconds to wait. None means forever.
50+
:type timeout: int
51+
52+
:returns: True if the condition was notified within the specified timeout. False otherwise.
53+
:rtype: bool
54+
"""
55+
with self._cond:
56+
return self._cond.wait(timeout)

tests/models/test_token.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@
66

77
class TokenTests(object):
88
"""Token model tests."""
9-
raw_false = {
10-
'pushEnabled': False,
11-
'token': 'eyJhbGciOiJIUzI1NiIsImtpZCI6IjVZOU05US45QnJtR0EiLCJ0eXAiOiJKV1QifQ.eyJ4LWFibHktY2FwYWJpbGl0eSI6IntcIk56TTJNREk1TXpjMF9NVGd5TlRnMU1UZ3dOZz09X3NlZ21lbnRzXCI6W1wic3Vic2NyaWJlXCJdLFwiTnpNMk1ESTVNemMwX01UZ3lOVGcxTVRnd05nPT1fc3BsaXRzXCI6W1wic3Vic2NyaWJlXCJdLFwiY29udHJvbF9wcmlcIjpbXCJzdWJzY3JpYmVcIixcImNoYW5uZWwtbWV0YWRhdGE6cHVibGlzaGVyc1wiXSxcImNvbnRyb2xfc2VjXCI6W1wic3Vic2NyaWJlXCIsXCJjaGFubmVsLW1ldGFkYXRhOnB1Ymxpc2hlcnNcIl19IiwieC1hYmx5LWNsaWVudElkIjoiY2xpZW50SWQiLCJleHAiOjE2MDIwODgxMjcsImlhdCI6MTYwMjA4NDUyN30.5_MjWonhs6yoFhw44hNJm3H7_YMjXpSW105DwjjppqE',
12-
}
9+
raw_false = {'pushEnabled': False}
1310

1411
def test_from_raw_false(self):
1512
"""Test token model parsing."""

0 commit comments

Comments
 (0)