2020import com .google .api .gax .batching .Batcher ;
2121import com .google .api .gax .batching .BatcherImpl ;
2222import com .google .api .gax .core .BackgroundResource ;
23+ import com .google .api .gax .core .GaxProperties ;
24+ import com .google .api .gax .grpc .GaxGrpcProperties ;
2325import com .google .api .gax .grpc .GrpcCallSettings ;
2426import com .google .api .gax .grpc .GrpcRawCallableFactory ;
2527import com .google .api .gax .retrying .ExponentialRetryAlgorithm ;
3234import com .google .api .gax .rpc .ServerStreamingCallSettings ;
3335import com .google .api .gax .rpc .ServerStreamingCallable ;
3436import com .google .api .gax .rpc .UnaryCallable ;
37+ import com .google .api .gax .tracing .OpencensusTracerFactory ;
3538import com .google .api .gax .tracing .SpanName ;
3639import com .google .api .gax .tracing .TracedServerStreamingCallable ;
3740import com .google .api .gax .tracing .TracedUnaryCallable ;
5962import com .google .cloud .bigtable .data .v2 .models .RowAdapter ;
6063import com .google .cloud .bigtable .data .v2 .models .RowMutation ;
6164import com .google .cloud .bigtable .data .v2 .models .RowMutationEntry ;
62- import com .google .cloud .bigtable .data .v2 .stub .metrics .MeasuredMutateRowsCallable ;
63- import com .google .cloud .bigtable .data .v2 .stub .metrics .MeasuredReadRowsCallable ;
64- import com .google .cloud .bigtable .data .v2 .stub .metrics .MeasuredUnaryCallable ;
65+ import com .google .cloud .bigtable .data .v2 .stub .metrics .CompositeTracerFactory ;
66+ import com .google .cloud .bigtable .data .v2 .stub .metrics .MetricsTracerFactory ;
67+ import com .google .cloud .bigtable .data .v2 .stub .metrics .RpcMeasureConstants ;
6568import com .google .cloud .bigtable .data .v2 .stub .mutaterows .BulkMutateRowsUserFacingCallable ;
6669import com .google .cloud .bigtable .data .v2 .stub .mutaterows .MutateRowsBatchingDescriptor ;
6770import com .google .cloud .bigtable .data .v2 .stub .mutaterows .MutateRowsRetryingCallable ;
7376import com .google .cloud .bigtable .data .v2 .stub .readrows .RowMergingCallable ;
7477import com .google .cloud .bigtable .gaxx .retrying .ApiResultRetryAlgorithm ;
7578import com .google .common .base .Preconditions ;
79+ import com .google .common .collect .ImmutableList ;
7680import com .google .common .collect .ImmutableMap ;
7781import com .google .protobuf .ByteString ;
7882import io .opencensus .stats .Stats ;
7983import io .opencensus .stats .StatsRecorder ;
84+ import io .opencensus .tags .TagKey ;
85+ import io .opencensus .tags .TagValue ;
8086import io .opencensus .tags .Tagger ;
8187import io .opencensus .tags .Tags ;
8288import java .io .IOException ;
98104 */
99105@ InternalApi
100106public class EnhancedBigtableStub implements AutoCloseable {
101- private static final String TRACING_OUTER_CLIENT_NAME = "Bigtable" ;
107+ private static final String CLIENT_NAME = "Bigtable" ;
102108
103109 private final EnhancedBigtableStubSettings settings ;
104110 private final ClientContext clientContext ;
105111 private final RequestContext requestContext ;
106112
107- // TODO: This should probably move to ClientContext
108- private final Tagger tagger ;
109- private final StatsRecorder statsRecorder ;
110-
111113 private final ServerStreamingCallable <Query , Row > readRowsCallable ;
112114 private final UnaryCallable <Query , Row > readRowCallable ;
113115 private final UnaryCallable <String , List <KeyOffset >> sampleRowKeysCallable ;
@@ -125,15 +127,58 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
125127 }
126128
127129 @ InternalApi ("Visible for testing" )
128- private EnhancedBigtableStub (
130+ public EnhancedBigtableStub (
129131 EnhancedBigtableStubSettings settings ,
130132 ClientContext clientContext ,
131133 Tagger tagger ,
132134 StatsRecorder statsRecorder ) {
133135 this .settings = settings ;
134- this .clientContext = clientContext ;
135- this .tagger = tagger ;
136- this .statsRecorder = statsRecorder ;
136+
137+ this .clientContext =
138+ clientContext
139+ .toBuilder ()
140+ .setTracerFactory (
141+ new CompositeTracerFactory (
142+ ImmutableList .of (
143+ // Add OpenCensus Tracing
144+ new OpencensusTracerFactory (
145+ ImmutableMap .<String , String >builder ()
146+ // Annotate traces with the same tags as metrics
147+ .put (
148+ RpcMeasureConstants .BIGTABLE_PROJECT_ID .getName (),
149+ settings .getProjectId ())
150+ .put (
151+ RpcMeasureConstants .BIGTABLE_INSTANCE_ID .getName (),
152+ settings .getInstanceId ())
153+ .put (
154+ RpcMeasureConstants .BIGTABLE_APP_PROFILE_ID .getName (),
155+ settings .getAppProfileId ())
156+ // Also annotate traces with library versions
157+ .put ("gax" , GaxGrpcProperties .getGaxGrpcVersion ())
158+ .put ("grpc" , GaxGrpcProperties .getGrpcVersion ())
159+ .put (
160+ "gapic" ,
161+ GaxProperties .getLibraryVersion (
162+ EnhancedBigtableStubSettings .class ))
163+ .build ()),
164+ // Add OpenCensus Metrics
165+ MetricsTracerFactory .create (
166+ tagger ,
167+ statsRecorder ,
168+ ImmutableMap .<TagKey , TagValue >builder ()
169+ .put (
170+ RpcMeasureConstants .BIGTABLE_PROJECT_ID ,
171+ TagValue .create (settings .getProjectId ()))
172+ .put (
173+ RpcMeasureConstants .BIGTABLE_INSTANCE_ID ,
174+ TagValue .create (settings .getInstanceId ()))
175+ .put (
176+ RpcMeasureConstants .BIGTABLE_APP_PROFILE_ID ,
177+ TagValue .create (settings .getAppProfileId ()))
178+ .build ()),
179+ // Add user configured tracer
180+ clientContext .getTracerFactory ())))
181+ .build ();
137182 this .requestContext =
138183 RequestContext .create (
139184 settings .getProjectId (), settings .getInstanceId (), settings .getAppProfileId ());
@@ -196,17 +241,9 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
196241 new TracedServerStreamingCallable <>(
197242 readRowsUserCallable ,
198243 clientContext .getTracerFactory (),
199- SpanName .of (TRACING_OUTER_CLIENT_NAME , "ReadRows" ));
200-
201- ServerStreamingCallable <Query , RowT > measured =
202- new MeasuredReadRowsCallable <>(
203- traced ,
204- TRACING_OUTER_CLIENT_NAME + ".ReadRows" ,
205- tagger ,
206- statsRecorder ,
207- clientContext .getClock ());
244+ SpanName .of (CLIENT_NAME , "ReadRows" ));
208245
209- return measured .withDefaultCallContext (clientContext .getDefaultCallContext ());
246+ return traced .withDefaultCallContext (clientContext .getDefaultCallContext ());
210247 }
211248
212249 /**
@@ -393,19 +430,9 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
393430
394431 UnaryCallable <BulkMutation , Void > traced =
395432 new TracedUnaryCallable <>(
396- userFacing ,
397- clientContext .getTracerFactory (),
398- SpanName .of (TRACING_OUTER_CLIENT_NAME , "MutateRows" ));
399-
400- UnaryCallable <BulkMutation , Void > measured =
401- new MeasuredMutateRowsCallable (
402- traced ,
403- TRACING_OUTER_CLIENT_NAME + ".MutateRows" ,
404- tagger ,
405- statsRecorder ,
406- clientContext .getClock ());
433+ userFacing , clientContext .getTracerFactory (), SpanName .of (CLIENT_NAME , "MutateRows" ));
407434
408- return measured .withDefaultCallContext (clientContext .getDefaultCallContext ());
435+ return traced .withDefaultCallContext (clientContext .getDefaultCallContext ());
409436 }
410437
411438 /**
@@ -578,19 +605,9 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
578605
579606 UnaryCallable <RequestT , ResponseT > traced =
580607 new TracedUnaryCallable <>(
581- inner ,
582- clientContext .getTracerFactory (),
583- SpanName .of (TRACING_OUTER_CLIENT_NAME , methodName ));
584-
585- UnaryCallable <RequestT , ResponseT > measured =
586- new MeasuredUnaryCallable <>(
587- traced ,
588- TRACING_OUTER_CLIENT_NAME + "." + methodName ,
589- tagger ,
590- statsRecorder ,
591- clientContext .getClock ());
608+ inner , clientContext .getTracerFactory (), SpanName .of (CLIENT_NAME , methodName ));
592609
593- return measured .withDefaultCallContext (clientContext .getDefaultCallContext ());
610+ return traced .withDefaultCallContext (clientContext .getDefaultCallContext ());
594611 }
595612 // </editor-fold>
596613
0 commit comments