Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collapse Service #62

Merged
merged 22 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ if(COMPILE_WASM)
list(APPEND WASM_COMPILE_FLAGS -pthread -O3 -g2 -fPIC -mssse3 -msimd128)
list(APPEND WASM_COMPILE_FLAGS "SHELL:-s WASM=1" "SHELL:-s ASSERTIONS=0" "SHELL:-s DISABLE_EXCEPTION_CATCHING=1" "SHELL:-s LLD_REPORT_UNDEFINED" "SHELL:-s FORCE_FILESYSTEM=1" "SHELL:-s ALLOW_MEMORY_GROWTH=1")
list(APPEND WASM_COMPILE_FLAGS -Wno-error=pthreads-mem-growth)
list(APPEND WASM_COMPILE_FLAGS -DWASM_HIDE_THREADS)
jerinphilip marked this conversation as resolved.
Show resolved Hide resolved
endif(COMPILE_WASM)

add_subdirectory(3rd_party)
Expand Down
1 change: 0 additions & 1 deletion app/marian-decoder-new.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,5 @@ int main(int argc, char *argv[]) {
marian_decoder_minimal(response.histories(), service.targetVocab(), options);

LOG(info, "Total time: {:.5f}s wall", decoderTimer.elapsed());
service.stop();
return 0;
}
3 changes: 0 additions & 3 deletions app/service-cli-bytearray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ int main(int argc, char *argv[]) {
Response response = responseFuture.get();
std::cout << response.translation() << std::endl;

// Stop Service.
service.stop();

// Clear the memory used for the byte array
free(model_bytes); // Ideally, this should be done after the translation model has been gracefully shut down.

Expand Down
2 changes: 0 additions & 2 deletions app/service-cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,5 @@ int main(int argc, char *argv[]) {
Response response = responseFuture.get();
std::cout << response.translation() << std::endl;

// Stop Service.
service.stop();
return 0;
}
10 changes: 3 additions & 7 deletions src/translator/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
if (NOT USE_WASM_COMPATIBLE_SOURCES)
set(MULTITHREADED_SERVICE_SOURCE "service.cpp")
endif()

add_library(bergamot-translator STATIC
AbstractTranslationModel.cpp
TranslationModel.cpp

# Following files added from browsermt/mts@nuke
byteArrayExample.cpp
text_processor.cpp
sentence_splitter.cpp
batch_translator.cpp
multifactor_priority.cpp
request.cpp
service_base.cpp
${MULTITHREADED_SERVICE_SOURCE}
batcher.cpp
response.cpp
batch.cpp
sentence_ranges.cpp
service.cpp
)
if (COMPILE_DECODER_ONLY)
# A dirty hack because of marian's bad cmake practices
Expand All @@ -31,6 +25,8 @@ if(COMPILE_WASM)
# Enable code that is required for generating JS bindings
target_compile_definitions(bergamot-translator PRIVATE WASM_BINDINGS)
target_compile_options(bergamot-translator PRIVATE ${WASM_COMPILE_FLAGS})
else(COMPILE_WASM)
target_compile_definitions(bergamot-translator PUBLIC)
jerinphilip marked this conversation as resolved.
Show resolved Hide resolved
endif(COMPILE_WASM)

target_link_libraries(bergamot-translator marian ssplit)
Expand Down
47 changes: 4 additions & 43 deletions src/translator/TranslationModel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,14 @@
#include <future>
#include <vector>

// All 3rd party includes
#include "3rd_party/marian-dev/src/3rd_party/yaml-cpp/yaml.h"
#include "3rd_party/marian-dev/src/common/config_parser.h"
#include "common/config_validator.h"
#include "common/options.h"

// All local project includes
#include "TranslationModel.h"
#include "translator/parser.h"
#include "translator/service_base.h"

std::shared_ptr<marian::Options> parseOptions(const std::string &config) {
marian::Options options;

// @TODO(jerinphilip) There's something off here, @XapaJIaMnu suggests
// that should not be using the defaultConfig. This function only has access
// to std::string config and needs to be able to construct Options from the
// same.

// Absent the following code-segment, there is a parsing exception thrown on
// rebuilding YAML.
//
// Error: Unhandled exception of type 'N4YAML11InvalidNodeE': invalid node;
// this may result from using a map iterator as a sequence iterator, or
// vice-versa
//
// Error: Aborted from void unhandledException() in
// 3rd_party/marian-dev/src/common/logging.cpp:113

marian::ConfigParser configParser = marian::bergamot::createConfigParser();
const YAML::Node &defaultConfig = configParser.getConfig();

options.merge(defaultConfig);

// Parse configs onto defaultConfig.
options.parse(config);
YAML::Node configCopy = options.cloneToYamlNode();

marian::ConfigValidator validator(configCopy);
validator.validateOptions(marian::cli::mode::translation);

return std::make_shared<marian::Options>(options);
}
#include "translator/service.h"

TranslationModel::TranslationModel(const std::string &config, const void * model_memory)
: configOptions_(std::move(parseOptions(config))),
AbstractTranslationModel(), service_(configOptions_, model_memory) {}
TranslationModel::TranslationModel(const std::string &config,
const void *model_memory)
: AbstractTranslationModel(), service_(config, model_memory) {}

TranslationModel::~TranslationModel() {}

Expand Down
10 changes: 6 additions & 4 deletions src/translator/TranslationModel.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

// All local project includes
#include "AbstractTranslationModel.h"
#include "translator/service_base.h"
#include "translator/service.h"

/* A Translation model that translates a plain (without any markups and emojis)
* UTF-8 encoded text. This implementation supports translation from 1 source
Expand All @@ -29,9 +29,11 @@ class TranslationModel : public AbstractTranslationModel {
*/
/**
* @param config Marian yml config file in the form of a string
* @param model_memory optional byte array (aligned to 64!!!) that contains the bytes of a model.bin.
* @param model_memory optional byte array (aligned to 64!!!) that contains
* the bytes of a model.bin.
*/
TranslationModel(const std::string &config, const void * model_memory = nullptr);
TranslationModel(const std::string &config,
const void *model_memory = nullptr);

~TranslationModel();

Expand Down Expand Up @@ -69,7 +71,7 @@ class TranslationModel : public AbstractTranslationModel {
private:
// Model configuration options
std::shared_ptr<marian::Options> configOptions_; // ORDER DEPENDECNY
marian::bergamot::NonThreadedService service_; // ORDER DEPENDENCY
marian::bergamot::Service service_; // ORDER DEPENDENCY
};

#endif /* SRC_TRANSLATOR_TRANSLATIONMODEL_H_ */
2 changes: 1 addition & 1 deletion src/translator/batch_translator.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "translator/history.h"
#include "translator/scorers.h"

#ifdef WITH_PTHREADS
#ifndef WASM_HIDE_THREADS
#include "pcqueue.h"
#endif

Expand Down
2 changes: 1 addition & 1 deletion src/translator/batcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "definitions.h"
#include "request.h"

#ifdef WITH_PTHREADS
#ifndef WASM_HIDE_THREADS
#include "pcqueue.h"
#endif

Expand Down
38 changes: 38 additions & 0 deletions src/translator/parser.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#ifndef SRC_BERGAMOT_PARSER_H
#define SRC_BERGAMOT_PARSER_H

#include "3rd_party/yaml-cpp/yaml.h"
#include "common/config_parser.h"
#include "common/config_validator.h"
#include "common/options.h"
#include "marian.h"

namespace marian {
Expand All @@ -22,6 +26,40 @@ inline marian::ConfigParser createConfigParser() {
return cp;
}

inline std::shared_ptr<marian::Options>
parseOptions(const std::string &config) {
marian::Options options;

// @TODO(jerinphilip) There's something off here, @XapaJIaMnu suggests
// that should not be using the defaultConfig. This function only has access
// to std::string config and needs to be able to construct Options from the
// same.

// Absent the following code-segment, there is a parsing exception thrown on
// rebuilding YAML.
//
// Error: Unhandled exception of type 'N4YAML11InvalidNodeE': invalid node;
// this may result from using a map iterator as a sequence iterator, or
// vice-versa
//
// Error: Aborted from void unhandledException() in
// 3rd_party/marian-dev/src/common/logging.cpp:113

marian::ConfigParser configParser = createConfigParser();
const YAML::Node &defaultConfig = configParser.getConfig();

options.merge(defaultConfig);

// Parse configs onto defaultConfig.
options.parse(config);
YAML::Node configCopy = options.cloneToYamlNode();

marian::ConfigValidator validator(configCopy);
validator.validateOptions(marian::cli::mode::translation);

return std::make_shared<marian::Options>(options);
}

} // namespace bergamot
} // namespace marian

Expand Down
119 changes: 100 additions & 19 deletions src/translator/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,77 @@
#include <string>
#include <utility>

inline std::vector<marian::Ptr<const marian::Vocab>>
loadVocabularies(marian::Ptr<marian::Options> options) {
// @TODO: parallelize vocab loading for faster startup
auto vfiles = options->get<std::vector<std::string>>("vocabs");
// with the current setup, we need at least two vocabs: src and trg
ABORT_IF(vfiles.size() < 2, "Insufficient number of vocabularies.");
std::vector<marian::Ptr<marian::Vocab const>> vocabs(vfiles.size());
std::unordered_map<std::string, marian::Ptr<marian::Vocab>> vmap;
for (size_t i = 0; i < vocabs.size(); ++i) {
auto m =
vmap.emplace(std::make_pair(vfiles[i], marian::Ptr<marian::Vocab>()));
if (m.second) { // new: load the vocab
m.first->second = marian::New<marian::Vocab>(options, i);
m.first->second->load(vfiles[i]);
}
vocabs[i] = m.first->second;
}
return vocabs;
}

namespace marian {
namespace bergamot {

Service::Service(Ptr<Options> options, const void * model_memory)
: ServiceBase(options), numWorkers_(options->get<int>("cpu-threads")),
pcqueue_(numWorkers_), model_memory_{model_memory} {
Service::Service(Ptr<Options> options, const void *model_memory)
: requestId_(0), vocabs_(std::move(loadVocabularies(options))),
text_processor_(vocabs_, options), batcher_(options),
numWorkers_(options->get<int>("cpu-threads")), model_memory_(model_memory)
#ifndef WASM_HIDE_THREADS
// 0 elements in PCQueue is illegal and can lead to failures. Adding a
// guard to have at least one entry allocated. In the single-threaded
// case, while initialized pcqueue_ remains unused.
,
pcqueue_(std::max<size_t>(1, numWorkers_))
#endif
{

if (numWorkers_ == 0) {
ABORT("Fatal: Attempt to create multithreaded instance with --cpu-threads "
"0. ");
build_translators(options, /*numTranslators=*/1);
initialize_blocking_translator();
} else {
build_translators(options, numWorkers_);
initialize_async_translators();
}
}

translators_.reserve(numWorkers_);
workers_.reserve(numWorkers_);

for (size_t cpuId = 0; cpuId < numWorkers_; cpuId++) {
void Service::build_translators(Ptr<Options> options, size_t numTranslators) {
translators_.reserve(numTranslators);
for (size_t cpuId = 0; cpuId < numTranslators; cpuId++) {
marian::DeviceId deviceId(cpuId, DeviceType::cpu);
translators_.emplace_back(deviceId, vocabs_, options, model_memory_);
}
}

void Service::initialize_blocking_translator() {
translators_.back().initialize();
}

void Service::blocking_translate() {
Batch batch;
while (batcher_ >> batch) {
auto &translator = translators_.back();
translator.translate(batch);
}
}

#ifndef WASM_HIDE_THREADS
void Service::initialize_async_translators() {
workers_.reserve(numWorkers_);

for (size_t cpuId = 0; cpuId < numWorkers_; cpuId++) {
auto &translator = translators_[cpuId];
workers_.emplace_back([&translator, this] {
translator.initialize();

Expand All @@ -42,29 +94,58 @@ Service::Service(Ptr<Options> options, const void * model_memory)
}
}

void Service::enqueue() {
void Service::async_translate() {
Batch batch;
while (batcher_ >> batch) {
pcqueue_.ProduceSwap(batch);
}
}
#else // WASM_HIDE_THREADS
void Service::initialize_async_translators() {
ABORT("Cannot run in async mode without multithreading.");
}

void Service::async_translate() {
ABORT("Cannot run in async mode without multithreading.");
}
#endif // WASM_HIDE_THREADS

std::future<Response> Service::translate(std::string &&input) {
Segments segments;
SentenceRanges sourceRanges;
text_processor_.process(input, segments, sourceRanges);

std::promise<Response> responsePromise;
auto future = responsePromise.get_future();

Ptr<Request> request = New<Request>(
requestId_++, /* lineNumberBegin = */ 0, vocabs_, std::move(input),
std::move(segments), std::move(sourceRanges), std::move(responsePromise));

batcher_.addWholeRequest(request);
if (numWorkers_ == 0) {
blocking_translate();
} else {
async_translate();
jerinphilip marked this conversation as resolved.
Show resolved Hide resolved
}
return future;
}

Service::~Service() {
#ifndef WASM_HIDE_THREADS
for (size_t workerId = 0; workerId < numWorkers_; workerId++) {

void Service::stop() {
for (auto &worker : workers_) {
Batch poison = Batch::poison();
pcqueue_.ProduceSwap(poison);
}

for (auto &worker : workers_) {
if (worker.joinable()) {
worker.join();
for (size_t workerId = 0; workerId < numWorkers_; workerId++) {
if (workers_[workerId].joinable()) {
workers_[workerId].join();
}
}

workers_.clear();
#endif
}

Service::~Service() { stop(); }

} // namespace bergamot
} // namespace marian
Loading