Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 637019e

Browse files
committed
remove extra logs
1 parent 53abafe commit 637019e

4 files changed

Lines changed: 1 addition & 46 deletions

File tree

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,6 @@ async def open(
265265
:raises ValueError: If the stream is already open.
266266
267267
"""
268-
print("DEBUG: AsyncAppendableObjectWriter.open() called")
269268
if self._is_stream_open:
270269
raise ValueError("Underlying bidi-gRPC stream is already open")
271270

@@ -291,7 +290,6 @@ def combined_on_error(exc):
291290
)
292291

293292
async def _do_open():
294-
print("DEBUG: AsyncAppendableObjectWriter.open._do_open() called")
295293
current_metadata = list(metadata) if metadata else []
296294

297295
# Cleanup stream from previous failed attempt, if any.
@@ -312,7 +310,6 @@ async def _do_open():
312310
write_handle=self.write_handle,
313311
routing_token=self._routing_token,
314312
)
315-
print(f"DEBUG: AsyncAppendableObjectWriter.open._do_open() - created _AsyncWriteObjectStream with routing_token: {self._routing_token}")
316313

317314
if self._routing_token:
318315
current_metadata.append(
@@ -322,7 +319,6 @@ async def _do_open():
322319
await self.write_obj_stream.open(
323320
metadata=current_metadata if metadata else None
324321
)
325-
print("DEBUG: AsyncAppendableObjectWriter.open._do_open() - self.write_obj_stream.open() finished")
326322

327323
if self.write_obj_stream.generation_number:
328324
self.generation = self.write_obj_stream.generation_number
@@ -333,10 +329,8 @@ async def _do_open():
333329

334330
self._is_stream_open = True
335331
self._routing_token = None
336-
print(f"DEBUG: AsyncAppendableObjectWriter.open._do_open() - stream opened. persisted_size: {self.persisted_size}, write_handle: {self.write_handle}")
337332

338333
await retry_policy(_do_open)()
339-
print("DEBUG: AsyncAppendableObjectWriter.open() finished")
340334

341335
async def append(
342336
self,
@@ -365,7 +359,6 @@ async def append(
365359
366360
:raises ValueError: If the stream is not open.
367361
"""
368-
print(f"AsyncAppendableObjectWriter: append() called with {len(data)} bytes")
369362
if not self._is_stream_open:
370363
raise ValueError("Stream is not open. Call open() before append().")
371364
if not data:
@@ -389,7 +382,6 @@ async def generator():
389382
resp = None
390383
async with self._lock:
391384
write_state = state["write_state"]
392-
print(f"AsyncAppendableObjectWriter: append - attempt {attempt_count}, write_state: {write_state}")
393385
# If this is a retry or redirect, we must re-open the stream
394386
if attempt_count > 1 or write_state.routing_token:
395387
logger.info(
@@ -429,10 +421,8 @@ async def generator():
429421
# Check if this is an open/state-lookup request (no checksummed_data)
430422
if chunk_req.state_lookup and not chunk_req.checksummed_data:
431423
# This is an open request - send it and get response
432-
print(f"AsyncAppendableObjectWriter: append - sending state_lookup request: {chunk_req}")
433424
await self.write_obj_stream.send(chunk_req)
434425
resp = await self.write_obj_stream.recv()
435-
print(f"AsyncAppendableObjectWriter: append - received state_lookup response")
436426

437427
# Update state from open response
438428
if resp:
@@ -446,12 +436,10 @@ async def generator():
446436
continue
447437

448438
# This is a data request - send it
449-
print(f"AsyncAppendableObjectWriter: append - sending data request:")
450439
await self.write_obj_stream.send(chunk_req)
451440

452441
# Get final response from the last request (which has state_lookup=True)
453442
resp = await self.write_obj_stream.recv()
454-
print(f"AsyncAppendableObjectWriter: append - received final response")
455443

456444
if resp:
457445
if resp.persisted_size is not None:
@@ -473,7 +461,6 @@ async def generator():
473461
write_state.persisted_size = self.persisted_size
474462
write_state.bytes_sent = self.persisted_size
475463
write_state.bytes_since_last_flush = self.bytes_appended_since_last_flush
476-
print(f"AsyncAppendableObjectWriter: append - initial write_state: {write_state}")
477464

478465
retry_manager = _BidiStreamRetryManager(
479466
_WriteResumptionStrategy(),
@@ -487,7 +474,6 @@ async def generator():
487474
self.bytes_appended_since_last_flush = write_state.bytes_since_last_flush
488475
self.persisted_size = write_state.persisted_size
489476
self.offset = write_state.persisted_size
490-
print(f"AsyncAppendableObjectWriter: append - finished. persisted_size: {self.persisted_size}")
491477

492478
async def simple_flush(self) -> None:
493479
"""Flushes the data to the server.
@@ -550,7 +536,6 @@ async def close(self, finalize_on_close=False) -> Union[int, _storage_v2.Object]
550536
been called).
551537
552538
"""
553-
print(f"AsyncAppendableObjectWriter: close() called with finalize_on_close={finalize_on_close}")
554539
if not self._is_stream_open:
555540
raise ValueError("Stream is not open. Call open() before close().")
556541

@@ -580,19 +565,16 @@ async def finalize(self) -> _storage_v2.Object:
580565
:raises ValueError: If the stream is not open (i.e., `open()` has not
581566
been called).
582567
"""
583-
print("AsyncAppendableObjectWriter: finalize() called")
584568
if not self._is_stream_open:
585569
raise ValueError("Stream is not open. Call open() before finalize().")
586570

587571
await self.write_obj_stream.send(
588572
_storage_v2.BidiWriteObjectRequest(finish_write=True)
589573
)
590574
response = await self.write_obj_stream.recv()
591-
print(f"AsyncAppendableObjectWriter: finalize - received response: {response}")
592575
self.object_resource = response.resource
593576
self.persisted_size = self.object_resource.size
594577
await self.write_obj_stream.close()
595-
print("AsyncAppendableObjectWriter: finalize - stream closed")
596578

597579
self._is_stream_open = False
598580
self.offset = None

google/cloud/storage/_experimental/asyncio/async_write_object_stream.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ def __init__(
7373
write_handle: Optional[bytes] = None,
7474
routing_token: Optional[str] = None,
7575
) -> None:
76-
print(f"in _AsyncWriteObjectStream __init__: object_name={object_name}")
7776
if client is None:
7877
raise ValueError("client must be provided")
7978
if bucket_name is None:
@@ -115,7 +114,6 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
115114
:raises google.api_core.exceptions.FailedPrecondition:
116115
if `generation_number` is 0 and object already exists.
117116
"""
118-
print(f"in _AsyncWriteObjectStream open: object_name={self.object_name}")
119117
if self._is_stream_open:
120118
raise ValueError("Stream is already open")
121119

@@ -146,7 +144,6 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
146144
routing_token=self.routing_token if self.routing_token else None,
147145
),
148146
)
149-
print(f"in _AsyncWriteObjectStream open: first_bidi_write_req={self.first_bidi_write_req}")
150147

151148
request_params = [f"bucket={self._full_bucket_name}"]
152149
other_metadata = []
@@ -167,9 +164,7 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
167164
)
168165

169166
await self.socket_like_rpc.open() # this is actually 1 send
170-
print(f"in _AsyncWriteObjectStream open: rpc opened")
171167
response = await self.socket_like_rpc.recv()
172-
print(f"in _AsyncWriteObjectStream open: received response:")
173168
self._is_stream_open = True
174169

175170
if response.persisted_size:
@@ -189,7 +184,6 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
189184

190185
async def close(self) -> None:
191186
"""Closes the bidi-gRPC connection."""
192-
print(f"in _AsyncWriteObjectStream close: object_name={self.object_name}")
193187
if not self._is_stream_open:
194188
raise ValueError("Stream is not open")
195189
await self.requests_done()
@@ -198,7 +192,6 @@ async def close(self) -> None:
198192

199193
async def requests_done(self):
200194
"""Signals that all requests have been sent."""
201-
print(f"in _AsyncWriteObjectStream requests_done: object_name={self.object_name}")
202195
await self.socket_like_rpc.send(None)
203196
await self.socket_like_rpc.recv()
204197

@@ -212,7 +205,6 @@ async def send(
212205
The request message to send. This is typically used to specify
213206
the read offset and limit.
214207
"""
215-
print(f"in _AsyncWriteObjectStream send: sending request")
216208
if not self._is_stream_open:
217209
raise ValueError("Stream is not open")
218210
await self.socket_like_rpc.send(bidi_write_object_request)
@@ -229,9 +221,7 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
229221
"""
230222
if not self._is_stream_open:
231223
raise ValueError("Stream is not open")
232-
print(f"in _AsyncWriteObjectStream recv: receiving response")
233224
response = await self.socket_like_rpc.recv()
234-
print(f"in _AsyncWriteObjectStream recv: received response")
235225
# Update write_handle if present in response
236226
if response:
237227
if response.write_handle:

google/cloud/storage/_experimental/asyncio/retry/bidi_stream_retry_manager.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ def __init__(
3636
bidi operation (e.g., reads or writes).
3737
send_and_recv: An async callable that opens a new gRPC stream.
3838
"""
39-
print("Initializing _BidiStreamRetryManager.")
4039
self._strategy = strategy
4140
self._send_and_recv = send_and_recv
4241

@@ -48,11 +47,9 @@ async def execute(self, initial_state: Any, retry_policy):
4847
retry_policy: The `google.api_core.retry.AsyncRetry` object to
4948
govern the retry behavior for this specific operation.
5049
"""
51-
print(f"Executing bidi stream with initial_state: {initial_state}")
5250
state = initial_state
5351

5452
async def attempt():
55-
print("New attempt for bidi stream operation.")
5653
requests_generator = self._strategy.generate_requests(state)
5754
stream = self._send_and_recv(requests_generator, state)
5855
try:
@@ -61,7 +58,7 @@ async def attempt():
6158
return
6259
except Exception as e:
6360
if retry_policy._predicate(e):
64-
print(
61+
logger.info(
6562
f"Bidi stream operation failed: {e}. Attempting state recovery and retry."
6663
)
6764
await self._strategy.recover_state_on_failure(e, state)

google/cloud/storage/_experimental/asyncio/retry/writes_resumption_strategy.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,6 @@ def __init__(
4949
user_buffer: IO[bytes],
5050
flush_interval: Optional[int] = None,
5151
):
52-
print(
53-
f"Initializing _WriteState with chunk_size: {chunk_size}, flush_interval: {flush_interval}"
54-
)
5552
self.chunk_size = chunk_size
5653
self.user_buffer = user_buffer
5754
self.total_size = self.user_buffer.getbuffer().nbytes
@@ -89,7 +86,6 @@ def generate_requests(self, state: Dict[str, Any]):
8986
to ensure the server persists the final data and returns the updated state.
9087
"""
9188
write_state: _WriteState = state["write_state"]
92-
print(f"Generating requests with state: {write_state}")
9389

9490
# If this is a retry/redirect, yield a state lookup request first
9591
# This allows the sender to get current persisted_size before proceeding
@@ -98,14 +94,12 @@ def generate_requests(self, state: Dict[str, Any]):
9894
or write_state.bytes_sent > write_state.persisted_size
9995
):
10096
# Yield an open/state-lookup request with no data
101-
print("Yielding state_lookup request.")
10297
yield storage_type.BidiWriteObjectRequest(state_lookup=True)
10398

10499
# The buffer should already be seeked to the correct position (persisted_size)
105100
# by the `recover_state_on_failure` method before this is called.
106101
while not write_state.is_finalized:
107102
chunk = write_state.user_buffer.read(write_state.chunk_size)
108-
print(f"Read chunk of size: {len(chunk)}")
109103

110104
if not chunk:
111105
break
@@ -126,7 +120,6 @@ def generate_requests(self, state: Dict[str, Any]):
126120

127121
write_state.bytes_sent += chunk_len
128122
write_state.bytes_since_last_flush += chunk_len
129-
print(f"Yielding request with offset: {request.write_offset}")
130123

131124
is_flush_point = (
132125
write_state.flush_interval
@@ -137,11 +130,9 @@ def generate_requests(self, state: Dict[str, Any]):
137130
request.flush = True
138131
request.state_lookup = True
139132
write_state.bytes_since_last_flush = 0
140-
print("Marking request with flush=True and state_lookup=True")
141133
elif is_flush_point:
142134
request.flush = True
143135
write_state.bytes_since_last_flush = 0
144-
print("Marking request with flush=True")
145136

146137
yield request
147138

@@ -150,7 +141,6 @@ def update_state_from_response(
150141
) -> None:
151142
"""Processes a server response and updates the write state."""
152143
write_state: _WriteState = state["write_state"]
153-
print(f"Updating state from response: {response}")
154144
if response is None:
155145
return
156146
if response.persisted_size:
@@ -163,7 +153,6 @@ def update_state_from_response(
163153
write_state.persisted_size = response.resource.size
164154
if response.resource.finalize_time:
165155
write_state.is_finalized = True
166-
print(f"New state: {write_state}")
167156

168157
async def recover_state_on_failure(
169158
self, error: Exception, state: Dict[str, Any]
@@ -174,7 +163,6 @@ async def recover_state_on_failure(
174163
This method rewinds the user buffer and internal byte tracking to the
175164
last confirmed 'persisted_size' from the server.
176165
"""
177-
print(f"Recovering from error: {error}")
178166
write_state: _WriteState = state["write_state"]
179167

180168
redirect_proto = None
@@ -193,9 +181,7 @@ async def recover_state_on_failure(
193181

194182
# We must assume any data sent beyond 'persisted_size' was lost.
195183
# Reset the user buffer to the last known good byte confirmed by the server.
196-
print(f"Seeking buffer to: {write_state.persisted_size}")
197184
write_state.user_buffer.seek(write_state.persisted_size)
198185
write_state.bytes_sent = write_state.persisted_size
199186
write_state.bytes_since_last_flush = 0
200-
print(f"Recovered state: {write_state}")
201187

0 commit comments

Comments (0)