forked from aws/aws-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_sync_command.py
More file actions
276 lines (249 loc) · 11.7 KB
/
Copy pathtest_sync_command.py
File metadata and controls
276 lines (249 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from awscli.testutils import set_invalid_utime
from mock import patch
import os
from awscli.compat import six
from tests.functional.s3 import BaseS3TransferCommandTest
class TestSyncCommand(BaseS3TransferCommandTest):
    """Functional tests for the ``s3 sync`` command.

    Most tests queue responses in ``self.parsed_responses``, run a command
    line through ``self.run_cmd`` with an expected return code, and then
    inspect ``self.operations_called`` to check which operations were
    issued and with which parameters.
    """

    prefix = 's3 sync '

    def test_website_redirect_ignore_paramfile(self):
        """``--website-redirect`` is sent as the literal URL on upload."""
        # Only the file's presence under rootdir matters here, so the path
        # returned by create_file is not kept.
        self.files.create_file('foo.txt', 'mycontent')
        cmdline = '%s %s s3://bucket/key.txt --website-redirect %s' % \
            (self.prefix, self.files.rootdir, 'http://someserver')
        self.parsed_responses = [
            {"CommonPrefixes": [], "Contents": []},
            {'ETag': '"c8afdb36c52cf4727836669019e69222"'}
        ]
        self.run_cmd(cmdline, expected_rc=0)

        # The only operations we should have called are
        # ListObjectsV2/PutObject.
        self.assertEqual(
            len(self.operations_called), 2, self.operations_called)
        self.assertEqual(self.operations_called[0][0].name, 'ListObjectsV2')
        self.assertEqual(self.operations_called[1][0].name, 'PutObject')
        # Make sure that the specified web address is used as opposed to the
        # contents of the web address when uploading the object
        self.assertEqual(
            self.operations_called[1][1]['WebsiteRedirectLocation'],
            'http://someserver'
        )

    def test_no_recursive_option(self):
        """A command line with ``--recursive`` exits with return code 2."""
        # NOTE(review): unlike every other test in this class, this command
        # line does not start with ``self.prefix``, so it is not an
        # ``s3 sync`` invocation. Confirm that the return code of 2 is
        # really caused by ``--recursive`` and not by the missing
        # subcommand before relying on this test.
        cmdline = '. s3://mybucket --recursive'
        # Return code will be 2 for invalid parameter ``--recursive``
        self.run_cmd(cmdline, expected_rc=2)

    def test_sync_from_non_existant_directory(self):
        """Syncing from a missing local directory fails with rc 255."""
        nonexistent_directory = os.path.join(self.files.rootdir, 'fakedir')
        cmdline = '%s %s s3://bucket/' % (self.prefix, nonexistent_directory)
        self.parsed_responses = [
            {"CommonPrefixes": [], "Contents": []}
        ]
        _, stderr, _ = self.run_cmd(cmdline, expected_rc=255)
        self.assertIn('does not exist', stderr)

    def test_sync_to_non_existant_directory(self):
        """Syncing into a missing local directory creates the file there."""
        key = 'foo.txt'
        nonexistent_directory = os.path.join(self.files.rootdir, 'fakedir')
        cmdline = '%s s3://bucket/ %s' % (self.prefix, nonexistent_directory)
        self.parsed_responses = [
            {"CommonPrefixes": [], "Contents": [
                {"Key": key, "Size": 3,
                 "LastModified": "2014-01-09T20:45:49.000Z"}]},
            {'ETag': '"c8afdb36c52cf4727836669019e69222-"',
             'Body': six.BytesIO(b'foo')}
        ]
        self.run_cmd(cmdline, expected_rc=0)
        # Make sure the file now exists.
        self.assertTrue(
            os.path.exists(os.path.join(nonexistent_directory, key)))

    def test_glacier_sync_with_force_glacier(self):
        """``--force-glacier-transfer`` downloads a GLACIER object."""
        self.parsed_responses = [
            {
                'Contents': [
                    {'Key': 'foo/bar.txt', 'ContentLength': '100',
                     'LastModified': '00:00:00Z',
                     'StorageClass': 'GLACIER',
                     'Size': 100},
                ],
                'CommonPrefixes': []
            },
            {'ETag': '"foo-1"', 'Body': six.BytesIO(b'foo')},
        ]
        cmdline = '%s s3://bucket/foo %s --force-glacier-transfer' % (
            self.prefix, self.files.rootdir)
        self.run_cmd(cmdline, expected_rc=0)
        self.assertEqual(
            len(self.operations_called), 2, self.operations_called)
        self.assertEqual(self.operations_called[0][0].name, 'ListObjectsV2')
        self.assertEqual(self.operations_called[1][0].name, 'GetObject')

    def test_handles_glacier_incompatible_operations(self):
        """A GLACIER object is skipped with rc 2 and a message on stderr."""
        self.parsed_responses = [
            {'Contents': [
                {'Key': 'foo', 'Size': 100,
                 'LastModified': '00:00:00Z', 'StorageClass': 'GLACIER'}]}
        ]
        cmdline = '%s s3://bucket/ %s' % (
            self.prefix, self.files.rootdir)
        _, stderr, _ = self.run_cmd(cmdline, expected_rc=2)
        # There should not have been a download attempted because the
        # operation was skipped because it is glacier incompatible.
        self.assertEqual(len(self.operations_called), 1)
        self.assertEqual(self.operations_called[0][0].name, 'ListObjectsV2')
        self.assertIn('GLACIER', stderr)

    def test_turn_off_glacier_warnings(self):
        """``--ignore-glacier-warnings`` gives rc 0 and an empty stderr."""
        self.parsed_responses = [
            {'Contents': [
                {'Key': 'foo', 'Size': 100,
                 'LastModified': '00:00:00Z', 'StorageClass': 'GLACIER'}]}
        ]
        cmdline = '%s s3://bucket/ %s --ignore-glacier-warnings' % (
            self.prefix, self.files.rootdir)
        _, stderr, _ = self.run_cmd(cmdline, expected_rc=0)
        # There should not have been a download attempted because the
        # operation was skipped because it is glacier incompatible.
        self.assertEqual(len(self.operations_called), 1)
        self.assertEqual(self.operations_called[0][0].name, 'ListObjectsV2')
        self.assertEqual('', stderr)

    def test_warning_on_invalid_timestamp(self):
        """A file with an invalid mtime gives rc 2 but is still uploaded."""
        full_path = self.files.create_file('foo.txt', 'mycontent')

        # Set the update time to a value that will raise a ValueError when
        # converting to datetime
        set_invalid_utime(full_path)
        cmdline = '%s %s s3://bucket/key.txt' % \
            (self.prefix, self.files.rootdir)
        self.parsed_responses = [
            {"CommonPrefixes": [], "Contents": []},
            {'ETag': '"c8afdb36c52cf4727836669019e69222"'}
        ]
        self.run_cmd(cmdline, expected_rc=2)

        # We should still have put the object
        self.assertEqual(
            len(self.operations_called), 2, self.operations_called)
        self.assertEqual(self.operations_called[0][0].name, 'ListObjectsV2')
        self.assertEqual(self.operations_called[1][0].name, 'PutObject')

    def test_sync_with_delete_on_downloads(self):
        """``--delete`` on a download removes the extra local file."""
        full_path = self.files.create_file('foo.txt', 'mycontent')
        cmdline = '%s s3://bucket %s --delete' % (
            self.prefix, self.files.rootdir)
        self.parsed_responses = [
            {"CommonPrefixes": [], "Contents": []},
            {'ETag': '"c8afdb36c52cf4727836669019e69222"'}
        ]
        self.run_cmd(cmdline, expected_rc=0)

        # The only operations we should have called are ListObjectsV2.
        self.assertEqual(
            len(self.operations_called), 1, self.operations_called)
        self.assertEqual(self.operations_called[0][0].name, 'ListObjectsV2')

        self.assertFalse(os.path.exists(full_path))

    def _assert_sync_skips_deleted_file(self, exception_class):
        """Check that a file deleted after listing is skipped by an upload.

        ``get_file_stat`` (as referenced from the filegenerator module) is
        patched with a side effect that removes the local file and then
        raises ``exception_class``. The sync is expected to exit with
        return code 2 having called ListObjectsV2 only.

        :param exception_class: Exception type, instantiated without
            arguments, that the patched ``get_file_stat`` raises.
        """
        full_path = self.files.create_file('foo.txt', 'mycontent')
        cmdline = '%s %s s3://bucket/' % (
            self.prefix, self.files.rootdir)

        def side_effect(_):
            # Simulate the file vanishing between listing and transfer.
            os.remove(full_path)
            raise exception_class()

        with patch(
            'awscli.customizations.s3.filegenerator.get_file_stat',
            side_effect=side_effect
        ):
            self.run_cmd(cmdline, expected_rc=2)

        # We should not call PutObject because the file was deleted
        # before we could transfer it
        self.assertEqual(
            len(self.operations_called), 1, self.operations_called)
        self.assertEqual(self.operations_called[0][0].name, 'ListObjectsV2')

    # When a file has been deleted after listing,
    # awscli.customizations.s3.utils.get_file_stat may raise either some kind
    # of OSError, or a ValueError, depending on the environment. In both
    # cases, the behaviour should be the same: skip the file and emit a
    # warning.
    #
    # This test covers the case where a ValueError is emitted.
    def test_sync_skips_over_files_deleted_between_listing_and_transfer_valueerror(self):
        """A ValueError from ``get_file_stat`` makes sync skip the file."""
        # FileGenerator.list_files should skip over files that cause an
        # IOError to be raised because they are missing when we try to
        # get their stats. This IOError is translated to a ValueError in
        # awscli.customizations.s3.utils.get_file_stat.
        self._assert_sync_skips_deleted_file(ValueError)

    # This test covers the case where an OSError is emitted.
    def test_sync_skips_over_files_deleted_between_listing_and_transfer_oserror(self):
        """An OSError from ``get_file_stat`` makes sync skip the file."""
        # FileGenerator.list_files should skip over files that cause an
        # OSError to be raised because they are missing when we try to
        # get their stats.
        self._assert_sync_skips_deleted_file(OSError)

    def test_request_payer(self):
        """``--request-payer`` is sent on both listings and the copy."""
        cmdline = '%s s3://sourcebucket/ s3://mybucket --request-payer' % (
            self.prefix)
        self.parsed_responses = [
            # Response for ListObjects on source bucket
            self.list_objects_response(['mykey']),
            # Response for ListObjects on destination bucket
            self.list_objects_response([]),
            self.copy_object_response(),
        ]
        self.run_cmd(cmdline, expected_rc=0)
        self.assert_operations_called(
            [
                self.list_objects_request(
                    'sourcebucket', RequestPayer='requester'),
                self.list_objects_request(
                    'mybucket', RequestPayer='requester'),
                self.copy_object_request(
                    'sourcebucket', 'mykey', 'mybucket', 'mykey',
                    RequestPayer='requester')
            ]
        )

    def test_request_payer_with_deletes(self):
        """``--request-payer`` is also sent on ``--delete`` requests."""
        cmdline = '%s s3://sourcebucket/ s3://mybucket' % self.prefix
        cmdline += ' --request-payer'
        cmdline += ' --delete'
        self.parsed_responses = [
            # Response for ListObjects on source bucket
            self.list_objects_response([]),
            # Response for ListObjects on destination bucket
            self.list_objects_response(['key-to-delete']),
            self.delete_object_response()
        ]
        self.run_cmd(cmdline, expected_rc=0)
        self.assert_operations_called(
            [
                self.list_objects_request(
                    'sourcebucket', RequestPayer='requester'),
                self.list_objects_request(
                    'mybucket', RequestPayer='requester'),
                self.delete_object_request(
                    'mybucket', 'key-to-delete', RequestPayer='requester'),
            ]
        )

    def test_with_accesspoint_arn(self):
        """An access point ARN is passed through as the bucket name."""
        accesspoint_arn = (
            'arn:aws:s3:us-west-2:123456789012:accesspoint/endpoint'
        )
        cmdline = self.prefix
        cmdline += 's3://%s' % accesspoint_arn
        cmdline += ' %s' % self.files.rootdir
        self.parsed_responses = [
            self.list_objects_response(['mykey']),
            self.get_object_response(),
        ]
        self.run_cmd(cmdline, expected_rc=0)
        self.assert_operations_called(
            [
                self.list_objects_request(accesspoint_arn),
                self.get_object_request(accesspoint_arn, 'mykey')
            ]
        )