Skip to content

Commit

Permalink
added DisruptorTests
Browse files Browse the repository at this point in the history
  • Loading branch information
RalphSteinhagen committed Oct 26, 2020
1 parent eb7a286 commit b6d3c3a
Show file tree
Hide file tree
Showing 5 changed files with 350 additions and 13 deletions.
6 changes: 6 additions & 0 deletions microservice/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@
<version>0.5.2</version>
</dependency>

<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>

<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTests {
public class BlockingQueueTests { // NOPMD NOSONAR -- nomen est omen
private static final int N_ITERATIONS = 100_000;
private final BlockingQueue<byte[]> outputQueue = new ArrayBlockingQueue<>(N_ITERATIONS);
private final BlockingQueue<byte[]> inputQueue = new ArrayBlockingQueue<>(N_ITERATIONS);
Expand Down Expand Up @@ -49,9 +49,9 @@ public void stopWorker() {
}

public static void main(String[] argv) {
for (int nThreads : new int[] { 1, 10, 100 }) {
for (int nThreads : new int[] { 1, 2, 4, 8, 10 }) {
final BlockingQueueTests test = new BlockingQueueTests(nThreads);
System.out.println("running: " + BlockingQueueTests.class.getName() + " with nThreads = " + nThreads);
System.out.println("running: " + test.getClass().getName() + " with nThreads = " + nThreads);

measure("nThreads=" + nThreads + " sync loop - simple", 3, () -> {
test.sendMessage("testString".getBytes(StandardCharsets.ISO_8859_1));
Expand Down Expand Up @@ -80,15 +80,16 @@ public static void main(String[] argv) {
}

private static void measure(final String topic, final int nExec, final Runnable... runnable) {
final long start = System.currentTimeMillis();
final long start = System.nanoTime();

for (Runnable run : runnable) {
for (int i = 0; i < nExec; i++) {
run.run();
}
}

final long stop = System.currentTimeMillis();
System.out.printf("%-40s: %10d calls/second\n", topic, (stop - start) > 0 ? (1000 * nExec) / (stop - start) : -1);
final long stop = System.nanoTime();
final double diff = (stop - start) * 1e-9;
System.out.printf("%-40s: %10d calls/second\n", topic, diff > 0 ? (int) (nExec / diff) : -1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package de.gsi.microservice.concepts;

import java.nio.charset.StandardCharsets;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import com.lmax.disruptor.util.Util;

public class DisruptorTests { // NOPMD NOSONAR -- nomen est omen
// https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
private static final int N_ITERATIONS = 100_000;
private final int bufferSize = Util.ceilingNextPowerOfTwo(N_ITERATIONS); // specify the size of the ring buffer, must be power of 2.
private final Disruptor<ByteArrayEvent> disruptorOut;
private final RingBuffer<ByteArrayEvent> outputBuffer;
private final Disruptor<ByteArrayEvent> disruptorIn;
private final RingBuffer<ByteArrayEvent> inputBuffer;
private long readPosition = 0;

public DisruptorTests(int nWorkers) {
disruptorOut = new Disruptor<>(ByteArrayEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
outputBuffer = disruptorOut.getRingBuffer();
disruptorIn = new Disruptor<>(ByteArrayEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, nWorkers <= 1 ? ProducerType.SINGLE : ProducerType.MULTI, new BlockingWaitStrategy());
inputBuffer = disruptorIn.getRingBuffer();

// Connect the parallel handler
for (int i = 0; i < nWorkers; i++) {
final int threadWorkerID = i;
disruptorOut.handleEventsWith((inputEvent, sequence, endOfBatch) -> {
if (sequence % nWorkers != threadWorkerID) {
return;
}
inputBuffer.publishEvent((returnEvent, returnSequence, returnBuffer) -> returnEvent.array = inputEvent.array);
});
}
// Start the Disruptor, starts all threads running
disruptorOut.start();
disruptorIn.start();
}

public void sendMessage(final byte[] msg) {
outputBuffer.publishEvent((event, sequence, buffer) -> event.array = msg);
}

public byte[] receiveMessage() {
//System.err.println("inputBuffer.getCursor() = " + inputBuffer.getCursor());
long cursor;
while ((cursor = inputBuffer.getCursor()) < 0 || readPosition > cursor) { // NOPMD NOSONAR -- busy loop
// empty block on purpose - busy loop optimises latency
}
if (readPosition <= cursor) {
final ByteArrayEvent value = inputBuffer.get(readPosition);
readPosition++;
return value.array;
} else {
return null;
}
}

public void stopWorker() {
disruptorOut.shutdown();
disruptorIn.shutdown();
}

public static void main(String[] argv) {
for (int nThreads : new int[] { 1, 2, 4, 8, 10 }) {
final DisruptorTests test = new DisruptorTests(nThreads);
System.out.println("running: " + test.getClass().getName() + " with nThreads = " + nThreads);

measure("nThreads=" + nThreads + " sync loop - simple", 3, () -> {
test.sendMessage("testString".getBytes(StandardCharsets.ISO_8859_1));
final byte[] msg = test.receiveMessage();
assert msg != null : "message must be non-null";
});

for (int i = 0; i < 3; i++) {
measure("nThreads=" + nThreads + " sync loop#" + i, N_ITERATIONS, () -> {
test.sendMessage("testString".getBytes(StandardCharsets.ISO_8859_1));
final byte[] msg = test.receiveMessage();
assert msg != null : "message must be non-null";
});
}

for (int i = 0; i < 3; i++) {
measure("nThreads=" + nThreads + " async loop#" + i, N_ITERATIONS, () -> test.sendMessage("testString".getBytes(StandardCharsets.ISO_8859_1)), //
() -> {
final byte[] msg = test.receiveMessage();
assert msg != null : "message must be non-null";
});
}
test.stopWorker();
}
}

private static void measure(final String topic, final int nExec, final Runnable... runnable) {
final long start = System.nanoTime();

for (Runnable run : runnable) {
for (int i = 0; i < nExec; i++) {
run.run();
}
}

final long stop = System.nanoTime();
final double diff = (stop - start) * 1e-9;
System.out.printf("%-40s: %10d calls/second\n", topic, diff > 0 ? (int) (nExec / diff) : -1);
}

private class ByteArrayEvent {
public byte[] array;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package de.gsi.microservice.concepts;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.jetbrains.annotations.NotNull;

public class FutureTests { // NOPMD NOSONAR -- nomen est omen
private static final int N_ITERATIONS = 100_000;
private final BlockingQueue<CustomFuture<byte[]>> outputQueue = new ArrayBlockingQueue<>(N_ITERATIONS);
private final List<Thread> workerList = new ArrayList<>();

public FutureTests(int nWorkers) {
for (int i = 0; i < nWorkers; i++) {
final int nWorker = i;
final Thread worker = new Thread() {
public void run() {
this.setName("worker#" + nWorker);
try {
while (!this.isInterrupted()) {
final CustomFuture<byte[]> msgFuture;
if (outputQueue.isEmpty()) {
msgFuture = outputQueue.take();
} else {
msgFuture = outputQueue.poll();
}
if (msgFuture == null) {
continue;
}
msgFuture.running.set(true);
if (msgFuture.payload != null) {
msgFuture.setReply(msgFuture.payload);
continue;
}
msgFuture.cancelled.set(true);
}
} catch (InterruptedException e) {
if (!outputQueue.isEmpty()) {
e.printStackTrace();
}
}
}
};

workerList.add(worker);
worker.start();
}
}

public Future<byte[]> sendMessage(final byte[] msg) {
CustomFuture<byte[]> msgFuture = new CustomFuture<>(msg);
outputQueue.offer(msgFuture);
return msgFuture;
}

public void stopWorker() {
workerList.forEach(Thread::interrupt);
}

public static void main(String[] argv) {
for (int nThreads : new int[] { 1, 2, 4, 8, 10 }) {
final FutureTests test = new FutureTests(nThreads);
System.out.println("running: " + test.getClass().getName() + " with nThreads = " + nThreads);

measure("nThreads=" + nThreads + " sync loop - simple", 3, () -> {
final Future<byte[]> reply = test.sendMessage("testString".getBytes(StandardCharsets.ISO_8859_1));
try {
final byte[] msg = reply.get();
assert msg != null : "message must be non-null";
System.out.println("received = " + (new String(msg, StandardCharsets.ISO_8859_1)));
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});

for (int i = 0; i < 3; i++) {
measure("nThreads=" + nThreads + " sync loop#" + i, N_ITERATIONS, () -> {
final Future<byte[]> reply = test.sendMessage("testString".getBytes(StandardCharsets.ISO_8859_1));
try {
final byte[] msg = reply.get();
assert msg != null : "message must be non-null";
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}

for (int i = 0; i < 3; i++) {
measureAsync("nThreads=" + nThreads + " async loop#" + i, N_ITERATIONS,
() -> {
final List<Future<byte[]>> replies = new ArrayList<>(N_ITERATIONS);
for (int k = 0; k < N_ITERATIONS; k++) {
replies.add(test.sendMessage("testString".getBytes(StandardCharsets.ISO_8859_1)));
}
assert replies.size() == N_ITERATIONS : "did not receive sufficient events";
replies.forEach(reply -> {
try {
final byte[] msg = reply.get();
assert msg != null : "message must be non-null";
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
});
}
test.stopWorker();
}
}

private static void measure(final String topic, final int nExec, final Runnable... runnable) {
final long start = System.nanoTime();

for (Runnable run : runnable) {
for (int i = 0; i < nExec; i++) {
run.run();
}
}

final long stop = System.nanoTime();
final double diff = (stop - start) * 1e-9;
System.out.printf("%-40s: %10d calls/second\n", topic, diff > 0 ? (int) (nExec / diff) : -1);
}

private static void measureAsync(final String topic, final int nExec, final Runnable... runnable) {
final long start = System.nanoTime();

for (Runnable run : runnable) {
run.run();
}

final long stop = System.nanoTime();
final double diff = (stop - start) * 1e-9;
System.out.printf("%-40s: %10d calls/second\n", topic, diff > 0 ? (int) (nExec / diff) : -1);
}

private class CustomFuture<T> implements Future<T> {
private final Lock lock = new ReentrantLock();
private final Condition processorNotifyCondition = lock.newCondition();
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicBoolean requestCancel = new AtomicBoolean(false);
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final T payload;
private T reply = null;

private CustomFuture(final T input) {
this.payload = input;
}

@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
if (!running.get()) {
cancelled.set(true);
return !requestCancel.getAndSet(true);
}
return false;
}

@Override
public T get() throws InterruptedException, ExecutionException {
return get(0, TimeUnit.NANOSECONDS);
}

@Override
public T get(final long timeout, @NotNull final TimeUnit unit) throws InterruptedException {
if (isDone()) {
return reply;
}
lock.lock();
try {
while (!isDone()) {
processorNotifyCondition.await(timeout, TimeUnit.NANOSECONDS);
}
} finally {
lock.unlock();
}
return reply;
}

@Override
public boolean isCancelled() {
return cancelled.get();
}

@Override
public boolean isDone() {
return (reply != null && !running.get()) || cancelled.get();
}

public void setReply(final T newValue) {
if (running.getAndSet(false)) {
this.reply = newValue;
}
notifyListener();
}

private void notifyListener() {
lock.lock();
try {
processorNotifyCondition.signalAll();
} finally {
lock.unlock();
}
}
}
}
Loading

0 comments on commit b6d3c3a

Please sign in to comment.