forked from twilio/twilio-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutil.py
More file actions
165 lines (125 loc) · 5.25 KB
/
util.py
File metadata and controls
165 lines (125 loc) · 5.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import base64
import hmac
import time
from hashlib import sha1
from . import jwt
from .compat import izip, urlencode
from six import iteritems
class RequestValidator(object):
    """Check the signature that accompanies a request from Twilio.

    :param token: the auth token used as the HMAC key; text is stored
                  UTF-8 encoded, bytes are stored unchanged
    """

    def __init__(self, token):
        # hmac.new() needs a bytes key; only text has to be encoded.
        if not isinstance(token, bytes):
            token = token.encode("utf-8")
        self.token = token

    def compute_signature(self, uri, params):
        """Compute the signature for a given request

        The signed message is the URI followed by each parameter name and
        value, concatenated in sorted key order. It is signed with HMAC-SHA1
        keyed by the auth token and the digest is Base64 encoded.

        :param uri: full URI that Twilio requested on your server
        :param params: post vars that Twilio sent with the request; an empty
                       mapping or None means there is nothing to append
        :returns: The computed signature, exactly as produced by
                  base64.b64encode (bytes on Python 3)
        """
        pieces = [uri]
        if params:
            for key, value in sorted(params.items()):
                pieces.append(key)
                pieces.append(value)
        message = "".join(pieces)

        mac = hmac.new(self.token, message.encode("utf-8"), sha1)
        return base64.b64encode(mac.digest()).strip()

    def validate(self, uri, params, signature):
        """Validate a request from Twilio

        :param uri: full URI that Twilio requested on your server
        :param params: post vars that Twilio sent with the request
        :param signature: expected signature in HTTP X-Twilio-Signature header
        :returns: True if the request passes validation, False if not
        """
        # A request without a signature can never be valid.
        if signature is None:
            return False

        expected = self.compute_signature(uri, params)

        # The computed signature is bytes on Python 3 while a header value is
        # normally text; comparing the two element by element would never
        # match, so bring the supplied signature to bytes first.
        if not isinstance(signature, bytes):
            signature = signature.encode("utf-8")

        return secure_compare(expected, signature)
def _to_byte_values(value):
    """Return *value* as a bytearray, UTF-8 encoding it first when it is text."""
    if not isinstance(value, (bytes, bytearray)):
        value = value.encode("utf-8")
    return bytearray(value)


def secure_compare(string1, string2):
    """Compare two strings while protecting against timing attacks

    Both arguments may be text or bytes, in any combination. Text is UTF-8
    encoded before comparing, so a bytes value and the text with the same
    content compare equal on every Python version. (Comparing the raw
    values element by element would never match on Python 3, where
    iterating bytes yields integers and iterating text yields characters.)

    :param str string1: the first string
    :param str string2: the second string
    :returns: True if the strings are equal, False if not
    :rtype: :obj:`bool`
    """
    left = _to_byte_values(string1)
    right = _to_byte_values(string2)

    if len(left) != len(right):
        return False

    # Accumulate differences over every position instead of returning at the
    # first mismatch, so the loop always walks the full length.
    mismatch = 0
    for byte1, byte2 in zip(left, right):
        mismatch |= byte1 ^ byte2
    return mismatch == 0
class TwilioCapability(object):
    """
    A token to control permissions with Twilio Client

    :param str account_sid: the account sid to which this token
                            is granted access
    :param str auth_token: the secret key used to sign the token.
                           Note, this auth token is not visible to the
                           user of the token.

    :returns: A new TwilioCapability with zero permissions
    """

    def __init__(self, account_sid, auth_token):
        self.account_sid = account_sid
        self.auth_token = auth_token
        # Granted scopes, keyed by capability name.
        self.capabilities = {}
        # Set by allow_client_incoming(); None until then.
        self.client_name = None

    def payload(self):
        """Build the dictionary that carries this token's scope string.

        When a client name has been set and an outgoing capability exists,
        the client name is first written into that capability's parameters.

        :returns: a dict with a single ``scope`` key whose value is the
                  space-separated string form of every granted capability
        """
        granted = self.capabilities
        if self.client_name is not None and "outgoing" in granted:
            granted["outgoing"].params["clientName"] = self.client_name

        return {"scope": " ".join(str(entry) for entry in granted.values())}

    def generate(self, expires=3600):
        """Generate a valid JWT token with an expiration date.

        :param int expires: The token lifetime, in seconds. Defaults to
                            1 hour (3600)
        :returns: the result of encoding the payload with the auth token
        """
        claims = self.payload()
        claims["iss"] = self.account_sid
        # Expiry is an absolute timestamp: now plus the requested lifetime.
        claims["exp"] = int(time.time() + expires)
        return jwt.encode(claims, self.auth_token)

    def allow_client_outgoing(self, application_sid, **kwargs):
        """Allow the user of this token to make outgoing connections.

        Keyword arguments are passed to the application.

        :param str application_sid: Application to contact
        """
        params = {"appSid": application_sid}
        if kwargs:
            params["appParams"] = urlencode(kwargs, doseq=True)

        outgoing_scope = ScopeURI("client", "outgoing", params)
        self.capabilities["outgoing"] = outgoing_scope

    def allow_client_incoming(self, client_name):
        """If the user of this token should be allowed to accept incoming
        connections then configure the TwilioCapability through this method and
        specify the client name.

        :param str client_name: Client name to accept calls from
        """
        self.client_name = client_name
        incoming_scope = ScopeURI("client", "incoming",
                                  {"clientName": client_name})
        self.capabilities["incoming"] = incoming_scope

    def allow_event_stream(self, **kwargs):
        """Allow the user of this token to access their event stream.

        Keyword arguments, when given, are URL-encoded into the scope's
        ``params`` entry.
        """
        params = {"path": "/2010-04-01/Events"}
        if kwargs:
            params["params"] = urlencode(kwargs, doseq=True)

        event_scope = ScopeURI("stream", "subscribe", params)
        self.capabilities["events"] = event_scope
class ScopeURI(object):
    """One granted permission, rendered as a ``scope:`` URI string.

    :param service: the service part of the scope
    :param privilege: the privilege part of the scope
    :param params: optional mapping of parameters appended as a query string
    """

    def __init__(self, service, privilege, params=None):
        self.service = service
        self.privilege = privilege
        self.params = params

    def __str__(self):
        """Return ``scope:<service>:<privilege>``, followed by ``?`` and the
        URL-encoded parameters (sorted by key) when any are present."""
        query = ''
        if self.params:
            # Sort so the rendered string does not depend on mapping order.
            ordered = sorted(iteritems(self.params))
            query = '?%s' % urlencode(ordered)
        return "scope:%s:%s%s" % (self.service, self.privilege, query)