Skip to content
This repository has been archived by the owner on Jan 22, 2020. It is now read-only.

Mark skeletons as the default; Move original into CHEAT files #7

Open
wants to merge 6 commits into
base: mesoscon
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions cpp/Makefile
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
# Prefix where Mesos is installed
MESOS_PREFIX = /usr
#MESOS_PREFIX = $(HOME)/usr

CXX = g++
CXXFLAGS = -g -O2
LDFLAGS += -lmesos
CXXFLAGS = -g -g2 -O2 -std=c++11
LDFLAGS = -L$(MESOS_PREFIX)/lib
INCLUDES = -I$(MESOS_PREFIX)/include

LDFLAGS += -lmesos -lprotobuf -lpthread
CXXCOMPILE = $(CXX) $(INCLUDES) $(CXXFLAGS) -c -o $@
CXXLINK = $(CXX) $(INCLUDES) $(CXXFLAGS) $(LDFLAGS) -o $@
CXXLINK = $(CXX) $(INCLUDES) $(CXXFLAGS) -o $@ $< $(LDFLAGS)

default: all
all: rendler crawl_executor render_executor
all: rendler rendler_skeleton crawl_executor render_executor

HEADERS = rendler_helper.hpp

crawl_executor: crawl_executor.cpp $(HEADERS)
$(CXXLINK) $< -lboost_regex -lcurl
$(CXXLINK) -lboost_regex -lcurl

%: %.cpp $(HEADERS)
$(CXXLINK) $<
$(CXXLINK)

check: crawl
./crawl http://mesosphere.io/team/

clean:
(rm -f core crawl_executor render_executor rendler)
(rm -f core crawl_executor render_executor rendler rendler_skeleton)
8 changes: 4 additions & 4 deletions cpp/README → cpp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ RENDLER implementation in C++

Dependencies:
============
- libboost_regex.so
- libboost\_regex.so
- libcurl.so
- Makefile assumes all 3rdparty libraries/headers to be available in the
default include path (/usr/include?).
Expand All @@ -18,7 +18,7 @@ Limitations/Features:
====================
- Doesn't store the images to S3, just locally.
- Image files are kept in rendler-work-dir in the same folder as the
render_executor executable.
render\_executor executable.
- Image files are named R<N>, where N is a monotonically increasing integer.
- To avoid pulling in too much data, it won't crawl outside of the given base
URL (it will still render those webpages).
Expand All @@ -28,5 +28,5 @@ Limitations/Features:
Communication between Scheduler and Executors:
=============================================
- Each framework message consists of a vector of strings:
RenderExecutor->Scheduler: { taskId, taskUrl, filepath }
CrawlExecutor->Scheduler: { taskId, taskUrl, <urls>+ }
- RenderExecutor->Scheduler: { taskId, taskUrl, filepath }
- CrawlExecutor->Scheduler: { taskId, taskUrl, \<urls>+ }
186 changes: 109 additions & 77 deletions cpp/rendler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
* limitations under the License.
*/

// See the Mesos Framework Development Guide:
// http://mesos.apache.org/documentation/latest/app-framework-development-guide

#include <libgen.h>
#include <stdlib.h>
#include <limits.h>
Expand Down Expand Up @@ -73,100 +76,127 @@ class Rendler : public Scheduler
crawlQueue.push(seedUrl);
renderQueue.push(seedUrl);
processed[seedUrl] = nextUrlId++;
size_t lsp = seedUrl.find_last_of('/'); // skip the http:// part
size_t lsp = seedUrl.find_last_of('/');
baseUrl = seedUrl.substr(0, lsp); // No trailing slash
TASK_RESOURCES = Resources::parse(
"cpus:" + stringify<float>(CPUS_PER_TASK) +
";mem:" + stringify<size_t>(MEM_PER_TASK)).get();

}

virtual ~Rendler() {}


// Invoked when the scheduler successfully registers with a Mesos master.
// It is called with the frameworkId, a unique ID generated by the
// master, and the masterInfo which is information about the master
// itself.
virtual void registered(SchedulerDriver*,
const FrameworkID&,
const MasterInfo&)
{
// The arguments are left unnamed because this callback only logs the event.
cout << "Registered!" << endl;
}

// Invoked when the scheduler re-registers with a newly elected Mesos
// master. This is only called when the scheduler has previously been
// registered. masterInfo contains information about the newly elected
// master.
virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo) {}

// Disconnection callback of the Mesos Scheduler interface. Intentionally a
// no-op: this framework takes no action here.
virtual void disconnected(SchedulerDriver* driver) {}

virtual void resourceOffers(SchedulerDriver* driver,
const vector<Offer>& offers)
size_t maxTasksForOffer(Offer offer)
{
for (size_t i = 0; i < offers.size(); i++) {
const Offer& offer = offers[i];
Resources remaining = offer.resources();

static Resources TASK_RESOURCES = Resources::parse(
"cpus:" + stringify<float>(CPUS_PER_TASK) +
";mem:" + stringify<size_t>(MEM_PER_TASK)).get();

size_t maxTasks = 0;
while (TASK_RESOURCES <= remaining) {
maxTasks++;
remaining -= TASK_RESOURCES;
}
size_t count = 0;
Resources remaining = offer.resources();
//
// TODO
//
return count;
}

// Launch tasks.
vector<TaskInfo> tasks;
for (size_t i = 0; i < maxTasks / 2 && crawlQueue.size() > 0; i++) {
string url = crawlQueue.front();
crawlQueue.pop();
string urlId = "C" + stringify<size_t>(processed[url]);
TaskInfo task;
task.set_name("Crawler " + urlId);
task.mutable_task_id()->set_value(urlId);
task.mutable_slave_id()->MergeFrom(offer.slave_id());
task.mutable_executor()->MergeFrom(crawler);
task.mutable_resources()->MergeFrom(TASK_RESOURCES);
task.set_data(url);
tasks.push_back(task);
tasksLaunched++;
cout << "Crawler " << urlId << " " << url << endl;
}
for (size_t i = maxTasks/2; i < maxTasks && renderQueue.size() > 0; i++) {
string url = renderQueue.front();
renderQueue.pop();
string urlId = "R" + stringify<size_t>(processed[url]);
TaskInfo task;
task.set_name("Renderer " + urlId);
task.mutable_task_id()->set_value(urlId);
task.mutable_slave_id()->MergeFrom(offer.slave_id());
task.mutable_executor()->MergeFrom(renderer);
task.mutable_resources()->MergeFrom(TASK_RESOURCES);
task.set_data(url);
tasks.push_back(task);
tasksLaunched++;
cout << "Renderer " << urlId << " " << url << endl;
}
// Builds the part of a TaskInfo that is shared by crawl and render tasks:
// the slave to run on (taken from the offer), the executor to use, and the
// fixed per-task resources (TASK_RESOURCES). The task name, id and data are
// NOT set here; the caller is expected to fill them in.
//
// The offer is only read, so it is taken by const reference. This also allows
// passing the const offers handed to resourceOffers() without a copy or cast.
TaskInfo makeTaskPrototype(const Offer& offer, const ExecutorInfo& executor)
{
  TaskInfo task;
  task.mutable_slave_id()->MergeFrom(offer.slave_id());
  task.mutable_executor()->MergeFrom(executor);
  task.mutable_resources()->MergeFrom(TASK_RESOURCES);
  return task;
}

driver->launchTasks(offer.id(), tasks);
}
// Dequeues the next URL from crawlQueue and returns a task prototype bound to
// the crawler executor. crawlQueue.front() is called without an emptiness
// check, so callers are expected to verify the queue is non-empty first.
TaskInfo makeCrawlTask(Offer& offer)
{
// Note: `url` is not used until the TODO below is filled in.
string url = crawlQueue.front();
crawlQueue.pop();

// Slave, executor and resources come from the shared prototype.
TaskInfo task = makeTaskPrototype(offer, crawler);
//
// TODO: Fill in task name, id, and data
//
return task;
}

// Dequeues the next URL from renderQueue and returns a task prototype bound to
// the renderer executor. renderQueue.front() is called without an emptiness
// check, so callers are expected to verify the queue is non-empty first.
TaskInfo makeRenderTask(Offer& offer)
{
// Note: `url` is not used until the TODO below is filled in.
string url = renderQueue.front();
renderQueue.pop();

// Slave, executor and resources come from the shared prototype.
TaskInfo task = makeTaskPrototype(offer, renderer);
//
// TODO: Fill in task name, id, and data
//
return task;
}

// Invoked when resources have been offered to this framework. A single
// offer will only contain resources from a single slave. Resources
// associated with an offer will not be re-offered to _this_ framework
// until either (a) this framework has rejected those resources (see
// SchedulerDriver.launchTasks) or (b) those resources have been
// rescinded (see Scheduler.offerRescinded). Note that resources may be
// concurrently offered to more than one framework at a time (depending
// on the allocator being used). In that case, the first framework to
// launch tasks using those resources will be able to use them while the
// other frameworks will have those resources rescinded (or if a
// framework has already launched tasks with those resources then those
// tasks will fail with a TASK_LOST status and a message saying as much).
virtual void resourceOffers(SchedulerDriver* driver,
const vector<Offer>& offers)
{
// Skeleton: only logs receipt; neither `driver` nor `offers` is used yet.
cout << "Received resource offer(s)" << endl;
//
// TODO: Launch tasks.
//
}

// Offer-rescinded callback of the Mesos Scheduler interface (see the note on
// rescinded resources in the resourceOffers comment). Intentionally a no-op
// in this framework.
virtual void offerRescinded(SchedulerDriver* driver,
const OfferID& offerId) {}

// Invoked when the status of a task has changed (e.g., a slave is lost
// and so the task is lost, a task finishes and an executor sends a
// status update saying so, etc.) Note that returning from this callback
// acknowledges receipt of this status update. If for whatever reason
// the scheduler aborts during this callback (or the process exits)
// another status update will be delivered. Note, however, that this is
// currently not true if the slave sending the status update is lost or
// fails during that time.
virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
{
if (status.state() == TASK_FINISHED) {
cout << "Task " << status.task_id().value() << " finished" << endl;
tasksFinished++;
}

if (tasksFinished == tasksLaunched &&
crawlQueue.empty() &&
renderQueue.empty()) {
// Wait to receive any pending framework messages
// If some framework messages are lost, it may hang indefinitely.
while (frameworkMessagesReceived != tasksFinished) {
sleep(1);
}
shutdown();
driver->stop();
}
//
// TODO: Terminate by calling shutdown() and driver->stop() if all running
// tasks have finished and there are no more queued tasks.
//
}

// Invoked when an executor sends a message. These messages are best
// effort; do not expect a framework message to be retransmitted in any
// reliable fashion.
virtual void frameworkMessage(SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
Expand All @@ -178,33 +208,34 @@ class Rendler : public Scheduler

if (executorId.value() == crawler.executor_id().value()) {
cout << "Crawler msg received: " << taskId << endl;

for (size_t i = 2; i < strVector.size(); i++) {
string& newURL = strVector[i];
crawlResults[url].push_back(newURL);
if (processed.find(newURL) == processed.end()) {
processed[newURL] = nextUrlId++;
if (newURL.substr(0, baseUrl.length()) == baseUrl) {
crawlQueue.push(newURL);
}
renderQueue.push(newURL);
}
}
vector<string> newURLs(strVector.begin() + 2, strVector.end());
//
// TODO
//
} else {
if (access(strVector[2].c_str(), R_OK) == 0) {
renderResults[url] = strVector[2];
}
cout << "Renderer msg received: " << taskId << endl;
string path = strVector[2];
//
// TODO
//
}
frameworkMessagesReceived++;
}

// Invoked when a slave has been determined unreachable (e.g., machine
// failure, network partition.) Most frameworks will need to reschedule
// any tasks launched on this slave on a new slave.
virtual void slaveLost(SchedulerDriver* driver, const SlaveID& sid) {}

// Invoked when an executor has exited/terminated. Note that any tasks
// running will have TASK_LOST status updates automatically generated.
virtual void executorLost(SchedulerDriver* driver,
const ExecutorID& executorID,
const SlaveID& slaveID,
int status) {}

// Invoked when there is an unrecoverable error in the scheduler or
// scheduler driver. The driver will be aborted BEFORE invoking this
// callback.
virtual void error(SchedulerDriver* driver, const string& message)
{
cout << message << endl;
Expand All @@ -218,6 +249,7 @@ class Rendler : public Scheduler
size_t tasksLaunched;
size_t tasksFinished;
size_t frameworkMessagesReceived;
Resources TASK_RESOURCES;
};

static void shutdown()
Expand Down
Loading