Skip to content

Commit

Permalink
Add simple sample for demuxing events potentially arriving out of ord…
Browse files Browse the repository at this point in the history
…er and interleaved
  • Loading branch information
wirew0rm authored and RalphSteinhagen committed Nov 3, 2020
1 parent 6e435df commit c779d5b
Show file tree
Hide file tree
Showing 3 changed files with 361 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package de.gsi.microservice.concepts.aggregate.demux.poc;

import java.util.*;
import java.util.concurrent.locks.LockSupport;

import com.lmax.disruptor.*;

import de.gsi.dataset.utils.Cache;

/**
* Dispatches aggregation workers upon seeing new values for a specified event field.
* Each aggregation worker then assembles all events for this value and optionally publishes back an aggregated events.
* If the aggregation is not completed within a configurable timeout, a partial AggregationEvent is published.
*
* For now events are aggregated into a list of Objects until a certain number of events is reached.
* The final api should allow to specify different Objects to be placed into a result domain object.
*/
public class DemuxProcessor implements SequenceReportingEventHandler<TestEventSource.IngestedEvent> {
private static final int N_WORKERS = 4; // number of workers defines the maximum number of aggregate events groups which can be overlapping
private static final long TIMEOUT = 400;
private static final int RETENTION_SIZE = 10;
private static final int N_AGG_ELEMENTS = 3;
public final AggregationWorker[] workers;
private final List<AggregationWorker> freeWorkers = Collections.synchronizedList(new ArrayList<>(N_WORKERS));
private final RingBuffer<TestEventSource.IngestedEvent> rb;
// private Map<Long, Object> aggregatedBpcts = new SoftHashMap<>(RETENTION_SIZE);
private Map<Long, Object> aggregatedBpcts = new Cache<>(RETENTION_SIZE);
private Sequence seq;

public DemuxProcessor(final RingBuffer<TestEventSource.IngestedEvent> ringBuffer) {
rb = ringBuffer;
workers = new AggregationWorker[N_WORKERS];
for (int i = 0; i < N_WORKERS; i++) {
workers[i] = new AggregationWorker();
freeWorkers.add(workers[i]);
}
}

public void onEvent(final TestEventSource.IngestedEvent event, final long nextSequence, final boolean b) {
if (!(event.payload instanceof TestEventSource.Event)) {
return;
}
final long eventBpcts = ((TestEventSource.Event) event.payload).bpcts;
// final boolean alreadyScheduled = Arrays.stream(workers).filter(w -> w.bpcts == eventBpcts).findFirst().isPresent();
final boolean alreadyScheduled = aggregatedBpcts.containsKey(eventBpcts);
if (alreadyScheduled) {
return;
}
while (true) {
if (!freeWorkers.isEmpty()) {
final AggregationWorker freeWorker = freeWorkers.remove(0);
freeWorker.bpcts = eventBpcts;
freeWorker.aggStart = event.ingestionTime;
aggregatedBpcts.put(eventBpcts, new Object());
seq.set(nextSequence); // advance sequence to let workers process events up to here
return;
}
// no free worker available
long waitTime = Long.MAX_VALUE;
for (AggregationWorker w : workers) {
final long currentTime = System.currentTimeMillis();
final long diff = currentTime - w.aggStart;
waitTime = Math.min(waitTime, diff * 1000000);
if (w.bpcts != -1 && diff < TIMEOUT) {
w.publishAndFreeWorker(true); // timeout reached, publish partial result and free worker
break;
}
}
LockSupport.parkNanos(waitTime);
}
}

@Override
public void setSequenceCallback(final Sequence sequence) {
this.seq = sequence;
}

public class AggregationWorker implements EventHandler<TestEventSource.IngestedEvent>, TimeoutHandler {
protected volatile long bpcts = -1; // [ms]
protected volatile long aggStart = -1; // [ns]
private List<TestEventSource.IngestedEvent> payloads = new ArrayList<>();

@Override
public void onEvent(final TestEventSource.IngestedEvent event, final long sequence, final boolean endOfBatch) {
if (bpcts != -1 && event.ingestionTime > aggStart + TIMEOUT) {
publishAndFreeWorker(true);
return;
}
if (bpcts == -1 || !(event.payload instanceof TestEventSource.Event) || ((TestEventSource.Event) event.payload).bpcts != bpcts) {
return; // skip irrelevant events
}
this.payloads.add(event);
if (payloads.size() == N_AGG_ELEMENTS) {
publishAndFreeWorker(false);
}
}

protected void publishAndFreeWorker(final boolean partial) {
rb.publishEvent(((event1, sequence1, arg0) -> {
event1.ingestionTime = System.currentTimeMillis();
event1.payload = partial ? ("aggregation timed out for bpcts: " + bpcts + " -> ") + payloads : payloads;
}),
payloads);
bpcts = -1;
payloads = new ArrayList<>();
freeWorkers.add(this);
}

@Override
public void onTimeout(final long sequence) {
if (bpcts != -1 && System.currentTimeMillis() > aggStart + TIMEOUT) {
publishAndFreeWorker(true);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package de.gsi.microservice.concepts.aggregate.demux.poc;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.params.provider.Arguments.arguments;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.awaitility.Awaitility;
import org.hamcrest.Matcher;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

class DemuxProcessorTest {
static Stream<Arguments> workingEventSamplesProvider() {
return Stream.of(
arguments("ordinary", "a1 b1 c1 a2 b2 c2 a3 b3 c3", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1),
arguments("duplicate events", "a1 b1 c1 b1 a2 b2 c2 a2 a3 b3 c3 c3", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1),
arguments("reordered", "a1 c1 b1 a2 b2 c2 a3 b3 c3", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1),
arguments("interleaved", "a1 b1 a2 b2 c1 a3 b3 c2 c3", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1),
arguments("missing event", "a1 b1 a2 b2 c2 a3 b3 c3", "a2 b2 c2; a3 b3 c3", "1", 1),
arguments("missing device", "a1 b1 a2 b2 a3 b3", "", "1 2 3", 1),
arguments("late", "a1 b1 a2 b2 c2 a3 b3 c3 c1", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1),
arguments("timeout without event", "a1 b1 c1 a2 b2", "a1 b1 c1", "2", 1),
arguments("long queue", "a1 b1 c1 a2 b2", "a1 b1 c1; a1001 b1001 c1001; a2001 b2001 c2001; a3001 b3001 c3001; a4001 b4001 c4001", "2 1002 2002 3002 4002", 5),
arguments("simple broken long queue", "a1 b1", "", "1 1001 2001 3001 4001", 5),
arguments("single event timeout", "a1 b1 pause pause c1", "", "1", 1));
}

@ParameterizedTest
@MethodSource("workingEventSamplesProvider")
void testSimpleEvents(final String eventSetupName, final String events, final String aggregatesAll, final String timeoutsAll, final int repeat) {
// handler which collects all aggregate events which are republished to the buffer
final Set<Set<String>> aggResults = ConcurrentHashMap.newKeySet();
final Set<Integer> aggTimeouts = ConcurrentHashMap.newKeySet();
EventHandler<TestEventSource.IngestedEvent> testHandler = (ev, seq, eob) -> {
System.out.println(ev);
if (ev.payload instanceof List) {
final List<TestEventSource.IngestedEvent> agg = (List<TestEventSource.IngestedEvent>) ev.payload;
final Set<String> payloads = agg.stream().map(e -> (String) ((TestEventSource.Event) e.payload).payload).collect(Collectors.toSet());
aggResults.add(payloads);
}
if (ev.payload instanceof String && ((String) ev.payload).startsWith("aggregation timed out for bpcts: ")) {
final String payload = ((String) ev.payload);
aggTimeouts.add(Integer.parseInt(payload.substring(33, payload.indexOf(' ', 34))));
}
};

// create event ring buffer and add de-multiplexing processors
final Disruptor<TestEventSource.IngestedEvent> disruptor = new Disruptor<>(
TestEventSource.IngestedEvent::new,
256,
DaemonThreadFactory.INSTANCE,
ProducerType.MULTI,
new TimeoutBlockingWaitStrategy(200, TimeUnit.MILLISECONDS));
final DemuxProcessor aggProc = new DemuxProcessor(disruptor.getRingBuffer());
final EventHandlerGroup<TestEventSource.IngestedEvent> endBarrier = disruptor.handleEventsWith(testHandler).handleEventsWith(aggProc).then(aggProc.workers);
RingBuffer<TestEventSource.IngestedEvent> rb = disruptor.start();

// Use event source to publish demo events to the buffer.
TestEventSource testEventSource = new TestEventSource(events, repeat, rb);
assertDoesNotThrow(testEventSource::run);

// wait for all events to be played and processed
Awaitility.await().atMost(Duration.ofSeconds(repeat)).until(() -> endBarrier.asSequenceBarrier().getCursor() == rb.getCursor() && Arrays.stream(aggProc.workers).allMatch(w -> w.bpcts == -1));
// compare aggregated results and timeouts
assertThat(aggResults, containsInAnyOrder(Arrays.stream(aggregatesAll.split(";"))
.filter(s -> !s.isEmpty())
.map(s -> containsInAnyOrder(Arrays.stream(s.split(" ")).map(String::trim).filter(e -> !e.isEmpty()).toArray()))
.toArray(Matcher[] ::new)));
System.out.println(aggTimeouts);
assertThat(aggTimeouts, containsInAnyOrder(Arrays.stream(timeoutsAll.split(" ")).filter(s -> !s.isEmpty()).map(Integer::parseInt).toArray(Integer[] ::new)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package de.gsi.microservice.concepts.aggregate.demux.poc;

import java.util.Arrays;
import java.util.Iterator;

import com.lmax.disruptor.RingBuffer;

/**
* An event Source to generate Events with different timing characteristics/orderings.
*/
public class TestEventSource implements Runnable {
private static final int DEFAULT_CHAIN = 3;
private static final long DEFAULT_DELTA = 20;
private static final long DEFAULT_PAUSE = 400;
// state for the event source
public final int repeat;
public final String[] eventList;
private final RingBuffer<IngestedEvent> ringBuffer;

/**
* Generate an event source which plays back the given sequence of events
*
* @param events A string containing a space separated list of events. first letter is type/bpcts, second is number
* Optionally you can add semicolon delimited key=value pairs to assign values in of the events
* @param repeat How often to repeat the given sequence (use zero value for infinite repetition)
* @param rb The ring buffer to publish the event into
*/
public TestEventSource(final String events, final int repeat, final RingBuffer<IngestedEvent> rb) {
eventList = events.split(" ");
this.repeat = repeat;
this.ringBuffer = rb;
}

@Override
public void run() {
long lastEvent = System.currentTimeMillis();
long timeOffset = 0;
int repetitionCount = 0;
while (repeat == 0 || repeat > repetitionCount) {
final Iterator<String> eventIterator = Arrays.stream(eventList).iterator();
while (!Thread.interrupted() && eventIterator.hasNext()) {
final String eventToken = eventIterator.next();
final String[] tokens = eventToken.split(";");
if (tokens.length == 0 || tokens[0].isEmpty())
continue;
if (tokens[0].equals("pause")) {
lastEvent += DEFAULT_PAUSE;
continue;
}
Event currentEvent = generateEventFromToken(tokens, timeOffset, lastEvent, repetitionCount);
lastEvent = currentEvent.publishTime;
long diff = currentEvent.publishTime - System.currentTimeMillis();
if (diff > 0) {
try {
Thread.sleep(diff);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
ringBuffer.publishEvent((event, sequence, arg0) -> {
event.ingestionTime = System.currentTimeMillis();
event.payload = arg0;
}, currentEvent);
}
repetitionCount++;
}
}

private Event generateEventFromToken(final String[] tokens, final long timeOffset, final long lastEvent, final int repetitionCount) {
String device = tokens[0].substring(0, 1);
long bpcts = Long.parseLong(tokens[0].substring(1)) + repetitionCount * 1000;
int type = device.charAt(0);
String payload = device + bpcts;
long sourceTime = lastEvent + DEFAULT_DELTA;
long publishTime = sourceTime;
int chain = DEFAULT_CHAIN;
for (int i = 1; i < tokens.length; i++) {
String[] keyvalue = tokens[i].split("=");
if (keyvalue.length != 2)
continue;
switch (keyvalue[0]) {
case "time":
sourceTime = Long.parseLong(keyvalue[1]) + timeOffset;
publishTime = sourceTime;
break;
case "sourceTime":
sourceTime = Long.parseLong(keyvalue[1]) + timeOffset;
break;
case "publishTime":
publishTime = Long.parseLong(keyvalue[1]) + timeOffset;
break;
case "bpcts":
bpcts = Long.parseLong(keyvalue[1]) + repetitionCount * 1000;
break;
case "chain":
chain = Integer.parseInt(keyvalue[1]);
break;
case "type":
type = Integer.parseInt(keyvalue[1]);
break;
case "device":
device = keyvalue[1];
break;
case "payload":
payload = keyvalue[1] + "(repetition count: " + repetitionCount + ")";
break;
default:
throw new IllegalArgumentException("unable to process event keyvalue pair: " + keyvalue);
}
}
return new Event(sourceTime, publishTime, bpcts, chain, type, device, payload);
}

/**
* Mock event entry.
*/
public static class Event {
public long sourceTime;
public long publishTime;
public long bpcts;
public int chain;
public int type;
public String device;
public Object payload;

public Event(final long sourceTime, final long publishTime, final long bpcts, final int chain, final int type, final String device, final Object payload) {
this.sourceTime = sourceTime;
this.publishTime = publishTime;
this.bpcts = bpcts;
this.chain = chain;
this.type = type;
this.device = device;
this.payload = payload;
}

@Override
public String toString() {
return "Event{sourceTime=" + sourceTime + ", publishTime=" + publishTime + ", bpcts=" + bpcts + ", chain=" + chain + ", type=" + type + ", device='" + device + '\'' + ", payload=" + payload + '}';
}
}

/**
* Basic ring buffer event
*/
public static class IngestedEvent {
public long ingestionTime;
public Object payload;

@Override
public String toString() {
return "IngestedEvent{ingestionTime=" + ingestionTime + ", payload=" + payload + '}';
}
}
}

0 comments on commit c779d5b

Please sign in to comment.