Skip to content

Commit 354b028

Browse files
authored
Add gauge metric API and Otel implementation
This is needed by gRFC A78 for xds metrics, and for RLS metrics. Since gauges need to acquire a lock (or other synchronization) in the callback, the callback allows batching multiple gauges together to avoid acquiring-and-requiring such locks. Unlike other metrics, gauges are reported on-demand to the MetricSink. This means not all sinks will receive the same data, as the sinks will ask for the gauges at different times.
1 parent 8516cfe commit 354b028

File tree

8 files changed

+365
-2
lines changed

8 files changed

+365
-2
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2024 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc;
18+
19+
/**
20+
* Tagging interface for MetricInstruments that can be used with batch callbacks.
21+
*/
22+
@Internal
23+
public interface CallbackMetricInstrument extends MetricInstrument {}

api/src/main/java/io/grpc/LongGaugeMetricInstrument.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
* Represents a long-valued gauge metric instrument.
2323
*/
2424
@Internal
25-
public final class LongGaugeMetricInstrument extends PartialMetricInstrument {
25+
public final class LongGaugeMetricInstrument extends PartialMetricInstrument
26+
implements CallbackMetricInstrument {
2627
public LongGaugeMetricInstrument(int index, String name, String description, String unit,
2728
List<String> requiredLabelKeys, List<String> optionalLabelKeys, boolean enableByDefault) {
2829
super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault);

api/src/main/java/io/grpc/MetricRecorder.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,43 @@ default void recordDoubleHistogram(DoubleHistogramMetricInstrument metricInstrum
6767
*/
6868
default void recordLongHistogram(LongHistogramMetricInstrument metricInstrument, long value,
6969
List<String> requiredLabelValues, List<String> optionalLabelValues) {}
70+
71+
/**
72+
* Registers a callback to produce metric values for only the listed instruments. The returned
73+
* registration must be closed when no longer needed, which will remove the callback.
74+
*
75+
* @param callback The callback to call to record.
76+
* @param metricInstruments The metric instruments the callback will record against.
77+
*/
78+
default Registration registerBatchCallback(BatchCallback callback,
79+
CallbackMetricInstrument... metricInstruments) {
80+
return () -> { };
81+
}
82+
83+
/** Callback to record gauge values. */
84+
interface BatchCallback {
85+
/** Records instrument values into {@code recorder}. */
86+
void accept(BatchRecorder recorder);
87+
}
88+
89+
/** Recorder for instrument values produced by a batch callback. */
90+
interface BatchRecorder {
91+
/**
92+
* Record a long gauge value.
93+
*
94+
* @param value The value to record.
95+
* @param requiredLabelValues A list of required label values for the metric.
96+
* @param optionalLabelValues A list of additional, optional label values for the metric.
97+
*/
98+
void recordLongGauge(LongGaugeMetricInstrument metricInstrument, long value,
99+
List<String> requiredLabelValues, List<String> optionalLabelValues);
100+
}
101+
102+
/** A handle to a registration, that allows unregistration. */
103+
interface Registration extends AutoCloseable {
104+
// Redefined to not throw an exception.
105+
/** Unregister. */
106+
@Override
107+
void close();
108+
}
70109
}

api/src/main/java/io/grpc/MetricSink.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,5 +99,30 @@ default void recordLongHistogram(LongHistogramMetricInstrument metricInstrument,
9999
List<String> requiredLabelValues, List<String> optionalLabelValues) {
100100
}
101101

102+
/**
103+
* Record a long gauge value.
104+
*
105+
* @param value The value to record.
106+
* @param requiredLabelValues A list of required label values for the metric.
107+
* @param optionalLabelValues A list of additional, optional label values for the metric.
108+
*/
109+
default void recordLongGauge(LongGaugeMetricInstrument metricInstrument, long value,
110+
List<String> requiredLabelValues, List<String> optionalLabelValues){
111+
}
112+
113+
/**
114+
* Registers a callback to produce metric values for only the listed instruments. The returned
115+
* registration must be closed when no longer needed, which will remove the callback.
116+
*
117+
* @param callback The callback to call to record.
118+
* @param metricInstruments The metric instruments the callback will record against.
119+
*/
120+
default Registration registerBatchCallback(Runnable callback,
121+
CallbackMetricInstrument... metricInstruments) {
122+
return () -> { };
123+
}
124+
125+
interface Registration extends MetricRecorder.Registration {}
126+
102127
void updateMeasures(List<MetricInstrument> instruments);
103128
}

core/src/main/java/io/grpc/internal/MetricRecorderImpl.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,21 @@
1717
package io.grpc.internal;
1818

1919
import static com.google.common.base.Preconditions.checkArgument;
20+
import static com.google.common.base.Preconditions.checkNotNull;
2021

2122
import com.google.common.annotations.VisibleForTesting;
23+
import io.grpc.CallbackMetricInstrument;
2224
import io.grpc.DoubleCounterMetricInstrument;
2325
import io.grpc.DoubleHistogramMetricInstrument;
2426
import io.grpc.LongCounterMetricInstrument;
27+
import io.grpc.LongGaugeMetricInstrument;
2528
import io.grpc.LongHistogramMetricInstrument;
2629
import io.grpc.MetricInstrument;
2730
import io.grpc.MetricInstrumentRegistry;
2831
import io.grpc.MetricRecorder;
2932
import io.grpc.MetricSink;
33+
import java.util.ArrayList;
34+
import java.util.BitSet;
3035
import java.util.List;
3136

3237
/**
@@ -171,4 +176,62 @@ public void recordLongHistogram(LongHistogramMetricInstrument metricInstrument,
171176
sink.recordLongHistogram(metricInstrument, value, requiredLabelValues, optionalLabelValues);
172177
}
173178
}
179+
180+
@Override
181+
public Registration registerBatchCallback(BatchCallback callback,
182+
CallbackMetricInstrument... metricInstruments) {
183+
long largestMetricInstrumentIndex = -1;
184+
BitSet allowedInstruments = new BitSet();
185+
for (CallbackMetricInstrument metricInstrument : metricInstruments) {
186+
largestMetricInstrumentIndex =
187+
Math.max(largestMetricInstrumentIndex, metricInstrument.getIndex());
188+
allowedInstruments.set(metricInstrument.getIndex());
189+
}
190+
List<MetricSink.Registration> registrations = new ArrayList<>();
191+
for (MetricSink sink : metricSinks) {
192+
int measuresSize = sink.getMeasuresSize();
193+
if (measuresSize <= largestMetricInstrumentIndex) {
194+
// Measures may need updating in two cases:
195+
// 1. When the sink is initially created with an empty list of measures.
196+
// 2. When new metric instruments are registered, requiring the sink to accommodate them.
197+
sink.updateMeasures(registry.getMetricInstruments());
198+
}
199+
BatchRecorder singleSinkRecorder = new BatchRecorderImpl(sink, allowedInstruments);
200+
registrations.add(sink.registerBatchCallback(
201+
() -> callback.accept(singleSinkRecorder), metricInstruments));
202+
}
203+
return () -> {
204+
for (MetricSink.Registration registration : registrations) {
205+
registration.close();
206+
}
207+
};
208+
}
209+
210+
/** Recorder for instrument values produced by a batch callback. */
211+
static class BatchRecorderImpl implements BatchRecorder {
212+
private final MetricSink sink;
213+
private final BitSet allowedInstruments;
214+
215+
BatchRecorderImpl(MetricSink sink, BitSet allowedInstruments) {
216+
this.sink = checkNotNull(sink, "sink");
217+
this.allowedInstruments = checkNotNull(allowedInstruments, "allowedInstruments");
218+
}
219+
220+
@Override
221+
public void recordLongGauge(LongGaugeMetricInstrument metricInstrument, long value,
222+
List<String> requiredLabelValues, List<String> optionalLabelValues) {
223+
checkArgument(allowedInstruments.get(metricInstrument.getIndex()),
224+
"Instrument was not listed when registering callback: %s", metricInstrument);
225+
checkArgument(requiredLabelValues != null
226+
&& requiredLabelValues.size() == metricInstrument.getRequiredLabelKeys().size(),
227+
"Incorrect number of required labels provided. Expected: %s",
228+
metricInstrument.getRequiredLabelKeys().size());
229+
checkArgument(optionalLabelValues != null
230+
&& optionalLabelValues.size() == metricInstrument.getOptionalLabelKeys().size(),
231+
"Incorrect number of optional labels provided. Expected: %s",
232+
metricInstrument.getOptionalLabelKeys().size());
233+
// Registering the callback checked that the instruments were be present in sink.
234+
sink.recordLongGauge(metricInstrument, value, requiredLabelValues, optionalLabelValues);
235+
}
236+
}
174237
}

core/src/test/java/io/grpc/internal/MetricRecorderImplTest.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.grpc.internal;
1818

19+
import static org.junit.Assert.assertThrows;
20+
import static org.mockito.ArgumentMatchers.any;
1921
import static org.mockito.ArgumentMatchers.anyList;
2022
import static org.mockito.ArgumentMatchers.eq;
2123
import static org.mockito.Mockito.mock;
@@ -28,6 +30,7 @@
2830
import io.grpc.DoubleCounterMetricInstrument;
2931
import io.grpc.DoubleHistogramMetricInstrument;
3032
import io.grpc.LongCounterMetricInstrument;
33+
import io.grpc.LongGaugeMetricInstrument;
3134
import io.grpc.LongHistogramMetricInstrument;
3235
import io.grpc.MetricInstrumentRegistry;
3336
import io.grpc.MetricInstrumentRegistryAccessor;
@@ -40,6 +43,7 @@
4043
import org.junit.Test;
4144
import org.junit.runner.RunWith;
4245
import org.junit.runners.JUnit4;
46+
import org.mockito.ArgumentCaptor;
4347

4448
/**
4549
* Unit test for {@link MetricRecorderImpl}.
@@ -72,6 +76,9 @@ public class MetricRecorderImplTest {
7276
private final LongHistogramMetricInstrument longHistogramInstrument =
7377
registry.registerLongHistogram("histogram2", DESCRIPTION, UNIT,
7478
Collections.emptyList(), REQUIRED_LABEL_KEYS, OPTIONAL_LABEL_KEYS, ENABLED);
79+
private final LongGaugeMetricInstrument longGaugeInstrument =
80+
registry.registerLongGauge("gauge0", DESCRIPTION, UNIT, REQUIRED_LABEL_KEYS,
81+
OPTIONAL_LABEL_KEYS, ENABLED);
7582
private MetricRecorder recorder;
7683

7784
@Before
@@ -113,6 +120,34 @@ public void recordHistogram() {
113120
verify(mockSink, never()).updateMeasures(registry.getMetricInstruments());
114121
}
115122

123+
@Test
124+
public void recordCallback() {
125+
MetricSink.Registration mockRegistration = mock(MetricSink.Registration.class);
126+
when(mockSink.getMeasuresSize()).thenReturn(5);
127+
when(mockSink.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)))
128+
.thenReturn(mockRegistration);
129+
130+
MetricRecorder.Registration registration = recorder.registerBatchCallback((recorder) -> {
131+
recorder.recordLongGauge(
132+
longGaugeInstrument, 99, REQUIRED_LABEL_VALUES, OPTIONAL_LABEL_VALUES);
133+
}, longGaugeInstrument);
134+
135+
ArgumentCaptor<Runnable> callbackCaptor = ArgumentCaptor.forClass(Runnable.class);
136+
verify(mockSink, times(2))
137+
.registerBatchCallback(callbackCaptor.capture(), eq(longGaugeInstrument));
138+
139+
callbackCaptor.getValue().run();
140+
// Only once, for the one sink that called the callback.
141+
verify(mockSink).recordLongGauge(
142+
longGaugeInstrument, 99, REQUIRED_LABEL_VALUES, OPTIONAL_LABEL_VALUES);
143+
144+
verify(mockRegistration, never()).close();
145+
registration.close();
146+
verify(mockRegistration, times(2)).close();
147+
148+
verify(mockSink, never()).updateMeasures(registry.getMetricInstruments());
149+
}
150+
116151
@Test
117152
public void newRegisteredMetricUpdateMeasures() {
118153
// Sink is initialized with zero measures, should trigger updateMeasures() on sinks
@@ -145,6 +180,16 @@ public void newRegisteredMetricUpdateMeasures() {
145180
verify(mockSink, times(8)).updateMeasures(registry.getMetricInstruments());
146181
verify(mockSink, times(2)).recordLongHistogram(eq(longHistogramInstrument), eq(99L),
147182
eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES));
183+
184+
// Callback
185+
when(mockSink.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)))
186+
.thenReturn(mock(MetricSink.Registration.class));
187+
MetricRecorder.Registration registration = recorder.registerBatchCallback(
188+
(recorder) -> { }, longGaugeInstrument);
189+
verify(mockSink, times(10)).updateMeasures(registry.getMetricInstruments());
190+
verify(mockSink, times(2))
191+
.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument));
192+
registration.close();
148193
}
149194

150195
@Test(expected = IllegalArgumentException.class)
@@ -179,6 +224,26 @@ public void recordLongHistogramMismatchedRequiredLabelValues() {
179224
OPTIONAL_LABEL_VALUES);
180225
}
181226

227+
@Test
228+
public void recordLongGaugeMismatchedRequiredLabelValues() {
229+
when(mockSink.getMeasuresSize()).thenReturn(4);
230+
when(mockSink.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)))
231+
.thenReturn(mock(MetricSink.Registration.class));
232+
233+
MetricRecorder.Registration registration = recorder.registerBatchCallback((recorder) -> {
234+
assertThrows(
235+
IllegalArgumentException.class,
236+
() -> recorder.recordLongGauge(
237+
longGaugeInstrument, 99, ImmutableList.of(), OPTIONAL_LABEL_VALUES));
238+
}, longGaugeInstrument);
239+
240+
ArgumentCaptor<Runnable> callbackCaptor = ArgumentCaptor.forClass(Runnable.class);
241+
verify(mockSink, times(2))
242+
.registerBatchCallback(callbackCaptor.capture(), eq(longGaugeInstrument));
243+
callbackCaptor.getValue().run();
244+
registration.close();
245+
}
246+
182247
@Test(expected = IllegalArgumentException.class)
183248
public void addDoubleCounterMismatchedOptionalLabelValues() {
184249
when(mockSink.getMeasuresSize()).thenReturn(4);
@@ -210,4 +275,24 @@ public void recordLongHistogramMismatchedOptionalLabelValues() {
210275
recorder.recordLongHistogram(longHistogramInstrument, 99, REQUIRED_LABEL_VALUES,
211276
ImmutableList.of());
212277
}
278+
279+
@Test
280+
public void recordLongGaugeMismatchedOptionalLabelValues() {
281+
when(mockSink.getMeasuresSize()).thenReturn(4);
282+
when(mockSink.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)))
283+
.thenReturn(mock(MetricSink.Registration.class));
284+
285+
MetricRecorder.Registration registration = recorder.registerBatchCallback((recorder) -> {
286+
assertThrows(
287+
IllegalArgumentException.class,
288+
() -> recorder.recordLongGauge(
289+
longGaugeInstrument, 99, REQUIRED_LABEL_VALUES, ImmutableList.of()));
290+
}, longGaugeInstrument);
291+
292+
ArgumentCaptor<Runnable> callbackCaptor = ArgumentCaptor.forClass(Runnable.class);
293+
verify(mockSink, times(2))
294+
.registerBatchCallback(callbackCaptor.capture(), eq(longGaugeInstrument));
295+
callbackCaptor.getValue().run();
296+
registration.close();
297+
}
213298
}

0 commit comments

Comments
 (0)