Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make subscribe and unsubscribe asynchronous #13

Merged
merged 1 commit into from
Apr 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 94 additions & 154 deletions src/FSChanges.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
#include "Event.hh"
#include "Backend.hh"
#include "Watcher.hh"
#include "PromiseRunner.hh"

using namespace Napi;

class FSAsyncRunner;
typedef void (*AsyncFunction)(FSAsyncRunner *);

std::unordered_set<std::string> getIgnore(Env env, Value opts) {
std::unordered_set<std::string> ignore;

Expand Down Expand Up @@ -40,129 +38,66 @@ std::shared_ptr<Backend> getBackend(Env env, Value opts) {
return Backend::getShared(backendName);
}

class FSAsyncRunner {
class WriteSnapshotRunner : public PromiseRunner {
public:
const Env env;

std::shared_ptr<Backend> backend;
std::shared_ptr<Watcher> watcher;
std::string snapshotPath;
bool returnEvents;
bool useSharedWatcher;

FSAsyncRunner(Env env, Value dir, Value snap, Value opts, Promise::Deferred r, AsyncFunction func, bool useSharedWatcher)
: env(env),
snapshotPath(std::string(snap.As<String>().Utf8Value().c_str())),
returnEvents(false),
useSharedWatcher(useSharedWatcher),
func(func), deferred(r) {

napi_status status = napi_create_async_work(env, nullptr, env.Undefined(),
OnExecute, OnWorkComplete, this, &this->work);
if(status != napi_ok) {
work = nullptr;
const napi_extended_error_info *error_info = 0;
napi_get_last_error_info(env, &error_info);
if(error_info->error_message)
Error::New(env, error_info->error_message).ThrowAsJavaScriptException();
else
Error::New(env).ThrowAsJavaScriptException();
}

if (useSharedWatcher) {
watcher = Watcher::getShared(
std::string(dir.As<String>().Utf8Value().c_str()),
getIgnore(env, opts)
);
} else {
watcher = std::make_shared<Watcher>(
std::string(dir.As<String>().Utf8Value().c_str()),
getIgnore(env, opts)
);
}
WriteSnapshotRunner(Env env, Value dir, Value snap, Value opts)
: PromiseRunner(env),
snapshotPath(std::string(snap.As<String>().Utf8Value().c_str())) {
watcher = Watcher::getShared(
std::string(dir.As<String>().Utf8Value().c_str()),
getIgnore(env, opts)
);

backend = getBackend(env, opts);
}

void Queue() {
if(this->work) {
napi_status status = napi_queue_async_work(env, this->work);
NAPI_THROW_IF_FAILED_VOID(env, status);
}
~WriteSnapshotRunner() {
watcher->unref();
backend->unref();
}

private:
napi_async_work work;
AsyncFunction func;
Promise::Deferred deferred;

static void OnExecute(napi_env env, void* this_pointer) {
FSAsyncRunner* self = (FSAsyncRunner*) this_pointer;
self->Execute();
}

static void OnWorkComplete(napi_env env, napi_status status, void* this_pointer) {
FSAsyncRunner* self = (FSAsyncRunner*) this_pointer;
if (status != napi_cancelled) {
HandleScope scope(self->env);
if(status == napi_ok) {
status = napi_delete_async_work(self->env, self->work);
if(status == napi_ok) {
self->OnOK();
delete self;
return;
}
}
}
std::shared_ptr<Backend> backend;
std::shared_ptr<Watcher> watcher;
std::string snapshotPath;

// fallthrough for error handling
const napi_extended_error_info *error_info = 0;
napi_get_last_error_info(env, &error_info);
if(error_info->error_message){
self->OnError(Error::New(env, error_info->error_message));
} else {
self->OnError(Error::New(env));
}
delete self;
void execute() override {
backend->writeSnapshot(*watcher, &snapshotPath);
}
};

class GetEventsSinceRunner : public PromiseRunner {
public:
GetEventsSinceRunner(Env env, Value dir, Value snap, Value opts)
: PromiseRunner(env),
snapshotPath(std::string(snap.As<String>().Utf8Value().c_str())) {
watcher = std::make_shared<Watcher>(
std::string(dir.As<String>().Utf8Value().c_str()),
getIgnore(env, opts)
);

void Execute() {
this->func(this);
backend = getBackend(env, opts);
}

void OnOK() {
HandleScope scope(env);
Value result;

if (this->returnEvents) {
result = watcher->mEvents.toJS(env);
} else {
result = env.Null();
}

~GetEventsSinceRunner() {
watcher->unref();
backend->unref();
this->deferred.Resolve(result);
}
private:
std::shared_ptr<Backend> backend;
std::shared_ptr<Watcher> watcher;
std::string snapshotPath;

void OnError(const Error& e) {
watcher->unref();
backend->unref();
this->deferred.Reject(e.Value());
void execute() override {
backend->getEventsSince(*watcher, &snapshotPath);
}
};

void writeSnapshotAsync(FSAsyncRunner *runner) {
runner->backend->writeSnapshot(*runner->watcher, &runner->snapshotPath);
}

void getEventsSinceAsync(FSAsyncRunner *runner) {
runner->backend->getEventsSince(*runner->watcher, &runner->snapshotPath);
runner->returnEvents = true;
}
Value getResult() override {
return watcher->mEvents.toJS(env);
}
};

Value queueWork(const CallbackInfo& info, AsyncFunction func, bool useSharedWatcher) {
template<class Runner>
Value queueSnapshotWork(const CallbackInfo& info) {
Env env = info.Env();
if (info.Length() < 1 || !info[0].IsString()) {
TypeError::New(env, "Expected a string").ThrowAsJavaScriptException();
Expand All @@ -179,57 +114,68 @@ Value queueWork(const CallbackInfo& info, AsyncFunction func, bool useSharedWatc
return env.Null();
}

Promise::Deferred deferred = Promise::Deferred::New(env);
FSAsyncRunner *runner = new FSAsyncRunner(info.Env(), info[0], info[1], info[2], deferred, func, useSharedWatcher);
runner->Queue();

return deferred.Promise();
Runner *runner = new Runner(info.Env(), info[0], info[1], info[2]);
return runner->queue();
}

Value writeSnapshot(const CallbackInfo& info) {
return queueWork(info, writeSnapshotAsync, true);
return queueSnapshotWork<WriteSnapshotRunner>(info);
}

Value getEventsSince(const CallbackInfo& info) {
return queueWork(info, getEventsSinceAsync, false);
return queueSnapshotWork<GetEventsSinceRunner>(info);
}

Value subscribe(const CallbackInfo& info) {
Env env = info.Env();
if (info.Length() < 1 || !info[0].IsString()) {
TypeError::New(env, "Expected a string").ThrowAsJavaScriptException();
return env.Null();
}
class SubscribeRunner : public PromiseRunner {
public:
SubscribeRunner(Env env, Value dir, Value fn, Value opts) : PromiseRunner(env) {
watcher = Watcher::getShared(
std::string(dir.As<String>().Utf8Value().c_str()),
getIgnore(env, opts)
);

if (info.Length() < 2 || !info[1].IsFunction()) {
TypeError::New(env, "Expected a function").ThrowAsJavaScriptException();
return env.Null();
backend = getBackend(env, opts);
shouldWatch = watcher->watch(fn.As<Function>());
}

if (info.Length() >= 3 && !info[2].IsObject()) {
TypeError::New(env, "Expected an object").ThrowAsJavaScriptException();
return env.Null();
private:
std::shared_ptr<Watcher> watcher;
std::shared_ptr<Backend> backend;
bool shouldWatch;

void execute() override {
if (shouldWatch) {
backend->watch(*watcher);
}
}
};

try {
std::shared_ptr<Watcher> watcher = Watcher::getShared(
std::string(info[0].As<String>().Utf8Value().c_str()),
getIgnore(env, info[2])
class UnsubscribeRunner : public PromiseRunner {
public:
UnsubscribeRunner(Env env, Value dir, Value fn, Value opts) : PromiseRunner(env) {
watcher = Watcher::getShared(
std::string(dir.As<String>().Utf8Value().c_str()),
getIgnore(env, opts)
);

bool added = watcher->watch(info[1].As<Function>());
if (added) {
std::shared_ptr<Backend> b = getBackend(env, info[2]);
b->watch(*watcher);
}
} catch (const char *err) {
Error::New(env, err).ThrowAsJavaScriptException();
backend = getBackend(env, opts);
shouldUnwatch = watcher->unwatch(fn.As<Function>());
}

return env.Null();
}
private:
std::shared_ptr<Watcher> watcher;
std::shared_ptr<Backend> backend;
bool shouldUnwatch;

Value unsubscribe(const CallbackInfo& info) {
void execute() override {
if (shouldUnwatch) {
backend->unwatch(*watcher);
}
}
};

template<class Runner>
Value queueSubscriptionWork(const CallbackInfo& info) {
Env env = info.Env();
if (info.Length() < 1 || !info[0].IsString()) {
TypeError::New(env, "Expected a string").ThrowAsJavaScriptException();
Expand All @@ -246,22 +192,16 @@ Value unsubscribe(const CallbackInfo& info) {
return env.Null();
}

try {
std::shared_ptr<Watcher> watcher = Watcher::getShared(
std::string(info[0].As<String>().Utf8Value().c_str()),
getIgnore(env, info[2])
);
Runner *runner = new Runner(info.Env(), info[0], info[1], info[2]);
return runner->queue();
}

bool removed = watcher->unwatch(info[1].As<Function>());
if (removed) {
std::shared_ptr<Backend> b = getBackend(env, info[2]);;
b->unwatch(*watcher);
}
} catch (const char *err) {
Error::New(env, err).ThrowAsJavaScriptException();
}

return env.Null();
Value subscribe(const CallbackInfo& info) {
return queueSubscriptionWork<SubscribeRunner>(info);
}

Value unsubscribe(const CallbackInfo& info) {
return queueSubscriptionWork<UnsubscribeRunner>(info);
}

Object Init(Env env, Object exports) {
Expand Down
Loading