From 0b9b1cea5d819d76448163d1c5cc7331e720730d Mon Sep 17 00:00:00 2001
From: Kapil Arya
Date: Wed, 20 Aug 2014 13:02:10 -0700
Subject: [PATCH 1/5] cpp/{README -> README.md}

---
 cpp/{README => README.md} | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
 rename cpp/{README => README.md} (95%)

diff --git a/cpp/README b/cpp/README.md
similarity index 95%
rename from cpp/README
rename to cpp/README.md
index ce0f496..53104fc 100644
--- a/cpp/README
+++ b/cpp/README.md
@@ -3,7 +3,7 @@ RENDLER implementation in C++
 Dependencies:
 ============
-- libboost_regex.so
+- libboost\_regex.so
 - libcurl.so
 - Makefile assumes all 3rdparty libraries/headers to be available in the
   default include path (/usr/include?).
 
@@ -18,7 +18,7 @@ Limitations/Features:
 ====================
 - Doesn't store the images to S3, just locally.
 - Image files are kept in rendler-work-dir in the same folder as the
-  render_executor executable.
+  render\_executor executable.
 - Image files are named R<N> where N is a monotonically increasing integer.
 - It won't crawl outside of the given base URL (it will still render
   those webpages) to avoid pulling in too much data.

From 2c0b7351b3955eb16fc191350ae0a579a4494171 Mon Sep 17 00:00:00 2001
From: Kapil Arya
Date: Wed, 20 Aug 2014 13:04:47 -0700
Subject: [PATCH 2/5] Minor updates to cpp/README.md

---
 cpp/README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/cpp/README.md b/cpp/README.md
index 53104fc..48fe7a3 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -28,5 +28,5 @@ Limitations/Features:
 Communication between Scheduler and Executors:
 =============================================
 - Each framework message consists of a vector of strings:
-  RenderExecutor->Scheduler: { taskId, taskUrl, filepath }
-  CrawlExecutor->Scheduler: { taskId, taskUrl, <url>+ }
+  - RenderExecutor->Scheduler: { taskId, taskUrl, filepath }
+  - CrawlExecutor->Scheduler: { taskId, taskUrl, \<url\>+ }

From 30852abbe073ef6cd7a3b67eebff14e35027b7d1 Mon Sep 17 00:00:00 2001
From: Kapil Arya
Date: Thu, 21 Aug 2014 21:37:12 +0000
Subject: [PATCH 3/5] Fix compilation flags for C++ implementation.

---
 cpp/Makefile | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/cpp/Makefile b/cpp/Makefile
index fd47f4f..8382958 100644
--- a/cpp/Makefile
+++ b/cpp/Makefile
@@ -1,8 +1,15 @@
+# Prefix where Mesos is installed
+MESOS_PREFIX = /usr
+#MESOS_PREFIX = $(HOME)/usr
+
 CXX = g++
-CXXFLAGS = -g -O2
-LDFLAGS += -lmesos
+CXXFLAGS = -g -g2 -O2 -std=c++11
+LDFLAGS = -L$(MESOS_PREFIX)/lib
+INCLUDES = -I$(MESOS_PREFIX)/include
+
+LDFLAGS += -lmesos -lprotobuf -lpthread
 CXXCOMPILE = $(CXX) $(INCLUDES) $(CXXFLAGS) -c -o $@
-CXXLINK = $(CXX) $(INCLUDES) $(CXXFLAGS) $(LDFLAGS) -o $@
+CXXLINK = $(CXX) $(INCLUDES) $(CXXFLAGS) -o $@ $< $(LDFLAGS)
 
 default: all
 all: rendler crawl_executor render_executor
@@ -10,10 +17,10 @@ all: rendler crawl_executor render_executor
 HEADERS = rendler_helper.hpp
 
 crawl_executor: crawl_executor.cpp $(HEADERS)
-	$(CXXLINK) $< -lboost_regex -lcurl
+	$(CXXLINK) -lboost_regex -lcurl
 
 %: %.cpp $(HEADERS)
-	$(CXXLINK) $<
+	$(CXXLINK)
 
 check: crawl
 	./crawl http://mesosphere.io/team/

From 8fac3594f850da8f4adb4b4d5e9f222048713aae Mon Sep 17 00:00:00 2001
From: Kapil Arya
Date: Thu, 21 Aug 2014 23:51:20 -0700
Subject: [PATCH 4/5] Added C++ skeleton rendler.
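
The scheduler callbacks in the skeleton are stubbed out with TODOs. As a
hint (a sketch, not part of this patch), the resource-counting TODO in
maxTasksForOffer() can be filled in along the lines of the full
implementation that patch 5 moves into rendler_CHEAT.cpp; TASK_RESOURCES is
the Resources object parsed from "cpus:0.2;mem:32" in the constructor:

  size_t maxTasksForOffer(Offer offer)
  {
    size_t count = 0;
    Resources remaining = offer.resources();
    // Count how many (CPUS_PER_TASK, MEM_PER_TASK) bundles fit in the offer.
    while (TASK_RESOURCES <= remaining) {
      count++;
      remaining -= TASK_RESOURCES;
    }
    return count;
  }

Likewise, the makeCrawlTask()/makeRenderTask() TODOs amount to setting the
task name, id, and data on the prototype, e.g.:

  string urlId = "C" + stringify(processed[url]);
  task.set_name("Crawler " + urlId);
  task.mutable_task_id()->set_value(urlId);
  task.set_data(url);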
--- cpp/Makefile | 4 +- cpp/rendler_skeleton.cpp | 377 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 379 insertions(+), 2 deletions(-) create mode 100644 cpp/rendler_skeleton.cpp diff --git a/cpp/Makefile b/cpp/Makefile index 8382958..56eb2cc 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -12,7 +12,7 @@ CXXCOMPILE = $(CXX) $(INCLUDES) $(CXXFLAGS) -c -o $@ CXXLINK = $(CXX) $(INCLUDES) $(CXXFLAGS) -o $@ $< $(LDFLAGS) default: all -all: rendler crawl_executor render_executor +all: rendler rendler_skeleton crawl_executor render_executor HEADERS = rendler_helper.hpp @@ -26,4 +26,4 @@ check: crawl ./crawl http://mesosphere.io/team/ clean: - (rm -f core crawl_executor render_executor rendler) + (rm -f core crawl_executor render_executor rendler rendler_skeleton) diff --git a/cpp/rendler_skeleton.cpp b/cpp/rendler_skeleton.cpp new file mode 100644 index 0000000..b5a4c91 --- /dev/null +++ b/cpp/rendler_skeleton.cpp @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// See the Mesos Framework Development Guide: +// http://mesos.apache.org/documentation/latest/app-framework-development-guide + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "rendler_helper.hpp" + +using namespace mesos; + +using std::cout; +using std::endl; +using std::string; +using std::vector; +using std::queue; +using std::map; + +using mesos::Resources; + +const float CPUS_PER_TASK = 0.2; +const int32_t MEM_PER_TASK = 32; + +static queue crawlQueue; +static queue renderQueue; +static map > crawlResults; +static map renderResults; +static map processed; +static size_t nextUrlId = 0; + +MesosSchedulerDriver* schedulerDriver; + +static void shutdown(); +static void SIGINTHandler(); + +class Rendler : public Scheduler +{ +public: + Rendler(const ExecutorInfo& _crawler, + const ExecutorInfo& _renderer, + const string& _seedUrl) + : crawler(_crawler), + renderer(_renderer), + seedUrl(_seedUrl), + tasksLaunched(0), + tasksFinished(0), + frameworkMessagesReceived(0) + { + crawlQueue.push(seedUrl); + renderQueue.push(seedUrl); + processed[seedUrl] = nextUrlId++; + size_t lsp = seedUrl.find_last_of('/'); + baseUrl = seedUrl.substr(0, lsp); // No trailing slash + TASK_RESOURCES = Resources::parse( + "cpus:" + stringify(CPUS_PER_TASK) + + ";mem:" + stringify(MEM_PER_TASK)).get(); + + } + + virtual ~Rendler() {} + + + // Invoked when the scheduler successfully registers with a Mesos master. + // It is called with the frameworkId, a unique ID generated by the + // master, and the masterInfo which is information about the master + // itself. + virtual void registered(SchedulerDriver*, + const FrameworkID&, + const MasterInfo&) + { + cout << "Registered!" 
<< endl; + } + + // Invoked when the scheduler re-registers with a newly elected Mesos + // master. This is only called when the scheduler has previously been + // registered. masterInfo contains information about the newly elected + // master. + virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo) {} + + virtual void disconnected(SchedulerDriver* driver) {} + + size_t maxTasksForOffer(Offer offer) + { + size_t count = 0; + Resources remaining = offer.resources(); + // + // TODO + // + return count; + } + + TaskInfo makeTaskPrototype(Offer& offer, const ExecutorInfo& executor) + { + TaskInfo task; + task.mutable_slave_id()->MergeFrom(offer.slave_id()); + task.mutable_executor()->MergeFrom(executor); + task.mutable_resources()->MergeFrom(TASK_RESOURCES); + return task; + } + + TaskInfo makeCrawlTask(Offer& offer) + { + string url = crawlQueue.front(); + crawlQueue.pop(); + + TaskInfo task = makeTaskPrototype(offer, crawler); + // + // TODO: Fill in task name, id, and data + // + return task; + } + + TaskInfo makeRenderTask(Offer& offer) + { + string url = renderQueue.front(); + renderQueue.pop(); + + TaskInfo task = makeTaskPrototype(offer, renderer); + // + // TODO: Fill in task name, id, and data + // + return task; + } + + // Invoked when resources have been offered to this framework. A single + // offer will only contain resources from a single slave. Resources + // associated with an offer will not be re-offered to _this_ framework + // until either (a) this framework has rejected those resources (see + // SchedulerDriver.launchTasks) or (b) those resources have been + // rescinded (see Scheduler.offerRescinded). Note that resources may be + // concurrently offered to more than one framework at a time (depending + // on the allocator being used). In that case, the first framework to + // launch tasks using those resources will be able to use them while the + // other frameworks will have those resources rescinded (or if a + // framework has already launched tasks with those resources then those + // tasks will fail with a TASK_LOST status and a message saying as much). + virtual void resourceOffers(SchedulerDriver* driver, + const vector& offers) + { + cout << "Received resource offer(s)" << endl; + // + // TODO: Launch tasks. + // + } + + virtual void offerRescinded(SchedulerDriver* driver, + const OfferID& offerId) {} + + // Invoked when the status of a task has changed (e.g., a slave is lost + // and so the task is lost, a task finishes and an executor sends a + // status update saying so, etc.) Note that returning from this callback + // acknowledges receipt of this status update. If for whatever reason + // the scheduler aborts during this callback (or the process exits) + // another status update will be delivered. Note, however, that this is + // currently not true if the slave sending the status update is lost or + // fails during that time. + virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status) + { + if (status.state() == TASK_FINISHED) { + cout << "Task " << status.task_id().value() << " finished" << endl; + tasksFinished++; + } + + // + // TODO: Terminate by calling shutdown() and driver->stop() if all running + // tasks have finished and there are no more queued tasks. + // + } + + // Invoked when an executor sends a message. These messages are best + // effort; do not expect a framework message to be retransmitted in any + // reliable fashion. 
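+  // Message format (per cpp/README.md): each framework message is a vector
+  // of strings, encoded/decoded via rendler_helper.hpp (see stringToVector()
+  // below):
+  //   RenderExecutor -> Scheduler: { taskId, taskUrl, filepath }
+  //   CrawlExecutor  -> Scheduler: { taskId, taskUrl, <extracted URLs>... }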
+ virtual void frameworkMessage(SchedulerDriver* driver, + const ExecutorID& executorId, + const SlaveID& slaveId, + const string& data) + { + vector strVector = stringToVector(data); + string taskId = strVector[0]; + string url = strVector[1]; + + if (executorId.value() == crawler.executor_id().value()) { + cout << "Crawler msg received: " << taskId << endl; + vector newURLs(strVector.begin() + 2, strVector.end()); + // + // TODO + // + } else { + cout << "Renderer msg received: " << taskId << endl; + string path = strVector[2]; + // + // TODO + // + } + } + + // Invoked when a slave has been determined unreachable (e.g., machine + // failure, network partition.) Most frameworks will need to reschedule + // any tasks launched on this slave on a new slave. + virtual void slaveLost(SchedulerDriver* driver, const SlaveID& sid) {} + + // Invoked when an executor has exited/terminated. Note that any tasks + // running will have TASK_LOST status updates automatically generated. + virtual void executorLost(SchedulerDriver* driver, + const ExecutorID& executorID, + const SlaveID& slaveID, + int status) {} + + // Invoked when there is an unrecoverable error in the scheduler or + // scheduler driver. The driver will be aborted BEFORE invoking this + // callback. + virtual void error(SchedulerDriver* driver, const string& message) + { + cout << message << endl; + } + +private: + const ExecutorInfo crawler; + const ExecutorInfo renderer; + string seedUrl; + string baseUrl; + size_t tasksLaunched; + size_t tasksFinished; + size_t frameworkMessagesReceived; + Resources TASK_RESOURCES; +}; + +static void shutdown() +{ + printf("Rendler is shutting down\n"); + printf("Writing results to result.dot\n"); + + FILE *f = fopen("result.dot", "w"); + fprintf(f, "digraph G {\n"); + fprintf(f, " node [shape=box];\n"); + + // Add vertices. + map::iterator rit; + for (rit = renderResults.begin(); rit != renderResults.end(); rit++) { + // Prepend character as dot vertices cannot starting with a digit. + string url_hash = "R" + stringify(processed[rit->first]); + string& filename = rit->second; + fprintf(f, + " %s[label=\"\" image=\"%s\"];\n", + url_hash.c_str(), + filename.c_str()); + } + + // Add edges. + map >::iterator cit; + for (cit = crawlResults.begin(); cit != crawlResults.end(); cit++) { + if (renderResults.find(cit->first) == renderResults.end()) { + continue; + } + string from_hash = "R" + stringify(processed[cit->first]); + vector& adjList = cit->second; + + for (size_t i = 0; i < adjList.size(); i++) { + string to_hash = "R" + stringify(processed[adjList[i]]); + if (renderResults.find(adjList[i]) != renderResults.end()) { + // DOT format is: + // A -> B; + fprintf(f, " %s -> %s;\n", from_hash.c_str(), to_hash.c_str()); + } + } + } + + fprintf(f, "}\n"); + fclose(f); +} + +static void SIGINTHandler(int signum) +{ + if (schedulerDriver != NULL) { + shutdown(); + schedulerDriver->stop(); + } + delete schedulerDriver; + exit(0); +} + +#define shift argc--,argv++ +int main(int argc, char** argv) +{ + string seedUrl, master; + shift; + while (true) { + string s = argc>0 ? argv[0] : "--help"; + if (argc > 1 && s == "--seedUrl") { + seedUrl = argv[1]; + shift; shift; + } else if (argc > 1 && s == "--master") { + master = argv[1]; + shift; shift; + } else { + break; + } + } + + if (master.length() == 0 || seedUrl.length() == 0) { + printf("Usage: rendler --seedUrl --master :\n"); + exit(1); + } + + // Find this executable's directory to locate executor. 
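+  // NOTE: realpath(path, NULL) returns a malloc()'d buffer (POSIX.1-2008);
+  // constructing the std::string below copies it and leaks the buffer once
+  // at startup, which is harmless for this example. Note also that dirname()
+  // may modify the string it is passed.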
+ string path = realpath(dirname(argv[0]), NULL); + string crawlerURI = path + "/crawl_executor"; + string rendererURI = path + "/render_executor"; + cout << crawlerURI << endl; + cout << rendererURI << endl; + + ExecutorInfo crawler; + crawler.mutable_executor_id()->set_value("Crawler"); + crawler.mutable_command()->set_value(crawlerURI); + crawler.set_name("Crawl Executor (C++)"); + crawler.set_source("cpp"); + + ExecutorInfo renderer; + renderer.mutable_executor_id()->set_value("Renderer"); + renderer.mutable_command()->set_value(rendererURI); + renderer.set_name("Render Executor (C++)"); + renderer.set_source("cpp"); + + Rendler scheduler(crawler, renderer, seedUrl); + + FrameworkInfo framework; + framework.set_user(""); // Have Mesos fill in the current user. + framework.set_name("Rendler Framework (C++)"); + //framework.set_role(role); + framework.set_principal("rendler-cpp"); + + // Set up the signal handler for SIGINT for clean shutdown. + struct sigaction action; + action.sa_handler = SIGINTHandler; + sigemptyset(&action.sa_mask); + action.sa_flags = 0; + sigaction(SIGINT, &action, NULL); + + schedulerDriver = new MesosSchedulerDriver(&scheduler, framework, master); + + int status = schedulerDriver->run() == DRIVER_STOPPED ? 0 : 1; + + // Ensure that the driver process terminates. + schedulerDriver->stop(); + + shutdown(); + + delete schedulerDriver; + return status; +} From 0f90abca96aa8e73d160c29daa959682d7346626 Mon Sep 17 00:00:00 2001 From: Kapil Arya Date: Fri, 22 Aug 2014 00:05:17 -0700 Subject: [PATCH 5/5] Make skeletons the default - Move fully-implemented schedulers in CHEAT file --- cpp/rendler.cpp | 186 ++++++++------ ...rendler_skeleton.cpp => rendler_CHEAT.cpp} | 186 ++++++-------- .../mesosphere/rendler/scheduler/main.go | 155 +++++------- .../main.go | 155 +++++++----- python/rendler.py | 229 ++++++++++-------- .../{rendler_skeleton.py => rendler_CHEAT.py} | 229 ++++++++---------- .../scala/mesosphere/rendler/Scheduler.scala | 64 +---- ...erSkeleton.scala => Scheduler_CHEAT.scala} | 64 ++++- 8 files changed, 634 insertions(+), 634 deletions(-) rename cpp/{rendler_skeleton.cpp => rendler_CHEAT.cpp} (63%) rename go/src/github.com/mesosphere/rendler/{scheduler_skeleton => scheduler_CHEAT}/main.go (66%) rename python/{rendler_skeleton.py => rendler_CHEAT.py} (55%) rename scala/src/main/scala/mesosphere/rendler/{SchedulerSkeleton.scala => Scheduler_CHEAT.scala} (62%) diff --git a/cpp/rendler.cpp b/cpp/rendler.cpp index 9fffd0a..b5a4c91 100644 --- a/cpp/rendler.cpp +++ b/cpp/rendler.cpp @@ -16,6 +16,9 @@ * limitations under the License. */ +// See the Mesos Framework Development Guide: +// http://mesos.apache.org/documentation/latest/app-framework-development-guide + #include #include #include @@ -73,12 +76,21 @@ class Rendler : public Scheduler crawlQueue.push(seedUrl); renderQueue.push(seedUrl); processed[seedUrl] = nextUrlId++; - size_t lsp = seedUrl.find_last_of('/'); // skip the http:// part + size_t lsp = seedUrl.find_last_of('/'); baseUrl = seedUrl.substr(0, lsp); // No trailing slash + TASK_RESOURCES = Resources::parse( + "cpus:" + stringify(CPUS_PER_TASK) + + ";mem:" + stringify(MEM_PER_TASK)).get(); + } virtual ~Rendler() {} + + // Invoked when the scheduler successfully registers with a Mesos master. + // It is called with the frameworkId, a unique ID generated by the + // master, and the masterInfo which is information about the master + // itself. 
virtual void registered(SchedulerDriver*, const FrameworkID&, const MasterInfo&) @@ -86,67 +98,89 @@ class Rendler : public Scheduler cout << "Registered!" << endl; } + // Invoked when the scheduler re-registers with a newly elected Mesos + // master. This is only called when the scheduler has previously been + // registered. masterInfo contains information about the newly elected + // master. virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo) {} virtual void disconnected(SchedulerDriver* driver) {} - virtual void resourceOffers(SchedulerDriver* driver, - const vector& offers) + size_t maxTasksForOffer(Offer offer) { - for (size_t i = 0; i < offers.size(); i++) { - const Offer& offer = offers[i]; - Resources remaining = offer.resources(); - - static Resources TASK_RESOURCES = Resources::parse( - "cpus:" + stringify(CPUS_PER_TASK) + - ";mem:" + stringify(MEM_PER_TASK)).get(); - - size_t maxTasks = 0; - while (TASK_RESOURCES <= remaining) { - maxTasks++; - remaining -= TASK_RESOURCES; - } + size_t count = 0; + Resources remaining = offer.resources(); + // + // TODO + // + return count; + } - // Launch tasks. - vector tasks; - for (size_t i = 0; i < maxTasks / 2 && crawlQueue.size() > 0; i++) { - string url = crawlQueue.front(); - crawlQueue.pop(); - string urlId = "C" + stringify(processed[url]); - TaskInfo task; - task.set_name("Crawler " + urlId); - task.mutable_task_id()->set_value(urlId); - task.mutable_slave_id()->MergeFrom(offer.slave_id()); - task.mutable_executor()->MergeFrom(crawler); - task.mutable_resources()->MergeFrom(TASK_RESOURCES); - task.set_data(url); - tasks.push_back(task); - tasksLaunched++; - cout << "Crawler " << urlId << " " << url << endl; - } - for (size_t i = maxTasks/2; i < maxTasks && renderQueue.size() > 0; i++) { - string url = renderQueue.front(); - renderQueue.pop(); - string urlId = "R" + stringify(processed[url]); - TaskInfo task; - task.set_name("Renderer " + urlId); - task.mutable_task_id()->set_value(urlId); - task.mutable_slave_id()->MergeFrom(offer.slave_id()); - task.mutable_executor()->MergeFrom(renderer); - task.mutable_resources()->MergeFrom(TASK_RESOURCES); - task.set_data(url); - tasks.push_back(task); - tasksLaunched++; - cout << "Renderer " << urlId << " " << url << endl; - } + TaskInfo makeTaskPrototype(Offer& offer, const ExecutorInfo& executor) + { + TaskInfo task; + task.mutable_slave_id()->MergeFrom(offer.slave_id()); + task.mutable_executor()->MergeFrom(executor); + task.mutable_resources()->MergeFrom(TASK_RESOURCES); + return task; + } - driver->launchTasks(offer.id(), tasks); - } + TaskInfo makeCrawlTask(Offer& offer) + { + string url = crawlQueue.front(); + crawlQueue.pop(); + + TaskInfo task = makeTaskPrototype(offer, crawler); + // + // TODO: Fill in task name, id, and data + // + return task; + } + + TaskInfo makeRenderTask(Offer& offer) + { + string url = renderQueue.front(); + renderQueue.pop(); + + TaskInfo task = makeTaskPrototype(offer, renderer); + // + // TODO: Fill in task name, id, and data + // + return task; + } + + // Invoked when resources have been offered to this framework. A single + // offer will only contain resources from a single slave. Resources + // associated with an offer will not be re-offered to _this_ framework + // until either (a) this framework has rejected those resources (see + // SchedulerDriver.launchTasks) or (b) those resources have been + // rescinded (see Scheduler.offerRescinded). 
Note that resources may be + // concurrently offered to more than one framework at a time (depending + // on the allocator being used). In that case, the first framework to + // launch tasks using those resources will be able to use them while the + // other frameworks will have those resources rescinded (or if a + // framework has already launched tasks with those resources then those + // tasks will fail with a TASK_LOST status and a message saying as much). + virtual void resourceOffers(SchedulerDriver* driver, + const vector& offers) + { + cout << "Received resource offer(s)" << endl; + // + // TODO: Launch tasks. + // } virtual void offerRescinded(SchedulerDriver* driver, const OfferID& offerId) {} + // Invoked when the status of a task has changed (e.g., a slave is lost + // and so the task is lost, a task finishes and an executor sends a + // status update saying so, etc.) Note that returning from this callback + // acknowledges receipt of this status update. If for whatever reason + // the scheduler aborts during this callback (or the process exits) + // another status update will be delivered. Note, however, that this is + // currently not true if the slave sending the status update is lost or + // fails during that time. virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status) { if (status.state() == TASK_FINISHED) { @@ -154,19 +188,15 @@ class Rendler : public Scheduler tasksFinished++; } - if (tasksFinished == tasksLaunched && - crawlQueue.empty() && - renderQueue.empty()) { - // Wait to receive any pending framework messages - // If some framework messages are lost, it may hang indefinitely. - while (frameworkMessagesReceived != tasksFinished) { - sleep(1); - } - shutdown(); - driver->stop(); - } + // + // TODO: Terminate by calling shutdown() and driver->stop() if all running + // tasks have finished and there are no more queued tasks. + // } + // Invoked when an executor sends a message. These messages are best + // effort; do not expect a framework message to be retransmitted in any + // reliable fashion. virtual void frameworkMessage(SchedulerDriver* driver, const ExecutorID& executorId, const SlaveID& slaveId, @@ -178,33 +208,34 @@ class Rendler : public Scheduler if (executorId.value() == crawler.executor_id().value()) { cout << "Crawler msg received: " << taskId << endl; - - for (size_t i = 2; i < strVector.size(); i++) { - string& newURL = strVector[i]; - crawlResults[url].push_back(newURL); - if (processed.find(newURL) == processed.end()) { - processed[newURL] = nextUrlId++; - if (newURL.substr(0, baseUrl.length()) == baseUrl) { - crawlQueue.push(newURL); - } - renderQueue.push(newURL); - } - } + vector newURLs(strVector.begin() + 2, strVector.end()); + // + // TODO + // } else { - if (access(strVector[2].c_str(), R_OK) == 0) { - renderResults[url] = strVector[2]; - } + cout << "Renderer msg received: " << taskId << endl; + string path = strVector[2]; + // + // TODO + // } - frameworkMessagesReceived++; } + // Invoked when a slave has been determined unreachable (e.g., machine + // failure, network partition.) Most frameworks will need to reschedule + // any tasks launched on this slave on a new slave. virtual void slaveLost(SchedulerDriver* driver, const SlaveID& sid) {} + // Invoked when an executor has exited/terminated. Note that any tasks + // running will have TASK_LOST status updates automatically generated. 
virtual void executorLost(SchedulerDriver* driver, const ExecutorID& executorID, const SlaveID& slaveID, int status) {} + // Invoked when there is an unrecoverable error in the scheduler or + // scheduler driver. The driver will be aborted BEFORE invoking this + // callback. virtual void error(SchedulerDriver* driver, const string& message) { cout << message << endl; @@ -218,6 +249,7 @@ class Rendler : public Scheduler size_t tasksLaunched; size_t tasksFinished; size_t frameworkMessagesReceived; + Resources TASK_RESOURCES; }; static void shutdown() diff --git a/cpp/rendler_skeleton.cpp b/cpp/rendler_CHEAT.cpp similarity index 63% rename from cpp/rendler_skeleton.cpp rename to cpp/rendler_CHEAT.cpp index b5a4c91..9fffd0a 100644 --- a/cpp/rendler_skeleton.cpp +++ b/cpp/rendler_CHEAT.cpp @@ -16,9 +16,6 @@ * limitations under the License. */ -// See the Mesos Framework Development Guide: -// http://mesos.apache.org/documentation/latest/app-framework-development-guide - #include #include #include @@ -76,21 +73,12 @@ class Rendler : public Scheduler crawlQueue.push(seedUrl); renderQueue.push(seedUrl); processed[seedUrl] = nextUrlId++; - size_t lsp = seedUrl.find_last_of('/'); + size_t lsp = seedUrl.find_last_of('/'); // skip the http:// part baseUrl = seedUrl.substr(0, lsp); // No trailing slash - TASK_RESOURCES = Resources::parse( - "cpus:" + stringify(CPUS_PER_TASK) + - ";mem:" + stringify(MEM_PER_TASK)).get(); - } virtual ~Rendler() {} - - // Invoked when the scheduler successfully registers with a Mesos master. - // It is called with the frameworkId, a unique ID generated by the - // master, and the masterInfo which is information about the master - // itself. virtual void registered(SchedulerDriver*, const FrameworkID&, const MasterInfo&) @@ -98,89 +86,67 @@ class Rendler : public Scheduler cout << "Registered!" << endl; } - // Invoked when the scheduler re-registers with a newly elected Mesos - // master. This is only called when the scheduler has previously been - // registered. masterInfo contains information about the newly elected - // master. virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo) {} virtual void disconnected(SchedulerDriver* driver) {} - size_t maxTasksForOffer(Offer offer) - { - size_t count = 0; - Resources remaining = offer.resources(); - // - // TODO - // - return count; - } - - TaskInfo makeTaskPrototype(Offer& offer, const ExecutorInfo& executor) - { - TaskInfo task; - task.mutable_slave_id()->MergeFrom(offer.slave_id()); - task.mutable_executor()->MergeFrom(executor); - task.mutable_resources()->MergeFrom(TASK_RESOURCES); - return task; - } - - TaskInfo makeCrawlTask(Offer& offer) - { - string url = crawlQueue.front(); - crawlQueue.pop(); - - TaskInfo task = makeTaskPrototype(offer, crawler); - // - // TODO: Fill in task name, id, and data - // - return task; - } - - TaskInfo makeRenderTask(Offer& offer) - { - string url = renderQueue.front(); - renderQueue.pop(); - - TaskInfo task = makeTaskPrototype(offer, renderer); - // - // TODO: Fill in task name, id, and data - // - return task; - } - - // Invoked when resources have been offered to this framework. A single - // offer will only contain resources from a single slave. Resources - // associated with an offer will not be re-offered to _this_ framework - // until either (a) this framework has rejected those resources (see - // SchedulerDriver.launchTasks) or (b) those resources have been - // rescinded (see Scheduler.offerRescinded). 
Note that resources may be - // concurrently offered to more than one framework at a time (depending - // on the allocator being used). In that case, the first framework to - // launch tasks using those resources will be able to use them while the - // other frameworks will have those resources rescinded (or if a - // framework has already launched tasks with those resources then those - // tasks will fail with a TASK_LOST status and a message saying as much). virtual void resourceOffers(SchedulerDriver* driver, const vector& offers) { - cout << "Received resource offer(s)" << endl; - // - // TODO: Launch tasks. - // + for (size_t i = 0; i < offers.size(); i++) { + const Offer& offer = offers[i]; + Resources remaining = offer.resources(); + + static Resources TASK_RESOURCES = Resources::parse( + "cpus:" + stringify(CPUS_PER_TASK) + + ";mem:" + stringify(MEM_PER_TASK)).get(); + + size_t maxTasks = 0; + while (TASK_RESOURCES <= remaining) { + maxTasks++; + remaining -= TASK_RESOURCES; + } + + // Launch tasks. + vector tasks; + for (size_t i = 0; i < maxTasks / 2 && crawlQueue.size() > 0; i++) { + string url = crawlQueue.front(); + crawlQueue.pop(); + string urlId = "C" + stringify(processed[url]); + TaskInfo task; + task.set_name("Crawler " + urlId); + task.mutable_task_id()->set_value(urlId); + task.mutable_slave_id()->MergeFrom(offer.slave_id()); + task.mutable_executor()->MergeFrom(crawler); + task.mutable_resources()->MergeFrom(TASK_RESOURCES); + task.set_data(url); + tasks.push_back(task); + tasksLaunched++; + cout << "Crawler " << urlId << " " << url << endl; + } + for (size_t i = maxTasks/2; i < maxTasks && renderQueue.size() > 0; i++) { + string url = renderQueue.front(); + renderQueue.pop(); + string urlId = "R" + stringify(processed[url]); + TaskInfo task; + task.set_name("Renderer " + urlId); + task.mutable_task_id()->set_value(urlId); + task.mutable_slave_id()->MergeFrom(offer.slave_id()); + task.mutable_executor()->MergeFrom(renderer); + task.mutable_resources()->MergeFrom(TASK_RESOURCES); + task.set_data(url); + tasks.push_back(task); + tasksLaunched++; + cout << "Renderer " << urlId << " " << url << endl; + } + + driver->launchTasks(offer.id(), tasks); + } } virtual void offerRescinded(SchedulerDriver* driver, const OfferID& offerId) {} - // Invoked when the status of a task has changed (e.g., a slave is lost - // and so the task is lost, a task finishes and an executor sends a - // status update saying so, etc.) Note that returning from this callback - // acknowledges receipt of this status update. If for whatever reason - // the scheduler aborts during this callback (or the process exits) - // another status update will be delivered. Note, however, that this is - // currently not true if the slave sending the status update is lost or - // fails during that time. virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status) { if (status.state() == TASK_FINISHED) { @@ -188,15 +154,19 @@ class Rendler : public Scheduler tasksFinished++; } - // - // TODO: Terminate by calling shutdown() and driver->stop() if all running - // tasks have finished and there are no more queued tasks. - // + if (tasksFinished == tasksLaunched && + crawlQueue.empty() && + renderQueue.empty()) { + // Wait to receive any pending framework messages + // If some framework messages are lost, it may hang indefinitely. + while (frameworkMessagesReceived != tasksFinished) { + sleep(1); + } + shutdown(); + driver->stop(); + } } - // Invoked when an executor sends a message. 
These messages are best - // effort; do not expect a framework message to be retransmitted in any - // reliable fashion. virtual void frameworkMessage(SchedulerDriver* driver, const ExecutorID& executorId, const SlaveID& slaveId, @@ -208,34 +178,33 @@ class Rendler : public Scheduler if (executorId.value() == crawler.executor_id().value()) { cout << "Crawler msg received: " << taskId << endl; - vector newURLs(strVector.begin() + 2, strVector.end()); - // - // TODO - // + + for (size_t i = 2; i < strVector.size(); i++) { + string& newURL = strVector[i]; + crawlResults[url].push_back(newURL); + if (processed.find(newURL) == processed.end()) { + processed[newURL] = nextUrlId++; + if (newURL.substr(0, baseUrl.length()) == baseUrl) { + crawlQueue.push(newURL); + } + renderQueue.push(newURL); + } + } } else { - cout << "Renderer msg received: " << taskId << endl; - string path = strVector[2]; - // - // TODO - // + if (access(strVector[2].c_str(), R_OK) == 0) { + renderResults[url] = strVector[2]; + } } + frameworkMessagesReceived++; } - // Invoked when a slave has been determined unreachable (e.g., machine - // failure, network partition.) Most frameworks will need to reschedule - // any tasks launched on this slave on a new slave. virtual void slaveLost(SchedulerDriver* driver, const SlaveID& sid) {} - // Invoked when an executor has exited/terminated. Note that any tasks - // running will have TASK_LOST status updates automatically generated. virtual void executorLost(SchedulerDriver* driver, const ExecutorID& executorID, const SlaveID& slaveID, int status) {} - // Invoked when there is an unrecoverable error in the scheduler or - // scheduler driver. The driver will be aborted BEFORE invoking this - // callback. virtual void error(SchedulerDriver* driver, const string& message) { cout << message << endl; @@ -249,7 +218,6 @@ class Rendler : public Scheduler size_t tasksLaunched; size_t tasksFinished; size_t frameworkMessagesReceived; - Resources TASK_RESOURCES; }; static void shutdown() diff --git a/go/src/github.com/mesosphere/rendler/scheduler/main.go b/go/src/github.com/mesosphere/rendler/scheduler/main.go index 0127721..5f3ec64 100644 --- a/go/src/github.com/mesosphere/rendler/scheduler/main.go +++ b/go/src/github.com/mesosphere/rendler/scheduler/main.go @@ -11,20 +11,34 @@ import ( "log" "os" "os/signal" - "time" "path/filepath" ) const TASK_CPUS = 0.1 const TASK_MEM = 32.0 -const SHUTDOWN_TIMEOUT = 30 // in seconds +// See the Mesos Framework Development Guide: +// http://mesos.apache.org/documentation/latest/app-framework-development-guide +// +// Scheduler, scheduler driver, executor, and executor driver definitions: +// https://github.com/apache/mesos/blob/master/src/python/src/mesos.py +// https://github.com/apache/mesos/blob/master/include/mesos/scheduler.hpp +// +// Mesos protocol buffer definitions for Python: +// https://github.com/mesosphere/deimos/blob/master/deimos/mesos_pb2.py +// https://github.com/apache/mesos/blob/master/include/mesos/mesos.proto +// +// NOTE: Feel free to strip out "_ = variable" stubs. They are in place to +// silence the Go compiler. 
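+//
+// Hint (not part of this patch): the loop that the maxTasksForOffer TODO
+// below is meant to contain is kept in scheduler_CHEAT/main.go:
+//
+//	for cpus >= TASK_CPUS && mem >= TASK_MEM {
+//		count++
+//		cpus -= TASK_CPUS
+//		mem -= TASK_MEM
+//	}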
func main() { crawlQueue := list.New() // list of string renderQueue := list.New() // list of string + _ = renderQueue processedURLs := list.New() // list of string - crawlResults := list.New() // list of CrawlEdge + _ = processedURLs + + crawlResults := list.New() // list of CrawlEdge renderResults := make(map[string]string) seedUrl := flag.String("seed", "http://mesosphere.io", "The first URL to crawl") @@ -38,7 +52,21 @@ func main() { tasksCreated := 0 tasksRunning := 0 - shuttingDown := false + + // TODO(nnielsen): based on `tasksRunning`, do + // graceful shutdown of framework (allow ongoing render tasks to + // finish). + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill) + go func(c chan os.Signal) { + s := <-c + fmt.Println("Got signal:", s) + + if s == os.Interrupt { + rendler.WriteDOTFile(crawlResults, renderResults) + } + os.Exit(1) + }(c) crawlCommand := "python crawl_executor.py" renderCommand := "python render_executor.py" @@ -86,25 +114,32 @@ func main() { makeCrawlTask := func(url string, offer mesos.Offer) *mesos.TaskInfo { task := makeTaskPrototype(offer) task.Name = proto.String("CRAWL_" + *task.TaskId.Value) - task.Executor = crawlExecutor - task.Data = []byte(url) + // + // TODO + // return task } + _ = makeCrawlTask makeRenderTask := func(url string, offer mesos.Offer) *mesos.TaskInfo { task := makeTaskPrototype(offer) task.Name = proto.String("RENDER_" + *task.TaskId.Value) - task.Executor = renderExecutor - task.Data = []byte(url) + // + // TODO + // return task } + _ = makeRenderTask maxTasksForOffer := func(offer mesos.Offer) int { // TODO(nnielsen): Parse offer resources. count := 0 var cpus float64 = 0 + _ = cpus + var mem float64 = 0 + _ = mem for _, resource := range offer.Resources { if resource.GetName() == "cpus" { @@ -116,14 +151,13 @@ func main() { } } - for cpus >= TASK_CPUS && mem >= TASK_MEM { - count++ - cpus -= TASK_CPUS - mem -= TASK_MEM - } + // + // TODO + // return count } + _ = maxTasksForOffer printQueueStatistics := func() { // TODO(nnielsen): Print queue lengths. 
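+		// e.g., log.Printf("Queue lengths: crawl=%d, render=%d",
+		//	crawlQueue.Len(), renderQueue.Len())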
@@ -148,36 +182,9 @@ func main() { ResourceOffers: func(driver *mesos.SchedulerDriver, offers []mesos.Offer) { printQueueStatistics() - for _, offer := range offers { - if shuttingDown { - fmt.Println("Shutting down: declining offer on [", offer.Hostname, "]") - driver.DeclineOffer(offer.Id) - continue - } - - tasks := []mesos.TaskInfo{} - - for i := 0; i < maxTasksForOffer(offer)/2; i++ { - if crawlQueue.Front() != nil { - url := crawlQueue.Front().Value.(string) - crawlQueue.Remove(crawlQueue.Front()) - task := makeCrawlTask(url, offer) - tasks = append(tasks, *task) - } - if renderQueue.Front() != nil { - url := renderQueue.Front().Value.(string) - renderQueue.Remove(renderQueue.Front()) - task := makeRenderTask(url, offer) - tasks = append(tasks, *task) - } - } - - if len(tasks) == 0 { - driver.DeclineOffer(offer.Id) - } else { - driver.LaunchTasks(offer.Id, tasks) - } - } + // + // TODO + // }, StatusUpdate: func(driver *mesos.SchedulerDriver, status mesos.TaskStatus) { @@ -204,26 +211,9 @@ func main() { if err != nil { log.Printf("Error deserializing CrawlResult: [%s]", err) } else { - for _, link := range result.Links { - edge := rendler.Edge{From: result.URL, To: link} - log.Printf("Appending [%s] to crawl results", edge) - crawlResults.PushBack(edge) - - alreadyProcessed := false - for e := processedURLs.Front(); e != nil && !alreadyProcessed; e = e.Next() { - processedURL := e.Value.(string) - if link == processedURL { - alreadyProcessed = true - } - } - - if !alreadyProcessed { - log.Printf("Enqueueing [%s]", link) - crawlQueue.PushBack(link) - renderQueue.PushBack(link) - processedURLs.PushBack(link) - } - } + // + // TODO + // } case *renderExecutor.ExecutorId.Value: @@ -233,10 +223,9 @@ func main() { if err != nil { log.Printf("Error deserializing RenderResult: [%s]", err) } else { - log.Printf( - "Appending [%s] to render results", - rendler.Edge{From: result.URL, To: result.ImageURL}) - renderResults[result.URL] = result.ImageURL + // + // TODO + // } default: @@ -246,36 +235,12 @@ func main() { }, } - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill) - go func(c chan os.Signal) { - s := <-c - fmt.Println("Got signal:", s) - - if s == os.Interrupt { - fmt.Println("RENDLER is shutting down") - shuttingDown = true - wait_started := time.Now() - for tasksRunning > 0 && SHUTDOWN_TIMEOUT > int(time.Since(wait_started).Seconds()) { - time.Sleep(time.Second) - } - - if tasksRunning > 0 { - fmt.Println("Shutdown by timeout,", tasksRunning, "task(s) have not completed") - } - - driver.Stop(false) - } - }(c) - driver.Init() defer driver.Destroy() driver.Start() driver.Join() driver.Stop(false) - rendler.WriteDOTFile(crawlResults, renderResults) - os.Exit(0) } func executorURIs() []*mesos.CommandInfo_URI { @@ -293,10 +258,10 @@ func executorURIs() []*mesos.CommandInfo_URI { } return []*mesos.CommandInfo_URI{ + pathToURI(baseURI+"crawl_executor.py", false), pathToURI(baseURI+"render.js", false), - pathToURI(baseURI+"python/crawl_executor.py", false), - pathToURI(baseURI+"python/render_executor.py", false), - pathToURI(baseURI+"python/results.py", false), - pathToURI(baseURI+"python/task_state.py", false), + pathToURI(baseURI+"render_executor.py", false), + pathToURI(baseURI+"results.py", false), + pathToURI(baseURI+"task_state.py", false), } } diff --git a/go/src/github.com/mesosphere/rendler/scheduler_skeleton/main.go b/go/src/github.com/mesosphere/rendler/scheduler_CHEAT/main.go similarity index 66% rename from 
go/src/github.com/mesosphere/rendler/scheduler_skeleton/main.go rename to go/src/github.com/mesosphere/rendler/scheduler_CHEAT/main.go index 5f3ec64..0127721 100644 --- a/go/src/github.com/mesosphere/rendler/scheduler_skeleton/main.go +++ b/go/src/github.com/mesosphere/rendler/scheduler_CHEAT/main.go @@ -11,34 +11,20 @@ import ( "log" "os" "os/signal" + "time" "path/filepath" ) const TASK_CPUS = 0.1 const TASK_MEM = 32.0 +const SHUTDOWN_TIMEOUT = 30 // in seconds -// See the Mesos Framework Development Guide: -// http://mesos.apache.org/documentation/latest/app-framework-development-guide -// -// Scheduler, scheduler driver, executor, and executor driver definitions: -// https://github.com/apache/mesos/blob/master/src/python/src/mesos.py -// https://github.com/apache/mesos/blob/master/include/mesos/scheduler.hpp -// -// Mesos protocol buffer definitions for Python: -// https://github.com/mesosphere/deimos/blob/master/deimos/mesos_pb2.py -// https://github.com/apache/mesos/blob/master/include/mesos/mesos.proto -// -// NOTE: Feel free to strip out "_ = variable" stubs. They are in place to -// silence the Go compiler. func main() { crawlQueue := list.New() // list of string renderQueue := list.New() // list of string - _ = renderQueue processedURLs := list.New() // list of string - _ = processedURLs - - crawlResults := list.New() // list of CrawlEdge + crawlResults := list.New() // list of CrawlEdge renderResults := make(map[string]string) seedUrl := flag.String("seed", "http://mesosphere.io", "The first URL to crawl") @@ -52,21 +38,7 @@ func main() { tasksCreated := 0 tasksRunning := 0 - - // TODO(nnielsen): based on `tasksRunning`, do - // graceful shutdown of framework (allow ongoing render tasks to - // finish). - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill) - go func(c chan os.Signal) { - s := <-c - fmt.Println("Got signal:", s) - - if s == os.Interrupt { - rendler.WriteDOTFile(crawlResults, renderResults) - } - os.Exit(1) - }(c) + shuttingDown := false crawlCommand := "python crawl_executor.py" renderCommand := "python render_executor.py" @@ -114,32 +86,25 @@ func main() { makeCrawlTask := func(url string, offer mesos.Offer) *mesos.TaskInfo { task := makeTaskPrototype(offer) task.Name = proto.String("CRAWL_" + *task.TaskId.Value) - // - // TODO - // + task.Executor = crawlExecutor + task.Data = []byte(url) return task } - _ = makeCrawlTask makeRenderTask := func(url string, offer mesos.Offer) *mesos.TaskInfo { task := makeTaskPrototype(offer) task.Name = proto.String("RENDER_" + *task.TaskId.Value) - // - // TODO - // + task.Executor = renderExecutor + task.Data = []byte(url) return task } - _ = makeRenderTask maxTasksForOffer := func(offer mesos.Offer) int { // TODO(nnielsen): Parse offer resources. count := 0 var cpus float64 = 0 - _ = cpus - var mem float64 = 0 - _ = mem for _, resource := range offer.Resources { if resource.GetName() == "cpus" { @@ -151,13 +116,14 @@ func main() { } } - // - // TODO - // + for cpus >= TASK_CPUS && mem >= TASK_MEM { + count++ + cpus -= TASK_CPUS + mem -= TASK_MEM + } return count } - _ = maxTasksForOffer printQueueStatistics := func() { // TODO(nnielsen): Print queue lengths. 
@@ -182,9 +148,36 @@ func main() { ResourceOffers: func(driver *mesos.SchedulerDriver, offers []mesos.Offer) { printQueueStatistics() - // - // TODO - // + for _, offer := range offers { + if shuttingDown { + fmt.Println("Shutting down: declining offer on [", offer.Hostname, "]") + driver.DeclineOffer(offer.Id) + continue + } + + tasks := []mesos.TaskInfo{} + + for i := 0; i < maxTasksForOffer(offer)/2; i++ { + if crawlQueue.Front() != nil { + url := crawlQueue.Front().Value.(string) + crawlQueue.Remove(crawlQueue.Front()) + task := makeCrawlTask(url, offer) + tasks = append(tasks, *task) + } + if renderQueue.Front() != nil { + url := renderQueue.Front().Value.(string) + renderQueue.Remove(renderQueue.Front()) + task := makeRenderTask(url, offer) + tasks = append(tasks, *task) + } + } + + if len(tasks) == 0 { + driver.DeclineOffer(offer.Id) + } else { + driver.LaunchTasks(offer.Id, tasks) + } + } }, StatusUpdate: func(driver *mesos.SchedulerDriver, status mesos.TaskStatus) { @@ -211,9 +204,26 @@ func main() { if err != nil { log.Printf("Error deserializing CrawlResult: [%s]", err) } else { - // - // TODO - // + for _, link := range result.Links { + edge := rendler.Edge{From: result.URL, To: link} + log.Printf("Appending [%s] to crawl results", edge) + crawlResults.PushBack(edge) + + alreadyProcessed := false + for e := processedURLs.Front(); e != nil && !alreadyProcessed; e = e.Next() { + processedURL := e.Value.(string) + if link == processedURL { + alreadyProcessed = true + } + } + + if !alreadyProcessed { + log.Printf("Enqueueing [%s]", link) + crawlQueue.PushBack(link) + renderQueue.PushBack(link) + processedURLs.PushBack(link) + } + } } case *renderExecutor.ExecutorId.Value: @@ -223,9 +233,10 @@ func main() { if err != nil { log.Printf("Error deserializing RenderResult: [%s]", err) } else { - // - // TODO - // + log.Printf( + "Appending [%s] to render results", + rendler.Edge{From: result.URL, To: result.ImageURL}) + renderResults[result.URL] = result.ImageURL } default: @@ -235,12 +246,36 @@ func main() { }, } + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill) + go func(c chan os.Signal) { + s := <-c + fmt.Println("Got signal:", s) + + if s == os.Interrupt { + fmt.Println("RENDLER is shutting down") + shuttingDown = true + wait_started := time.Now() + for tasksRunning > 0 && SHUTDOWN_TIMEOUT > int(time.Since(wait_started).Seconds()) { + time.Sleep(time.Second) + } + + if tasksRunning > 0 { + fmt.Println("Shutdown by timeout,", tasksRunning, "task(s) have not completed") + } + + driver.Stop(false) + } + }(c) + driver.Init() defer driver.Destroy() driver.Start() driver.Join() driver.Stop(false) + rendler.WriteDOTFile(crawlResults, renderResults) + os.Exit(0) } func executorURIs() []*mesos.CommandInfo_URI { @@ -258,10 +293,10 @@ func executorURIs() []*mesos.CommandInfo_URI { } return []*mesos.CommandInfo_URI{ - pathToURI(baseURI+"crawl_executor.py", false), pathToURI(baseURI+"render.js", false), - pathToURI(baseURI+"render_executor.py", false), - pathToURI(baseURI+"results.py", false), - pathToURI(baseURI+"task_state.py", false), + pathToURI(baseURI+"python/crawl_executor.py", false), + pathToURI(baseURI+"python/render_executor.py", false), + pathToURI(baseURI+"python/results.py", false), + pathToURI(baseURI+"python/task_state.py", false), } } diff --git a/python/rendler.py b/python/rendler.py index 3b87b8b..6b5ceb5 100644 --- a/python/rendler.py +++ b/python/rendler.py @@ -27,12 +27,17 @@ LEADING_ZEROS_COUNT = 5 # appended to task ID to facilitate lexicographical 
order TASK_ATTEMPTS = 5 # how many times a task is attempted -CRAWLER_TASK_SUFFIX = "-crwl" -RENDER_TASK_SUFFIX = "-rndr" - # See the Mesos Framework Development Guide: # http://mesos.apache.org/documentation/latest/app-framework-development-guide - +# +# Scheduler, scheduler driver, executor, and executor driver definitions: +# https://github.com/apache/mesos/blob/master/src/python/src/mesos.py +# https://github.com/apache/mesos/blob/master/include/mesos/scheduler.hpp +# +# Mesos protocol buffer definitions for Python: +# https://github.com/mesosphere/deimos/blob/master/deimos/mesos_pb2.py +# https://github.com/apache/mesos/blob/master/include/mesos/mesos.proto +# class RenderingCrawler(Scheduler): def __init__(self, seedUrl, maxRenderTasks, crawlExecutor, renderExecutor): print "RENDLER" @@ -55,39 +60,62 @@ def __init__(self, seedUrl, maxRenderTasks, crawlExecutor, renderExecutor): self.shuttingDown = False def registered(self, driver, frameworkId, masterInfo): + """ + Invoked when the scheduler successfully registers with a Mesos master. + It is called with the frameworkId, a unique ID generated by the + master, and the masterInfo which is information about the master + itself. + """ print "Registered with framework ID [%s]" % frameworkId.value - def makeTaskPrototype(self, offer): - task = mesos_pb2.TaskInfo() - tid = self.tasksCreated - self.tasksCreated += 1 - task.task_id.value = str(tid).zfill(LEADING_ZEROS_COUNT) - task.slave_id.value = offer.slave_id.value - cpus = task.resources.add() - cpus.name = "cpus" - cpus.type = mesos_pb2.Value.SCALAR - cpus.scalar.value = TASK_CPUS - mem = task.resources.add() - mem.name = "mem" - mem.type = mesos_pb2.Value.SCALAR - mem.scalar.value = TASK_MEM - return task + def reregistered(self, driver, masterInfo): + """ + Invoked when the scheduler re-registers with a newly elected Mesos + master. This is only called when the scheduler has previously been + registered. masterInfo contains information about the newly elected + master. + """ + print "Re-registered with Mesos master" + + def disconnected(self, driver): + """ + Invoked when the scheduler becomes disconnected from the master, e.g. + the master fails and another is taking over. 
+ """ + pass + +def makeTaskPrototype(self, offer): + task = mesos_pb2.TaskInfo() + tid = self.tasksCreated + self.tasksCreated += 1 + task.task_id.value = str(tid).zfill(LEADING_ZEROS_COUNT) + task.slave_id.value = offer.slave_id.value + cpus = task.resources.add() + cpus.name = "cpus" + cpus.type = mesos_pb2.Value.SCALAR + cpus.scalar.value = TASK_CPUS + mem = task.resources.add() + mem.name = "mem" + mem.type = mesos_pb2.Value.SCALAR + mem.scalar.value = TASK_MEM + return task def makeCrawlTask(self, url, offer): task = self.makeTaskPrototype(offer) task.name = "crawl task %s" % task.task_id.value - task.task_id.value += CRAWLER_TASK_SUFFIX - task.executor.MergeFrom(self.crawlExecutor) - task.data = str(url) - return task + # + # TODO + # + # + pass def makeRenderTask(self, url, offer): task = self.makeTaskPrototype(offer) task.name = "render task %s" % task.task_id.value - task.task_id.value += RENDER_TASK_SUFFIX - task.executor.MergeFrom(self.renderExecutor) - task.data = str(url) - return task + # + # TODO + # + pass def retryTask(self, task_id, url): if not url in self.tasksRetrying: @@ -100,8 +128,6 @@ def retryTask(self, task_id, url): print "%s try for \"%s\"" % \ (ordinal(self.tasksRetrying[url]), url) - # TODO(alex): replace this by checking TaskStatus.executor_id, - # which is available in mesos 0.20 if task_id.endswith(CRAWLER_TASK_SUFFIX): self.crawlQueue.append(url) elif task_id.endswith(RENDER_TASK_SUFFIX): @@ -119,89 +145,100 @@ def maxTasksForOffer(self, offer): count = 0 cpus = next(rsc.scalar.value for rsc in offer.resources if rsc.name == "cpus") mem = next(rsc.scalar.value for rsc in offer.resources if rsc.name == "mem") - while cpus >= TASK_CPUS and mem >= TASK_MEM: - count += 1 - cpus -= TASK_CPUS - mem -= TASK_MEM - return count + # + # TODO + # + pass def resourceOffers(self, driver, offers): + """ + Invoked when resources have been offered to this framework. A single + offer will only contain resources from a single slave. Resources + associated with an offer will not be re-offered to _this_ framework + until either (a) this framework has rejected those resources (see + SchedulerDriver.launchTasks) or (b) those resources have been + rescinded (see Scheduler.offerRescinded). Note that resources may be + concurrently offered to more than one framework at a time (depending + on the allocator being used). In that case, the first framework to + launch tasks using those resources will be able to use them while the + other frameworks will have those resources rescinded (or if a + framework has already launched tasks with those resources then those + tasks will fail with a TASK_LOST status and a message saying as much). 
+ """ self.printStatistics() - - if not self.crawlQueue and not self.renderQueue and self.tasksRunning <= 0: - print "Nothing to do, RENDLER is shutting down" - hard_shutdown() - - for offer in offers: - print "Got resource offer [%s]" % offer.id.value - - if self.shuttingDown: - print "Shutting down: declining offer on [%s]" % offer.hostname - driver.declineOffer(offer.id) - continue - - maxTasks = self.maxTasksForOffer(offer) - - print "maxTasksForOffer: [%d]" % maxTasks - - tasks = [] - - for i in range(maxTasks / 2): - if self.crawlQueue: - crawlTaskUrl = self.crawlQueue.popleft() - task = self.makeCrawlTask(crawlTaskUrl, offer) - tasks.append(task) - if self.renderQueue: - renderTaskUrl = self.renderQueue.popleft() - task = self.makeRenderTask(renderTaskUrl, offer) - tasks.append(task) - - if tasks: - print "Accepting offer on [%s]" % offer.hostname - driver.launchTasks(offer.id, tasks) - else: - print "Declining offer on [%s]" % offer.hostname - driver.declineOffer(offer.id) + print "Received resource offer(s)" + # + # TODO + # + pass + + def offerRescinded(self, driver, offerId): + """ + Invoked when an offer is no longer valid (e.g., the slave was lost or + another framework used resources in the offer.) If for whatever reason + an offer is never rescinded (e.g., dropped message, failing over + framework, etc.), a framwork that attempts to launch tasks using an + invalid offer will receive TASK_LOST status updats for those tasks + (see Scheduler.resourceOffers). + """ + pass def statusUpdate(self, driver, update): + """ + Invoked when the status of a task has changed (e.g., a slave is lost + and so the task is lost, a task finishes and an executor sends a + status update saying so, etc.) Note that returning from this callback + acknowledges receipt of this status update. If for whatever reason + the scheduler aborts during this callback (or the process exits) + another status update will be delivered. Note, however, that this is + currently not true if the slave sending the status update is lost or + fails during that time. + """ stateName = task_state.nameFor[update.state] print "Task [%s] is in state [%s]" % (update.task_id.value, stateName) - if update.state == 1: # Running - self.tasksRunning += 1 - - elif update.state == 3: # Failed, retry - print "Task [%s] failed with message \"%s\"" \ - % (update.task_id.value, update.message) - self.tasksRunning -= 1 - self.retryTask(update.task_id.value, update.data) - - elif self.tasksRunning > 0 and update.state > 1: # Terminal state - self.tasksRunning -= 1 - def frameworkMessage(self, driver, executorId, slaveId, message): + """ + Invoked when an executor sends a message. These messages are best + effort; do not expect a framework message to be retransmitted in any + reliable fashion. 
+ """ o = json.loads(message) if executorId.value == crawlExecutor.executor_id.value: result = results.CrawlResult(o['taskId'], o['url'], o['links']) - for link in result.links: - edge = (result.url, link) - print "Appending [%s] to crawl results" % repr(edge) - self.crawlResults.add(edge) - if not self.renderLimitReached and self.maxRenderTasks > 0 and \ - self.maxRenderTasks <= len(self.processedURLs): - print "Render task limit (%d) reached" % self.maxRenderTasks - self.renderLimitReached = True - if not link in self.processedURLs and not self.renderLimitReached: - print "Enqueueing [%s]" % link - self.crawlQueue.append(link) - self.renderQueue.append(link) - self.processedURLs.add(link) + # + # TODO + # elif executorId.value == renderExecutor.executor_id.value: result = results.RenderResult(o['taskId'], o['url'], o['imageUrl']) - print "Appending [%s] to render results" % repr((result.url, result.imageUrl)) - self.renderResults[result.url] = result.imageUrl + # + # TODO + # + + def slaveLost(self, driver, slaveId): + """ + Invoked when a slave has been determined unreachable (e.g., machine + failure, network partition.) Most frameworks will need to reschedule + any tasks launched on this slave on a new slave. + """ + pass + + def executorLost(self, driver, executorId, slaveId, status): + """ + Invoked when an executor has exited/terminated. Note that any tasks + running will have TASK_LOST status updates automatically generated. + """ + pass + + def error(self, driver, message): + """ + Invoked when there is an unrecoverable error in the scheduler or + scheduler driver. The driver will be aborted BEFORE invoking this + callback. + """ + print "Error from Mesos: %s " % message def hard_shutdown(): driver.stop() diff --git a/python/rendler_skeleton.py b/python/rendler_CHEAT.py similarity index 55% rename from python/rendler_skeleton.py rename to python/rendler_CHEAT.py index 6b5ceb5..3b87b8b 100644 --- a/python/rendler_skeleton.py +++ b/python/rendler_CHEAT.py @@ -27,17 +27,12 @@ LEADING_ZEROS_COUNT = 5 # appended to task ID to facilitate lexicographical order TASK_ATTEMPTS = 5 # how many times a task is attempted +CRAWLER_TASK_SUFFIX = "-crwl" +RENDER_TASK_SUFFIX = "-rndr" + # See the Mesos Framework Development Guide: # http://mesos.apache.org/documentation/latest/app-framework-development-guide -# -# Scheduler, scheduler driver, executor, and executor driver definitions: -# https://github.com/apache/mesos/blob/master/src/python/src/mesos.py -# https://github.com/apache/mesos/blob/master/include/mesos/scheduler.hpp -# -# Mesos protocol buffer definitions for Python: -# https://github.com/mesosphere/deimos/blob/master/deimos/mesos_pb2.py -# https://github.com/apache/mesos/blob/master/include/mesos/mesos.proto -# + class RenderingCrawler(Scheduler): def __init__(self, seedUrl, maxRenderTasks, crawlExecutor, renderExecutor): print "RENDLER" @@ -60,62 +55,39 @@ def __init__(self, seedUrl, maxRenderTasks, crawlExecutor, renderExecutor): self.shuttingDown = False def registered(self, driver, frameworkId, masterInfo): - """ - Invoked when the scheduler successfully registers with a Mesos master. - It is called with the frameworkId, a unique ID generated by the - master, and the masterInfo which is information about the master - itself. - """ print "Registered with framework ID [%s]" % frameworkId.value - def reregistered(self, driver, masterInfo): - """ - Invoked when the scheduler re-registers with a newly elected Mesos - master. 
-        registered. masterInfo contains information about the newly elected
-        master.
-        """
-        print "Re-registered with Mesos master"
-
-    def disconnected(self, driver):
-        """
-        Invoked when the scheduler becomes disconnected from the master, e.g.
-        the master fails and another is taking over.
-        """
-        pass
-
-def makeTaskPrototype(self, offer):
-    task = mesos_pb2.TaskInfo()
-    tid = self.tasksCreated
-    self.tasksCreated += 1
-    task.task_id.value = str(tid).zfill(LEADING_ZEROS_COUNT)
-    task.slave_id.value = offer.slave_id.value
-    cpus = task.resources.add()
-    cpus.name = "cpus"
-    cpus.type = mesos_pb2.Value.SCALAR
-    cpus.scalar.value = TASK_CPUS
-    mem = task.resources.add()
-    mem.name = "mem"
-    mem.type = mesos_pb2.Value.SCALAR
-    mem.scalar.value = TASK_MEM
-    return task
+    def makeTaskPrototype(self, offer):
+        task = mesos_pb2.TaskInfo()
+        tid = self.tasksCreated
+        self.tasksCreated += 1
+        task.task_id.value = str(tid).zfill(LEADING_ZEROS_COUNT)
+        task.slave_id.value = offer.slave_id.value
+        cpus = task.resources.add()
+        cpus.name = "cpus"
+        cpus.type = mesos_pb2.Value.SCALAR
+        cpus.scalar.value = TASK_CPUS
+        mem = task.resources.add()
+        mem.name = "mem"
+        mem.type = mesos_pb2.Value.SCALAR
+        mem.scalar.value = TASK_MEM
+        return task

     def makeCrawlTask(self, url, offer):
         task = self.makeTaskPrototype(offer)
         task.name = "crawl task %s" % task.task_id.value
-        #
-        # TODO
-        #
-        #
-        pass
+        task.task_id.value += CRAWLER_TASK_SUFFIX
+        task.executor.MergeFrom(self.crawlExecutor)
+        task.data = str(url)
+        return task

     def makeRenderTask(self, url, offer):
         task = self.makeTaskPrototype(offer)
         task.name = "render task %s" % task.task_id.value
-        #
-        # TODO
-        #
-        pass
+        task.task_id.value += RENDER_TASK_SUFFIX
+        task.executor.MergeFrom(self.renderExecutor)
+        task.data = str(url)
+        return task

     def retryTask(self, task_id, url):
         if not url in self.tasksRetrying:
@@ -128,6 +100,8 @@ def retryTask(self, task_id, url):
             print "%s try for \"%s\"" % \
                 (ordinal(self.tasksRetrying[url]), url)

+            # TODO(alex): replace this by checking TaskStatus.executor_id,
+            # which is available in mesos 0.20
             if task_id.endswith(CRAWLER_TASK_SUFFIX):
                 self.crawlQueue.append(url)
             elif task_id.endswith(RENDER_TASK_SUFFIX):
@@ -145,100 +119,89 @@ def maxTasksForOffer(self, offer):
         count = 0
         cpus = next(rsc.scalar.value for rsc in offer.resources if rsc.name == "cpus")
         mem = next(rsc.scalar.value for rsc in offer.resources if rsc.name == "mem")
-        #
-        # TODO
-        #
-        pass
+        while cpus >= TASK_CPUS and mem >= TASK_MEM:
+            count += 1
+            cpus -= TASK_CPUS
+            mem -= TASK_MEM
+        return count

     def resourceOffers(self, driver, offers):
-        """
-        Invoked when resources have been offered to this framework. A single
-        offer will only contain resources from a single slave. Resources
-        associated with an offer will not be re-offered to _this_ framework
-        until either (a) this framework has rejected those resources (see
-        SchedulerDriver.launchTasks) or (b) those resources have been
-        rescinded (see Scheduler.offerRescinded). Note that resources may be
-        concurrently offered to more than one framework at a time (depending
-        on the allocator being used). In that case, the first framework to
-        launch tasks using those resources will be able to use them while the
-        other frameworks will have those resources rescinded (or if a
-        framework has already launched tasks with those resources then those
-        tasks will fail with a TASK_LOST status and a message saying as much).
- """ self.printStatistics() - print "Received resource offer(s)" - # - # TODO - # - pass - - def offerRescinded(self, driver, offerId): - """ - Invoked when an offer is no longer valid (e.g., the slave was lost or - another framework used resources in the offer.) If for whatever reason - an offer is never rescinded (e.g., dropped message, failing over - framework, etc.), a framwork that attempts to launch tasks using an - invalid offer will receive TASK_LOST status updats for those tasks - (see Scheduler.resourceOffers). - """ - pass + + if not self.crawlQueue and not self.renderQueue and self.tasksRunning <= 0: + print "Nothing to do, RENDLER is shutting down" + hard_shutdown() + + for offer in offers: + print "Got resource offer [%s]" % offer.id.value + + if self.shuttingDown: + print "Shutting down: declining offer on [%s]" % offer.hostname + driver.declineOffer(offer.id) + continue + + maxTasks = self.maxTasksForOffer(offer) + + print "maxTasksForOffer: [%d]" % maxTasks + + tasks = [] + + for i in range(maxTasks / 2): + if self.crawlQueue: + crawlTaskUrl = self.crawlQueue.popleft() + task = self.makeCrawlTask(crawlTaskUrl, offer) + tasks.append(task) + if self.renderQueue: + renderTaskUrl = self.renderQueue.popleft() + task = self.makeRenderTask(renderTaskUrl, offer) + tasks.append(task) + + if tasks: + print "Accepting offer on [%s]" % offer.hostname + driver.launchTasks(offer.id, tasks) + else: + print "Declining offer on [%s]" % offer.hostname + driver.declineOffer(offer.id) def statusUpdate(self, driver, update): - """ - Invoked when the status of a task has changed (e.g., a slave is lost - and so the task is lost, a task finishes and an executor sends a - status update saying so, etc.) Note that returning from this callback - acknowledges receipt of this status update. If for whatever reason - the scheduler aborts during this callback (or the process exits) - another status update will be delivered. Note, however, that this is - currently not true if the slave sending the status update is lost or - fails during that time. - """ stateName = task_state.nameFor[update.state] print "Task [%s] is in state [%s]" % (update.task_id.value, stateName) + if update.state == 1: # Running + self.tasksRunning += 1 + + elif update.state == 3: # Failed, retry + print "Task [%s] failed with message \"%s\"" \ + % (update.task_id.value, update.message) + self.tasksRunning -= 1 + self.retryTask(update.task_id.value, update.data) + + elif self.tasksRunning > 0 and update.state > 1: # Terminal state + self.tasksRunning -= 1 + def frameworkMessage(self, driver, executorId, slaveId, message): - """ - Invoked when an executor sends a message. These messages are best - effort; do not expect a framework message to be retransmitted in any - reliable fashion. 
- """ o = json.loads(message) if executorId.value == crawlExecutor.executor_id.value: result = results.CrawlResult(o['taskId'], o['url'], o['links']) - # - # TODO - # + for link in result.links: + edge = (result.url, link) + print "Appending [%s] to crawl results" % repr(edge) + self.crawlResults.add(edge) + if not self.renderLimitReached and self.maxRenderTasks > 0 and \ + self.maxRenderTasks <= len(self.processedURLs): + print "Render task limit (%d) reached" % self.maxRenderTasks + self.renderLimitReached = True + if not link in self.processedURLs and not self.renderLimitReached: + print "Enqueueing [%s]" % link + self.crawlQueue.append(link) + self.renderQueue.append(link) + self.processedURLs.add(link) elif executorId.value == renderExecutor.executor_id.value: result = results.RenderResult(o['taskId'], o['url'], o['imageUrl']) - # - # TODO - # - - def slaveLost(self, driver, slaveId): - """ - Invoked when a slave has been determined unreachable (e.g., machine - failure, network partition.) Most frameworks will need to reschedule - any tasks launched on this slave on a new slave. - """ - pass - - def executorLost(self, driver, executorId, slaveId, status): - """ - Invoked when an executor has exited/terminated. Note that any tasks - running will have TASK_LOST status updates automatically generated. - """ - pass - - def error(self, driver, message): - """ - Invoked when there is an unrecoverable error in the scheduler or - scheduler driver. The driver will be aborted BEFORE invoking this - callback. - """ - print "Error from Mesos: %s " % message + print "Appending [%s] to render results" % repr((result.url, result.imageUrl)) + self.renderResults[result.url] = result.imageUrl def hard_shutdown(): driver.stop() diff --git a/scala/src/main/scala/mesosphere/rendler/Scheduler.scala b/scala/src/main/scala/mesosphere/rendler/Scheduler.scala index 783a629..3c6ac05 100644 --- a/scala/src/main/scala/mesosphere/rendler/Scheduler.scala +++ b/scala/src/main/scala/mesosphere/rendler/Scheduler.scala @@ -10,7 +10,7 @@ import scala.concurrent.Future import java.io.File import java.nio.charset.Charset -class Scheduler(val rendlerHome: File, seedURL: String) +class SchedulerSkeleton(val rendlerHome: File, seedURL: String) extends mesos.Scheduler with ResultProtocol with TaskUtils @@ -61,35 +61,13 @@ class Scheduler(val rendlerHome: File, seedURL: String) executorId: Protos.ExecutorID, slaveId: Protos.SlaveID, data: Array[Byte]): Unit = { + import play.api.libs.json._ println(s"Received a framework message from [${executorId.getValue}]") - val jsonString = new String(data, Charset.forName("UTF-8")) - executorId.getValue match { - case id if id == crawlExecutor.getExecutorId.getValue => - val result = Json.parse(jsonString).as[CrawlResult] - for (link <- result.links) { - val edge = Edge(result.url, link) - println(s"Appending [$edge] to crawl results") - crawlResults += edge - if (!processedURLs.contains(link)) { - println(s"Enqueueing [$link]") - crawlQueue += link - renderQueue += link - processedURLs += link - } - } - - case id if id == renderExecutor.getExecutorId.getValue => - val result = Json.parse(jsonString).as[RenderResult] - val mapping = result.url -> result.imageUrl - println(s"Appending [$mapping] to render results") - renderResults += mapping - - case _ => () - } + ??? 
diff --git a/scala/src/main/scala/mesosphere/rendler/Scheduler.scala b/scala/src/main/scala/mesosphere/rendler/Scheduler.scala
index 783a629..3c6ac05 100644
--- a/scala/src/main/scala/mesosphere/rendler/Scheduler.scala
+++ b/scala/src/main/scala/mesosphere/rendler/Scheduler.scala
@@ -10,7 +10,7 @@ import scala.concurrent.Future
 import java.io.File
 import java.nio.charset.Charset

-class Scheduler(val rendlerHome: File, seedURL: String)
+class SchedulerSkeleton(val rendlerHome: File, seedURL: String)
     extends mesos.Scheduler
     with ResultProtocol
     with TaskUtils
@@ -61,35 +61,13 @@ class Scheduler(val rendlerHome: File, seedURL: String)
     executorId: Protos.ExecutorID,
     slaveId: Protos.SlaveID,
     data: Array[Byte]): Unit = {
+    import play.api.libs.json._
     println(s"Received a framework message from [${executorId.getValue}]")

-    val jsonString = new String(data, Charset.forName("UTF-8"))
-
-    executorId.getValue match {
-      case id if id == crawlExecutor.getExecutorId.getValue =>
-        val result = Json.parse(jsonString).as[CrawlResult]
-        for (link <- result.links) {
-          val edge = Edge(result.url, link)
-          println(s"Appending [$edge] to crawl results")
-          crawlResults += edge
-          if (!processedURLs.contains(link)) {
-            println(s"Enqueueing [$link]")
-            crawlQueue += link
-            renderQueue += link
-            processedURLs += link
-          }
-        }
-
-      case id if id == renderExecutor.getExecutorId.getValue =>
-        val result = Json.parse(jsonString).as[RenderResult]
-        val mapping = result.url -> result.imageUrl
-        println(s"Appending [$mapping] to render results")
-        renderResults += mapping
-
-      case _ => ()
-    }
+    ??? // TODO
   }

   def offerRescinded(
@@ -118,35 +96,9 @@ class Scheduler(val rendlerHome: File, seedURL: String)
     for (offer <- offers.asScala) {
       println(s"Got resource offer [$offer]")
-
-      if (shuttingDown) {
-        println(s"Shutting down: declining offer on [${offer.getHostname}]")
-        driver.declineOffer(offer.getId)
-      }
-      else {
-        val maxTasks = maxTasksForOffer(offer)
-
-        val tasks = mutable.Buffer[Protos.TaskInfo]()
-
-        for (_ <- 0 until maxTasks / 2) {
-          if (crawlQueue.nonEmpty) {
-            val url = crawlQueue.dequeue
-            tasks += makeCrawlTask(s"$tasksCreated", url, offer)
-            tasksCreated = tasksCreated + 1
-          }
-          if (renderQueue.nonEmpty) {
-            val url = renderQueue.dequeue
-            tasks += makeRenderTask(s"$tasksCreated", url, offer)
-            tasksCreated = tasksCreated + 1
-          }
-        }
-
-        if (tasks.nonEmpty)
-          driver.launchTasks(Seq(offer.getId).asJava, tasks.asJava)
-        else
-          driver.declineOffer(offer.getId)
-      }
     }
+
+    ??? // TODO
   }

   def slaveLost(
@@ -160,10 +112,8 @@ class Scheduler(val rendlerHome: File, seedURL: String)
     val taskId = taskStatus.getTaskId.getValue
     val state = taskStatus.getState
     println(s"Task [$taskId] is in state [$state]")
-    if (state == Protos.TaskState.TASK_RUNNING)
-      tasksRunning = tasksRunning + 1
-    else if (isTerminal(state))
-      tasksRunning = math.max(0, tasksRunning - 1)
+
+    ??? // TODO
   }
 }
\ No newline at end of file
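The ??? left in the Scala frameworkMessage above parses the same JSON wire
format the Python scheduler decodes: a crawl result carries taskId, url, and
links, and a render result carries taskId, url, and imageUrl. For reference,
a sketch of the producing side as an executor would write it, in Python (the
helper names are illustrative; sendFrameworkMessage is the standard
ExecutorDriver call):

    # Sketch only: executor-side encoding of the messages decoded above;
    # send_crawl_result/send_render_result are not repo functions.
    import json

    def send_crawl_result(driver, task_id, url, links):
        # driver is the ExecutorDriver passed to the executor callbacks.
        driver.sendFrameworkMessage(
            json.dumps({"taskId": task_id, "url": url, "links": links}))

    def send_render_result(driver, task_id, url, image_url):
        driver.sendFrameworkMessage(
            json.dumps({"taskId": task_id, "url": url, "imageUrl": image_url}))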
diff --git a/scala/src/main/scala/mesosphere/rendler/SchedulerSkeleton.scala b/scala/src/main/scala/mesosphere/rendler/Scheduler_CHEAT.scala
similarity index 62%
rename from scala/src/main/scala/mesosphere/rendler/SchedulerSkeleton.scala
rename to scala/src/main/scala/mesosphere/rendler/Scheduler_CHEAT.scala
index 3c6ac05..783a629 100644
--- a/scala/src/main/scala/mesosphere/rendler/SchedulerSkeleton.scala
+++ b/scala/src/main/scala/mesosphere/rendler/Scheduler_CHEAT.scala
@@ -10,7 +10,7 @@ import scala.concurrent.Future
 import java.io.File
 import java.nio.charset.Charset

-class SchedulerSkeleton(val rendlerHome: File, seedURL: String)
+class Scheduler(val rendlerHome: File, seedURL: String)
     extends mesos.Scheduler
     with ResultProtocol
     with TaskUtils
@@ -61,13 +61,35 @@ class SchedulerSkeleton(val rendlerHome: File, seedURL: String)
     executorId: Protos.ExecutorID,
     slaveId: Protos.SlaveID,
     data: Array[Byte]): Unit = {
-    import play.api.libs.json._
     println(s"Received a framework message from [${executorId.getValue}]")
+    val jsonString = new String(data, Charset.forName("UTF-8"))

-    ??? // TODO
+    executorId.getValue match {
+      case id if id == crawlExecutor.getExecutorId.getValue =>
+        val result = Json.parse(jsonString).as[CrawlResult]
+        for (link <- result.links) {
+          val edge = Edge(result.url, link)
+          println(s"Appending [$edge] to crawl results")
+          crawlResults += edge
+          if (!processedURLs.contains(link)) {
+            println(s"Enqueueing [$link]")
+            crawlQueue += link
+            renderQueue += link
+            processedURLs += link
+          }
+        }
+
+      case id if id == renderExecutor.getExecutorId.getValue =>
+        val result = Json.parse(jsonString).as[RenderResult]
+        val mapping = result.url -> result.imageUrl
+        println(s"Appending [$mapping] to render results")
+        renderResults += mapping
+
+      case _ => ()
+    }
   }

   def offerRescinded(
@@ -96,9 +118,35 @@ class SchedulerSkeleton(val rendlerHome: File, seedURL: String)
     for (offer <- offers.asScala) {
       println(s"Got resource offer [$offer]")
-    }

-    ??? // TODO
+      if (shuttingDown) {
+        println(s"Shutting down: declining offer on [${offer.getHostname}]")
+        driver.declineOffer(offer.getId)
+      }
+      else {
+        val maxTasks = maxTasksForOffer(offer)
+
+        val tasks = mutable.Buffer[Protos.TaskInfo]()
+
+        for (_ <- 0 until maxTasks / 2) {
+          if (crawlQueue.nonEmpty) {
+            val url = crawlQueue.dequeue
+            tasks += makeCrawlTask(s"$tasksCreated", url, offer)
+            tasksCreated = tasksCreated + 1
+          }
+          if (renderQueue.nonEmpty) {
+            val url = renderQueue.dequeue
+            tasks += makeRenderTask(s"$tasksCreated", url, offer)
+            tasksCreated = tasksCreated + 1
+          }
+        }
+
+        if (tasks.nonEmpty)
+          driver.launchTasks(Seq(offer.getId).asJava, tasks.asJava)
+        else
+          driver.declineOffer(offer.getId)
+      }
+    }
   }

   def slaveLost(
@@ -112,8 +160,10 @@ class SchedulerSkeleton(val rendlerHome: File, seedURL: String)
     val taskId = taskStatus.getTaskId.getValue
     val state = taskStatus.getState
     println(s"Task [$taskId] is in state [$state]")
-
-    ??? // TODO
+    if (state == Protos.TaskState.TASK_RUNNING)
+      tasksRunning = tasksRunning + 1
+    else if (isTerminal(state))
+      tasksRunning = math.max(0, tasksRunning - 1)
   }
 }
\ No newline at end of file
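Taken together, the schedulers patched above still need to be attached to a
scheduler driver to run. A minimal sketch of that wiring for the Python
version; the module layout shifted around Mesos 0.20, so the "import mesos"
layout below is an assumption, and seedUrl, maxRenderTasks, the executors,
and master_url are placeholders:

    # Sketch only: wiring a Scheduler implementation to a driver with the
    # pre-0.20 Python bindings; arguments are placeholders.
    import sys
    import mesos
    import mesos_pb2

    framework = mesos_pb2.FrameworkInfo()
    framework.user = ""  # let Mesos fill in the current user
    framework.name = "RENDLER"

    rendler = RenderingCrawler(seedUrl, maxRenderTasks,
                               crawlExecutor, renderExecutor)
    driver = mesos.MesosSchedulerDriver(rendler, framework, master_url)
    status = 0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1
    driver.stop()
    sys.exit(status)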