Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Solve #1519, fix dest path issue in copy2s3 related rule (#1521)
Browse files Browse the repository at this point in the history
* Fix dest path issue in copy2s3 related rule.
* Add Truncate0 to action list.
* Code style.
  • Loading branch information
qiyuangong authored Jan 5, 2018
1 parent d3e28aa commit f21de11
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.smartdata.rule.parser.SmartRuleStringParser;
import org.smartdata.rule.parser.TranslationContext;
import org.smartdata.server.engine.rule.ExecutorScheduler;
import org.smartdata.server.engine.rule.FileCopy2S3Plugin;
import org.smartdata.server.engine.rule.FileCopyDrPlugin;
import org.smartdata.server.engine.rule.RuleExecutor;
import org.smartdata.server.engine.rule.RuleInfoRepo;
Expand Down Expand Up @@ -81,6 +82,7 @@ public RuleManager(
this.metaStore = context.getMetaStore();

RuleExecutorPluginManager.addPlugin(new FileCopyDrPlugin(context.getMetaStore()));
RuleExecutorPluginManager.addPlugin(new FileCopy2S3Plugin());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,52 @@
package org.smartdata.server.engine.rule;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.hdfs.action.Copy2S3Action;
import org.smartdata.model.CmdletDescriptor;
import org.smartdata.model.RuleInfo;
import org.smartdata.model.rule.RuleExecutorPlugin;
import org.smartdata.model.rule.TranslateResult;
import org.smartdata.utils.StringUtil;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FileCopy2S3Plugin implements RuleExecutorPlugin {

private static final Logger LOG =
LoggerFactory.getLogger(FileCopy2S3Plugin.class.getName());
private List<String> srcBases;

public FileCopy2S3Plugin() {
srcBases = null;
}


@Override
public void onNewRuleExecutor(RuleInfo ruleInfo,
TranslateResult tResult) {
srcBases = new ArrayList<>();
List<String> pathsCheckGlob = tResult.getGlobPathCheck();
if (pathsCheckGlob.size() == 0) {
pathsCheckGlob = Collections.singletonList("/*");
}
// Get src base list
srcBases = getPathMatchesList(pathsCheckGlob);
LOG.info("Source base list = {}", srcBases);
}

private List<String> getPathMatchesList(List<String> paths) {
List<String> ret = new ArrayList<>();
for (String p : paths) {
String dir = StringUtil.getBaseDir(p);
if (dir == null) {
continue;
}
ret.add(dir);
}
return ret;
}

@Override
Expand All @@ -23,17 +58,38 @@ public boolean preExecution(RuleInfo ruleInfo,
@Override
public List<String> preSubmitCmdlet(RuleInfo ruleInfo,
List<String> objects) {
return null;
return objects;
}

@Override
public CmdletDescriptor preSubmitCmdletDescriptor(RuleInfo ruleInfo,
TranslateResult tResult, CmdletDescriptor descriptor) {
return null;
for (int i = 0; i < descriptor.actionSize(); i++) {
// O(n)
if (descriptor.getActionName(i).equals("copy2s3")) {
String srcPath = descriptor.getActionArgs(i).get(Copy2S3Action.SRC);
String destBase = descriptor.getActionArgs(i).get(Copy2S3Action.DEST);
String workPath = null;
// O(n)
for (String srcBase : srcBases) {
if (srcPath.startsWith(srcBase)) {
workPath = srcPath.replace(srcBase, "");
break;
}
}
if (workPath == null) {
LOG.error("Rule {} CmdletDescriptor {} Working Path is empty!", ruleInfo, descriptor);
}
// Update dest path
// dest base + work path = dest full path
descriptor.addActionArg(i, Copy2S3Action.DEST, destBase + workPath);
}
}
return descriptor;
}

@Override
public void onRuleExecutorExit(RuleInfo ruleInfo) {

srcBases = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class TestCopy2S3Action extends MiniClusterHarness {
// Map<String, String> args = new HashMap<>();
// args.put(Copy2S3Action.FILE_PATH, src);
//
// args.put(Copy2S3Action.DEST_PATH, dest);
// args.put(Copy2S3Action.DEST, dest);
// copy2S3Action.init(args);
// copy2S3Action.run();
// }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.smartdata.action.Utils;
import org.smartdata.action.annotation.ActionSignature;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.hdfs.CompatibilityHelper;
import org.smartdata.hdfs.CompatibilityHelperLoader;

import java.io.IOException;
Expand All @@ -45,14 +44,15 @@
@ActionSignature(
actionId = "copy2s3",
displayName = "copy2s3",
usage = HdfsAction.FILE_PATH + " $src " + Copy2S3Action.DEST_PATH +
usage = HdfsAction.FILE_PATH + " $src " + Copy2S3Action.DEST +
" $dest "
)
public class Copy2S3Action extends HdfsAction {
private static final Logger LOG =
LoggerFactory.getLogger(CopyFileAction.class);
public static final String BUF_SIZE = "-bufSize";
public static final String DEST_PATH = "-dest";
public static final String SRC = HdfsAction.FILE_PATH;
public static final String DEST = "-dest";
private String srcPath;
private String destPath;
private int bufferSize = 64 * 1024;
Expand All @@ -71,8 +71,8 @@ public void init(Map<String, String> args) {
}
super.init(args);
this.srcPath = args.get(FILE_PATH);
if (args.containsKey(DEST_PATH)) {
this.destPath = args.get(DEST_PATH);
if (args.containsKey(DEST)) {
this.destPath = args.get(DEST);
}
if (args.containsKey(BUF_SIZE)) {
bufferSize = Integer.valueOf(args.get(BUF_SIZE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class HdfsActionFactory extends AbstractActionFactory {
addAction(MergeFileAction.class);
addAction(MetaDataAction.class);
addAction(Copy2S3Action.class);
addAction(Truncate0Action.class);
// addAction("list", ListFileAction.class);
// addAction("fsck", FsckAction.class);
// addAction("diskbalance", DiskBalanceAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.SmartContext;
import org.smartdata.hdfs.action.HdfsAction;
import org.smartdata.metastore.MetaStore;
import org.smartdata.metastore.MetaStoreException;
import org.smartdata.model.ActionInfo;
Expand Down Expand Up @@ -73,7 +74,7 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {

@Override
public boolean onSubmit(ActionInfo actionInfo) {
String path = actionInfo.getArgs().get("-file");
String path = actionInfo.getArgs().get(HdfsAction.FILE_PATH);
if (checkTheLengthOfFile(path) == 0) {
LOG.info("The submit file {}'s length is 0", path);
return false;
Expand All @@ -92,7 +93,7 @@ public boolean onSubmit(ActionInfo actionInfo) {

@Override
public void onActionFinished(ActionInfo actionInfo) {
String path = actionInfo.getArgs().get("-file");
String path = actionInfo.getArgs().get(HdfsAction.FILE_PATH);
// unlock filelock
if (ifLocked(path)) {
unLockTheFile(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.slf4j.LoggerFactory;
import org.smartdata.SmartContext;
import org.smartdata.action.SyncAction;
import org.smartdata.hdfs.action.HdfsAction;
import org.smartdata.metastore.MetaStore;
import org.smartdata.metastore.MetaStoreException;
import org.smartdata.model.ActionInfo;
Expand Down Expand Up @@ -116,7 +117,7 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
return ScheduleResult.FAIL;
}
String srcDir = action.getArgs().get(SyncAction.SRC);
String path = action.getArgs().get("-file");
String path = action.getArgs().get(HdfsAction.FILE_PATH);
String destDir = action.getArgs().get(SyncAction.DEST);
// Check again to avoid corner cases
long did = fileDiffChainMap.get(path).getHead();
Expand All @@ -141,19 +142,19 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
break;
case DELETE:
action.setActionType("delete");
action.getArgs().put("-file", path.replace(srcDir, destDir));
action.getArgs().put(HdfsAction.FILE_PATH, path.replace(srcDir, destDir));
break;
case RENAME:
action.setActionType("rename");
action.getArgs().put("-file", path.replace(srcDir, destDir));
action.getArgs().put(HdfsAction.FILE_PATH, path.replace(srcDir, destDir));
// TODO scope check
String remoteDest = fileDiff.getParameters().get("-dest");
action.getArgs().put("-dest", remoteDest.replace(srcDir, destDir));
fileDiff.getParameters().remove("-dest");
break;
case METADATA:
action.setActionType("metadata");
action.getArgs().put("-file", path.replace(srcDir, destDir));
action.getArgs().put(HdfsAction.FILE_PATH, path.replace(srcDir, destDir));
break;
default:
break;
Expand Down Expand Up @@ -194,7 +195,7 @@ private boolean isFileLocked(String path) {

@Override
public boolean onSubmit(ActionInfo actionInfo) {
String path = actionInfo.getArgs().get("-file");
String path = actionInfo.getArgs().get(HdfsAction.FILE_PATH);
LOG.debug("Submit file {} with lock {}", path, fileLock.keySet());
// If locked then false
if (!isFileLocked(path)) {
Expand Down

0 comments on commit f21de11

Please sign in to comment.