From d550b494b238eed545cca30eaa034b4fa238aab2 Mon Sep 17 00:00:00 2001 From: zebin kang Date: Sat, 8 Jul 2017 17:16:28 -0400 Subject: [PATCH 1/3] modify bench for record latency --- src/test/java/io/nats/examples/NatsBench.java | 53 ++++++++++++++++--- src/test/java/io/nats/examples/Payload.java | 28 ++++++++++ 2 files changed, 74 insertions(+), 7 deletions(-) create mode 100644 src/test/java/io/nats/examples/Payload.java diff --git a/src/test/java/io/nats/examples/NatsBench.java b/src/test/java/io/nats/examples/NatsBench.java index e882317b4..45ecbe1a4 100644 --- a/src/test/java/io/nats/examples/NatsBench.java +++ b/src/test/java/io/nats/examples/NatsBench.java @@ -30,12 +30,17 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.io.BufferedWriter; +import java.io.FileWriter; + + /** * A utility class for measuring NATS performance. */ public class NatsBench { final BlockingQueue errorQueue = new LinkedBlockingQueue(); + final int TIME_SLICE=1000; // Default test values private int numMsgs = 100000; @@ -86,6 +91,7 @@ public NatsBench(String[] args) { } } + /** * Properties-based constructor for NatsBench. * @@ -165,10 +171,16 @@ public void onException(NATSException ex) { final long start = System.nanoTime(); Subscription sub = nc.subscribe(subject, new MessageHandler() { + private Payload convertor=new Payload(size); + private long[] timeSlots=new long[numMsgs]; + private int i=0; @Override - public void onMessage(Message msg) { + public void onMessage(Message msg) { received.incrementAndGet(); + timeSlots[i] = System.nanoTime() - convertor.extractTime(msg.getData()); + i += 1; if (received.get() >= numMsgs) { + concludeData(timeSlots,"latency/subTime.csv"); bench.addSubSample(new Sample(numMsgs, size, start, System.nanoTime(), nc)); phaser.arrive(); nc.setDisconnectedCallback(null); @@ -203,27 +215,54 @@ public void run() { public void runPublisher() throws Exception { try (Connection nc = Nats.connect(urls, opts)) { - byte[] payload = null; - if (size > 0) { - payload = new byte[size]; - } - + Payload convertor=new Payload(size); final long start = System.nanoTime(); + long[] timeSlots=new long[numMsgs+1]; + for (int i = 0; i < numMsgs; i++) { + timeSlots[i]=System.nanoTime(); sent.incrementAndGet(); - nc.publish(subject, payload); + nc.publish(subject, convertor.preparePayload()); } + timeSlots[numMsgs]=System.nanoTime(); nc.flush(); bench.addPubSample(new Sample(numMsgs, size, start, System.nanoTime(), nc)); Statistics s = nc.getStats(); System.out.println("NATS publish connection statistics:"); System.out.printf(" Bytes out: %d\n", s.getOutBytes()); System.out.printf(" Msgs out: %d\n", s.getOutMsgs()); + for(int i=0;i Date: Sun, 9 Jul 2017 12:46:47 -0400 Subject: [PATCH 2/3] move the report part to the class Benchmark --- .../java/io/nats/benchmark/Benchmark.java | 66 +++++++++++++++-- .../benchmark/BenchmarkFunctionalTest.java | 4 +- src/test/java/io/nats/examples/NatsBench.java | 71 +++++-------------- src/test/java/io/nats/examples/Payload.java | 13 +++- 4 files changed, 92 insertions(+), 62 deletions(-) diff --git a/src/main/java/io/nats/benchmark/Benchmark.java b/src/main/java/io/nats/benchmark/Benchmark.java index 92d0caae6..05989156d 100644 --- a/src/main/java/io/nats/benchmark/Benchmark.java +++ b/src/main/java/io/nats/benchmark/Benchmark.java @@ -7,7 +7,12 @@ package io.nats.benchmark; import io.nats.client.NUID; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -16,19 +21,22 @@ * A utility class for collecting and calculating benchmark metrics. */ public class Benchmark extends Sample { + private final int TIME_SLICE=1000; private String name = null; private String runId = null; private final SampleGroup pubs = new SampleGroup(); private final SampleGroup subs = new SampleGroup(); private BlockingQueue pubChannel; private BlockingQueue subChannel; + private int numMsgs; + private long[] pubTimeSlots; + private long[] subTimeSlots; + private int[] pubSlotIndex; + private int[] subSlotIndex; - public Benchmark() { - // TODO Auto-generated constructor stub - } - public Benchmark(String name, int subCnt, int pubCnt) { - this(name, NUID.nextGlobal(), subCnt, pubCnt); + public Benchmark(String name, int subCnt, int pubCnt, int numMsgs) { + this(name, NUID.nextGlobal(), subCnt, pubCnt, numMsgs); } /** @@ -36,15 +44,19 @@ public Benchmark(String name, int subCnt, int pubCnt) { * collecting samples, call endBenchmark. * * @param name a descriptive name for this test run - * @param runId a unique id for this test run (typically a guid) * @param subCnt the number of subscribers * @param pubCnt the number of publishers */ - public Benchmark(String name, String runId, int subCnt, int pubCnt) { + public Benchmark(String name, String runId, int subCnt, int pubCnt,int num) { this.name = name; this.runId = runId; this.subChannel = new LinkedBlockingQueue(); this.pubChannel = new LinkedBlockingQueue(); + numMsgs=num; + subTimeSlots=new long[subCnt*num]; + pubTimeSlots=new long[pubCnt*num]; + subSlotIndex=new int[subCnt]; + pubSlotIndex=new int[pubCnt]; } public final void addPubSample(Sample sample) { @@ -55,6 +67,16 @@ public final void addSubSample(Sample sample) { subChannel.add(sample); } + public final void addPubLatency(int pubIndex, long latency) { + pubTimeSlots[pubSlotIndex[pubIndex]+numMsgs*pubIndex]=latency; + pubSlotIndex[pubIndex]+=1; + } + + public final void addSubLatency(int subIndex, long latency) { + subTimeSlots[subSlotIndex[subIndex]+numMsgs*subIndex]=latency; + subSlotIndex[subIndex]+=1; + } + /** * Closes this benchmark and calculates totals and times. */ @@ -89,6 +111,7 @@ public final void close() { * @return the report as a String. */ public final String report() { + reportLatency(); StringBuilder sb = new StringBuilder(); String indent = ""; if (pubs.hasSamples() && subs.hasSamples()) { @@ -174,4 +197,33 @@ public final SampleGroup getPubs() { public final SampleGroup getSubs() { return subs; } + + private void reportLatency(){ + if(pubSlotIndex.length>0) + exportLatencyData(pubTimeSlots,"latency/pubTime.csv"); + if(subSlotIndex.length>0) + exportLatencyData(subTimeSlots,"latency/subTime.csv"); + } + + private void exportLatencyData(long[] timeSlots, String fileName) { + Arrays.sort(timeSlots); + int numMsgsForEachSlice = timeSlots.length / TIME_SLICE; + int indexWithinSlice = 0; + int indexOfSlice = 0; + try (BufferedWriter bw = new BufferedWriter(new FileWriter(fileName))) { + bw.write("Index,Latency\n"); + for (int i = 0; i < numMsgs; i += 1) { + indexWithinSlice += 1; + if (indexWithinSlice == numMsgsForEachSlice) { + indexOfSlice += 1; + bw.write(indexOfSlice + "," + timeSlots[i - numMsgsForEachSlice + 1] + "\n"); + indexWithinSlice = 0; + } + } + } catch (IOException e) { + + e.printStackTrace(); + + } + } } diff --git a/src/test/java/io/nats/benchmark/BenchmarkFunctionalTest.java b/src/test/java/io/nats/benchmark/BenchmarkFunctionalTest.java index 390b0df4f..f3435e6a0 100644 --- a/src/test/java/io/nats/benchmark/BenchmarkFunctionalTest.java +++ b/src/test/java/io/nats/benchmark/BenchmarkFunctionalTest.java @@ -189,7 +189,7 @@ public void testStdDev() { @Test public void testBenchSetup() { - Benchmark bench = new Benchmark("test", 1, 1); + Benchmark bench = new Benchmark("test", 1, 1, 1000); bench.addPubSample(millionMessagesSecondSample(1)); bench.addSubSample(millionMessagesSecondSample(1)); bench.close(); @@ -210,7 +210,7 @@ public void testBenchSetup() { * @return the created Benchmark */ private Benchmark makeBench(int subs, int pubs) { - Benchmark bench = new Benchmark("test", subs, pubs); + Benchmark bench = new Benchmark("test", subs, pubs, 1000); for (int i = 0; i < subs; i++) { bench.addSubSample(millionMessagesSecondSample(1)); } diff --git a/src/test/java/io/nats/examples/NatsBench.java b/src/test/java/io/nats/examples/NatsBench.java index 45ecbe1a4..918a2e069 100644 --- a/src/test/java/io/nats/examples/NatsBench.java +++ b/src/test/java/io/nats/examples/NatsBench.java @@ -30,9 +30,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.io.BufferedWriter; -import java.io.FileWriter; - /** * A utility class for measuring NATS performance. @@ -40,7 +37,6 @@ public class NatsBench { final BlockingQueue errorQueue = new LinkedBlockingQueue(); - final int TIME_SLICE=1000; // Default test values private int numMsgs = 100000; @@ -117,11 +113,13 @@ class Worker implements Runnable { final Phaser phaser; final int num; final int size; + final int workerIndex; - Worker(Phaser phaser, int numMsgs, int size) { + Worker(Phaser phaser, int numMsgs, int size, int i) { this.phaser = phaser; this.num = numMsgs; this.size = size; + workerIndex=i; } @Override @@ -130,8 +128,8 @@ public void run() { } class SubWorker extends Worker { - SubWorker(Phaser phaser, int numMsgs, int size) { - super(phaser, numMsgs, size); + SubWorker(Phaser phaser, int numMsgs, int size,int subIndex) { + super(phaser, numMsgs, size,subIndex); } @Override @@ -170,17 +168,13 @@ public void onException(NATSException ex) { }); final long start = System.nanoTime(); - Subscription sub = nc.subscribe(subject, new MessageHandler() { - private Payload convertor=new Payload(size); - private long[] timeSlots=new long[numMsgs]; - private int i=0; + Subscription sub = nc.subscribe(subject+workerIndex, new MessageHandler() { + private Payload payload =new Payload(size); @Override public void onMessage(Message msg) { received.incrementAndGet(); - timeSlots[i] = System.nanoTime() - convertor.extractTime(msg.getData()); - i += 1; - if (received.get() >= numMsgs) { - concludeData(timeSlots,"latency/subTime.csv"); + bench.addSubLatency(workerIndex,System.nanoTime() - payload.extractTime(msg.getData())); + if (payload.extractMsgIndex(msg.getData()) >= numMsgs) { bench.addSubSample(new Sample(numMsgs, size, start, System.nanoTime(), nc)); phaser.arrive(); nc.setDisconnectedCallback(null); @@ -198,8 +192,8 @@ public void onMessage(Message msg) { } class PubWorker extends Worker { - PubWorker(Phaser phaser, int numMsgs, int size) { - super(phaser, numMsgs, size); + PubWorker(Phaser phaser, int numMsgs, int size,int pubIndex) { + super(phaser, numMsgs, size,pubIndex); } @Override @@ -215,53 +209,26 @@ public void run() { public void runPublisher() throws Exception { try (Connection nc = Nats.connect(urls, opts)) { - Payload convertor=new Payload(size); + Payload payload=new Payload(size); final long start = System.nanoTime(); - - long[] timeSlots=new long[numMsgs+1]; + long previousTime; for (int i = 0; i < numMsgs; i++) { - timeSlots[i]=System.nanoTime(); + previousTime=System.nanoTime(); sent.incrementAndGet(); - nc.publish(subject, convertor.preparePayload()); + nc.publish(subject+workerIndex, payload.preparePayload(i+1)); + bench.addPubLatency(workerIndex,System.nanoTime()-previousTime); } - timeSlots[numMsgs]=System.nanoTime(); nc.flush(); bench.addPubSample(new Sample(numMsgs, size, start, System.nanoTime(), nc)); Statistics s = nc.getStats(); System.out.println("NATS publish connection statistics:"); System.out.printf(" Bytes out: %d\n", s.getOutBytes()); System.out.printf(" Msgs out: %d\n", s.getOutMsgs()); - for(int i=0;i Date: Mon, 10 Jul 2017 22:25:09 -0400 Subject: [PATCH 3/3] add some logs --- .../java/io/nats/benchmark/Benchmark.java | 10 ++---- src/test/java/io/nats/examples/NatsBench.java | 33 +++++++++++++++---- src/test/java/io/nats/examples/Payload.java | 4 +-- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/src/main/java/io/nats/benchmark/Benchmark.java b/src/main/java/io/nats/benchmark/Benchmark.java index 05989156d..cf5db13dc 100644 --- a/src/main/java/io/nats/benchmark/Benchmark.java +++ b/src/main/java/io/nats/benchmark/Benchmark.java @@ -21,7 +21,6 @@ * A utility class for collecting and calculating benchmark metrics. */ public class Benchmark extends Sample { - private final int TIME_SLICE=1000; private String name = null; private String runId = null; private final SampleGroup pubs = new SampleGroup(); @@ -35,9 +34,6 @@ public class Benchmark extends Sample { private int[] subSlotIndex; - public Benchmark(String name, int subCnt, int pubCnt, int numMsgs) { - this(name, NUID.nextGlobal(), subCnt, pubCnt, numMsgs); - } /** * Initializes a Benchmark. After creating a bench call addSubSample/addPubSample. When done @@ -47,9 +43,8 @@ public Benchmark(String name, int subCnt, int pubCnt, int numMsgs) { * @param subCnt the number of subscribers * @param pubCnt the number of publishers */ - public Benchmark(String name, String runId, int subCnt, int pubCnt,int num) { + public Benchmark(String name, int subCnt, int pubCnt,int num) { this.name = name; - this.runId = runId; this.subChannel = new LinkedBlockingQueue(); this.pubChannel = new LinkedBlockingQueue(); numMsgs=num; @@ -207,12 +202,13 @@ private void reportLatency(){ private void exportLatencyData(long[] timeSlots, String fileName) { Arrays.sort(timeSlots); + int TIME_SLICE = 1000; int numMsgsForEachSlice = timeSlots.length / TIME_SLICE; int indexWithinSlice = 0; int indexOfSlice = 0; try (BufferedWriter bw = new BufferedWriter(new FileWriter(fileName))) { bw.write("Index,Latency\n"); - for (int i = 0; i < numMsgs; i += 1) { + for (int i = 0; i < timeSlots.length; i += 1) { indexWithinSlice += 1; if (indexWithinSlice == numMsgsForEachSlice) { indexOfSlice += 1; diff --git a/src/test/java/io/nats/examples/NatsBench.java b/src/test/java/io/nats/examples/NatsBench.java index 918a2e069..235d23f0c 100644 --- a/src/test/java/io/nats/examples/NatsBench.java +++ b/src/test/java/io/nats/examples/NatsBench.java @@ -40,7 +40,7 @@ public class NatsBench { // Default test values private int numMsgs = 100000; - private int numPubs = 1; + private int numPubs = 0; private int numSubs = 0; private int size = 128; @@ -170,11 +170,21 @@ public void onException(NATSException ex) { final long start = System.nanoTime(); Subscription sub = nc.subscribe(subject+workerIndex, new MessageHandler() { private Payload payload =new Payload(size); + private long currTime=0; + private long i=0; @Override public void onMessage(Message msg) { + currTime=System.currentTimeMillis(); received.incrementAndGet(); - bench.addSubLatency(workerIndex,System.nanoTime() - payload.extractTime(msg.getData())); - if (payload.extractMsgIndex(msg.getData()) >= numMsgs) { + bench.addSubLatency(workerIndex,currTime - payload.extractTime(msg.getData())); + i=payload.extractMsgIndex(msg.getData()); + if(i%(numMsgs/100)==0&&workerIndex==numSubs-1) { + System.out.println("sub"+workerIndex + ":" + i / (numMsgs / 100)); + System.out.println(payload.extractTime(msg.getData())); + System.out.println(currTime); + + } + if (payload.extractMsgIndex(msg.getData())+1 >= numMsgs) { bench.addSubSample(new Sample(numMsgs, size, start, System.nanoTime(), nc)); phaser.arrive(); nc.setDisconnectedCallback(null); @@ -211,13 +221,21 @@ public void runPublisher() throws Exception { try (Connection nc = Nats.connect(urls, opts)) { Payload payload=new Payload(size); final long start = System.nanoTime(); - long previousTime; + long previousTime=System.nanoTime(); + long currTime; + long currMillis; for (int i = 0; i < numMsgs; i++) { - previousTime=System.nanoTime(); sent.incrementAndGet(); - nc.publish(subject+workerIndex, payload.preparePayload(i+1)); - bench.addPubLatency(workerIndex,System.nanoTime()-previousTime); + previousTime=System.nanoTime(); + currMillis=System.currentTimeMillis(); + nc.publish(subject+workerIndex, payload.preparePayload(i,currMillis)); + currTime=System.nanoTime(); + bench.addPubLatency(workerIndex,currTime-previousTime); + if(i%(numMsgs/100)==0&&workerIndex==numPubs-1) { + System.out.println("pub"+workerIndex + ":" + i / (numMsgs / 100)); + System.out.println(currMillis); + } } nc.flush(); bench.addPubSample(new Sample(numMsgs, size, start, System.nanoTime(), nc)); @@ -256,6 +274,7 @@ public void run() throws Exception { // Now publishers for (int i = 0; i < numPubs; i++) { + System.out.println(i); phaser.register(); exec.execute(new PubWorker(phaser, numMsgs, size,i)); } diff --git a/src/test/java/io/nats/examples/Payload.java b/src/test/java/io/nats/examples/Payload.java index d26ad354a..de8e5da89 100644 --- a/src/test/java/io/nats/examples/Payload.java +++ b/src/test/java/io/nats/examples/Payload.java @@ -11,8 +11,8 @@ public Payload(int size){ buffer=ByteBuffer.allocate(Long.SIZE/Byte.SIZE); } - public byte[] preparePayload(long msgIndex) { - buffer.putLong(0, System.nanoTime()); + public byte[] preparePayload(long msgIndex,long currTime) { + buffer.putLong(0, currTime); for(int i=0;i