Skip to content

Commit

Permalink
Add level/edge trigger param to FileEventImpl and DispatcherImpl::cre… (
Browse files Browse the repository at this point in the history
#456)

c-ares needs level triggering (#143).
  • Loading branch information
htuch authored and mattklein123 committed Feb 10, 2017
1 parent 3bf1d94 commit 71ee2f6
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 53 deletions.
3 changes: 2 additions & 1 deletion include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ class Dispatcher {
* can be used for any file like interface (files, sockets, etc.).
* @param fd supplies the fd to watch.
* @param cb supplies the callback to fire when the file is ready.
* @param trigger specifies whether to edge or level trigger.
*/
virtual FileEventPtr createFileEvent(int fd, FileReadyCb cb) PURE;
virtual FileEventPtr createFileEvent(int fd, FileReadyCb cb, FileTriggerType trigger) PURE;

/**
* @return Filesystem::WatcherPtr a filesystem watcher owned by the caller.
Expand Down
8 changes: 8 additions & 0 deletions include/envoy/event/file_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ struct FileReadyType {
static const uint32_t Write = 0x2;
};

enum class FileTriggerType { Level, Edge };

/**
* Callback invoked when a FileEvent is ready for reading or writing.
*/
Expand All @@ -26,6 +28,12 @@ class FileEvent {
* events.
*/
virtual void activate(uint32_t events) PURE;

/**
* Enable the file event explicitly for a set of events. This allows read and
* write events to be independently enabled/disabled for a file event.
*/
virtual void setEnabled(uint32_t events) PURE;
};

typedef std::unique_ptr<FileEvent> FileEventPtr;
Expand Down
4 changes: 2 additions & 2 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ Network::DnsResolverPtr DispatcherImpl::createDnsResolver() {
return Network::DnsResolverPtr{new Network::DnsResolverImpl(*this)};
}

FileEventPtr DispatcherImpl::createFileEvent(int fd, FileReadyCb cb) {
return FileEventPtr{new FileEventImpl(*this, fd, cb)};
FileEventPtr DispatcherImpl::createFileEvent(int fd, FileReadyCb cb, FileTriggerType trigger) {
return FileEventPtr{new FileEventImpl(*this, fd, cb, trigger)};
}

Filesystem::WatcherPtr DispatcherImpl::createFilesystemWatcher() {
Expand Down
2 changes: 1 addition & 1 deletion source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>, public Dispatcher {
createSslClientConnection(Ssl::ClientContext& ssl_ctx,
Network::Address::InstancePtr address) override;
Network::DnsResolverPtr createDnsResolver() override;
FileEventPtr createFileEvent(int fd, FileReadyCb cb) override;
FileEventPtr createFileEvent(int fd, FileReadyCb cb, FileTriggerType trigger) override;
Filesystem::WatcherPtr createFilesystemWatcher() override;
Network::ListenerPtr createListener(Network::ConnectionHandler& conn_handler,
Network::ListenSocket& socket, Network::ListenerCallbacks& cb,
Expand Down
46 changes: 30 additions & 16 deletions source/common/event/file_event_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,32 @@

namespace Event {

FileEventImpl::FileEventImpl(DispatcherImpl& dispatcher, int fd, FileReadyCb cb) : cb_(cb) {
event_assign(&raw_event_, &dispatcher.base(), fd, EV_PERSIST | EV_ET | EV_READ | EV_WRITE,
FileEventImpl::FileEventImpl(DispatcherImpl& dispatcher, int fd, FileReadyCb cb,
FileTriggerType trigger)
: cb_(cb), base_(&dispatcher.base()), fd_(fd), trigger_(trigger) {
assignEvents(FileReadyType::Read | FileReadyType::Write);
event_add(&raw_event_, nullptr);
}

void FileEventImpl::activate(uint32_t events) {
int libevent_events = 0;
if (events & FileReadyType::Read) {
libevent_events |= EV_READ;
}

if (events & FileReadyType::Write) {
libevent_events |= EV_WRITE;
}

ASSERT(libevent_events);
event_active(&raw_event_, libevent_events, 0);
}

void FileEventImpl::assignEvents(uint32_t events) {
event_assign(&raw_event_, base_, fd_,
EV_PERSIST | (trigger_ == FileTriggerType::Level ? 0 : EV_ET) |
(events & FileReadyType::Read ? EV_READ : 0) |
(events & FileReadyType::Write ? EV_WRITE : 0),
[](evutil_socket_t, short what, void* arg) -> void {
FileEventImpl* event = static_cast<FileEventImpl*>(arg);
uint32_t events = 0;
Expand All @@ -24,22 +48,12 @@ FileEventImpl::FileEventImpl(DispatcherImpl& dispatcher, int fd, FileReadyCb cb)
event->cb_(events);
},
this);

event_add(&raw_event_, nullptr);
}

void FileEventImpl::activate(uint32_t events) {
int libevent_events = 0;
if (events & FileReadyType::Read) {
libevent_events |= EV_READ;
}

if (events & FileReadyType::Write) {
libevent_events |= EV_WRITE;
}

ASSERT(libevent_events);
event_active(&raw_event_, libevent_events, 0);
void FileEventImpl::setEnabled(uint32_t events) {
event_del(&raw_event_);
assignEvents(events);
event_add(&raw_event_, nullptr);
}

} // Event
11 changes: 8 additions & 3 deletions source/common/event/file_event_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,23 @@
namespace Event {

/**
* Implementation of FileEvent for libevent that uses edge triggered persistent events and assumes
* the user will read/write until EAGAIN is returned from the file.
* Implementation of FileEvent for libevent that uses persistent events and
* assumes the user will read/write until EAGAIN is returned from the file.
*/
class FileEventImpl : public FileEvent, ImplBase {
public:
FileEventImpl(DispatcherImpl& dispatcher, int fd, FileReadyCb cb);
FileEventImpl(DispatcherImpl& dispatcher, int fd, FileReadyCb cb, FileTriggerType trigger);

// Event::FileEvent
void activate(uint32_t events) override;
void setEnabled(uint32_t events) override;

private:
void assignEvents(uint32_t events);
FileReadyCb cb_;
event_base* base_;
int fd_;
FileTriggerType trigger_;
};

} // Event
2 changes: 1 addition & 1 deletion source/common/filesystem/watcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ WatcherImpl::WatcherImpl(Event::Dispatcher& dispatcher)
if (events & Event::FileReadyType::Read) {
onInotifyEvent();
}
})) {}
}, Event::FileTriggerType::Edge)) {}

WatcherImpl::~WatcherImpl() { close(inotify_fd_); }

Expand Down
4 changes: 2 additions & 2 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd,
// condition and just crash.
RELEASE_ASSERT(fd_ != -1);

file_event_ =
dispatcher_.createFileEvent(fd_, [this](uint32_t events) -> void { onFileEvent(events); });
file_event_ = dispatcher_.createFileEvent(
fd_, [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge);
}

ConnectionImpl::~ConnectionImpl() {
Expand Down
2 changes: 1 addition & 1 deletion source/common/network/proxy_protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ ProxyProtocol::ActiveConnection::ActiveConnection(ProxyProtocol& parent,
if (events & Event::FileReadyType::Read) {
onRead();
}
});
}, Event::FileTriggerType::Edge);
}

ProxyProtocol::ActiveConnection::~ActiveConnection() {
Expand Down
2 changes: 1 addition & 1 deletion source/exe/hot_restart.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ void HotRestartImpl::initialize(Event::Dispatcher& dispatcher, Server::Instance&
if (events & Event::FileReadyType::Read) {
onSocketEvent();
}
});
}, Event::FileTriggerType::Edge);
server_ = &server;
}

Expand Down
111 changes: 89 additions & 22 deletions test/common/event/file_event_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,101 @@

namespace Event {

TEST(FileEventImplTest, All) {
int fds[2];
int rc = socketpair(AF_UNIX, SOCK_DGRAM, 0, fds);
ASSERT_EQ(0, rc);
class FileEventImplTest : public testing::Test {
public:
void SetUp() override {
int rc = socketpair(AF_UNIX, SOCK_DGRAM, 0, fds_);
ASSERT_EQ(0, rc);
int data = 1;
rc = write(fds_[1], &data, sizeof(data));
ASSERT_EQ(sizeof(data), static_cast<size_t>(rc));
}

void TearDown() override {
close(fds_[0]);
close(fds_[1]);
}

protected:
int fds_[2];
};

TEST_F(FileEventImplTest, EdgeTrigger) {
DispatcherImpl dispatcher;
ReadyWatcher read_event;
EXPECT_CALL(read_event, ready()).Times(1);
ReadyWatcher write_event;
EXPECT_CALL(write_event, ready()).Times(1);

Event::FileEventPtr file_event =
dispatcher.createFileEvent(fds_[0], [&](uint32_t events) -> void {
if (events & FileReadyType::Read) {
read_event.ready();
}

if (events & FileReadyType::Write) {
write_event.ready();
}
}, FileTriggerType::Edge);

dispatcher.run(Event::Dispatcher::RunType::NonBlock);
}

TEST_F(FileEventImplTest, LevelTrigger) {
DispatcherImpl dispatcher;
ReadyWatcher read_event;
EXPECT_CALL(read_event, ready());
EXPECT_CALL(read_event, ready()).Times(2);
ReadyWatcher write_event;
EXPECT_CALL(write_event, ready());

int data = 1;
rc = write(fds[1], &data, sizeof(data));
ASSERT_EQ(sizeof(data), static_cast<size_t>(rc));
Event::FileEventPtr file_event = dispatcher.createFileEvent(fds[0], [&](uint32_t events) -> void {
if (events & FileReadyType::Read) {
read_event.ready();
}

if (events & FileReadyType::Write) {
write_event.ready();
dispatcher.exit();
}
});
EXPECT_CALL(write_event, ready()).Times(2);

int count = 2;
Event::FileEventPtr file_event =
dispatcher.createFileEvent(fds_[0], [&](uint32_t events) -> void {
if (--count == 0) {
dispatcher.exit();
return;
}
if (events & FileReadyType::Read) {
read_event.ready();
}

if (events & FileReadyType::Write) {
write_event.ready();
}
}, FileTriggerType::Level);

dispatcher.run(Event::Dispatcher::RunType::Block);
close(fds[0]);
close(fds[1]);
}

TEST_F(FileEventImplTest, SetEnabled) {
DispatcherImpl dispatcher;
ReadyWatcher read_event;
EXPECT_CALL(read_event, ready()).Times(2);
ReadyWatcher write_event;
EXPECT_CALL(write_event, ready()).Times(2);

Event::FileEventPtr file_event =
dispatcher.createFileEvent(fds_[0], [&](uint32_t events) -> void {
if (events & FileReadyType::Read) {
read_event.ready();
}

if (events & FileReadyType::Write) {
write_event.ready();
}
}, FileTriggerType::Edge);

file_event->setEnabled(FileReadyType::Read);
dispatcher.run(Event::Dispatcher::RunType::NonBlock);

file_event->setEnabled(FileReadyType::Write);
dispatcher.run(Event::Dispatcher::RunType::NonBlock);

file_event->setEnabled(0);
dispatcher.run(Event::Dispatcher::RunType::NonBlock);

file_event->setEnabled(FileReadyType::Read | FileReadyType::Write);
dispatcher.run(Event::Dispatcher::RunType::NonBlock);
}

} // Event
6 changes: 3 additions & 3 deletions test/mocks/event/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class MockDispatcher : public Dispatcher {
return Network::DnsResolverPtr{createDnsResolver_()};
}

FileEventPtr createFileEvent(int fd, FileReadyCb cb) override {
return FileEventPtr{createFileEvent_(fd, cb)};
FileEventPtr createFileEvent(int fd, FileReadyCb cb, FileTriggerType trigger) override {
return FileEventPtr{createFileEvent_(fd, cb, trigger)};
}

Filesystem::WatcherPtr createFilesystemWatcher() override {
Expand Down Expand Up @@ -80,7 +80,7 @@ class MockDispatcher : public Dispatcher {
Network::ClientConnection*(Ssl::ClientContext& ssl_ctx,
Network::Address::InstancePtr address));
MOCK_METHOD0(createDnsResolver_, Network::DnsResolver*());
MOCK_METHOD2(createFileEvent_, FileEvent*(int fd, FileReadyCb cb));
MOCK_METHOD3(createFileEvent_, FileEvent*(int fd, FileReadyCb cb, FileTriggerType trigger));
MOCK_METHOD0(createFilesystemWatcher_, Filesystem::Watcher*());
MOCK_METHOD7(createListener_,
Network::Listener*(Network::ConnectionHandler& conn_handler,
Expand Down

0 comments on commit 71ee2f6

Please sign in to comment.