forked from redhat-openstack/packstack
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_base.py
More file actions
155 lines (129 loc) · 5.38 KB
/
Copy pathtest_base.py
File metadata and controls
155 lines (129 loc) · 5.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import shutil
import tempfile
import subprocess
import logging
# Module-level logger; FakePopen uses it to warn whenever a command or script
# that was never registered gets "executed".
LOG = logging.getLogger(__name__)
class FakePopen(object):
    '''Test double that stands in for subprocess.Popen.

    Nothing is executed. Callers register commands (FakePopen.register)
    or scripts (FakePopen.register_as_script) together with the stdout,
    stderr and returncode they should produce. A command or script that
    was never registered logs a warning and yields empty stdout, empty
    stderr and a successful (0) returncode.
    '''

    # Both registries live on the class, not on instances: tests register
    # on the class, and every instance created afterwards reads the same
    # dictionaries.
    cmd_registry = {}
    script_registry = {}

    @classmethod
    def register(cls, args, stdout='', stderr='', returncode=0):
        '''Register a fake command.

        :param args: the command, as a string or an argv sequence; a list
            is stored as a tuple so it can serve as a dictionary key
        :param stdout: value exposed as ``stdout`` for this command
        :param stderr: value exposed as ``stderr`` for this command
        :param returncode: value exposed as ``returncode``
        '''
        if isinstance(args, list):
            args = tuple(args)
        cls.cmd_registry[args] = {'stdout': stdout,
                                  'stderr': stderr,
                                  'returncode': returncode}

    @classmethod
    def register_as_script(cls, args, stdout='', stderr='', returncode=0):
        '''Register a fake script.

        :param args: the script body, as a string or a list of lines (a
            list is joined with newlines)
        :param stdout: value returned as stdout by communicate()
        :param stderr: value returned as stderr by communicate()
        :param returncode: value exposed as ``returncode`` after
            communicate()

        The registry key is the script body with a fixed error-trap
        prefix prepended, so the text later passed to communicate() must
        carry that same prefix in order to match.
        '''
        if isinstance(args, list):
            args = '\n'.join(args)
        prefix = "function t(){ exit $? ; } \n trap t ERR \n"
        args = prefix + args
        cls.script_registry[args] = {'stdout': stdout,
                                     'stderr': stderr,
                                     'returncode': returncode}

    def __init__(self, args, **kwargs):
        '''Create a fake process for ``args``.

        The call is treated as a script run when ``args`` starts with the
        ssh option list below and its last element is "bash -x"; any
        other call is treated as a plain command. Keyword arguments are
        accepted for signature compatibility and ignored.
        '''
        script = ["ssh", "-o", "StrictHostKeyChecking=no",
                  "-o", "UserKnownHostsFile=/dev/null"]
        # The slice is compared against a list, so only list-typed args
        # can ever be recognised as a script invocation.
        if args[-1] == "bash -x" and args[:5] == script:
            self._init_as_script(args, **kwargs)
        else:
            self._init_as_cmd(args, **kwargs)

    def _init_as_cmd(self, args, **kwargs):
        '''Resolve a plain command against cmd_registry right away.'''
        self._is_script = False
        if isinstance(args, list):
            # Registry keys are tuples; the joined form is only used for
            # the warning message.
            args = tuple(args)
            cmd = ' '.join(args)
        else:
            cmd = args

        if args in self.cmd_registry:
            this = self.cmd_registry[args]
        else:
            LOG.warning('call to unregistered command: %s', cmd)
            this = {'stdout': '', 'stderr': '', 'returncode': 0}

        self.stdout = this['stdout']
        self.stderr = this['stderr']
        self.returncode = this['returncode']

    def _init_as_script(self, args, **kwargs):
        '''Mark the instance as a script run.

        The script text is only known once communicate() receives it, so
        the result attributes start out as None. Defining them here keeps
        attribute access before communicate() from raising
        AttributeError.
        '''
        self._is_script = True
        self.stdout = None
        self.stderr = None
        self.returncode = None

    def communicate(self, input=None):
        '''Return the ``(stdout, stderr)`` pair of the fake process.

        For a script run, ``input`` is looked up in script_registry and
        the instance's stdout, stderr and returncode are set from the
        match, or from the empty/successful default (with a warning) when
        there is none. For a plain command the values resolved at
        construction time are returned and ``input`` is ignored.
        '''
        if self._is_script:
            if input in self.script_registry:
                this = self.script_registry[input]
            else:
                LOG.warning('call to unregistered script: %s', input)
                this = {'stdout': '', 'stderr': '', 'returncode': 0}
            self.stdout = this['stdout']
            self.stderr = this['stderr']
            self.returncode = this['returncode']
        return self.stdout, self.stderr
class PackstackTestCaseMixin(object):
    """
    Test-case mixin providing per-test fixtures (a temporary directory
    and a faked subprocess.Popen) plus implementations of some assertion
    methods that are available by default in Python2.7+ only.

    The assertion helpers rely on ``self.assertEqual``, which this class
    does not define; it has to come from the class the mixin is combined
    with.
    """

    def setUp(self):
        """Create the temp directory and swap subprocess.Popen for the fake."""
        # Creating a temp directory that can be used by tests
        self.tempdir = tempfile.mkdtemp()

        # some plugins call popen, we're replacing it for tests
        self._Popen = subprocess.Popen
        self.fake_popen = subprocess.Popen = FakePopen

    def tearDown(self):
        """Remove the temp directory and restore the real subprocess.Popen."""
        try:
            # remove the temp directory
            shutil.rmtree(self.tempdir)
        finally:
            # Restore even when the removal fails, otherwise the fake
            # Popen would stay installed for whatever runs next.
            subprocess.Popen = self._Popen

    def assertItemsEqual(self, list1, list2, msg=None):
        """Assert both sequences hold the same elements, in any order.

        Elements are compared with ``==`` and duplicates are counted, so
        ``[1, 1, 2]`` and ``[1, 2, 2]`` are reported as different.

        :raises AssertionError: when the lengths or the elements differ
        """
        f, s = len(list1), len(list2)
        _msg = msg or ('Element counts were not equal. First has %s, '
                       'Second has %s' % (f, s))
        self.assertEqual(f, s, msg=_msg)

        _msg = msg or ('Given lists differ:\n%(list1)s'
                       '\n%(list2)s' % {'list1': list1, 'list2': list2})
        # Consume matches from a copy so each element of list2 can pair
        # with at most one element of list1; list.remove needs neither
        # hashable nor orderable items.
        unmatched = list(list2)
        for item in list1:
            try:
                unmatched.remove(item)
            except ValueError:
                raise AssertionError(_msg)

    def assertListEqual(self, list1, list2, msg=None):
        """Assert both sequences hold equal elements in the same order.

        :raises AssertionError: when the lengths differ or any pair of
            elements at the same index is unequal
        """
        f, s = len(list1), len(list2)
        _msg = msg or ('Element counts were not equal. First has %s, '
                       'Second has %s' % (f, s))
        self.assertEqual(f, s, msg=_msg)

        _msg = msg or ('Given lists differ:\n%(list1)s'
                       '\n%(list2)s' % {'list1': list1, 'list2': list2})
        for index, item in enumerate(list1):
            if item != list2[index]:
                raise AssertionError(_msg)

    def assertIsInstance(self, obj, cls, msg=None):
        """Assert that ``obj`` is an instance of ``cls``."""
        if not isinstance(obj, cls):
            raise AssertionError(
                msg or ('%s is not an instance of %s' % (obj, cls)))

    def assertIn(self, first, second, msg=None):
        """Assert that ``first`` is a member of ``second``."""
        if first not in second:
            raise AssertionError(
                msg or ('%s is not a member of %s' % (first, second)))

    def assertIsNone(self, expr, msg=None):
        """Assert that ``expr`` is None."""
        if expr is not None:
            # Wrap in a one-element tuple: a bare tuple operand would be
            # unpacked by % and raise TypeError instead of failing the test.
            raise AssertionError(msg or ('%s is not None' % (expr,)))