Skip to content

Commit

Permalink
Add simple sample for demuxing events potentially arriving out of ord…
Browse files Browse the repository at this point in the history
…er and interleaved
  • Loading branch information
wirew0rm committed Nov 3, 2020
1 parent 6e435df commit 44a8d6a
Show file tree
Hide file tree
Showing 3 changed files with 359 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package de.gsi.microservice.concepts.aggregate.demux.poc;

import java.util.*;

import com.lmax.disruptor.*;

/**
* Dispatches aggregation workers upon seeing new values for a specified event field.
* Each aggregation worker then assembles all events for this value and optionally publishes back an aggregated events.
* If the aggregation is not completed within a configurable timeout, a partial AggregationEvent is published.
*
* For now events are aggregated into a list of Objects until a certain number of events is reached.
* The final api should allow to specify different Objects to be placed into a result domain object.
*
* TODO: build api with lambdas (aggregateType, input Type => (input, aggregate) -> add input to the aggregate if valid, timeout, () -> TimeoutEvent)
*/
public class DemuxProcessor implements SequenceReportingEventHandler<TestEventSource.IngestedEvent> {
private static final int N_WORKERS = 4; // number of workers defines the maximum number of aggregate events groups which can be overlapping
private static final long TIMEOUT = 400;
public AggregationWorker[] workers;
private final RingBuffer<TestEventSource.IngestedEvent> rb;
private Set<Long> aggregatedBpcts = new HashSet<>(); // TODO: evict old entries?
private Sequence seq;

public DemuxProcessor(final RingBuffer<TestEventSource.IngestedEvent> ringBuffer) {
rb = ringBuffer;
workers = new AggregationWorker[N_WORKERS];
for (int i = 0; i < N_WORKERS; i++) {
workers[i] = new AggregationWorker();
}
}

public void onEvent(final TestEventSource.IngestedEvent event, final long nextSequence, final boolean b) {
if (!(event.payload instanceof TestEventSource.Event)) {
return;
}
final long eventBpcts = ((TestEventSource.Event) event.payload).bpcts;
// final boolean alreadyScheduled = Arrays.stream(workers).filter(w -> w.bpcts == eventBpcts).findFirst().isPresent();
final boolean alreadyScheduled = aggregatedBpcts.contains(eventBpcts);
if (alreadyScheduled) {
return;
}
while (true) {
final Optional<AggregationWorker> freeWorker = Arrays.stream(workers).filter(w -> w.bpcts == -1).findFirst();
if (freeWorker.isPresent()) {
freeWorker.get().bpcts = eventBpcts;
freeWorker.get().aggStart = event.ingestionTime;
aggregatedBpcts.add(eventBpcts);
seq.set(nextSequence); // advance sequence to let workers process events up to here
return;
}
Thread.yield();
// timouts on other workers will not be called while this worker is blocked? :/
// after some time, timeout oldest aggregation? THIS WILL HAVE TO BE MADE THREAD SAFE!
final long currentTime = System.currentTimeMillis();
Arrays.stream(workers).filter(w -> w.bpcts != -1 && w.aggStart + TIMEOUT < currentTime).forEach(w -> {
rb.publishEvent(((event1, sequence1, arg0) -> {
event1.ingestionTime = System.currentTimeMillis();
event1.payload = "aggregation timed out for bpcts: " + w.bpcts + " -> " + w.payloads;
}),
w.payloads);
w.bpcts = -1;
w.payloads = new ArrayList<>();
});
}
//throw new IllegalStateException("No free workers, todo: implement strategy"); // timeout oldest aggregation?
}

@Override
public void setSequenceCallback(final Sequence sequence) {
this.seq = sequence;
}

public class AggregationWorker implements EventHandler<TestEventSource.IngestedEvent>, TimeoutHandler {
protected volatile long bpcts = -1; // [ms]
private volatile long aggStart = -1; // [ns]
private List<TestEventSource.IngestedEvent> payloads = new ArrayList<>();

@Override
public void onEvent(final TestEventSource.IngestedEvent event, final long sequence, final boolean endOfBatch) {
if (bpcts != -1 && event.ingestionTime > aggStart + TIMEOUT) {
rb.publishEvent(((event1, sequence1, arg0) -> {
event1.ingestionTime = System.currentTimeMillis();
event1.payload = "aggregation timed out for bpcts: " + bpcts + " -> " + payloads;
}),
payloads);
bpcts = -1;
payloads = new ArrayList<>();
return;
}
if (bpcts == -1 || !(event.payload instanceof TestEventSource.Event) || ((TestEventSource.Event) event.payload).bpcts != bpcts) {
return; // skip irrelevant events
}
this.payloads.add(event);
if (payloads.size() == 3) {
rb.publishEvent(((event1, sequence1, arg0) -> {
event1.ingestionTime = System.currentTimeMillis();
event1.payload = payloads;
}),
payloads);
bpcts = -1;
payloads = new ArrayList<>();
}
}

@Override
public void onTimeout(final long sequence) {
if (bpcts != -1 && System.currentTimeMillis() > aggStart + TIMEOUT) {
rb.publishEvent(((event1, sequence1, arg0) -> {
event1.ingestionTime = System.currentTimeMillis();
event1.payload = "aggregation timed out for bpcts: " + bpcts + " -> " + payloads;
}),
payloads);
bpcts = -1;
payloads = new ArrayList<>();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package de.gsi.microservice.concepts.aggregate.demux.poc;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.params.provider.Arguments.arguments;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.awaitility.Awaitility;
import org.hamcrest.Matcher;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

class DemuxProcessorTest {
static Stream<Arguments> workingEventSamplesProvider() {
return Stream.of(
arguments("ordinary", "a1 b1 c1 a2 b2 c2 a3 b3 c3", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1),
arguments("duplicate events", "a1 b1 c1 b1 a2 b2 c2 a2 a3 b3 c3 c3", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1),
arguments("reordered", "a1 c1 b1 a2 b2 c2 a3 b3 c3", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1),
arguments("interleaved", "a1 b1 a2 b2 c1 a3 b3 c2 c3", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1),
arguments("missing event", "a1 b1 a2 b2 c2 a3 b3 c3", "a2 b2 c2; a3 b3 c3", "1", 1),
arguments("missing device", "a1 b1 a2 b2 a3 b3", "", "1 2 3", 1),
arguments("late", "a1 b1 a2 b2 c2 a3 b3 c3 c1", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1),
arguments("timeout without event", "a1 b1 c1 a2 b2", "a1 b1 c1", "2", 1),
arguments("long queue", "a1 b1 c1 a2 b2", "a1 b1 c1; a1001 b1001 c1001; a2001 b2001 c2001; a3001 b3001 c3001; a4001 b4001 c4001", "2 1002 2002 3002 4002", 5),
arguments("single event timeout", "a1 b1 pause pause c1", "", "1", 1));
}

@ParameterizedTest
@MethodSource("workingEventSamplesProvider")
void testSimpleEvents(final String eventSetupName, final String events, final String aggregatesAll, final String timeoutsAll, final int repeat) {
// handler which collects all aggregate events which are republished to the buffer
final Set<Set<String>> aggResults = ConcurrentHashMap.newKeySet();
final Set<Integer> aggTimeouts = ConcurrentHashMap.newKeySet();
EventHandler<TestEventSource.IngestedEvent> testHandler = (ev, seq, eob) -> {
System.out.println(ev);
if (ev.payload instanceof List) {
final List<TestEventSource.IngestedEvent> agg = (List<TestEventSource.IngestedEvent>) ev.payload;
final Set<String> payloads = agg.stream().map(e -> (String) ((TestEventSource.Event) e.payload).payload).collect(Collectors.toSet());
aggResults.add(payloads);
}
if (ev.payload instanceof String && ((String) ev.payload).startsWith("aggregation timed out for bpcts: ")) {
final String payload = ((String) ev.payload);
aggTimeouts.add(Integer.parseInt(payload.substring(33, payload.indexOf(' ', 34))));
}
};

// create event ring buffer and add de-multiplexing processors
final Disruptor<TestEventSource.IngestedEvent> disruptor = new Disruptor<>(
TestEventSource.IngestedEvent::new,
256,
DaemonThreadFactory.INSTANCE,
ProducerType.MULTI,
new TimeoutBlockingWaitStrategy(200, TimeUnit.MILLISECONDS));
final DemuxProcessor aggProc = new DemuxProcessor(disruptor.getRingBuffer());
final EventHandlerGroup<TestEventSource.IngestedEvent> endBarrier = disruptor.handleEventsWith(testHandler).handleEventsWith(aggProc).then(aggProc.workers);
RingBuffer<TestEventSource.IngestedEvent> rb = disruptor.start();

// Use event source to publish demo events to the buffer.
TestEventSource testEventSource = new TestEventSource(events, repeat, rb);
assertDoesNotThrow(testEventSource::run);

// wait for all events to be played and processed
Awaitility.await().atMost(Duration.ofSeconds(repeat)).until(() -> endBarrier.asSequenceBarrier().getCursor() == rb.getCursor() && Arrays.stream(aggProc.workers).allMatch(w -> w.bpcts == -1));
// compare aggregated results and timeouts
assertThat(aggResults, containsInAnyOrder(Arrays.stream(aggregatesAll.split(";"))
.filter(s -> !s.isEmpty())
.map(s -> containsInAnyOrder(Arrays.stream(s.split(" ")).map(String::trim).filter(e -> !e.isEmpty()).toArray()))
.toArray(Matcher[] ::new)));
System.out.println(aggTimeouts);
assertThat(aggTimeouts, containsInAnyOrder(Arrays.stream(timeoutsAll.split(" ")).filter(s -> !s.isEmpty()).map(Integer::parseInt).toArray(Integer[] ::new)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package de.gsi.microservice.concepts.aggregate.demux.poc;

import java.util.Arrays;
import java.util.Iterator;

import com.lmax.disruptor.RingBuffer;

/**
* An event Source to generate Events with different timing characteristics/orderings.
*/
public class TestEventSource implements Runnable {
private static final int DEFAULT_CHAIN = 3;
private static final long DEFAULT_DELTA = 20;
private static final long DEFAULT_PAUSE = 400;
// state for the event source
public final int repeat;
public final String[] eventList;
private final RingBuffer<IngestedEvent> ringBuffer;

/**
* Generate an event source which plays back the given sequence of events
*
* @param events A string containing a space separated list of events. first letter is type/bpcts, second is number
* Optionally you can add semicolon delimited key=value pairs to assign values in of the events
* @param repeat How often to repeat the given sequence (use zero value for infinite repetition)
* @param rb The ring buffer to publish the event into
*/
public TestEventSource(final String events, final int repeat, final RingBuffer<IngestedEvent> rb) {
eventList = events.split(" ");
this.repeat = repeat;
this.ringBuffer = rb;
}

@Override
public void run() {
long startTime = System.currentTimeMillis();
long lastEvent = startTime;
long timeOffset = 0;
int repetitionCount = 0;
while (repeat == 0 || repeat > repetitionCount) {
final Iterator<String> eventIterator = Arrays.stream(eventList).iterator();
while (!Thread.interrupted() && eventIterator.hasNext()) {
final String eventToken = eventIterator.next();
final String[] tokens = eventToken.split(";");
if (tokens.length == 0 || tokens[0].isEmpty())
continue;
if (tokens[0].equals("pause")) {
lastEvent += DEFAULT_PAUSE;
continue;
}
String device = tokens[0].substring(0, 1);
long bpcts = Long.parseLong(tokens[0].substring(1)) + repetitionCount * 1000;
int type = device.charAt(0);
String payload = device + bpcts;
long sourceTime = lastEvent + DEFAULT_DELTA;
long publishTime = sourceTime;
int chain = DEFAULT_CHAIN;
for (int i = 1; i < tokens.length; i++) {
String[] keyvalue = tokens[i].split("=");
if (keyvalue.length != 2)
continue;
switch (keyvalue[0]) {
case "time":
sourceTime = Long.parseLong(keyvalue[1]) + timeOffset;
publishTime = sourceTime;
break;
case "sourceTime":
sourceTime = Long.parseLong(keyvalue[1]) + timeOffset;
break;
case "publishTime":
publishTime = Long.parseLong(keyvalue[1]) + timeOffset;
break;
case "bpcts":
bpcts = Long.parseLong(keyvalue[1]) + repetitionCount * 1000;
break;
case "chain":
chain = Integer.parseInt(keyvalue[1]);
break;
case "type":
type = Integer.parseInt(keyvalue[1]);
break;
case "device":
device = keyvalue[1];
break;
case "payload":
payload = keyvalue[1] + "(repetition count: " + repetitionCount + ")";
break;
}
lastEvent = publishTime;
}
final Event currentEvent = new Event(sourceTime, publishTime, bpcts, chain, type, device, payload);
long diff = publishTime - System.currentTimeMillis();
if (diff > 0) {
try {
Thread.sleep(diff);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
ringBuffer.publishEvent((event, sequence, arg0) -> {
event.ingestionTime = System.currentTimeMillis();
event.payload = arg0;
}, currentEvent);
}
repetitionCount++;
}
}

/**
* Mock event entry.
*/
public static class Event {
public long sourceTime;
public long publishTime;
public long bpcts;
public int chain;
public int type;
public String device;
public Object payload;

public Event(final long sourceTime, final long publishTime, final long bpcts, final int chain, final int type, final String device, final Object payload) {
this.sourceTime = sourceTime;
this.publishTime = publishTime;
this.bpcts = bpcts;
this.chain = chain;
this.type = type;
this.device = device;
this.payload = payload;
}

@Override
public String toString() {
return "Event{sourceTime=" + sourceTime + ", publishTime=" + publishTime + ", bpcts=" + bpcts + ", chain=" + chain + ", type=" + type + ", device='" + device + '\'' + ", payload=" + payload + '}';
}
}

/**
* Basic ring buffer event
*/
public static class IngestedEvent {
public long ingestionTime;
public Object payload;

@Override
public String toString() {
return "IngestedEvent{"
+ "ingestionTime=" + ingestionTime + ", payload=" + payload + '}';
}
}
}

0 comments on commit 44a8d6a

Please sign in to comment.