Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State Management Flexibility #63

Merged
merged 1 commit into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class ReplicationPluginLoader implements PluginLoader {
public Map<String, TaskManagerFactory> loadTaskFactories() {
Map<String, TaskManagerFactory> factoryMap;

factoryMap = new HashMap<String, TaskManagerFactory>();
factoryMap = new HashMap<>();

factoryMap.put("read-change-interval", new IntervalDownloaderFactory());
factoryMap.put("rci", new IntervalDownloaderFactory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,29 @@ public abstract class BaseReplicationDownloader implements RunnableTask {
private static final String LOCK_FILE = "download.lock";
private static final String CONFIG_FILE = "configuration.txt";
private static final String LOCAL_STATE_FILE = "state.txt";
private static final String CUSTOM_SERVER_STATE_FILE = "custom.state.txt";


private File workingDirectory;
private ReplicationSequenceFormatter sequenceFormatter;
private ServerStateReader serverStateReader;
private boolean single;


/**
* Creates a new instance.
*
* @param workingDirectory
* The directory containing configuration and tracking files.
* @param single
* Set to true if you want to only replicate a single diff file from the server
*/
public BaseReplicationDownloader(File workingDirectory) {
public BaseReplicationDownloader(File workingDirectory, boolean single) {
this.workingDirectory = workingDirectory;

sequenceFormatter = new ReplicationSequenceFormatter(9, 3);
serverStateReader = new ServerStateReader();
this.single = single;
}


Expand Down Expand Up @@ -236,6 +241,12 @@ private ReplicationState download(ReplicationDownloaderConfiguration configurati

// Update the local state to reflect the file state just processed.
localState = fileReplicationState;

// if single is set to true it means that we only want to get a single replication file
// and not up to the current one.
if (single) {
break;
}
}

return localState;
Expand All @@ -251,6 +262,17 @@ private void runImpl() {

// Instantiate utility objects.
configuration = new ReplicationDownloaderConfiguration(new File(workingDirectory, CONFIG_FILE));

// check for custom server state file
File customServerStateFile = new File(workingDirectory, CUSTOM_SERVER_STATE_FILE);
if (customServerStateFile.exists()) {
serverState = new ReplicationState(new PropertiesPersister(customServerStateFile).loadMap());
LOG.info(String.format("Reading custom server state. [%s]", serverState.toString()));
} else {
// Obtain the server state.
serverState = serverStateReader.getServerState(configuration.getBaseUrl());
LOG.info(String.format("Reading current server state. [%s]", serverState.toString()));
}

// Obtain the server state.
LOG.fine("Reading current server state.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ public class ReplicationDownloader extends BaseReplicationDownloader implements
*
* @param workingDirectory
* The directory containing configuration and tracking files.
* @param single
* Set to true if you want to only replicate a single diff file from the server
*/
public ReplicationDownloader(File workingDirectory) {
super(workingDirectory);
public ReplicationDownloader(File workingDirectory, boolean single) {
super(workingDirectory, single);

// We will sort all contents prior to sending to the sink. This adds overhead that may not
// always be required, but provides consistent behaviour.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@
* The task manager factory for a replication file downloader.
*/
public class ReplicationDownloaderFactory extends WorkingTaskManagerFactory {
private static final String ARG_SINGLE = "single";
private static final boolean DEFAULT_SINGLE = false;

/**
* {@inheritDoc}
*/
@Override
protected TaskManager createTaskManagerImpl(TaskConfiguration taskConfig) {
boolean single = getBooleanArgument(taskConfig, ARG_SINGLE, DEFAULT_SINGLE);

return new RunnableChangeSourceManager(
taskConfig.getId(),
new ReplicationDownloader(
this.getWorkingDirectory(taskConfig)
this.getWorkingDirectory(taskConfig),
single
),
taskConfig.getPipeArgs()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ public class ReplicationFileMerger extends BaseReplicationDownloader {
*
* @param workingDirectory
* The directory containing configuration and tracking files.
* @param single
* Set to true if you want to only replicate a single diff file from the server
*/
public ReplicationFileMerger(File workingDirectory) {
super(workingDirectory);
public ReplicationFileMerger(File workingDirectory, boolean single) {
super(workingDirectory, single);

replicationStore = new FileReplicationStore(new File(getWorkingDirectory(), DATA_DIRECTORY), true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,21 @@
* The task manager factory for a replication file merger.
*/
public class ReplicationFileMergerFactory extends WorkingTaskManagerFactory {

private static final String ARG_SINGLE = "single";
private static final boolean DEFAULT_SINGLE = false;

/**
* {@inheritDoc}
*/
@Override
protected TaskManager createTaskManagerImpl(TaskConfiguration taskConfig) {
boolean single = getBooleanArgument(taskConfig, ARG_SINGLE, DEFAULT_SINGLE);

return new RunnableTaskManager(
taskConfig.getId(),
new ReplicationFileMerger(
this.getWorkingDirectory(taskConfig)
this.getWorkingDirectory(taskConfig),
single
),
taskConfig.getPipeArgs()
);
Expand Down