Skip to content

Commit

Permalink
ios: implement gRPC & add tests (#416)
Browse files Browse the repository at this point in the history
Implements the [gRPC protocol](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md) on top of the existing Envoy Mobile interfaces, allowing for easily sending gRPC requests over Envoy.

Note: The `GRPCResponseHandler` isn't kept in memory by the library (similarly to `ResponseHandler`) - only its closures are passed down to the core. Thus, no state is stored on the handler, and the necessary data is kept in memory by the closures capturing it.

Throughout the buffering function, we're currently copying some data by doing `append` calls. This may be optimized further in the future.

This PR also includes a set of tests for this new functionality.

Example of sending a gRPC request:

```swift
let handler = GRPCResponseHandler(queue: .main)
    .onHeaders { headers, grpcStatus, _ in
        print("gRPC status: \(grpcStatus), headers: \(headers)")
    }
    .onMessage { messageData, _ in
        print("Got message over gRPC: \(messageData)")
    }
    .onError { error in
        print("gRPC failed with error: \(error)")
    }

let requestBuilder = GRPCRequestBuilder(path: "/pb.api.v1.foo.Bar/Baz",
                                        authority: "api.foo.com")
let emitter = GRPCClient(httpClient: envoy).send(requestBuilder.build(), handler: handler)
emitter.sendMessage(someProtoMessage.serializedData())
...
```

Risk Level: Low (new feature)
Testing: Unit tests, tested locally

Signed-off-by: Michael Rebello <me@michaelrebello.com>
Signed-off-by: JP Simard <jp@jpsim.com>
  • Loading branch information
rebello95 authored and jpsim committed Nov 28, 2022
1 parent 87477fb commit efd9047
Show file tree
Hide file tree
Showing 10 changed files with 615 additions and 0 deletions.
5 changes: 5 additions & 0 deletions mobile/library/swift/src/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ load("//bazel:swift_static_framework.bzl", "swift_static_framework")
swift_static_framework(
name = "ios_framework",
srcs = [
"Data+Extension.swift",
"EnvoyClient.swift",
"EnvoyClientBuilder.swift",
"EnvoyError.swift",
"EnvoyStreamEmitter.swift",
"GRPCClient.swift",
"GRPCRequestBuilder.swift",
"GRPCResponseHandler.swift",
"GRPCStreamEmitter.swift",
"HTTPClient.swift",
"LogLevel.swift",
"Request.swift",
Expand Down
23 changes: 23 additions & 0 deletions mobile/library/swift/src/Data+Extension.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import Foundation

extension Data {
/// Gets the integer at the provided index using the size of `T`.
/// Returns nil if the data is too small.
///
/// - parameter index: The index at which to get the integer value.
///
/// - returns: The next integer in the data, or nil.
func integer<T: FixedWidthInteger>(atIndex index: Int) -> T? {
let size = MemoryLayout<T>.size
guard self.count >= index + size else {
return nil
}

var value: T = 0
_ = Swift.withUnsafeMutableBytes(of: &value) { valuePointer in
self.copyBytes(to: valuePointer, from: index ..< index + size)
}

return value
}
}
25 changes: 25 additions & 0 deletions mobile/library/swift/src/GRPCClient.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import Foundation

/// Client that supports sending and receiving gRPC traffic.
@objcMembers
public final class GRPCClient: NSObject {
private let httpClient: HTTPClient

/// Create a new gRPC client instance.
///
/// - parameter httpClient: The HTTP client to use for gRPC streams.
public init(httpClient: HTTPClient) {
self.httpClient = httpClient
}

/// Send a gRPC request with the provided handler.
///
/// - parameter request: The outbound gRPC request. See `GRPCRequestBuilder` for creation.
/// - parameter handler: Handler for receiving responses.
///
/// - returns: An emitter that can be used for sending more traffic over the stream.
public func send(_ request: Request, handler: GRPCResponseHandler) -> GRPCStreamEmitter {
let emitter = self.httpClient.send(request, handler: handler.underlyingHandler)
return GRPCStreamEmitter(emitter: emitter)
}
}
79 changes: 79 additions & 0 deletions mobile/library/swift/src/GRPCRequestBuilder.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import Foundation

/// Builder used for creating new gRPC `Request` instances.
@objcMembers
public final class GRPCRequestBuilder: NSObject {
private let underlyingBuilder: RequestBuilder

/// Initialize a new builder.
///
/// - parameter path: Path of the RPC (i.e., `/pb.api.v1.Foo/GetBar`).
/// - parameter authority: Authority to use for the RPC (i.e., `api.foo.com`).
/// - parameter useHTTPS: Whether to use HTTPS (or HTTP).
public init(path: String, authority: String, useHTTPS: Bool = true) {
self.underlyingBuilder = RequestBuilder(method: .post,
scheme: useHTTPS ? "https" : "http",
authority: authority,
path: path)
self.underlyingBuilder.addHeader(name: "content-type", value: "application/grpc")
}

/// Append a value to the header key.
///
/// - parameter name: The header key.
/// - parameter value: Value the value associated to the header key.
///
/// - returns: This builder.
@discardableResult
public func addHeader(name: String, value: String) -> GRPCRequestBuilder {
self.underlyingBuilder.addHeader(name: name, value: value)
return self
}

/// Remove all headers with this name.
///
/// - parameter name: The header key to remove.
///
/// - returns: This builder.
@discardableResult
public func removeHeaders(name: String) -> GRPCRequestBuilder {
self.underlyingBuilder.removeHeaders(name: name)
return self
}

/// Remove the value in the specified header.
///
/// - parameter name: The header key to remove.
/// - parameter value: The value to be removed.
///
/// - returns: This builder.
@discardableResult
public func removeHeader(name: String, value: String) -> GRPCRequestBuilder {
self.underlyingBuilder.removeHeader(name: name, value: value)
return self
}

/// Add a specific timeout for the gRPC request. This will be sent in the `grpc-timeout` header.
///
/// - parameter timeoutMS: Timeout, in milliseconds.
///
/// - returns: This builder.
@discardableResult
public func addTimeoutMS(_ timeoutMS: UInt?) -> GRPCRequestBuilder {
let headerName = "grpc-timeout"
if let timeoutMS = timeoutMS {
self.addHeader(name: headerName, value: "\(timeoutMS)m")
} else {
self.removeHeaders(name: headerName)
}

return self
}

/// Creates a request object from the builder.
///
/// - returns: The new request object.
public func build() -> Request {
return self.underlyingBuilder.build()
}
}
152 changes: 152 additions & 0 deletions mobile/library/swift/src/GRPCResponseHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import Foundation

/// Handler for responses sent over gRPC.
@objcMembers
public final class GRPCResponseHandler: NSObject {
/// Represents the state of a response stream's body data.
private enum State {
/// Awaiting a gRPC compression flag.
case expectingCompressionFlag
/// Awaiting the length specification of the next message.
case expectingMessageLength
/// Awaiting a message with the specified length.
case expectingMessage(messageLength: UInt32)
}

/// Underlying response handler which should be called with response data.
let underlyingHandler: ResponseHandler

/// Initialize a new instance of the handler.
///
/// - parameter queue: Dispatch queue upon which callbacks will be called.
public init(queue: DispatchQueue = .main) {
self.underlyingHandler = ResponseHandler(queue: queue)
}

/// Specify a callback for when response headers are received by the stream.
/// If `endStream` is `true`, the stream is complete.
///
/// - parameter closure: Closure which will receive the headers, gRPC status,
/// and flag indicating if the stream is headers-only.
@discardableResult
public func onHeaders(_ closure:
@escaping (_ headers: [String: [String]], _ grpcStatus: Int, _ endStream: Bool) -> Void)
-> GRPCResponseHandler
{
self.underlyingHandler.onHeaders { headers, _, endStream in
let grpcStatus = GRPCResponseHandler.grpcStatus(fromHeaders: headers)
closure(headers, grpcStatus, endStream)
}

return self
}

/// Specify a callback for when a new message has been received by the stream.
///
/// - parameter closure: Closure which will receive messages on the stream.
@discardableResult
public func onMessage(_ closure:
@escaping (_ message: Data) -> Void)
-> GRPCResponseHandler
{
var buffer = Data()
var state = State.expectingCompressionFlag
self.underlyingHandler.onData { chunk, _ in
// Appending might result in extra copying that can be optimized in the future.
buffer.append(chunk)
// gRPC always sends trailers, so the stream will not complete here.
GRPCResponseHandler.processBuffer(&buffer, state: &state, onMessage: closure)
}

return self
}

/// Specify a callback for when trailers are received by the stream.
/// If the closure is called, the stream is complete.
///
/// - parameter closure: Closure which will receive the trailers.
@discardableResult
public func onTrailers(_ closure:
@escaping (_ trailers: [String: [String]]) -> Void)
-> GRPCResponseHandler
{
self.underlyingHandler.onTrailers(closure)
return self
}

/// Specify a callback for when an internal Envoy exception occurs with the stream.
/// If the closure is called, the stream is complete.
///
/// - parameter closure: Closure which will be called when an error occurs.
@discardableResult
public func onError(_ closure:
@escaping (_ error: EnvoyError) -> Void)
-> GRPCResponseHandler
{
self.underlyingHandler.onError(closure)
return self
}

// MARK: - Helpers

/// Parses out the gRPC status from the provided HTTP headers.
///
/// - parameter headers: The headers from which to obtain the gRPC status.
///
/// - returns: The HTTP status code from the headers, or 0 if none is set.
static func grpcStatus(fromHeaders headers: [String: [String]]) -> Int {
return headers["grpc-status"]?
.compactMap(Int.init)
.first ?? 0
}

/// Recursively processes a buffer of data, buffering it into messages based on state.
/// When a message has been fully buffered, `onMessage` will be called with the message.
///
/// - parameter buffer: The buffer of data from which to determine state and messages.
/// - parameter state: The current state of the buffering.
/// - parameter onMessage: Closure to call when a new message is available.
private static func processBuffer(_ buffer: inout Data, state: inout State,
onMessage: (_ message: Data) -> Void)
{
switch state {
case .expectingCompressionFlag:
guard let compressionFlag: UInt8 = buffer.integer(atIndex: 0) else {
return
}

guard compressionFlag == 0 else {
assertionFailure("gRPC decompression is not supported")
buffer.removeAll()
state = .expectingCompressionFlag
return
}

state = .expectingMessageLength

case .expectingMessageLength:
guard let messageLength: UInt32 = buffer.integer(atIndex: 1) else {
return
}

state = .expectingMessage(messageLength: CFSwapInt32BigToHost(messageLength))

case .expectingMessage(let messageLength):
let prefixedLength = kGRPCPrefixLength + Int(messageLength)
if buffer.count < prefixedLength {
return
}

if messageLength > 0 {
onMessage(buffer.subdata(in: kGRPCPrefixLength..<prefixedLength))
} else {
onMessage(Data())
}

buffer.removeSubrange(0..<prefixedLength)
state = .expectingCompressionFlag
}

self.processBuffer(&buffer, state: &state, onMessage: onMessage)
}
}
49 changes: 49 additions & 0 deletions mobile/library/swift/src/GRPCStreamEmitter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import Foundation

/// gRPC prefix length: 1 byte for compression and 4 bytes for message length.
let kGRPCPrefixLength: Int = 5

/// Emitter that allows for sending additional data over gRPC.
@objcMembers
public final class GRPCStreamEmitter: NSObject {
private let underlyingEmitter: StreamEmitter

// MARK: - Internal

/// Initialize a new emitter.
///
/// - parameter emitter: The underlying stream emitter to use for sending data.
init(emitter: StreamEmitter) {
self.underlyingEmitter = emitter
}

// MARK: - Public

/// Send a protobuf message's binary data over the gRPC stream.
///
/// - parameter messageData: Binary data of a protobuf message to send.
public func sendMessage(_ messageData: Data) {
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
// Length-Prefixed-Message = Compressed-Flag | Message-Length | Message
// Compressed-Flag = 0 / 1, encoded as 1 byte unsigned integer
// Message-Length = length of Message, encoded as 4 byte unsigned integer (big endian)
// Message = binary representation of protobuf message
var prefixData = Data(capacity: kGRPCPrefixLength)

// Compression flag (1 byte) - 0, not compressed
prefixData.append(0)

// Message length (4 bytes)
var length = UInt32(messageData.count).bigEndian
prefixData.append(UnsafeBufferPointer(start: &length, count: 1))

// Send prefix data followed by message data
self.underlyingEmitter.sendData(prefixData)
self.underlyingEmitter.sendData(messageData)
}

/// Close this connection.
public func close() {
self.underlyingEmitter.close()
}
}
21 changes: 21 additions & 0 deletions mobile/library/swift/test/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,27 @@ envoy_mobile_swift_test(
],
)

envoy_mobile_swift_test(
name = "grpc_request_builder_tests",
srcs = [
"GRPCRequestBuilderTests.swift",
],
)

envoy_mobile_swift_test(
name = "grpc_response_handler_tests",
srcs = [
"GRPCResponseHandlerTests.swift",
],
)

envoy_mobile_swift_test(
name = "grpc_stream_emitter_tests",
srcs = [
"GRPCStreamEmitterTests.swift",
],
)

envoy_mobile_swift_test(
name = "request_builder_tests",
srcs = [
Expand Down
Loading

0 comments on commit efd9047

Please sign in to comment.