Skip to content

Commit 0eb2234

Browse files
authored
Refactor to store samplers inside probes (#10605)
Refactor to store samplers inside probes Samplers were stored inside ProbeRateLimiter singleton into a concurrent map. and only one sampler per probe. Now to allow different samplers for probe, we are storing them directly into the probe instance. Samplers are created when probes are received from the configuration through initSamplers method from Sampled interface. Co-authored-by: jean-philippe.bempel <jean-philippe.bempel@datadoghq.com>
1 parent 8203a22 commit 0eb2234

14 files changed

Lines changed: 75 additions & 67 deletions

File tree

dd-java-agent/agent-debugger/debugger-bootstrap/src/main/java/datadog/trace/bootstrap/debugger/ProbeRateLimiter.java

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import datadog.trace.api.sampling.Sampler;
66
import java.time.Duration;
77
import java.time.temporal.ChronoUnit;
8-
import java.util.concurrent.ConcurrentHashMap;
9-
import java.util.concurrent.ConcurrentMap;
108
import java.util.function.DoubleFunction;
119
import org.slf4j.Logger;
1210
import org.slf4j.LoggerFactory;
@@ -20,19 +18,15 @@ public class ProbeRateLimiter {
2018
private static final Duration TEN_SECONDS_WINDOW = Duration.of(10, ChronoUnit.SECONDS);
2119
private static final double DEFAULT_GLOBAL_SNAPSHOT_RATE = DEFAULT_SNAPSHOT_RATE * 100;
2220
private static final double DEFAULT_GLOBAL_LOG_RATE = 5000.0;
23-
private static final ConcurrentMap<String, RateLimitInfo> PROBE_SAMPLERS =
24-
new ConcurrentHashMap<>();
25-
private static Sampler GLOBAL_SNAPSHOT_SAMPLER = createSampler(DEFAULT_GLOBAL_SNAPSHOT_RATE);
26-
private static Sampler GLOBAL_LOG_SAMPLER = createSampler(DEFAULT_GLOBAL_LOG_RATE);
27-
private static DoubleFunction<Sampler> samplerSupplier = ProbeRateLimiter::createSampler;
21+
private static Sampler GLOBAL_SNAPSHOT_SAMPLER =
22+
defaultCreateSampler(DEFAULT_GLOBAL_SNAPSHOT_RATE);
23+
private static Sampler GLOBAL_LOG_SAMPLER = defaultCreateSampler(DEFAULT_GLOBAL_LOG_RATE);
24+
private static DoubleFunction<Sampler> samplerSupplier = ProbeRateLimiter::defaultCreateSampler;
2825

29-
public static boolean tryProbe(String probeId) {
30-
RateLimitInfo rateLimitInfo =
31-
PROBE_SAMPLERS.computeIfAbsent(probeId, ProbeRateLimiter::getDefaultRateLimitInfo);
32-
Sampler globalSampler =
33-
rateLimitInfo.isCaptureSnapshot ? GLOBAL_SNAPSHOT_SAMPLER : GLOBAL_LOG_SAMPLER;
26+
public static boolean tryProbe(Sampler sampler, boolean useGlobalLowRate) {
27+
Sampler globalSampler = useGlobalLowRate ? GLOBAL_SNAPSHOT_SAMPLER : GLOBAL_LOG_SAMPLER;
3428
if (globalSampler.sample()) {
35-
return rateLimitInfo.sampler.sample();
29+
return sampler.sample();
3630
}
3731
return false;
3832
}
@@ -42,8 +36,8 @@ private static RateLimitInfo getDefaultRateLimitInfo(String probeId) {
4236
return new RateLimitInfo(samplerSupplier.apply(DEFAULT_SNAPSHOT_RATE), true);
4337
}
4438

45-
public static void setRate(String probeId, double rate, boolean isCaptureSnapshot) {
46-
PROBE_SAMPLERS.put(probeId, new RateLimitInfo(samplerSupplier.apply(rate), isCaptureSnapshot));
39+
public static Sampler createSampler(double rate) {
40+
return samplerSupplier.apply(rate);
4741
}
4842

4943
public static void setGlobalSnapshotRate(double rate) {
@@ -54,25 +48,16 @@ public static void setGlobalLogRate(double rate) {
5448
GLOBAL_LOG_SAMPLER = samplerSupplier.apply(rate);
5549
}
5650

57-
public static void resetRate(String probeId) {
58-
PROBE_SAMPLERS.remove(probeId);
59-
}
60-
6151
public static void resetGlobalRate() {
6252
setGlobalSnapshotRate(DEFAULT_GLOBAL_LOG_RATE);
6353
}
6454

65-
public static void resetAll() {
66-
PROBE_SAMPLERS.clear();
67-
resetGlobalRate();
68-
}
69-
7055
public static void setSamplerSupplier(DoubleFunction<Sampler> samplerSupplier) {
7156
ProbeRateLimiter.samplerSupplier =
72-
samplerSupplier != null ? samplerSupplier : ProbeRateLimiter::createSampler;
57+
samplerSupplier != null ? samplerSupplier : ProbeRateLimiter::defaultCreateSampler;
7358
}
7459

75-
private static Sampler createSampler(double rate) {
60+
private static Sampler defaultCreateSampler(double rate) {
7661
if (rate < 0) {
7762
return new ConstantSampler(true);
7863
}

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/ConfigurationUpdater.java

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import com.datadog.debugger.probe.LogProbe;
1111
import com.datadog.debugger.probe.ProbeDefinition;
1212
import com.datadog.debugger.probe.Sampled;
13-
import com.datadog.debugger.probe.Sampling;
1413
import com.datadog.debugger.sink.DebuggerSink;
1514
import com.datadog.debugger.util.ExceptionHelper;
1615
import datadog.trace.api.Config;
@@ -136,8 +135,8 @@ private void applyNewConfiguration(Configuration newConfiguration) {
136135
new ConfigurationComparer(
137136
originalConfiguration, newConfiguration, instrumentationResults);
138137
if (changes.hasRateLimitRelatedChanged()) {
139-
// apply rate limit config first to avoid racing with execution/instrumentation of log
140-
// probes
138+
// apply rate limit config first to avoid racing with execution/instrumentation
139+
// of probes requiring samplers
141140
applyRateLimiter(changes, newConfiguration.getSampling());
142141
}
143142
currentConfiguration = newConfiguration;
@@ -282,18 +281,7 @@ private static void applyRateLimiter(
282281
for (ProbeDefinition added : changes.getAddedDefinitions()) {
283282
if (added instanceof Sampled) {
284283
Sampled probe = (Sampled) added;
285-
Sampling sampling = probe.getSampling();
286-
double rate = getDefaultRateLimitPerProbe(probe);
287-
if (sampling != null && sampling.getEventsPerSecond() != 0) {
288-
rate = sampling.getEventsPerSecond();
289-
}
290-
ProbeRateLimiter.setRate(probe.getId(), rate, probe.isCaptureSnapshot());
291-
}
292-
}
293-
// remove rate for all removed probes
294-
for (ProbeDefinition removedDefinition : changes.getRemovedDefinitions()) {
295-
if (removedDefinition instanceof LogProbe) {
296-
ProbeRateLimiter.resetRate(removedDefinition.getId());
284+
probe.initSamplers();
297285
}
298286
}
299287
// set global sampling
@@ -302,12 +290,6 @@ private static void applyRateLimiter(
302290
}
303291
}
304292

305-
private static double getDefaultRateLimitPerProbe(Sampled probe) {
306-
return probe.isCaptureSnapshot()
307-
? ProbeRateLimiter.DEFAULT_SNAPSHOT_RATE
308-
: ProbeRateLimiter.DEFAULT_LOG_RATE;
309-
}
310-
311293
private void removeCurrentTransformer() {
312294
if (currentTransformer == null) {
313295
return;

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/DebuggerTransformer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ private void createMethodProbe(MethodInfo methodInfo, List<ProbeDefinition> prob
370370
.where(methodInfo.getClassNode().name, methodInfo.getMethodNode().name)
371371
.captureSnapshot(false)
372372
.build();
373+
probe.initSamplers();
373374
probes.add(probe);
374375
}
375376

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/ExceptionProbe.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public ExceptionProbe(
5050
null);
5151
this.exceptionProbeManager = exceptionProbeManager;
5252
this.chainedExceptionIdx = chainedExceptionIdx;
53+
initSamplers();
5354
}
5455

5556
@Override

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/LogProbe.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import datadog.trace.api.Config;
2626
import datadog.trace.api.CorrelationIdentifier;
2727
import datadog.trace.api.DDTraceId;
28+
import datadog.trace.api.sampling.Sampler;
2829
import datadog.trace.bootstrap.debugger.CapturedContext;
2930
import datadog.trace.bootstrap.debugger.CapturedContextProbe;
3031
import datadog.trace.bootstrap.debugger.DebuggerContext;
@@ -323,6 +324,7 @@ public String toString() {
323324
private transient Consumer<Snapshot> snapshotProcessor;
324325
protected transient Map<DDTraceId, AtomicInteger> budget =
325326
Collections.synchronizedMap(new WeakIdentityHashMap<>());
327+
protected transient Sampler sampler;
326328

327329
// no-arg constructor is required by Moshi to avoid creating instance with unsafe and by-passing
328330
// constructors, including field initializers.
@@ -408,6 +410,7 @@ public LogProbe(LogProbe.Builder builder) {
408410
builder.sampling,
409411
builder.captureExpressions);
410412
this.snapshotProcessor = builder.snapshotProcessor;
413+
initSamplers();
411414
}
412415

413416
public LogProbe copy() {
@@ -450,6 +453,16 @@ public Sampling getSampling() {
450453
return sampling;
451454
}
452455

456+
public void initSamplers() {
457+
double rate =
458+
sampling != null
459+
? sampling.getEventsPerSecond()
460+
: (isCaptureSnapshot()
461+
? ProbeRateLimiter.DEFAULT_SNAPSHOT_RATE
462+
: ProbeRateLimiter.DEFAULT_LOG_RATE);
463+
sampler = ProbeRateLimiter.createSampler(rate);
464+
}
465+
453466
public List<CaptureExpression> getCaptureExpressions() {
454467
return captureExpressions;
455468
}
@@ -487,7 +500,7 @@ public InstrumentationResult.Status instrument(
487500
public boolean isReadyToCapture() {
488501
if (!hasCondition()) {
489502
// we are sampling here to avoid creating CapturedContext when the sampling result is negative
490-
return ProbeRateLimiter.tryProbe(id);
503+
return ProbeRateLimiter.tryProbe(sampler, isCaptureSnapshot());
491504
}
492505
return true;
493506
}
@@ -553,7 +566,8 @@ private void sample(LogStatus logStatus, MethodLocation methodLocation) {
553566
return;
554567
}
555568
boolean sampled =
556-
!logStatus.getDebugSessionStatus().isDisabled() && ProbeRateLimiter.tryProbe(id);
569+
!logStatus.getDebugSessionStatus().isDisabled()
570+
&& ProbeRateLimiter.tryProbe(sampler, isCaptureSnapshot());
557571
logStatus.setSampled(sampled);
558572
if (!sampled) {
559573
DebuggerAgent.getSink()
Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package com.datadog.debugger.probe;
22

33
public interface Sampled {
4-
Sampling getSampling();
5-
6-
String getId();
7-
8-
boolean isCaptureSnapshot();
4+
void initSamplers();
95
}

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/SpanDecorationProbe.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.datadog.debugger.instrumentation.MethodInfo;
1212
import com.datadog.debugger.sink.Snapshot;
1313
import datadog.trace.api.Pair;
14+
import datadog.trace.api.sampling.Sampler;
1415
import datadog.trace.bootstrap.debugger.CapturedContext;
1516
import datadog.trace.bootstrap.debugger.CapturedContextProbe;
1617
import datadog.trace.bootstrap.debugger.EvaluationError;
@@ -30,7 +31,7 @@
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

33-
public class SpanDecorationProbe extends ProbeDefinition implements CapturedContextProbe {
34+
public class SpanDecorationProbe extends ProbeDefinition implements CapturedContextProbe, Sampled {
3435
private static final Logger LOGGER = LoggerFactory.getLogger(SpanDecorationProbe.class);
3536
private static final String PROBEID_DD_TAGS_FORMAT = "_dd.di.%s.probe_id";
3637
private static final String EVALERROR_DD_TAGS_FORMAT = "_dd.di.%s.evaluation_error";
@@ -157,6 +158,7 @@ public int hashCode() {
157158

158159
private final TargetSpan targetSpan;
159160
private final List<Decoration> decorations;
161+
private transient Sampler errorSampler;
160162

161163
// no-arg constructor is required by Moshi to avoid creating instance with unsafe and by-passing
162164
// constructors, including field initializers.
@@ -295,7 +297,7 @@ private void handleEvaluationErrors(SpanDecorationStatus status) {
295297
if (status.getErrors().isEmpty()) {
296298
return;
297299
}
298-
boolean sampled = ProbeRateLimiter.tryProbe(id);
300+
boolean sampled = ProbeRateLimiter.tryProbe(errorSampler, true);
299301
if (!sampled) {
300302
return;
301303
}
@@ -317,6 +319,11 @@ public List<Decoration> getDecorations() {
317319
return decorations;
318320
}
319321

322+
@Override
323+
public void initSamplers() {
324+
errorSampler = ProbeRateLimiter.createSampler(1.0);
325+
}
326+
320327
@Generated
321328
@Override
322329
public int hashCode() {

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/TriggerProbe.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.datadog.debugger.instrumentation.DiagnosticMessage;
1010
import com.datadog.debugger.instrumentation.InstrumentationResult;
1111
import com.datadog.debugger.instrumentation.MethodInfo;
12+
import datadog.trace.api.sampling.Sampler;
1213
import datadog.trace.bootstrap.debugger.CapturedContext;
1314
import datadog.trace.bootstrap.debugger.CapturedContextProbe;
1415
import datadog.trace.bootstrap.debugger.MethodLocation;
@@ -29,6 +30,7 @@ public class TriggerProbe extends ProbeDefinition implements Sampled, CapturedCo
2930
private ProbeCondition probeCondition;
3031
private Sampling sampling;
3132
private String sessionId;
33+
private transient Sampler sampler;
3234

3335
// no-arg constructor is required by Moshi to avoid creating instance with unsafe and by-passing
3436
// constructors, including field initializers.
@@ -68,6 +70,12 @@ public Sampling getSampling() {
6870
return sampling;
6971
}
7072

73+
@Override
74+
public void initSamplers() {
75+
double rate = sampling != null ? sampling.getEventsPerSecond() : 1.0;
76+
sampler = ProbeRateLimiter.createSampler(rate);
77+
}
78+
7179
@Override
7280
public boolean isCaptureSnapshot() {
7381
return false;
@@ -104,7 +112,8 @@ public void evaluate(
104112
if (sampling == null || !sampling.inCoolDown()) {
105113
boolean sample = true;
106114
if (!hasCondition()) {
107-
sample = MethodLocation.isSame(location, evaluateAt) && ProbeRateLimiter.tryProbe(id);
115+
sample =
116+
MethodLocation.isSame(location, evaluateAt) && ProbeRateLimiter.tryProbe(sampler, true);
108117
}
109118
boolean value = evaluateCondition(context);
110119

dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/agent/CapturingTestBase.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.datadog.debugger.instrumentation.InstrumentationResult;
1212
import com.datadog.debugger.probe.LogProbe;
1313
import com.datadog.debugger.probe.ProbeDefinition;
14+
import com.datadog.debugger.probe.Sampled;
1415
import com.datadog.debugger.sink.DebuggerSink;
1516
import com.datadog.debugger.sink.ProbeStatusSink;
1617
import com.datadog.debugger.util.MoshiHelper;
@@ -64,7 +65,7 @@ public void after() {
6465
if (currentTransformer != null) {
6566
instr.removeTransformer(currentTransformer);
6667
}
67-
ProbeRateLimiter.resetAll();
68+
ProbeRateLimiter.resetGlobalRate();
6869
Assertions.assertFalse(DebuggerContext.isInProbe());
6970
Redaction.clearUserDefinedTypes();
7071
}
@@ -354,10 +355,9 @@ protected TestSnapshotListener installProbes(Configuration configuration) {
354355
DebuggerContext.initClassFilter(new DenyListHelper(null));
355356
DebuggerContext.initValueSerializer(new JsonSnapshotSerializer());
356357

357-
for (LogProbe probe : configuration.getLogProbes()) {
358-
if (probe.getSampling() != null) {
359-
ProbeRateLimiter.setRate(
360-
probe.getId(), probe.getSampling().getEventsPerSecond(), probe.isCaptureSnapshot());
358+
for (ProbeDefinition probe : configuration.getDefinitions()) {
359+
if (probe instanceof Sampled) {
360+
((Sampled) probe).initSamplers();
361361
}
362362
}
363363
if (configuration.getSampling() != null) {

dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/agent/LogProbesInstrumentationTest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import com.datadog.debugger.el.DSL;
1414
import com.datadog.debugger.el.ProbeCondition;
1515
import com.datadog.debugger.probe.LogProbe;
16+
import com.datadog.debugger.probe.ProbeDefinition;
17+
import com.datadog.debugger.probe.Sampled;
1618
import com.datadog.debugger.sink.ProbeStatusSink;
1719
import com.datadog.debugger.sink.Snapshot;
1820
import com.datadog.debugger.util.TestSnapshotListener;
@@ -55,7 +57,7 @@ public void after() {
5557
if (currentTransformer != null) {
5658
instr.removeTransformer(currentTransformer);
5759
}
58-
ProbeRateLimiter.resetAll();
60+
ProbeRateLimiter.resetGlobalRate();
5961
}
6062

6163
@Test
@@ -564,6 +566,11 @@ private TestSnapshotListener installProbes(Configuration configuration) {
564566
.thenReturn("http://localhost:8126/debugger/v1/input");
565567
when(config.getFinalDebuggerSymDBUrl()).thenReturn("http://localhost:8126/symdb/v1/input");
566568
when(config.getDynamicInstrumentationUploadBatchSize()).thenReturn(100);
569+
for (ProbeDefinition probe : configuration.getDefinitions()) {
570+
if (probe instanceof Sampled) {
571+
((Sampled) probe).initSamplers();
572+
}
573+
}
567574
ProbeMetadata probeMetadata = new ProbeMetadata();
568575
currentTransformer = new DebuggerTransformer(config, probeMetadata, configuration);
569576
instr.addTransformer(currentTransformer);

0 commit comments

Comments
 (0)