Skip to content

Commit

Permalink
Add tests for multiple simultaneous watchers (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
devongovett authored Mar 30, 2019
1 parent dbda1ec commit 728f9a4
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 42 deletions.
14 changes: 10 additions & 4 deletions src/Debounce.hh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define DEBOUNCE_H

#include <thread>
#include <unordered_map>
#include "Signal.hh"

class Debounce {
Expand Down Expand Up @@ -30,9 +31,14 @@ public:
mThread.join();
}

void add(std::function<void()> cb) {
void add(void *key, std::function<void()> cb) {
std::unique_lock<std::mutex> lock(mMutex);
mCallbacks.push_back(cb);
mCallbacks.emplace(key, cb);
}

void remove(void *key) {
std::unique_lock<std::mutex> lock(mMutex);
mCallbacks.erase(key);
}

void trigger() {
Expand All @@ -45,7 +51,7 @@ private:
std::mutex mMutex;
Signal mWaitSignal;
std::thread mThread;
std::vector<std::function<void()>> mCallbacks;
std::unordered_map<void *, std::function<void()>> mCallbacks;

void loop() {
while (mRunning) {
Expand All @@ -65,7 +71,7 @@ private:
std::unique_lock<std::mutex> lock(mMutex);

for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
auto cb = *it;
auto cb = it->second;
cb();
}

Expand Down
12 changes: 8 additions & 4 deletions src/Watcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,16 @@ Watcher::Watcher(std::string dir, std::unordered_set<std::string> ignore)
mAsync(NULL),
mCallingCallbacks(false) {
mDebounce = Debounce::getShared();
mDebounce->add([this] () {
mDebounce->add(this, [this] () {
triggerCallbacks();
});
}

Watcher::~Watcher() {
std::unique_lock<std::mutex> lk(mMutex);
mDebounce->remove(this);
}

void Watcher::wait() {
std::unique_lock<std::mutex> lk(mMutex);
mCond.wait(lk);
Expand All @@ -74,7 +79,6 @@ void Watcher::triggerCallbacks() {
mCallbackEvents = mEvents;
mEvents.clear();

// mDebounce->trigger();
uv_async_send(mAsync);
}
}
Expand All @@ -100,9 +104,9 @@ void Watcher::fireCallbacks(uv_async_t *handle) {
watcher->mCallingCallbacks = false;
if (watcher->mCallbacks.size() == 0) {
watcher->unref();
} else {
watcher->mCallbackSignal.notify();
}

watcher->mCallbackSignal.notify();
}

bool Watcher::watch(Function callback) {
Expand Down
3 changes: 2 additions & 1 deletion src/Watcher.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ struct Watcher {
DirTree *mTree;

Watcher(std::string dir, std::unordered_set<std::string> ignore);
~Watcher();

bool operator==(const Watcher &other) const {
return mDir == other.mDir;
return mDir == other.mDir && mIgnore == other.mIgnore;
}

void wait();
Expand Down
57 changes: 33 additions & 24 deletions src/linux/InotifyBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ void InotifyBackend::watchDir(Watcher &watcher, DirEntry *entry) {
throw "inotify_add_watch failed";
}

entry->state = (void *)&watcher;
mSubscriptions.emplace(wd, entry);
InotifySubscription *sub = new InotifySubscription;
sub->entry = entry;
sub->watcher = &watcher;
mSubscriptions.emplace(wd, sub);
}

void InotifyBackend::handleEvents() {
Expand Down Expand Up @@ -110,10 +112,7 @@ void InotifyBackend::handleEvents() {
continue;
}

Watcher *watcher = handleEvent(event);
if (watcher) {
watchers.insert(watcher);
}
handleEvent(event, watchers);
}
}

Expand All @@ -122,25 +121,33 @@ void InotifyBackend::handleEvents() {
}
}

Watcher *InotifyBackend::handleEvent(struct inotify_event *event) {
void InotifyBackend::handleEvent(struct inotify_event *event, std::unordered_set<Watcher *> &watchers) {
std::unique_lock<std::mutex> lock(mMutex);

// Find a subscription for this watch descriptor
auto entry = mSubscriptions.find(event->wd);
if (entry == mSubscriptions.end()) {
// Unknown path
return NULL;
// Find the subscriptions for this watch descriptor
auto range = mSubscriptions.equal_range(event->wd);
std::unordered_set<InotifySubscription *> set;
for (auto it = range.first; it != range.second; it++) {
set.insert(it->second);
}

for (auto it = set.begin(); it != set.end(); it++) {
if (handleSubscription(event, *it)) {
watchers.insert((*it)->watcher);
}
}
}

bool InotifyBackend::handleSubscription(struct inotify_event *event, InotifySubscription *sub) {
// Build full path and check if its in our ignore list.
Watcher *watcher = (Watcher *)entry->second->state;
std::string path = std::string(entry->second->path);
Watcher *watcher = sub->watcher;
std::string path = std::string(sub->entry->path);
if (event->len > 0) {
path += "/" + std::string(event->name);
}

if (watcher->mIgnore.count(path) > 0) {
return NULL;
return false;
}

// If this is a create, check if it's a directory and start watching if it is.
Expand All @@ -164,15 +171,15 @@ Watcher *InotifyBackend::handleEvent(struct inotify_event *event) {
} else if (event->mask & (IN_DELETE | IN_DELETE_SELF | IN_MOVED_FROM | IN_MOVE_SELF)) {
// Ignore delete/move self events unless this is the recursive watch root
if ((event->mask & (IN_DELETE_SELF | IN_MOVE_SELF)) && path != watcher->mDir) {
return NULL;
return false;
}

// If the entry being deleted/moved is a directory, remove it from the list of subscriptions
auto entry = watcher->mTree->find(path);
if (entry && entry->isDir) {
for (auto it = mSubscriptions.begin(); it != mSubscriptions.end(); it++) {
if (it->second == &*entry) {
it->second->state = NULL;
if (it->second->entry == &*entry) {
delete it->second;
mSubscriptions.erase(it);
break;
}
Expand All @@ -183,19 +190,21 @@ Watcher *InotifyBackend::handleEvent(struct inotify_event *event) {
watcher->mTree->remove(path);
}

return watcher;
return true;
}

void InotifyBackend::unsubscribe(Watcher &watcher) {
// Find any subscriptions pointing to this watcher, and remove them.
for (auto it = mSubscriptions.begin(); it != mSubscriptions.end();) {
if (it->second->state == &watcher) {
int err = inotify_rm_watch(mInotify, it->first);
if (err == -1) {
throw "Unable to remove watcher";
if (it->second->watcher == &watcher) {
if (mSubscriptions.count(it->first) == 1) {
int err = inotify_rm_watch(mInotify, it->first);
if (err == -1) {
throw "Unable to remove watcher";
}
}

it->second->state = NULL;
delete it->second;
it = mSubscriptions.erase(it);
} else {
it++;
Expand Down
10 changes: 8 additions & 2 deletions src/linux/InotifyBackend.hh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
#include "../DirTree.hh"
#include "../Signal.hh"

struct InotifySubscription {
DirEntry *entry;
Watcher *watcher;
};

class InotifyBackend : public BruteForceBackend {
public:
void start() override;
Expand All @@ -16,12 +21,13 @@ public:
private:
int mPipe[2];
int mInotify;
std::unordered_map<int, DirEntry *> mSubscriptions;
std::unordered_multimap<int, InotifySubscription *> mSubscriptions;
Signal mEndedSignal;

void watchDir(Watcher &watcher, DirEntry *entry);
void handleEvents();
Watcher *handleEvent(struct inotify_event *event);
void handleEvent(struct inotify_event *event, std::unordered_set<Watcher *> &watchers);
bool handleSubscription(struct inotify_event *event, InotifySubscription *sub);
};

#endif
26 changes: 19 additions & 7 deletions src/watchman/watchman.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ BSER readBSER(T &&do_read) {
int r;
int64_t len = -1;
do {
r = do_read(buffer, sizeof(buffer));
// Start by reading a minimal amount of data in order to decode the length.
// After that, attempt to read the remaining length, up to the buffer size.
r = do_read(buffer, len == -1 ? 20 : (len < 256 ? len : 256));
if (r < 0) {
throw strerror(errno);
}
Expand Down Expand Up @@ -142,8 +144,8 @@ void handleFiles(Watcher &watcher, BSER::Object obj) {

void WatchmanBackend::handleSubscription(BSER::Object obj) {
std::unique_lock<std::mutex> lock(mMutex);
auto root = obj.find("root")->second.stringValue();
auto it = mSubscriptions.find(root);
auto subscription = obj.find("subscription")->second.stringValue();
auto it = mSubscriptions.find(subscription);
if (it == mSubscriptions.end()) {
return;
}
Expand Down Expand Up @@ -257,14 +259,23 @@ void WatchmanBackend::getEventsSince(Watcher &watcher, std::string *snapshotPath
handleFiles(watcher, obj);
}

std::string getId(Watcher &watcher) {
std::ostringstream id;
id << "fschanges-";
id << (void *)&watcher;
return id.str();
}

void WatchmanBackend::subscribe(Watcher &watcher) {
watchmanWatch(watcher.mDir);
mSubscriptions.emplace(watcher.mDir, &watcher);

std::string id = getId(watcher);
mSubscriptions.emplace(id, &watcher);

BSER::Array cmd;
cmd.push_back("subscribe");
cmd.push_back(watcher.mDir);
cmd.push_back("fschanges");
cmd.push_back(id);

BSER::Array fields;
fields.push_back("name");
Expand Down Expand Up @@ -303,13 +314,14 @@ void WatchmanBackend::subscribe(Watcher &watcher) {
}

void WatchmanBackend::unsubscribe(Watcher &watcher) {
auto erased = mSubscriptions.erase(watcher.mDir);
std::string id = getId(watcher);
auto erased = mSubscriptions.erase(id);

if (erased) {
BSER::Array cmd;
cmd.push_back("unsubscribe");
cmd.push_back(watcher.mDir);
cmd.push_back("fschanges");
cmd.push_back(id);

watchmanRequest(cmd);
}
Expand Down
90 changes: 90 additions & 0 deletions test/watcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,96 @@ describe('watcher', () => {
]);
});
});

describe('multiple', () => {
it('should support multiple watchers for the same directory', async () => {
let dir = path.join(fs.realpathSync(require('os').tmpdir()), Math.random().toString(31).slice(2));
fs.mkdirpSync(dir);
await new Promise(resolve => setTimeout(resolve, 100));

function listen() {
return new Promise(resolve => {
let fn = events => {
setImmediate(() => resolve(events));
fschanges.unsubscribe(dir, fn, {backend});
};

fschanges.subscribe(dir, fn, {backend});
});
}

let l1 = listen();
let l2 = listen();

fs.writeFile(path.join(dir, 'test1.txt'), 'test1');

let res = await Promise.all([l1, l2]);
assert.deepEqual(res, [
[{type: 'create', path: path.join(dir, 'test1.txt')}],
[{type: 'create', path: path.join(dir, 'test1.txt')}]
]);
});

it('should support multiple watchers for the same directory with different ignore paths', async () => {
let dir = path.join(fs.realpathSync(require('os').tmpdir()), Math.random().toString(31).slice(2));
fs.mkdirpSync(dir);
await new Promise(resolve => setTimeout(resolve, 100));

function listen(ignore) {
return new Promise(resolve => {
let fn = events => {
setImmediate(() => resolve(events));
fschanges.unsubscribe(dir, fn, {backend, ignore});
};

fschanges.subscribe(dir, fn, {backend, ignore});
});
}

let l1 = listen([path.join(dir, 'test1.txt')]);
let l2 = listen([path.join(dir, 'test2.txt')]);

fs.writeFile(path.join(dir, 'test1.txt'), 'test1');
fs.writeFile(path.join(dir, 'test2.txt'), 'test1');

let res = await Promise.all([l1, l2]);
assert.deepEqual(res, [
[{type: 'create', path: path.join(dir, 'test2.txt')}],
[{type: 'create', path: path.join(dir, 'test1.txt')}]
]);
});

it('should support multiple watchers for different directories', async () => {
let dir1 = path.join(fs.realpathSync(require('os').tmpdir()), Math.random().toString(31).slice(2));
let dir2 = path.join(fs.realpathSync(require('os').tmpdir()), Math.random().toString(31).slice(2));
fs.mkdirpSync(dir1);
fs.mkdirpSync(dir2);
await new Promise(resolve => setTimeout(resolve, 100));

function listen(dir) {
return new Promise(resolve => {
let fn = events => {
setImmediate(() => resolve(events));
fschanges.unsubscribe(dir, fn, {backend});
};

fschanges.subscribe(dir, fn, {backend});
});
}

let l1 = listen(dir1);
let l2 = listen(dir2);

fs.writeFile(path.join(dir1, 'test1.txt'), 'test1');
fs.writeFile(path.join(dir2, 'test1.txt'), 'test1');

let res = await Promise.all([l1, l2]);
assert.deepEqual(res, [
[{type: 'create', path: path.join(dir1, 'test1.txt')}],
[{type: 'create', path: path.join(dir2, 'test1.txt')}]
]);
});
});
});
});
});

0 comments on commit 728f9a4

Please sign in to comment.