-
Notifications
You must be signed in to change notification settings - Fork 93
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add simple sample for demuxing events potentially arriving out of ord…
…er and interleaved
- Loading branch information
Showing
3 changed files
with
362 additions
and
0 deletions.
There are no files selected for viewing
117 changes: 117 additions & 0 deletions
117
...ervice/src/test/java/de/gsi/microservice/concepts/aggregate/demux/poc/DemuxProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
package de.gsi.microservice.concepts.aggregate.demux.poc; | ||
|
||
import java.util.*; | ||
import java.util.concurrent.locks.LockSupport; | ||
|
||
import com.lmax.disruptor.*; | ||
|
||
import de.gsi.dataset.utils.Cache; | ||
import de.gsi.dataset.utils.SoftHashMap; | ||
|
||
/** | ||
* Dispatches aggregation workers upon seeing new values for a specified event field. | ||
* Each aggregation worker then assembles all events for this value and optionally publishes back an aggregated events. | ||
* If the aggregation is not completed within a configurable timeout, a partial AggregationEvent is published. | ||
* | ||
* For now events are aggregated into a list of Objects until a certain number of events is reached. | ||
* The final api should allow to specify different Objects to be placed into a result domain object. | ||
*/ | ||
public class DemuxProcessor implements SequenceReportingEventHandler<TestEventSource.IngestedEvent> { | ||
private static final int N_WORKERS = 4; // number of workers defines the maximum number of aggregate events groups which can be overlapping | ||
private static final long TIMEOUT = 400; | ||
private static final int RETENTION_SIZE = 10; | ||
private static final int N_AGG_ELEMENTS = 3; | ||
public final AggregationWorker[] workers; | ||
private final List<AggregationWorker> freeWorkers = Collections.synchronizedList(new ArrayList<>(N_WORKERS)); | ||
private final RingBuffer<TestEventSource.IngestedEvent> rb; | ||
// private Map<Long, Object> aggregatedBpcts = new SoftHashMap<>(RETENTION_SIZE); | ||
private Map<Long, Object> aggregatedBpcts = new Cache<>(RETENTION_SIZE); | ||
private Sequence seq; | ||
|
||
public DemuxProcessor(final RingBuffer<TestEventSource.IngestedEvent> ringBuffer) { | ||
rb = ringBuffer; | ||
workers = new AggregationWorker[N_WORKERS]; | ||
for (int i = 0; i < N_WORKERS; i++) { | ||
workers[i] = new AggregationWorker(); | ||
freeWorkers.add(workers[i]); | ||
} | ||
} | ||
|
||
public void onEvent(final TestEventSource.IngestedEvent event, final long nextSequence, final boolean b) { | ||
if (!(event.payload instanceof TestEventSource.Event)) { | ||
return; | ||
} | ||
final long eventBpcts = ((TestEventSource.Event) event.payload).bpcts; | ||
// final boolean alreadyScheduled = Arrays.stream(workers).filter(w -> w.bpcts == eventBpcts).findFirst().isPresent(); | ||
final boolean alreadyScheduled = aggregatedBpcts.containsKey(eventBpcts); | ||
if (alreadyScheduled) { | ||
return; | ||
} | ||
while (true) { | ||
if (!freeWorkers.isEmpty()) { | ||
final AggregationWorker freeWorker = freeWorkers.remove(0); | ||
freeWorker.bpcts = eventBpcts; | ||
freeWorker.aggStart = event.ingestionTime; | ||
aggregatedBpcts.put(eventBpcts, new Object()); | ||
seq.set(nextSequence); // advance sequence to let workers process events up to here | ||
return; | ||
} | ||
// no free worker available | ||
long waitTime = Long.MAX_VALUE; | ||
for (AggregationWorker w : workers) { | ||
final long currentTime = System.currentTimeMillis(); | ||
final long diff = currentTime - w.aggStart; | ||
waitTime = Math.min(waitTime, diff * 1000000); | ||
if (w.bpcts != -1 && diff < TIMEOUT) { | ||
w.publishAndFreeWorker(true); // timeout reached, publish partial result and free worker | ||
break; | ||
} | ||
} | ||
LockSupport.parkNanos(waitTime); | ||
} | ||
} | ||
|
||
@Override | ||
public void setSequenceCallback(final Sequence sequence) { | ||
this.seq = sequence; | ||
} | ||
|
||
public class AggregationWorker implements EventHandler<TestEventSource.IngestedEvent>, TimeoutHandler { | ||
protected volatile long bpcts = -1; // [ms] | ||
protected volatile long aggStart = -1; // [ns] | ||
private List<TestEventSource.IngestedEvent> payloads = new ArrayList<>(); | ||
|
||
@Override | ||
public void onEvent(final TestEventSource.IngestedEvent event, final long sequence, final boolean endOfBatch) { | ||
if (bpcts != -1 && event.ingestionTime > aggStart + TIMEOUT) { | ||
publishAndFreeWorker(true); | ||
return; | ||
} | ||
if (bpcts == -1 || !(event.payload instanceof TestEventSource.Event) || ((TestEventSource.Event) event.payload).bpcts != bpcts) { | ||
return; // skip irrelevant events | ||
} | ||
this.payloads.add(event); | ||
if (payloads.size() == N_AGG_ELEMENTS) { | ||
publishAndFreeWorker(false); | ||
} | ||
} | ||
|
||
protected void publishAndFreeWorker(final boolean partial) { | ||
rb.publishEvent(((event1, sequence1, arg0) -> { | ||
event1.ingestionTime = System.currentTimeMillis(); | ||
event1.payload = partial ? ("aggregation timed out for bpcts: " + bpcts + " -> ") + payloads : payloads; | ||
}), | ||
payloads); | ||
bpcts = -1; | ||
payloads = new ArrayList<>(); | ||
freeWorkers.add(this); | ||
} | ||
|
||
@Override | ||
public void onTimeout(final long sequence) { | ||
if (bpcts != -1 && System.currentTimeMillis() > aggStart + TIMEOUT) { | ||
publishAndFreeWorker(true); | ||
} | ||
} | ||
} | ||
} |
91 changes: 91 additions & 0 deletions
91
...ce/src/test/java/de/gsi/microservice/concepts/aggregate/demux/poc/DemuxProcessorTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
package de.gsi.microservice.concepts.aggregate.demux.poc; | ||
|
||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.containsInAnyOrder; | ||
import static org.junit.jupiter.api.Assertions.*; | ||
import static org.junit.jupiter.params.provider.Arguments.arguments; | ||
|
||
import java.time.Duration; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
import org.awaitility.Awaitility; | ||
import org.hamcrest.Matcher; | ||
import org.junit.jupiter.params.ParameterizedTest; | ||
import org.junit.jupiter.params.provider.Arguments; | ||
import org.junit.jupiter.params.provider.MethodSource; | ||
|
||
import com.lmax.disruptor.EventHandler; | ||
import com.lmax.disruptor.RingBuffer; | ||
import com.lmax.disruptor.TimeoutBlockingWaitStrategy; | ||
import com.lmax.disruptor.dsl.Disruptor; | ||
import com.lmax.disruptor.dsl.EventHandlerGroup; | ||
import com.lmax.disruptor.dsl.ProducerType; | ||
import com.lmax.disruptor.util.DaemonThreadFactory; | ||
|
||
class DemuxProcessorTest { | ||
static Stream<Arguments> workingEventSamplesProvider() { | ||
return Stream.of( | ||
arguments("ordinary", "a1 b1 c1 a2 b2 c2 a3 b3 c3", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1), | ||
arguments("duplicate events", "a1 b1 c1 b1 a2 b2 c2 a2 a3 b3 c3 c3", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1), | ||
arguments("reordered", "a1 c1 b1 a2 b2 c2 a3 b3 c3", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1), | ||
arguments("interleaved", "a1 b1 a2 b2 c1 a3 b3 c2 c3", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1), | ||
arguments("missing event", "a1 b1 a2 b2 c2 a3 b3 c3", "a2 b2 c2; a3 b3 c3", "1", 1), | ||
arguments("missing device", "a1 b1 a2 b2 a3 b3", "", "1 2 3", 1), | ||
arguments("late", "a1 b1 a2 b2 c2 a3 b3 c3 c1", "a1 b1 c1; a2 b2 c2; a3 b3 c3", "", 1), | ||
arguments("timeout without event", "a1 b1 c1 a2 b2", "a1 b1 c1", "2", 1), | ||
arguments("long queue", "a1 b1 c1 a2 b2", "a1 b1 c1; a1001 b1001 c1001; a2001 b2001 c2001; a3001 b3001 c3001; a4001 b4001 c4001", "2 1002 2002 3002 4002", 5), | ||
arguments("simple broken long queue", "a1 b1", "", "1 1001 2001 3001 4001", 5), | ||
arguments("single event timeout", "a1 b1 pause pause c1", "", "1", 1)); | ||
} | ||
|
||
@ParameterizedTest | ||
@MethodSource("workingEventSamplesProvider") | ||
void testSimpleEvents(final String eventSetupName, final String events, final String aggregatesAll, final String timeoutsAll, final int repeat) { | ||
// handler which collects all aggregate events which are republished to the buffer | ||
final Set<Set<String>> aggResults = ConcurrentHashMap.newKeySet(); | ||
final Set<Integer> aggTimeouts = ConcurrentHashMap.newKeySet(); | ||
EventHandler<TestEventSource.IngestedEvent> testHandler = (ev, seq, eob) -> { | ||
System.out.println(ev); | ||
if (ev.payload instanceof List) { | ||
final List<TestEventSource.IngestedEvent> agg = (List<TestEventSource.IngestedEvent>) ev.payload; | ||
final Set<String> payloads = agg.stream().map(e -> (String) ((TestEventSource.Event) e.payload).payload).collect(Collectors.toSet()); | ||
aggResults.add(payloads); | ||
} | ||
if (ev.payload instanceof String && ((String) ev.payload).startsWith("aggregation timed out for bpcts: ")) { | ||
final String payload = ((String) ev.payload); | ||
aggTimeouts.add(Integer.parseInt(payload.substring(33, payload.indexOf(' ', 34)))); | ||
} | ||
}; | ||
|
||
// create event ring buffer and add de-multiplexing processors | ||
final Disruptor<TestEventSource.IngestedEvent> disruptor = new Disruptor<>( | ||
TestEventSource.IngestedEvent::new, | ||
256, | ||
DaemonThreadFactory.INSTANCE, | ||
ProducerType.MULTI, | ||
new TimeoutBlockingWaitStrategy(200, TimeUnit.MILLISECONDS)); | ||
final DemuxProcessor aggProc = new DemuxProcessor(disruptor.getRingBuffer()); | ||
final EventHandlerGroup<TestEventSource.IngestedEvent> endBarrier = disruptor.handleEventsWith(testHandler).handleEventsWith(aggProc).then(aggProc.workers); | ||
RingBuffer<TestEventSource.IngestedEvent> rb = disruptor.start(); | ||
|
||
// Use event source to publish demo events to the buffer. | ||
TestEventSource testEventSource = new TestEventSource(events, repeat, rb); | ||
assertDoesNotThrow(testEventSource::run); | ||
|
||
// wait for all events to be played and processed | ||
Awaitility.await().atMost(Duration.ofSeconds(repeat)).until(() -> endBarrier.asSequenceBarrier().getCursor() == rb.getCursor() && Arrays.stream(aggProc.workers).allMatch(w -> w.bpcts == -1)); | ||
// compare aggregated results and timeouts | ||
assertThat(aggResults, containsInAnyOrder(Arrays.stream(aggregatesAll.split(";")) | ||
.filter(s -> !s.isEmpty()) | ||
.map(s -> containsInAnyOrder(Arrays.stream(s.split(" ")).map(String::trim).filter(e -> !e.isEmpty()).toArray())) | ||
.toArray(Matcher[] ::new))); | ||
System.out.println(aggTimeouts); | ||
assertThat(aggTimeouts, containsInAnyOrder(Arrays.stream(timeoutsAll.split(" ")).filter(s -> !s.isEmpty()).map(Integer::parseInt).toArray(Integer[] ::new))); | ||
} | ||
} |
154 changes: 154 additions & 0 deletions
154
...rvice/src/test/java/de/gsi/microservice/concepts/aggregate/demux/poc/TestEventSource.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
package de.gsi.microservice.concepts.aggregate.demux.poc; | ||
|
||
import java.util.Arrays; | ||
import java.util.Iterator; | ||
|
||
import com.lmax.disruptor.RingBuffer; | ||
|
||
/** | ||
* An event Source to generate Events with different timing characteristics/orderings. | ||
*/ | ||
public class TestEventSource implements Runnable { | ||
private static final int DEFAULT_CHAIN = 3; | ||
private static final long DEFAULT_DELTA = 20; | ||
private static final long DEFAULT_PAUSE = 400; | ||
// state for the event source | ||
public final int repeat; | ||
public final String[] eventList; | ||
private final RingBuffer<IngestedEvent> ringBuffer; | ||
|
||
/** | ||
* Generate an event source which plays back the given sequence of events | ||
* | ||
* @param events A string containing a space separated list of events. first letter is type/bpcts, second is number | ||
* Optionally you can add semicolon delimited key=value pairs to assign values in of the events | ||
* @param repeat How often to repeat the given sequence (use zero value for infinite repetition) | ||
* @param rb The ring buffer to publish the event into | ||
*/ | ||
public TestEventSource(final String events, final int repeat, final RingBuffer<IngestedEvent> rb) { | ||
eventList = events.split(" "); | ||
this.repeat = repeat; | ||
this.ringBuffer = rb; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
long lastEvent = System.currentTimeMillis(); | ||
long timeOffset = 0; | ||
int repetitionCount = 0; | ||
while (repeat == 0 || repeat > repetitionCount) { | ||
final Iterator<String> eventIterator = Arrays.stream(eventList).iterator(); | ||
while (!Thread.interrupted() && eventIterator.hasNext()) { | ||
final String eventToken = eventIterator.next(); | ||
final String[] tokens = eventToken.split(";"); | ||
if (tokens.length == 0 || tokens[0].isEmpty()) | ||
continue; | ||
if (tokens[0].equals("pause")) { | ||
lastEvent += DEFAULT_PAUSE; | ||
continue; | ||
} | ||
Event currentEvent = generateEventFromToken(tokens, timeOffset, lastEvent, repetitionCount); | ||
lastEvent = currentEvent.publishTime; | ||
long diff = currentEvent.publishTime - System.currentTimeMillis(); | ||
if (diff > 0) { | ||
try { | ||
Thread.sleep(diff); | ||
} catch (InterruptedException e) { | ||
e.printStackTrace(); | ||
} | ||
} | ||
ringBuffer.publishEvent((event, sequence, arg0) -> { | ||
event.ingestionTime = System.currentTimeMillis(); | ||
event.payload = arg0; | ||
}, currentEvent); | ||
} | ||
repetitionCount++; | ||
} | ||
} | ||
|
||
private Event generateEventFromToken(final String[] tokens, final long timeOffset, final long lastEvent, final int repetitionCount) { | ||
String device = tokens[0].substring(0, 1); | ||
long bpcts = Long.parseLong(tokens[0].substring(1)) + repetitionCount * 1000; | ||
int type = device.charAt(0); | ||
String payload = device + bpcts; | ||
long sourceTime = lastEvent + DEFAULT_DELTA; | ||
long publishTime = sourceTime; | ||
int chain = DEFAULT_CHAIN; | ||
for (int i = 1; i < tokens.length; i++) { | ||
String[] keyvalue = tokens[i].split("="); | ||
if (keyvalue.length != 2) | ||
continue; | ||
switch (keyvalue[0]) { | ||
case "time": | ||
sourceTime = Long.parseLong(keyvalue[1]) + timeOffset; | ||
publishTime = sourceTime; | ||
break; | ||
case "sourceTime": | ||
sourceTime = Long.parseLong(keyvalue[1]) + timeOffset; | ||
break; | ||
case "publishTime": | ||
publishTime = Long.parseLong(keyvalue[1]) + timeOffset; | ||
break; | ||
case "bpcts": | ||
bpcts = Long.parseLong(keyvalue[1]) + repetitionCount * 1000; | ||
break; | ||
case "chain": | ||
chain = Integer.parseInt(keyvalue[1]); | ||
break; | ||
case "type": | ||
type = Integer.parseInt(keyvalue[1]); | ||
break; | ||
case "device": | ||
device = keyvalue[1]; | ||
break; | ||
case "payload": | ||
payload = keyvalue[1] + "(repetition count: " + repetitionCount + ")"; | ||
break; | ||
default: | ||
throw new IllegalArgumentException("unable to process event keyvalue pair: " + keyvalue); | ||
} | ||
} | ||
return new Event(sourceTime, publishTime, bpcts, chain, type, device, payload); | ||
} | ||
|
||
/** | ||
* Mock event entry. | ||
*/ | ||
public static class Event { | ||
public long sourceTime; | ||
public long publishTime; | ||
public long bpcts; | ||
public int chain; | ||
public int type; | ||
public String device; | ||
public Object payload; | ||
|
||
public Event(final long sourceTime, final long publishTime, final long bpcts, final int chain, final int type, final String device, final Object payload) { | ||
this.sourceTime = sourceTime; | ||
this.publishTime = publishTime; | ||
this.bpcts = bpcts; | ||
this.chain = chain; | ||
this.type = type; | ||
this.device = device; | ||
this.payload = payload; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "Event{sourceTime=" + sourceTime + ", publishTime=" + publishTime + ", bpcts=" + bpcts + ", chain=" + chain + ", type=" + type + ", device='" + device + '\'' + ", payload=" + payload + '}'; | ||
} | ||
} | ||
|
||
/** | ||
* Basic ring buffer event | ||
*/ | ||
public static class IngestedEvent { | ||
public long ingestionTime; | ||
public Object payload; | ||
|
||
@Override | ||
public String toString() { | ||
return "IngestedEvent{ingestionTime=" + ingestionTime + ", payload=" + payload + '}'; | ||
} | ||
} | ||
} |