Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support initial-response in RPC based event streams #1165

Merged
merged 17 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -555,4 +555,4 @@ let servicesWithIntegrationTests: [String] = [
servicesWithIntegrationTests.forEach(addIntegrationTestTarget)

// Uncomment this line to enable protocol tests
// addProtocolTests()
// addProtocolTests()
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ extension AWSEventStream {
private var decoder: EventStreamMessageDecoder?
private var messageBuffer: [EventStream.Message] = []
private var error: Error?
private var initialMessage: Data = Data()
private var onInitialResponseReceived: ((Data?) -> Void)?
private var didProcessInitialMessage = false

private var decodedPayload = Data()
private var decodededHeaders: [EventStreamHeader] = []
Expand Down Expand Up @@ -44,8 +47,20 @@ extension AWSEventStream {
self.logger.debug("onComplete")
let message = EventStream.Message(headers: self.decodededHeaders.toHeaders(),
payload: self.decodedPayload)
self.messageBuffer.append(message)
if message.headers.contains(
EventStream.Header(name: ":event-type", value: .string("initial-response"))
) {
self.initialMessage = message.payload
self.onInitialResponseReceived?(self.initialMessage)
self.didProcessInitialMessage = true
} else {
self.messageBuffer.append(message)

if !self.didProcessInitialMessage {
self.onInitialResponseReceived?(nil) // Signal that initial-response will never come.
self.didProcessInitialMessage = true
}
}
// This could be end of the stream, hence reset the state
self.decodedPayload = Data()
self.decodededHeaders = []
Expand Down Expand Up @@ -88,6 +103,38 @@ extension AWSEventStream {
return message
}

// Responsible for waiting on the initial response.
// It uses Swift's concurrency model to asynchronously return the data.
public func awaitInitialResponse() async -> Data? {
// The 'withCheckedContinuation' function is used to bridge asynchronous code
// that doesn't use Swift's concurrency model with code that does.
return await withCheckedContinuation { continuation in
// Here, we attempt to retrieve the initial response.
// Once the data is retrieved (or determined to be nil),
// the continuation is resumed with the result.
retrieveInitialResponse { data in
continuation.resume(returning: data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is required, in all cases, that the continuation gets called (it's a rather large memory leak and a frozen path of execution if it doesn't.)

Considering that retrieveInitialResponse() will in some cases save the continuation to an instance var, how do we guarantee that it gets called in every case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we can guarantee that it is called in every case.

If didProcessInitialMessage is true the continuation is called immediately through the completion callback: completion(initialMessage)

If didProcessInitialMessage is false the continuation is stored in the onInitialResponseReceived instance variable: self.onInitialResponseReceived = completion

  • In this case, for the stored continuation to be resumed later onInitialResponseReceived must be invoked. It gets invoked during the onComplete phase of EventStreamMessageDecoder which should always be invoked at least once per event.
  • Within onComplete in either case whether we spot an initial-response event-type message, either we set self.onInitialResponseReceived to self.initialMessage or to nil. That should cover all cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed in separate conversation that immediately after running the continuation stored in didProcessInitialMessage, set it to nil so it is released and it cannot be called again.

}
}
}

// Attempt to get the initial response.
// If the initial message has been processed, it immediately calls the completion handler.
// Otherwise, it sets up a callback to be triggered once the initial response is received.
private func retrieveInitialResponse(completion: @escaping (Data?) -> Void) {
// Check if the initial message has already been processed.
if self.didProcessInitialMessage {
// If it has been processed, immediately call the completion handler
// with the potentially nil or populated 'initialMessage' value.
completion(initialMessage)
} else {
// If the initial message hasn't been processed,
// set the 'onInitialResponseReceived' callback to our completion handler,
// so it can be called later once the initial response is received.
self.onInitialResponseReceived = completion
}
}

/// Throws an error if one has occurred.
/// This should be called before any other methods to make sure
/// that the decoder is in a valid state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ final class AWSMessageDecoderTests: XCTestCase {
XCTAssertEqual(validMessageNoHeaders, decodedMessage)
}

func testDecode_MessageWithInitialResponse() {
try! sut.feed(data: validInitialResponseMessageData)
let decodedMessage = try! sut.message()
// initialResponse message should not be added to the messageBuffer
XCTAssertNil(decodedMessage)
}

func testEndOfStream_StreamClosed() {
try! sut.feed(data: validMessageDataNoHeaders[0..<validMessageDataNoHeaders.count-1])
XCTAssertThrowsError(try sut.endOfStream()) { error in
Expand All @@ -40,4 +47,16 @@ final class AWSMessageDecoderTests: XCTestCase {
XCTAssertEqual("Stream ended before message was complete", message)
}
}

func testAwaitInitialResponse_MessageWithInitialResponse() async {
try! sut.feed(data: validInitialResponseMessageData)
guard let initialResponse = await sut.awaitInitialResponse() else {
XCTFail("Error!")
return
}
XCTAssertEqual(
String(data: validInitialResponseMessage.payload, encoding: .utf8),
String(data: initialResponse, encoding: .utf8)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@ final class AWSMessageEncoderTests: XCTestCase {
let encodedMessage = try! sut.encode(message: validMessageNoHeaders)
XCTAssertEqual(validMessageDataNoHeaders, encodedMessage)
}

func testEncode_InitialResponseMessage() {
let encodedMessage = try! sut.encode(message: validInitialResponseMessage)
XCTAssertEqual(validInitialResponseMessageData, encodedMessage)
}
dayaffe marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,25 @@ let validMessageDataNoHeaders = Data([
0x68, 0x65, 0x72, 0x20, 0x74, 0x65, 0x73, 0x74, 0x20, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
0x8d, 0xf8, 0x0e, 0x65,
])

let validInitialResponseMessage: EventStream.Message = EventStream.Message(
headers: [
.init(name: ":event-type", value: .string("initial-response")),
.init(name: ":message-type", value: .string("event")),
.init(name: ":content-type", value: .string("application/x-amz-json-1.1"))
],
payload: "{\"someMetadata\": \"test\"}".data(using: .utf8)!
)

/// This is the encoded version of the above message validInitialResponseMessage
let validInitialResponseMessageData = Data([
0x00, 0x00, 0x00, 0x88, 0x00, 0x00, 0x00, 0x60, 0xa9, 0x06, 0x45, 0x62, 0x0b, 0x3a, 0x65, 0x76,
0x65, 0x6e, 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x10, 0x69, 0x6e, 0x69, 0x74, 0x69,
0x61, 0x6c, 0x2d, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x0d, 0x3a, 0x6d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07, 0x00, 0x05, 0x65, 0x76, 0x65, 0x6e,
0x74, 0x0d, 0x3a, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x07,
0x00, 0x1a, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x78, 0x2d,
0x61, 0x6d, 0x7a, 0x2d, 0x6a, 0x73, 0x6f, 0x6e, 0x2d, 0x31, 0x2e, 0x31, 0x7b, 0x22, 0x73, 0x6f,
0x6d, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x22, 0x3a, 0x20, 0x22, 0x74, 0x65,
0x73, 0x74, 0x22, 0x7d, 0xe6, 0x23, 0x00, 0xf1,
])
Loading