Skip to content
This repository was archived by the owner on Mar 26, 2026. It is now read-only.

Commit 7601e4d

Browse files
feat: Client side metrics support for mutateRows (#1638)
* Add metrics collector hooks in the right places * Move readrows tests over * Group ReadRows under separate describe block * Add mutateRows tests * Add onResponse to mutateRows collection * Eliminate the extra mutateRows calls * Change the test frame to work without inserting * Remove console traces * Remove the error console log * Inserts will conflate results for readRows too * Remove only * Remove only * Remove the extra onResponse call * Include onOperationComplete in the callback * Remove the onOperationComplete call * Get rid of error code fragment * onResponse handler moved into metrics collector * Rename handleStatusAndMetadata * Add comments, shorten snippet * Add the wrapRequest method to the mock * Pass null along instead * Add retries comment * Use the same setup table code as before * Rename method to setupBigtableWithInsert * Keep setupBigtable name as setupBigtable * Eliminate unused import * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Run the linter * Make sure the table is set up properly, but don’t add a mutateRow call for the handlers tests * Remove only --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent afcd78f commit 7601e4d

9 files changed

Lines changed: 1046 additions & 16 deletions

src/client-side-metrics/operation-metrics-collector.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ export class OperationMetricsCollector {
168168
*
169169
* @param stream
170170
*/
171-
handleStatusAndMetadata(stream: AbortableDuplex) {
171+
wrapRequest(stream: AbortableDuplex) {
172172
stream
173173
.on(
174174
'metadata',
@@ -183,7 +183,10 @@ export class OperationMetricsCollector {
183183
}) => {
184184
this.onStatusMetadataReceived(status);
185185
},
186-
);
186+
)
187+
.on('data', () => {
188+
this.onResponse();
189+
});
187190
}
188191

189192
/**

src/tabular-api-surface.ts

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,33 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
341341
(a, b) => a.concat(b),
342342
[],
343343
);
344+
const collectMetricsCallback = (
345+
originalError: ServiceError | null,
346+
err: ServiceError | PartialFailureError | null,
347+
apiResponse?: google.protobuf.Empty,
348+
) => {
349+
// originalError is the error that was sent from the gapic layer. The
350+
// compiler guarantees that it contains a code which needs to be
351+
// provided when an operation is marked complete.
352+
//
353+
// err is the error we intend to send back to the user. Often it is the
354+
// same as originalError, but in one case we construct a
355+
// PartialFailureError and send that back to the user instead. In this
356+
// case, we still need to pass the originalError into the method
357+
// because the PartialFailureError doesn't have a code, but we need to
358+
// communicate a code to the metrics collector.
359+
//
360+
const code = originalError ? originalError.code : 0;
361+
metricsCollector.onOperationComplete(code);
362+
callback(err, apiResponse);
363+
};
344364

365+
const metricsCollector =
366+
this.bigtable._metricsConfigManager.createOperation(
367+
MethodName.MUTATE_ROWS,
368+
StreamingState.STREAMING,
369+
this,
370+
);
345371
/*
346372
The following line of code sets the timeout if it was provided while
347373
creating the client. This will be used to determine if the client should
@@ -387,18 +413,23 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
387413
const onBatchResponse = (err: ServiceError | null) => {
388414
// Return if the error happened before a request was made
389415
if (numRequestsMade === 0) {
390-
callback(err);
416+
collectMetricsCallback(err, err);
391417
return;
392418
}
393419

394420
const timeoutExceeded = !!(
395421
timeout && timeout < new Date().getTime() - callTimeMillis
396422
);
397423
if (isRetryable(err, timeoutExceeded)) {
424+
// If the timeout or max retries is exceeded or if there are no
425+
// pending indices left then the client doesn't retry.
426+
// Otherwise, the client will retry if there is no error or if the
427+
// error has a retryable status code.
398428
const backOffSettings =
399429
options.gaxOptions?.retry?.backoffSettings ||
400430
DEFAULT_BACKOFF_SETTINGS;
401431
const nextDelay = getNextDelay(numRequestsMade, backOffSettings);
432+
metricsCollector.onAttemptComplete(err ? err.code : 0);
402433
setTimeout(makeNextBatchRequest, nextDelay);
403434
return;
404435
}
@@ -411,7 +442,10 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
411442

412443
const mutationErrors = Array.from(mutationErrorsByEntryIndex.values());
413444
if (mutationErrorsByEntryIndex.size !== 0) {
414-
callback(new PartialFailureError(mutationErrors, err));
445+
collectMetricsCallback(
446+
err,
447+
new PartialFailureError(mutationErrors, err),
448+
);
415449
return;
416450
}
417451
if (err) {
@@ -425,13 +459,15 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
425459
.filter(index => !mutationErrorsByEntryIndex.has(index))
426460
.map(() => err),
427461
);
428-
callback(err);
462+
collectMetricsCallback(err, err);
429463
return;
430464
}
431-
callback(err);
465+
collectMetricsCallback(null, null);
432466
};
433467

468+
metricsCollector.onOperationStart();
434469
const makeNextBatchRequest = () => {
470+
metricsCollector.onAttemptStart();
435471
const entryBatch = entries.filter((entry: Entry, index: number) => {
436472
return pendingEntryIndices.has(index);
437473
});
@@ -469,14 +505,16 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
469505
options.gaxOptions,
470506
);
471507

472-
this.bigtable
473-
.request<google.bigtable.v2.MutateRowsResponse>({
508+
const requestStream =
509+
this.bigtable.request<google.bigtable.v2.MutateRowsResponse>({
474510
client: 'BigtableClient',
475511
method: 'mutateRows',
476512
reqOpts,
477513
gaxOpts: options.gaxOptions,
478514
retryOpts,
479-
})
515+
});
516+
metricsCollector.wrapRequest(requestStream);
517+
requestStream
480518
.on('error', (err: ServiceError) => {
481519
onBatchResponse(err);
482520
})

src/utils/createReadStreamInternal.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ export function createReadStreamInternal(
364364

365365
rowStream = pumpify.obj([requestStream, chunkTransformer, toRowStream]);
366366

367-
metricsCollector.handleStatusAndMetadata(requestStream);
367+
metricsCollector.wrapRequest(requestStream);
368368
rowStream
369369
.on('error', (error: ServiceError) => {
370370
rowStreamUnpipe(rowStream, userStream);
@@ -413,7 +413,6 @@ export function createReadStreamInternal(
413413
// Reset error count after a successful read so the backoff
414414
// time won't keep increasing when as stream had multiple errors
415415
numConsecutiveErrors = 0;
416-
metricsCollector.onResponse();
417416
})
418417
.on('end', () => {
419418
activeRequestStream = null;

0 commit comments

Comments
 (0)