Skip to content

Commit

Permalink
Merge pull request #44556 from cescoffier/websocket-next-subscription
Browse files Browse the repository at this point in the history
Extend websocket next documentation to explain when and when not to subscribe to a Uni or Multi
  • Loading branch information
mkouba authored Nov 20, 2024
2 parents 021a05e + 03a5ea7 commit 26db59d
Showing 1 changed file with 90 additions and 6 deletions.
96 changes: 90 additions & 6 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,29 @@ However, it may also accept the following parameters:
The message object represents the data sent and can be accessed as either raw content (`String`, `JsonObject`, `JsonArray`, `Buffer` or `byte[]`) or deserialized high-level objects, which is the recommended approach.

When receiving a `Multi`, the method is invoked once per connection, and the provided `Multi` receives the items transmitted by this connection.
The method must subscribe to the `Multi` to receive these items (or return a Multi).
If the method returns a `Multi` (constructed from the received one), Quarkus will automatically subscribe to it and write the emitted items until completion, failure, or cancellation.
However, if your method does not return a `Multi`, you must subscribe to the incoming `Multi` to consume the data.

Here are two examples:

[source,java]
----
// No need to subscribe to the incoming Multi as the method returns a Multi derived from the incoming one
@OnTextMessage
public Multi<ChatMessage> stream(Multi<ChatMessage> incoming) {
return incoming.log();
}
// ...
// Must subscribe to the incoming Multi as the method does not return a Multi, otherwise no data will be consumed
@OnTextMessage
public void stream(Multi<ChatMessage> incoming) {
incoming.subscribe().with(item -> log(item));
}
----

See <<subscribe-or-not-subscribe>> to learn more about subscribing to the incoming `Multi`.

==== Supported return types

Expand Down Expand Up @@ -319,7 +341,9 @@ Multi<ResponseMessage> stream(Message m) {
}
----

When returning a Multi, Quarkus subscribes to the returned Multi automatically and writes the emitted items until completion, failure, or cancellation. Failure or cancellation terminates the connection.
Methods returning `Uni` and `Multi` are considered non-blocking.
In addition, Quarkus automatically subscribes to the returned `Multi` / `Uni` and writes the emitted items until completion, failure, or cancellation.
Failure or cancellation terminates the connection.

==== Streams

Expand All @@ -340,7 +364,8 @@ public Multi<ChatMessage> stream(Multi<ChatMessage> incoming) {

This approach allows bi-directional streaming.

When the method returns `void`, it must subscribe to the incoming `Multi`:
When the method returns `void`, and so does not return a `Multi`, the code must subscribe to the incoming `Multi`.
Otherwise, no data will be consumed, and the connection will not be closed:

[source, java]
----
Expand All @@ -350,6 +375,10 @@ public void stream(Multi<ChatMessage> incoming) {
}
----

Also note that the `stream` method will complete before the `Multi` completes.

See <<subscribe-or-not-subscribe>> to learn more about subscribing to the incoming `Multi`.

==== Skipping reply

When a method is intended to produce a message written to the client, it can emit `null`.
Expand Down Expand Up @@ -636,7 +665,19 @@ String param = connection.pathParam("foo");
The `WebSocketConnection` provides both a blocking and a non-blocking method variants to send messages:

- `sendTextAndAwait(String message)`: Sends a text message to the client and waits for the message to be sent. It's blocking and should only be called from an executor thread.
- `sendText(String message)`: Sends a text message to the client. It returns a `Uni`. It's non-blocking, but you must subscribe to it.
- `sendText(String message)`: Sends a text message to the client. It returns a `Uni`. It's non-blocking. Make sure you or Quarkus subscribes to the returned `Uni` to send the message.
If you return the `Uni` from a method invoked by Quarkus (like with Quarkus REST, Quarkus WebSocket Next or Quarkus Messaging), it will subscribe to it and send the message.
For example:

[source,java]
----
@POST
public Uni<Void> send() {
return connection.sendText("Hello!"); // Quarkus automatically subscribes to the returned Uni and sends the message.
}
----

See <<subscribe-or-not-subscribe>> to learn more about subscribing to the `Uni`.

[[list-open-connections]]
==== List open connections
Expand All @@ -655,7 +696,7 @@ class MyBean {
OpenConnections connections;
void logAllOpenConnections() {
Log.infof("Open connections: %s", connections.listAll()); <1>
Log.infof("Open connections: %s", connections.listAll()); // <1>
}
}
----
Expand Down Expand Up @@ -1078,7 +1119,17 @@ String param = connection.pathParam("foo");
The `WebSocketClientConnection` provides both a blocking and a non-blocking method variants to send messages:

- `sendTextAndAwait(String message)`: Sends a text message to the client and waits for the message to be sent. It's blocking and should only be called from an executor thread.
- `sendText(String message)`: Sends a text message to the client. It returns a `Uni`. It's non-blocking, but you must subscribe to it.
- `sendText(String message)`: Sends a text message to the client. It returns a `Uni`. It's non-blocking. Make sure you or Quarkus subscribes to the returned `Uni` to send the message.
If you return the `Uni` from a method invoked by Quarkus (like with Quarkus REST, Quarkus WebSocket Next or Quarkus Messaging), it will subscribe to it and send the message.
For example:

[source,java]
----
@POST
public Uni<Void> send() {
return connection.sendText("Hello!"); // Quarkus automatically subscribes to the returned Uni and sends the message.
}
----

[[list-open-client-connections]]
==== List open client connections
Expand Down Expand Up @@ -1203,6 +1254,39 @@ quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG <3>
<2> Set the number of characters of a text message payload which will be logged.
<3> Enable `DEBUG` level is for the logger `io.quarkus.websockets.next.traffic`.

[[subscribe-or-not-subscribe]]
== When to subscribe to a `Uni` or `Multi`

`Uni` and `Multi` are lazy types, which means that they do not start processing until they are subscribed to.

When you get (from a parameter or from a method you called) a `Uni` or a `Multi`, whether you should subscribe to it depends on the context:

- if you return the `Uni` or `Multi` in a method invoked by Quarkus (like with Quarkus REST, Quarkus WebSocket Next or Quarkus Messaging), Quarkus subscribes to it and processes the items emitted by the `Multi` or the item emitted by the `Uni`:

[source, java]
----
@Incoming("...")
@Outgoing("...")
public Multi<String> process(Multi<String> input) {
// No need to subscribe to the input Multi, the `process` method is called by Quarkus (Messaging).
return input.map(String::toUpperCase);
}
----

When a `Uni` or `Multi` is returned from a method annotated with `@OnOpen`, `@OnTextMessage`, `@OnBinaryMessage`, or `@OnClose`, Quarkus subscribes to it automatically.

- if you do not return the `Uni` or `Multi` in a method invoked by Quarkus, you should subscribe to it:

[source, java]
----
@Incoming("...")
@Outgoing("...")
public void process(Multi<String> input) {
input.map(String::toUpperCase)
.subscribe().with(s -> log(s));
}
----

[[telemetry]]
== Telemetry

Expand Down

0 comments on commit 26db59d

Please sign in to comment.