diff --git a/README.md b/README.md index cb025058..faf7d8d4 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ const watcher = require('@parcel/watcher'); const path = require('path'); // Subscribe to events -let subscription = await watcher.subscribe(process.cwd(), (events) => { +let subscription = await watcher.subscribe(process.cwd(), (err, events) => { console.log(events); }); @@ -42,7 +42,7 @@ Events are throttled and coalesced for performance during large changes like `gi Only one notification will be emitted per file. For example, if a file was both created and updated since the last event, you'll get only a `create` event. If a file is both created and deleted, you will not be notifed of that file. Renames cause two events: a `delete` for the old name, and a `create` for the new name. ```javascript -let subscription = await watcher.subscribe(process.cwd(), (events) => { +let subscription = await watcher.subscribe(process.cwd(), (err, events) => { console.log(events); }); ``` diff --git a/src/Backend.cc b/src/Backend.cc index 113e2d8e..379eb4a0 100644 --- a/src/Backend.cc +++ b/src/Backend.cc @@ -75,7 +75,11 @@ void removeShared(Backend *backend) { void Backend::run() { mThread = std::thread([this] () { - start(); + try { + start(); + } catch (std::exception &err) { + handleError(err); + } }); if (mThread.joinable()) { @@ -92,24 +96,28 @@ void Backend::start() { } Backend::~Backend() { - std::unique_lock lock(mMutex); - - // Unwatch all subscriptions so that their state gets cleaned up - for (auto it = mSubscriptions.begin(); it != mSubscriptions.end(); it++) { - unwatch(**it); - } - // Wait for thread to stop if (mThread.joinable()) { - mThread.join(); + // If the backend is being destroyed from the thread itself, detach, otherwise join. + if (mThread.get_id() == std::this_thread::get_id()) { + mThread.detach(); + } else { + mThread.join(); + } } } void Backend::watch(Watcher &watcher) { std::unique_lock lock(mMutex); - auto res = mSubscriptions.insert(&watcher); - if (res.second) { - this->subscribe(watcher); + auto res = mSubscriptions.find(&watcher); + if (res == mSubscriptions.end()) { + try { + this->subscribe(watcher); + mSubscriptions.insert(&watcher); + } catch (std::exception &err) { + unref(); + throw; + } } } @@ -127,3 +135,17 @@ void Backend::unref() { removeShared(this); } } + +void Backend::handleWatcherError(WatcherError &err) { + unwatch(*err.mWatcher); + err.mWatcher->notifyError(err); +} + +void Backend::handleError(std::exception &err) { + std::unique_lock lock(mMutex); + for (auto it = mSubscriptions.begin(); it != mSubscriptions.end(); it++) { + (*it)->notifyError(err); + } + + removeShared(this); +} diff --git a/src/Backend.hh b/src/Backend.hh index 3abebcd6..cb940dff 100644 --- a/src/Backend.hh +++ b/src/Backend.hh @@ -23,12 +23,15 @@ public: void watch(Watcher &watcher); void unwatch(Watcher &watcher); void unref(); + void handleWatcherError(WatcherError &err); std::mutex mMutex; std::thread mThread; private: std::unordered_set mSubscriptions; Signal mStartedSignal; + + void handleError(std::exception &err); }; #endif diff --git a/src/PromiseRunner.hh b/src/PromiseRunner.hh index 4a920dee..064768f3 100644 --- a/src/PromiseRunner.hh +++ b/src/PromiseRunner.hh @@ -41,13 +41,14 @@ public: private: napi_async_work work; + std::string error; static void onExecute(napi_env env, void *this_pointer) { PromiseRunner* self = (PromiseRunner*) this_pointer; try { self->execute(); - } catch (const char *err) { - self->onError(Error::New(env, err)); + } catch (std::exception &err) { + self->error = err.what(); } } @@ -58,7 +59,11 @@ private: if (status == napi_ok) { status = napi_delete_async_work(self->env, self->work); if (status == napi_ok) { - self->onOK(); + if (self->error.size() == 0) { + self->onOK(); + } else { + self->onError(Error::New(self->env, self->error)); + } delete self; return; } @@ -87,7 +92,7 @@ private: deferred.Resolve(result); } - void onError(const Error& e) { + void onError(const Error &e) { deferred.Reject(e.Value()); } }; diff --git a/src/Signal.hh b/src/Signal.hh index 274cc1f9..e577319d 100644 --- a/src/Signal.hh +++ b/src/Signal.hh @@ -6,10 +6,11 @@ class Signal { public: - Signal() : mFlag(false) {} + Signal() : mFlag(false), mWaiting(false) {} void wait() { std::unique_lock lock(mMutex); while (!mFlag) { + mWaiting = true; mCond.wait(lock); } } @@ -28,10 +29,16 @@ public: void reset() { std::unique_lock lock(mMutex); mFlag = false; + mWaiting = false; + } + + bool isWaiting() { + return mWaiting; } private: bool mFlag; + bool mWaiting; std::mutex mMutex; std::condition_variable mCond; }; diff --git a/src/Watcher.cc b/src/Watcher.cc index fe727acb..8d775d1f 100644 --- a/src/Watcher.cc +++ b/src/Watcher.cc @@ -50,7 +50,6 @@ Watcher::Watcher(std::string dir, std::unordered_set ignore) } Watcher::~Watcher() { - std::unique_lock lk(mMutex); mDebounce->remove(this); } @@ -68,8 +67,19 @@ void Watcher::notify() { } } +void Watcher::notifyError(std::exception &err) { + std::unique_lock lk(mMutex); + if (mCallingCallbacks) { + mCallbackSignal.wait(); + mCallbackSignal.reset(); + } + + mError = err.what(); + triggerCallbacks(); +} + void Watcher::triggerCallbacks() { - if (mCallbacks.size() > 0 && mEvents.size() > 0) { + if (mCallbacks.size() > 0 && (mEvents.size() > 0 || mError.size() > 0)) { if (mCallingCallbacks) { mCallbackSignal.wait(); mCallbackSignal.reset(); @@ -90,7 +100,9 @@ void Watcher::fireCallbacks(uv_async_t *handle) { while (watcher->mCallbacksIterator != watcher->mCallbacks.end()) { auto it = watcher->mCallbacksIterator; HandleScope scope(it->Env()); - it->Call(std::initializer_list{watcher->mCallbackEvents.toJS(it->Env())}); + auto err = watcher->mError.size() > 0 ? Error::New(it->Env(), watcher->mError).Value() : it->Env().Null(); + auto events = watcher->mCallbackEvents.toJS(it->Env()); + it->Call(std::initializer_list{err, events}); // If the iterator was changed, then the callback trigged an unwatch. // The iterator will have been set to the next valid callback. @@ -101,6 +113,11 @@ void Watcher::fireCallbacks(uv_async_t *handle) { } watcher->mCallingCallbacks = false; + + if (watcher->mError.size() > 0) { + watcher->mCallbacks.clear(); + } + if (watcher->mCallbacks.size() == 0) { watcher->unref(); } else { @@ -108,9 +125,9 @@ void Watcher::fireCallbacks(uv_async_t *handle) { } } -bool Watcher::watch(Function callback) { +bool Watcher::watch(FunctionReference callback) { std::unique_lock lk(mMutex); - auto res = mCallbacks.insert(Persistent(callback)); + auto res = mCallbacks.insert(std::move(callback)); if (res.second && !mWatched) { mAsync = new uv_async_t; mAsync->data = (void *)this; diff --git a/src/Watcher.hh b/src/Watcher.hh index fde574da..f80c3a70 100644 --- a/src/Watcher.hh +++ b/src/Watcher.hh @@ -29,7 +29,8 @@ struct Watcher { void wait(); void notify(); - bool watch(Function callback); + void notifyError(std::exception &err); + bool watch(FunctionReference callback); bool unwatch(Function callback); void unref(); bool isIgnored(std::string path); @@ -46,10 +47,18 @@ private: EventList mCallbackEvents; std::shared_ptr mDebounce; Signal mCallbackSignal; + std::string mError; void triggerCallbacks(); static void fireCallbacks(uv_async_t *handle); static void onClose(uv_handle_t *handle); }; +class WatcherError : public std::runtime_error { +public: + Watcher *mWatcher; + WatcherError(std::string msg, Watcher *watcher) : std::runtime_error(msg), mWatcher(watcher) {} + WatcherError(const char *msg, Watcher *watcher) : std::runtime_error(msg), mWatcher(watcher) {} +}; + #endif diff --git a/src/binding.cc b/src/binding.cc index 03c23ca1..1863f67d 100644 --- a/src/binding.cc +++ b/src/binding.cc @@ -135,18 +135,17 @@ class SubscribeRunner : public PromiseRunner { ); backend = getBackend(env, opts); - shouldWatch = watcher->watch(fn.As()); + callback = Persistent(fn.As()); } private: std::shared_ptr watcher; std::shared_ptr backend; - bool shouldWatch; + FunctionReference callback; void execute() override { - if (shouldWatch) { - backend->watch(*watcher); - } + backend->watch(*watcher); + watcher->watch(std::move(callback)); } }; diff --git a/src/linux/InotifyBackend.cc b/src/linux/InotifyBackend.cc index d1857990..280640b3 100644 --- a/src/linux/InotifyBackend.cc +++ b/src/linux/InotifyBackend.cc @@ -14,13 +14,13 @@ void InotifyBackend::start() { // Create a pipe that we will write to when we want to end the thread. int err = pipe2(mPipe, O_CLOEXEC | O_NONBLOCK); if (err == -1) { - throw "Unable to open pipe"; + throw std::runtime_error(std::string("Unable to open pipe: ") + strerror(errno)); } // Init inotify file descriptor. mInotify = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); if (mInotify == -1) { - throw "Unable to initialize inotify"; + throw std::runtime_error(std::string("Unable to initialize inotify: ") + strerror(errno)); } pollfd pollfds[2]; @@ -37,7 +37,7 @@ void InotifyBackend::start() { while (true) { int result = poll(pollfds, 2, 500); if (result < 0) { - throw "Unable to poll"; + throw std::runtime_error(std::string("Unable to poll: ") + strerror(errno)); } if (pollfds[0].revents) { @@ -75,7 +75,7 @@ void InotifyBackend::subscribe(Watcher &watcher) { void InotifyBackend::watchDir(Watcher &watcher, DirEntry *entry, std::shared_ptr tree) { int wd = inotify_add_watch(mInotify, entry->path.c_str(), INOTIFY_MASK); if (wd == -1) { - throw "inotify_add_watch failed"; + throw WatcherError(std::string("inotify_add_watch failed: ") + strerror(errno), &watcher); } std::shared_ptr sub = std::make_shared(); @@ -99,7 +99,7 @@ void InotifyBackend::handleEvents() { break; } - throw "Error reading from inotify"; + throw std::runtime_error(std::string("Error reading from inotify: ") + strerror(errno)); } if (n == 0) { @@ -201,7 +201,7 @@ void InotifyBackend::unsubscribe(Watcher &watcher) { if (mSubscriptions.count(it->first) == 1) { int err = inotify_rm_watch(mInotify, it->first); if (err == -1) { - throw "Unable to remove watcher"; + throw WatcherError(std::string("Unable to remove watcher: ") + strerror(errno), &watcher); } } diff --git a/src/macos/FSEventsBackend.cc b/src/macos/FSEventsBackend.cc index 4a871111..7986b76e 100644 --- a/src/macos/FSEventsBackend.cc +++ b/src/macos/FSEventsBackend.cc @@ -108,7 +108,20 @@ void FSEventsCallback( } } -void FSEventsBackend::startStream(Watcher &watcher, FSEventStreamEventId id) { +void checkWatcher(Watcher &watcher) { + struct stat file; + if (stat(watcher.mDir.c_str(), &file)) { + throw WatcherError(strerror(errno), &watcher); + } + + if (!S_ISDIR(file.st_mode)) { + throw WatcherError(strerror(ENOTDIR), &watcher); + } +} + +void FSEventsBackend::startStream(Watcher &watcher, FSEventStreamEventId id) { + checkWatcher(watcher); + CFAbsoluteTime latency = 0.01; CFStringRef fileWatchPath = CFStringCreateWithCString( NULL, @@ -148,10 +161,16 @@ void FSEventsBackend::startStream(Watcher &watcher, FSEventStreamEventId id) { FSEventStreamSetExclusionPaths(stream, exclusions); FSEventStreamScheduleWithRunLoop(stream, mRunLoop, kCFRunLoopDefaultMode); - FSEventStreamStart(stream); + bool started = FSEventStreamStart(stream); + CFRelease(pathsToWatch); CFRelease(fileWatchPath); + if (!started) { + FSEventStreamRelease(stream); + throw WatcherError("Error starting FSEvents stream", &watcher); + } + State *s = (State *)watcher.state; s->tree = std::make_shared(watcher.mDir); s->stream = stream; @@ -178,6 +197,8 @@ FSEventsBackend::~FSEventsBackend() { void FSEventsBackend::writeSnapshot(Watcher &watcher, std::string *snapshotPath) { std::unique_lock lock(mMutex); + checkWatcher(watcher); + FSEventStreamEventId id = FSEventsGetCurrentEventId(); std::ofstream ofs(*snapshotPath); ofs << id; diff --git a/src/unix/fts.cc b/src/unix/fts.cc index 5c3dbf3a..e66f47ab 100644 --- a/src/unix/fts.cc +++ b/src/unix/fts.cc @@ -12,15 +12,31 @@ void BruteForceBackend::readTree(Watcher &watcher, std::shared_ptr tree) { char *paths[2] {(char *)watcher.mDir.c_str(), NULL}; FTS *fts = fts_open(paths, FTS_NOCHDIR | FTS_PHYSICAL, NULL); + if (!fts) { + throw WatcherError(strerror(errno), &watcher); + } + FTSENT *node; + bool isRoot = true; while ((node = fts_read(fts)) != NULL) { + if (node->fts_errno) { + fts_close(fts); + throw WatcherError(strerror(node->fts_errno), &watcher); + } + + if (isRoot && !(node->fts_info & FTS_D)) { + fts_close(fts); + throw WatcherError(strerror(ENOTDIR), &watcher); + } + if (watcher.mIgnore.count(std::string(node->fts_path)) > 0) { fts_set(fts, node, FTS_SKIP); continue; } tree->add(node->fts_path, CONVERT_TIME(node->fts_statp->st_mtim), (node->fts_info & FTS_D) == FTS_D); + isRoot = false; } fts_close(fts); diff --git a/src/watchman/BSER.cc b/src/watchman/BSER.cc index 951e848e..1fbcd45b 100644 --- a/src/watchman/BSER.cc +++ b/src/watchman/BSER.cc @@ -10,7 +10,7 @@ BSERType decodeType(std::istream &iss) { void expectType(std::istream &iss, BSERType expected) { BSERType got = decodeType(iss); if (got != expected) { - throw "Unexpected BSER type"; + throw std::runtime_error("Unexpected BSER type"); } } @@ -59,7 +59,7 @@ class BSERInteger : public Value { value = int64; break; default: - throw "Invalid BSER int type"; + throw std::runtime_error("Invalid BSER int type"); } } @@ -257,7 +257,7 @@ BSER::BSER(std::istream &iss) { m_ptr = decodeTemplate(iss); break; default: - throw "unknown BSER type"; + throw std::runtime_error("unknown BSER type"); } } @@ -283,7 +283,7 @@ void BSER::encode(std::ostream &oss) { int64_t BSER::decodeLength(std::istream &iss) { char pdu[2]; if (!iss.read(pdu, 2) || pdu[0] != 0 || pdu[1] != 1) { - throw "Invalid BSER"; + throw std::runtime_error("Invalid BSER"); } return BSERInteger(iss).intValue(); diff --git a/src/watchman/IPC.hh b/src/watchman/IPC.hh index ccbef792..3641b6d6 100644 --- a/src/watchman/IPC.hh +++ b/src/watchman/IPC.hh @@ -33,12 +33,12 @@ public: } if (GetLastError() != ERROR_PIPE_BUSY) { - throw "Could not open pipe"; + throw std::runtime_error("Could not open pipe"); } // Wait for pipe to become available if it is busy if (!WaitNamedPipe(path.data(), 30000)) { - throw "Error waiting for pipe"; + throw std::runtime_error("Error waiting for pipe"); } } @@ -52,7 +52,7 @@ public: mSock = socket(AF_UNIX, SOCK_STREAM, 0); if (connect(mSock, (struct sockaddr *) &addr, sizeof(struct sockaddr_un))) { - throw "Error connecting to socket"; + throw std::runtime_error("Error connecting to socket"); } #endif } @@ -87,18 +87,18 @@ public: if (!success) { if (GetLastError() != ERROR_IO_PENDING) { - throw "Write error"; + throw std::runtime_error("Write error"); } } DWORD written; success = GetOverlappedResult(mPipe, &overlapped, &written, true); if (!success) { - throw "GetOverlappedResult failed"; + throw std::runtime_error("GetOverlappedResult failed"); } if (written != buf.size()) { - throw "Wrong number of bytes written"; + throw std::runtime_error("Wrong number of bytes written"); } #else int r = 0; @@ -110,7 +110,7 @@ public: } else if (mStopped) { return; } else { - throw "Write error"; + throw std::runtime_error("Write error"); } } } @@ -131,25 +131,29 @@ public: if (!success && !mStopped) { if (GetLastError() != ERROR_IO_PENDING) { - throw "Read error"; + throw std::runtime_error("Read error"); } } DWORD read = 0; success = GetOverlappedResult(mPipe, &overlapped, &read, true); if (!success && !mStopped) { - throw "GetOverlappedResult failed"; + throw std::runtime_error("GetOverlappedResult failed"); } return read; #else - int r = ::read(mSock, buf, len); + int r = ::read(mSock, buf, len); + if (r == 0 && !mStopped) { + throw std::runtime_error("Socket ended unexpectedly"); + } + if (r < 0) { if (mStopped) { return 0; } - throw strerror(errno); + throw std::runtime_error(strerror(errno)); } return r; diff --git a/src/watchman/WatchmanBackend.cc b/src/watchman/WatchmanBackend.cc index cb25f166..324645e1 100644 --- a/src/watchman/WatchmanBackend.cc +++ b/src/watchman/WatchmanBackend.cc @@ -47,7 +47,7 @@ std::string getSockPath() { FILE *fp = popen("watchman --output-encoding=bser get-sockname", "r"); if (fp == NULL || errno == ECHILD) { - throw "Failed to execute watchman"; + throw std::runtime_error("Failed to execute watchman"); } BSER b = readBSER([fp] (char *buf, size_t len) { @@ -72,10 +72,17 @@ BSER watchmanRead(IPC *ipc) { BSER::Object WatchmanBackend::watchmanRequest(BSER b) { std::string cmd = b.encode(); mIPC->write(cmd); - mRequestSignal.notify(); + mResponseSignal.wait(); mResponseSignal.reset(); + + if (!mError.empty()) { + std::runtime_error err = std::runtime_error(mError); + mError = std::string(); + throw err; + } + return mResponse; } @@ -90,7 +97,7 @@ bool WatchmanBackend::checkAvailable() { try { watchmanConnect(); return true; - } catch (const char *err) { + } catch (std::exception &err) { return false; } } @@ -98,7 +105,7 @@ bool WatchmanBackend::checkAvailable() { void handleFiles(Watcher &watcher, BSER::Object obj) { auto found = obj.find("files"); if (found == obj.end()) { - throw "Error reading changes from watchman"; + throw WatcherError("Error reading changes from watchman", &watcher); } auto files = found->second.arrayValue(); @@ -135,8 +142,12 @@ void WatchmanBackend::handleSubscription(BSER::Object obj) { } auto watcher = it->second; - handleFiles(*watcher, obj); - watcher->notify(); + try { + handleFiles(*watcher, obj); + watcher->notify(); + } catch (WatcherError &err) { + handleWatcherError(err); + } } void WatchmanBackend::start() { @@ -160,18 +171,25 @@ void WatchmanBackend::start() { BSER b; try { b = watchmanRead(&*mIPC); - } catch (const char *err) { + } catch (std::exception &err) { if (mStopped) { break; + } else if (mResponseSignal.isWaiting()) { + mError = err.what(); + mResponseSignal.notify(); } else { - throw err; + // Throwing causes the backend to be destroyed, but we never reach the code below to notify the signal + mEndedSignal.notify(); + throw; } } auto obj = b.objectValue(); auto error = obj.find("error"); if (error != obj.end()) { - throw error->second.stringValue().c_str(); + mError = error->second.stringValue(); + mResponseSignal.notify(); + continue; } // If this message is for a subscription, handle it, otherwise notify the request. @@ -188,8 +206,6 @@ void WatchmanBackend::start() { } WatchmanBackend::~WatchmanBackend() { - std::unique_lock lock(mMutex); - // Mark the watcher as stopped, close the socket, and trigger the lock. // This will cause the read loop to be broken and the thread to exit. mStopped = true; @@ -208,7 +224,7 @@ std::string WatchmanBackend::clock(Watcher &watcher) { BSER::Object obj = watchmanRequest(cmd); auto found = obj.find("clock"); if (found == obj.end()) { - throw "Error reading clock from watchman"; + throw WatcherError("Error reading clock from watchman", &watcher); } return found->second.stringValue(); @@ -254,8 +270,6 @@ void WatchmanBackend::subscribe(Watcher &watcher) { watchmanWatch(watcher.mDir); std::string id = getId(watcher); - mSubscriptions.emplace(id, &watcher); - BSER::Array cmd; cmd.push_back("subscribe"); cmd.push_back(normalizePath(watcher.mDir)); @@ -295,6 +309,9 @@ void WatchmanBackend::subscribe(Watcher &watcher) { cmd.push_back(opts); watchmanRequest(cmd); + + mSubscriptions.emplace(id, &watcher); + mRequestSignal.notify(); } void WatchmanBackend::unsubscribe(Watcher &watcher) { diff --git a/src/watchman/WatchmanBackend.hh b/src/watchman/WatchmanBackend.hh index 501cd40d..903f228c 100644 --- a/src/watchman/WatchmanBackend.hh +++ b/src/watchman/WatchmanBackend.hh @@ -21,6 +21,7 @@ private: Signal mRequestSignal; Signal mResponseSignal; BSER::Object mResponse; + std::string mError; std::unordered_map mSubscriptions; bool mStopped; Signal mEndedSignal; diff --git a/src/windows/WindowsBackend.cc b/src/windows/WindowsBackend.cc index 255cf66b..4443607b 100644 --- a/src/windows/WindowsBackend.cc +++ b/src/windows/WindowsBackend.cc @@ -6,6 +6,8 @@ #include "./WindowsBackend.hh" #include "./win_utils.hh" +#define DEFAULT_BUF_SIZE 1024 * 1024 +#define NETWORK_BUF_SIZE 64 * 1024 #define CONVERT_TIME(ft) ULARGE_INTEGER{ft.dwLowDateTime, ft.dwHighDateTime}.QuadPart void BruteForceBackend::readTree(Watcher &watcher, std::shared_ptr tree) { @@ -23,6 +25,11 @@ void BruteForceBackend::readTree(Watcher &watcher, std::shared_ptr tree hFind = FindFirstFile(spec.c_str(), &ffd); if (hFind == INVALID_HANDLE_VALUE) { + if (path == watcher.mDir) { + FindClose(hFind); + throw WatcherError("Error opening directory", &watcher); + } + tree->remove(path); continue; } @@ -55,8 +62,6 @@ void WindowsBackend::start() { } WindowsBackend::~WindowsBackend() { - std::unique_lock lock(mMutex); - // Mark as stopped, and queue a noop function in the thread to break the loop mRunning = false; QueueUserAPC([](__in ULONG_PTR) {}, mThread.native_handle(), (ULONG_PTR)this); @@ -64,14 +69,15 @@ WindowsBackend::~WindowsBackend() { class Subscription { public: - Subscription(Watcher *watcher, std::shared_ptr tree) { + Subscription(WindowsBackend *backend, Watcher *watcher, std::shared_ptr tree) { mRunning = true; + mBackend = backend; mWatcher = watcher; mTree = tree; ZeroMemory(&mOverlapped, sizeof(OVERLAPPED)); mOverlapped.hEvent = this; - mReadBuffer.resize(1024 * 1024); - mWriteBuffer.resize(1024 * 1024); + mReadBuffer.resize(DEFAULT_BUF_SIZE); + mWriteBuffer.resize(DEFAULT_BUF_SIZE); mDirectoryHandle = CreateFileW( utf8ToUtf16(watcher->mDir).data(), @@ -84,7 +90,22 @@ class Subscription { ); if (mDirectoryHandle == INVALID_HANDLE_VALUE) { - throw "Invalid handle"; + throw WatcherError("Invalid handle", mWatcher); + } + + // Ensure that the path is a directory + BY_HANDLE_FILE_INFORMATION info; + bool success = GetFileInformationByHandle( + mDirectoryHandle, + &info + ); + + if (!success) { + throw WatcherError("Could not get file information", mWatcher); + } + + if (!(info.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY)) { + throw WatcherError("Not a directory", mWatcher); } } @@ -94,6 +115,14 @@ class Subscription { CloseHandle(mDirectoryHandle); } + void run() { + try { + poll(); + } catch (WatcherError &err) { + mBackend->handleWatcherError(err); + } + } + void poll() { if (!mRunning) { return; @@ -111,12 +140,16 @@ class Subscription { &mOverlapped, [](DWORD errorCode, DWORD numBytes, LPOVERLAPPED overlapped) { auto subscription = reinterpret_cast(overlapped->hEvent); - subscription->processEvents(errorCode); + try { + subscription->processEvents(errorCode); + } catch (WatcherError &err) { + subscription->mBackend->handleWatcherError(err); + } } ); if (!success) { - throw "Unexpected shutdown"; + throw WatcherError("Failed to read changes", mWatcher); } } @@ -125,17 +158,20 @@ class Subscription { return; } - // TODO: error handling switch (errorCode) { case ERROR_OPERATION_ABORTED: return; case ERROR_INVALID_PARAMETER: + // resize buffers to network size (64kb), and try again + mReadBuffer.resize(NETWORK_BUF_SIZE); + mWriteBuffer.resize(NETWORK_BUF_SIZE); + poll(); return; case ERROR_NOTIFY_ENUM_DIR: - return; + throw WatcherError("Buffer overflow. Some events may have been lost.", mWatcher); default: if (errorCode != ERROR_SUCCESS) { - throw "Unknown error"; + throw WatcherError("Unknown error", mWatcher); } } @@ -194,6 +230,7 @@ class Subscription { } private: + WindowsBackend *mBackend; Watcher *mWatcher; std::shared_ptr mTree; bool mRunning; @@ -205,14 +242,18 @@ class Subscription { void WindowsBackend::subscribe(Watcher &watcher) { // Create a subscription for this watcher - Subscription *sub = new Subscription(&watcher, getTree(watcher, false)); + Subscription *sub = new Subscription(this, &watcher, getTree(watcher, false)); watcher.state = (void *)sub; // Queue polling for this subscription in the correct thread. - QueueUserAPC([](__in ULONG_PTR ptr) { + bool success = QueueUserAPC([](__in ULONG_PTR ptr) { Subscription *sub = (Subscription *)ptr; - sub->poll(); + sub->run(); }, mThread.native_handle(), (ULONG_PTR)sub); + + if (!success) { + throw std::runtime_error("Unable to queue APC"); + } } void WindowsBackend::unsubscribe(Watcher &watcher) { diff --git a/test/since.js b/test/since.js index b363b312..99e3a674 100644 --- a/test/since.js +++ b/test/since.js @@ -482,6 +482,40 @@ describe('since', () => { ]); }); }); + + describe('errors', () => { + it('should error if the watched directory does not exist', async () => { + let dir = path.join(fs.realpathSync(require('os').tmpdir()), Math.random().toString(31).slice(2)); + + let threw = false; + try { + await fschanges.writeSnapshot(dir, snapshotPath, {backend}); + } catch (err) { + threw = true; + } + + assert(threw, 'did not throw'); + }); + + it('should error if the watched path is not a directory', async () => { + if (backend === 'watchman' && process.platform === 'win32') { + // There is a bug in watchman on windows where the `watch` command hangs if the path is not a directory. + return; + } + + let file = path.join(fs.realpathSync(require('os').tmpdir()), Math.random().toString(31).slice(2)); + fs.writeFileSync(file, 'test'); + + let threw = false; + try { + await fschanges.writeSnapshot(file, snapshotPath, {backend}); + } catch (err) { + threw = true; + } + + assert(threw, 'did not throw'); + }); + }); }); }); }); diff --git a/test/watcher.js b/test/watcher.js index d306c788..b4a14d76 100644 --- a/test/watcher.js +++ b/test/watcher.js @@ -2,6 +2,7 @@ const fschanges = require('../'); const assert = require('assert'); const fs = require('fs-extra'); const path = require('path'); +const {execSync} = require('child_process'); let backends = []; if (process.platform === 'darwin') { @@ -24,7 +25,11 @@ describe('watcher', () => { }); }; - let fn = events => { + let fn = (err, events) => { + if (err) { + throw err; + } + setImmediate(() => { for (let cb of cbs) { cb(events); @@ -443,7 +448,7 @@ describe('watcher', () => { function listen() { return new Promise(async resolve => { - let sub = await fschanges.subscribe(dir, async events => { + let sub = await fschanges.subscribe(dir, async (err, events) => { setImmediate(() => resolve(events)); await sub.unsubscribe(); }, {backend}); @@ -470,7 +475,7 @@ describe('watcher', () => { function listen(ignore) { return new Promise(async resolve => { - let sub = await fschanges.subscribe(dir, async events => { + let sub = await fschanges.subscribe(dir, async (err, events) => { setImmediate(() => resolve(events)); await sub.unsubscribe(); }, {backend, ignore}); @@ -500,7 +505,7 @@ describe('watcher', () => { function listen(dir) { return new Promise(async resolve => { - let sub = await fschanges.subscribe(dir, async events => { + let sub = await fschanges.subscribe(dir, async (err, events) => { setImmediate(() => resolve(events)); await sub.unsubscribe(); }, {backend}); @@ -529,9 +534,9 @@ describe('watcher', () => { function listen(dir) { return new Promise(async resolve => { - let sub = await fschanges.subscribe(dir, events => { + let sub = await fschanges.subscribe(dir, (err, events) => { setImmediate(() => resolve([events, sub])); - }); + }, {backend}); }); } @@ -560,6 +565,63 @@ describe('watcher', () => { await sub.unsubscribe(); }); }); + + describe('errors', () => { + it('should error if the watched directory does not exist', async () => { + let dir = path.join(fs.realpathSync(require('os').tmpdir()), Math.random().toString(31).slice(2)); + + let threw = false; + try { + await fschanges.subscribe(dir, (err, events) => { + assert(false, 'Should not get here'); + }, {backend}); + } catch (err) { + threw = true; + } + + assert(threw, 'did not throw'); + }); + + it('should error if the watched path is not a directory', async () => { + if (backend === 'watchman' && process.platform === 'win32') { + // There is a bug in watchman on windows where the `watch` command hangs if the path is not a directory. + return; + } + + let file = path.join(fs.realpathSync(require('os').tmpdir()), Math.random().toString(31).slice(2)); + fs.writeFileSync(file, 'test'); + + let threw = false; + try { + await fschanges.subscribe(file, (err, events) => { + assert(false, 'Should not get here'); + }, {backend}); + } catch (err) { + threw = true; + } + + assert(threw, 'did not throw'); + }); + }); + }); + }); + + describe('watchman errors', () => { + it('should emit an error when watchman dies', async () => { + let dir = path.join(fs.realpathSync(require('os').tmpdir()), Math.random().toString(31).slice(2)); + fs.mkdirpSync(dir); + await new Promise(resolve => setTimeout(resolve, 100)); + + let p = new Promise(resolve => { + fschanges.subscribe(dir, (err, events) => { + setImmediate(() => resolve(err)); + }, {backend: 'watchman'}); + }); + + execSync('watchman shutdown-server'); + + let err = await p; + assert(err, 'No error was emitted'); }); }); });