Skip to content
Open
Prev Previous commit
Next Next commit
simple wiremock test
  • Loading branch information
apuig committed Mar 21, 2025
commit 82fb90840e23b2ff6112667ca23b093d196536df
7 changes: 7 additions & 0 deletions analytics/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@
<groupId>org.wiremock</groupId>
<artifactId>wiremock-standalone</artifactId>
<version>3.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.2</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import okhttp3.HttpUrl;
import retrofit2.Call;
import retrofit2.Response;
Expand Down Expand Up @@ -129,6 +128,7 @@ public AnalyticsClient(
this.isShutDown = isShutDown;
this.writeKey = writeKey;
this.gsonInstance = gsonInstance;
looperThread.start();
}

public int messageSizeInBytes(Message message) {
Expand All @@ -141,7 +141,10 @@ public boolean offer(Message message) {
return messageQueue.offer(message);
}

public void enqueue(Message message) {}
public void enqueue(Message message) {

enqueueSend(message);
}

public void enqueueSend(Message message) {
if (isShutDown.get()) {
Expand Down Expand Up @@ -289,7 +292,7 @@ static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetrie
private void notifyCallbacksWithException(Batch batch, Exception exception) {
for (Message message : batch.batch()) {
for (Callback callback : client.callbacks) {
callback.failure(message, exception);
callback.failure(message, exception);
}
}
}
Expand Down
78 changes: 72 additions & 6 deletions analytics/src/test/java/com/segment/analytics/SegmentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,100 @@
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;

import com.github.tomakehurst.wiremock.junit.WireMockRule;
import com.github.tomakehurst.wiremock.stubbing.ServeEvent;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.segment.analytics.gson.AutoValueAdapterFactory;
import com.segment.analytics.gson.ISO8601DateAdapter;
import com.segment.analytics.messages.TrackMessage;
import java.util.UUID;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import wiremock.com.fasterxml.jackson.core.JsonProcessingException;
import wiremock.com.fasterxml.jackson.databind.JsonNode;
import wiremock.com.fasterxml.jackson.databind.ObjectMapper;

public class SegmentTest {

@Rule
public WireMockRule wireMockRule = new WireMockRule(wireMockConfig().dynamicPort(), false);
public WireMockRule wireMockRule =
new WireMockRule(wireMockConfig().dynamicPort().gzipDisabled(true), false);

Analytics analytics;

GsonBuilder gsonBuilder = new GsonBuilder()
.registerTypeAdapterFactory(new AutoValueAdapterFactory())
.registerTypeAdapter(Date.class, new ISO8601DateAdapter());

Gson gson = gsonBuilder.create();

@Before
public void confWireMock() {
stubFor(post(urlEqualTo("/v1/import/")).willReturn(okJson("{\"success\": \"true\"}")));

analytics = Analytics.builder("write-key")
.endpoint(wireMockRule.baseUrl())
.flushInterval(1, TimeUnit.SECONDS)
.queueCapacity(500)
// callback
// http client
.build();
}

@Test
public void test() {
analytics.enqueue(TrackMessage.builder("my-track")
.messageId(UUID.randomUUID().toString())
.userId("userId"));
public void test() throws Throwable {
analytics.enqueue(TrackMessage.builder("my-track").messageId("m1").userId("userId"));
analytics.enqueue(TrackMessage.builder("my-track").messageId("m2").userId("userId"));

Awaitility.await().until(() -> sentMessagesEqualsTo("m1", "m2"));
}

@Test
public void testMore() throws Throwable {
System.err.println("wm at " + wireMockRule.baseUrl());
int num = 100_000;
String[] expectedIds = new String[num];
for (int i = 0; i < num; i++) {
String id = "m" + i;
expectedIds[i] = id;
analytics.enqueue(TrackMessage.builder("my-track").messageId(id).userId("userId"));
}

Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> sentMessagesEqualsTo(expectedIds));
}

private static final ObjectMapper OM = new ObjectMapper();

private boolean sentMessagesEqualsTo(String... msgIds) {
return new HashSet<>(sentMessages()).equals(new HashSet<>(Arrays.asList(msgIds)));
}

private List<String> sentMessages() {
List<String> messageIds = new ArrayList<>();
for (ServeEvent event : wireMockRule.getAllServeEvents()) {
JsonNode batch;
try {
JsonNode json = OM.readTree(event.getRequest().getBodyAsString());
batch = json.get("batch");
if (batch == null) {
continue;
}
} catch (JsonProcessingException e) {
continue;
}
Iterator<JsonNode> msgs = batch.elements();
while (msgs.hasNext()) {
messageIds.add(msgs.next().get("messageId").asText());
}
}
return messageIds;
}
}