Skip to content

Commit ebad7e9

Browse files
authored
chore(Datastore): Refactor Datastore Metric classes to inherit from Gax (#12361)
# Changes 1. Rename MetricsRecorder to DatastoreMetricsRecorder to clarify the package specific implementation (not to confuse with Spanner's or Gax's variant) 2. DatastoreMetricsRecorder inherits from Gax's MetricsRecorder to keep the existing attempt and operation data 3. Remove gRPC's ApiTracerFactory so that Datastore can natively record gRPC and HttpJson metrics together. Internally, the some metric names have some variantions (e.g. latencies vs latency) and would require a view to update the discrepancies (future PR). Additionally, any new metrics recorded in Gax would only apply to gRPC and would still require a native HttpJson implementation in the client library. 4. The only attributes recorded for a metric are service, status, and method name. This aligns with the internal labels that are set on the metrics.
1 parent a9198ee commit ebad7e9

14 files changed

+346
-473
lines changed

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java

Lines changed: 14 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,13 @@
4545
import com.google.cloud.RetryHelper;
4646
import com.google.cloud.RetryHelper.RetryHelperException;
4747
import com.google.cloud.ServiceOptions;
48-
import com.google.cloud.TransportOptions;
4948
import com.google.cloud.datastore.execution.AggregationQueryExecutor;
5049
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
51-
import com.google.cloud.datastore.telemetry.MetricsRecorder;
50+
import com.google.cloud.datastore.telemetry.DatastoreMetricsRecorder;
5251
import com.google.cloud.datastore.telemetry.TelemetryConstants;
5352
import com.google.cloud.datastore.telemetry.TelemetryUtils;
5453
import com.google.cloud.datastore.telemetry.TraceUtil;
5554
import com.google.cloud.datastore.telemetry.TraceUtil.Scope;
56-
import com.google.cloud.http.HttpTransportOptions;
5755
import com.google.common.base.MoreObjects;
5856
import com.google.common.base.Preconditions;
5957
import com.google.common.base.Stopwatch;
@@ -75,7 +73,6 @@
7573
import java.util.ArrayList;
7674
import java.util.Arrays;
7775
import java.util.Collections;
78-
import java.util.HashMap;
7976
import java.util.Iterator;
8077
import java.util.LinkedHashMap;
8178
import java.util.LinkedHashSet;
@@ -101,16 +98,13 @@ final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datas
10198

10299
private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil =
103100
getOptions().getTraceUtil();
104-
private final MetricsRecorder metricsRecorder = getOptions().getMetricsRecorder();
105-
private final boolean isHttpTransport;
106-
101+
private final DatastoreMetricsRecorder metricsRecorder = getOptions().getMetricsRecorder();
107102
private final ReadOptionProtoPreparer readOptionProtoPreparer;
108103
private final AggregationQueryExecutor aggregationQueryExecutor;
109104

110105
DatastoreImpl(DatastoreOptions options) {
111106
super(options);
112107
this.datastoreRpc = options.getDatastoreRpcV1();
113-
this.isHttpTransport = options.getTransportOptions() instanceof HttpTransportOptions;
114108
retrySettings =
115109
MoreObjects.firstNonNull(options.getRetrySettings(), ServiceOptions.getNoRetrySettings());
116110

@@ -185,15 +179,15 @@ public boolean isClosed() {
185179
static class ReadWriteTransactionCallable<T> implements Callable<T> {
186180
private final Datastore datastore;
187181
private final TransactionCallable<T> callable;
188-
private final MetricsRecorder metricsRecorder;
182+
private final DatastoreMetricsRecorder metricsRecorder;
189183
private volatile TransactionOptions options;
190184
private volatile Transaction transaction;
191185

192186
ReadWriteTransactionCallable(
193187
Datastore datastore,
194188
TransactionCallable<T> callable,
195189
TransactionOptions options,
196-
MetricsRecorder metricsRecorder) {
190+
DatastoreMetricsRecorder metricsRecorder) {
197191
this.datastore = datastore;
198192
this.callable = callable;
199193
this.options = options;
@@ -227,7 +221,7 @@ public T call() throws DatastoreException {
227221
}
228222
throw DatastoreException.propagateUserException(ex);
229223
} finally {
230-
recordAttempt(attemptStatus, datastore.getOptions().getTransportOptions());
224+
recordAttempt(attemptStatus);
231225
// If the transaction is active, then commit the rollback. If it was already successfully
232226
// rolled back, the transaction is inactive (prevents rolling back an already rolled back
233227
// transaction).
@@ -245,18 +239,10 @@ public T call() throws DatastoreException {
245239
* Records a single transaction commit attempt with the given status code. This is called once
246240
* per invocation of {@link #call()}, capturing the outcome of each individual commit attempt.
247241
*/
248-
private void recordAttempt(String status, TransportOptions transportOptions) {
249-
Map<String, String> attributes = new HashMap<>();
250-
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status);
251-
attributes.put(
252-
TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT);
253-
attributes.put(
254-
TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastore.getOptions().getProjectId());
255-
attributes.put(
256-
TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastore.getOptions().getDatabaseId());
257-
attributes.put(
258-
TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT,
259-
TelemetryConstants.getTransportName(transportOptions));
242+
private void recordAttempt(String status) {
243+
Map<String, String> attributes =
244+
TelemetryUtils.buildMetricAttributes(
245+
TelemetryConstants.METHOD_TRANSACTION_COMMIT, status);
260246
metricsRecorder.recordTransactionAttemptCount(1, attributes);
261247
}
262248
}
@@ -293,15 +279,8 @@ public <T> T runInTransaction(
293279
throw DatastoreException.translateAndThrow(e);
294280
} finally {
295281
long latencyMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
296-
Map<String, String> attributes = new HashMap<>();
297-
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status);
298-
attributes.put(
299-
TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_RUN);
300-
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, getOptions().getProjectId());
301-
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, getOptions().getDatabaseId());
302-
attributes.put(
303-
TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT,
304-
TelemetryConstants.getTransportName(getOptions().getTransportOptions()));
282+
Map<String, String> attributes =
283+
TelemetryUtils.buildMetricAttributes(TelemetryConstants.METHOD_TRANSACTION_RUN, status);
305284
metricsRecorder.recordTransactionLatency(latencyMs, attributes);
306285
span.end();
307286
}
@@ -805,15 +784,12 @@ private <T> T runWithObservability(
805784
ResultRetryAlgorithm<?> exceptionHandler) {
806785
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName);
807786

808-
// Gax already records operation and attempt metrics. Since Datastore HttpJson does not
809-
// integrate with Gax, manually instrument these metrics when using HttpJson for parity
810-
Stopwatch operationStopwatch = isHttpTransport ? Stopwatch.createStarted() : null;
787+
Stopwatch operationStopwatch = Stopwatch.createStarted();
811788
String operationStatus = StatusCode.Code.OK.toString();
812789

813790
DatastoreOptions options = getOptions();
814791
Callable<T> attemptCallable =
815-
TelemetryUtils.attemptMetricsCallable(
816-
callable, metricsRecorder, options, isHttpTransport, methodName);
792+
TelemetryUtils.attemptMetricsCallable(callable, metricsRecorder, methodName);
817793
try (TraceUtil.Scope ignored = span.makeCurrent()) {
818794
return RetryHelper.runWithRetries(
819795
attemptCallable, retrySettings, exceptionHandler, options.getClock());
@@ -823,12 +799,7 @@ private <T> T runWithObservability(
823799
throw DatastoreException.translateAndThrow(e);
824800
} finally {
825801
TelemetryUtils.recordOperationMetrics(
826-
metricsRecorder,
827-
options,
828-
isHttpTransport,
829-
operationStopwatch,
830-
methodName,
831-
operationStatus);
802+
metricsRecorder, operationStopwatch, methodName, operationStatus);
832803
span.end();
833804
}
834805
}

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
3232
import com.google.cloud.datastore.spi.v1.GrpcDatastoreRpc;
3333
import com.google.cloud.datastore.spi.v1.HttpDatastoreRpc;
34-
import com.google.cloud.datastore.telemetry.MetricsRecorder;
34+
import com.google.cloud.datastore.telemetry.DatastoreMetricsRecorder;
3535
import com.google.cloud.datastore.v1.DatastoreSettings;
3636
import com.google.cloud.grpc.GrpcTransportOptions;
3737
import com.google.cloud.http.HttpTransportOptions;
@@ -65,7 +65,7 @@ public class DatastoreOptions extends ServiceOptions<Datastore, DatastoreOptions
6565

6666
private final transient @Nonnull DatastoreOpenTelemetryOptions openTelemetryOptions;
6767
private final transient @Nonnull com.google.cloud.datastore.telemetry.TraceUtil traceUtil;
68-
private final transient @Nonnull MetricsRecorder metricsRecorder;
68+
private final transient @Nonnull DatastoreMetricsRecorder metricsRecorder;
6969

7070
public static class DefaultDatastoreFactory implements DatastoreFactory {
7171

@@ -107,7 +107,7 @@ public DatastoreOpenTelemetryOptions getOpenTelemetryOptions() {
107107
}
108108

109109
@Nonnull
110-
MetricsRecorder getMetricsRecorder() {
110+
DatastoreMetricsRecorder getMetricsRecorder() {
111111
return metricsRecorder;
112112
}
113113

@@ -193,7 +193,7 @@ public Builder setDatabaseId(String databaseId) {
193193
}
194194

195195
/**
196-
* Sets the {@link DatastoreOpenTelemetryOptions} to be used for this Firestore instance.
196+
* Sets the {@link DatastoreOpenTelemetryOptions} to be used for this Datastore instance.
197197
*
198198
* @param openTelemetryOptions The `DatastoreOpenTelemetryOptions` to use.
199199
*/
@@ -223,7 +223,7 @@ private DatastoreOptions(Builder builder) {
223223
? builder.openTelemetryOptions
224224
: DatastoreOpenTelemetryOptions.newBuilder().build();
225225
this.traceUtil = com.google.cloud.datastore.telemetry.TraceUtil.getInstance(this);
226-
this.metricsRecorder = MetricsRecorder.getInstance(openTelemetryOptions);
226+
this.metricsRecorder = DatastoreMetricsRecorder.getInstance(this);
227227

228228
namespace = MoreObjects.firstNonNull(builder.namespace, defaultNamespace());
229229
databaseId = MoreObjects.firstNonNull(builder.databaseId, DEFAULT_DATABASE_ID);

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,11 @@
2424
import com.google.cloud.RetryHelper;
2525
import com.google.cloud.RetryHelper.RetryHelperException;
2626
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
27-
import com.google.cloud.datastore.telemetry.MetricsRecorder;
28-
import com.google.cloud.datastore.telemetry.NoOpMetricsRecorder;
27+
import com.google.cloud.datastore.telemetry.DatastoreMetricsRecorder;
28+
import com.google.cloud.datastore.telemetry.NoOpDatastoreMetricsRecorder;
2929
import com.google.cloud.datastore.telemetry.TelemetryConstants;
3030
import com.google.cloud.datastore.telemetry.TelemetryUtils;
3131
import com.google.cloud.datastore.telemetry.TraceUtil;
32-
import com.google.cloud.http.HttpTransportOptions;
3332
import com.google.common.base.Preconditions;
3433
import com.google.common.base.Stopwatch;
3534
import com.google.datastore.v1.AllocateIdsRequest;
@@ -62,8 +61,7 @@ public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc {
6261
private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil;
6362
private final RetrySettings retrySettings;
6463
private final DatastoreOptions datastoreOptions;
65-
private final MetricsRecorder metricsRecorder;
66-
private final boolean isHttpTransport;
64+
private final DatastoreMetricsRecorder metricsRecorder;
6765

6866
@ObsoleteApi("Prefer to create RetryAndTraceDatastoreRpcDecorator via the Builder")
6967
public RetryAndTraceDatastoreRpcDecorator(
@@ -75,8 +73,7 @@ public RetryAndTraceDatastoreRpcDecorator(
7573
this.retrySettings = retrySettings;
7674
this.datastoreOptions = datastoreOptions;
7775
this.otelTraceUtil = otelTraceUtil;
78-
this.metricsRecorder = new NoOpMetricsRecorder();
79-
this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions;
76+
this.metricsRecorder = new NoOpDatastoreMetricsRecorder();
8077
}
8178

8279
private RetryAndTraceDatastoreRpcDecorator(Builder builder) {
@@ -85,7 +82,6 @@ private RetryAndTraceDatastoreRpcDecorator(Builder builder) {
8582
this.retrySettings = builder.retrySettings;
8683
this.datastoreOptions = builder.datastoreOptions;
8784
this.metricsRecorder = builder.metricsRecorder;
88-
this.isHttpTransport = builder.isHttpTransport;
8985
}
9086

9187
public static Builder newBuilder() {
@@ -99,8 +95,7 @@ public static class Builder {
9995
private DatastoreOptions datastoreOptions;
10096

10197
// Defaults configured for this class
102-
private MetricsRecorder metricsRecorder = new NoOpMetricsRecorder();
103-
private boolean isHttpTransport = false;
98+
private DatastoreMetricsRecorder metricsRecorder = new NoOpDatastoreMetricsRecorder();
10499

105100
private Builder() {}
106101

@@ -124,7 +119,7 @@ public Builder setDatastoreOptions(DatastoreOptions datastoreOptions) {
124119
return this;
125120
}
126121

127-
public Builder setMetricsRecorder(MetricsRecorder metricsRecorder) {
122+
public Builder setMetricsRecorder(DatastoreMetricsRecorder metricsRecorder) {
128123
Preconditions.checkNotNull(metricsRecorder, "metricsRecorder can not be null");
129124
this.metricsRecorder = metricsRecorder;
130125
return this;
@@ -135,7 +130,6 @@ public RetryAndTraceDatastoreRpcDecorator build() {
135130
Preconditions.checkNotNull(otelTraceUtil, "otelTraceUtil is required");
136131
Preconditions.checkNotNull(retrySettings, "retrySettings is required");
137132
Preconditions.checkNotNull(datastoreOptions, "datastoreOptions is required");
138-
this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions;
139133
return new RetryAndTraceDatastoreRpcDecorator(this);
140134
}
141135
}
@@ -207,12 +201,11 @@ public <O> O invokeRpc(Callable<O> block, String startSpan) {
207201

208202
<O> O invokeRpc(Callable<O> block, String startSpan, String methodName) {
209203
TraceUtil.Span span = otelTraceUtil.startSpan(startSpan);
210-
Stopwatch stopwatch = isHttpTransport ? Stopwatch.createStarted() : null;
204+
Stopwatch stopwatch = Stopwatch.createStarted();
211205
String operationStatus = StatusCode.Code.UNKNOWN.toString();
212206
try (TraceUtil.Scope ignored = span.makeCurrent()) {
213207
Callable<O> callable =
214-
TelemetryUtils.attemptMetricsCallable(
215-
block, metricsRecorder, datastoreOptions, isHttpTransport, methodName);
208+
TelemetryUtils.attemptMetricsCallable(block, metricsRecorder, methodName);
216209
O result =
217210
RetryHelper.runWithRetries(
218211
callable, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock());
@@ -224,12 +217,7 @@ <O> O invokeRpc(Callable<O> block, String startSpan, String methodName) {
224217
throw DatastoreException.translateAndThrow(e);
225218
} finally {
226219
TelemetryUtils.recordOperationMetrics(
227-
metricsRecorder,
228-
datastoreOptions,
229-
isHttpTransport,
230-
stopwatch,
231-
methodName,
232-
operationStatus);
220+
metricsRecorder, stopwatch, methodName, operationStatus);
233221
span.end();
234222
}
235223
}

java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/GrpcDatastoreRpc.java

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,9 @@
3131
import com.google.api.gax.rpc.HeaderProvider;
3232
import com.google.api.gax.rpc.NoHeaderProvider;
3333
import com.google.api.gax.rpc.TransportChannel;
34-
import com.google.api.gax.tracing.MetricsTracerFactory;
35-
import com.google.api.gax.tracing.OpenTelemetryMetricsRecorder;
3634
import com.google.cloud.ServiceOptions;
3735
import com.google.cloud.datastore.DatastoreException;
3836
import com.google.cloud.datastore.DatastoreOptions;
39-
import com.google.cloud.datastore.telemetry.TelemetryConstants;
4037
import com.google.cloud.datastore.v1.DatastoreSettings;
4138
import com.google.cloud.datastore.v1.stub.DatastoreStubSettings;
4239
import com.google.cloud.datastore.v1.stub.GrpcDatastoreStub;
@@ -61,12 +58,8 @@
6158
import io.grpc.CallOptions;
6259
import io.grpc.ManagedChannel;
6360
import io.grpc.ManagedChannelBuilder;
64-
import io.opentelemetry.api.GlobalOpenTelemetry;
65-
import io.opentelemetry.api.OpenTelemetry;
6661
import java.io.IOException;
6762
import java.util.Collections;
68-
import java.util.HashMap;
69-
import java.util.Map;
7063

7164
@InternalApi
7265
public class GrpcDatastoreRpc implements DatastoreRpc {
@@ -95,44 +88,12 @@ public GrpcDatastoreRpc(DatastoreOptions datastoreOptions) throws IOException {
9588
.build())
9689
.build());
9790

98-
// Hook into Gax's Metrics collection framework
99-
MetricsTracerFactory metricsTracerFactory = buildMetricsTracerFactory(datastoreOptions);
100-
if (metricsTracerFactory != null) {
101-
builder.setTracerFactory(metricsTracerFactory);
102-
}
103-
10491
datastoreStub = GrpcDatastoreStub.create(builder.build());
10592
} catch (IOException e) {
10693
throw new IOException(e);
10794
}
10895
}
10996

110-
/**
111-
* Build the MetricsTracerFactory to hook into Gax's Otel Framework. Only hooks into Gax on two
112-
* conditions: 1. OpenTelemetry instance is passed in by the user 2. Metrics are enabled
113-
*
114-
* <p>Sets default attributes to be recorded as part of the metrics.
115-
*/
116-
static MetricsTracerFactory buildMetricsTracerFactory(DatastoreOptions datastoreOptions) {
117-
if (!datastoreOptions.getOpenTelemetryOptions().isMetricsEnabled()) {
118-
return null;
119-
}
120-
OpenTelemetry openTelemetry = datastoreOptions.getOpenTelemetryOptions().getOpenTelemetry();
121-
if (openTelemetry == null) {
122-
openTelemetry = GlobalOpenTelemetry.get();
123-
}
124-
OpenTelemetryMetricsRecorder gaxMetricsRecorder =
125-
new OpenTelemetryMetricsRecorder(openTelemetry, TelemetryConstants.SERVICE_NAME);
126-
Map<String, String> attributes = new HashMap<>();
127-
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastoreOptions.getProjectId());
128-
if (!Strings.isNullOrEmpty(datastoreOptions.getDatabaseId())) {
129-
attributes.put(
130-
TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastoreOptions.getDatabaseId());
131-
}
132-
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT, "grpc");
133-
return new MetricsTracerFactory(gaxMetricsRecorder, attributes);
134-
}
135-
13697
@Override
13798
public void close() throws Exception {
13899
if (!closed) {

0 commit comments

Comments
 (0)