Skip to content
Open
Prev Previous commit
Next Next commit
values
  • Loading branch information
apuig committed Mar 26, 2025
commit ab780a4649d0885bf2cb1e5f82e984e90aaf87c1
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,17 @@ public AnalyticsClient(
looperThread.start();

CircuitBreaker<Response<UploadResponse>> breaker = CircuitBreaker.<Response<UploadResponse>>builder()
// 5 failure in 2 minute open the circuit
.withFailureThreshold(5, Duration.ofMinutes(2))
// 10 failure in 2 minute open the circuit
.withFailureThreshold(10, Duration.ofMinutes(2))
// once open wait 30 seconds to be half-open
.withDelay(Duration.ofSeconds(30))
// after 1 success the circuit is closed
.withSuccessThreshold(1)
// 5xx or rate limit is an error
.handleResultIf(response -> is5xx(response.code()) || response.code() == 429)
.onOpen(el -> System.err.println("***\nOPEN\n***"))
.onHalfOpen(el -> System.err.println("***\nHALF OPEN\n***"))
.onClose(el -> System.err.println("***\nCLOSED\n***"))
.build();

RetryPolicy<Response<UploadResponse>> retry = RetryPolicy.<Response<UploadResponse>>builder()
Expand All @@ -137,12 +140,6 @@ public AnalyticsClient(
.handle(IOException.class)
// retry on 5xx or rate limit
.handleResultIf(response -> is5xx(response.code()) || response.code() == 429)
.onRetriesExceeded(context -> {
throw new RuntimeException("retries");
})
.onAbort(context -> {
throw new RuntimeException("aborted");
})
.build();

this.failsafe = Failsafe.with(retry, breaker).with(networkExecutor);
Expand All @@ -167,6 +164,9 @@ public void enqueue(Message message) {
if (!messageQueue.offer(message)) {
handleError(message);
}
else {
System.err.println("enqueued " + message.messageId());
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public void close() {
// block !!!
public void add(Message msg) {
try {
System.err.println("failed " + msg.messageId());
queue.put(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -105,17 +106,15 @@ public void run() {

// FIXME batch
while (!msgs.isEmpty()) {
int reenqueued = 0;
boolean canEnqueue = true;
for (int i = msgs.size() - 1; canEnqueue && i >= 0; i--) {
Message msg = msgs.get(i);
canEnqueue = client.offer(msg);
if (canEnqueue) {
msgs.remove(i);
reenqueued++;
System.err.println("reenqueued " + msg.messageId());
}
}
System.err.println("reenqueued " + reenqueued);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
Expand Down
37 changes: 25 additions & 12 deletions analytics/src/test/java/com/segment/analytics/SegmentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -64,23 +67,30 @@ public void test() throws Throwable {
.willReturn(
WireMock.aResponse().withStatus(503).withBody("fail").withUniformRandomDelay(100, 1_000)));

int requestsPerSecond = 10;
int numClients = 10;
int timeToRun = 90_000;
int timeToRestore = 30_000;

long start = System.currentTimeMillis();
boolean upAgain = false;
int id = 0;
final AtomicInteger id = new AtomicInteger(0);
List<String> ids = new ArrayList<>();
RateLimiter rate = RateLimiter.create(5);
while (System.currentTimeMillis() - start < 60_000) {

RateLimiter rate = RateLimiter.create(requestsPerSecond);
ExecutorService exec = Executors.newWorkStealingPool(numClients);

while (System.currentTimeMillis() - start < timeToRun) {
if (rate.tryAcquire()) {
String msgid = "m" + id++;
ids.add(msgid);
analytics.enqueue(
TrackMessage.builder("my-track").messageId(msgid).userId("userId"));
System.err.println("enqued " + msgid);
exec.submit(() -> {
String msgid = "m" + id.getAndIncrement();
ids.add(msgid);
analytics.enqueue(
TrackMessage.builder("my-track").messageId(msgid).userId("userId"));
});
}

Thread.sleep(50);

if (!upAgain && System.currentTimeMillis() - start > 20_000) {
Thread.sleep(1);
if (!upAgain && System.currentTimeMillis() - start > timeToRestore) {
upAgain = true;
stubFor(post(urlEqualTo("/v1/import/"))
.willReturn(okJson("{\"success\": \"true\"}").withUniformRandomDelay(100, 1_000)));
Expand All @@ -92,6 +102,9 @@ public void test() throws Throwable {
.atMost(10, TimeUnit.MINUTES)
.pollInterval(1, TimeUnit.SECONDS)
.until(() -> sentMessagesEqualsTo(ids.toArray(new String[ids.size()])));

exec.shutdownNow();
exec.awaitTermination(10, TimeUnit.SECONDS);
}

private static final ObjectMapper OM = new ObjectMapper();
Expand Down