@@ -71,12 +71,20 @@ public void translateTo(final Event<List<DDSpan>> event, final long sequence) {
7171 private final Phaser apiPhaser ;
7272 private volatile boolean running = false ;
7373
74+ private final Monitor monitor ;
75+
/**
 * Creates a writer backed by a {@link DDApi} built from {@code DEFAULT_AGENT_HOST}, {@code
 * DEFAULT_TRACE_AGENT_PORT} and {@code DEFAULT_AGENT_UNIX_DOMAIN_SOCKET}.
 *
 * <p>No monitor is supplied by the caller, so a {@link NoopMonitor} ends up receiving the health
 * callbacks.
 */
public DDAgentWriter() {
  // The single-argument constructor supplies the NoopMonitor default.
  this(new DDApi(DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET));
}
81+
/**
 * Creates a writer that sends traces through {@code api} and reports health events to {@code
 * monitor}, using {@code DISRUPTOR_BUFFER_SIZE} and {@code FLUSH_PAYLOAD_DELAY} for the buffer
 * size and flush frequency.
 *
 * @param api client used to serialize and send traces
 * @param monitor receiver of the writer's health callbacks
 */
public DDAgentWriter(DDApi api, Monitor monitor) {
  this(
      api,
      monitor,
      DISRUPTOR_BUFFER_SIZE,
      FLUSH_PAYLOAD_DELAY);
}
7785
/**
 * Creates a writer that sends traces through {@code api} without any health monitoring: a fresh
 * {@link NoopMonitor} is installed as the monitor.
 *
 * @param api client used to serialize and send traces
 */
public DDAgentWriter(DDApi api) {
  this(
      api,
      new NoopMonitor());
}
8189
8290 /**
@@ -87,8 +95,17 @@ public DDAgentWriter(final DDApi api) {
8795 * @param flushFrequencySeconds value < 1 disables scheduled flushes
8896 */
private DDAgentWriter(
    DDApi api,
    int disruptorSize,
    int flushFrequencySeconds) {
  // Same as the four-argument constructor, with a NoopMonitor standing in for the monitor.
  this(
      api,
      new NoopMonitor(),
      disruptorSize,
      flushFrequencySeconds);
}
100+
101+ private DDAgentWriter (
102+ final DDApi api ,
103+ final Monitor monitor ,
104+ final int disruptorSize ,
105+ final int flushFrequencySeconds ) {
90106 this .api = api ;
91- this .flushFrequencySeconds = flushFrequencySeconds ;
107+ this .monitor = monitor ;
108+
92109 disruptor =
93110 new Disruptor <>(
94111 new DisruptorEventFactory <List <DDSpan >>(),
@@ -97,7 +114,10 @@ private DDAgentWriter(final DDApi api, final int disruptorSize, final int flushF
97114 ProducerType .MULTI ,
98115 new SleepingWaitStrategy (0 , TimeUnit .MILLISECONDS .toNanos (5 )));
99116 disruptor .handleEventsWith (new TraceConsumer ());
117+
118+ this .flushFrequencySeconds = flushFrequencySeconds ;
100119 scheduledWriterExecutor = Executors .newScheduledThreadPool (1 , SCHEDULED_FLUSH_THREAD_FACTORY );
120+
101121 apiPhaser = new Phaser (); // Ensure API calls are completed when flushing
102122 apiPhaser .register (); // Register on behalf of the scheduled executor thread.
103123 }
@@ -107,13 +127,20 @@ public void write(final List<DDSpan> trace) {
107127 // We can't add events after shutdown otherwise it will never complete shutting down.
108128 if (running ) {
109129 final boolean published = disruptor .getRingBuffer ().tryPublishEvent (TRANSLATOR , trace );
110- if (!published ) {
130+
131+ if (published ) {
132+ monitor .onPublish (DDAgentWriter .this , trace );
133+ } else {
111134 // We're discarding the trace, but we still want to count it.
112135 traceCount .incrementAndGet ();
113136 log .debug ("Trace written to overfilled buffer. Counted but dropping trace: {}" , trace );
137+
138+ monitor .onFailedPublish (this , trace );
114139 }
115140 } else {
116141 log .debug ("Trace written after shutdown. Ignoring trace: {}" , trace );
142+
143+ monitor .onFailedPublish (this , trace );
117144 }
118145 }
119146
@@ -131,11 +158,16 @@ public void start() {
131158 disruptor .start ();
132159 running = true ;
133160 scheduleFlush ();
161+
162+ monitor .onStart (this );
134163 }
135164
136165 @ Override
137166 public void close () {
138167 running = false ;
168+
169+ boolean flushSuccess = true ;
170+
139171 // We have to shutdown scheduled executor first to make sure no flush events issued after
140172 // disruptor has been shutdown.
141173 // Otherwise those events will never be processed and flush call will wait forever.
@@ -144,13 +176,17 @@ public void close() {
144176 scheduledWriterExecutor .awaitTermination (flushFrequencySeconds , SECONDS );
145177 } catch (final InterruptedException e ) {
146178 log .warn ("Waiting for flush executor shutdown interrupted." , e );
179+
180+ flushSuccess = false ;
147181 }
148- flush ();
182+ flushSuccess |= flush ();
149183 disruptor .shutdown ();
184+
185+ monitor .onShutdown (this , flushSuccess );
150186 }
151187
152188 /** This method will block until the flush is complete. */
153- public void flush () {
189+ public boolean flush () {
154190 if (running ) {
155191 log .info ("Flushing any remaining traces." );
156192 // Register with the phaser so we can block until the flush completion.
@@ -159,9 +195,15 @@ public void flush() {
159195 try {
160196 // Allow thread to be interrupted.
161197 apiPhaser .awaitAdvanceInterruptibly (apiPhaser .arriveAndDeregister ());
198+
199+ return true ;
162200 } catch (final InterruptedException e ) {
163201 log .warn ("Waiting for flush interrupted." , e );
202+
203+ return false ;
164204 }
205+ } else {
206+ return false ;
165207 }
166208 }
167209
@@ -175,9 +217,13 @@ private void scheduleFlush() {
175217 final ScheduledFuture <?> previous =
176218 flushSchedule .getAndSet (
177219 scheduledWriterExecutor .schedule (flushTask , flushFrequencySeconds , SECONDS ));
178- if (previous != null ) {
220+
221+ final boolean previousIncomplete = (previous != null );
222+ if (previousIncomplete ) {
179223 previous .cancel (true );
180224 }
225+
226+ monitor .onScheduleFlush (this , previousIncomplete );
181227 }
182228 }
183229
@@ -205,10 +251,16 @@ public void onEvent(
205251 final byte [] serializedTrace = api .serializeTrace (trace );
206252 payloadSize += serializedTrace .length ;
207253 serializedTraces .add (serializedTrace );
254+
255+ monitor .onSerialize (DDAgentWriter .this , trace , serializedTrace );
208256 } catch (final JsonProcessingException e ) {
209257 log .warn ("Error serializing trace" , e );
258+
259+ monitor .onFailedSerialize (DDAgentWriter .this , trace , e );
210260 } catch (final Throwable e ) {
211261 log .debug ("Error while serializing trace" , e );
262+
263+ monitor .onFailedSerialize (DDAgentWriter .this , trace , e );
212264 }
213265 }
214266 if (event .shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES ) {
@@ -242,15 +294,30 @@ public void run() {
242294
243295 if (response .success ()) {
244296 log .debug ("Successfully sent {} traces to the API" , toSend .size ());
297+
298+ monitor .onSend (DDAgentWriter .this , representativeCount , sizeInBytes , response );
245299 } else {
246300 log .debug (
247301 "Failed to send {} traces (representing {}) of size {} bytes to the API" ,
248302 toSend .size (),
249303 representativeCount ,
250304 sizeInBytes );
305+
306+ monitor .onFailedSend (
307+ DDAgentWriter .this , representativeCount , sizeInBytes , response );
251308 }
252309 } catch (final Throwable e ) {
253310 log .debug ("Failed to send traces to the API: {}" , e .getMessage ());
311+
312+ // DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
313+ // shouldn't occur.
314+ // However, just to be safe to start, create a failed Response to handle any
315+ // spurious Throwable-s.
316+ monitor .onFailedSend (
317+ DDAgentWriter .this ,
318+ representativeCount ,
319+ sizeInBytes ,
320+ DDApi .Response .failed (e ));
254321 } finally {
255322 apiPhaser .arrive (); // Flush completed.
256323 }
@@ -274,4 +341,91 @@ public Event<T> newInstance() {
274341 return new Event <>();
275342 }
276343 }
344+
345+ /**
346+ * Callback interface for monitoring the health of the DDAgentWriter. Provides hooks for major
347+ * lifecycle events...
348+ *
349+ * <ul>
350+ * <li>start
351+ * <li>shutdown
352+ * <li>publishing to disruptor
353+ * <li>serializing
354+ * <li>sending to agent
355+ * </ul>
356+ */
357+ public interface Monitor {
358+ void onStart (final DDAgentWriter agentWriter );
359+
360+ void onShutdown (final DDAgentWriter agentWriter , final boolean flushSuccess );
361+
362+ void onPublish (final DDAgentWriter agentWriter , final List <DDSpan > trace );
363+
364+ void onFailedPublish (final DDAgentWriter agentWriter , final List <DDSpan > trace );
365+
366+ void onScheduleFlush (final DDAgentWriter agentWriter , final boolean previousIncomplete );
367+
368+ void onSerialize (
369+ final DDAgentWriter agentWriter , final List <DDSpan > trace , final byte [] serializedTrace );
370+
371+ void onFailedSerialize (
372+ final DDAgentWriter agentWriter , final List <DDSpan > trace , final Throwable optionalCause );
373+
374+ void onSend (
375+ final DDAgentWriter agentWriter ,
376+ final int representativeCount ,
377+ final int sizeInBytes ,
378+ final DDApi .Response response );
379+
380+ void onFailedSend (
381+ final DDAgentWriter agentWriter ,
382+ final int representativeCount ,
383+ final int sizeInBytes ,
384+ final DDApi .Response response );
385+ }
386+
387+ public static final class NoopMonitor implements Monitor {
388+ @ Override
389+ public void onStart (final DDAgentWriter agentWriter ) {}
390+
391+ @ Override
392+ public void onShutdown (final DDAgentWriter agentWriter , final boolean flushSuccess ) {}
393+
394+ @ Override
395+ public void onPublish (final DDAgentWriter agentWriter , final List <DDSpan > trace ) {}
396+
397+ @ Override
398+ public void onFailedPublish (final DDAgentWriter agentWriter , final List <DDSpan > trace ) {}
399+
400+ @ Override
401+ public void onScheduleFlush (
402+ final DDAgentWriter agentWriter , final boolean previousIncomplete ) {}
403+
404+ @ Override
405+ public void onSerialize (
406+ final DDAgentWriter agentWriter , final List <DDSpan > trace , final byte [] serializedTrace ) {}
407+
408+ @ Override
409+ public void onFailedSerialize (
410+ final DDAgentWriter agentWriter , final List <DDSpan > trace , final Throwable optionalCause ) {}
411+
412+ @ Override
413+ public void onSend (
414+ final DDAgentWriter agentWriter ,
415+ final int representativeCount ,
416+ final int sizeInBytes ,
417+ final DDApi .Response response ) {}
418+
419+ @ Override
420+ public void onFailedSend (
421+ final DDAgentWriter agentWriter ,
422+ final int representativeCount ,
423+ final int sizeInBytes ,
424+ final DDApi .Response response ) {}
425+
426+ @ Override
427+ public String toString () {
428+ return "NoOp" ;
429+ }
430+ }
277431}
0 commit comments