Skip to content

Commit 89d6e7e

Browse files
committed
CLOUDSTACK-4817: fix s3 multipart uplaod
Conflicts: plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java
1 parent 2cac1aa commit 89d6e7e

2 files changed

Lines changed: 71 additions & 8 deletions

File tree

plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
*/
1919
package com.cloud.hypervisor.xen.resource;
2020

21+
22+
import static com.cloud.utils.ReflectUtil.flattenProperties;
23+
import static com.google.common.collect.Lists.newArrayList;
24+
25+
2126
import java.io.File;
2227
import java.net.URI;
2328
import java.util.Arrays;
@@ -82,9 +87,6 @@
8287
import com.xensource.xenapi.VM;
8388
import com.xensource.xenapi.VMGuestMetrics;
8489

85-
import static com.cloud.utils.ReflectUtil.flattenProperties;
86-
import static com.google.common.collect.Lists.newArrayList;
87-
8890
public class XenServerStorageProcessor implements StorageProcessor {
8991
private static final Logger s_logger = Logger.getLogger(XenServerStorageProcessor.class);
9092
protected CitrixResourceBase hypervisorResource;
@@ -1091,11 +1093,12 @@ private String backupSnapshotToS3(final Connection connection, final S3TO s3, fi
10911093
S3Utils.ClientOptions.class));
10921094
// https workaround for Introspector bug that does not
10931095
// recognize Boolean accessor methods ...
1096+
10941097
parameters.addAll(Arrays.asList("operation", "put", "filename",
10951098
dir + "/" + filename, "iSCSIFlag",
10961099
iSCSIFlag.toString(), "bucket", s3.getBucketName(),
10971100
"key", key, "https", s3.isHttps() != null ? s3.isHttps().toString()
1098-
: "null"));
1101+
: "null", "maxSingleUploadSizeInBytes", String.valueOf(s3.getMaxSingleUploadSizeInBytes())));
10991102
final String result = hypervisorResource.callHostPluginAsync(connection, "s3xen",
11001103
"s3", wait,
11011104
parameters.toArray(new String[parameters.size()]));

scripts/vm/hypervisor/xenserver/s3xen

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import base64
3434
import hmac
3535
import traceback
3636
import urllib2
37+
from xml.dom.minidom import parseString
3738

3839
import XenAPIPlugin
3940
sys.path.extend(["/opt/xensource/sm/"])
@@ -260,15 +261,73 @@ class S3Client(object):
260261
sha).digest())[:-1]
261262

262263
return signature, request_date
264+
265+
def getText(self, nodelist):
266+
rc = []
267+
for node in nodelist:
268+
if node.nodeType == node.TEXT_NODE:
269+
rc.append(node.data)
270+
return ''.join(rc)
271+
272+
def multiUpload(self, bucket, key, src_fileName, chunkSize=5 * 1024 * 1024):
273+
uploadId={}
274+
def readInitalMultipart(response):
275+
data = response.read()
276+
xmlResult = parseString(data)
277+
result = xmlResult.getElementsByTagName("InitiateMultipartUploadResult")[0]
278+
upload = result.getElementsByTagName("UploadId")[0]
279+
uploadId["0"] = upload.childNodes[0].data
280+
281+
self.do_operation('POST', bucket, key + "?uploads", fn_read=readInitalMultipart)
282+
283+
fileSize = os.path.getsize(src_fileName)
284+
parts = fileSize / chunkSize + ((fileSize % chunkSize) and 1)
285+
part = 1
286+
srcFile = open(src_fileName, 'rb')
287+
etags = []
288+
while part <= parts:
289+
offset = part - 1
290+
size = min(fileSize - offset * chunkSize, chunkSize)
291+
headers = {
292+
self.HEADER_CONTENT_LENGTH: size
293+
}
294+
def send_body(connection):
295+
srcFile.seek(offset * chunkSize)
296+
block = srcFile.read(size)
297+
connection.send(block)
298+
def read_multiPart(response):
299+
etag = response.getheader('ETag')
300+
etags.append((part, etag))
301+
self.do_operation("PUT", bucket, "%s?partNumber=%s&uploadId=%s"%(key, part, uploadId["0"]), headers, send_body, read_multiPart)
302+
part = part + 1
303+
srcFile.close()
304+
305+
data = []
306+
partXml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
307+
for etag in etags:
308+
data.append(partXml%etag)
309+
msg = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>"%("".join(data))
310+
size = len(msg)
311+
headers = {
312+
self.HEADER_CONTENT_LENGTH: size
313+
}
314+
def send_complete_multipart(connection):
315+
connection.send(msg)
316+
self.do_operation("POST", bucket, "%s?uploadId=%s"%(key, uploadId["0"]), headers, send_complete_multipart)
263317

264-
def put(self, bucket, key, src_filename):
318+
def put(self, bucket, key, src_filename, maxSingleUpload):
265319

266320
if not os.path.isfile(src_filename):
267321
raise Exception(
268322
"Attempt to put " + src_filename + " that does not exist.")
269323

324+
size = os.path.getsize(src_filename)
325+
if size > maxSingleUpload or maxSingleUpload == 0:
326+
return self.multiUpload(bucket, key, src_filename)
327+
270328
headers = {
271329
self.HEADER_CONTENT_MD5: compute_md5(src_filename),
330+
272331
self.HEADER_CONTENT_TYPE: 'application/octet-stream',
273332
self.HEADER_CONTENT_LENGTH: str(os.stat(src_filename).st_size),
274333
}
@@ -323,6 +382,7 @@ def parseArguments(args):
323382
bucket = args['bucket']
324383
key = args['key']
325384
filename = args['filename']
385+
maxSingleUploadBytes = int(args["maxSingleUploadSizeInBytes"])
326386

327387
if is_blank(operation):
328388
raise ValueError('An operation must be specified.')
@@ -336,18 +396,18 @@ def parseArguments(args):
336396
if is_blank(filename):
337397
raise ValueError('A filename must be specified.')
338398

339-
return client, operation, bucket, key, filename
399+
return client, operation, bucket, key, filename, maxSingleUploadBytes
340400

341401

342402
@echo
343403
def s3(session, args):
344404

345-
client, operation, bucket, key, filename = parseArguments(args)
405+
client, operation, bucket, key, filename, maxSingleUploadBytes = parseArguments(args)
346406

347407
try:
348408

349409
if operation == 'put':
350-
client.put(bucket, key, filename)
410+
client.put(bucket, key, filename, maxSingleUploadBytes)
351411
elif operation == 'get':
352412
client.get(bucket, key, filename)
353413
elif operation == 'delete':

0 commit comments

Comments
 (0)