Skip to content

Commit

Permalink
Merge pull request #1913 from softwaremill/sync-ws-api
Browse files Browse the repository at this point in the history
Introduce a synchronous web socket interface
  • Loading branch information
adamw authored Aug 18, 2023
2 parents 2b3c81e + b5e99eb commit 6f00ad2
Show file tree
Hide file tree
Showing 23 changed files with 333 additions and 109 deletions.
68 changes: 0 additions & 68 deletions core/src/main/scala/sttp/client4/SttpApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -255,72 +255,4 @@ trait SttpApi extends SttpExtensions with UriInterpolator {
StreamBody(s)(b),
contentType = Some(MediaType.ApplicationOctetStream)
)

// websocket response specifications

def asWebSocket[F[_], T](f: WebSocket[F] => F[T]): WebSocketResponseAs[F, Either[String, T]] =
asWebSocketEither(asStringAlways, asWebSocketAlways(f))

def asWebSocketWithMetadata[F[_], T](
f: (WebSocket[F], ResponseMetadata) => F[T]
): WebSocketResponseAs[F, Either[String, T]] =
asWebSocketEither(asStringAlways, asWebSocketAlwaysWithMetadata(f))

def asWebSocketAlways[F[_], T](f: WebSocket[F] => F[T]): WebSocketResponseAs[F, T] =
asWebSocketAlwaysWithMetadata((w, _) => f(w))

def asWebSocketAlwaysWithMetadata[F[_], T](f: (WebSocket[F], ResponseMetadata) => F[T]): WebSocketResponseAs[F, T] =
WebSocketResponseAs(ResponseAsWebSocket(f))

def asWebSocketUnsafe[F[_]]: WebSocketResponseAs[F, Either[String, WebSocket[F]]] =
asWebSocketEither(asStringAlways, asWebSocketAlwaysUnsafe)

def asWebSocketAlwaysUnsafe[F[_]]: WebSocketResponseAs[F, WebSocket[F]] =
WebSocketResponseAs(ResponseAsWebSocketUnsafe())

def fromMetadata[F[_], T](
default: ResponseAs[T],
conditions: ConditionalResponseAs[WebSocketResponseAs[F, T]]*
): WebSocketResponseAs[F, T] =
WebSocketResponseAs(ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate))

/** Uses the `onSuccess` response specification for 101 responses (switching protocols) on JVM/Native, 200 responses
* on JS. Otherwise, use the `onError` specification.
*/
def asWebSocketEither[F[_], A, B](
onError: ResponseAs[A],
onSuccess: WebSocketResponseAs[F, B]
): WebSocketResponseAs[F, Either[A, B]] =
SttpExtensions.asWebSocketEitherPlatform(onError, onSuccess)

// websocket stream response specifications

def asWebSocketStream[S](
s: Streams[S]
)(p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame]): WebSocketStreamResponseAs[Either[String, Unit], S] =
asWebSocketEither(asStringAlways, asWebSocketStreamAlways(s)(p))

def asWebSocketStreamAlways[S](s: Streams[S])(
p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame]
): WebSocketStreamResponseAs[Unit, S] = WebSocketStreamResponseAs[Unit, S](ResponseAsWebSocketStream(s, p))

def fromMetadata[T, S](
default: ResponseAs[T],
conditions: ConditionalResponseAs[WebSocketStreamResponseAs[T, S]]*
): WebSocketStreamResponseAs[T, S] =
WebSocketStreamResponseAs[T, S](
ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate)
)

/** Uses the `onSuccess` response specification for 101 responses (switching protocols), and the `onError`
* specification otherwise.
*/
def asWebSocketEither[A, B, S](
onError: ResponseAs[A],
onSuccess: WebSocketStreamResponseAs[B, S]
): WebSocketStreamResponseAs[Either[A, B], S] =
fromMetadata(
onError.map(Left(_)),
ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_)))
).showAs(s"either(${onError.show}, ${onSuccess.show})")
}
41 changes: 41 additions & 0 deletions core/src/main/scala/sttp/client4/SttpWebSocketAsyncApi.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package sttp.client4

import sttp.model.ResponseMetadata
import sttp.ws.WebSocket

trait SttpWebSocketAsyncApi {
def asWebSocket[F[_], T](f: WebSocket[F] => F[T]): WebSocketResponseAs[F, Either[String, T]] =
asWebSocketEither(asStringAlways, asWebSocketAlways(f))

def asWebSocketWithMetadata[F[_], T](
f: (WebSocket[F], ResponseMetadata) => F[T]
): WebSocketResponseAs[F, Either[String, T]] =
asWebSocketEither(asStringAlways, asWebSocketAlwaysWithMetadata(f))

def asWebSocketAlways[F[_], T](f: WebSocket[F] => F[T]): WebSocketResponseAs[F, T] =
asWebSocketAlwaysWithMetadata((w, _) => f(w))

def asWebSocketAlwaysWithMetadata[F[_], T](f: (WebSocket[F], ResponseMetadata) => F[T]): WebSocketResponseAs[F, T] =
WebSocketResponseAs(ResponseAsWebSocket(f))

def asWebSocketUnsafe[F[_]]: WebSocketResponseAs[F, Either[String, WebSocket[F]]] =
asWebSocketEither(asStringAlways, asWebSocketAlwaysUnsafe)

def asWebSocketAlwaysUnsafe[F[_]]: WebSocketResponseAs[F, WebSocket[F]] =
WebSocketResponseAs(ResponseAsWebSocketUnsafe())

def fromMetadata[F[_], T](
default: ResponseAs[T],
conditions: ConditionalResponseAs[WebSocketResponseAs[F, T]]*
): WebSocketResponseAs[F, T] =
WebSocketResponseAs(ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate))

/** Uses the `onSuccess` response specification for 101 responses (switching protocols) on JVM/Native, 200 responses
* on JS. Otherwise, use the `onError` specification.
*/
def asWebSocketEither[F[_], A, B](
onError: ResponseAs[A],
onSuccess: WebSocketResponseAs[F, B]
): WebSocketResponseAs[F, Either[A, B]] =
SttpExtensions.asWebSocketEitherPlatform(onError, onSuccess)
}
37 changes: 37 additions & 0 deletions core/src/main/scala/sttp/client4/SttpWebSocketStreamApi.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package sttp.client4

import sttp.capabilities.Streams
import sttp.model.StatusCode
import sttp.ws.WebSocketFrame

trait SttpWebSocketStreamApi {
def asWebSocketStream[S](
s: Streams[S]
)(p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame]): WebSocketStreamResponseAs[Either[String, Unit], S] =
asWebSocketEither(asStringAlways, asWebSocketStreamAlways(s)(p))

def asWebSocketStreamAlways[S](s: Streams[S])(
p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame]
): WebSocketStreamResponseAs[Unit, S] = WebSocketStreamResponseAs[Unit, S](ResponseAsWebSocketStream(s, p))

def fromMetadata[T, S](
default: ResponseAs[T],
conditions: ConditionalResponseAs[WebSocketStreamResponseAs[T, S]]*
): WebSocketStreamResponseAs[T, S] =
WebSocketStreamResponseAs[T, S](
ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate)
)

/** Uses the `onSuccess` response specification for 101 responses (switching protocols), and the `onError`
* specification otherwise.
*/
def asWebSocketEither[A, B, S](
onError: ResponseAs[A],
onSuccess: WebSocketStreamResponseAs[B, S]
): WebSocketStreamResponseAs[Either[A, B], S] =
fromMetadata(
onError.map(Left(_)),
ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_)))
).showAs(s"either(${onError.show}, ${onSuccess.show})")

}
44 changes: 44 additions & 0 deletions core/src/main/scala/sttp/client4/SttpWebSocketSyncApi.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package sttp.client4

import sttp.client4.ws.SyncWebSocket
import sttp.model.ResponseMetadata
import sttp.ws.WebSocket

trait SttpWebSocketSyncApi {
def asWebSocket[T](f: SyncWebSocket => T): WebSocketResponseAs[Identity, Either[String, T]] =
asWebSocketEither(asStringAlways, asWebSocketAlways(f))

def asWebSocketWithMetadata[T](
f: (SyncWebSocket, ResponseMetadata) => T
): WebSocketResponseAs[Identity, Either[String, T]] =
asWebSocketEither(asStringAlways, asWebSocketAlwaysWithMetadata(f))

def asWebSocketAlways[T](f: SyncWebSocket => T): WebSocketResponseAs[Identity, T] =
asWebSocketAlwaysWithMetadata((w, _) => f(w))

def asWebSocketAlwaysWithMetadata[T](
f: (SyncWebSocket, ResponseMetadata) => T
): WebSocketResponseAs[Identity, T] =
WebSocketResponseAs[Identity, T](ResponseAsWebSocket[Identity, T]((ws, m) => f(new SyncWebSocket(ws), m)))

def asWebSocketUnsafe: WebSocketResponseAs[Identity, Either[String, SyncWebSocket]] =
asWebSocketEither(asStringAlways, asWebSocketAlwaysUnsafe)

def asWebSocketAlwaysUnsafe: WebSocketResponseAs[Identity, SyncWebSocket] =
WebSocketResponseAs[Identity, WebSocket[Identity]](ResponseAsWebSocketUnsafe()).map(new SyncWebSocket(_))

def fromMetadata[T](
default: ResponseAs[T],
conditions: ConditionalResponseAs[WebSocketResponseAs[Identity, T]]*
): WebSocketResponseAs[Identity, T] =
WebSocketResponseAs(ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate))

/** Uses the `onSuccess` response specification for 101 responses (switching protocols) on JVM/Native, 200 responses
* on JS. Otherwise, use the `onError` specification.
*/
def asWebSocketEither[A, B](
onError: ResponseAs[A],
onSuccess: WebSocketResponseAs[Identity, B]
): WebSocketResponseAs[Identity, Either[A, B]] =
SttpExtensions.asWebSocketEitherPlatform(onError, onSuccess)
}
140 changes: 140 additions & 0 deletions core/src/main/scala/sttp/client4/ws/SyncWebSocket.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package sttp.client4.ws

import sttp.client4.Identity
import sttp.model.Headers
import sttp.ws.{WebSocket, WebSocketClosed, WebSocketFrame}

/** Allows interacting with a web socket. Interactions can happen:
*
* - on the frame level, by sending and receiving raw [[WebSocketFrame]] s
* - using the provided `receive*` methods to obtain concatenated data frames, or string/byte payloads, and the
* `send*` method to send string/binary frames.
*
* The `send*` and `receive*` methods may result in a failed effect, with either one of [[sttp.ws.WebSocketException]]
* exceptions, or a backend-specific exception. Specifically, they will fail with [[WebSocketClosed]] if the web socket
* is closed.
*
* See the `either` and `eitherClose` method to lift web socket closed events to the value level.
*/
class SyncWebSocket(val delegate: WebSocket[Identity]) {

/** Receive the next frame from the web socket. This can be a data frame, or a control frame including
* [[WebSocketFrame.Close]]. After receiving a close frame, no further interactions with the web socket should
* happen.
*
* However, not all implementations expose the close frame, and web sockets might also get closed without the proper
* close frame exchange. In such cases, as well as when invoking `receive`/`send` after receiving a close frame, a
* [[WebSocketClosed]] exception will be thrown.
*
* *Should be only called sequentially!* (from a single thread/fiber). Because web socket frames might be fragmented,
* calling this method concurrently might result in threads/fibers receiving fragments of the same frame.
*/
def receive(): WebSocketFrame = delegate.receive()

/** Sends a web socket frame. Can be safely called from multiple threads.
*
* May result in an exception, in case of a network error, or if the socket is closed.
*/
def send(f: WebSocketFrame, isContinuation: Boolean = false): Unit = delegate.send(f, isContinuation)

def isOpen(): Boolean = delegate.isOpen()

/** Receive a single data frame, ignoring others. The frame might be a fragment. Will throw [[WebSocketClosed]] if the
* web socket is closed, or if a close frame is received.
*
* *Should be only called sequentially!* (from a single thread/fiber).
*
* @param pongOnPing
* Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
*/
def receiveDataFrame(pongOnPing: Boolean = true): WebSocketFrame.Data[_] = delegate.receiveDataFrame(pongOnPing)

/** Receive a single text data frame, ignoring others. The frame might be a fragment. To receive whole messages, use
* [[receiveText]]. Will throw [[WebSocketClosed]] if the web socket is closed, or if a close frame is received.
*
* *Should be only called sequentially!* (from a single thread/fiber).
*
* @param pongOnPing
* Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
*/
def receiveTextFrame(pongOnPing: Boolean = true): WebSocketFrame.Text = delegate.receiveTextFrame(pongOnPing)

/** Receive a single binary data frame, ignoring others. The frame might be a fragment. To receive whole messages, use
* [[receiveBinary]]. Will throw [[WebSocketClosed]] if the web socket is closed, or if a close frame is received.
*
* *Should be only called sequentially!* (from a single thread/fiber).
*
* @param pongOnPing
* Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
*/
def receiveBinaryFrame(pongOnPing: Boolean = true): WebSocketFrame.Binary = delegate.receiveBinaryFrame(pongOnPing)

/** Receive a single text message (which might come from multiple, fragmented frames). Ignores non-text frames and
* returns combined results. Will throw [[WebSocketClosed]] if the web socket is closed, or if a close frame is
* received.
*
* *Should be only called sequentially!* (from a single thread/fiber).
*
* @param pongOnPing
* Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
*/
def receiveText(pongOnPing: Boolean = true): String = delegate.receiveText(pongOnPing)

/** Receive a single binary message (which might come from multiple, fragmented frames). Ignores non-binary frames and
* returns combined results. Will throw [[WebSocketClosed]] if the web socket is closed, or if a close frame is
* received.
*
* *Should be only called sequentially!* (from a single thread/fiber).
*
* @param pongOnPing
* Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
*/
def receiveBinary(pongOnPing: Boolean): Array[Byte] = delegate.receiveBinary(pongOnPing)

/** Extracts the received close frame (if available) as the left side of an either, or returns the original result on
* the right.
*
* Will throw [[WebSocketClosed]] if the web socket is closed, but no close frame is available.
*
* @param f
* The effect describing web socket interactions.
*/
def eitherClose[T](f: => T): Either[WebSocketFrame.Close, T] =
try Right(f)
catch {
case WebSocketClosed(Some(close)) => Left(close)
}

/** Returns an effect computing a:
*
* - `Left` if the web socket is closed - optionally with the received close frame (if available).
* - `Right` with the original result otherwise.
*
* Will never throw a [[WebSocketClosed]].
*
* @param f
* The effect describing web socket interactions.
*/
def either[T](f: => T): Either[Option[WebSocketFrame.Close], T] =
try Right(f)
catch {
case WebSocketClosed(close) => Left(close)
}

/** Sends a web socket frame with the given payload. Can be safely called from multiple threads.
*
* May result in an exception, in case of a network error, or if the socket is closed.
*/
def sendText(payload: String): Unit = delegate.sendText(payload)

/** Sends a web socket frame with the given payload. Can be safely called from multiple threads.
*
* May result in an exception, in case of a network error, or if the socket is closed.
*/
def sendBinary(payload: Array[Byte]): Unit = delegate.sendBinary(payload)

/** Idempotent when used sequentially. */
def close(): Unit = delegate.close()

def upgradeHeaders: Headers = delegate.upgradeHeaders
}
7 changes: 7 additions & 0 deletions core/src/main/scala/sttp/client4/ws/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package sttp.client4

package object ws {
object async extends SttpWebSocketAsyncApi
object sync extends SttpWebSocketSyncApi
object stream extends SttpWebSocketStreamApi
}
2 changes: 1 addition & 1 deletion core/src/main/scalajs/sttp/client4/SttpExtensions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object SttpExtensions {
onError: ResponseAs[A],
onSuccess: WebSocketResponseAs[F, B]
): WebSocketResponseAs[F, Either[A, B]] =
fromMetadata(
ws.async.fromMetadata(
onError.map(Left(_)),
ConditionalResponseAs(_.code == StatusCode.Ok, onSuccess.map(Right(_)))
).showAs(s"either(${onError.show}, ${onSuccess.show})")
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scalajvm/sttp/client4/SttpExtensions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ object SttpExtensions {
onError: ResponseAs[A],
onSuccess: WebSocketResponseAs[F, B]
): WebSocketResponseAs[F, Either[A, B]] =
fromMetadata(
onError.map(Left(_)),
ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_)))
).showAs(s"either(${onError.show}, ${onSuccess.show})")
ws.async
.fromMetadata(
onError.map(Left(_)),
ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_)))
)
.showAs(s"either(${onError.show}, ${onSuccess.show})")
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object SttpExtensions {
onError: ResponseAs[A],
onSuccess: WebSocketResponseAs[F, B]
): WebSocketResponseAs[F, Either[A, B]] =
fromMetadata(
sttp.client4.ws.async.fromMetadata(
onError.map(Left(_)),
ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_)))
).showAs(s"either(${onError.show}, ${onSuccess.show})")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import sttp.client4.SttpClientException.ReadException
import sttp.client4._
import sttp.client4.internal._
import sttp.client4.monad.IdMonad
import sttp.client4.ws.async._
import sttp.model._
import sttp.monad.{FutureMonad, TryMonad}
import sttp.ws.WebSocketFrame
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import org.scalatest.Suite
import org.scalatest.flatspec.AsyncFlatSpecLike
import sttp.client4.testing.HttpTest.wsEndpoint
import sttp.client4._
import sttp.client4.ws.async._
import sttp.monad.MonadError
import sttp.client4.testing.ConvertToFuture

Expand Down
Loading

0 comments on commit 6f00ad2

Please sign in to comment.