-
Notifications
You must be signed in to change notification settings - Fork 36
Expand file tree
/
Copy pathoptimizely_user_context.py
More file actions
345 lines (278 loc) · 11.6 KB
/
optimizely_user_context.py
File metadata and controls
345 lines (278 loc) · 11.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# Copyright 2021-2022, Optimizely and contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import copy
import threading
from typing import TYPE_CHECKING, Any, Callable, Optional, NewType, Dict
from optimizely.decision import optimizely_decision
from optimizely.helpers.enums import OdpManagerConfig
if TYPE_CHECKING:
# prevent circular dependency by skipping import at runtime
from . import optimizely
from optimizely.helpers.event_tag_utils import EventTags
from .logger import Logger
# type for tracking user attributes (essentially a sub-type of dict)
UserAttributes = NewType('UserAttributes', Dict[str, Any])
class OptimizelyUserContext:
"""
Representation of an Optimizely User Context using which APIs are to be called.
"""
def __init__(
self,
optimizely_client: optimizely.Optimizely,
logger: Logger,
user_id: str,
user_attributes: Optional[UserAttributes] = None,
identify: bool = True
):
""" Create an instance of the Optimizely User Context.
Args:
optimizely_client: client used when calling decisions for this user context
logger: logger for logging
user_id: user id of this user context
user_attributes: user attributes to use for this user context
identify: True to send identify event to ODP.
Returns:
UserContext instance
"""
self.client = optimizely_client
self.logger = logger
self.user_id = user_id
self._qualified_segments: Optional[list[str]] = None
if not isinstance(user_attributes, dict):
user_attributes = UserAttributes({})
self._user_attributes = UserAttributes(user_attributes.copy() if user_attributes else {})
self.lock = threading.Lock()
self.forced_decisions_map: dict[
OptimizelyUserContext.OptimizelyDecisionContext,
OptimizelyUserContext.OptimizelyForcedDecision
] = {}
if self.client and identify:
identifiers = {OdpManagerConfig.KEY_FOR_USER_ID: user_id}
self.client._identify_user(identifiers)
class OptimizelyDecisionContext:
""" Using class with attributes here instead of namedtuple because
class is extensible, it's easy to add another attribute if we wanted
to extend decision context.
"""
def __init__(self, flag_key: str, rule_key: Optional[str] = None):
self.flag_key = flag_key
self.rule_key = rule_key
def __hash__(self) -> int:
return hash((self.flag_key, self.rule_key))
def __eq__(self, other: OptimizelyUserContext.OptimizelyDecisionContext) -> bool: # type: ignore[override]
return (self.flag_key, self.rule_key) == (other.flag_key, other.rule_key)
# forced decision
class OptimizelyForcedDecision:
def __init__(self, variation_key: str):
self.variation_key = variation_key
def _clone(self) -> Optional[OptimizelyUserContext]:
if not self.client:
return None
user_context = OptimizelyUserContext(
self.client,
self.logger,
self.user_id,
self.get_user_attributes(),
identify=False
)
with self.lock:
if self.forced_decisions_map:
# makes sure forced_decisions_map is duplicated without any references
user_context.forced_decisions_map = copy.deepcopy(self.forced_decisions_map)
if self._qualified_segments:
# no need to use deepcopy here as qualified_segments does not contain anything other than strings
user_context._qualified_segments = self._qualified_segments.copy()
return user_context
def get_user_attributes(self) -> UserAttributes:
with self.lock:
return UserAttributes(self._user_attributes.copy())
def set_attribute(self, attribute_key: str, attribute_value: Any) -> None:
"""
sets a attribute by key for this user context.
Args:
attribute_key: key to use for attribute
attribute_value: attribute value
Returns:
None
"""
with self.lock:
self._user_attributes[attribute_key] = attribute_value
def decide(
self, key: str, options: Optional[list[str]] = None
) -> optimizely_decision.OptimizelyDecision:
"""
Call decide on contained Optimizely object
Args:
key: feature key
options: array of DecisionOption
Returns:
Decision object
"""
if isinstance(options, list):
options = options[:]
return self.client._decide(self._clone(), key, options)
def decide_for_keys(
self, keys: list[str], options: Optional[list[str]] = None
) -> dict[str, optimizely_decision.OptimizelyDecision]:
"""
Call decide_for_keys on contained optimizely object
Args:
keys: array of feature keys
options: array of DecisionOption
Returns:
Dictionary with feature_key keys and Decision object values
"""
if isinstance(options, list):
options = options[:]
return self.client._decide_for_keys(self._clone(), keys, options)
def decide_all(self, options: Optional[list[str]] = None) -> dict[str, optimizely_decision.OptimizelyDecision]:
"""
Call decide_all on contained optimizely instance
Args:
options: Array of DecisionOption objects
Returns:
Dictionary with feature_key keys and Decision object values
"""
if isinstance(options, list):
options = options[:]
return self.client._decide_all(self._clone(), options)
def track_event(self, event_key: str, event_tags: Optional[EventTags] = None) -> None:
return self.client.track(event_key, self.user_id, self.get_user_attributes(), event_tags)
def as_json(self) -> dict[str, Any]:
return {
'user_id': self.user_id,
'attributes': self.get_user_attributes(),
}
def set_forced_decision(
self, decision_context: OptimizelyDecisionContext, decision: OptimizelyForcedDecision
) -> bool:
"""
Sets the forced decision for a given decision context.
Args:
decision_context: a decision context.
decision: a forced decision.
Returns:
True if the forced decision has been set successfully.
"""
with self.lock:
self.forced_decisions_map[decision_context] = decision
return True
def get_forced_decision(self, decision_context: OptimizelyDecisionContext) -> Optional[OptimizelyForcedDecision]:
"""
Gets the forced decision (variation key) for a given decision context.
Args:
decision_context: a decision context.
Returns:
A forced_decision or None if forced decisions are not set for the parameters.
"""
forced_decision = self.find_forced_decision(decision_context)
return forced_decision
def remove_forced_decision(self, decision_context: OptimizelyDecisionContext) -> bool:
"""
Removes the forced decision for a given decision context.
Args:
decision_context: a decision context.
Returns:
True if the forced decision has been removed successfully.
"""
with self.lock:
if decision_context in self.forced_decisions_map:
del self.forced_decisions_map[decision_context]
return True
return False
def remove_all_forced_decisions(self) -> bool:
"""
Removes all forced decisions bound to this user context.
Returns:
True if forced decisions have been removed successfully.
"""
with self.lock:
self.forced_decisions_map.clear()
return True
def find_forced_decision(self, decision_context: OptimizelyDecisionContext) -> Optional[OptimizelyForcedDecision]:
"""
Gets forced decision from forced decision map.
Args:
decision_context: a decision context.
Returns:
Forced decision.
"""
with self.lock:
if not self.forced_decisions_map:
return None
# must allow None to be returned for the Flags only case
return self.forced_decisions_map.get(decision_context)
def is_qualified_for(self, segment: str) -> bool:
"""
Checks is the provided segment is in the qualified_segments list.
Args:
segment: a segment name.
Returns:
Returns: true if the segment is in the qualified segments list.
"""
with self.lock:
if self._qualified_segments is not None:
return segment in self._qualified_segments
return False
def get_qualified_segments(self) -> Optional[list[str]]:
"""
Gets the qualified segments.
Returns:
A list of qualified segment names.
"""
with self.lock:
if self._qualified_segments is not None:
return self._qualified_segments.copy()
return None
def set_qualified_segments(self, segments: Optional[list[str]]) -> None:
"""
Replaces any qualified segments with the provided list of segments.
Args:
segments: a list of segment names.
Returns:
None.
"""
with self.lock:
self._qualified_segments = None if segments is None else segments.copy()
def fetch_qualified_segments(
self,
callback: Optional[Callable[[bool], None]] = None,
options: Optional[list[str]] = None
) -> bool | threading.Thread:
"""
Fetch all qualified segments for the user context.
The fetched segments will be saved and can be accessed using get/set_qualified_segment methods.
Args:
callback: An optional function to run after the fetch has completed. The function will be provided
a boolean value indicating if the fetch was successful. If a callback is provided, the fetch
will be run in a seperate thread, otherwise it will be run syncronously.
options: An array of OptimizelySegmentOptions used to ignore and/or reset the cache (optional).
Returns:
A boolean value indicating if the fetch was successful.
"""
def _fetch_qualified_segments() -> bool:
segments = self.client._fetch_qualified_segments(self.user_id, options or []) if self.client else None
self.set_qualified_segments(segments)
success = segments is not None
if callable(callback):
callback(success)
return success
if callback:
fetch_thread = threading.Thread(target=_fetch_qualified_segments, name="FetchQualifiedSegmentsThread")
fetch_thread.start()
return fetch_thread
else:
return _fetch_qualified_segments()