diff --git a/src/FSChanges.cc b/src/FSChanges.cc index 5f519311..03c23ca1 100644 --- a/src/FSChanges.cc +++ b/src/FSChanges.cc @@ -5,12 +5,10 @@ #include "Event.hh" #include "Backend.hh" #include "Watcher.hh" +#include "PromiseRunner.hh" using namespace Napi; -class FSAsyncRunner; -typedef void (*AsyncFunction)(FSAsyncRunner *); - std::unordered_set getIgnore(Env env, Value opts) { std::unordered_set ignore; @@ -40,129 +38,66 @@ std::shared_ptr getBackend(Env env, Value opts) { return Backend::getShared(backendName); } -class FSAsyncRunner { +class WriteSnapshotRunner : public PromiseRunner { public: - const Env env; - - std::shared_ptr backend; - std::shared_ptr watcher; - std::string snapshotPath; - bool returnEvents; - bool useSharedWatcher; - - FSAsyncRunner(Env env, Value dir, Value snap, Value opts, Promise::Deferred r, AsyncFunction func, bool useSharedWatcher) - : env(env), - snapshotPath(std::string(snap.As().Utf8Value().c_str())), - returnEvents(false), - useSharedWatcher(useSharedWatcher), - func(func), deferred(r) { - - napi_status status = napi_create_async_work(env, nullptr, env.Undefined(), - OnExecute, OnWorkComplete, this, &this->work); - if(status != napi_ok) { - work = nullptr; - const napi_extended_error_info *error_info = 0; - napi_get_last_error_info(env, &error_info); - if(error_info->error_message) - Error::New(env, error_info->error_message).ThrowAsJavaScriptException(); - else - Error::New(env).ThrowAsJavaScriptException(); - } - - if (useSharedWatcher) { - watcher = Watcher::getShared( - std::string(dir.As().Utf8Value().c_str()), - getIgnore(env, opts) - ); - } else { - watcher = std::make_shared( - std::string(dir.As().Utf8Value().c_str()), - getIgnore(env, opts) - ); - } + WriteSnapshotRunner(Env env, Value dir, Value snap, Value opts) + : PromiseRunner(env), + snapshotPath(std::string(snap.As().Utf8Value().c_str())) { + watcher = Watcher::getShared( + std::string(dir.As().Utf8Value().c_str()), + getIgnore(env, opts) + ); backend = getBackend(env, opts); } - void Queue() { - if(this->work) { - napi_status status = napi_queue_async_work(env, this->work); - NAPI_THROW_IF_FAILED_VOID(env, status); - } + ~WriteSnapshotRunner() { + watcher->unref(); + backend->unref(); } - private: - napi_async_work work; - AsyncFunction func; - Promise::Deferred deferred; - - static void OnExecute(napi_env env, void* this_pointer) { - FSAsyncRunner* self = (FSAsyncRunner*) this_pointer; - self->Execute(); - } - - static void OnWorkComplete(napi_env env, napi_status status, void* this_pointer) { - FSAsyncRunner* self = (FSAsyncRunner*) this_pointer; - if (status != napi_cancelled) { - HandleScope scope(self->env); - if(status == napi_ok) { - status = napi_delete_async_work(self->env, self->work); - if(status == napi_ok) { - self->OnOK(); - delete self; - return; - } - } - } + std::shared_ptr backend; + std::shared_ptr watcher; + std::string snapshotPath; - // fallthrough for error handling - const napi_extended_error_info *error_info = 0; - napi_get_last_error_info(env, &error_info); - if(error_info->error_message){ - self->OnError(Error::New(env, error_info->error_message)); - } else { - self->OnError(Error::New(env)); - } - delete self; + void execute() override { + backend->writeSnapshot(*watcher, &snapshotPath); } +}; +class GetEventsSinceRunner : public PromiseRunner { +public: + GetEventsSinceRunner(Env env, Value dir, Value snap, Value opts) + : PromiseRunner(env), + snapshotPath(std::string(snap.As().Utf8Value().c_str())) { + watcher = std::make_shared( + std::string(dir.As().Utf8Value().c_str()), + getIgnore(env, opts) + ); - void Execute() { - this->func(this); + backend = getBackend(env, opts); } - void OnOK() { - HandleScope scope(env); - Value result; - - if (this->returnEvents) { - result = watcher->mEvents.toJS(env); - } else { - result = env.Null(); - } - + ~GetEventsSinceRunner() { watcher->unref(); backend->unref(); - this->deferred.Resolve(result); } +private: + std::shared_ptr backend; + std::shared_ptr watcher; + std::string snapshotPath; - void OnError(const Error& e) { - watcher->unref(); - backend->unref(); - this->deferred.Reject(e.Value()); + void execute() override { + backend->getEventsSince(*watcher, &snapshotPath); } -}; -void writeSnapshotAsync(FSAsyncRunner *runner) { - runner->backend->writeSnapshot(*runner->watcher, &runner->snapshotPath); -} - -void getEventsSinceAsync(FSAsyncRunner *runner) { - runner->backend->getEventsSince(*runner->watcher, &runner->snapshotPath); - runner->returnEvents = true; -} + Value getResult() override { + return watcher->mEvents.toJS(env); + } +}; -Value queueWork(const CallbackInfo& info, AsyncFunction func, bool useSharedWatcher) { +template +Value queueSnapshotWork(const CallbackInfo& info) { Env env = info.Env(); if (info.Length() < 1 || !info[0].IsString()) { TypeError::New(env, "Expected a string").ThrowAsJavaScriptException(); @@ -179,57 +114,68 @@ Value queueWork(const CallbackInfo& info, AsyncFunction func, bool useSharedWatc return env.Null(); } - Promise::Deferred deferred = Promise::Deferred::New(env); - FSAsyncRunner *runner = new FSAsyncRunner(info.Env(), info[0], info[1], info[2], deferred, func, useSharedWatcher); - runner->Queue(); - - return deferred.Promise(); + Runner *runner = new Runner(info.Env(), info[0], info[1], info[2]); + return runner->queue(); } Value writeSnapshot(const CallbackInfo& info) { - return queueWork(info, writeSnapshotAsync, true); + return queueSnapshotWork(info); } Value getEventsSince(const CallbackInfo& info) { - return queueWork(info, getEventsSinceAsync, false); + return queueSnapshotWork(info); } -Value subscribe(const CallbackInfo& info) { - Env env = info.Env(); - if (info.Length() < 1 || !info[0].IsString()) { - TypeError::New(env, "Expected a string").ThrowAsJavaScriptException(); - return env.Null(); - } +class SubscribeRunner : public PromiseRunner { +public: + SubscribeRunner(Env env, Value dir, Value fn, Value opts) : PromiseRunner(env) { + watcher = Watcher::getShared( + std::string(dir.As().Utf8Value().c_str()), + getIgnore(env, opts) + ); - if (info.Length() < 2 || !info[1].IsFunction()) { - TypeError::New(env, "Expected a function").ThrowAsJavaScriptException(); - return env.Null(); + backend = getBackend(env, opts); + shouldWatch = watcher->watch(fn.As()); } - if (info.Length() >= 3 && !info[2].IsObject()) { - TypeError::New(env, "Expected an object").ThrowAsJavaScriptException(); - return env.Null(); +private: + std::shared_ptr watcher; + std::shared_ptr backend; + bool shouldWatch; + + void execute() override { + if (shouldWatch) { + backend->watch(*watcher); + } } +}; - try { - std::shared_ptr watcher = Watcher::getShared( - std::string(info[0].As().Utf8Value().c_str()), - getIgnore(env, info[2]) +class UnsubscribeRunner : public PromiseRunner { +public: + UnsubscribeRunner(Env env, Value dir, Value fn, Value opts) : PromiseRunner(env) { + watcher = Watcher::getShared( + std::string(dir.As().Utf8Value().c_str()), + getIgnore(env, opts) ); - bool added = watcher->watch(info[1].As()); - if (added) { - std::shared_ptr b = getBackend(env, info[2]); - b->watch(*watcher); - } - } catch (const char *err) { - Error::New(env, err).ThrowAsJavaScriptException(); + backend = getBackend(env, opts); + shouldUnwatch = watcher->unwatch(fn.As()); } - return env.Null(); -} +private: + std::shared_ptr watcher; + std::shared_ptr backend; + bool shouldUnwatch; -Value unsubscribe(const CallbackInfo& info) { + void execute() override { + if (shouldUnwatch) { + backend->unwatch(*watcher); + } + } +}; + +template +Value queueSubscriptionWork(const CallbackInfo& info) { Env env = info.Env(); if (info.Length() < 1 || !info[0].IsString()) { TypeError::New(env, "Expected a string").ThrowAsJavaScriptException(); @@ -246,22 +192,16 @@ Value unsubscribe(const CallbackInfo& info) { return env.Null(); } - try { - std::shared_ptr watcher = Watcher::getShared( - std::string(info[0].As().Utf8Value().c_str()), - getIgnore(env, info[2]) - ); + Runner *runner = new Runner(info.Env(), info[0], info[1], info[2]); + return runner->queue(); +} - bool removed = watcher->unwatch(info[1].As()); - if (removed) { - std::shared_ptr b = getBackend(env, info[2]);; - b->unwatch(*watcher); - } - } catch (const char *err) { - Error::New(env, err).ThrowAsJavaScriptException(); - } - - return env.Null(); +Value subscribe(const CallbackInfo& info) { + return queueSubscriptionWork(info); +} + +Value unsubscribe(const CallbackInfo& info) { + return queueSubscriptionWork(info); } Object Init(Env env, Object exports) { diff --git a/src/PromiseRunner.hh b/src/PromiseRunner.hh new file mode 100644 index 00000000..4a920dee --- /dev/null +++ b/src/PromiseRunner.hh @@ -0,0 +1,95 @@ +#ifndef PROMISE_RUNNER_H +#define PROMISE_RUNNER_H + +#include +#include + +using namespace Napi; + +class PromiseRunner { +public: + const Env env; + Promise::Deferred deferred; + + PromiseRunner(Env env) : env(env), deferred(Promise::Deferred::New(env)) { + napi_status status = napi_create_async_work(env, nullptr, env.Undefined(), + onExecute, onWorkComplete, this, &work); + if (status != napi_ok) { + work = nullptr; + const napi_extended_error_info *error_info = 0; + napi_get_last_error_info(env, &error_info); + if (error_info->error_message) { + Error::New(env, error_info->error_message).ThrowAsJavaScriptException(); + } else { + Error::New(env).ThrowAsJavaScriptException(); + } + } + } + + virtual ~PromiseRunner() {} + + Value queue() { + if (work) { + napi_status status = napi_queue_async_work(env, work); + if (status != napi_ok) { + onError(Error::New(env)); + } + } + + return deferred.Promise(); + } + +private: + napi_async_work work; + + static void onExecute(napi_env env, void *this_pointer) { + PromiseRunner* self = (PromiseRunner*) this_pointer; + try { + self->execute(); + } catch (const char *err) { + self->onError(Error::New(env, err)); + } + } + + static void onWorkComplete(napi_env env, napi_status status, void *this_pointer) { + PromiseRunner* self = (PromiseRunner*) this_pointer; + if (status != napi_cancelled) { + HandleScope scope(self->env); + if (status == napi_ok) { + status = napi_delete_async_work(self->env, self->work); + if (status == napi_ok) { + self->onOK(); + delete self; + return; + } + } + } + + // fallthrough for error handling + const napi_extended_error_info *error_info = 0; + napi_get_last_error_info(env, &error_info); + if (error_info->error_message){ + self->onError(Error::New(env, error_info->error_message)); + } else { + self->onError(Error::New(env)); + } + delete self; + } + + virtual void execute() {} + virtual Value getResult() { + return env.Null(); + } + + void onOK() { + HandleScope scope(env); + Value result = getResult(); + deferred.Resolve(result); + } + + void onError(const Error& e) { + deferred.Reject(e.Value()); + } +}; + +#endif diff --git a/test/watcher.js b/test/watcher.js index ab6b62b4..e3dc4065 100644 --- a/test/watcher.js +++ b/test/watcher.js @@ -44,11 +44,11 @@ describe('watcher', () => { ignoreDir = getFilename(); ignoreFile = getFilename(); await new Promise(resolve => setTimeout(resolve, 100)); - fschanges.subscribe(tmpDir, fn, {backend, ignore: [ignoreDir, ignoreFile]}); + await fschanges.subscribe(tmpDir, fn, {backend, ignore: [ignoreDir, ignoreFile]}); }); after(async () => { - fschanges.unsubscribe(tmpDir, fn, {backend, ignore: [ignoreDir, ignoreFile]}); + await fschanges.unsubscribe(tmpDir, fn, {backend, ignore: [ignoreDir, ignoreFile]}); }); describe('files', () => {