Skip to content

Commit

Permalink
Error handling support (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
devongovett authored Apr 7, 2019
1 parent c016a4e commit a66e9eb
Show file tree
Hide file tree
Showing 18 changed files with 345 additions and 87 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const watcher = require('@parcel/watcher');
const path = require('path');

// Subscribe to events
let subscription = await watcher.subscribe(process.cwd(), (events) => {
let subscription = await watcher.subscribe(process.cwd(), (err, events) => {
console.log(events);
});

Expand All @@ -42,7 +42,7 @@ Events are throttled and coalesced for performance during large changes like `gi
Only one notification will be emitted per file. For example, if a file was both created and updated since the last event, you'll get only a `create` event. If a file is both created and deleted, you will not be notifed of that file. Renames cause two events: a `delete` for the old name, and a `create` for the new name.

```javascript
let subscription = await watcher.subscribe(process.cwd(), (events) => {
let subscription = await watcher.subscribe(process.cwd(), (err, events) => {
console.log(events);
});
```
Expand Down
46 changes: 34 additions & 12 deletions src/Backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ void removeShared(Backend *backend) {

void Backend::run() {
mThread = std::thread([this] () {
start();
try {
start();
} catch (std::exception &err) {
handleError(err);
}
});

if (mThread.joinable()) {
Expand All @@ -92,24 +96,28 @@ void Backend::start() {
}

Backend::~Backend() {
std::unique_lock<std::mutex> lock(mMutex);

// Unwatch all subscriptions so that their state gets cleaned up
for (auto it = mSubscriptions.begin(); it != mSubscriptions.end(); it++) {
unwatch(**it);
}

// Wait for thread to stop
if (mThread.joinable()) {
mThread.join();
// If the backend is being destroyed from the thread itself, detach, otherwise join.
if (mThread.get_id() == std::this_thread::get_id()) {
mThread.detach();
} else {
mThread.join();
}
}
}

void Backend::watch(Watcher &watcher) {
std::unique_lock<std::mutex> lock(mMutex);
auto res = mSubscriptions.insert(&watcher);
if (res.second) {
this->subscribe(watcher);
auto res = mSubscriptions.find(&watcher);
if (res == mSubscriptions.end()) {
try {
this->subscribe(watcher);
mSubscriptions.insert(&watcher);
} catch (std::exception &err) {
unref();
throw;
}
}
}

Expand All @@ -127,3 +135,17 @@ void Backend::unref() {
removeShared(this);
}
}

void Backend::handleWatcherError(WatcherError &err) {
unwatch(*err.mWatcher);
err.mWatcher->notifyError(err);
}

void Backend::handleError(std::exception &err) {
std::unique_lock<std::mutex> lock(mMutex);
for (auto it = mSubscriptions.begin(); it != mSubscriptions.end(); it++) {
(*it)->notifyError(err);
}

removeShared(this);
}
3 changes: 3 additions & 0 deletions src/Backend.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ public:
void watch(Watcher &watcher);
void unwatch(Watcher &watcher);
void unref();
void handleWatcherError(WatcherError &err);

std::mutex mMutex;
std::thread mThread;
private:
std::unordered_set<Watcher *> mSubscriptions;
Signal mStartedSignal;

void handleError(std::exception &err);
};

#endif
13 changes: 9 additions & 4 deletions src/PromiseRunner.hh
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ public:

private:
napi_async_work work;
std::string error;

static void onExecute(napi_env env, void *this_pointer) {
PromiseRunner* self = (PromiseRunner*) this_pointer;
try {
self->execute();
} catch (const char *err) {
self->onError(Error::New(env, err));
} catch (std::exception &err) {
self->error = err.what();
}
}

Expand All @@ -58,7 +59,11 @@ private:
if (status == napi_ok) {
status = napi_delete_async_work(self->env, self->work);
if (status == napi_ok) {
self->onOK();
if (self->error.size() == 0) {
self->onOK();
} else {
self->onError(Error::New(self->env, self->error));
}
delete self;
return;
}
Expand Down Expand Up @@ -87,7 +92,7 @@ private:
deferred.Resolve(result);
}

void onError(const Error& e) {
void onError(const Error &e) {
deferred.Reject(e.Value());
}
};
Expand Down
9 changes: 8 additions & 1 deletion src/Signal.hh
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

class Signal {
public:
Signal() : mFlag(false) {}
Signal() : mFlag(false), mWaiting(false) {}
void wait() {
std::unique_lock<std::mutex> lock(mMutex);
while (!mFlag) {
mWaiting = true;
mCond.wait(lock);
}
}
Expand All @@ -28,10 +29,16 @@ public:
void reset() {
std::unique_lock<std::mutex> lock(mMutex);
mFlag = false;
mWaiting = false;
}

bool isWaiting() {
return mWaiting;
}

private:
bool mFlag;
bool mWaiting;
std::mutex mMutex;
std::condition_variable mCond;
};
Expand Down
27 changes: 22 additions & 5 deletions src/Watcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ Watcher::Watcher(std::string dir, std::unordered_set<std::string> ignore)
}

Watcher::~Watcher() {
std::unique_lock<std::mutex> lk(mMutex);
mDebounce->remove(this);
}

Expand All @@ -68,8 +67,19 @@ void Watcher::notify() {
}
}

void Watcher::notifyError(std::exception &err) {
std::unique_lock<std::mutex> lk(mMutex);
if (mCallingCallbacks) {
mCallbackSignal.wait();
mCallbackSignal.reset();
}

mError = err.what();
triggerCallbacks();
}

void Watcher::triggerCallbacks() {
if (mCallbacks.size() > 0 && mEvents.size() > 0) {
if (mCallbacks.size() > 0 && (mEvents.size() > 0 || mError.size() > 0)) {
if (mCallingCallbacks) {
mCallbackSignal.wait();
mCallbackSignal.reset();
Expand All @@ -90,7 +100,9 @@ void Watcher::fireCallbacks(uv_async_t *handle) {
while (watcher->mCallbacksIterator != watcher->mCallbacks.end()) {
auto it = watcher->mCallbacksIterator;
HandleScope scope(it->Env());
it->Call(std::initializer_list<napi_value>{watcher->mCallbackEvents.toJS(it->Env())});
auto err = watcher->mError.size() > 0 ? Error::New(it->Env(), watcher->mError).Value() : it->Env().Null();
auto events = watcher->mCallbackEvents.toJS(it->Env());
it->Call(std::initializer_list<napi_value>{err, events});

// If the iterator was changed, then the callback trigged an unwatch.
// The iterator will have been set to the next valid callback.
Expand All @@ -101,16 +113,21 @@ void Watcher::fireCallbacks(uv_async_t *handle) {
}

watcher->mCallingCallbacks = false;

if (watcher->mError.size() > 0) {
watcher->mCallbacks.clear();
}

if (watcher->mCallbacks.size() == 0) {
watcher->unref();
} else {
watcher->mCallbackSignal.notify();
}
}

bool Watcher::watch(Function callback) {
bool Watcher::watch(FunctionReference callback) {
std::unique_lock<std::mutex> lk(mMutex);
auto res = mCallbacks.insert(Persistent(callback));
auto res = mCallbacks.insert(std::move(callback));
if (res.second && !mWatched) {
mAsync = new uv_async_t;
mAsync->data = (void *)this;
Expand Down
11 changes: 10 additions & 1 deletion src/Watcher.hh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ struct Watcher {

void wait();
void notify();
bool watch(Function callback);
void notifyError(std::exception &err);
bool watch(FunctionReference callback);
bool unwatch(Function callback);
void unref();
bool isIgnored(std::string path);
Expand All @@ -46,10 +47,18 @@ private:
EventList mCallbackEvents;
std::shared_ptr<Debounce> mDebounce;
Signal mCallbackSignal;
std::string mError;

void triggerCallbacks();
static void fireCallbacks(uv_async_t *handle);
static void onClose(uv_handle_t *handle);
};

class WatcherError : public std::runtime_error {
public:
Watcher *mWatcher;
WatcherError(std::string msg, Watcher *watcher) : std::runtime_error(msg), mWatcher(watcher) {}
WatcherError(const char *msg, Watcher *watcher) : std::runtime_error(msg), mWatcher(watcher) {}
};

#endif
9 changes: 4 additions & 5 deletions src/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,17 @@ class SubscribeRunner : public PromiseRunner {
);

backend = getBackend(env, opts);
shouldWatch = watcher->watch(fn.As<Function>());
callback = Persistent(fn.As<Function>());
}

private:
std::shared_ptr<Watcher> watcher;
std::shared_ptr<Backend> backend;
bool shouldWatch;
FunctionReference callback;

void execute() override {
if (shouldWatch) {
backend->watch(*watcher);
}
backend->watch(*watcher);
watcher->watch(std::move(callback));
}
};

Expand Down
12 changes: 6 additions & 6 deletions src/linux/InotifyBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ void InotifyBackend::start() {
// Create a pipe that we will write to when we want to end the thread.
int err = pipe2(mPipe, O_CLOEXEC | O_NONBLOCK);
if (err == -1) {
throw "Unable to open pipe";
throw std::runtime_error(std::string("Unable to open pipe: ") + strerror(errno));
}

// Init inotify file descriptor.
mInotify = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
if (mInotify == -1) {
throw "Unable to initialize inotify";
throw std::runtime_error(std::string("Unable to initialize inotify: ") + strerror(errno));
}

pollfd pollfds[2];
Expand All @@ -37,7 +37,7 @@ void InotifyBackend::start() {
while (true) {
int result = poll(pollfds, 2, 500);
if (result < 0) {
throw "Unable to poll";
throw std::runtime_error(std::string("Unable to poll: ") + strerror(errno));
}

if (pollfds[0].revents) {
Expand Down Expand Up @@ -75,7 +75,7 @@ void InotifyBackend::subscribe(Watcher &watcher) {
void InotifyBackend::watchDir(Watcher &watcher, DirEntry *entry, std::shared_ptr<DirTree> tree) {
int wd = inotify_add_watch(mInotify, entry->path.c_str(), INOTIFY_MASK);
if (wd == -1) {
throw "inotify_add_watch failed";
throw WatcherError(std::string("inotify_add_watch failed: ") + strerror(errno), &watcher);
}

std::shared_ptr<InotifySubscription> sub = std::make_shared<InotifySubscription>();
Expand All @@ -99,7 +99,7 @@ void InotifyBackend::handleEvents() {
break;
}

throw "Error reading from inotify";
throw std::runtime_error(std::string("Error reading from inotify: ") + strerror(errno));
}

if (n == 0) {
Expand Down Expand Up @@ -201,7 +201,7 @@ void InotifyBackend::unsubscribe(Watcher &watcher) {
if (mSubscriptions.count(it->first) == 1) {
int err = inotify_rm_watch(mInotify, it->first);
if (err == -1) {
throw "Unable to remove watcher";
throw WatcherError(std::string("Unable to remove watcher: ") + strerror(errno), &watcher);
}
}

Expand Down
25 changes: 23 additions & 2 deletions src/macos/FSEventsBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,20 @@ void FSEventsCallback(
}
}

void FSEventsBackend::startStream(Watcher &watcher, FSEventStreamEventId id) {
void checkWatcher(Watcher &watcher) {
struct stat file;
if (stat(watcher.mDir.c_str(), &file)) {
throw WatcherError(strerror(errno), &watcher);
}

if (!S_ISDIR(file.st_mode)) {
throw WatcherError(strerror(ENOTDIR), &watcher);
}
}

void FSEventsBackend::startStream(Watcher &watcher, FSEventStreamEventId id) {
checkWatcher(watcher);

CFAbsoluteTime latency = 0.01;
CFStringRef fileWatchPath = CFStringCreateWithCString(
NULL,
Expand Down Expand Up @@ -148,10 +161,16 @@ void FSEventsBackend::startStream(Watcher &watcher, FSEventStreamEventId id) {
FSEventStreamSetExclusionPaths(stream, exclusions);

FSEventStreamScheduleWithRunLoop(stream, mRunLoop, kCFRunLoopDefaultMode);
FSEventStreamStart(stream);
bool started = FSEventStreamStart(stream);

CFRelease(pathsToWatch);
CFRelease(fileWatchPath);

if (!started) {
FSEventStreamRelease(stream);
throw WatcherError("Error starting FSEvents stream", &watcher);
}

State *s = (State *)watcher.state;
s->tree = std::make_shared<DirTree>(watcher.mDir);
s->stream = stream;
Expand All @@ -178,6 +197,8 @@ FSEventsBackend::~FSEventsBackend() {

void FSEventsBackend::writeSnapshot(Watcher &watcher, std::string *snapshotPath) {
std::unique_lock<std::mutex> lock(mMutex);
checkWatcher(watcher);

FSEventStreamEventId id = FSEventsGetCurrentEventId();
std::ofstream ofs(*snapshotPath);
ofs << id;
Expand Down
Loading

0 comments on commit a66e9eb

Please sign in to comment.