-
Notifications
You must be signed in to change notification settings - Fork 3
Conversation
...r/src/main/java/com/hotels/shunting/yard/replicator/exec/messaging/MessageReaderAdapter.java
Outdated
Show resolved
Hide resolved
MessageReaderFactory messaReaderFactory = MessageReaderFactory | ||
.newInstance(messageReaderConfig.getMessageReaderFactoryClass()); | ||
MessageReader messageReader = messaReaderFactory.newInstance(replicaHiveConf, sqsMessageSerDe); | ||
return new MessageReaderAdapter(messageReader, sourceCatalog.getHiveMetastoreUris()); | ||
return new MessageReaderAdapter(messageReader, sourceCatalog.getHiveMetastoreUris(), tableSelector); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if possible the Adapter should not know about the filtering of tables it should get ListenerEvents that are valid for processing. Perhaps we need to look at a FilteringMessageReader
that filters out the stream of event. Can be a decarator on whaterever MessageReader implementation we use currently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the idea of using a decorator to filter out the tables we're not interested in.
@@ -33,8 +33,8 @@ fi | |||
LIBFB303_JAR=`ls $HIVE_LIB/libfb303-*.jar | tr '\n' ':'` | |||
|
|||
SHUNTING_YARD_LIBS=$SHUNTING_YARD_HOME/lib/*\ | |||
:$HIVE_LIB/hive-exec.jar\ | |||
:$HIVE_LIB/hive-metastore.jar\ | |||
:$HIVE_LIB/hive-exec*.jar\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this? I'm guessing we have version numbers in some situations? (just curious what those situations are).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed, this is due to difference between EMR and Docker, possibly worth looking at the different jars and seeing if there is some way to craft this so it works in both setups.
import org.springframework.context.annotation.Configuration; | ||
|
||
@Configuration | ||
@ConfigurationProperties(prefix = "target-replication") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This matches the layout in the YAML right? Where it makes sense I think we should re-use the same names as we use in Circus Train, so I was thinking more like something like this:
table-replications:
- source-table:
database-name: foo
table-name: bar
i.e. in the YAML file you can provide a list of tables to replicate. We could also allow regex to be used for both the db and table name and then compile this into a list of potential matches.
MessageReaderFactory messaReaderFactory = MessageReaderFactory | ||
.newInstance(messageReaderConfig.getMessageReaderFactoryClass()); | ||
MessageReader messageReader = messaReaderFactory.newInstance(replicaHiveConf, sqsMessageSerDe); | ||
return new MessageReaderAdapter(messageReader, sourceCatalog.getHiveMetastoreUris()); | ||
return new MessageReaderAdapter(messageReader, sourceCatalog.getHiveMetastoreUris(), tableSelector); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the idea of using a decorator to filter out the tables we're not interested in.
} | ||
|
||
public boolean canProcess(ListenerEvent listenerEvent) { | ||
List<String> tableNames = targetReplication.getTableNames(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can do this in the constructor and keep a reference to the tables to match rather than the targetReplication
itself (or perhaps even better, the constructor should just take a list and not be coupled to the targetReplication
at all).
Pull Request Test Coverage Report for Build 185
💛 - Coveralls |
private @Mock ListenerEvent listenerEvent1; | ||
private @Mock ListenerEvent listenerEvent2; | ||
private @Mock ListenerEvent listenerEvent3; | ||
private @Mock SqsMessageReader delegate; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SqsMessageReader
-> MessageReader
No need to mock the concrete class
when(listenerEvent3.getDbName()).thenReturn(DB_NAME); | ||
when(listenerEvent3.getTableName()).thenReturn(TABLE_NAME3); | ||
|
||
when(delegate.hasNext()).thenReturn(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should probably mock this in a way that it respects the MessageReader interface. So hasNext should return true if there is a next otherwise false
filteringMessageReader = new FilteringMessageReader(delegate, tableSelector); | ||
ListenerEvent event = null; | ||
|
||
for (int i = 0; i < 2; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a bit weird perhaps
assertThat(filteringMessageReader.hasNext(), is(true));
ListenerEvent event = filteringMessageReader.next();
assertThat(event.getDbName()).isEqualTo(DB_NAME);
assertThat(event.getTableName()).isEqualTo(TABLE_NAME1);
assertThat(filteringMessageReader.hasNext(), is(true));
event = filteringMessageReader.next();
assertThat(event.getDbName()).isEqualTo(DB_NAME);
assertThat(event.getTableName()).isEqualTo(TABLE_NAME2);
assertThat((filteringMessageReader.hasNext(), is(false));
return true; | ||
} | ||
} | ||
return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably best to set current to null before returning
when(delegate.hasNext()).thenReturn(false); | ||
filteringMessageReader = new FilteringMessageReader(delegate, tableSelector); | ||
|
||
assertThat(filteringMessageReader.hasNext(), is(false)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert that the next() == null?
Add a line to the changelog please and once we settle on how the config should look we maybe need to add a short section in the README on how to use it the feature. |
…in FilteringMessageReader
@@ -32,6 +33,12 @@ public FilteringMessageReader(MessageReader delegate, TableSelector tableSelecto | |||
this.tableSelector = tableSelector; | |||
} | |||
|
|||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this, this is not true. This class has nothing to do with the SQSMessageReader. This filtering will work for any implementation.
The MessageReader
extends Iterator
so hasNext will not be removed in the future. We just follow the iterator contract. See https://docs.oracle.com/javase/7/docs/api/java/util/Iterator.html.
Concerning the SQSMessageReader I understand return true is confusing but it does not break the contract. It doesn't say you cannot wait indefinitely for next to return a new value it just says: "return true if the iterator has more elements"
|
||
@Override | ||
public ListenerEvent next() { | ||
return current; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at the Iterator contract in more depth we should do:
if (current == null) {
throw new NoSuchElementException();
}
return current;
- use Optional and remove Iterator - update license headers - added legal section - added log4j config for tests
refer #6