This repository was archived by the owner on Dec 25, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathrpc_test.py
More file actions
96 lines (79 loc) · 3.3 KB
/
rpc_test.py
File metadata and controls
96 lines (79 loc) · 3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#
# Copyright 2008 The ndb Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Some tests for datastore_rpc.py."""
from .google_imports import apiproxy_stub_map
from .google_imports import datastore_rpc
from .google_test_imports import unittest
from . import model
from . import test_utils
class PendingTests(test_utils.NDBTest):
"""Tests for the 'pending RPC' management."""
def testBasicSetup1(self):
ent = model.Expando()
ent.foo = 'bar'
rpc = self.conn.async_put(None, [ent])
[key] = rpc.get_result()
self.assertEqual(key, model.Key(flat=['Expando', 1]))
def testBasicSetup2(self):
key = model.Key(flat=['Expando', 1])
rpc = self.conn.async_get(None, [key])
[ent] = rpc.get_result()
self.assertTrue(ent is None)
def SetUpCallHooks(self):
self.pre_args = []
self.post_args = []
apiproxy_stub_map.apiproxy.GetPreCallHooks().Append('test1',
self.PreCallHook)
apiproxy_stub_map.apiproxy.GetPostCallHooks().Append('test1',
self.PostCallHook)
def PreCallHook(self, service, call, request, response, rpc=None):
self.pre_args.append((service, call, request, response, rpc))
def PostCallHook(self, service, call, request, response,
rpc=None, error=None):
self.post_args.append((service, call, request, response, rpc, error))
def testCallHooks(self):
self.SetUpCallHooks()
key = model.Key(flat=['Expando', 1])
rpc = self.conn.async_get(None, [key])
self.assertEqual(len(self.pre_args), 1)
self.assertEqual(self.post_args, [])
[ent] = rpc.get_result()
self.assertTrue(ent is None)
self.assertEqual(len(self.pre_args), 1)
self.assertEqual(len(self.post_args), 1)
self.assertEqual(self.pre_args[0][:2], ('datastore_v3', 'Get'))
self.assertEqual(self.post_args[0][:2], ('datastore_v3', 'Get'))
def testCallHooks_Pending(self):
self.SetUpCallHooks()
key = model.Key(flat=['Expando', 1])
rpc = self.conn.async_get(None, [key])
self.conn.wait_for_all_pending_rpcs()
self.assertEqual(rpc.state, 2) # FINISHING
self.assertEqual(len(self.pre_args), 1)
self.assertEqual(len(self.post_args), 1) # NAILED IT!
self.assertEqual(self.conn.get_pending_rpcs(), set())
def NastyCallback(self, rpc):
rpc.get_result()
key = model.Key(flat=['Expando', 1])
self.conn.async_get(None, [key])
def testCallHooks_Pending_CallbackAddsMore(self):
self.SetUpCallHooks()
conf = datastore_rpc.Configuration(on_completion=self.NastyCallback)
key = model.Key(flat=['Expando', 1])
self.conn.async_get(conf, [key])
self.conn.wait_for_all_pending_rpcs()
self.assertEqual(self.conn.get_pending_rpcs(), set())
if __name__ == '__main__':
unittest.main()