Skip to content

Commit

Permalink
[improve][broker] Allow to use io_uring instead of epoll (#18385)
Browse files Browse the repository at this point in the history
* Allow to use io_uring instead of epoll

* fix pom
  • Loading branch information
coderzc authored Nov 10, 2022
1 parent a6933d5 commit dd07241
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 23 deletions.
3 changes: 3 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ The Apache Software License, Version 2.0
- io.netty-netty-tcnative-boringssl-static-2.0.52.Final-osx-x86_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.52.Final-windows-x86_64.jar
- io.netty-netty-tcnative-classes-2.0.52.Final.jar
- io.netty.incubator-netty-incubator-transport-classes-io_uring-0.0.15.Final.jar
- io.netty.incubator-netty-incubator-transport-native-io_uring-0.0.15.Final-linux-x86_64.jar
- io.netty.incubator-netty-incubator-transport-native-io_uring-0.0.15.Final-linux-aarch_64.jar
* Prometheus client
- io.prometheus.jmx-collector-0.16.1.jar
- io.prometheus-simpleclient-0.16.0.jar
Expand Down
19 changes: 19 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ flexible messaging model and an intuitive client API.</description>
<curator.version>5.1.0</curator.version>
<netty.version>4.1.77.Final</netty.version>
<netty-tc-native.version>2.0.52.Final</netty-tc-native.version>
<netty-iouring.version>0.0.15.Final</netty-iouring.version>
<jetty.version>9.4.48.v20220622</jetty.version>
<conscrypt.version>2.5.2</conscrypt.version>
<jersey.version>2.34</jersey.version>
Expand Down Expand Up @@ -642,6 +643,24 @@ flexible messaging model and an intuitive client API.</description>
<version>${netty.version}</version>
</dependency>

<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>${netty-iouring.version}</version>
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>${netty-iouring.version}</version>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>${netty-iouring.version}</version>
<classifier>linux-aarch_64</classifier>
</dependency>

<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
Expand Down
1 change: 1 addition & 0 deletions pulsar-broker-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
<include>com.google.*:*</include>
<include>com.fasterxml.jackson.*:*</include>
<include>io.netty:*</include>
<include>io.netty.incubator:*</include>
<include>org.apache.pulsar:pulsar-common</include>
<include>org.apache.bookkeeper:circe-checksum</include>
<include>com.yahoo.datasketches:sketches-core</include>
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-admin-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
<include>com.google.code.gson:gson</include>
<include>com.fasterxml.jackson.core</include>
<include>io.netty:*</include>
<include>io.netty.incubator:*</include>
<include>org.apache.pulsar:pulsar-common</include>
<include>org.apache.bookkeeper:*</include>
<include>com.yahoo.datasketches:sketches-core</include>
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
<include>io.perfmark:*</include>
<include>com.yahoo.datasketches:*</include>
<include>io.netty:*</include>
<include>io.netty.incubator:*</include>
<include>com.squareup.*:*</include>
<include>com.google.*:*</include>
<include>commons-*:*</include>
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
<include>com.fasterxml.jackson.core:jackson-core</include>
<include>com.fasterxml.jackson.dataformat</include>
<include>io.netty:*</include>
<include>io.netty.incubator:*</include>
<include>io.perfmark:*</include>
<include>org.eclipse.jetty:*</include>
<include>com.yahoo.datasketches:*</include>
Expand Down
12 changes: 12 additions & 0 deletions pulsar-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,18 @@
<artifactId>netty-tcnative-boringssl-static</artifactId>
</dependency>

<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>

<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<classifier>linux-aarch_64</classifier>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-haproxy</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,43 +35,61 @@
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.incubator.channel.uring.IOUring;
import io.netty.incubator.channel.uring.IOUringDatagramChannel;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
import io.netty.incubator.channel.uring.IOUringSocketChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
import org.apache.commons.lang3.StringUtils;

@SuppressWarnings("checkstyle:JavadocType")
@Slf4j
public class EventLoopUtil {

private static final String ENABLE_IO_URING = "enable.io_uring";

/**
* @return an EventLoopGroup suitable for the current platform
*/
public static EventLoopGroup newEventLoopGroup(int nThreads, boolean enableBusyWait, ThreadFactory threadFactory) {
if (Epoll.isAvailable()) {
if (!enableBusyWait) {
// Regular Epoll based event loop
return new EpollEventLoopGroup(nThreads, threadFactory);
}
String enableIoUring = System.getProperty(ENABLE_IO_URING);

// With low latency setting, put the Netty event loop on busy-wait loop to reduce cost of
// context switches
EpollEventLoopGroup eventLoopGroup = new EpollEventLoopGroup(nThreads, threadFactory,
() -> (selectSupplier, hasTasks) -> SelectStrategy.BUSY_WAIT);
// By default, io_uring will not be enabled, even if available. The environment variable will be used:
// enable.io_uring=1
if (StringUtils.equalsAnyIgnoreCase(enableIoUring, "1", "true")) {
// Throw exception if IOUring cannot be used
IOUring.ensureAvailability();
return new IOUringEventLoopGroup(nThreads, threadFactory);
} else {
if (!enableBusyWait) {
// Regular Epoll based event loop
return new EpollEventLoopGroup(nThreads, threadFactory);
}

// Enable CPU affinity on IO threads
for (int i = 0; i < nThreads; i++) {
eventLoopGroup.next().submit(() -> {
try {
CpuAffinity.acquireCore();
} catch (Throwable t) {
log.warn("Failed to acquire CPU core for thread {} {}", Thread.currentThread().getName(),
t.getMessage(), t);
}
});
}
// With low latency setting, put the Netty event loop on busy-wait loop to reduce cost of
// context switches
EpollEventLoopGroup eventLoopGroup = new EpollEventLoopGroup(nThreads, threadFactory,
() -> (selectSupplier, hasTasks) -> SelectStrategy.BUSY_WAIT);

return eventLoopGroup;
// Enable CPU affinity on IO threads
for (int i = 0; i < nThreads; i++) {
eventLoopGroup.next().submit(() -> {
try {
CpuAffinity.acquireCore();
} catch (Throwable t) {
log.warn("Failed to acquire CPU core for thread {} {}", Thread.currentThread().getName(),
t.getMessage(), t);
}
});
}

return eventLoopGroup;
}
} else {
// Fallback to NIO
return new NioEventLoopGroup(nThreads, threadFactory);
Expand All @@ -85,23 +103,29 @@ public static EventLoopGroup newEventLoopGroup(int nThreads, boolean enableBusyW
* @return
*/
public static Class<? extends SocketChannel> getClientSocketChannelClass(EventLoopGroup eventLoopGroup) {
if (eventLoopGroup instanceof EpollEventLoopGroup) {
if (eventLoopGroup instanceof IOUringEventLoopGroup) {
return IOUringSocketChannel.class;
} else if (eventLoopGroup instanceof EpollEventLoopGroup) {
return EpollSocketChannel.class;
} else {
return NioSocketChannel.class;
}
}

public static Class<? extends ServerSocketChannel> getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
if (eventLoopGroup instanceof EpollEventLoopGroup) {
if (eventLoopGroup instanceof IOUringEventLoopGroup) {
return IOUringServerSocketChannel.class;
} else if (eventLoopGroup instanceof EpollEventLoopGroup) {
return EpollServerSocketChannel.class;
} else {
return NioServerSocketChannel.class;
}
}

public static Class<? extends DatagramChannel> getDatagramChannelClass(EventLoopGroup eventLoopGroup) {
if (eventLoopGroup instanceof EpollEventLoopGroup) {
if (eventLoopGroup instanceof IOUringEventLoopGroup) {
return IOUringDatagramChannel.class;
} else if (eventLoopGroup instanceof EpollEventLoopGroup) {
return EpollDatagramChannel.class;
} else {
return NioDatagramChannel.class;
Expand Down
3 changes: 3 additions & 0 deletions pulsar-sql/presto-distribution/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ The Apache Software License, Version 2.0
- netty-transport-native-unix-common-4.1.77.Final.jar
- netty-transport-native-unix-common-4.1.77.Final-linux-x86_64.jar
- netty-codec-http2-4.1.77.Final.jar
- netty-incubator-transport-classes-io_uring-0.0.15.Final.jar
- netty-incubator-transport-native-io_uring-0.0.15.Final-linux-x86_64.jar
- netty-incubator-transport-native-io_uring-0.0.15.Final-linux-aarch_64.jar
* GRPC
- grpc-api-1.45.1.jar
- grpc-context-1.45.1.jar
Expand Down

0 comments on commit dd07241

Please sign in to comment.