From d3fe99f694c12b174a7f663af0165cfb49887c27 Mon Sep 17 00:00:00 2001 From: Brendan Almonte Date: Wed, 17 Sep 2014 18:12:12 -0700 Subject: [PATCH 1/4] Add streaming events API --- pom.xml | 2 +- .../github/dockerjava/api/DockerClient.java | 3 + .../api/command/DockerCmdExecFactory.java | 2 + .../dockerjava/api/command/EventsCmd.java | 22 ++++ .../github/dockerjava/api/model/Event.java | 37 +++++++ .../dockerjava/api/model/EventStream.java | 85 +++++++++++++++ .../com/github/dockerjava/api/model/Info.java | 2 +- .../dockerjava/core/DockerClientImpl.java | 5 + .../core/command/EventsCmdImpl.java | 52 +++++++++ .../jaxrs/DockerCmdExecFactoryImpl.java | 8 +- .../dockerjava/jaxrs/EventsCmdExec.java | 29 +++++ .../core/command/EventsCmdImplTest.java | 101 ++++++++++++++++++ 12 files changed, 345 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/github/dockerjava/api/command/EventsCmd.java create mode 100644 src/main/java/com/github/dockerjava/api/model/Event.java create mode 100644 src/main/java/com/github/dockerjava/api/model/EventStream.java create mode 100644 src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java create mode 100644 src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java create mode 100644 src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java diff --git a/pom.xml b/pom.xml index 99a6c603e..d8f1fbac6 100644 --- a/pom.xml +++ b/pom.xml @@ -63,7 +63,7 @@ 1.7.5 1.3.9 0.3 - 11.0.1 + 18.0 1.0.1 diff --git a/src/main/java/com/github/dockerjava/api/DockerClient.java b/src/main/java/com/github/dockerjava/api/DockerClient.java index 15e66bef8..ea2fcefe2 100644 --- a/src/main/java/com/github/dockerjava/api/DockerClient.java +++ b/src/main/java/com/github/dockerjava/api/DockerClient.java @@ -10,6 +10,7 @@ import com.github.dockerjava.api.command.CopyFileFromContainerCmd; import com.github.dockerjava.api.command.CreateContainerCmd; import com.github.dockerjava.api.command.CreateImageCmd; +import com.github.dockerjava.api.command.EventsCmd; import com.github.dockerjava.api.command.InfoCmd; import com.github.dockerjava.api.command.InspectContainerCmd; import com.github.dockerjava.api.command.InspectImageCmd; @@ -114,6 +115,8 @@ public CopyFileFromContainerCmd copyFileFromContainerCmd( public UnpauseContainerCmd unpauseContainerCmd(String containerId); + public EventsCmd eventsCmd(); + public void close() throws IOException; } \ No newline at end of file diff --git a/src/main/java/com/github/dockerjava/api/command/DockerCmdExecFactory.java b/src/main/java/com/github/dockerjava/api/command/DockerCmdExecFactory.java index 6534bb547..24c0a4650 100644 --- a/src/main/java/com/github/dockerjava/api/command/DockerCmdExecFactory.java +++ b/src/main/java/com/github/dockerjava/api/command/DockerCmdExecFactory.java @@ -69,6 +69,8 @@ public interface DockerCmdExecFactory extends Closeable { public UnpauseContainerCmd.Exec createUnpauseContainerCmdExec(); + public EventsCmd.Exec createEventsCmdExec(); + public void close() throws IOException; } \ No newline at end of file diff --git a/src/main/java/com/github/dockerjava/api/command/EventsCmd.java b/src/main/java/com/github/dockerjava/api/command/EventsCmd.java new file mode 100644 index 000000000..71590c842 --- /dev/null +++ b/src/main/java/com/github/dockerjava/api/command/EventsCmd.java @@ -0,0 +1,22 @@ +package com.github.dockerjava.api.command; + +import com.github.dockerjava.api.model.EventStream; + +/** + * Get events + * + * @param since - Show all events created since timestamp + * @param until - Stream events until this timestamp + */ +public interface EventsCmd extends DockerCmd { + public EventsCmd withSince(String since); + + public EventsCmd withUntil(String until); + + public String getSince(); + + public String getUntil(); + + public static interface Exec extends DockerCmdExec { + } +} diff --git a/src/main/java/com/github/dockerjava/api/model/Event.java b/src/main/java/com/github/dockerjava/api/model/Event.java new file mode 100644 index 000000000..674cb66de --- /dev/null +++ b/src/main/java/com/github/dockerjava/api/model/Event.java @@ -0,0 +1,37 @@ +package com.github.dockerjava.api.model; + +import org.apache.commons.lang.builder.ToStringBuilder; + +/** + * Representation of a Docker event. + */ +public class Event { + private String status; + + private String id; + + private String from; + + private long time; + + public String getStatus() { + return status; + } + + public String getId() { + return id; + } + + public String getFrom() { + return from; + } + + public long getTime() { + return time; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } +} diff --git a/src/main/java/com/github/dockerjava/api/model/EventStream.java b/src/main/java/com/github/dockerjava/api/model/EventStream.java new file mode 100644 index 000000000..0f47c25fd --- /dev/null +++ b/src/main/java/com/github/dockerjava/api/model/EventStream.java @@ -0,0 +1,85 @@ +package com.github.dockerjava.api.model; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Queues; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * EventStream API + *

+ * Spawns a thread to poll for events to fill a BlockingQueue + */ +public class EventStream implements Closeable { + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private final BlockingQueue queue; + private final EventRunner eventRunner; + + private EventStream(InputStream inputStream) { + queue = Queues.newLinkedBlockingQueue(); + eventRunner = new EventRunner(queue, inputStream); + } + + public static EventStream create(InputStream inputStream) { + return new EventStream(inputStream).startRunner(); + } + + public Event pollEvent() { + return queue.poll(); + } + + public Event pollEvent(long timeout, TimeUnit unit) throws InterruptedException { + return queue.poll(timeout, unit); + } + + @Override + public void close() throws IOException { + eventRunner.initiateStop(); + executor.shutdown(); + } + + private EventStream startRunner() { + executor.execute(eventRunner); + return this; + } + + private static class EventRunner implements Runnable { + private static final JsonFactory JSON_FACTORY = new JsonFactory(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final Queue queue; + private final InputStream inputStream; + + public EventRunner(Queue queue, InputStream inputStream) { + this.queue = queue; + this.inputStream = inputStream; + } + + public void initiateStop() throws IOException { + inputStream.close(); + } + + @Override + public void run() { + try { + JsonParser jp = JSON_FACTORY.createParser(inputStream); + while (jp.nextToken() != JsonToken.END_OBJECT && !jp.isClosed()) { + queue.add(OBJECT_MAPPER.readValue(jp, Event.class)); + } + inputStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/src/main/java/com/github/dockerjava/api/model/Info.java b/src/main/java/com/github/dockerjava/api/model/Info.java index f71b75c93..7fe6ae798 100644 --- a/src/main/java/com/github/dockerjava/api/model/Info.java +++ b/src/main/java/com/github/dockerjava/api/model/Info.java @@ -10,7 +10,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonSerialize; -/**Ø +/** * * @author Konstantin Pelykh (kpelykh@gmail.com) * diff --git a/src/main/java/com/github/dockerjava/core/DockerClientImpl.java b/src/main/java/com/github/dockerjava/core/DockerClientImpl.java index baa5b5da5..810476369 100644 --- a/src/main/java/com/github/dockerjava/core/DockerClientImpl.java +++ b/src/main/java/com/github/dockerjava/core/DockerClientImpl.java @@ -252,6 +252,11 @@ public UnpauseContainerCmd unpauseContainerCmd(String containerId) { return new UnpauseContainerCmdImpl(getDockerCmdExecFactory().createUnpauseContainerCmdExec(), containerId); } + @Override + public EventsCmd eventsCmd() { + return new EventsCmdImpl(getDockerCmdExecFactory().createEventsCmdExec()); + } + @Override public void close() throws IOException { getDockerCmdExecFactory().close(); diff --git a/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java b/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java new file mode 100644 index 000000000..0b045838d --- /dev/null +++ b/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java @@ -0,0 +1,52 @@ +package com.github.dockerjava.core.command; + +import com.github.dockerjava.api.command.EventsCmd; +import com.github.dockerjava.api.model.EventStream; + +import java.io.InputStream; + +/** + * Stream docker events + */ +public class EventsCmdImpl extends AbstrDockerCmd implements EventsCmd { + + private String since; + private String until; + + public EventsCmdImpl(EventsCmd.Exec exec) { + super(exec); + } + + @Override + public EventsCmd withSince(String since) { + this.since = since; + return this; + } + + @Override + public EventsCmd withUntil(String until) { + this.until = until; + return this; + } + + public String getSince() { + return since; + } + + public String getUntil() { + return until; + } + + @Override + public EventStream exec() { + return super.exec(); + } + + @Override + public String toString() { + return new StringBuilder("events") + .append(since != null ? " --since=" + since : "") + .append(until != null ? " --until=" + until : "") + .toString(); + } +} diff --git a/src/main/java/com/github/dockerjava/jaxrs/DockerCmdExecFactoryImpl.java b/src/main/java/com/github/dockerjava/jaxrs/DockerCmdExecFactoryImpl.java index 8ba7edd5f..cb59f200b 100644 --- a/src/main/java/com/github/dockerjava/jaxrs/DockerCmdExecFactoryImpl.java +++ b/src/main/java/com/github/dockerjava/jaxrs/DockerCmdExecFactoryImpl.java @@ -7,6 +7,7 @@ import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.WebTarget; +import com.github.dockerjava.api.command.EventsCmd; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientProperties; @@ -241,7 +242,12 @@ public PauseContainerCmd.Exec createPauseContainerCmdExec() { public UnpauseContainerCmd.Exec createUnpauseContainerCmdExec() { return new UnpauseContainerCmdExec(baseResource); } - + + @Override + public EventsCmd.Exec createEventsCmdExec() { + return new EventsCmdExec(getBaseResource()); + } + @Override public void close() throws IOException { Preconditions.checkNotNull(client, "Factory not initialized. You probably forgot to call init()!"); diff --git a/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java b/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java new file mode 100644 index 000000000..60739fc61 --- /dev/null +++ b/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java @@ -0,0 +1,29 @@ +package com.github.dockerjava.jaxrs; + +import com.github.dockerjava.api.command.EventsCmd; +import com.github.dockerjava.api.model.EventStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; +import java.io.InputStream; + +public class EventsCmdExec extends AbstrDockerCmdExec implements EventsCmd.Exec { + private static final Logger LOGGER = LoggerFactory.getLogger(EventsCmdExec.class); + + public EventsCmdExec(WebTarget baseResource) { + super(baseResource); + } + + @Override + public EventStream exec(EventsCmd command) { + WebTarget webResource = getBaseResource().path("/events") + .queryParam("since", command.getSince()) + .queryParam("until", command.getUntil()); + + LOGGER.trace("GET: {}", webResource); + InputStream inputStream = webResource.request().get(Response.class).readEntity(InputStream.class); + return EventStream.create(inputStream); + } +} diff --git a/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java b/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java new file mode 100644 index 000000000..33c43ea10 --- /dev/null +++ b/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java @@ -0,0 +1,101 @@ +package com.github.dockerjava.core.command; + +import com.github.dockerjava.api.DockerException; +import com.github.dockerjava.api.command.CreateContainerResponse; +import com.github.dockerjava.api.model.Event; +import com.github.dockerjava.api.model.EventStream; +import com.github.dockerjava.client.AbstractDockerClientTest; +import com.google.common.collect.Lists; +import org.testng.ITestResult; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class EventsCmdImplTest extends AbstractDockerClientTest { + + private static int KNOWN_NUM_EVENTS = 4; + + private static String getEpochTime() { + return String.valueOf(System.currentTimeMillis() / 1000); + } + + @BeforeTest + public void beforeTest() throws DockerException { + super.beforeTest(); + } + + @AfterTest + public void afterTest() { + super.afterTest(); + } + + @BeforeMethod + public void beforeMethod(Method method) { + super.beforeMethod(method); + } + + @AfterMethod + public void afterMethod(ITestResult result) { + super.afterMethod(result); + } + + @Test + public void testEventStreamTimeBound() throws InterruptedException, IOException { + // Don't include other tests events + TimeUnit.SECONDS.sleep(1); + String startTime = getEpochTime(); + generateEvents(); + String endTime = getEpochTime(); + + EventStream eventStream = dockerClient.eventsCmd().withSince(startTime).withUntil(endTime).exec(); + List eventList = pollEvents(eventStream); + eventStream.close(); + LOG.info("Events: {}", eventList); + assertEquals(eventList.size(), KNOWN_NUM_EVENTS, "Expected 4 events, [create, start, die, stop]"); + } + + @Test + public void testEventStream() throws InterruptedException, IOException { + // Don't include other tests events + TimeUnit.SECONDS.sleep(1); + String startTime = getEpochTime(); + generateEvents(); + + EventStream eventStream = dockerClient.eventsCmd().withSince(startTime).exec(); + List eventList = pollEvents(eventStream); + eventStream.close(); + LOG.info("Events: {}", eventList); + assertEquals(eventList.size(), KNOWN_NUM_EVENTS, "Expected 4 events, [create, start, die, stop]"); + } + + /** + * This method generates {#link KNOWN_NUM_EVENTS} events + */ + private void generateEvents() { + String testImage = "busybox"; + asString(dockerClient.pullImageCmd(testImage).exec()); + CreateContainerResponse container1 = dockerClient + .createContainerCmd(testImage).withCmd("echo").exec(); + dockerClient.startContainerCmd(container1.getId()).exec(); + dockerClient.stopContainerCmd(container1.getId()).exec(); + } + + private List pollEvents(EventStream eventStream) throws InterruptedException { + List eventList = Lists.newArrayList(); + Event event = null; + do { + event = eventStream.pollEvent(1, TimeUnit.SECONDS); + if (event != null) { + eventList.add(event); + } + } while (event != null); + return eventList; + } +} From 3d2a3f017f976e8b740ccd653c75b9755706dc98 Mon Sep 17 00:00:00 2001 From: Brendan Almonte Date: Wed, 17 Sep 2014 18:12:12 -0700 Subject: [PATCH 2/4] Events API: Switch to a callback approach rather than a BlockingQueue --- .../github/dockerjava/api/DockerClient.java | 3 +- .../dockerjava/api/command/EventCallback.java | 10 +++ .../dockerjava/api/command/EventsCmd.java | 8 +- .../dockerjava/api/model/EventNotifier.java | 51 +++++++++++ .../dockerjava/api/model/EventStream.java | 85 ------------------- .../dockerjava/core/DockerClientImpl.java | 4 +- .../core/command/EventsCmdImpl.java | 21 +++-- .../dockerjava/jaxrs/EventsCmdExec.java | 8 +- .../core/command/EventsCmdImplTest.java | 69 ++++++++------- 9 files changed, 128 insertions(+), 131 deletions(-) create mode 100644 src/main/java/com/github/dockerjava/api/command/EventCallback.java create mode 100644 src/main/java/com/github/dockerjava/api/model/EventNotifier.java delete mode 100644 src/main/java/com/github/dockerjava/api/model/EventStream.java diff --git a/src/main/java/com/github/dockerjava/api/DockerClient.java b/src/main/java/com/github/dockerjava/api/DockerClient.java index ea2fcefe2..52323942f 100644 --- a/src/main/java/com/github/dockerjava/api/DockerClient.java +++ b/src/main/java/com/github/dockerjava/api/DockerClient.java @@ -10,6 +10,7 @@ import com.github.dockerjava.api.command.CopyFileFromContainerCmd; import com.github.dockerjava.api.command.CreateContainerCmd; import com.github.dockerjava.api.command.CreateImageCmd; +import com.github.dockerjava.api.command.EventCallback; import com.github.dockerjava.api.command.EventsCmd; import com.github.dockerjava.api.command.InfoCmd; import com.github.dockerjava.api.command.InspectContainerCmd; @@ -115,7 +116,7 @@ public CopyFileFromContainerCmd copyFileFromContainerCmd( public UnpauseContainerCmd unpauseContainerCmd(String containerId); - public EventsCmd eventsCmd(); + public EventsCmd eventsCmd(EventCallback eventCallback); public void close() throws IOException; diff --git a/src/main/java/com/github/dockerjava/api/command/EventCallback.java b/src/main/java/com/github/dockerjava/api/command/EventCallback.java new file mode 100644 index 000000000..009cc5998 --- /dev/null +++ b/src/main/java/com/github/dockerjava/api/command/EventCallback.java @@ -0,0 +1,10 @@ +package com.github.dockerjava.api.command; + +import com.github.dockerjava.api.model.Event; + +/** + * Event callback + */ +public interface EventCallback { + public void onEvent(Event event); +} diff --git a/src/main/java/com/github/dockerjava/api/command/EventsCmd.java b/src/main/java/com/github/dockerjava/api/command/EventsCmd.java index 71590c842..28b5f5095 100644 --- a/src/main/java/com/github/dockerjava/api/command/EventsCmd.java +++ b/src/main/java/com/github/dockerjava/api/command/EventsCmd.java @@ -1,6 +1,6 @@ package com.github.dockerjava.api.command; -import com.github.dockerjava.api.model.EventStream; +import com.github.dockerjava.api.model.EventNotifier; /** * Get events @@ -8,7 +8,7 @@ * @param since - Show all events created since timestamp * @param until - Stream events until this timestamp */ -public interface EventsCmd extends DockerCmd { +public interface EventsCmd extends DockerCmd { public EventsCmd withSince(String since); public EventsCmd withUntil(String until); @@ -17,6 +17,8 @@ public interface EventsCmd extends DockerCmd { public String getUntil(); - public static interface Exec extends DockerCmdExec { + public EventCallback getEventCallback(); + + public static interface Exec extends DockerCmdExec { } } diff --git a/src/main/java/com/github/dockerjava/api/model/EventNotifier.java b/src/main/java/com/github/dockerjava/api/model/EventNotifier.java new file mode 100644 index 000000000..f05225765 --- /dev/null +++ b/src/main/java/com/github/dockerjava/api/model/EventNotifier.java @@ -0,0 +1,51 @@ +package com.github.dockerjava.api.model; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.dockerjava.api.command.EventCallback; +import com.google.common.base.Preconditions; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.Callable; + +/** + * EventStream API + *

+ * Spawns a thread to poll for events to fill a BlockingQueue + */ +public class EventNotifier implements Closeable, Callable { + private static final JsonFactory JSON_FACTORY = new JsonFactory(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final EventCallback eventCallback; + private final InputStream inputStream; + + private EventNotifier(EventCallback eventCallback, InputStream inputStream) { + this.eventCallback = eventCallback; + this.inputStream = inputStream; + } + + public static EventNotifier create(EventCallback eventCallback, InputStream inputStream) { + Preconditions.checkNotNull(eventCallback, "An EventCallback must be provided"); + Preconditions.checkNotNull(inputStream, "An InputStream must be provided"); + return new EventNotifier(eventCallback, inputStream); + } + + @Override + public void close() throws IOException { + inputStream.close(); + } + + @Override + public Void call() throws Exception { + JsonParser jp = JSON_FACTORY.createParser(inputStream); + while (jp.nextToken() != JsonToken.END_OBJECT && !jp.isClosed()) { + eventCallback.onEvent(OBJECT_MAPPER.readValue(jp, Event.class)); + } + return null; + } +} diff --git a/src/main/java/com/github/dockerjava/api/model/EventStream.java b/src/main/java/com/github/dockerjava/api/model/EventStream.java deleted file mode 100644 index 0f47c25fd..000000000 --- a/src/main/java/com/github/dockerjava/api/model/EventStream.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.github.dockerjava.api.model; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Queues; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -/** - * EventStream API - *

- * Spawns a thread to poll for events to fill a BlockingQueue - */ -public class EventStream implements Closeable { - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - private final BlockingQueue queue; - private final EventRunner eventRunner; - - private EventStream(InputStream inputStream) { - queue = Queues.newLinkedBlockingQueue(); - eventRunner = new EventRunner(queue, inputStream); - } - - public static EventStream create(InputStream inputStream) { - return new EventStream(inputStream).startRunner(); - } - - public Event pollEvent() { - return queue.poll(); - } - - public Event pollEvent(long timeout, TimeUnit unit) throws InterruptedException { - return queue.poll(timeout, unit); - } - - @Override - public void close() throws IOException { - eventRunner.initiateStop(); - executor.shutdown(); - } - - private EventStream startRunner() { - executor.execute(eventRunner); - return this; - } - - private static class EventRunner implements Runnable { - private static final JsonFactory JSON_FACTORY = new JsonFactory(); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - private final Queue queue; - private final InputStream inputStream; - - public EventRunner(Queue queue, InputStream inputStream) { - this.queue = queue; - this.inputStream = inputStream; - } - - public void initiateStop() throws IOException { - inputStream.close(); - } - - @Override - public void run() { - try { - JsonParser jp = JSON_FACTORY.createParser(inputStream); - while (jp.nextToken() != JsonToken.END_OBJECT && !jp.isClosed()) { - queue.add(OBJECT_MAPPER.readValue(jp, Event.class)); - } - inputStream.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } -} diff --git a/src/main/java/com/github/dockerjava/core/DockerClientImpl.java b/src/main/java/com/github/dockerjava/core/DockerClientImpl.java index 810476369..5dfb4c13f 100644 --- a/src/main/java/com/github/dockerjava/core/DockerClientImpl.java +++ b/src/main/java/com/github/dockerjava/core/DockerClientImpl.java @@ -253,8 +253,8 @@ public UnpauseContainerCmd unpauseContainerCmd(String containerId) { } @Override - public EventsCmd eventsCmd() { - return new EventsCmdImpl(getDockerCmdExecFactory().createEventsCmdExec()); + public EventsCmd eventsCmd(EventCallback eventCallback) { + return new EventsCmdImpl(getDockerCmdExecFactory().createEventsCmdExec(), eventCallback); } @Override diff --git a/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java b/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java index 0b045838d..6129aa237 100644 --- a/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java +++ b/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java @@ -1,20 +1,22 @@ package com.github.dockerjava.core.command; +import com.github.dockerjava.api.command.EventCallback; import com.github.dockerjava.api.command.EventsCmd; -import com.github.dockerjava.api.model.EventStream; - -import java.io.InputStream; +import com.github.dockerjava.api.model.EventNotifier; /** * Stream docker events */ -public class EventsCmdImpl extends AbstrDockerCmd implements EventsCmd { +public class EventsCmdImpl extends AbstrDockerCmd implements EventsCmd { + + private final EventCallback eventCallback; private String since; private String until; - public EventsCmdImpl(EventsCmd.Exec exec) { + public EventsCmdImpl(EventsCmd.Exec exec, EventCallback eventCallback) { super(exec); + this.eventCallback = eventCallback; } @Override @@ -29,16 +31,23 @@ public EventsCmd withUntil(String until) { return this; } + @Override public String getSince() { return since; } + @Override public String getUntil() { return until; } @Override - public EventStream exec() { + public EventCallback getEventCallback() { + return eventCallback; + } + + @Override + public EventNotifier exec() { return super.exec(); } diff --git a/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java b/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java index 60739fc61..8f8a1c832 100644 --- a/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java +++ b/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java @@ -1,7 +1,7 @@ package com.github.dockerjava.jaxrs; import com.github.dockerjava.api.command.EventsCmd; -import com.github.dockerjava.api.model.EventStream; +import com.github.dockerjava.api.model.EventNotifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -9,7 +9,7 @@ import javax.ws.rs.core.Response; import java.io.InputStream; -public class EventsCmdExec extends AbstrDockerCmdExec implements EventsCmd.Exec { +public class EventsCmdExec extends AbstrDockerCmdExec implements EventsCmd.Exec { private static final Logger LOGGER = LoggerFactory.getLogger(EventsCmdExec.class); public EventsCmdExec(WebTarget baseResource) { @@ -17,13 +17,13 @@ public EventsCmdExec(WebTarget baseResource) { } @Override - public EventStream exec(EventsCmd command) { + protected EventNotifier execute(EventsCmd command) { WebTarget webResource = getBaseResource().path("/events") .queryParam("since", command.getSince()) .queryParam("until", command.getUntil()); LOGGER.trace("GET: {}", webResource); InputStream inputStream = webResource.request().get(Response.class).readEntity(InputStream.class); - return EventStream.create(inputStream); + return EventNotifier.create(command.getEventCallback(), inputStream); } } diff --git a/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java b/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java index 33c43ea10..350c840ef 100644 --- a/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java +++ b/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java @@ -2,10 +2,10 @@ import com.github.dockerjava.api.DockerException; import com.github.dockerjava.api.command.CreateContainerResponse; +import com.github.dockerjava.api.command.EventCallback; import com.github.dockerjava.api.model.Event; -import com.github.dockerjava.api.model.EventStream; +import com.github.dockerjava.api.model.EventNotifier; import com.github.dockerjava.client.AbstractDockerClientTest; -import com.google.common.collect.Lists; import org.testng.ITestResult; import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterTest; @@ -15,7 +15,8 @@ import java.io.IOException; import java.lang.reflect.Method; -import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class EventsCmdImplTest extends AbstractDockerClientTest { @@ -50,52 +51,60 @@ public void afterMethod(ITestResult result) { public void testEventStreamTimeBound() throws InterruptedException, IOException { // Don't include other tests events TimeUnit.SECONDS.sleep(1); + String startTime = getEpochTime(); - generateEvents(); + int expectedEvents = generateEvents(); String endTime = getEpochTime(); - EventStream eventStream = dockerClient.eventsCmd().withSince(startTime).withUntil(endTime).exec(); - List eventList = pollEvents(eventStream); - eventStream.close(); - LOG.info("Events: {}", eventList); - assertEquals(eventList.size(), KNOWN_NUM_EVENTS, "Expected 4 events, [create, start, die, stop]"); + CountDownLatch countDownLatch = new CountDownLatch(expectedEvents); + EventCallback eventCallback = new EventCallbackTest(countDownLatch); + + EventNotifier eventNotifier = dockerClient.eventsCmd(eventCallback).withSince(startTime).withUntil(endTime).exec(); + boolean zeroCount = countDownLatch.await(30, TimeUnit.SECONDS); + eventNotifier.close(); + assertTrue(zeroCount, "Expected 4 events, [create, start, die, stop]"); } @Test - public void testEventStream() throws InterruptedException, IOException { + public void testEventStreaming() throws InterruptedException, IOException { // Don't include other tests events TimeUnit.SECONDS.sleep(1); - String startTime = getEpochTime(); + + CountDownLatch countDownLatch = new CountDownLatch(KNOWN_NUM_EVENTS); + EventCallback eventCallback = new EventCallbackTest(countDownLatch); + EventNotifier eventNotifier = dockerClient.eventsCmd(eventCallback).withSince(getEpochTime()).exec(); + generateEvents(); - EventStream eventStream = dockerClient.eventsCmd().withSince(startTime).exec(); - List eventList = pollEvents(eventStream); - eventStream.close(); - LOG.info("Events: {}", eventList); - assertEquals(eventList.size(), KNOWN_NUM_EVENTS, "Expected 4 events, [create, start, die, stop]"); + boolean zeroCount = countDownLatch.await(30, TimeUnit.SECONDS); + eventNotifier.close(); + assertTrue(zeroCount, "Expected 4 events, [create, start, die, stop]"); } /** * This method generates {#link KNOWN_NUM_EVENTS} events */ - private void generateEvents() { + private int generateEvents() { String testImage = "busybox"; asString(dockerClient.pullImageCmd(testImage).exec()); - CreateContainerResponse container1 = dockerClient + CreateContainerResponse container = dockerClient .createContainerCmd(testImage).withCmd("echo").exec(); - dockerClient.startContainerCmd(container1.getId()).exec(); - dockerClient.stopContainerCmd(container1.getId()).exec(); + dockerClient.startContainerCmd(container.getId()).exec(); + dockerClient.stopContainerCmd(container.getId()).exec(); + return KNOWN_NUM_EVENTS; } - private List pollEvents(EventStream eventStream) throws InterruptedException { - List eventList = Lists.newArrayList(); - Event event = null; - do { - event = eventStream.pollEvent(1, TimeUnit.SECONDS); - if (event != null) { - eventList.add(event); - } - } while (event != null); - return eventList; + private class EventCallbackTest implements EventCallback { + private final CountDownLatch countDownLatch; + + public EventCallbackTest(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public void onEvent(Event event) { + LOG.info("Received event #{}: {}", countDownLatch.getCount(), event); + countDownLatch.countDown(); + } } } From 5ad41ae313093d91e65e31f797310d63d0c9baed Mon Sep 17 00:00:00 2001 From: Brendan Almonte Date: Sat, 20 Sep 2014 23:15:20 -0700 Subject: [PATCH 3/4] EventsAPI: Complete callback based EventNotifier --- .../dockerjava/api/command/EventsCmd.java | 10 +++-- .../dockerjava/api/model/EventNotifier.java | 41 ++++++++++--------- .../core/command/EventsCmdImpl.java | 19 +++++++-- .../dockerjava/jaxrs/EventsCmdExec.java | 11 +++-- .../core/command/EventsCmdImplTest.java | 20 +++++---- 5 files changed, 61 insertions(+), 40 deletions(-) diff --git a/src/main/java/com/github/dockerjava/api/command/EventsCmd.java b/src/main/java/com/github/dockerjava/api/command/EventsCmd.java index 28b5f5095..0386c86aa 100644 --- a/src/main/java/com/github/dockerjava/api/command/EventsCmd.java +++ b/src/main/java/com/github/dockerjava/api/command/EventsCmd.java @@ -1,6 +1,6 @@ package com.github.dockerjava.api.command; -import com.github.dockerjava.api.model.EventNotifier; +import java.util.concurrent.ExecutorService; /** * Get events @@ -8,7 +8,7 @@ * @param since - Show all events created since timestamp * @param until - Stream events until this timestamp */ -public interface EventsCmd extends DockerCmd { +public interface EventsCmd extends DockerCmd { public EventsCmd withSince(String since); public EventsCmd withUntil(String until); @@ -19,6 +19,10 @@ public interface EventsCmd extends DockerCmd { public EventCallback getEventCallback(); - public static interface Exec extends DockerCmdExec { + public ExecutorService getExecutorService(); + + public void stop(); + + public static interface Exec extends DockerCmdExec { } } diff --git a/src/main/java/com/github/dockerjava/api/model/EventNotifier.java b/src/main/java/com/github/dockerjava/api/model/EventNotifier.java index f05225765..39d42931e 100644 --- a/src/main/java/com/github/dockerjava/api/model/EventNotifier.java +++ b/src/main/java/com/github/dockerjava/api/model/EventNotifier.java @@ -7,44 +7,45 @@ import com.github.dockerjava.api.command.EventCallback; import com.google.common.base.Preconditions; -import java.io.Closeable; -import java.io.IOException; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; import java.io.InputStream; import java.util.concurrent.Callable; /** - * EventStream API - *

- * Spawns a thread to poll for events to fill a BlockingQueue + * EventNotifier API */ -public class EventNotifier implements Closeable, Callable { +public class EventNotifier implements Callable { private static final JsonFactory JSON_FACTORY = new JsonFactory(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final EventCallback eventCallback; - private final InputStream inputStream; + private final WebTarget webTarget; - private EventNotifier(EventCallback eventCallback, InputStream inputStream) { + private EventNotifier(EventCallback eventCallback, WebTarget webTarget) { this.eventCallback = eventCallback; - this.inputStream = inputStream; + this.webTarget = webTarget; } - public static EventNotifier create(EventCallback eventCallback, InputStream inputStream) { + public static EventNotifier create(EventCallback eventCallback, WebTarget webTarget) { Preconditions.checkNotNull(eventCallback, "An EventCallback must be provided"); - Preconditions.checkNotNull(inputStream, "An InputStream must be provided"); - return new EventNotifier(eventCallback, inputStream); - } - - @Override - public void close() throws IOException { - inputStream.close(); + Preconditions.checkNotNull(webTarget, "An WebTarget must be provided"); + return new EventNotifier(eventCallback, webTarget); } @Override public Void call() throws Exception { - JsonParser jp = JSON_FACTORY.createParser(inputStream); - while (jp.nextToken() != JsonToken.END_OBJECT && !jp.isClosed()) { - eventCallback.onEvent(OBJECT_MAPPER.readValue(jp, Event.class)); + Response response = webTarget.request().get(Response.class); + InputStream inputStream = response.readEntity(InputStream.class); + try { + JsonParser jp = JSON_FACTORY.createParser(inputStream); + while (jp.nextToken() != JsonToken.END_OBJECT && !jp.isClosed()) { + eventCallback.onEvent(OBJECT_MAPPER.readValue(jp, Event.class)); + } + } finally { + if (response != null) { + response.close(); + } } return null; } diff --git a/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java b/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java index 6129aa237..5f053ed9d 100644 --- a/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java +++ b/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java @@ -2,15 +2,18 @@ import com.github.dockerjava.api.command.EventCallback; import com.github.dockerjava.api.command.EventsCmd; -import com.github.dockerjava.api.model.EventNotifier; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Stream docker events */ -public class EventsCmdImpl extends AbstrDockerCmd implements EventsCmd { +public class EventsCmdImpl extends AbstrDockerCmd implements EventsCmd { private final EventCallback eventCallback; + private ExecutorService executorService = Executors.newSingleThreadExecutor(); private String since; private String until; @@ -47,7 +50,17 @@ public EventCallback getEventCallback() { } @Override - public EventNotifier exec() { + public ExecutorService getExecutorService() { + return executorService; + } + + @Override + public void stop() { + executorService.shutdown(); + } + + @Override + public Void exec() { return super.exec(); } diff --git a/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java b/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java index 8f8a1c832..8511c048e 100644 --- a/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java +++ b/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java @@ -6,10 +6,8 @@ import org.slf4j.LoggerFactory; import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.Response; -import java.io.InputStream; -public class EventsCmdExec extends AbstrDockerCmdExec implements EventsCmd.Exec { +public class EventsCmdExec extends AbstrDockerCmdExec implements EventsCmd.Exec { private static final Logger LOGGER = LoggerFactory.getLogger(EventsCmdExec.class); public EventsCmdExec(WebTarget baseResource) { @@ -17,13 +15,14 @@ public EventsCmdExec(WebTarget baseResource) { } @Override - protected EventNotifier execute(EventsCmd command) { + protected Void execute(EventsCmd command) { WebTarget webResource = getBaseResource().path("/events") .queryParam("since", command.getSince()) .queryParam("until", command.getUntil()); LOGGER.trace("GET: {}", webResource); - InputStream inputStream = webResource.request().get(Response.class).readEntity(InputStream.class); - return EventNotifier.create(command.getEventCallback(), inputStream); + EventNotifier eventNotifier = EventNotifier.create(command.getEventCallback(), webResource); + command.getExecutorService().submit(eventNotifier); + return null; } } diff --git a/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java b/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java index 350c840ef..82bc7ffee 100644 --- a/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java +++ b/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java @@ -3,8 +3,8 @@ import com.github.dockerjava.api.DockerException; import com.github.dockerjava.api.command.CreateContainerResponse; import com.github.dockerjava.api.command.EventCallback; +import com.github.dockerjava.api.command.EventsCmd; import com.github.dockerjava.api.model.Event; -import com.github.dockerjava.api.model.EventNotifier; import com.github.dockerjava.client.AbstractDockerClientTest; import org.testng.ITestResult; import org.testng.annotations.AfterMethod; @@ -15,7 +15,6 @@ import java.io.IOException; import java.lang.reflect.Method; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -59,9 +58,12 @@ public void testEventStreamTimeBound() throws InterruptedException, IOException CountDownLatch countDownLatch = new CountDownLatch(expectedEvents); EventCallback eventCallback = new EventCallbackTest(countDownLatch); - EventNotifier eventNotifier = dockerClient.eventsCmd(eventCallback).withSince(startTime).withUntil(endTime).exec(); - boolean zeroCount = countDownLatch.await(30, TimeUnit.SECONDS); - eventNotifier.close(); + EventsCmd eventsCmd = dockerClient.eventsCmd(eventCallback).withSince(startTime).withUntil(endTime); + eventsCmd.exec(); + + boolean zeroCount = countDownLatch.await(5, TimeUnit.SECONDS); + + eventsCmd.stop(); assertTrue(zeroCount, "Expected 4 events, [create, start, die, stop]"); } @@ -72,12 +74,14 @@ public void testEventStreaming() throws InterruptedException, IOException { CountDownLatch countDownLatch = new CountDownLatch(KNOWN_NUM_EVENTS); EventCallback eventCallback = new EventCallbackTest(countDownLatch); - EventNotifier eventNotifier = dockerClient.eventsCmd(eventCallback).withSince(getEpochTime()).exec(); + + EventsCmd eventsCmd = dockerClient.eventsCmd(eventCallback).withSince(getEpochTime()); + eventsCmd.exec(); generateEvents(); - boolean zeroCount = countDownLatch.await(30, TimeUnit.SECONDS); - eventNotifier.close(); + boolean zeroCount = countDownLatch.await(5, TimeUnit.SECONDS); + eventsCmd.stop(); assertTrue(zeroCount, "Expected 4 events, [create, start, die, stop]"); } From f79e5112452f8ea03f4c73c95d8456e811e4b26c Mon Sep 17 00:00:00 2001 From: Marcus Linke Date: Mon, 22 Sep 2014 21:59:28 +0200 Subject: [PATCH 4/4] Modified Event Steam API --- .../dockerjava/api/command/EventsCmd.java | 11 ++-- .../core/command/EventsCmdImpl.java | 31 ++++------ .../dockerjava/jaxrs/EventsCmdExec.java | 62 +++++++++++++++++-- .../core/command/EventsCmdImplTest.java | 10 +-- 4 files changed, 80 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/github/dockerjava/api/command/EventsCmd.java b/src/main/java/com/github/dockerjava/api/command/EventsCmd.java index 0386c86aa..cfdb23a64 100644 --- a/src/main/java/com/github/dockerjava/api/command/EventsCmd.java +++ b/src/main/java/com/github/dockerjava/api/command/EventsCmd.java @@ -2,13 +2,14 @@ import java.util.concurrent.ExecutorService; + /** * Get events * * @param since - Show all events created since timestamp * @param until - Stream events until this timestamp */ -public interface EventsCmd extends DockerCmd { +public interface EventsCmd extends DockerCmd { public EventsCmd withSince(String since); public EventsCmd withUntil(String until); @@ -18,11 +19,9 @@ public interface EventsCmd extends DockerCmd { public String getUntil(); public EventCallback getEventCallback(); + + public EventsCmd withEventCallback(EventCallback eventCallback); - public ExecutorService getExecutorService(); - - public void stop(); - - public static interface Exec extends DockerCmdExec { + public static interface Exec extends DockerCmdExec { } } diff --git a/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java b/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java index 5f053ed9d..ac55de714 100644 --- a/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java +++ b/src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java @@ -1,25 +1,22 @@ package com.github.dockerjava.core.command; +import java.util.concurrent.ExecutorService; + import com.github.dockerjava.api.command.EventCallback; import com.github.dockerjava.api.command.EventsCmd; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - /** * Stream docker events */ -public class EventsCmdImpl extends AbstrDockerCmd implements EventsCmd { +public class EventsCmdImpl extends AbstrDockerCmd implements EventsCmd { - private final EventCallback eventCallback; - - private ExecutorService executorService = Executors.newSingleThreadExecutor(); private String since; private String until; + private EventCallback eventCallback; public EventsCmdImpl(EventsCmd.Exec exec, EventCallback eventCallback) { super(exec); - this.eventCallback = eventCallback; + withEventCallback(eventCallback); } @Override @@ -33,6 +30,12 @@ public EventsCmd withUntil(String until) { this.until = until; return this; } + + @Override + public EventsCmd withEventCallback(EventCallback eventCallback) { + this.eventCallback = eventCallback; + return this; + } @Override public String getSince() { @@ -50,17 +53,7 @@ public EventCallback getEventCallback() { } @Override - public ExecutorService getExecutorService() { - return executorService; - } - - @Override - public void stop() { - executorService.shutdown(); - } - - @Override - public Void exec() { + public ExecutorService exec() { return super.exec(); } diff --git a/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java b/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java index 8511c048e..63db6772a 100644 --- a/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java +++ b/src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java @@ -1,28 +1,80 @@ package com.github.dockerjava.jaxrs; +import java.io.InputStream; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.dockerjava.api.command.EventCallback; import com.github.dockerjava.api.command.EventsCmd; +import com.github.dockerjava.api.model.Event; import com.github.dockerjava.api.model.EventNotifier; +import com.google.common.base.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; -public class EventsCmdExec extends AbstrDockerCmdExec implements EventsCmd.Exec { +public class EventsCmdExec extends AbstrDockerCmdExec implements EventsCmd.Exec { private static final Logger LOGGER = LoggerFactory.getLogger(EventsCmdExec.class); - + public EventsCmdExec(WebTarget baseResource) { super(baseResource); } @Override - protected Void execute(EventsCmd command) { + protected ExecutorService execute(EventsCmd command) { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + WebTarget webResource = getBaseResource().path("/events") .queryParam("since", command.getSince()) .queryParam("until", command.getUntil()); LOGGER.trace("GET: {}", webResource); EventNotifier eventNotifier = EventNotifier.create(command.getEventCallback(), webResource); - command.getExecutorService().submit(eventNotifier); - return null; + executorService.submit(eventNotifier); + return executorService; + } + + private static class EventNotifier implements Callable { + private static final JsonFactory JSON_FACTORY = new JsonFactory(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final EventCallback eventCallback; + private final WebTarget webTarget; + + private EventNotifier(EventCallback eventCallback, WebTarget webTarget) { + this.eventCallback = eventCallback; + this.webTarget = webTarget; + } + + public static EventNotifier create(EventCallback eventCallback, WebTarget webTarget) { + Preconditions.checkNotNull(eventCallback, "An EventCallback must be provided"); + Preconditions.checkNotNull(webTarget, "An WebTarget must be provided"); + return new EventNotifier(eventCallback, webTarget); + } + + @Override + public Void call() throws Exception { + Response response = webTarget.request().get(Response.class); + InputStream inputStream = response.readEntity(InputStream.class); + try { + JsonParser jp = JSON_FACTORY.createParser(inputStream); + while (jp.nextToken() != JsonToken.END_OBJECT && !jp.isClosed()) { + eventCallback.onEvent(OBJECT_MAPPER.readValue(jp, Event.class)); + } + } finally { + if (response != null) { + response.close(); + } + } + return null; + } } } diff --git a/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java b/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java index 82bc7ffee..6d3651c07 100644 --- a/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java +++ b/src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java @@ -6,6 +6,7 @@ import com.github.dockerjava.api.command.EventsCmd; import com.github.dockerjava.api.model.Event; import com.github.dockerjava.client.AbstractDockerClientTest; + import org.testng.ITestResult; import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterTest; @@ -16,6 +17,7 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; public class EventsCmdImplTest extends AbstractDockerClientTest { @@ -59,11 +61,11 @@ public void testEventStreamTimeBound() throws InterruptedException, IOException EventCallback eventCallback = new EventCallbackTest(countDownLatch); EventsCmd eventsCmd = dockerClient.eventsCmd(eventCallback).withSince(startTime).withUntil(endTime); - eventsCmd.exec(); + ExecutorService executorService = eventsCmd.exec(); boolean zeroCount = countDownLatch.await(5, TimeUnit.SECONDS); - eventsCmd.stop(); + executorService.shutdown(); assertTrue(zeroCount, "Expected 4 events, [create, start, die, stop]"); } @@ -76,12 +78,12 @@ public void testEventStreaming() throws InterruptedException, IOException { EventCallback eventCallback = new EventCallbackTest(countDownLatch); EventsCmd eventsCmd = dockerClient.eventsCmd(eventCallback).withSince(getEpochTime()); - eventsCmd.exec(); + ExecutorService executorService = eventsCmd.exec(); generateEvents(); boolean zeroCount = countDownLatch.await(5, TimeUnit.SECONDS); - eventsCmd.stop(); + executorService.shutdown(); assertTrue(zeroCount, "Expected 4 events, [create, start, die, stop]"); }