This repository was archived by the owner on Dec 25, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathstress.py
More file actions
128 lines (97 loc) · 3.33 KB
/
stress.py
File metadata and controls
128 lines (97 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#
# Copyright 2008 The ndb Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Stress test for ndb with Python 2.7 threadsafe."""
import itertools
import logging
import random
import threading
import time

from google.appengine.api import apiproxy_stub_map
from google.appengine.datastore import datastore_stub_util
from google.appengine.ext import testbed

from ndb import model, tasklets
# Number of concurrent stress threads spawned per policy combination.
INSTANCES = 4
# Number of workload iterations each thread runs.
RUNS = 10
# Root logger; INFO so per-thread progress and failures are visible.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class EmptyModel(model.Model):
  """Model with no properties; the stress test only exercises put/get/delete."""
  pass
@tasklets.tasklet
def workload(id, run):
  """One stress iteration: four concurrent XG transactions on one entity.

  Args:
    id: Identifier of the calling thread (for assertion/log context).
    run: 1-based iteration number within the thread (for context).

  The four transactions (put, get, bulk-put, delete) are started together
  and awaited as a group, so they interleave across threads.
  """
  key = model.Key(EmptyModel, 1)
  ent = EmptyModel(key=key)
  keys = []
  # Random stagger so threads don't hit the datastore in lockstep.
  time.sleep(random.random() / 10)

  def tx1():
    # Put the entity; the returned key must round-trip unchanged.
    new_key = yield ent.put_async()
    assert key == new_key, (id, run)

  def tx2():
    # Get by key; the stored entity must compare equal.
    new_ent = yield key.get_async()
    assert ent == new_ent, (id, run)

  def tx3():
    # Bulk-put ten fresh entities and verify each one reads back.
    ents = [EmptyModel() for _ in range(10)]
    futs = [e.put_async() for e in ents]
    keys.extend((yield futs))
    # Distinct loop names so the enclosing key/ent are not shadowed.
    for bulk_key, bulk_ent in zip(keys, ents):
      assert bulk_ent == bulk_key.get()

  def tx4():
    yield key.delete_async()
    # BUG FIX: the loop variable was previously named `key`, which made
    # `key` local to tx4 and raised UnboundLocalError on the yield above;
    # also `key.get_async` was yielded without being called, so the
    # `is None` check compared against a Future of a bound method.
    for bulk_key in keys:
      assert (yield bulk_key.get_async()) is None

  # NOTE(review): tx3 and tx4 race on `keys`; whichever order the
  # transactions run in, the assertions hold — presumably intentional
  # for a stress test.
  yield [model.transaction_async(tx, xg=True) for tx in (tx1, tx2, tx3, tx4)]
class Stress(threading.Thread):
  """Worker thread: runs `workload` RUNS times under the current policies."""

  def run(self):
    # Each thread gets its own ndb context; apply the module-level
    # policy flags set by main() (read-only here, so no `global` needed).
    ctx = tasklets.get_context()
    ctx.set_cache_policy(cache_policy)
    ctx.set_memcache_policy(memcache_policy)
    ctx.set_datastore_policy(datastore_policy)
    thread_id = threading.current_thread().ident
    run = 0  # Defined up front so except/finally can log it even if
             # context setup raises before the loop starts.
    try:
      for run in range(1, RUNS + 1):
        workload(thread_id, run).check_success()
    except Exception as e:  # Py2.6+/Py3-compatible except syntax.
      logger.exception('Thread %d run %d raised %s: %s',
                       thread_id, run, e.__class__.__name__, e)
    finally:
      logger.info('Thread %d stopped on run %d', thread_id, run)
def main():
  """Run the stress test under every enabled policy combination.

  For each (cache, memcache, datastore) policy combination except
  all-disabled, spin up a fresh testbed with a high-replication
  (eventually consistent) datastore stub and run INSTANCES threads.
  """
  global cache_policy, memcache_policy, datastore_policy
  # Test every single policy choice (itertools.product replaces the
  # previous triple-nested loop; iteration order is identical).
  for cache_policy, memcache_policy, datastore_policy in itertools.product(
      (True, False), repeat=3):
    if not (cache_policy or memcache_policy or datastore_policy):
      continue  # All caching layers off is not a supported configuration.
    logger.info('c: %i mc: %i ds: %i', cache_policy, memcache_policy,
                datastore_policy)
    tb = testbed.Testbed()
    tb.activate()
    try:
      tb.init_datastore_v3_stub()
      tb.init_memcache_stub()
      tb.init_taskqueue_stub()
      # Force eventual consistency to stress cross-group transactions.
      datastore_stub = apiproxy_stub_map.apiproxy.GetStub('datastore_v3')
      datastore_stub.SetConsistencyPolicy(
          datastore_stub_util.BaseHighReplicationConsistencyPolicy())
      threads = [Stress() for _ in range(INSTANCES)]
      for t in threads:
        t.start()
      for t in threads:
        t.join()
    finally:
      # BUG FIX: deactivate previously ran only on the success path,
      # leaking testbed state if stub setup or a thread op raised.
      tb.deactivate()
# Script entry point: run the full policy-combination stress sweep.
if __name__ == '__main__':
  main()