Skip to content

Commit

Permalink
added RingBufferEvent domain-object concepts
Browse files Browse the repository at this point in the history
  • Loading branch information
RalphSteinhagen committed Oct 28, 2020
1 parent b6d3c3a commit 6e435df
Show file tree
Hide file tree
Showing 10 changed files with 902 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package de.gsi.microservice.concepts.aggregate;

/**
* Basic filter interface description
*
* @author rstein
* N.B. while 'toString()', 'hashCode()' and 'equals()' is ubiquously defined via the Java 'Object' class, these definition are kept for symmetry with the C++ implementation
*/
public interface Filter {
/**
* reinitialises the filter to safe default values
*/
void clear();

/**
* @param other filter this filter should copy its data to
*/
void copyTo(Filter other);

boolean equals(Object other);

int hashCode();

/**
* @return filter description including internal state (if any).
*/
String toString();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package de.gsi.microservice.concepts.aggregate;

import java.util.function.Predicate;

//@FunctionalInterface
public interface FilterPredicate {
/**
* Evaluates this predicate on the given arguments.
*
* @param filterClass the filter class
* @param filterPredicate the filter predicate object
* @return {@code true} if the input arguments match the predicate, otherwise {@code false}
*/
<R extends Filter> boolean test(Class<R> filterClass, Predicate<R> filterPredicate);

// /**
// * @param other a filter predicate that will be logically-ANDed with this predicate
// * @return a composed predicate that represents the short-circuiting logical AND of this predicate and the {@code other} predicate
// */
// FilterPredicate and(FilterPredicate other);

// /**
// * Returns a predicate that represents the logical negation of this
// * predicate.
// *
// * @return a predicate that represents the logical negation of this predicate
// */
// <R extends Filter> FilterPredicate negate();

//
// /**
// * @param other a predicate that will be logically-ORed with this predicate
// * @return a composed predicate that represents the short-circuiting logical OR of this predicate and the {@code other} predicate
// * @throws NullPointerException if other is null
// */
// default FilterPredicate or(FilterPredicate other) {
// Objects.requireNonNull(other);
// return (t, u) -> test(t, u) || other.test(t, u);
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package de.gsi.microservice.concepts.aggregate;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.InvocationTargetException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

import com.lmax.disruptor.EventHandler;
import de.gsi.microservice.utils.SharedPointer;

public class RingBufferEvent implements FilterPredicate {
/**
* local UTC event arrival time-stamp [ms]
*/
public long arrivalTimeStamp;

/**
* reference to the parent's disruptor sequence ID number
*/
public long parentSequenceNumber;

/**
* list of known filters. N.B. this
*/
public final Filter[] filters;

/**
* domain object carried by this ring buffer event
*/
public SharedPointer<Object> payload;

/**
* collection of exceptions that have been issued while handling this RingBuffer event
*/
public final List<Throwable> throwables = new ArrayList<>();

/**
*
* @param filterConfig static filter configuration
*/
@SafeVarargs
public RingBufferEvent(final Class<? extends Filter>... filterConfig) {
this.filters = new Filter[filterConfig.length];
for (int i = 0; i < filters.length; i++) {
try {
filters[i] = filterConfig[i].getConstructor().newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new IllegalArgumentException("filter initialisations error - could not instantiate class:" + filterConfig[i], e);
}
}
clear();
}

public <T extends Filter> T getFilter(final Class<T> filterType) {
for (Filter filter : filters) {
if (filter.getClass().isAssignableFrom(filterType)) {
return filterType.cast(filter);
}
}
final StringBuilder builder = new StringBuilder();
builder.append("requested filter type '").append(filterType.getSimpleName()).append(" not part of ").append(RingBufferEvent.class.getSimpleName()).append(" definition: ");
printToStringArrayList(builder, "[", "]", (Object[]) filters);
throw new IllegalArgumentException(builder.toString());
}

public boolean matches(final Predicate<RingBufferEvent> predicate) {
return predicate.test(this);
}

public <T extends Filter> boolean matches(Class<T> filterType, final Predicate<T> predicate) {
return predicate.test(filterType.cast(getFilter(filterType)));
}

// public <T> boolean test(Class<T> filterType, final Predicate<T> predicate) {
// return predicate.test(filterType.cast(getFilter(filterType)));
// }

public final void clear() {
arrivalTimeStamp = 0L;
parentSequenceNumber = -1L;
for (Filter filter : filters) {
filter.clear();
}
throwables.clear();
if (payload != null) {
payload.release();
}
payload = null;
}

@Override
public <R extends Filter> boolean test(final Class<R> filterClass, final Predicate<R> filterPredicate) {
return filterPredicate.test(filterClass.cast(getFilter(filterClass)));
}

public String toString() {
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
final StringBuilder builder = new StringBuilder();
builder.append(RingBufferEvent.class.getSimpleName()).append(": arrivalTimeStamp ").append(arrivalTimeStamp).append(" (").append(sdf.format(arrivalTimeStamp)).append(") parent sequence number: ").append(parentSequenceNumber).append(" - filter: ");
printToStringArrayList(builder, "[", "]", (Object[]) filters);
if (!throwables.isEmpty()) {
builder.append(" - exceptions (n=").append(throwables.size()).append("):\n");
for (Throwable t : throwables) {
builder.append(getPrintableStackTrace(t)).append('\n');
}
}
return builder.toString();
}

public static void printToStringArrayList(final StringBuilder builder, final String prefix, final String postFix, final Object... items) {
if (prefix != null && !prefix.isBlank()) {
builder.append(prefix);
}
boolean more = false;
for (Object o : items) {
if (more) {
builder.append(", ");
}
builder.append(o.getClass().getSimpleName()).append(':').append(o.toString());
more = true;
}
if (postFix != null && !postFix.isBlank()) {
builder.append(postFix);
}
//TODO: refactor into a common utility class
}

public static String getPrintableStackTrace(final Throwable t) {
if (t == null) {
return "<null stack trace>";
}
final StringWriter sw = new StringWriter();
t.printStackTrace(new PrintWriter(sw));
return sw.toString();
//TODO: refactor into a common utility class
}

/**
* default buffer element clearing handler
*/
public static class ClearEventHandler implements EventHandler<RingBufferEvent> {
public void onEvent(RingBufferEvent event, long sequence, boolean endOfBatch) {
event.clear();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package de.gsi.microservice.concepts.aggregate;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

import de.gsi.microservice.concepts.aggregate.filter.CtxFilter;
import de.gsi.microservice.concepts.aggregate.filter.EvtTypeFilter;
import de.gsi.microservice.utils.SharedPointer;

class RingBufferEventTests {
@Test
void basicTests() {
assertDoesNotThrow(() -> new RingBufferEvent(CtxFilter.class));
assertThrows(IllegalArgumentException.class, () -> new RingBufferEvent(CtxFilter.class, BogusFilter.class));

final RingBufferEvent evt = new RingBufferEvent(CtxFilter.class);
evt.payload = new SharedPointer<>();
evt.throwables.add(new Throwable("test"));
assertNotNull(evt.toString());
assertDoesNotThrow(evt::clear);
assertEquals(0, evt.throwables.size());
assertEquals(0, evt.arrivalTimeStamp);

final long timeNowMicros = System.currentTimeMillis() * 1000;
final CtxFilter ctxFilter = evt.getFilter(CtxFilter.class);
assertNotNull(ctxFilter);
assertThrows(IllegalArgumentException.class, () -> evt.getFilter(BogusFilter.class));

ctxFilter.setSelector("FAIR.SELECTOR.C=3:S=2", timeNowMicros);
}

@Test
void basicUsageTests() {
final RingBufferEvent evt = new RingBufferEvent(CtxFilter.class, EvtTypeFilter.class);
assertNotNull(evt);
final long timeNowMicros = System.currentTimeMillis() * 1000;
evt.arrivalTimeStamp = timeNowMicros;
evt.getFilter(EvtTypeFilter.class).evtType = EvtTypeFilter.EvtType.DEVICE_DATA;
evt.getFilter(EvtTypeFilter.class).typeName = "MyDevice";
evt.getFilter(CtxFilter.class).setSelector("FAIR.SELECTOR.C=3:S=2", timeNowMicros);

evt.matches(CtxFilter.class, ctx -> {
System.err.println("received ctx = " + ctx);
return true;
});

// fall-back filter: the whole RingBufferEvent, all Filters etc are accessible
assertTrue(evt.matches(e -> e.arrivalTimeStamp == timeNowMicros));

// filter only on given filter trait - here CtxFilter
assertTrue(evt.matches(CtxFilter.class, CtxFilter.matches(3, 2)));
evt.test(CtxFilter.class, CtxFilter.matches(3, 2));

// combination of filter traits
assertTrue(evt.test(CtxFilter.class, CtxFilter.matches(3, 2)) && evt.test(EvtTypeFilter.class, dataType -> dataType.evtType == EvtTypeFilter.EvtType.DEVICE_DATA));
assertTrue(evt.test(CtxFilter.class, CtxFilter.matches(3, 2)) && evt.test(EvtTypeFilter.class, EvtTypeFilter.isDeviceData("MyDevice")));
assertTrue(evt.test(CtxFilter.class, CtxFilter.matches(3, 2).and(CtxFilter.isNewerBpcts(timeNowMicros - 1L))));
}

@Test
void testClearEventHandler() {
final RingBufferEvent evt = new RingBufferEvent(CtxFilter.class, EvtTypeFilter.class);
assertNotNull(evt);
final long timeNowMicros = System.currentTimeMillis() * 1000;
evt.arrivalTimeStamp = timeNowMicros;

assertEquals(timeNowMicros, evt.arrivalTimeStamp);
assertDoesNotThrow(RingBufferEvent.ClearEventHandler::new);

final RingBufferEvent.ClearEventHandler clearHandler = new RingBufferEvent.ClearEventHandler();
assertNotNull(clearHandler);

clearHandler.onEvent(evt, 0, false);
assertEquals(0, evt.arrivalTimeStamp);
}

@Test
void testHelper() {
assertNotNull(RingBufferEvent.getPrintableStackTrace(new Throwable("pretty print")));
assertNotNull(RingBufferEvent.getPrintableStackTrace(null));
StringBuilder builder = new StringBuilder();
assertDoesNotThrow(() -> RingBufferEvent.printToStringArrayList(builder, "[", "]", 1, 2, 3, 4));
assertDoesNotThrow(() -> RingBufferEvent.printToStringArrayList(builder, null, "]", 1, 2, 3, 4));
assertDoesNotThrow(() -> RingBufferEvent.printToStringArrayList(builder, "[", null, 1, 2, 3, 4));
assertDoesNotThrow(() -> RingBufferEvent.printToStringArrayList(builder, "", "]", 1, 2, 3, 4));
assertDoesNotThrow(() -> RingBufferEvent.printToStringArrayList(builder, "[", "", 1, 2, 3, 4));
}

private class BogusFilter implements Filter {
public BogusFilter() {
throw new IllegalStateException("should not call/use this filter");
}

@Override
public void clear() {
// never called
}

@Override
public void copyTo(final Filter other) {
// never called
}
}
}
Loading

0 comments on commit 6e435df

Please sign in to comment.