-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathtelemetry.py
More file actions
172 lines (130 loc) · 5.87 KB
/
telemetry.py
File metadata and controls
172 lines (130 loc) · 5.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""Telemetry Sync Class."""
import abc
class TelemetrySynchronizer(object):
"""Telemetry synchronizer class."""
def __init__(self, telemetry_submitter):
"""Initialize Telemetry sync class."""
self._telemetry_submitter = telemetry_submitter
def synchronize_config(self):
"""synchronize initial config data class."""
self._telemetry_submitter.synchronize_config()
def synchronize_stats(self):
"""synchronize runtime stats class."""
self._telemetry_submitter.synchronize_stats()
class TelemetrySynchronizerAsync(object):
"""Telemetry synchronizer class."""
def __init__(self, telemetry_submitter):
"""Initialize Telemetry sync class."""
self._telemetry_submitter = telemetry_submitter
async def synchronize_config(self):
"""synchronize initial config data class."""
await self._telemetry_submitter.synchronize_config()
async def synchronize_stats(self):
"""synchronize runtime stats class."""
await self._telemetry_submitter.synchronize_stats()
class TelemetrySubmitter(object, metaclass=abc.ABCMeta):
"""Telemetry sumbitter interface."""
@abc.abstractmethod
def synchronize_config(self):
"""synchronize initial config data classe."""
@abc.abstractmethod
def synchronize_stats(self):
"""synchronize runtime stats class."""
class InMemoryTelemetrySubmitter(TelemetrySubmitter):
"""Telemetry sumbitter class."""
def __init__(self, telemetry_consumer, feature_flag_storage, segment_storage, telemetry_api):
"""Initialize all producer classes."""
self._telemetry_init_consumer = telemetry_consumer.get_telemetry_init_consumer()
self._telemetry_evaluation_consumer = telemetry_consumer.get_telemetry_evaluation_consumer()
self._telemetry_runtime_consumer = telemetry_consumer.get_telemetry_runtime_consumer()
self._telemetry_api = telemetry_api
self._feature_flag_storage = feature_flag_storage
self._segment_storage = segment_storage
def synchronize_config(self):
"""synchronize initial config data classe."""
self._telemetry_api.record_init(self._telemetry_init_consumer.get_config_stats())
def synchronize_stats(self):
"""synchronize runtime stats class."""
self._telemetry_api.record_stats(self._build_stats())
def _build_stats(self):
"""
Format stats to Dict.
:returns: formatted stats
:rtype: Dict
"""
merged_dict = {
'spC': self._feature_flag_storage.get_splits_count(),
'seC': self._segment_storage.get_segments_count(),
'skC': self._segment_storage.get_segments_keys_count()
}
merged_dict.update(self._telemetry_runtime_consumer.pop_formatted_stats())
merged_dict.update(self._telemetry_evaluation_consumer.pop_formatted_stats())
return merged_dict
class InMemoryTelemetrySubmitterAsync(TelemetrySubmitter):
"""Telemetry sumbitter async class."""
def __init__(self, telemetry_consumer, feature_flag_storage, segment_storage, telemetry_api):
"""Initialize all producer classes."""
self._telemetry_init_consumer = telemetry_consumer.get_telemetry_init_consumer()
self._telemetry_evaluation_consumer = telemetry_consumer.get_telemetry_evaluation_consumer()
self._telemetry_runtime_consumer = telemetry_consumer.get_telemetry_runtime_consumer()
self._telemetry_api = telemetry_api
self._feature_flag_storage = feature_flag_storage
self._segment_storage = segment_storage
async def synchronize_config(self):
"""synchronize initial config data classe."""
await self._telemetry_api.record_init(await self._telemetry_init_consumer.get_config_stats())
async def synchronize_stats(self):
"""synchronize runtime stats class."""
await self._telemetry_api.record_stats(await self._build_stats())
async def _build_stats(self):
"""
Format stats to Dict.
:returns: formatted stats
:rtype: Dict
"""
merged_dict = {
'spC': await self._feature_flag_storage.get_splits_count(),
'seC': await self._segment_storage.get_segments_count(),
'skC': await self._segment_storage.get_segments_keys_count()
}
merged_dict.update(await self._telemetry_runtime_consumer.pop_formatted_stats())
merged_dict.update(await self._telemetry_evaluation_consumer.pop_formatted_stats())
return merged_dict
class RedisTelemetrySubmitter(object):
"""Telemetry sumbitter class."""
def __init__(self, telemetry_storage):
"""Initialize all producer classes."""
self._telemetry_storage = telemetry_storage
def synchronize_config(self):
"""synchronize initial config data classe."""
self._telemetry_storage.push_config_stats()
def synchronize_stats(self):
"""No implementation."""
pass
class RedisTelemetrySubmitterAsync(object):
"""Telemetry sumbitter class."""
def __init__(self, telemetry_storage):
"""Initialize all producer classes."""
self._telemetry_storage = telemetry_storage
async def synchronize_config(self):
"""synchronize initial config data classe."""
await self._telemetry_storage.push_config_stats()
async def synchronize_stats(self):
"""No implementation."""
pass
class LocalhostTelemetrySubmitter(object):
"""Telemetry sumbitter class."""
def synchronize_config(self):
"""No implementation."""
pass
def synchronize_stats(self):
"""No implementation."""
pass
class LocalhostTelemetrySubmitterAsync(object):
"""Telemetry sumbitter class."""
async def synchronize_config(self):
"""No implementation."""
pass
async def synchronize_stats(self):
"""No implementation."""
pass