-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement UringSystem and FS2 Sockets using netty io_uring API #78
base: feature/jvm
Are you sure you want to change the base?
Changes from 185 commits
786abd1
16e6305
46f3dc8
fd2c459
be118ef
6e9ea0a
88f02e7
7be84f6
08eb506
486a164
09b79cb
d767b7b
408252c
3cb48c7
d6e7d76
2b0f5aa
0d1ae85
18dd9f3
3fdda21
f294e33
1bc768e
57b6ff3
6af5dbc
ca5a434
33eeebb
85c8699
8445b01
21ca1e7
4d432c4
50514ee
118ad86
5f74452
54dde73
42fe04b
18e2466
6a243be
2f37b74
79e9f03
19249e4
285782f
07629a9
27cb02b
4d5e8dc
51cfed9
a842e88
348f684
dfbc637
9c23c2b
070655b
6c9f301
28ffbca
4667297
032a279
0e7365c
d42665d
d273a5e
c96fc7d
4cce052
c0281fa
be3bf0f
ecb3e58
d52b7e7
7fa8b05
37e713a
49ba2b1
2215a4d
a518d30
59fa37c
ca46d74
ff89a44
dd1f553
f8f331b
f19997f
1714e86
3abe875
7f739eb
6b4940a
c738459
1df8f92
c9d9fe4
fe438f7
f152bd3
9562db6
3b7b38c
059ff22
5443096
23205a8
4f522f0
393c98c
79ebcf7
a2e14ef
d0b673b
20b3ff2
a342b06
1b468d9
5ee436f
110958d
4422d87
8273561
7f2d0ba
c8fccc3
776f411
3561b51
a322a73
801cb40
cfd82f4
bdf2268
0659f5b
6659989
7e057db
053cb86
40bc403
30c1e5f
ce99d4d
89cac94
b3e83b2
95eacc8
80a39a9
b8433bb
1d9bfaf
6df58b5
0789124
730ad95
51367ce
1967536
1a32946
e91ab6f
5f6c476
190d060
e01b2a3
6ae4b33
9f81ecd
8b80653
be666a3
0556843
92ce059
ae0c8fa
5f40156
4186384
edbac06
a6dcf88
be86490
2430e9e
54852f8
121d030
f482e39
99ababa
7a25084
e6da2a7
8a20450
4f55885
048722d
ef64368
d43e280
1293817
9876348
e98acdd
3028ac0
664b428
56e8673
17166af
bb1b3db
94d6a23
26ea3d5
83c39f0
1dd1e52
e76b9b2
5e095ac
85cc34f
58f4783
f391a26
a59e5d0
892b94e
7ed808a
4506020
198a4a0
af9488d
4e332b9
b655bb4
dfbef6b
51d90f8
d9b481b
fba3875
cea2b12
c8678d7
5f9922c
7afdf02
b4cb557
53f32de
6342f80
3471231
a61f589
b8ee8d3
58e628f
fc7f1a4
21b2af7
b678106
eee6522
32a3282
b5cfd0b
6880b0c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Copyright 2022 Arman Bilge | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package fs2.io.uring | ||
|
||
import java.io.IOException | ||
import java.net.ConnectException | ||
import java.net.BindException | ||
import java.net.SocketException | ||
import java.net.SocketTimeoutException | ||
import java.net.NoRouteToHostException | ||
|
||
private[uring] object IOExceptionHelper { | ||
|
||
def apply(errno: Int): IOException = errno match { | ||
case 9 => // EBADF | ||
new IOException("Bad file descriptor") | ||
|
||
case 11 => // EAGAIN | ||
new IOException("Resource temporarily unavailable") | ||
|
||
case 13 => // EACCES | ||
new IOException("Permission denied") | ||
|
||
case 14 => // EFAULT | ||
new IOException("Bad address") | ||
|
||
case 22 => // EINVAL | ||
new IOException("Invalid argument") | ||
|
||
case 24 => // EMFILE | ||
new IOException("Too many open files") | ||
|
||
case 28 => // ENOSPC | ||
new IOException("No space left on device") | ||
|
||
case 32 => // EPIPE | ||
new IOException("Broken pipe") | ||
|
||
case 98 => // EADDRINUSE | ||
new BindException("Address already in use") | ||
|
||
case 99 => // EADDRNOTAVAIL | ||
new BindException("Cannot assign requested address") | ||
|
||
case 107 => // ECONNABORTED | ||
new SocketException("Connection aborted") | ||
|
||
case 110 => // ETIMEDOUT | ||
new SocketTimeoutException("Connection timed out") | ||
|
||
case 111 => // ECONNREFUSED | ||
new ConnectException("Connection refused") | ||
|
||
case 113 => // EHOSTUNREACH | ||
new NoRouteToHostException("No route to host") | ||
|
||
case 104 => // ECONNRESET | ||
new SocketException("Connection reset by peer") | ||
|
||
case _ => new IOException(errno.toString) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
* Copyright 2022 Arman Bilge | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package fs2.io.uring | ||
|
||
import cats.effect.IO | ||
import cats.effect.LiftIO | ||
import cats.effect.kernel.Resource | ||
import cats.syntax.all._ | ||
|
||
abstract class Uring private[uring] { | ||
private[this] val noopMask: Int => Boolean = _ => false | ||
|
||
def call( | ||
op: Byte, | ||
flags: Int = 0, | ||
rwFlags: Int = 0, | ||
fd: Int = 0, | ||
bufferAddress: Long = 0, | ||
length: Int = 0, | ||
offset: Long = 0, | ||
mask: Int => Boolean = noopMask | ||
): IO[Int] | ||
|
||
def bracket( | ||
op: Byte, | ||
flags: Int = 0, | ||
rwFlags: Int = 0, | ||
fd: Int = 0, | ||
bufferAddress: Long = 0, | ||
length: Int = 0, | ||
offset: Long = 0, | ||
mask: Int => Boolean = noopMask | ||
)(release: Int => IO[Unit]): Resource[IO, Int] | ||
} | ||
|
||
object Uring { | ||
|
||
def get[F[_]: LiftIO]: F[Uring] = | ||
IO.pollers.flatMap { | ||
_.collectFirst { case ring: Uring => | ||
ring | ||
}.liftTo[IO](new RuntimeException("No UringSystem installed")) | ||
}.to | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* Copyright 2022 Arman Bilge | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package fs2.io.uring | ||
|
||
import cats.effect.IOApp | ||
import fs2.io.uring.unsafe.UringSystem | ||
|
||
trait UringApp extends IOApp { | ||
|
||
override protected final def pollingSystem = UringSystem | ||
|
||
} | ||
|
||
object UringApp { | ||
trait Simple extends IOApp.Simple with UringApp | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* Copyright 2022 Arman Bilge | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package fs2.io.uring | ||
|
||
import cats.effect.LiftIO | ||
import cats.effect.kernel.Async | ||
import com.comcast.ip4s.Dns | ||
import fs2.io.net.Network | ||
|
||
object implicits { | ||
|
||
@inline implicit def network[F[_]: Async: Dns: LiftIO]: Network[F] = ??? | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* Copyright 2022 Arman Bilge | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package fs2.io.uring.net | ||
|
||
import cats.effect.kernel.Resource | ||
|
||
import com.comcast.ip4s._ | ||
|
||
import fs2.io.net.DatagramSocketGroup | ||
import fs2.io.net.DatagramSocket | ||
import fs2.io.net._ | ||
|
||
import java.net.ProtocolFamily | ||
|
||
private final class UringDatagramSocketGroup[F[_]] extends DatagramSocketGroup[F] { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess we can work on this in a follow-up PR :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Great idea! I'll handle it :)
I think we aren't, so it should be easy to do. |
||
|
||
override def openDatagramSocket( | ||
address: Option[Host], | ||
port: Option[Port], | ||
options: List[DatagramSocketOption], | ||
protocolFamily: Option[ProtocolFamily] | ||
): Resource[F, DatagramSocket[F]] = ??? | ||
|
||
} | ||
|
||
object UringDatagramSocketGroup { | ||
def apply[F[_]]: DatagramSocketGroup[F] = new UringDatagramSocketGroup | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* Copyright 2022 Arman Bilge | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package fs2.io.uring.net | ||
|
||
import cats.effect.LiftIO | ||
import cats.effect.kernel.Async | ||
import cats.effect.kernel.Resource | ||
|
||
import com.comcast.ip4s.Dns | ||
import com.comcast.ip4s.Host | ||
import com.comcast.ip4s.Port | ||
import com.comcast.ip4s.SocketAddress | ||
import com.comcast.ip4s.IpAddress | ||
|
||
import fs2.Stream | ||
import fs2.io.net.Network | ||
import fs2.io.net.SocketOption | ||
import fs2.io.net.tls.TLSContext | ||
import fs2.io.net.SocketGroup | ||
import fs2.io.net.Socket | ||
import fs2.io.net.DatagramSocket | ||
import fs2.io.net.DatagramSocketGroup | ||
import fs2.io.net.DatagramSocketOption | ||
|
||
import java.net.ProtocolFamily | ||
|
||
import java.util.concurrent.ThreadFactory | ||
|
||
private[net] final class UringNetwork[F[_]]( | ||
sg: UringSocketGroup[F], | ||
dsg: UringDatagramSocketGroup[F], | ||
val tlsContext: TLSContext.Builder[F] | ||
) extends Network.UnsealedNetwork[F] { | ||
|
||
def socketGroup(threadCount: Int, threadFactory: ThreadFactory): Resource[F, SocketGroup[F]] = | ||
Resource.pure[F, SocketGroup[F]](sg) | ||
|
||
def datagramSocketGroup(threadFactory: ThreadFactory): Resource[F, DatagramSocketGroup[F]] = | ||
Resource.pure[F, DatagramSocketGroup[F]](dsg) | ||
|
||
def client(to: SocketAddress[Host], options: List[SocketOption]): Resource[F, Socket[F]] = | ||
sg.client(to, options) | ||
|
||
def server( | ||
address: Option[Host], | ||
port: Option[Port], | ||
options: List[SocketOption] | ||
): Stream[F, Socket[F]] = sg.server(address, port, options) | ||
|
||
def serverResource( | ||
address: Option[Host], | ||
port: Option[Port], | ||
options: List[SocketOption] | ||
): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = | ||
sg.serverResource(address, port, options) | ||
|
||
def openDatagramSocket( | ||
address: Option[Host], | ||
port: Option[Port], | ||
options: List[DatagramSocketOption], | ||
protocolFamily: Option[ProtocolFamily] | ||
): Resource[F, DatagramSocket[F]] = | ||
dsg.openDatagramSocket(address, port, options, protocolFamily) | ||
} | ||
|
||
object UringNetwork { | ||
def apply[F[_]: Async: Dns: LiftIO]: Network[F] = | ||
new UringNetwork( | ||
new UringSocketGroup[F], | ||
new UringDatagramSocketGroup[F], | ||
TLSContext.Builder.forAsync[F] | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably update the copyright. :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its minez 😏
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😂