Skip to content

Commit

Permalink
Merge pull request #71 from flyicewolf/master
Browse files Browse the repository at this point in the history
Merge code from Jiannan.  
#71 AppMaster should request for all containers the job requires at a time. 
#72 Remove DAG related code
  • Loading branch information
coderplay committed Jul 20, 2012
2 parents 9ec0319 + b5c7d86 commit 334bd3e
Show file tree
Hide file tree
Showing 41 changed files with 395 additions and 1,394 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.realtime.conf.DragonConfiguration;
import org.apache.hadoop.realtime.dag.DirectedAcyclicGraph;
import org.apache.hadoop.realtime.job.JobCounter;
import org.apache.hadoop.realtime.records.Counter;
import org.apache.hadoop.realtime.records.CounterGroup;
import org.apache.hadoop.realtime.records.Counters;
import org.apache.hadoop.realtime.records.JobId;
Expand All @@ -42,66 +39,6 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

/**
* A {@link DragonJob} is the basic computing unit of Dragon. It allows the
* user to configure the job, submit it, control its execution, and query
* the state. The set methods only work until the job is submitted,
* otherwises they will throw an IllegalStateException. </p>
*
* <p>
* The key component of a {@link DragonJob} is a {@link DragonJobGraph},
* which is formed by a collection of {@link DragonVertex}s and directed
* {@link DragonEdge}s.
* Normally user creates the application, describes various facets of the job
* , sets {@link DragonJobGraph} via {@link DragonJob} , and then
* submits the job and monitor its progress.
* </p>
*
* <p>Here is an example on how to submit a {@link DragonJob}:</p>
* <p><blockquote><pre>
* Configuration conf = getConf();
* conf.setInt(INT_PROPERTY, 1);
* conf.set(STRING_PROPERTY, "VALUE");
* conf.set(DragonJobConfig.PROPERTY, "GRAPHJOB_VALUE");
* DragonJob job = DragonJob.getInstance(conf);
* job.setJobName("First Graph Job");
*
* DragonVertex source = new DragonVertex.Builder("source")
* .producer(EventProducer.class)
* .processor(EventProcessor.class)
* .tasks(10)
* .build();
* DragonVertex m1 = new DragonVertex.Builder("intermediate1")
* .processor(EventProcessor.class)
* .addFile("file.txt")
* .addFile("dict.dat")
* .addArchive("archive.zip")
* .tasks(10)
* .build();
* DragonVertex m2 = new DragonVertex.Builder("intermediate2")
* .processor(EventProcessor.class)
* .addFile("aux")
* .tasks(10)
* .build();
* DragonVertex dest = new DragonVertex.Builder("dest")
* .processor(EventProcessor.class)
* .tasks(10)
* .build();
* DragonJobGraph g = new DragonJobGraph();
* // check if the graph is cyclic when adding edge
* g.addEdge(source, m1).parition(HashPartitioner.class);
* g.addEdge(source, m2).parition(HashPartitioner.class);
* g.addEdge(m1, dest).parition(CustomPartitioner.class);
* g.addEdge(m2, dest).parition(CustomPartitioner.class);
* job.setJobGraph(g);
* // check all source vertex hold event producers when submitting
* job.submit();
* </pre></blockquote></p>
*
* @see DirectedAcyclicGraph
* @see DragonVertex
* @see DragonEdge
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DragonJob {
Expand All @@ -123,8 +60,6 @@ public class DragonJob {

private JobId jobId;

private DragonJobGraph jobGraph;

protected final Credentials credentials;

private JobState state = JobState.NEW;
Expand Down Expand Up @@ -260,14 +195,6 @@ public String getQueueName() {
return null;
}

public void setJobGraph(DragonJobGraph jobGraph) {
this.jobGraph = jobGraph;
}

DragonJobGraph getJobGraph() {
return jobGraph;
}

/**
* Monitor a job and print status in real-time as progress is made and tasks
* fail.
Expand Down Expand Up @@ -394,4 +321,12 @@ public JobState getState() {
}
return null;
}

/**
 * Registers the mapper implementation for this job by storing it under
 * {@link DragonJobConfig#JOB_MAP_CLASS} in the job configuration.
 *
 * @param mapperClass the user's map class; stored with {@code Object} as the
 *                    interface bound, so any class is accepted
 */
public void setMapper(Class<?> mapperClass) {
  conf.setClass(DragonJobConfig.JOB_MAP_CLASS, mapperClass, Object.class);
}

/**
 * Registers the reducer implementation for this job by storing it under
 * {@link DragonJobConfig#JOB_REDUCE_CLASS} in the job configuration.
 *
 * @param reducerClass the user's reduce class; stored with {@code Object} as
 *                     the interface bound, so any class is accepted
 */
public void setReducer(Class<?> reducerClass) {
  conf.setClass(DragonJobConfig.JOB_REDUCE_CLASS, reducerClass, Object.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,12 @@ public class DragonJobConfig {
"dragon.history.use-batched-flush.queue-size.threshold";
public static final int DEFAULT_JOB_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
50;

// Configuration key holding the user's map class (set via DragonJob#setMapper).
public static final String JOB_MAP_CLASS = "map.class";

// Configuration key holding the user's reduce class (set via DragonJob#setReducer).
public static final String JOB_REDUCE_CLASS = "reduce.class";

// Configuration key for the number of parallel map tasks.
// NOTE(review): no default constant is declared here for either parallelism
// key — confirm where defaults are applied.
public static final String MAP_PARALLELISM = "map.parallelism";

// Configuration key for the number of parallel reduce tasks.
public static final String REDUCE_PARALLELISM = "reduce.parallelism";
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -39,10 +38,8 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
Expand All @@ -56,7 +53,6 @@
import org.apache.hadoop.realtime.records.TaskReport;
import org.apache.hadoop.realtime.security.TokenCache;
import org.apache.hadoop.realtime.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.realtime.serialize.HessianSerializer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
Expand Down Expand Up @@ -204,7 +200,8 @@ public JobReport submitJob(DragonJob job) throws IOException,

populateTokenCache(conf, job.getCredentials());

copyFilesFromLocal(job.getJobGraph(), submitFs, submitJobDir, replication);
copyFilesFromLocal(submitFs, submitJobDir, replication);

copyJobJar(job, conf, submitJobDir, submitFs, replication);

// write "queue admins of the queue to which job is being submitted"
Expand All @@ -220,9 +217,6 @@ public JobReport submitJob(DragonJob job) throws IOException,
// Write job file to submit dir
writeConf(conf, submitFs, submitJobDir);

// Write the serialized job description dag to submit dir
writeJobDescription(job.getJobGraph(), submitFs, submitJobDir);

//
// Now, actually submit the job (using the submit name)
//
Expand Down Expand Up @@ -480,22 +474,6 @@ private void writeConf(final Configuration conf, final FileSystem submitFs,
}
}

/**
 * Writes the Hessian-serialized job description (the job's DAG) to the job
 * submission directory on the job service provider's filesystem.
 *
 * @param graph        the job graph to serialize
 * @param submitFs     filesystem of the job service provider
 * @param submitJobDir the job's submission (staging) directory
 * @throws IOException if the description file cannot be created or written
 */
private void writeJobDescription(final DragonJobGraph graph,
    final FileSystem submitFs, final Path submitJobDir) throws IOException {
  final Path descriptionFile =
      JobSubmissionFiles.getJobDescriptionFile(submitJobDir);
  final FsPermission filePermission =
      new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION);
  final FSDataOutputStream stream =
      FileSystem.create(submitFs, descriptionFile, filePermission);
  try {
    // Serializer is stateless here; construct and use it in one step.
    new HessianSerializer<DragonJobGraph>().serialize(stream, graph);
  } finally {
    stream.close();
  }
}

// get secret keys and tokens and store them into TokenCache
@SuppressWarnings("unchecked")
private void populateTokenCache(Configuration conf, Credentials credentials)
Expand Down Expand Up @@ -554,11 +532,8 @@ private void readTokensFromFiles(Configuration conf, Credentials credentials)
}
}

private void copyFilesFromLocal(final DragonJobGraph jobGraph,
final FileSystem submitFs,
final Path submitJobDir,
final short replication)
throws IOException {
private void copyFilesFromLocal(final FileSystem submitFs,
final Path submitJobDir, final short replication) throws IOException {
if (submitFs.exists(submitJobDir)) {
throw new IOException("Not submitting job. Job directory " + submitJobDir
+ " already exists!! This is unexpected.Please check what's there in"
Expand All @@ -568,31 +543,33 @@ private void copyFilesFromLocal(final DragonJobGraph jobGraph,
new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
FileSystem.mkdirs(submitFs, submitJobDir, sysPerms);
// add all the command line files/ jars and archive

// first copy them to dragon job service provider's
// filesystem
for (DragonVertex vertex : jobGraph.vertexSet()) {
List<String> files = vertex.getFiles();
if (files.size() > 0) {
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
FileSystem.mkdirs(submitFs, filesDir, sysPerms);
for (String file : files) {
Path newPath = new Path(filesDir, file);
submitFs.copyFromLocalFile(false, true, new Path(file), newPath);
submitFs.setReplication(newPath, replication);
}
List<String> files =
(List<String>) conf
.getStringCollection(DragonJobConfig.JOB_DiST_CACHE_FILES);
if (files.size() > 0) {
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
FileSystem.mkdirs(submitFs, filesDir, sysPerms);
for (String file : files) {
Path newPath = new Path(filesDir, file);
submitFs.copyFromLocalFile(false, true, new Path(file), newPath);
submitFs.setReplication(newPath, replication);
}
}

List<String> archives = vertex.getArchives();
if (archives.size() > 0) {
Path archivesDir =
JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
FileSystem.mkdirs(submitFs, archivesDir, sysPerms);
for (String archive : archives) {
Path newPath = new Path(archivesDir, archive);
submitFs.copyFromLocalFile(false, true, new Path(archive),
newPath);
submitFs.setReplication(newPath, replication);
}
List<String> archives =
(List<String>) conf
.getStringCollection(DragonJobConfig.JOB_DIST_CACHE_ARCHIVES);
if (archives.size() > 0) {
Path archivesDir =
JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
FileSystem.mkdirs(submitFs, archivesDir, sysPerms);
for (String archive : archives) {
Path newPath = new Path(archivesDir, archive);
submitFs.copyFromLocalFile(false, true, new Path(archive), newPath);
submitFs.setReplication(newPath, replication);
}
}
}
Expand Down
Loading

0 comments on commit 334bd3e

Please sign in to comment.