Skip to content
This repository was archived by the owner on Mar 26, 2026. It is now read-only.

Commit f190a8c

Browse files
authored
feat: For client side metrics, record metrics as MUTATE_ROW for single row mutates (#1650)
* Remove unused import * Move mutateInternal into an internal method * Eliminate unused dependencies * Adjust mocks so tests pass * Fix mocks to mock out mutate internal * Add mutateRow test suite 1 * Add handler tests for MutateRow * Add header
1 parent 8af801b commit f190a8c

7 files changed

Lines changed: 561 additions & 224 deletions

File tree

src/row.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import {
3636
MethodName,
3737
StreamingState,
3838
} from './client-side-metrics/client-side-metrics-attributes';
39+
import {mutateInternal} from './utils/mutateInternal';
3940

4041
export interface Rule {
4142
column: string;
@@ -830,7 +831,19 @@ export class Row {
830831
method: Mutation.methods.INSERT,
831832
} as {} as Entry;
832833
this.data = {};
833-
this.table.mutate(mutation, gaxOptions as {} as MutateOptions, callback);
834+
const metricsCollector =
835+
this.bigtable._metricsConfigManager.createOperation(
836+
MethodName.MUTATE_ROW,
837+
StreamingState.UNARY,
838+
this.table,
839+
);
840+
mutateInternal(
841+
this.table,
842+
metricsCollector,
843+
mutation,
844+
gaxOptions as {} as MutateOptions,
845+
callback,
846+
);
834847
}
835848
}
836849

src/tabular-api-surface.ts

Lines changed: 2 additions & 205 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ import {BackoffSettings} from 'google-gax/build/src/gax';
2929
import {google} from '../protos/protos';
3030
import {CallOptions, grpc, ServiceError} from 'google-gax';
3131
import {Transform} from 'stream';
32-
import * as is from 'is';
3332
import {GoogleInnerError} from './table';
3433
import {createReadStreamInternal} from './utils/createReadStreamInternal';
3534
import {getRowsInternal} from './utils/getRowsInternal';
3635
import {
3736
MethodName,
3837
StreamingState,
3938
} from './client-side-metrics/client-side-metrics-attributes';
39+
import {mutateInternal} from './utils/mutateInternal';
4040

4141
// See protos/google/rpc/code.proto
4242
// (4=DEADLINE_EXCEEDED, 8=RESOURCE_EXHAUSTED, 10=ABORTED, 14=UNAVAILABLE)
@@ -333,216 +333,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
333333
optionsOrCallback?: MutateOptions | MutateCallback,
334334
cb?: MutateCallback,
335335
): void | Promise<MutateResponse> {
336-
const callback =
337-
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;
338-
const options =
339-
typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
340-
const entries: Entry[] = (arrify(entriesRaw) as Entry[]).reduce(
341-
(a, b) => a.concat(b),
342-
[],
343-
);
344-
const collectMetricsCallback = (
345-
originalError: ServiceError | null,
346-
err: ServiceError | PartialFailureError | null,
347-
apiResponse?: google.protobuf.Empty,
348-
) => {
349-
// originalError is the error that was sent from the gapic layer. The
350-
// compiler guarantees that it contains a code which needs to be
351-
// provided when an operation is marked complete.
352-
//
353-
// err is the error we intend to send back to the user. Often it is the
354-
// same as originalError, but in one case we construct a
355-
// PartialFailureError and send that back to the user instead. In this
356-
// case, we still need to pass the originalError into the method
357-
// because the PartialFailureError doesn't have a code, but we need to
358-
// communicate a code to the metrics collector.
359-
//
360-
const code = originalError ? originalError.code : 0;
361-
metricsCollector.onOperationComplete(code);
362-
callback(err, apiResponse);
363-
};
364-
365336
const metricsCollector =
366337
this.bigtable._metricsConfigManager.createOperation(
367338
MethodName.MUTATE_ROWS,
368339
StreamingState.STREAMING,
369340
this,
370341
);
371-
/*
372-
The following line of code sets the timeout if it was provided while
373-
creating the client. This will be used to determine if the client should
374-
retry on errors. Eventually, this will be handled downstream in google-gax.
375-
*/
376-
const timeout =
377-
options?.gaxOptions?.timeout ||
378-
(this?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces &&
379-
this?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces[
380-
'google.bigtable.v2.Bigtable'
381-
]?.methods['MutateRows']?.timeout_millis);
382-
const callTimeMillis = new Date().getTime();
383-
384-
let numRequestsMade = 0;
385-
386-
const maxRetries = is.number(this.maxRetries) ? this.maxRetries! : 3;
387-
const pendingEntryIndices = new Set(
388-
entries.map((entry: Entry, index: number) => index),
389-
);
390-
const entryToIndex = new Map(
391-
entries.map((entry: Entry, index: number) => [entry, index]),
392-
);
393-
const mutationErrorsByEntryIndex = new Map();
394-
395-
const isRetryable = (
396-
err: ServiceError | null,
397-
timeoutExceeded: boolean,
398-
) => {
399-
if (timeoutExceeded) {
400-
// If the timeout has been exceeded then do not retry.
401-
return false;
402-
}
403-
// Don't retry if there are no more entries or retry attempts
404-
if (pendingEntryIndices.size === 0 || numRequestsMade >= maxRetries + 1) {
405-
return false;
406-
}
407-
// If the error is empty but there are still outstanding mutations,
408-
// it means that there are retryable errors in the mutate response
409-
// even when the RPC succeeded
410-
return !err || RETRYABLE_STATUS_CODES.has(err.code);
411-
};
412-
413-
const onBatchResponse = (err: ServiceError | null) => {
414-
// Return if the error happened before a request was made
415-
if (numRequestsMade === 0) {
416-
collectMetricsCallback(err, err);
417-
return;
418-
}
419-
420-
const timeoutExceeded = !!(
421-
timeout && timeout < new Date().getTime() - callTimeMillis
422-
);
423-
if (isRetryable(err, timeoutExceeded)) {
424-
// If the timeout or max retries is exceeded or if there are no
425-
// pending indices left then the client doesn't retry.
426-
// Otherwise, the client will retry if there is no error or if the
427-
// error has a retryable status code.
428-
const backOffSettings =
429-
options.gaxOptions?.retry?.backoffSettings ||
430-
DEFAULT_BACKOFF_SETTINGS;
431-
const nextDelay = getNextDelay(numRequestsMade, backOffSettings);
432-
metricsCollector.onAttemptComplete(err ? err.code : 0);
433-
setTimeout(makeNextBatchRequest, nextDelay);
434-
return;
435-
}
436-
437-
// If there's no more pending mutations, set the error
438-
// to null
439-
if (pendingEntryIndices.size === 0) {
440-
err = null;
441-
}
442-
443-
const mutationErrors = Array.from(mutationErrorsByEntryIndex.values());
444-
if (mutationErrorsByEntryIndex.size !== 0) {
445-
collectMetricsCallback(
446-
err,
447-
new PartialFailureError(mutationErrors, err),
448-
);
449-
return;
450-
}
451-
if (err) {
452-
/* If there's an RPC level failure and the mutation entries don't have
453-
a status code, the RPC level failure error code will be used as the
454-
entry failure code.
455-
*/
456-
(err as ServiceError & {errors?: ServiceError[]}).errors =
457-
mutationErrors.concat(
458-
[...pendingEntryIndices]
459-
.filter(index => !mutationErrorsByEntryIndex.has(index))
460-
.map(() => err),
461-
);
462-
collectMetricsCallback(err, err);
463-
return;
464-
}
465-
collectMetricsCallback(null, null);
466-
};
467-
468-
metricsCollector.onOperationStart();
469-
const makeNextBatchRequest = () => {
470-
metricsCollector.onAttemptStart();
471-
const entryBatch = entries.filter((entry: Entry, index: number) => {
472-
return pendingEntryIndices.has(index);
473-
});
474-
475-
// If the viewName is provided then request will be made for an
476-
// authorized view. Otherwise, the request is made for a table.
477-
const baseReqOpts = (
478-
this.viewName
479-
? {
480-
authorizedViewName: `${this.name}/authorizedViews/${this.viewName}`,
481-
}
482-
: {
483-
tableName: this.name,
484-
}
485-
) as google.bigtable.v2.IReadRowsRequest;
486-
const reqOpts = Object.assign(baseReqOpts, {
487-
appProfileId: this.bigtable.appProfileId,
488-
entries: options.rawMutation
489-
? entryBatch
490-
: entryBatch.map(Mutation.parse),
491-
});
492-
493-
const retryOpts = {
494-
currentRetryAttempt: numRequestsMade,
495-
// Handling retries in this client. Specify the retry options to
496-
// make sure nothing is retried in retry-request.
497-
noResponseRetries: 0,
498-
shouldRetryFn: (_: any) => {
499-
return false;
500-
},
501-
};
502-
503-
options.gaxOptions = populateAttemptHeader(
504-
numRequestsMade,
505-
options.gaxOptions,
506-
);
507-
508-
const requestStream =
509-
this.bigtable.request<google.bigtable.v2.MutateRowsResponse>({
510-
client: 'BigtableClient',
511-
method: 'mutateRows',
512-
reqOpts,
513-
gaxOpts: options.gaxOptions,
514-
retryOpts,
515-
});
516-
metricsCollector.wrapRequest(requestStream);
517-
requestStream
518-
.on('error', (err: ServiceError) => {
519-
onBatchResponse(err);
520-
})
521-
.on('data', (obj: google.bigtable.v2.IMutateRowsResponse) => {
522-
obj.entries!.forEach(entry => {
523-
const originalEntry = entryBatch[entry.index as number];
524-
const originalEntriesIndex = entryToIndex.get(originalEntry)!;
525-
526-
// Mutation was successful.
527-
if (entry.status!.code === 0) {
528-
pendingEntryIndices.delete(originalEntriesIndex);
529-
mutationErrorsByEntryIndex.delete(originalEntriesIndex);
530-
return;
531-
}
532-
if (!RETRYABLE_STATUS_CODES.has(entry.status!.code!)) {
533-
pendingEntryIndices.delete(originalEntriesIndex);
534-
}
535-
const errorDetails = entry.status;
536-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
537-
(errorDetails as any).entry = originalEntry;
538-
mutationErrorsByEntryIndex.set(originalEntriesIndex, errorDetails);
539-
});
540-
})
541-
.on('end', onBatchResponse);
542-
numRequestsMade++;
543-
};
544-
545-
makeNextBatchRequest();
342+
mutateInternal(this, metricsCollector, entriesRaw, optionsOrCallback, cb);
546343
}
547344

548345
/**

src/utils/createReadStreamInternal.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@ import {
2626
} from '../chunktransformer';
2727
import {TableUtils} from './table';
2828
import {Duplex, PassThrough, Transform} from 'stream';
29-
import {
30-
MethodName,
31-
StreamingState,
32-
} from '../client-side-metrics/client-side-metrics-attributes';
3329
import {google} from '../../protos/protos';
3430
const pumpify = require('pumpify');
3531
import {grpc, ServiceError} from 'google-gax';

0 commit comments

Comments
 (0)