33import os
44import uuid
55from io import BytesIO
6+ import random
67
78# python additional imports
9+ import google_crc32c
10+
811import pytest
912
1013# current library imports
1114from google .cloud .storage ._experimental .asyncio .async_grpc_client import AsyncGrpcClient
1215from google .cloud .storage ._experimental .asyncio .async_appendable_object_writer import (
1316 AsyncAppendableObjectWriter ,
17+ _MAX_BUFFER_SIZE_BYTES ,
1418)
1519from google .cloud .storage ._experimental .asyncio .async_multi_range_downloader import (
1620 AsyncMultiRangeDownloader ,
2832_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"
2933
3034
35+ def _get_equal_dist (a : int , b : int ) -> tuple [int , int ]:
36+ step = (b - a ) // 3
37+ return a + step , a + 2 * step
38+
39+
3140async def write_one_appendable_object (
3241 bucket_name : str ,
3342 object_name : str ,
@@ -59,11 +68,21 @@ def appendable_object(storage_client, blobs_to_delete):
5968
6069
6170@pytest .mark .asyncio
71+ @pytest .mark .parametrize (
72+ "object_size" ,
73+ [
74+ 256 , # less than _chunk size
75+ 10 * 1024 * 1024 , # less than _MAX_BUFFER_SIZE_BYTES
76+ 20 * 1024 * 1024 , # greater than _MAX_BUFFER_SIZE
77+ ],
78+ )
6279@pytest .mark .parametrize (
6380 "attempt_direct_path" ,
6481 [True , False ],
6582)
66- async def test_basic_wrd (storage_client , blobs_to_delete , attempt_direct_path ):
83+ async def test_basic_wrd (
84+ storage_client , blobs_to_delete , attempt_direct_path , object_size
85+ ):
6786 object_name = f"test_basic_wrd-{ str (uuid .uuid4 ())} "
6887
6988 # Client instantiation; it cannot be part of fixture because.
@@ -74,22 +93,76 @@ async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
7493 # 2. we can keep the same event loop for entire module but that may
7594 # create issues if tests are run in parallel and one test hogs the event
7695 # loop slowing down other tests.
96+ object_data = os .urandom (object_size )
97+ object_checksum = google_crc32c .value (object_data )
7798 grpc_client = AsyncGrpcClient (attempt_direct_path = attempt_direct_path ).grpc_client
7899
79100 writer = AsyncAppendableObjectWriter (grpc_client , _ZONAL_BUCKET , object_name )
80101 await writer .open ()
81- await writer .append (_BYTES_TO_UPLOAD )
102+ await writer .append (object_data )
82103 object_metadata = await writer .close (finalize_on_close = True )
83- assert object_metadata .size == len (_BYTES_TO_UPLOAD )
104+ assert object_metadata .size == object_size
105+ assert int (object_metadata .checksums .crc32c ) == object_checksum
84106
85107 mrd = AsyncMultiRangeDownloader (grpc_client , _ZONAL_BUCKET , object_name )
86108 buffer = BytesIO ()
87109 await mrd .open ()
88110 # (0, 0) means read the whole object
89111 await mrd .download_ranges ([(0 , 0 , buffer )])
90112 await mrd .close ()
91- assert buffer .getvalue () == _BYTES_TO_UPLOAD
92- assert mrd .persisted_size == len (_BYTES_TO_UPLOAD )
113+ assert buffer .getvalue () == object_data
114+ assert mrd .persisted_size == object_size
115+
116+ # Clean up; use json client (i.e. `storage_client` fixture) to delete.
117+ blobs_to_delete .append (storage_client .bucket (_ZONAL_BUCKET ).blob (object_name ))
118+
119+
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "object_size",
    [
        20 * 1024 * 1024,  # greater than _MAX_BUFFER_SIZE_BYTES
    ],
)
@pytest.mark.parametrize(
    "attempt_direct_path",
    [True],
)
async def test_basic_wrd_in_slices(
    storage_client, blobs_to_delete, attempt_direct_path, object_size
):
    """Write an object in three appends, then read it back and verify it.

    Random data of ``object_size`` bytes is appended in three consecutive
    slices, the object is finalized, and both the reported metadata (size and
    CRC32C) and the downloaded bytes are compared against the source data.
    """
    # Use a prefix specific to this test so leftover objects can be traced
    # back to the test that created them.
    object_name = f"test_basic_wrd_in_slices-{uuid.uuid4()}"

    # The client is instantiated here rather than in a fixture because the
    # grpc_client's event loop and the event loop of the coroutine running it
    # (i.e. this test) must be the same.
    # Note:
    # 1. @pytest.mark.asyncio ensures a new event loop for each test.
    # 2. We could keep the same event loop for the entire module, but that may
    #    create issues if tests are run in parallel and one test hogs the
    #    event loop, slowing down other tests.
    object_data = os.urandom(object_size)
    object_checksum = google_crc32c.value(object_data)
    grpc_client = AsyncGrpcClient(attempt_direct_path=attempt_direct_path).grpc_client

    writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
    await writer.open()
    mark1, mark2 = _get_equal_dist(0, object_size)
    await writer.append(object_data[0:mark1])
    await writer.append(object_data[mark1:mark2])
    await writer.append(object_data[mark2:])
    object_metadata = await writer.close(finalize_on_close=True)

    # Clean up; use json client (i.e. `storage_client` fixture) to delete.
    # Registered as soon as the object is finalized so that a failing
    # assertion or download below does not leave the object in the bucket.
    blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))

    assert object_metadata.size == object_size
    assert int(object_metadata.checksums.crc32c) == object_checksum

    mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
    buffer = BytesIO()
    await mrd.open()
    # (0, 0) means read the whole object
    await mrd.download_ranges([(0, 0, buffer)])
    await mrd.close()
    assert buffer.getvalue() == object_data
    assert mrd.persisted_size == object_size
0 commit comments