-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathds_client.py
More file actions
280 lines (236 loc) · 9.57 KB
/
Copy pathds_client.py
File metadata and controls
280 lines (236 loc) · 9.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
import uuid
from os import path, urandom
import hashlib
import base64
import secrets
import requests
from flask import current_app as app, url_for, redirect, render_template, request, session, redirect
from flask_oauthlib.client import OAuth
from requests_oauthlib import OAuth2Session
from docusign_esign import ApiClient
from docusign_esign.client.api_exception import ApiException
from ..ds_config import DS_CONFIG, DS_JWT
from ..error_handlers import process_error
from ..jwt_helpers import get_jwt_token, get_private_key
from ..consts import API_TYPE
# OAuth scope lists, selected by DSClient according to the API being used.

# Fallback scopes, used when the requested API matches none of the lists below.
SCOPES = [
    "signature",
]

# Scopes requested when the selected API is "Rooms".
ROOMS_SCOPES = [
    "room_forms",
    "dtr.rooms.read",
    "dtr.rooms.write",
    "dtr.documents.read",
    "dtr.documents.write",
    "dtr.profile.read",
    "dtr.profile.write",
    "dtr.company.read",
    "dtr.company.write",
]

# Scopes requested when the selected API is "Click".
CLICK_SCOPES = [
    "signature",
    "click.manage",
    "click.send",
]

# Scopes requested when the selected API is "Admin".
ADMIN_SCOPES = [
    "signature",
    "organization_read",
    "group_read",
    "permission_read",
    "user_read",
    "user_write",
    "account_read",
    "domain_read",
    "identity_provider_read",
    "impersonation",
    "user_data_redact",
    "asset_group_account_read",
    "asset_group_account_clone_write",
    "asset_group_account_clone_read",
    "organization_sub_account_write",
    "organization_sub_account_read",
]

# Scopes requested when the selected API is "WebForms".
WEBFORMS_SCOPES = [
    "signature",
    "webforms_read",
    "webforms_instance_read",
    "webforms_instance_write",
]

# Scopes requested when the selected API is "Notary".
NOTARY_SCOPES = [
    "signature",
    "organization_read",
    "notary_read",
    "notary_write",
]

# Scopes requested when the selected API is "ConnectedFields".
CONNECTED_FIELDS = [
    "signature",
    "adm_store_unified_repo_read",
]
class DSClient:
    """Class-level access point for the DocuSign authentication client.

    The active client is stored in the class attribute ``ds_app`` and is
    therefore shared by every caller of this class. Depending on the flow it
    holds a flask_oauthlib remote app (Authorization Code Grant), a
    requests_oauthlib ``OAuth2Session`` (Authorization Code Grant with PKCE),
    or the value returned by ``get_jwt_token`` (JWT grant).
    """

    # Current auth client / token object; None until one of the flows sets it.
    ds_app = None

    @staticmethod
    def _scopes_for(api, connected_fields=True):
        """Return the de-duplicated scope list for ``api``.

        Args:
            api: Name of the API being used ("Rooms", "Click", "Admin",
                "WebForms", "Notary", "ConnectedFields"); any other value
                selects the default ``SCOPES``.
            connected_fields: When False, "ConnectedFields" is not recognised
                and falls back to the default ``SCOPES``.

        Returns:
            A new list of scope strings in declaration order.
        """
        scopes_by_api = {
            "Rooms": ROOMS_SCOPES,
            "Click": CLICK_SCOPES,
            "Admin": ADMIN_SCOPES,
            "WebForms": WEBFORMS_SCOPES,
            "Notary": NOTARY_SCOPES,
        }
        if connected_fields:
            scopes_by_api["ConnectedFields"] = CONNECTED_FIELDS
        # dict.fromkeys removes duplicates while keeping a stable order, so the
        # resulting scope string is identical on every run (a set is not).
        return list(dict.fromkeys(scopes_by_api.get(api, SCOPES)))

    @classmethod
    def _init(cls, auth_type, api):
        """Build ``ds_app`` for the requested flow.

        ``"code_grant"`` uses PKCE unless the session flag ``pkce_failed`` is
        set, in which case the plain Authorization Code Grant client is built.
        ``"jwt"`` runs the JWT flow (its response object is discarded here).
        Any other ``auth_type`` leaves ``ds_app`` untouched.
        """
        if auth_type == "code_grant":
            if session.get("pkce_failed", False):
                cls._auth_code_grant(api)
            else:
                cls._pkce_auth(api)
        elif auth_type == "jwt":
            cls._jwt_auth(api)

    @classmethod
    def _auth_code_grant(cls, api):
        """Authorize with the Authorization Code Grant - OAuth 2.0 flow.

        Registers a flask_oauthlib remote app named "docusign" on the current
        Flask app and stores it in ``ds_app``.
        """
        oauth = OAuth(app)

        request_token_params = {
            "scope": " ".join(cls._scopes_for(api)),
            # Callable, so a fresh state value is produced each time it is invoked.
            "state": lambda: uuid.uuid4().hex.upper(),
        }
        if not DS_CONFIG["allow_silent_authentication"]:
            request_token_params["prompt"] = "login"

        cls.ds_app = oauth.remote_app(
            "docusign",
            consumer_key=DS_CONFIG["ds_client_id"],
            consumer_secret=DS_CONFIG["ds_client_secret"],
            access_token_url=DS_CONFIG["authorization_server"] + "/oauth/token",
            authorize_url=DS_CONFIG["authorization_server"] + "/oauth/auth",
            request_token_params=request_token_params,
            base_url=None,
            request_token_url=None,
            access_token_method="POST",
        )

    @classmethod
    def _pkce_auth(cls, api):
        """Prepare the Authorization Code Grant with PKCE flow.

        Stores a requests_oauthlib ``OAuth2Session`` in ``ds_app``, configured
        with the client id, the ``ds.ds_callback`` redirect URI and the scopes
        for ``api``.
        """
        redirect_uri = DS_CONFIG["app_url"] + url_for("ds.ds_callback")
        cls.ds_app = OAuth2Session(
            DS_CONFIG["ds_client_id"],
            redirect_uri=redirect_uri,
            scope=cls._scopes_for(api),
        )

    @classmethod
    def _jwt_auth(cls, api):
        """JSON Web Token authorization.

        On success the token object is stored in ``ds_app``.

        Returns:
            A Flask response: a redirect to ``ds.ds_callback`` on success or
            after a non-consent API error, a redirect to the consent page when
            the error body contains "consent_required", or the rendered
            ``error.html`` template when the private key file cannot be read.
        """
        # NOTE(review): api_client is never used below; kept in case building
        # it has side effects - confirm and remove.
        api_client = ApiClient()
        api_client.set_base_path(DS_JWT["authorization_server"])

        # NOTE(review): unlike the code-grant flows, this flow has never mapped
        # "ConnectedFields" to CONNECTED_FIELDS; preserved - confirm intent.
        use_scopes = cls._scopes_for(api, connected_fields=False)
        # Some scope lists (e.g. ADMIN_SCOPES) already contain "impersonation";
        # only add it when missing so it is never requested twice.
        if "impersonation" not in use_scopes:
            use_scopes.append("impersonation")

        # Catch IO error
        try:
            private_key = get_private_key(DS_JWT["private_key_file"]).encode("ascii").decode("utf-8")
        except OSError as err:  # IOError is an alias of OSError in Python 3
            return render_template(
                "error.html",
                err=err
            )

        try:
            cls.ds_app = get_jwt_token(
                private_key,
                use_scopes,
                DS_JWT["authorization_server"],
                DS_JWT["ds_client_id"],
                DS_JWT["ds_impersonated_user_id"],
            )
            return redirect(url_for("ds.ds_callback"))
        except ApiException as err:
            body = err.body.decode('utf8')

            # Grant explicit consent for the application
            if "consent_required" in body:
                consent_scopes = " ".join(use_scopes)
                redirect_uri = DS_CONFIG["app_url"] + url_for("ds.ds_callback")
                consent_url = f"{DS_CONFIG['authorization_server']}/oauth/auth?response_type=code&" \
                              f"scope={consent_scopes}&client_id={DS_JWT['ds_client_id']}&redirect_uri={redirect_uri}"
                return redirect(consent_url)

            # NOTE(review): the return value of process_error is discarded and
            # the user is sent on to the callback - confirm this is intended.
            process_error(err)
            return redirect(url_for("ds.ds_callback"))

    @classmethod
    def destroy(cls):
        """Drop the stored client so the next ``get`` rebuilds it."""
        cls.ds_app = None

    @classmethod
    def login(cls, auth_type, api):
        """Start the login flow and return the Flask response to send.

        Args:
            auth_type: ``"code_grant"`` or ``"jwt"``.
            api: Name of the API whose scopes should be requested.

        Returns:
            The response produced by the selected flow, or None when
            ``auth_type`` is neither of the supported values.
        """
        if auth_type == "jwt":
            # _jwt_auth both stores the token and builds the response, so it is
            # called exactly once here; going through _init first would request
            # a second token for the same login.
            return cls._jwt_auth(api)

        cls._init(auth_type, api)
        if auth_type == "code_grant":
            if session.get("pkce_failed", False):
                return cls.get(auth_type, api).authorize(callback=url_for("ds.ds_callback", _external=True))

            # The verifier is kept in the session so the callback can present
            # it when exchanging the authorization code.
            code_verifier = cls.generate_code_verifier()
            code_challenge = cls.generate_code_challenge(code_verifier)
            session["code_verifier"] = code_verifier
            return redirect(cls.get_auth_url_with_pkce(code_challenge))
        return None

    @classmethod
    def get_token(cls, auth_type):
        """Return the token data for the current request.

        For ``"code_grant"`` with PKCE the result of ``fetch_token_with_pkce``
        is returned as-is. For the other flows the response is returned when it
        carries an ``access_token``; otherwise an "Access denied" message
        string is returned instead.
        """
        resp = None
        if auth_type == "code_grant":
            if session.get("pkce_failed", False):
                resp = cls.get(auth_type).authorized_response()
            else:
                return cls.fetch_token_with_pkce(request.url)
        elif auth_type == "jwt":
            resp = cls.get(auth_type).to_dict()

        if resp is None or resp.get("access_token") is None:
            # .get() keeps the message usable when the callback URL carries no
            # error parameters (indexing would raise instead of reporting).
            return "Access denied: reason=%s error=%s resp=%s" % (
                request.args.get("error"),
                request.args.get("error_description"),
                resp
            )

        return resp

    @classmethod
    def get_user(cls, access_token):
        """Make request to the API to get the user information.

        Args:
            access_token: Bearer token sent in the Authorization header.

        Returns:
            The decoded JSON body of the ``/oauth/userinfo`` response.
        """
        # Determine user, account_id, base_url by calling OAuth::getUserInfo
        # See https://developers.docusign.com/esign-rest-api/guides/authentication/user-info-endpoints
        url = DS_CONFIG["authorization_server"] + "/oauth/userinfo"
        auth = {"Authorization": "Bearer " + access_token}
        response = requests.get(url, headers=auth).json()
        return response

    @classmethod
    def get(cls, auth_type, api=API_TYPE["ESIGNATURE"]):
        """Return ``ds_app``, building it first when it is not set."""
        if not cls.ds_app:
            cls._init(auth_type, api)
        return cls.ds_app

    @classmethod
    def generate_code_verifier(cls):
        """Return a new random URL-safe PKCE code verifier."""
        # 32 random bytes, base64url-encoded by secrets.token_urlsafe
        return secrets.token_urlsafe(32)

    @classmethod
    def generate_code_challenge(cls, code_verifier):
        """Return the S256 code challenge for ``code_verifier``.

        The challenge is the URL-safe base64 encoding of the SHA-256 digest of
        the verifier, with the trailing ``=`` padding removed.
        """
        sha256_hash = hashlib.sha256(code_verifier.encode()).digest()
        return base64.urlsafe_b64encode(sha256_hash).decode().rstrip('=')

    @classmethod
    def get_auth_url_with_pkce(cls, code_challenge):
        """Return the authorization URL carrying the PKCE challenge.

        Requires ``ds_app`` to already be the ``OAuth2Session`` built by
        ``_pkce_auth``. The state value returned by the session is not used
        here.
        """
        authorize_url = DS_CONFIG["authorization_server"] + "/oauth/auth"
        auth_url, _state = cls.ds_app.authorization_url(
            authorize_url,
            code_challenge=code_challenge,
            code_challenge_method='S256',  # PKCE uses SHA-256 hashing
            approval_prompt="auto"
        )
        return auth_url

    @classmethod
    def fetch_token_with_pkce(cls, authorization_response):
        """Exchange the authorization response for a token using PKCE.

        Args:
            authorization_response: Full callback URL received from the
                authorization server.

        Returns:
            The token returned by the session's ``fetch_token`` call.

        Raises:
            KeyError: If the session holds no ``code_verifier``.
        """
        access_token_url = DS_CONFIG["authorization_server"] + "/oauth/token"
        oauth_session = cls.get("code_grant", session.get("api"))
        token = oauth_session.fetch_token(
            access_token_url,
            authorization_response=authorization_response,
            client_id=DS_CONFIG["ds_client_id"],
            client_secret=DS_CONFIG["ds_client_secret"],
            code_verifier=session["code_verifier"],
            code_challenge_method="S256"
        )
        return token