diff --git a/Sources/momento/TopicClient.swift b/Sources/momento/TopicClient.swift index e7a17e9..bd9cc5b 100644 --- a/Sources/momento/TopicClient.swift +++ b/Sources/momento/TopicClient.swift @@ -69,7 +69,39 @@ public class TopicClient: TopicClientProtocol { ) } } - + + public func publish( + cacheName: String, + topicName: String, + value: Data + ) async -> TopicPublishResponse { + if cacheName.count < 1 { + return TopicPublishError( + error: InvalidArgumentError(message: "Must provide a cache name") + ) + } + if topicName.count < 1 { + return TopicPublishError( + error: InvalidArgumentError(message: "Must provide a topic name") + ) + } + do { + let result = try await self.pubsubClient.publish( + cacheName: cacheName, + topicName: topicName, + value: value + ) + return result + } catch { + return TopicPublishError( + error: UnknownError( + message: "Unknown error from publish", + innerException: error + ) + ) + } + } + public func subscribe(cacheName: String, topicName: String) async -> TopicSubscribeResponse { if cacheName.count < 1 { return TopicSubscribeError( diff --git a/Sources/momento/internal/PubsubClient.swift b/Sources/momento/internal/PubsubClient.swift index 3a67010..2fc7e3c 100644 --- a/Sources/momento/internal/PubsubClient.swift +++ b/Sources/momento/internal/PubsubClient.swift @@ -1,3 +1,4 @@ +import Foundation import GRPC import NIO import NIOHPACK @@ -50,7 +51,13 @@ protocol PubsubClientProtocol { topicName: String, value: String ) async throws -> TopicPublishResponse - + + func publish( + cacheName: String, + topicName: String, + value: Data + ) async throws -> TopicPublishResponse + func subscribe( cacheName: String, topicName: String @@ -117,6 +124,22 @@ class PubsubClient: PubsubClientProtocol { request.cacheName = cacheName request.topic = topicName request.value.text = value + return await self.doPublish(request: request) + } + + func publish( + cacheName: String, + topicName: String, + value: Data + ) async -> TopicPublishResponse { + var request = CacheClient_Pubsub__PublishRequest() + request.cacheName = cacheName + request.topic = topicName + request.value.binary = value + return await doPublish(request: request) + } + + private func doPublish(request: CacheClient_Pubsub__PublishRequest) async -> TopicPublishResponse { do { let result = try await self.client.publish(request) // Successful publish returns client_sdk_swift.CacheClient_Pubsub__Empty diff --git a/Tests/client-sdk-swiftTests/client_sdk_swiftTests.swift b/Tests/client-sdk-swiftTests/client_sdk_swiftTests.swift index 1469116..07918fc 100644 --- a/Tests/client-sdk-swiftTests/client_sdk_swiftTests.swift +++ b/Tests/client-sdk-swiftTests/client_sdk_swiftTests.swift @@ -126,7 +126,6 @@ final class client_sdk_swiftTests: XCTestCase { pubResp is TopicPublishSuccess, "Unexpected response: \((pubResp as! TopicPublishError).description)" ) - try await Task.sleep(nanoseconds: 1000) let subscription = (subResp as! TopicSubscribeSuccess).subscription for try await item in subscription { @@ -141,7 +140,52 @@ final class client_sdk_swiftTests: XCTestCase { XCTAssertEqual(value, "publishing and subscribing!", "unexpected topic subscription item value: \(value)") break } - + } + + func testTopicClientPublishesAndSubscribesBinary() async throws { + let creds = try CredentialProvider.fromEnvironmentVariable(envVariableName: "MOMENTO_API_KEY") + let client = TopicClient(configuration: TopicConfigurations.Default.latest(), credentialProvider: creds) + XCTAssertNotNil(client) + + let subResp = await client.subscribe( + cacheName: "test-cache", + topicName: "test-topic" + ) + XCTAssertTrue( + subResp is TopicSubscribeSuccess, + "Unexpected response: \((subResp as! TopicSubscribeError).description)" + ) + + try await Task.sleep(nanoseconds: 1000) + let binaryValue = "publishing and subscribing!".data(using: .utf8)! + let pubResp = await client.publish( + cacheName: "test-cache", + topicName: "test-topic", + value: binaryValue + ) + XCTAssertTrue( + pubResp is TopicPublishSuccess, + "Unexpected response: \((pubResp as! TopicPublishError).description)" + ) + + let subscription = (subResp as! TopicSubscribeSuccess).subscription + for try await item in subscription { + print("Received item: \(String(describing: item))") + XCTAssertTrue( + item is TopicSubscriptionItemBinary, + "received subscription item that was not binary: \(String(describing: item))" + ) + + let value = (item as! TopicSubscriptionItemBinary).value + print("Received value: \(String(decoding: value, as: UTF8.self))") + XCTAssertEqual( + value, + binaryValue, + "unexpected topic subscription item value: \(value)" + ) + break + } + client.close() print("closed subscription") }