Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove
  • Loading branch information
apuig committed Mar 20, 2025
commit 3d232fadb3f9d87446c2ef3c5ef0335100f4422b
5 changes: 0 additions & 5 deletions analytics/src/main/java/com/segment/analytics/Analytics.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@ public boolean offer(MessageBuilder builder) {
return client.offer(message);
}

/** Flush events in the message queue. */
public void flush() {
client.flush();
}

Comment on lines -93 to -97
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Looks to be a breaking change. Could we implement the flush method based on the new architecture? or at least depreciate it with empty or minimal implementation (warning log)?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is misleading. Flushing does not guarantee that messages will be delivered.
IMHO the API for segment should be a simple, "hey, there is the message, do your best"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand your point. I'm just thinking about binary compatibility, the upgrade should ideally not require any changes from users.

/** Stops this instance from processing further requests. */
public void shutdown() {
client.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -54,17 +52,17 @@ public class AnalyticsClient {
}

private final BlockingQueue<Message> messageQueue;
private final BlockingQueue<Message> pendingQueue;
private final HttpUrl uploadUrl;
private final SegmentService service;
private final int size;
private final long flushIntervalInMillis;
private final int maximumRetries;
private final int maximumQueueByteSize;
private int currentQueueSizeInBytes;
private final Log log;
private final List<Callback> callbacks;
private final ExecutorService networkExecutor;
private final ExecutorService looperExecutor;
private final ScheduledExecutorService flushScheduler;
private final Thread looperThread;
private final AtomicBoolean isShutDown;
private final String writeKey;

Expand All @@ -83,6 +81,7 @@ public static AnalyticsClient create(
String writeKey,
Gson gsonInstance) {
return new AnalyticsClient(
new LinkedBlockingQueue<Message>(queueCapacity),
new LinkedBlockingQueue<Message>(queueCapacity),
uploadUrl,
segmentService,
Expand All @@ -101,6 +100,7 @@ public static AnalyticsClient create(

public AnalyticsClient(
BlockingQueue<Message> messageQueue,
BlockingQueue<Message> pendingQueue,
HttpUrl uploadUrl,
SegmentService service,
int maxQueueSize,
Expand All @@ -115,34 +115,20 @@ public AnalyticsClient(
String writeKey,
Gson gsonInstance) {
this.messageQueue = messageQueue;
this.pendingQueue = pendingQueue;
this.uploadUrl = uploadUrl;
this.service = service;
this.size = maxQueueSize;
this.flushIntervalInMillis = flushIntervalInMillis;
this.maximumRetries = maximumRetries;
this.maximumQueueByteSize = maximumQueueSizeInBytes;
this.log = log;
this.callbacks = callbacks;
this.looperExecutor = Executors.newSingleThreadExecutor(threadFactory);
this.looperThread = threadFactory.newThread(new Looper());
this.networkExecutor = networkExecutor;
this.isShutDown = isShutDown;
this.writeKey = writeKey;
this.gsonInstance = gsonInstance;

this.currentQueueSizeInBytes = 0;

if (!isShutDown.get()) looperExecutor.submit(new Looper());

flushScheduler = Executors.newScheduledThreadPool(1, threadFactory);
flushScheduler.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
flush();
}
},
flushIntervalInMillis,
flushIntervalInMillis,
TimeUnit.MILLISECONDS);
}

public int messageSizeInBytes(Message message) {
Expand All @@ -151,74 +137,33 @@ public int messageSizeInBytes(Message message) {
return stringifiedMessage.getBytes(ENCODING).length;
}

private Boolean isBackPressuredAfterSize(int incomingSize) {
int POISON_BYTE_SIZE = messageSizeInBytes(FlushMessage.POISON);
int sizeAfterAdd = this.currentQueueSizeInBytes + incomingSize + POISON_BYTE_SIZE;
// Leave a 10% buffer since the unsynchronized enqueue could add multiple at a time
return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE) * 0.9;
}

public boolean offer(Message message) {
return messageQueue.offer(message);
}

public void enqueue(Message message) {
if (message != StopMessage.STOP && isShutDown.get()) {
public void enqueue(Message message) {}

public void enqueueSend(Message message) {
if (isShutDown.get()) {
log.print(ERROR, "Attempt to enqueue a message when shutdown has been called %s.", message);
return;
}

try {
// @jorgen25 message here could be regular msg, POISON or STOP. Only do regular logic if its
// valid message
if (message != StopMessage.STOP && message != FlushMessage.POISON) {
int messageByteSize = messageSizeInBytes(message);

// @jorgen25 check if message is below 32kb limit for individual messages, no need to check
// for extra characters
if (messageByteSize <= MSG_MAX_SIZE) {
if (isBackPressuredAfterSize(messageByteSize)) {
this.currentQueueSizeInBytes = messageByteSize;
messageQueue.put(FlushMessage.POISON);
messageQueue.put(message);

log.print(VERBOSE, "Maximum storage size has been hit Flushing...");
} else {
messageQueue.put(message);
this.currentQueueSizeInBytes += messageByteSize;
}
} else {
log.print(
ERROR, "Message was above individual limit. MessageId: %s", message.messageId());
throw new IllegalArgumentException(
"Message was above individual limit. MessageId: " + message.messageId());
}
} else {
messageQueue.put(message);
}
messageQueue.put(message);
} catch (InterruptedException e) {
log.print(ERROR, e, "Interrupted while adding message %s.", message);
Thread.currentThread().interrupt();
}
}

public void flush() {
if (!isShutDown.get()) {
enqueue(FlushMessage.POISON);
}
}

public void shutdown() {
if (isShutDown.compareAndSet(false, true)) {
final long start = System.currentTimeMillis();

// first let's tell the system to stop
enqueue(StopMessage.STOP);

// we can shutdown the flush scheduler without worrying
flushScheduler.shutdownNow();
looperThread.interrupt();

shutdownAndWait(looperExecutor, "looper");
shutdownAndWait(networkExecutor, "network");

log.print(
Expand Down Expand Up @@ -247,49 +192,44 @@ public void shutdownAndWait(ExecutorService executor, String name) {
* messages, it triggers a flush.
*/
class Looper implements Runnable {
private boolean stop;

public Looper() {
this.stop = false;
}

@Override
public void run() {
LinkedList<Message> messages = new LinkedList<>();
AtomicInteger currentBatchSize = new AtomicInteger();
int currentBatchSize = 0;
boolean batchSizeLimitReached = false;
int contextSize = gsonInstance.toJson(CONTEXT).getBytes(ENCODING).length;
try {
while (!stop) {
Message message = messageQueue.take();

if (message == StopMessage.STOP) {
log.print(VERBOSE, "Stopping the Looper");
stop = true;
} else if (message == FlushMessage.POISON) {
if (!messages.isEmpty()) {
log.print(VERBOSE, "Flushing messages.");
}
} else {
while (!Thread.currentThread().isInterrupted()) {
Message message = messageQueue.poll(flushIntervalInMillis, TimeUnit.MILLISECONDS);

if (message != null) {
// we do +1 because we are accounting for this new message we just took from the queue
// which is not in list yet
// need to check if this message is going to make us go over the limit considering
// default batch size as well
int defaultBatchSize =
BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1);
int msgSize = messageSizeInBytes(message);
if (currentBatchSize.get() + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) {
if (currentBatchSize + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) {
messages.add(message);
currentBatchSize.addAndGet(msgSize);
currentBatchSize+=msgSize;
} else {
// put message that did not make the cut this time back on the queue, we already took
// this message if we dont put it back its lost
// we take care of that after submitting the batch
batchSizeLimitReached = true;
}
}

if (messages.isEmpty()) {
continue;
}

Boolean isBlockingSignal = message == FlushMessage.POISON || message == StopMessage.STOP;
Boolean isBlockingSignal = message == null;
Boolean isOverflow = messages.size() >= size;

if (!messages.isEmpty() && (isOverflow || isBlockingSignal || batchSizeLimitReached)) {
Expand All @@ -302,7 +242,7 @@ public void run() {
networkExecutor.submit(
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));

currentBatchSize.set(0);
currentBatchSize=0;
messages.clear();
if (batchSizeLimitReached) {
// If this is true that means the last message that would make us go over the limit
Expand All @@ -315,8 +255,9 @@ public void run() {
}
} catch (InterruptedException e) {
log.print(DEBUG, "Looper interrupted while polling for messages.");
Thread.currentThread().interrupt();
Thread.currentThread().interrupt(); //XXX
}
// SEND pending
log.print(VERBOSE, "Looper stopped");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,6 @@ public void shutdownIsDispatched() {
verify(client).shutdown();
}

@Test
public void flushIsDispatched() {
analytics.flush();

verify(client).flush();
}

@Test
public void offerIsDispatched(MessageBuilderTest builder) {
MessageBuilder messageBuilder = builder.get().userId("dummy");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class AnalyticsClientTest {

ThreadFactory threadFactory;
@Spy LinkedBlockingQueue<Message> messageQueue;
@Spy LinkedBlockingQueue<Message> pendingQueue;
@Mock SegmentService segmentService;
@Mock ExecutorService networkExecutor;
@Mock Callback callback;
Expand All @@ -95,6 +96,7 @@ public void setUp() {
AnalyticsClient newClient() {
return new AnalyticsClient(
messageQueue,
pendingQueue,
null,
segmentService,
50,
Expand Down Expand Up @@ -131,15 +133,6 @@ public void shutdown() throws InterruptedException {
verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS);
}

@Test
public void flushInsertsPoison() throws InterruptedException {
AnalyticsClient client = newClient();

client.flush();

verify(messageQueue).put(FlushMessage.POISON);
}

/** Wait until the queue is drained. */
static void wait(Queue<?> queue) {
// noinspection StatementWithEmptyBody
Expand Down Expand Up @@ -198,21 +191,6 @@ private static String generateDataOfSizeSpecialChars(
return builder.toString();
}

@Test
public void flushSubmitsToExecutor() {
messageQueue = new LinkedBlockingQueue<>();
AnalyticsClient client = newClient();

TrackMessage first = TrackMessage.builder("foo").userId("bar").build();
TrackMessage second = TrackMessage.builder("qaz").userId("qux").build();
client.enqueue(first);
client.enqueue(second);
client.flush();
wait(messageQueue);

assertThat(captureBatch(networkExecutor).batch()).containsExactly(first, second);
}

@Test
public void enqueueMaxTriggersFlush() {
messageQueue = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -284,6 +262,7 @@ public void flushHowManyTimesNecessaryToStayWithinLimit() throws InterruptedExce
AnalyticsClient client =
new AnalyticsClient(
messageQueue,
pendingQueue,
null,
segmentService,
50,
Expand Down Expand Up @@ -544,24 +523,6 @@ public boolean matches(IOException exception) {
}));
}

@Test
public void flushWhenNotShutDown() throws InterruptedException {
AnalyticsClient client = newClient();

client.flush();
verify(messageQueue).put(POISON);
}

@Test
public void flushWhenShutDown() throws InterruptedException {
AnalyticsClient client = newClient();
isShutDown.set(true);

client.flush();

verify(messageQueue, times(0)).put(any(Message.class));
}

@Test
public void enqueueWithRegularMessageWhenNotShutdown(MessageBuilderTest builder)
throws InterruptedException {
Expand Down Expand Up @@ -860,6 +821,7 @@ public void submitBatchBelowThreshold() throws InterruptedException, IllegalArgu
AnalyticsClient client =
new AnalyticsClient(
messageQueue,
pendingQueue,
null,
segmentService,
50,
Expand Down Expand Up @@ -902,6 +864,7 @@ public void submitBatchAboveThreshold() throws InterruptedException, IllegalArgu
AnalyticsClient client =
new AnalyticsClient(
messageQueue,
pendingQueue,
null,
segmentService,
50,
Expand Down Expand Up @@ -937,6 +900,7 @@ public void submitManySmallMessagesBatchAboveThreshold() throws InterruptedExcep
AnalyticsClient client =
new AnalyticsClient(
messageQueue,
pendingQueue,
null,
segmentService,
50,
Expand Down