Skip to content

Commit 30a9872

Browse files
committed
Introducing DDAgentWriter.Monitor
Introducing a Monitor for DDAgentWriter Monitor can be used to track critical lifecycle events in the publishing of spans. This change only contains a NoopMonitor, but a StatsD variant will be added in later commit. The DDAgentWriter has been altered to take a Monitor instance at construction for the purposes of testing the Monitor. DDAgentWriter now includes tests to count spans published and sent -- including new tests for multi-threaded writing and slow agent response scenarios.
1 parent 71cb23f commit 30a9872

2 files changed

Lines changed: 453 additions & 7 deletions

File tree

dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java

Lines changed: 161 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,20 @@ public void translateTo(final Event<List<DDSpan>> event, final long sequence) {
7171
private final Phaser apiPhaser;
7272
private volatile boolean running = false;
7373

74+
private final Monitor monitor;
75+
7476
public DDAgentWriter() {
75-
this(new DDApi(DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET));
77+
this(
78+
new DDApi(DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET),
79+
new NoopMonitor());
80+
}
81+
82+
public DDAgentWriter(final DDApi api, final Monitor monitor) {
83+
this(api, monitor, DISRUPTOR_BUFFER_SIZE, FLUSH_PAYLOAD_DELAY);
7684
}
7785

7886
public DDAgentWriter(final DDApi api) {
79-
this(api, DISRUPTOR_BUFFER_SIZE, FLUSH_PAYLOAD_DELAY);
87+
this(api, new NoopMonitor());
8088
}
8189

8290
/**
@@ -87,8 +95,17 @@ public DDAgentWriter(final DDApi api) {
8795
* @param flushFrequencySeconds value < 1 disables scheduled flushes
8896
*/
8997
private DDAgentWriter(final DDApi api, final int disruptorSize, final int flushFrequencySeconds) {
98+
this(api, new NoopMonitor(), disruptorSize, flushFrequencySeconds);
99+
}
100+
101+
private DDAgentWriter(
102+
final DDApi api,
103+
final Monitor monitor,
104+
final int disruptorSize,
105+
final int flushFrequencySeconds) {
90106
this.api = api;
91-
this.flushFrequencySeconds = flushFrequencySeconds;
107+
this.monitor = monitor;
108+
92109
disruptor =
93110
new Disruptor<>(
94111
new DisruptorEventFactory<List<DDSpan>>(),
@@ -97,7 +114,10 @@ private DDAgentWriter(final DDApi api, final int disruptorSize, final int flushF
97114
ProducerType.MULTI,
98115
new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
99116
disruptor.handleEventsWith(new TraceConsumer());
117+
118+
this.flushFrequencySeconds = flushFrequencySeconds;
100119
scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY);
120+
101121
apiPhaser = new Phaser(); // Ensure API calls are completed when flushing
102122
apiPhaser.register(); // Register on behalf of the scheduled executor thread.
103123
}
@@ -107,13 +127,20 @@ public void write(final List<DDSpan> trace) {
107127
// We can't add events after shutdown otherwise it will never complete shutting down.
108128
if (running) {
109129
final boolean published = disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, trace);
110-
if (!published) {
130+
131+
if (published) {
132+
monitor.onPublish(DDAgentWriter.this, trace);
133+
} else {
111134
// We're discarding the trace, but we still want to count it.
112135
traceCount.incrementAndGet();
113136
log.debug("Trace written to overfilled buffer. Counted but dropping trace: {}", trace);
137+
138+
monitor.onFailedPublish(this, trace);
114139
}
115140
} else {
116141
log.debug("Trace written after shutdown. Ignoring trace: {}", trace);
142+
143+
monitor.onFailedPublish(this, trace);
117144
}
118145
}
119146

@@ -131,11 +158,16 @@ public void start() {
131158
disruptor.start();
132159
running = true;
133160
scheduleFlush();
161+
162+
monitor.onStart(this);
134163
}
135164

136165
@Override
137166
public void close() {
138167
running = false;
168+
169+
boolean flushSuccess = true;
170+
139171
// We have to shutdown scheduled executor first to make sure no flush events issued after
140172
// disruptor has been shutdown.
141173
// Otherwise those events will never be processed and flush call will wait forever.
@@ -144,13 +176,17 @@ public void close() {
144176
scheduledWriterExecutor.awaitTermination(flushFrequencySeconds, SECONDS);
145177
} catch (final InterruptedException e) {
146178
log.warn("Waiting for flush executor shutdown interrupted.", e);
179+
180+
flushSuccess = false;
147181
}
148-
flush();
182+
flushSuccess |= flush();
149183
disruptor.shutdown();
184+
185+
monitor.onShutdown(this, flushSuccess);
150186
}
151187

152188
/** This method will block until the flush is complete. */
153-
public void flush() {
189+
public boolean flush() {
154190
if (running) {
155191
log.info("Flushing any remaining traces.");
156192
// Register with the phaser so we can block until the flush completion.
@@ -159,9 +195,15 @@ public void flush() {
159195
try {
160196
// Allow thread to be interrupted.
161197
apiPhaser.awaitAdvanceInterruptibly(apiPhaser.arriveAndDeregister());
198+
199+
return true;
162200
} catch (final InterruptedException e) {
163201
log.warn("Waiting for flush interrupted.", e);
202+
203+
return false;
164204
}
205+
} else {
206+
return false;
165207
}
166208
}
167209

@@ -175,9 +217,13 @@ private void scheduleFlush() {
175217
final ScheduledFuture<?> previous =
176218
flushSchedule.getAndSet(
177219
scheduledWriterExecutor.schedule(flushTask, flushFrequencySeconds, SECONDS));
178-
if (previous != null) {
220+
221+
final boolean previousIncomplete = (previous != null);
222+
if (previousIncomplete) {
179223
previous.cancel(true);
180224
}
225+
226+
monitor.onScheduleFlush(this, previousIncomplete);
181227
}
182228
}
183229

@@ -205,10 +251,16 @@ public void onEvent(
205251
final byte[] serializedTrace = api.serializeTrace(trace);
206252
payloadSize += serializedTrace.length;
207253
serializedTraces.add(serializedTrace);
254+
255+
monitor.onSerialize(DDAgentWriter.this, trace, serializedTrace);
208256
} catch (final JsonProcessingException e) {
209257
log.warn("Error serializing trace", e);
258+
259+
monitor.onFailedSerialize(DDAgentWriter.this, trace, e);
210260
} catch (final Throwable e) {
211261
log.debug("Error while serializing trace", e);
262+
263+
monitor.onFailedSerialize(DDAgentWriter.this, trace, e);
212264
}
213265
}
214266
if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
@@ -242,15 +294,30 @@ public void run() {
242294

243295
if (response.success()) {
244296
log.debug("Successfully sent {} traces to the API", toSend.size());
297+
298+
monitor.onSend(DDAgentWriter.this, representativeCount, sizeInBytes, response);
245299
} else {
246300
log.debug(
247301
"Failed to send {} traces (representing {}) of size {} bytes to the API",
248302
toSend.size(),
249303
representativeCount,
250304
sizeInBytes);
305+
306+
monitor.onFailedSend(
307+
DDAgentWriter.this, representativeCount, sizeInBytes, response);
251308
}
252309
} catch (final Throwable e) {
253310
log.debug("Failed to send traces to the API: {}", e.getMessage());
311+
312+
// DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
313+
// shouldn't occur.
314+
// However, just to be safe to start, create a failed Response to handle any
315+
// spurious Throwable-s.
316+
monitor.onFailedSend(
317+
DDAgentWriter.this,
318+
representativeCount,
319+
sizeInBytes,
320+
DDApi.Response.failed(e));
254321
} finally {
255322
apiPhaser.arrive(); // Flush completed.
256323
}
@@ -274,4 +341,91 @@ public Event<T> newInstance() {
274341
return new Event<>();
275342
}
276343
}
344+
345+
/**
346+
* Callback interface for monitoring the health of the DDAgentWriter. Provides hooks for major
347+
* lifecycle events...
348+
*
349+
* <ul>
350+
* <li>start
351+
* <li>shutdown
352+
* <li>publishing to disruptor
353+
* <li>serializing
354+
* <li>sending to agent
355+
* </ul>
356+
*/
357+
public interface Monitor {
358+
void onStart(final DDAgentWriter agentWriter);
359+
360+
void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess);
361+
362+
void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace);
363+
364+
void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace);
365+
366+
void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete);
367+
368+
void onSerialize(
369+
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace);
370+
371+
void onFailedSerialize(
372+
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause);
373+
374+
void onSend(
375+
final DDAgentWriter agentWriter,
376+
final int representativeCount,
377+
final int sizeInBytes,
378+
final DDApi.Response response);
379+
380+
void onFailedSend(
381+
final DDAgentWriter agentWriter,
382+
final int representativeCount,
383+
final int sizeInBytes,
384+
final DDApi.Response response);
385+
}
386+
387+
public static final class NoopMonitor implements Monitor {
388+
@Override
389+
public void onStart(final DDAgentWriter agentWriter) {}
390+
391+
@Override
392+
public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
393+
394+
@Override
395+
public void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {}
396+
397+
@Override
398+
public void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {}
399+
400+
@Override
401+
public void onScheduleFlush(
402+
final DDAgentWriter agentWriter, final boolean previousIncomplete) {}
403+
404+
@Override
405+
public void onSerialize(
406+
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace) {}
407+
408+
@Override
409+
public void onFailedSerialize(
410+
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause) {}
411+
412+
@Override
413+
public void onSend(
414+
final DDAgentWriter agentWriter,
415+
final int representativeCount,
416+
final int sizeInBytes,
417+
final DDApi.Response response) {}
418+
419+
@Override
420+
public void onFailedSend(
421+
final DDAgentWriter agentWriter,
422+
final int representativeCount,
423+
final int sizeInBytes,
424+
final DDApi.Response response) {}
425+
426+
@Override
427+
public String toString() {
428+
return "NoOp";
429+
}
430+
}
277431
}

0 commit comments

Comments
 (0)