Skip to content

Commit bd6435a

Browse files
authored
Merge pull request googleapis#1914 from linar-jether/master
Support blob streaming for file-like objects
2 parents c618091 + 7464c5a commit bd6435a

4 files changed

Lines changed: 125 additions & 13 deletions

File tree

gcloud/storage/blob.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import copy
1919
import hashlib
2020
from io import BytesIO
21+
from io import UnsupportedOperation
2122
import json
2223
import mimetypes
2324
import os
@@ -491,10 +492,11 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
491492
total_bytes = size
492493
if total_bytes is None:
493494
if hasattr(file_obj, 'fileno'):
494-
total_bytes = os.fstat(file_obj.fileno()).st_size
495-
else:
496-
raise ValueError('total bytes could not be determined. Please '
497-
'pass an explicit size.')
495+
try:
496+
total_bytes = os.fstat(file_obj.fileno()).st_size
497+
except (OSError, UnsupportedOperation):
498+
pass # Assume the descriptor is not an actual file (it may be a socket), so the size stays unknown.
499+
498500
headers = {
499501
'Accept': 'application/json',
500502
'Accept-Encoding': 'gzip, deflate',
@@ -510,6 +512,13 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
510512
if self.chunk_size is not None:
511513
upload.chunksize = self.chunk_size
512514

515+
if total_bytes is None:
516+
upload.strategy = RESUMABLE_UPLOAD
517+
elif total_bytes is None:
518+
raise ValueError('total bytes could not be determined. Please '
519+
'pass an explicit size, or supply a chunk size '
520+
'for a streaming transfer.')
521+
513522
url_builder = _UrlBuilder(bucket_name=self.bucket.name,
514523
object_name=self.name)
515524
upload_config = _UploadConfig()

gcloud/storage/test_blob.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,102 @@ def _upload_from_file_simple_test_helper(self, properties=None,
507507
self.assertEqual(headers['Content-Length'], '6')
508508
self.assertEqual(headers['Content-Type'], expected_content_type)
509509

510+
def test_upload_from_file_stream(self):
511+
from six.moves.http_client import OK
512+
from six.moves.urllib.parse import parse_qsl
513+
from six.moves.urllib.parse import urlsplit
514+
from gcloud.streaming import http_wrapper
515+
516+
BLOB_NAME = 'blob-name'
517+
UPLOAD_URL = 'http://example.com/upload/name/key'
518+
DATA = b'ABCDE'
519+
loc_response = {'status': OK, 'location': UPLOAD_URL}
520+
chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE,
521+
'range': 'bytes 0-4'}
522+
chunk2_response = {'status': OK}
523+
# The last response needs a valid JSON body, since the upload is resumable.
524+
connection = _Connection(
525+
(loc_response, b''),
526+
(chunk1_response, b''),
527+
(chunk2_response, b'{}'),
528+
)
529+
client = _Client(connection)
530+
bucket = _Bucket(client)
531+
blob = self._makeOne(BLOB_NAME, bucket=bucket)
532+
blob._CHUNK_SIZE_MULTIPLE = 1
533+
blob.chunk_size = 5
534+
535+
from gcloud.streaming.test_transfer import _Stream
536+
file_obj = _Stream(DATA)
537+
538+
# The mock stream closes itself once all of DATA has been read, as a socket might.
539+
def is_stream_closed(stream):
540+
if stream.tell() < len(DATA):
541+
return stream._closed
542+
else:
543+
return stream.close() or True
544+
545+
_Stream.closed = property(is_stream_closed)
546+
547+
def fileno_mock():
548+
from io import UnsupportedOperation
549+
raise UnsupportedOperation()
550+
551+
file_obj.fileno = fileno_mock
552+
553+
blob.upload_from_file(file_obj)
554+
555+
# Remove the temp property
556+
delattr(_Stream, "closed")
557+
558+
rq = connection.http._requested
559+
self.assertEqual(len(rq), 3)
560+
561+
# Requested[0]
562+
headers = dict(
563+
[(x.title(), str(y)) for x, y in rq[0].pop('headers').items()])
564+
self.assertEqual(headers['Content-Length'], '0')
565+
self.assertEqual(headers['X-Upload-Content-Type'],
566+
'application/octet-stream')
567+
568+
uri = rq[0].pop('uri')
569+
scheme, netloc, path, qs, _ = urlsplit(uri)
570+
self.assertEqual(scheme, 'http')
571+
self.assertEqual(netloc, 'example.com')
572+
self.assertEqual(path, '/b/name/o')
573+
self.assertEqual(dict(parse_qsl(qs)),
574+
{'uploadType': 'resumable', 'name': BLOB_NAME})
575+
self.assertEqual(rq[0], {
576+
'method': 'POST',
577+
'body': '',
578+
'connection_type': None,
579+
'redirections': 5,
580+
})
581+
582+
# Requested[1]
583+
headers = dict(
584+
[(x.title(), str(y)) for x, y in rq[1].pop('headers').items()])
585+
self.assertEqual(headers['Content-Range'], 'bytes 0-4/*')
586+
self.assertEqual(rq[1], {
587+
'method': 'PUT',
588+
'uri': UPLOAD_URL,
589+
'body': DATA[:5],
590+
'connection_type': None,
591+
'redirections': 5,
592+
})
593+
594+
# Requested[2]
595+
headers = dict(
596+
[(x.title(), str(y)) for x, y in rq[2].pop('headers').items()])
597+
self.assertEqual(headers['Content-Range'], 'bytes */5')
598+
self.assertEqual(rq[2], {
599+
'method': 'PUT',
600+
'uri': UPLOAD_URL,
601+
'body': DATA[5:],
602+
'connection_type': None,
603+
'redirections': 5,
604+
})
605+
510606
def test_upload_from_file_simple(self):
511607
self._upload_from_file_simple_test_helper(
512608
expected_content_type='application/octet-stream')

gcloud/streaming/buffered_stream.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@ def __init__(self, stream, start, size):
2020
self._stream = stream
2121
self._start_pos = start
2222
self._buffer_pos = 0
23-
self._buffered_data = self._stream.read(size)
23+
24+
if not hasattr(self._stream, 'closed') or not self._stream.closed:
25+
self._buffered_data = self._stream.read(size)
26+
else:
27+
self._buffered_data = b''
28+
2429
self._stream_at_end = len(self._buffered_data) < size
2530
self._end_pos = self._start_pos + len(self._buffered_data)
2631

gcloud/streaming/transfer.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1038,14 +1038,16 @@ def stream_file(self, use_chunks=True):
10381038
'Failed to transfer all bytes in chunk, upload paused at '
10391039
'byte %d' % self.progress)
10401040
if self.complete and hasattr(self.stream, 'seek'):
1041-
current_pos = self.stream.tell()
1042-
self.stream.seek(0, os.SEEK_END)
1043-
end_pos = self.stream.tell()
1044-
self.stream.seek(current_pos)
1045-
if current_pos != end_pos:
1046-
raise TransferInvalidError(
1047-
'Upload complete with %s additional bytes left in stream' %
1048-
(int(end_pos) - int(current_pos)))
1041+
if not hasattr(self.stream, 'seekable') or self.stream.seekable():
1042+
current_pos = self.stream.tell()
1043+
self.stream.seek(0, os.SEEK_END)
1044+
end_pos = self.stream.tell()
1045+
self.stream.seek(current_pos)
1046+
if current_pos != end_pos:
1047+
raise TransferInvalidError(
1048+
'Upload complete with %s '
1049+
'additional bytes left in stream' %
1050+
(int(end_pos) - int(current_pos)))
10491051
return response
10501052

10511053
def _send_media_request(self, request, end):

0 commit comments

Comments
 (0)