Skip to content

Commit

Permalink
Migrate to kotlinx-io and ktor 3.0 (#280)
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg authored Nov 30, 2024
1 parent dc76dc4 commit aa46c25
Show file tree
Hide file tree
Showing 124 changed files with 962 additions and 1,195 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ val stream: Flow<Payload> = rSocket.requestStream(

//take 5 values and print response
stream.take(5).collect { payload: Payload ->
println(payload.data.readText())
println(payload.data.readString())
}
```

Expand All @@ -143,20 +143,20 @@ embeddedServer(CIO) {
routing {
//configure route `/rsocket`
rSocket("rsocket") {
println(config.setupPayload.data.readText()) //print setup payload data
println(config.setupPayload.data.readString()) //print setup payload data

RSocketRequestHandler {
//handler for request/response
requestResponse { request: Payload ->
println(request.data.readText()) //print request payload data
println(request.data.readString()) //print request payload data
delay(500) // work emulation
buildPayload {
data("""{ "data": "Server response" }""")
}
}
//handler for request/stream
requestStream { request: Payload ->
println(request.data.readText()) //print request payload data
println(request.data.readString()) //print request payload data
flow {
repeat(10) { i ->
emit(
Expand Down Expand Up @@ -208,7 +208,7 @@ val connector = RSocketConnector {
val rsocket: RSocket = connector.connect(transport)
//use rsocket to do request
val response = rsocket.requestResponse(buildPayload { data("""{ "data": "hello world" }""") })
println(response.data.readText())
println(response.data.readString())
```

Example of usage standalone server transport:
Expand All @@ -223,7 +223,7 @@ val server: TcpServer = server.bind(transport) {
RSocketRequestHandler {
//handler for request/response
requestResponse { request: Payload ->
println(request.data.readText()) //print request payload data
println(request.data.readString()) //print request payload data
delay(500) // work emulation
buildPayload {
data("""{ "data": "Server response" }""")
Expand Down Expand Up @@ -267,7 +267,7 @@ val stream: Flow<Payload> = client.requestStream(Payload("data"))
//so on call `collect`, requestStream frame with requestN will be sent, and then, after 5 elements will be collected
//new requestN with 5 will be sent, so collect will be smooth
stream.flowOn(PrefetchStrategy(requestSize = 10, requestOn = 5)).collect { payload: Payload ->
println(payload.data.readText())
println(payload.data.readString())
}
```

Expand Down
4 changes: 3 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
[versions]
kotlin = "2.1.0"

kotlinx-io = "0.6.0"
kotlinx-atomicfu = "0.26.0"
kotlinx-coroutines = "1.9.0"
kotlinx-benchmark = "0.4.8"
kotlinx-bcv = "0.16.3"

ktor = "2.3.12"
ktor = "3.0.1"

netty = "4.1.115.Final"
netty-quic = "0.0.69.Final"
Expand All @@ -23,6 +24,7 @@ jmh = "1.36"
maven-publish = "0.30.0"

[libraries]
kotlinx-io-core = { module = "org.jetbrains.kotlinx:kotlinx-io-core", version.ref = "kotlinx-io" }
kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "kotlinx-coroutines" }
kotlinx-coroutines-test = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-test", version.ref = "kotlinx-coroutines" }
kotlinx-coroutines-reactor = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-reactor", version.ref = "kotlinx-coroutines" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import io.ktor.server.websocket.WebSockets as ServerWebSockets
import io.rsocket.kotlin.ktor.client.RSocketSupport as ClientRSocketSupport
import io.rsocket.kotlin.ktor.server.RSocketSupport as ServerRSocketSupport

class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck {
class WebSocketConnectionTest : SuspendTest {
private val client = HttpClient(ClientCIO) {
install(ClientWebSockets)
install(ClientRSocketSupport) {
Expand All @@ -57,7 +57,7 @@ class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck {
install(ServerRSocketSupport) {
server(TestServer())
}
install(Routing) {
routing {
rSocket {
RSocketRequestHandler {
requestStream {
Expand Down Expand Up @@ -87,7 +87,7 @@ class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck {

@Test
fun testWorks() = test {
val port = server.resolvedConnectors().single().port
val port = server.engine.resolvedConnectors().single().port
val rSocket = client.rSocket(port = port)
val requesterJob = rSocket.coroutineContext.job

Expand Down
Loading

0 comments on commit aa46c25

Please sign in to comment.