diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift new file mode 100644 index 00000000..dab0b0ba --- /dev/null +++ b/Networking/Sources/Networking/Peer.swift @@ -0,0 +1,148 @@ +import Foundation +import Logging +import Utils + +let peerLogger = Logger(label: "PeerServer") + +public enum PeerMessageType: Sendable { + case uniquePersistent // most messages type + case commonEphemeral +} + +public protocol PeerMessage: Equatable, Sendable { + func getData() -> Data + func getMessageType() -> PeerMessageType +} + +public struct PeerMessageReceived: Event { + public let messageID: String + public let message: QuicMessage +} + +// TODO: add error or remove it +public struct PeerErrorReceived: Event { + public let messageID: String? + public let error: QuicError +} + +// Define the Peer actor +public actor Peer { + private let config: QuicConfig + private var quicServer: QuicServer! + private var clients: [NetAddr: QuicClient] + private let eventBus: EventBus + + public init(config: QuicConfig, eventBus: EventBus) async throws { + self.config = config + self.eventBus = eventBus + clients = [:] + quicServer = try await QuicServer(config: config, messageHandler: self) + } + + deinit { + Task { [weak self] in + guard let self else { return } + + var clients = await self.clients + for client in clients.values { + await client.close() + } + clients.removeAll() + await self.quicServer.close() + } + } + + // Respond to a message with a specific messageID using Data + func respond(to messageID: String, with data: Data) async -> QuicStatus { + await quicServer.respondGetStatus(to: messageID, with: data) + } + + // Respond to a message with a specific messageID using PeerMessage + func respond(to messageID: String, with message: any PeerMessage) async -> QuicStatus { + let messageType = message.getMessageType() + return + await quicServer + .respondGetStatus( + to: messageID, + with: message.getData(), + kind: (messageType == .uniquePersistent) ? .uniquePersistent : .commonEphemeral + ) + } + + // Sends a message to another peer and returns the status + func sendMessage(to peer: NetAddr, with message: any PeerMessage) async throws -> QuicStatus { + let buffer = message.getData() + let messageType = message.getMessageType() + return try await sendDataToPeer(buffer, to: peer, messageType: messageType) + } + + private func sendDataToPeer(_ data: Data, to peerAddr: NetAddr, messageType: PeerMessageType) + async throws -> QuicStatus + { + if let client = clients[peerAddr] { + // Client already exists, use it to send the data + return try await client.send( + data: data, + streamKind: messageType == .uniquePersistent ? .uniquePersistent : .commonEphemeral + ) + } else { + let config = QuicConfig( + id: config.id, cert: config.cert, key: config.key, alpn: config.alpn, + ipAddress: peerAddr.ipAddress, port: peerAddr.port + ) + // Client does not exist, create a new one + let client = try await QuicClient(config: config, messageHandler: self) + clients[peerAddr] = client + return try await client.send( + data: data, + streamKind: messageType == .uniquePersistent ? .uniquePersistent : .commonEphemeral + ) + } + } + + private func removeClient(client: QuicClient) async { + let peerAddr = await client.getNetAddr() + await client.close() + _ = clients.removeValue(forKey: peerAddr) + } + + func getPeerAddr() -> String { + "\(config.ipAddress):\(config.port)" + } +} + +// QuicClientMessageHandler methods +extension Peer: QuicClientMessageHandler { + public func didReceiveMessage(quicClient: QuicClient, message: QuicMessage) async { + switch message.type { + case .shutdownComplete: + await removeClient(client: quicClient) + default: + break + } + } + + public func didReceiveError(quicClient _: QuicClient, error: QuicError) async { + peerLogger.error("Failed to receive message: \(error)") + await eventBus.publish(PeerErrorReceived(messageID: nil, error: error)) + } +} + +// QuicServerMessageHandler methods +extension Peer: QuicServerMessageHandler { + public func didReceiveMessage(server _: QuicServer, messageID: String, message: QuicMessage) async { + switch message.type { + case .received: + await eventBus.publish(PeerMessageReceived(messageID: messageID, message: message)) + case .shutdownComplete: + peerLogger.info("quic server shutdown") + default: + break + } + } + + public func didReceiveError(server _: QuicServer, messageID: String, error: QuicError) async { + peerLogger.error("Failed to receive message: \(error)") + await eventBus.publish(PeerErrorReceived(messageID: messageID, error: error)) + } +} diff --git a/Networking/Sources/Networking/msquic/QuicClient.swift b/Networking/Sources/Networking/msquic/QuicClient.swift index 73dc4b4f..56f043a4 100644 --- a/Networking/Sources/Networking/msquic/QuicClient.swift +++ b/Networking/Sources/Networking/msquic/QuicClient.swift @@ -1,4 +1,3 @@ -import Atomics import Foundation import Logging import msquic @@ -6,22 +5,20 @@ import NIO let clientLogger = Logger(label: "QuicClient") -public protocol QuicClientMessageHandler: AnyObject { - func didReceiveMessage(quicClient: QuicClient, message: QuicMessage) - // TODO: add error or remove it - func didReceiveError(quicClient: QuicClient, error: QuicError) +public protocol QuicClientMessageHandler: AnyObject, Sendable { + func didReceiveMessage(quicClient: QuicClient, message: QuicMessage) async + func didReceiveError(quicClient: QuicClient, error: QuicError) async } -public class QuicClient: @unchecked Sendable { - private var api: UnsafePointer? +public actor QuicClient: Sendable, QuicConnectionMessageHandler { + private var api: UnsafePointer private var registration: HQuic? private var configuration: HQuic? private var connection: QuicConnection? private let config: QuicConfig private weak var messageHandler: QuicClientMessageHandler? - private let isClosed: ManagedAtomic = .init(false) - init(config: QuicConfig, messageHandler: QuicClientMessageHandler? = nil) throws { + public init(config: QuicConfig, messageHandler: QuicClientMessageHandler? = nil) async throws { self.config = config self.messageHandler = messageHandler var rawPointer: UnsafeRawPointer? @@ -47,101 +44,51 @@ public class QuicClient: @unchecked Sendable { api = boundPointer registration = registrationHandle - } - - deinit { - close() - clientLogger.trace("QuicClient Deinit") - } - - func start() throws -> QuicStatus { - let status = QuicStatusCode.success.rawValue - try loadConfiguration() + try config.loadConfiguration( + api: api, registration: registration, configuration: &configuration + ) connection = try QuicConnection( api: api, registration: registration, configuration: configuration, messageHandler: self ) try connection?.start(ipAddress: config.ipAddress, port: config.port) - return status - } - - // Asynchronous send method that waits for a QuicMessage reply - func send(message: Data) async throws -> QuicMessage { - try await send(message: message, streamKind: .uniquePersistent) - } - - // send method that returns a QuicStatus - func send(message: Data, streamKind: StreamKind) throws -> QuicStatus { - guard let connection else { - throw QuicError.getConnectionFailed - } - let sendStream: QuicStream - // Check if there is an existing stream of the same kind - = if streamKind == .uniquePersistent - { - // If there is, send the message to the existing stream - try connection.createOrGetUniquePersistentStream(kind: streamKind) - } else { - // If there is not, create a new stream - try connection.createCommonEphemeralStream() - } - return sendStream.send(buffer: message, kind: streamKind) } - // Asynchronous send method that waits for a QuicMessage reply - func send(message: Data, streamKind: StreamKind = .uniquePersistent) async throws -> QuicMessage { + // Send method that returns a QuicStatus + public func send(data: Data, streamKind: StreamKind) async throws -> QuicStatus { guard let connection else { throw QuicError.getConnectionFailed } - let sendStream: QuicStream - // Check if there is an existing stream of the same kind - = if streamKind == .uniquePersistent - { - // If there is, send the message to the existing stream - try connection.createOrGetUniquePersistentStream(kind: streamKind) - } else { - // If there is not, create a new stream - try connection.createCommonEphemeralStream() - } - return try await sendStream.send(buffer: message) + let sendStream: QuicStream = + if streamKind == .uniquePersistent { + try await connection.createOrGetUniquePersistentStream(kind: streamKind) + } else { + try await connection.createCommonEphemeralStream() + } + return sendStream.send(with: data, kind: streamKind) } func getNetAddr() -> NetAddr { NetAddr(ipAddress: config.ipAddress, port: config.port) } - func close() { - if isClosed.compareExchange(expected: false, desired: true, ordering: .acquiring).exchanged { - if let connection { - connection.close() - self.connection = nil - } - - if let configuration { - api?.pointee.ConfigurationClose(configuration) - self.configuration = nil - } - - if let registration { - api?.pointee.RegistrationClose(registration) - self.registration = nil - } + public func close() async { + if let connection { + await connection.close() + self.connection = nil + } - if api != nil { - MsQuicClose(api) - api = nil - } + if let configuration { + api.pointee.ConfigurationClose(configuration) + self.configuration = nil + } - if let messageHandler { - messageHandler.didReceiveMessage( - quicClient: self, message: QuicMessage(type: .close, data: nil) - ) - } - clientLogger.debug("QuicClient Close") + if let registration { + api.pointee.RegistrationClose(registration) + self.registration = nil } + MsQuicClose(api) } -} -extension QuicClient: QuicConnectionMessageHandler { public func didReceiveMessage( connection _: QuicConnection, stream _: QuicStream?, message: QuicMessage ) { @@ -153,10 +100,15 @@ extension QuicClient: QuicConnectionMessageHandler { ) case .shutdownComplete: - // Use [weak self] to avoid strong reference cycle + clientLogger.info( + "Client[\(getNetAddr())] shutdown" + ) + // Call messageHandler safely in the actor context Task { [weak self] in guard let self else { return } - close() + await messageHandler?.didReceiveMessage( + quicClient: self, message: QuicMessage(type: .shutdownComplete, data: nil) + ) } default: @@ -170,9 +122,3 @@ extension QuicClient: QuicConnectionMessageHandler { clientLogger.error("Failed to receive message: \(error)") } } - -extension QuicClient { - private func loadConfiguration() throws { - try config.loadConfiguration(api: api, registration: registration, configuration: &configuration) - } -} diff --git a/Networking/Sources/Networking/msquic/QuicConfig.swift b/Networking/Sources/Networking/msquic/QuicConfig.swift index 134ee112..6092549f 100644 --- a/Networking/Sources/Networking/msquic/QuicConfig.swift +++ b/Networking/Sources/Networking/msquic/QuicConfig.swift @@ -1,7 +1,7 @@ import Foundation import msquic -public struct QuicConfig { +public struct QuicConfig: Sendable { public let id: String public let cert: String public let key: String @@ -20,7 +20,7 @@ public struct QuicConfig { settings.IsSet.IdleTimeoutMs = 1 settings.ServerResumptionLevel = 2 // QUIC_SERVER_RESUME_AND_ZERORTT settings.IsSet.ServerResumptionLevel = 1 - settings.PeerBidiStreamCount = 1 + settings.PeerBidiStreamCount = 100 settings.IsSet.PeerBidiStreamCount = 1 // Use withCString to avoid manual memory management @@ -38,11 +38,11 @@ public struct QuicConfig { QuicCredentialConfig.__Unnamed_union___Anonymous_field2( CertificateFile: certFilePointer ), - Principal: nil, // Not needed in this context - Reserved: nil, // Not needed in this context - AsyncHandler: nil, // Not needed in this context - AllowedCipherSuites: QUIC_ALLOWED_CIPHER_SUITE_NONE, // Default value - CaCertificateFile: nil // Not needed in this context + Principal: nil, + Reserved: nil, + AsyncHandler: nil, + AllowedCipherSuites: QUIC_ALLOWED_CIPHER_SUITE_NONE, + CaCertificateFile: nil ) // Convert ALPN to data buffer diff --git a/Networking/Sources/Networking/msquic/QuicConnection.swift b/Networking/Sources/Networking/msquic/QuicConnection.swift index 4f86a13b..87b59251 100644 --- a/Networking/Sources/Networking/msquic/QuicConnection.swift +++ b/Networking/Sources/Networking/msquic/QuicConnection.swift @@ -1,4 +1,3 @@ -import Atomics import Foundation import Logging import msquic @@ -9,26 +8,81 @@ let logger = Logger(label: "QuicConnection") public protocol QuicConnectionMessageHandler: AnyObject { func didReceiveMessage( connection: QuicConnection, stream: QuicStream?, message: QuicMessage - ) + ) async func didReceiveError( connection: QuicConnection, stream: QuicStream, error: QuicError - ) + ) async } -public class QuicConnection { +actor StreamManager { + private var uniqueStreams: [StreamKind: QuicStream] = [:] + private var commonStreams: [QuicStream] + + init() { + uniqueStreams = [:] + commonStreams = .init() + } + + func getUniqueStream(kind: StreamKind) -> QuicStream? { + uniqueStreams[kind] + } + + func addUniqueStream(kind: StreamKind, stream: QuicStream) { + uniqueStreams[kind] = stream + } + + func removeUniqueStream(kind: StreamKind) { + _ = uniqueStreams.removeValue(forKey: kind) + } + + func addCommonStream(_ stream: QuicStream) { + commonStreams.append(stream) + } + + func commonContains(_ stream: QuicStream) -> Bool { + commonStreams.contains { $0 === stream } + } + + func removeCommonStream(_ stream: QuicStream) { + commonStreams.removeAll(where: { $0 === stream }) + } + + func removeStream(_ stream: QuicStream) { + let cointain = commonContains(stream) + if cointain { + removeCommonStream(stream) + } else { + removeUniqueStream(kind: stream.kind) + } + } + + func closeAllCommonStreams() { + for stream in commonStreams { + stream.close() + } + commonStreams.removeAll() + } + + func closeAllUniqueStreams() { + for stream in uniqueStreams.values { + stream.close() + } + uniqueStreams.removeAll() + } +} + +public class QuicConnection: @unchecked Sendable { private var connection: HQuic? - private var api: UnsafePointer? + private let api: UnsafePointer private var registration: HQuic? private var configuration: HQuic? - private var uniquePersistentStreams: AtomicDictionary - private var commonEphemeralStreams: AtomicArray + private var streamManager: StreamManager private weak var messageHandler: QuicConnectionMessageHandler? private var connectionCallback: ConnectionCallback? - private let isClosed: ManagedAtomic = .init(false) // Initializer for creating a new connection init( - api: UnsafePointer?, + api: UnsafePointer, registration: HQuic?, configuration: HQuic?, messageHandler: QuicConnectionMessageHandler? = nil @@ -37,8 +91,7 @@ public class QuicConnection { self.registration = registration self.configuration = configuration self.messageHandler = messageHandler - uniquePersistentStreams = .init() - commonEphemeralStreams = .init() + streamManager = StreamManager() connectionCallback = { connection, context, event in QuicConnection.connectionCallback( connection: connection, context: context, event: event @@ -49,7 +102,7 @@ public class QuicConnection { // Initializer for wrapping an existing connection init( - api: UnsafePointer?, registration: HQuic?, configuration: HQuic?, + api: UnsafePointer, registration: HQuic?, configuration: HQuic?, connection: HQuic?, messageHandler: QuicConnectionMessageHandler? = nil ) { self.api = api @@ -57,8 +110,7 @@ public class QuicConnection { self.configuration = configuration self.connection = connection self.messageHandler = messageHandler - uniquePersistentStreams = .init() - commonEphemeralStreams = .init() + streamManager = StreamManager() connectionCallback = { connection, context, event in QuicConnection.connectionCallback( connection: connection, context: context, event: event @@ -66,18 +118,8 @@ public class QuicConnection { } } - // Deinitializer to ensure resources are cleaned up - deinit { - close() - logger.trace("QuicConnection Deinit") - } - // Sets the callback handler for the connection func setCallbackHandler() -> QuicStatus { - guard let api, let connection, let configuration else { - return QuicStatusCode.invalidParameter.rawValue - } - let callbackPointer = unsafeBitCast( connectionCallback, to: UnsafeMutableRawPointer?.self ) @@ -93,79 +135,61 @@ public class QuicConnection { // Opens the connection private func open() throws { - let status = - (api?.pointee.ConnectionOpen( - registration, - { connection, context, event -> QuicStatus in - return QuicConnection.connectionCallback( - connection: connection, context: context, event: event - ) - }, Unmanaged.passUnretained(self).toOpaque(), &connection - )).status + let status = api.pointee.ConnectionOpen( + registration, + { connection, context, event -> QuicStatus in + return QuicConnection.connectionCallback( + connection: connection, context: context, event: event + ) + }, Unmanaged.passUnretained(self).toOpaque(), &connection + ) if status.isFailed { throw QuicError.invalidStatus(status: status.code) } } // Creates or retrieves a unique persistent stream - func createOrGetUniquePersistentStream(kind: StreamKind) throws -> QuicStream { - if let stream = uniquePersistentStreams[kind] { + func createOrGetUniquePersistentStream(kind: StreamKind) async throws -> QuicStream { + if let stream = await streamManager.getUniqueStream(kind: kind) { return stream } let stream = try QuicStream(api: api, connection: connection, kind, messageHandler: self) - uniquePersistentStreams[kind] = stream + await streamManager.addUniqueStream(kind: kind, stream: stream) return stream } // Creates a common ephemeral stream - func createCommonEphemeralStream() throws -> QuicStream { - let stream = try QuicStream(api: api, connection: connection, .commonEphemeral, messageHandler: self) - commonEphemeralStreams.append(stream) + func createCommonEphemeralStream() async throws -> QuicStream { + let stream: QuicStream = try QuicStream( + api: api, connection: connection, .commonEphemeral, messageHandler: self + ) + await streamManager.addCommonStream(stream) return stream } - // Removes a stream from the connection - func removeStream(stream: QuicStream) { - stream.close() - if stream.kind == .uniquePersistent { - _ = uniquePersistentStreams.removeValue(forKey: stream.kind) - } else { - commonEphemeralStreams.removeAll(where: { $0 === stream }) + // Closes the connection and cleans up resources + func close() async { + connectionCallback = nil + messageHandler = nil + await streamManager.closeAllCommonStreams() + await streamManager.closeAllUniqueStreams() + if let connection { + api.pointee.ConnectionClose(connection) + self.connection = nil } + logger.debug("QuicConnection Close") } // Starts the connection with the specified IP address and port func start(ipAddress: String, port: UInt16) throws { - let status = - (api?.pointee.ConnectionStart( - connection, configuration, QUIC_ADDRESS_FAMILY(QUIC_ADDRESS_FAMILY_UNSPEC), - ipAddress, port - )).status + let status = api.pointee.ConnectionStart( + connection, configuration, QUIC_ADDRESS_FAMILY(QUIC_ADDRESS_FAMILY_UNSPEC), + ipAddress, port + ) if status.isFailed { throw QuicError.invalidStatus(status: status.code) } } - - // Closes the connection and cleans up resources - func close() { - if isClosed.compareExchange(expected: false, desired: true, ordering: .acquiring).exchanged { - connectionCallback = nil - messageHandler = nil - for stream in commonEphemeralStreams { - stream.close() - } - commonEphemeralStreams.removeAll() - for stream in uniquePersistentStreams.values { - stream.close() - } - uniquePersistentStreams.removeAll() - if connection != nil { - api?.pointee.ConnectionClose(connection) - connection = nil - } - logger.debug("QuicConnection close") - } - } } extension QuicConnection { @@ -183,7 +207,7 @@ extension QuicConnection { let status: QuicStatus = QuicStatusCode.success.rawValue switch event.pointee.Type { case QUIC_CONNECTION_EVENT_CONNECTED: - logger.info("[\(String(describing: connection))] Connected") + logger.debug("[\(String(describing: connection))] Connected") case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT: if event.pointee.SHUTDOWN_INITIATED_BY_TRANSPORT.Status @@ -206,13 +230,12 @@ extension QuicConnection { case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE: logger.info("[\(String(describing: connection))] Shutdown all done") if event.pointee.SHUTDOWN_COMPLETE.AppCloseInProgress == 0 { - if let messageHandler = quicConnection.messageHandler { - messageHandler.didReceiveMessage( + Task { + await quicConnection.messageHandler?.didReceiveMessage( connection: quicConnection, stream: nil, message: QuicMessage(type: .shutdownComplete, data: nil) ) - quicConnection.messageHandler = nil } } @@ -225,10 +248,13 @@ extension QuicConnection { logger.info("[\(String(describing: connection))] Peer stream started") let stream = event.pointee.PEER_STREAM_STARTED.Stream let quicStream = QuicStream( - api: quicConnection.api, connection: connection, stream: stream, messageHandler: quicConnection + api: quicConnection.api, connection: connection, stream: stream, + messageHandler: quicConnection ) quicStream.setCallbackHandler() - quicConnection.commonEphemeralStreams.append(quicStream) + Task { + await quicConnection.streamManager.addCommonStream(quicStream) + } default: break @@ -242,16 +268,28 @@ extension QuicConnection: QuicStreamMessageHandler { public func didReceiveMessage(_ stream: QuicStream, message: QuicMessage) { switch message.type { case .shutdownComplete: - removeStream(stream: stream) + Task { + stream.close() + await streamManager.removeStream(stream) + } default: break } - messageHandler?.didReceiveMessage(connection: self, stream: stream, message: message) + // Call messageHandler safely in the actor context + Task { [weak self] in + guard let self else { return } + await messageHandler?.didReceiveMessage( + connection: self, stream: stream, message: message + ) + } } // Handles errors received from the stream public func didReceiveError(_ stream: QuicStream, error: QuicError) { logger.error("Failed to receive message: \(error)") - messageHandler?.didReceiveError(connection: self, stream: stream, error: error) + Task { [weak self] in + guard let self else { return } + await messageHandler?.didReceiveError(connection: self, stream: stream, error: error) + } } } diff --git a/Networking/Sources/Networking/msquic/QuicListener.swift b/Networking/Sources/Networking/msquic/QuicListener.swift new file mode 100644 index 00000000..2f937509 --- /dev/null +++ b/Networking/Sources/Networking/msquic/QuicListener.swift @@ -0,0 +1,182 @@ +import Foundation +import Logging +import msquic +import Utils + +let listenLogger: Logger = .init(label: "QuicListener") + +actor ConnectionsManager { + private var connections: [QuicConnection] = [] + + func add(_ connection: QuicConnection) { + connections.append(connection) + } + + func remove(_ connection: QuicConnection) async { + await connection.close() + connections.removeAll(where: { $0 === connection }) + } + + func all() -> [QuicConnection] { + connections + } + + func removeAll() async { + for connection in connections { + await connection.close() + } + connections.removeAll() + } +} + +public protocol QuicListenerMessageHandler: AnyObject { + func didReceiveMessage(connection: QuicConnection, stream: QuicStream, message: QuicMessage) + async + func didReceiveError(connection: QuicConnection, stream: QuicStream, error: QuicError) async +} + +public class QuicListener: @unchecked Sendable { + private let api: UnsafePointer + private let registration: HQuic? + private let configuration: HQuic? + private var config: QuicConfig + private var listener: HQuic? + public weak var messageHandler: QuicListenerMessageHandler? + private let connectionsManager: ConnectionsManager + + public init( + api: UnsafePointer, + registration: HQuic?, + configuration: HQuic?, + config: QuicConfig, + messageHandler: QuicServer? = nil + ) throws { + self.api = api + self.registration = registration + self.configuration = configuration + self.config = config + self.messageHandler = messageHandler + connectionsManager = .init() + try openListener(port: config.port, listener: &listener) + } + + private func openListener(port: UInt16, listener: inout HQuic?) throws { + // Open the listener + let status = + api.pointee.ListenerOpen( + registration, + { listener, context, event -> QuicStatus in + QuicListener.serverListenerCallback( + listener: listener, context: context, event: event + ) + }, + UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque()), &listener + ) + + guard status.isSucceeded else { + throw QuicError.invalidStatus(status: status.code) + } + + // Prepare ALPN buffer + let alpnData = Data(config.alpn.utf8) + try alpnData.withUnsafeBytes { bufferPointer in + var alpnBuffer = QuicBuffer( + Length: UInt32(bufferPointer.count), + Buffer: UnsafeMutablePointer( + mutating: bufferPointer.bindMemory(to: UInt8.self).baseAddress! + ) + ) + + // Prepare address + var address = QUIC_ADDR() + QuicAddrSetFamily(&address, QUIC_ADDRESS_FAMILY(QUIC_ADDRESS_FAMILY_UNSPEC)) + QuicAddrSetPort(&address, port) + + // Start the listener + let startStatus: QuicStatus = + api.pointee.ListenerStart(listener, &alpnBuffer, 1, &address) + + guard startStatus.isSucceeded else { + throw QuicError.invalidStatus(status: startStatus.code) + } + } + } + + func getNetAddr() -> NetAddr { + NetAddr(ipAddress: config.ipAddress, port: config.port) + } + + private static func serverListenerCallback( + listener _: HQuic?, context: UnsafeMutableRawPointer?, + event: UnsafePointer? + ) -> QuicStatus { + var status: QuicStatus = QuicStatusCode.notSupported.rawValue + guard let context, let event else { + return status + } + let listener: QuicListener = Unmanaged.fromOpaque(context) + .takeUnretainedValue() + switch event.pointee.Type { + case QUIC_LISTENER_EVENT_NEW_CONNECTION: + let connection: HQuic = event.pointee.NEW_CONNECTION.Connection + let quicConnection = QuicConnection( + api: listener.api, + registration: listener.registration, + configuration: listener.configuration, + connection: connection, + messageHandler: listener + ) + Task { + await listener.connectionsManager.add(quicConnection) + } + status = quicConnection.setCallbackHandler() + default: + break + } + return status + } + + public func close() async { + if let listener { + api.pointee.ListenerClose(listener) + self.listener = nil + } + await connectionsManager.removeAll() + } +} + +extension QuicListener: QuicConnectionMessageHandler { + public func didReceiveMessage( + connection: QuicConnection, stream: QuicStream?, message: QuicMessage + ) { + switch message.type { + case .shutdownComplete: + Task { + await connectionsManager.remove(connection) + } + case .received: + if let stream, let messageHandler { + Task { + await messageHandler.didReceiveMessage( + connection: connection, stream: stream, message: message + ) + } + } + default: + break + } + } + + public func didReceiveError( + connection: QuicConnection, stream: QuicStream, error: QuicError + ) { + listenLogger.error("Failed to receive message: \(error)") + if let messageHandler { + Task { + await messageHandler.didReceiveError( + connection: connection, stream: stream, error: error + ) + } + } + } +} diff --git a/Networking/Sources/Networking/msquic/QuicServer.swift b/Networking/Sources/Networking/msquic/QuicServer.swift new file mode 100644 index 00000000..0e6657b2 --- /dev/null +++ b/Networking/Sources/Networking/msquic/QuicServer.swift @@ -0,0 +1,109 @@ +import Foundation +import Logging +import msquic +import Utils + +let serverLogger: Logger = .init(label: "QuicServer") + +public protocol QuicServerMessageHandler: AnyObject, Sendable { + func didReceiveMessage(server: QuicServer, messageID: String, message: QuicMessage) async + func didReceiveError(server: QuicServer, messageID: String, error: QuicError) async +} + +public actor QuicServer: Sendable, QuicListenerMessageHandler { + private var api: UnsafePointer + private var registration: HQuic? + private var configuration: HQuic? + private var listener: QuicListener? + private let config: QuicConfig + private weak var messageHandler: QuicServerMessageHandler? + private var pendingMessages: [String: (QuicConnection, QuicStream)] + + init(config: QuicConfig, messageHandler: QuicServerMessageHandler? = nil) async throws { + self.config = config + self.messageHandler = messageHandler + pendingMessages = [:] + var rawPointer: UnsafeRawPointer? + let status: UInt32 = MsQuicOpenVersion(2, &rawPointer) + + if QuicStatus(status).isFailed { + throw QuicError.invalidStatus(status: status.code) + } + guard + let boundPointer: UnsafePointer = rawPointer?.assumingMemoryBound( + to: QuicApiTable.self + ) + else { + throw QuicError.getApiFailed + } + + var registrationHandle: HQuic? + let registrationStatus = boundPointer.pointee.RegistrationOpen(nil, ®istrationHandle) + if QuicStatus(registrationStatus).isFailed { + throw QuicError.invalidStatus(status: registrationStatus.code) + } + api = boundPointer + registration = registrationHandle + try config.loadConfiguration( + api: api, registration: registration, configuration: &configuration + ) + listener = try QuicListener( + api: boundPointer, registration: registration, configuration: configuration, config: config, + messageHandler: self + ) + } + + public func close() async { + if let listener { + await listener.close() + self.listener = nil + } + if let configuration { + api.pointee.ConfigurationClose(configuration) + self.configuration = nil + } + if let registration { + api.pointee.RegistrationClose(registration) + self.registration = nil + } + MsQuicClose(api) + } + + // Respond to a message with a specific messageID using Data + func respondGetStatus(to messageID: String, with data: Data, kind: StreamKind? = nil) async + -> QuicStatus + { + var status = QuicStatusCode.internalError.rawValue + if let (_, stream) = pendingMessages[messageID] { + let streamKind = kind ?? stream.kind + pendingMessages.removeValue(forKey: messageID) + status = stream.send(with: data, kind: streamKind) + } else { + serverLogger.error("Message not found") + } + return status + } + + public func didReceiveMessage( + connection: QuicConnection, stream: QuicStream, message: QuicMessage + ) async { + switch message.type { + case .received: + let messageID = UUID().uuidString + pendingMessages[messageID] = (connection, stream) + // Call messageHandler safely in the actor context + Task { [weak self] in + guard let self else { return } + await messageHandler?.didReceiveMessage(server: self, messageID: messageID, message: message) + } + default: + break + } + } + + public func didReceiveError( + connection _: QuicConnection, stream _: QuicStream, error: QuicError + ) async { + serverLogger.error("Failed to receive message: \(error)") + } +} diff --git a/Networking/Sources/Networking/msquic/QuicStatus.swift b/Networking/Sources/Networking/msquic/QuicStatus.swift index 1dcbe923..55dd0326 100644 --- a/Networking/Sources/Networking/msquic/QuicStatus.swift +++ b/Networking/Sources/Networking/msquic/QuicStatus.swift @@ -20,6 +20,10 @@ extension QuicStatus { Int32(bitPattern: self) <= 0 } + static var pending: UInt32 { + UInt32(bitPattern: -2) + } + init(_ value: UInt32?) { guard let value else { self = QuicStatusCode.unknown.rawValue diff --git a/Networking/Sources/Networking/msquic/QuicStream.swift b/Networking/Sources/Networking/msquic/QuicStream.swift index e0310b98..ac7d6916 100644 --- a/Networking/Sources/Networking/msquic/QuicStream.swift +++ b/Networking/Sources/Networking/msquic/QuicStream.swift @@ -1,14 +1,12 @@ -import Atomics import Foundation import Logging import msquic let streamLogger = Logger(label: "QuicStream") -public enum StreamKind { +public enum StreamKind: Sendable { case uniquePersistent case commonEphemeral - case unknown } public protocol QuicStreamMessageHandler: AnyObject { @@ -16,19 +14,17 @@ public protocol QuicStreamMessageHandler: AnyObject { func didReceiveError(_ stream: QuicStream, error: QuicError) } -public class QuicStream { +public class QuicStream: @unchecked Sendable { private var stream: HQuic? - private let api: UnsafePointer? + private let api: UnsafePointer private let connection: HQuic? public let kind: StreamKind private weak var messageHandler: QuicStreamMessageHandler? private var streamCallback: StreamCallback? - private var sendCompletion: CheckedContinuation? - private let isClosed: ManagedAtomic = .init(false) - + // private var sendCompletion: CheckedContinuation? // Initializer for creating a new stream init( - api: UnsafePointer?, connection: HQuic?, + api: UnsafePointer, connection: HQuic?, _ streamKind: StreamKind = .uniquePersistent, messageHandler: QuicStreamMessageHandler? = nil ) throws { @@ -46,14 +42,15 @@ public class QuicStream { // Initializer for wrapping an existing stream init( - api: UnsafePointer?, connection: HQuic?, stream: HQuic?, + api: UnsafePointer, connection: HQuic?, stream: HQuic?, + _ streamKind: StreamKind = .uniquePersistent, messageHandler: QuicStreamMessageHandler? = nil ) { self.api = api self.connection = connection self.messageHandler = messageHandler self.stream = stream - kind = .commonEphemeral + kind = streamKind streamCallback = { stream, context, event in QuicStream.streamCallback( stream: stream, context: context, event: event @@ -61,31 +58,24 @@ public class QuicStream { } } - // Deinitializer to ensure resources are cleaned up - deinit { - close() - streamLogger.trace("QuicStream Deinit") - } - // Opens a stream with the specified kind private func openStream(_: StreamKind = .commonEphemeral) throws { let status = - (api?.pointee.StreamOpen( + api.pointee.StreamOpen( connection, QUIC_STREAM_OPEN_FLAG_NONE, { stream, context, event -> QuicStatus in QuicStream.streamCallback(stream: stream, context: context, event: event) }, Unmanaged.passUnretained(self).toOpaque(), &stream - )).status + ) if status.isFailed { throw QuicError.invalidStatus(status: status.code) } - streamLogger.info("[\(String(describing: stream))] Stream opened") } // Starts the stream private func start() throws { try openStream(kind) - let status = (api?.pointee.StreamStart(stream, QUIC_STREAM_START_FLAG_NONE)).status + let status = api.pointee.StreamStart(stream, QUIC_STREAM_START_FLAG_NONE) if status.isFailed { throw QuicError.invalidStatus(status: status.code) } @@ -94,20 +84,18 @@ public class QuicStream { // Closes the stream and cleans up resources func close() { - if isClosed.compareExchange(expected: false, desired: true, ordering: .acquiring).exchanged { - streamCallback = nil - messageHandler = nil - if stream != nil { - api?.pointee.StreamClose(stream) - stream = nil - } - streamLogger.debug("QuicStream close") + streamCallback = nil + messageHandler = nil + if let stream { + api.pointee.StreamClose(stream) + self.stream = nil } + streamLogger.debug("QuicStream close") } // Sets the callback handler for the stream func setCallbackHandler() { - guard let api, let stream else { + guard let stream else { return } @@ -122,11 +110,10 @@ public class QuicStream { ) } - // Sends data over the stream and returns the status - func send(buffer: Data, kind: StreamKind? = nil) -> QuicStatus { + func send(with data: Data, kind: StreamKind? = nil) -> QuicStatus { streamLogger.info("[\(String(describing: stream))] Sending data...") var status = QuicStatusCode.success.rawValue - let messageLength = buffer.count + let messageLength = data.count let sendBufferRaw = UnsafeMutableRawPointer.allocate( byteCount: MemoryLayout.size + messageLength, @@ -137,7 +124,7 @@ public class QuicStream { let bufferPointer = UnsafeMutablePointer.allocate( capacity: messageLength ) - buffer.copyBytes(to: bufferPointer, count: messageLength) + data.copyBytes(to: bufferPointer, count: messageLength) sendBuffer.pointee.Buffer = bufferPointer sendBuffer.pointee.Length = UInt32(messageLength) @@ -145,64 +132,17 @@ public class QuicStream { // Use the provided kind if available, otherwise use the stream's kind let effectiveKind = kind ?? self.kind let flags = (effectiveKind == .uniquePersistent) ? QUIC_SEND_FLAG_NONE : QUIC_SEND_FLAG_FIN - - status = (api?.pointee.StreamSend(stream, sendBuffer, 1, flags, sendBufferRaw)).status + status = api.pointee.StreamSend(stream, sendBuffer, 1, QUIC_SEND_FLAG_NONE, sendBufferRaw) if status.isFailed { streamLogger.error("StreamSend failed, \(status)!") let shutdown: QuicStatus = - (api?.pointee.StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, 0)).status + api.pointee.StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, 0) if shutdown.isFailed { streamLogger.error("StreamShutdown failed, 0x\(String(format: "%x", shutdown))!") } } return status } - - // Sends data over the stream asynchronously and waits for the response - func send(buffer: Data, kind: StreamKind? = nil) async throws -> QuicMessage { - streamLogger.info("[\(String(describing: stream))] Sending data...") - var status = QuicStatusCode.success.rawValue - let messageLength = buffer.count - - let sendBufferRaw = UnsafeMutableRawPointer.allocate( - byteCount: MemoryLayout.size + messageLength, - alignment: MemoryLayout.alignment - ) - - let sendBuffer = sendBufferRaw.assumingMemoryBound(to: QuicBuffer.self) - let bufferPointer = UnsafeMutablePointer.allocate( - capacity: messageLength - ) - buffer.copyBytes(to: bufferPointer, count: messageLength) - - sendBuffer.pointee.Buffer = bufferPointer - sendBuffer.pointee.Length = UInt32(messageLength) - - // Use the provided kind if available, otherwise use the stream's kind - let effectiveKind = kind ?? self.kind - let flags = (effectiveKind == .uniquePersistent) ? QUIC_SEND_FLAG_NONE : QUIC_SEND_FLAG_FIN - - return try await withCheckedThrowingContinuation { [weak self] continuation in - guard let self else { - continuation.resume(throwing: QuicError.sendFailed) - return - } - sendCompletion = continuation - status = (api?.pointee.StreamSend(stream, sendBuffer, 1, flags, sendBufferRaw)).status - if status.isFailed { - streamLogger.error("StreamSend failed, \(status)!") - let shutdown: QuicStatus = - (api?.pointee.StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, 0)).status - if shutdown.isFailed { - streamLogger.error( - "StreamShutdown failed, 0x\(String(format: "%x", shutdown))!" - ) - } - continuation.resume(throwing: QuicError.invalidStatus(status: status.code)) - sendCompletion = nil - } - } - } } extension QuicStream { @@ -216,15 +156,15 @@ extension QuicStream { let quicStream: QuicStream = Unmanaged.fromOpaque(context).takeUnretainedValue() var status: QuicStatus = QuicStatusCode.success.rawValue - streamLogger.info("[\(String(describing: stream))] Event: \(event.pointee.Type.rawValue)") switch event.pointee.Type { case QUIC_STREAM_EVENT_SEND_COMPLETE: if let clientContext = event.pointee.SEND_COMPLETE.ClientContext { free(clientContext) } - streamLogger.info("[\(String(describing: stream))] Data sent") + streamLogger.info("[\(String(describing: stream))] Stream send completed") case QUIC_STREAM_EVENT_RECEIVE: + let bufferCount: UInt32 = event.pointee.RECEIVE.BufferCount let buffers = event.pointee.RECEIVE.Buffers var receivedData = Data() @@ -232,41 +172,35 @@ extension QuicStream { let buffer = buffers![Int(i)] let bufferLength = Int(buffer.Length) let bufferData = Data(bytes: buffer.Buffer, count: bufferLength) - streamLogger.info( - " Data length \(bufferLength) bytes: \(String([UInt8](bufferData).map { Character(UnicodeScalar($0)) }))" - ) receivedData.append(bufferData) } + if event.pointee.RECEIVE.Flags.rawValue & QUIC_RECEIVE_FLAG_FIN.rawValue != 0 { + streamLogger.warning("[\(String(describing: stream))] FIN received in QUIC stream") + } + if receivedData.count > 0 { - if let continuation = quicStream.sendCompletion { - continuation.resume(returning: QuicMessage(type: .received, data: receivedData)) - quicStream.sendCompletion = nil - } quicStream.messageHandler?.didReceiveMessage( quicStream, message: QuicMessage(type: .received, data: receivedData) ) } case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: - streamLogger.info("[\(String(describing: stream))] Peer shut down") + streamLogger.warning("[\(String(describing: stream))] Peer send shutdown") + status = + quicStream.api.pointee.StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0) case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: - streamLogger.error("[\(String(describing: stream))] Peer aborted") + streamLogger.error("[\(String(describing: stream))] Peer send aborted") status = - (quicStream.api?.pointee.StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, 0)) - .status - quicStream.messageHandler?.didReceiveError( - quicStream, error: QuicError.invalidStatus(status: status.code) - ) + quicStream.api.pointee.StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, 0) case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: streamLogger.info("[\(String(describing: stream))] All done") if event.pointee.SHUTDOWN_COMPLETE.AppCloseInProgress == 0 { - quicStream.api?.pointee.StreamClose(stream) - } - if let continuation = quicStream.sendCompletion { - continuation.resume(throwing: QuicError.sendFailed) - quicStream.sendCompletion = nil + if let stream = quicStream.stream { + streamLogger.info("[\(String(describing: stream))] Stream closed") + quicStream.api.pointee.StreamClose(stream) + } } default: diff --git a/Networking/Tests/NetworkingTests/PeerTest.swift b/Networking/Tests/NetworkingTests/PeerTest.swift new file mode 100644 index 00000000..b63b8071 --- /dev/null +++ b/Networking/Tests/NetworkingTests/PeerTest.swift @@ -0,0 +1,148 @@ +import Foundation +import NIO +import Testing +import Utils + +@testable import Networking + +#if os(macOS) + import CoreFoundation + import Security +#endif + +struct Message: PeerMessage { + public let data: Data + public let type: PeerMessageType + public init(data: Data) { + self.data = data + type = .uniquePersistent + } + + public init(data: Data, type: PeerMessageType) { + self.data = data + self.type = type + } + + public func getMessageType() -> PeerMessageType { + type + } + + public func getData() -> Data { + data + } +} + +let cert = Bundle.module.path(forResource: "server", ofType: "cert")! +let keyFile = Bundle.module.path(forResource: "server", ofType: "key")! + +final class PeerTests { + @Test func testPeerCommunication() async throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) + let eventBus1 = EventBus() + let eventBus2 = EventBus() + + // Create two Peer instances + let peer1 = try await Peer( + config: QuicConfig( + id: "public-key1", cert: cert, key: keyFile, alpn: "sample", + ipAddress: "127.0.0.1", port: 4568 + ), + eventBus: eventBus1 + ) + + let peer2 = try await Peer( + config: QuicConfig( + id: "public-key2", cert: cert, key: keyFile, alpn: "sample", + ipAddress: "127.0.0.1", port: 4569 + ), + eventBus: eventBus2 + ) + + // Subscribe to PeerMessageReceived for peer1 + let token1 = await eventBus1.subscribe(PeerMessageReceived.self) { event in + let message = String( + [UInt8](event.message.data!).map { Character(UnicodeScalar($0)) } + ) + print( + "Peer1 received message from messageID: \(event.messageID), message: \(message))" + ) + let status: QuicStatus = await peer1.respond( + to: event.messageID, with: event.message.data! + ) + print("Peer1 sent response: \(status.isFailed ? "Failed" : "Success")") + } + + // Subscribe to PeerMessageReceived for peer2 + let token2 = await eventBus2.subscribe(PeerMessageReceived.self) { event in + let message = String( + [UInt8](event.message.data!).map { Character(UnicodeScalar($0)) } + ) + print( + "Peer2 received message from messageID: \(event.messageID), message: \(message))" + ) + let status: QuicStatus = await peer2.respond( + to: event.messageID, with: event.message.data! + ) + print("Peer2 sent response: \(status.isFailed ? "Failed" : "Success")") + } + + // Schedule message sending after 5 seconds + _ = try await group.next().scheduleTask(in: .seconds(2)) { + Task { + do { + for i in 1 ... 5 { + let messageStatus2: QuicStatus = try await peer1.sendMessage( + to: NetAddr(ipAddress: "127.0.0.1", port: 4569), + with: Message( + data: Data("Hello from Peer1 - Message \(i)".utf8), + type: PeerMessageType.commonEphemeral + ) + ) + print("Peer1 sent message \(i): \(messageStatus2.isSucceeded ? "Success" : "Failed")") + + let messageStatus1: QuicStatus = try await peer2.sendMessage( + to: NetAddr(ipAddress: "127.0.0.1", port: 4568), + with: Message( + data: Data("Hello from Peer2 - Message \(i)".utf8), + type: PeerMessageType.commonEphemeral + ) + ) + print("Peer2 sent message \(i): \(messageStatus1.isSucceeded ? "Success" : "Failed")") + } + + for i in 6 ... 10 { + let messageStatus2: QuicStatus = try await peer1.sendMessage( + to: NetAddr(ipAddress: "127.0.0.1", port: 4569), + with: Message( + data: Data("Hello from Peer1 - Message \(i)".utf8), + type: PeerMessageType.uniquePersistent + ) + ) + print("Peer1 sent message \(i): \(messageStatus2.isSucceeded ? "Success" : "Failed")") + let messageStatus1: QuicStatus = try await peer2.sendMessage( + to: NetAddr(ipAddress: "127.0.0.1", port: 4568), + with: Message( + data: Data("Hello from Peer2 - Message \(i)".utf8), + type: PeerMessageType.uniquePersistent + ) + ) + print("Peer2 sent message \(i): \(messageStatus1.isSucceeded ? "Success" : "Failed")") + } + } catch { + print("Failed to send message: \(error)") + } + } + }.futureResult.get() + + _ = try await group.next().scheduleTask(in: .seconds(10)) { + Task { + await eventBus1.unsubscribe(token: token1) + await eventBus2.unsubscribe(token: token2) + print("eventBus unsubscribe") + } + }.futureResult.get() + try await group.next().scheduleTask(in: .seconds(5)) { + print("scheduleTask end") + }.futureResult.get() + } +} diff --git a/Networking/Tests/NetworkingTests/QuicClientTest.swift b/Networking/Tests/NetworkingTests/QuicClientTest.swift index cb8bea72..3e862a87 100644 --- a/Networking/Tests/NetworkingTests/QuicClientTest.swift +++ b/Networking/Tests/NetworkingTests/QuicClientTest.swift @@ -7,45 +7,93 @@ import Testing #if os(macOS) import CoreFoundation import Security +#endif +final class QuicClientTests { + @Test func start() async throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) +// let quicServer = try await QuicServer( +// config: QuicConfig( +// id: "public-key", cert: cert, key: keyFile, alpn: "sample", +// ipAddress: "127.0.0.1", port: 4563 +// ), messageHandler: self +// ) + try await group.next().scheduleTask(in: .seconds(5)) {}.futureResult.get() + let quicClient = try await QuicClient( + config: QuicConfig( + id: "public-key", cert: cert, key: keyFile, alpn: "sample", + ipAddress: "127.0.0.1", port: 4563 + ), + messageHandler: self + ) - final class QuicClientTests { - @Test func start() async throws { - do { - let cert = Bundle.module.path(forResource: "server", ofType: "cert")! - let keyFile = Bundle.module.path(forResource: "server", ofType: "key")! - let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) - let quicClient = try QuicClient( - config: QuicConfig( - id: "public-key", cert: cert, key: keyFile, alpn: "sample", - ipAddress: "127.0.0.1", port: 4569 + _ = try await group.next().scheduleTask(in: .seconds(2)) { + Task { + for i in 1 ... 10 { + let messageToPeer2: QuicStatus = try await quicClient.send( + data: Data("Hello from Client - Message \(i)".utf8), + streamKind: .commonEphemeral + ) + print("Client sent message \(i): \(messageToPeer2.isSucceeded ? "Success" : "Failed")") + let messageToPeer1: QuicStatus = try await quicClient.send( + data: Data("Hello from Client - Message \(i + 10)".utf8), + streamKind: .commonEphemeral ) - ) - let status = try quicClient.start() - print(status) - let message1 = try await quicClient.send( - message: Data("Hello, World!".utf8), streamKind: .uniquePersistent - ) - print("Client received 1: \(message1)") - let message2 = try await quicClient.send( - message: Data("Hello, swift!".utf8), streamKind: .commonEphemeral - ) - print("Client received 2: \(message2)") - let message3 = try await quicClient.send( - message: Data("Hello, how are you!".utf8), streamKind: .uniquePersistent - ) - print("Client received 3: \(message3)") - let message4 = try await quicClient.send( - message: Data("Hello, i am fine!".utf8), streamKind: .commonEphemeral - ) - print("Client received 4: \(message4)") - - try await group.next().scheduleTask(in: .seconds(5)) { - print("scheduleTask: 5s") - }.futureResult.get() - } catch { - // Handle the error if sending the message fails or if the connection fails - print("Failed about quic client: \(error)") + print("Client sent message \(i + 10): \(messageToPeer1.isSucceeded ? "Success" : "Failed")") + } } + }.futureResult.get() + + try await group.next().scheduleTask(in: .seconds(10)) { + print("scheduleTask: 5s") + }.futureResult.get() + } +} + +extension QuicClientTests: QuicClientMessageHandler { + func didReceiveMessage(quicClient _: QuicClient, message: QuicMessage) async { + switch message.type { + case .received: + let messageString = String( + [UInt8](message.data!).map { Character(UnicodeScalar($0)) } + ) + print("Client received message : \(messageString)") + case .shutdownComplete: + print("Client shutdown complete") + case .unknown: + print("Client unknown") + default: + break } } -#endif + + func didReceiveError(quicClient _: QuicClient, error: QuicError) async { + print("Client error: \(error)") + } +} + +extension QuicClientTests: QuicServerMessageHandler { + func didReceiveMessage(server: QuicServer, messageID: String, message: QuicMessage) async { + switch message.type { + case .received: + let messageString = String( + [UInt8](message.data!).map { Character(UnicodeScalar($0)) } + ) + print("Server received message : \(messageString)") + let status = await server.respondGetStatus(to: messageID, with: message.data!) + print("Server response message : \(status.isSucceeded ? "Success" : "Failed")") + if status.isFailed { + print("Server response failed with messageID: \(messageID) \nmessage: \(messageString) ") + } + case .shutdownComplete: + print("Server shutdown complete") + case .unknown: + print("Server unknown") + default: + break + } + } + + func didReceiveError(server _: QuicServer, messageID _: String, error: QuicError) async { + print("Server error: \(error)") + } +}