@@ -29,14 +29,14 @@ import {BackoffSettings} from 'google-gax/build/src/gax';
2929import { google } from '../protos/protos' ;
3030import { CallOptions , grpc , ServiceError } from 'google-gax' ;
3131import { Transform } from 'stream' ;
32- import * as is from 'is' ;
3332import { GoogleInnerError } from './table' ;
3433import { createReadStreamInternal } from './utils/createReadStreamInternal' ;
3534import { getRowsInternal } from './utils/getRowsInternal' ;
3635import {
3736 MethodName ,
3837 StreamingState ,
3938} from './client-side-metrics/client-side-metrics-attributes' ;
39+ import { mutateInternal } from './utils/mutateInternal' ;
4040
4141// See protos/google/rpc/code.proto
4242// (4=DEADLINE_EXCEEDED, 8=RESOURCE_EXHAUSTED, 10=ABORTED, 14=UNAVAILABLE)
@@ -333,216 +333,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
333333 optionsOrCallback ?: MutateOptions | MutateCallback ,
334334 cb ?: MutateCallback ,
335335 ) : void | Promise < MutateResponse > {
336- const callback =
337- typeof optionsOrCallback === 'function' ? optionsOrCallback : cb ! ;
338- const options =
339- typeof optionsOrCallback === 'object' ? optionsOrCallback : { } ;
340- const entries : Entry [ ] = ( arrify ( entriesRaw ) as Entry [ ] ) . reduce (
341- ( a , b ) => a . concat ( b ) ,
342- [ ] ,
343- ) ;
344- const collectMetricsCallback = (
345- originalError : ServiceError | null ,
346- err : ServiceError | PartialFailureError | null ,
347- apiResponse ?: google . protobuf . Empty ,
348- ) => {
349- // originalError is the error that was sent from the gapic layer. The
350- // compiler guarantees that it contains a code which needs to be
351- // provided when an operation is marked complete.
352- //
353- // err is the error we intend to send back to the user. Often it is the
354- // same as originalError, but in one case we construct a
355- // PartialFailureError and send that back to the user instead. In this
356- // case, we still need to pass the originalError into the method
357- // because the PartialFailureError doesn't have a code, but we need to
358- // communicate a code to the metrics collector.
359- //
360- const code = originalError ? originalError . code : 0 ;
361- metricsCollector . onOperationComplete ( code ) ;
362- callback ( err , apiResponse ) ;
363- } ;
364-
365336 const metricsCollector =
366337 this . bigtable . _metricsConfigManager . createOperation (
367338 MethodName . MUTATE_ROWS ,
368339 StreamingState . STREAMING ,
369340 this ,
370341 ) ;
371- /*
372- The following line of code sets the timeout if it was provided while
373- creating the client. This will be used to determine if the client should
374- retry on errors. Eventually, this will be handled downstream in google-gax.
375- */
376- const timeout =
377- options ?. gaxOptions ?. timeout ||
378- ( this ?. bigtable ?. options ?. BigtableClient ?. clientConfig ?. interfaces &&
379- this ?. bigtable ?. options ?. BigtableClient ?. clientConfig ?. interfaces [
380- 'google.bigtable.v2.Bigtable'
381- ] ?. methods [ 'MutateRows' ] ?. timeout_millis ) ;
382- const callTimeMillis = new Date ( ) . getTime ( ) ;
383-
384- let numRequestsMade = 0 ;
385-
386- const maxRetries = is . number ( this . maxRetries ) ? this . maxRetries ! : 3 ;
387- const pendingEntryIndices = new Set (
388- entries . map ( ( entry : Entry , index : number ) => index ) ,
389- ) ;
390- const entryToIndex = new Map (
391- entries . map ( ( entry : Entry , index : number ) => [ entry , index ] ) ,
392- ) ;
393- const mutationErrorsByEntryIndex = new Map ( ) ;
394-
395- const isRetryable = (
396- err : ServiceError | null ,
397- timeoutExceeded : boolean ,
398- ) => {
399- if ( timeoutExceeded ) {
400- // If the timeout has been exceeded then do not retry.
401- return false ;
402- }
403- // Don't retry if there are no more entries or retry attempts
404- if ( pendingEntryIndices . size === 0 || numRequestsMade >= maxRetries + 1 ) {
405- return false ;
406- }
407- // If the error is empty but there are still outstanding mutations,
408- // it means that there are retryable errors in the mutate response
409- // even when the RPC succeeded
410- return ! err || RETRYABLE_STATUS_CODES . has ( err . code ) ;
411- } ;
412-
413- const onBatchResponse = ( err : ServiceError | null ) => {
414- // Return if the error happened before a request was made
415- if ( numRequestsMade === 0 ) {
416- collectMetricsCallback ( err , err ) ;
417- return ;
418- }
419-
420- const timeoutExceeded = ! ! (
421- timeout && timeout < new Date ( ) . getTime ( ) - callTimeMillis
422- ) ;
423- if ( isRetryable ( err , timeoutExceeded ) ) {
424- // If the timeout or max retries is exceeded or if there are no
425- // pending indices left then the client doesn't retry.
426- // Otherwise, the client will retry if there is no error or if the
427- // error has a retryable status code.
428- const backOffSettings =
429- options . gaxOptions ?. retry ?. backoffSettings ||
430- DEFAULT_BACKOFF_SETTINGS ;
431- const nextDelay = getNextDelay ( numRequestsMade , backOffSettings ) ;
432- metricsCollector . onAttemptComplete ( err ? err . code : 0 ) ;
433- setTimeout ( makeNextBatchRequest , nextDelay ) ;
434- return ;
435- }
436-
437- // If there's no more pending mutations, set the error
438- // to null
439- if ( pendingEntryIndices . size === 0 ) {
440- err = null ;
441- }
442-
443- const mutationErrors = Array . from ( mutationErrorsByEntryIndex . values ( ) ) ;
444- if ( mutationErrorsByEntryIndex . size !== 0 ) {
445- collectMetricsCallback (
446- err ,
447- new PartialFailureError ( mutationErrors , err ) ,
448- ) ;
449- return ;
450- }
451- if ( err ) {
452- /* If there's an RPC level failure and the mutation entries don't have
453- a status code, the RPC level failure error code will be used as the
454- entry failure code.
455- */
456- ( err as ServiceError & { errors ?: ServiceError [ ] } ) . errors =
457- mutationErrors . concat (
458- [ ...pendingEntryIndices ]
459- . filter ( index => ! mutationErrorsByEntryIndex . has ( index ) )
460- . map ( ( ) => err ) ,
461- ) ;
462- collectMetricsCallback ( err , err ) ;
463- return ;
464- }
465- collectMetricsCallback ( null , null ) ;
466- } ;
467-
468- metricsCollector . onOperationStart ( ) ;
469- const makeNextBatchRequest = ( ) => {
470- metricsCollector . onAttemptStart ( ) ;
471- const entryBatch = entries . filter ( ( entry : Entry , index : number ) => {
472- return pendingEntryIndices . has ( index ) ;
473- } ) ;
474-
475- // If the viewName is provided then request will be made for an
476- // authorized view. Otherwise, the request is made for a table.
477- const baseReqOpts = (
478- this . viewName
479- ? {
480- authorizedViewName : `${ this . name } /authorizedViews/${ this . viewName } ` ,
481- }
482- : {
483- tableName : this . name ,
484- }
485- ) as google . bigtable . v2 . IReadRowsRequest ;
486- const reqOpts = Object . assign ( baseReqOpts , {
487- appProfileId : this . bigtable . appProfileId ,
488- entries : options . rawMutation
489- ? entryBatch
490- : entryBatch . map ( Mutation . parse ) ,
491- } ) ;
492-
493- const retryOpts = {
494- currentRetryAttempt : numRequestsMade ,
495- // Handling retries in this client. Specify the retry options to
496- // make sure nothing is retried in retry-request.
497- noResponseRetries : 0 ,
498- shouldRetryFn : ( _ : any ) => {
499- return false ;
500- } ,
501- } ;
502-
503- options . gaxOptions = populateAttemptHeader (
504- numRequestsMade ,
505- options . gaxOptions ,
506- ) ;
507-
508- const requestStream =
509- this . bigtable . request < google . bigtable . v2 . MutateRowsResponse > ( {
510- client : 'BigtableClient' ,
511- method : 'mutateRows' ,
512- reqOpts,
513- gaxOpts : options . gaxOptions ,
514- retryOpts,
515- } ) ;
516- metricsCollector . wrapRequest ( requestStream ) ;
517- requestStream
518- . on ( 'error' , ( err : ServiceError ) => {
519- onBatchResponse ( err ) ;
520- } )
521- . on ( 'data' , ( obj : google . bigtable . v2 . IMutateRowsResponse ) => {
522- obj . entries ! . forEach ( entry => {
523- const originalEntry = entryBatch [ entry . index as number ] ;
524- const originalEntriesIndex = entryToIndex . get ( originalEntry ) ! ;
525-
526- // Mutation was successful.
527- if ( entry . status ! . code === 0 ) {
528- pendingEntryIndices . delete ( originalEntriesIndex ) ;
529- mutationErrorsByEntryIndex . delete ( originalEntriesIndex ) ;
530- return ;
531- }
532- if ( ! RETRYABLE_STATUS_CODES . has ( entry . status ! . code ! ) ) {
533- pendingEntryIndices . delete ( originalEntriesIndex ) ;
534- }
535- const errorDetails = entry . status ;
536- // eslint-disable-next-line @typescript-eslint/no-explicit-any
537- ( errorDetails as any ) . entry = originalEntry ;
538- mutationErrorsByEntryIndex . set ( originalEntriesIndex , errorDetails ) ;
539- } ) ;
540- } )
541- . on ( 'end' , onBatchResponse ) ;
542- numRequestsMade ++ ;
543- } ;
544-
545- makeNextBatchRequest ( ) ;
342+ mutateInternal ( this , metricsCollector , entriesRaw , optionsOrCallback , cb ) ;
546343 }
547344
548345 /**
0 commit comments