Skip to content

Commit

Permalink
Solve #978, Add properties support in move plan to control mover
Browse files Browse the repository at this point in the history
behavior
  • Loading branch information
littlezhou committed Aug 19, 2017
1 parent a804aee commit cc15d14
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@

import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Plan of MoverScheduler to indicate block, source and target.
*/
public class FileMovePlan {
public static final String MAX_CONCURRENT_MOVES = "maxConcurrentMoves";
public static final String MAX_NUM_RETRIES = "maxNumRetries";
// info of the namenode
private URI namenode;

Expand All @@ -45,6 +49,8 @@ public class FileMovePlan {
// info of block
private List<Long> blockIds;

private Map<String, String> properties;

public FileMovePlan(URI namenode, String fileName) {
this.namenode = namenode;
this.fileName = fileName;
Expand All @@ -53,6 +59,7 @@ public FileMovePlan(URI namenode, String fileName) {
targetIpAddrs = new ArrayList<>();
targetXferPorts = new ArrayList<>();
targetStorageTypes = new ArrayList<>();
properties = new HashMap<>();
blockIds = new ArrayList<>();
}

Expand Down Expand Up @@ -110,6 +117,27 @@ public URI getNamenode() {
return namenode;
}

public void addProperty(String property, String value) {
if (property != null) {
properties.put(property, value);
}
}

public String getPropertyValue(String property, String defaultValue) {
if (properties.containsKey(property)) {
return properties.get(property);
}
return defaultValue;
}

public int getPropertyValueInt(String property, int defaultValue) {
String v = getPropertyValue(property, null);
if (v == null) {
return defaultValue;
}
return Integer.parseInt(v);
}

@Override
public String toString() {
Gson gson = new Gson();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public MoverBasedMoveRunner(Configuration conf, MoverStatus actionStatus) {
}

public void move(String file, FileMovePlan plan) throws Exception {
MoverExecutor executor = new MoverExecutor(conf, 10, 20);
int maxMoves = plan.getPropertyValueInt(FileMovePlan.MAX_CONCURRENT_MOVES, 10);
int maxRetries = plan.getPropertyValueInt(FileMovePlan.MAX_NUM_RETRIES, 10);
MoverExecutor executor = new MoverExecutor(conf, maxRetries, maxMoves);
executor.executeMove(plan);
}
}

0 comments on commit cc15d14

Please sign in to comment.