Skip to content

Commit

Permalink
update peer
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Oct 8, 2024
1 parent 456562c commit ce8f542
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 117 deletions.
56 changes: 6 additions & 50 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ public protocol PeerMessage: Equatable, Sendable {
}

public struct PeerMessageReceived: Event {
public let messageID: Int64
public let messageID: String
public let message: QuicMessage
}

// TODO: add error or remove it
public struct PeerErrorReceived: Event {
public let messageID: Int64?
public let messageID: String?
public let error: QuicError
}

Expand Down Expand Up @@ -53,12 +53,12 @@ public actor Peer {
}

// Respond to a message with a specific messageID using Data
func respond(to messageID: Int64, with data: Data) async -> QuicStatus {
func respond(to messageID: String, with data: Data) async -> QuicStatus {
await quicServer.respondGetStatus(to: messageID, with: data)
}

// Respond to a message with a specific messageID using PeerMessage
func respond(to messageID: Int64, with message: any PeerMessage) async -> QuicStatus {
func respond(to messageID: String, with message: any PeerMessage) async -> QuicStatus {
let messageType = message.getMessageType()
return
await quicServer
Expand All @@ -69,57 +69,13 @@ public actor Peer {
)
}

// // Respond to a message with a specific messageID using PeerMessage (async throws)
// func respond(to messageID: Int64, with message: any PeerMessage) async throws {
// let messageType = message.getMessageType()
// let quicMessage = try await quicServer.respondGetMessage(
// to: messageID, with: message.getData(),
// kind: (messageType == .uniquePersistent) ? .uniquePersistent : .commonEphemeral
// )
// if quicMessage.type != .received {
// throw QuicError.sendFailed
// }
// }

// // Sends a message to another peer asynchronously
// func sendMessage(to peer: NetAddr, with message: any PeerMessage) async throws -> QuicMessage {
// let buffer = message.getData()
// let messageType = message.getMessageType()
// return try await sendDataToPeer(buffer, to: peer, messageType: messageType)
// }

// Sends a message to another peer and returns the status
func sendMessage(to peer: NetAddr, with message: any PeerMessage) async throws -> QuicStatus {
let buffer = message.getData()
let messageType = message.getMessageType()
return try await sendDataToPeer(buffer, to: peer, messageType: messageType)
}

// // send message to other peer wait for response quicMessage
// private func sendDataToPeer(_ data: Data, to peerAddr: NetAddr, messageType: PeerMessageType)
// async throws -> QuicMessage
// {
// if let client = clients[peerAddr] {
// // Client already exists, use it to send the data
// return try await client.send(
// message: data,
// streamKind: messageType == .uniquePersistent ? .uniquePersistent : .commonEphemeral
// )
// } else {
// let config = QuicConfig(
// id: config.id, cert: config.cert, key: config.key, alpn: config.alpn,
// ipAddress: peerAddr.ipAddress, port: peerAddr.port
// )
// // Client does not exist, create a new one
// let client = try await QuicClient(config: config, messageHandler: self)
// clients[peerAddr] = client
// return try await client.send(
// message: data,
// streamKind: messageType == .uniquePersistent ? .uniquePersistent : .commonEphemeral
// )
// }
// }

private func sendDataToPeer(_ data: Data, to peerAddr: NetAddr, messageType: PeerMessageType)
async throws -> QuicStatus
{
Expand Down Expand Up @@ -174,7 +130,7 @@ extension Peer: QuicClientMessageHandler {

// QuicServerMessageHandler methods
extension Peer: QuicServerMessageHandler {
public func didReceiveMessage(server _: QuicServer, messageID: Int64, message: QuicMessage) async {
public func didReceiveMessage(server _: QuicServer, messageID: String, message: QuicMessage) async {
switch message.type {
case .received:
await eventBus.publish(PeerMessageReceived(messageID: messageID, message: message))
Expand All @@ -185,7 +141,7 @@ extension Peer: QuicServerMessageHandler {
}
}

public func didReceiveError(server _: QuicServer, messageID: Int64, error: QuicError) async {
public func didReceiveError(server _: QuicServer, messageID: String, error: QuicError) async {
peerLogger.error("Failed to receive message: \(error)")
await eventBus.publish(PeerErrorReceived(messageID: messageID, error: error))
}
Expand Down
2 changes: 1 addition & 1 deletion Networking/Sources/Networking/msquic/QuicClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public actor QuicClient: Sendable, QuicConnectionMessageHandler {
} else {
try await connection.createCommonEphemeralStream()
}
return sendStream.respond(with: data, kind: streamKind)
return sendStream.send(with: data, kind: streamKind)
}

func getNetAddr() -> NetAddr {
Expand Down
4 changes: 2 additions & 2 deletions Networking/Sources/Networking/msquic/QuicConfig.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ public struct QuicConfig: Sendable {
) throws {
// Initialize QUIC settings
var settings = QuicSettings()
settings.IdleTimeoutMs = 10000
settings.IdleTimeoutMs = 30000
settings.IsSet.IdleTimeoutMs = 1
settings.ServerResumptionLevel = 2 // QUIC_SERVER_RESUME_AND_ZERORTT
settings.IsSet.ServerResumptionLevel = 1
settings.PeerBidiStreamCount = 1
settings.PeerBidiStreamCount = 100
settings.IsSet.PeerBidiStreamCount = 1

// Use withCString to avoid manual memory management
Expand Down
20 changes: 5 additions & 15 deletions Networking/Sources/Networking/msquic/QuicConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ actor StreamManager {
}
}

func changeTypeToCommon(_ stream: QuicStream) {
removeStream(stream)
stream.changeTypeToCommon()
addCommonStream(stream)
}

func closeAllCommonStreams() {
for stream in commonStreams {
stream.close()
Expand Down Expand Up @@ -242,7 +236,6 @@ extension QuicConnection {
stream: nil,
message: QuicMessage(type: .shutdownComplete, data: nil)
)
quicConnection.messageHandler = nil
}
}

Expand Down Expand Up @@ -279,12 +272,6 @@ extension QuicConnection: QuicStreamMessageHandler {
stream.close()
await streamManager.removeStream(stream)
}
case .changeStreamType:
Task {
if stream.kind == .uniquePersistent {
await streamManager.changeTypeToCommon(stream)
}
}
default:
break
}
Expand All @@ -298,8 +285,11 @@ extension QuicConnection: QuicStreamMessageHandler {
}

// Handles errors received from the stream
public func didReceiveError(_: QuicStream, error: QuicError) {
public func didReceiveError(_ stream: QuicStream, error: QuicError) {
logger.error("Failed to receive message: \(error)")
// await messageHandler?.didReceiveError(connection: self, stream: stream, error: error)
Task { [weak self] in
guard let self else { return }
await messageHandler?.didReceiveError(connection: self, stream: stream, error: error)
}
}
}
1 change: 0 additions & 1 deletion Networking/Sources/Networking/msquic/QuicListener.swift
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ public class QuicListener: @unchecked Sendable {
await listener.connectionsManager.add(quicConnection)
}
status = quicConnection.setCallbackHandler()

default:
break
}
Expand Down
1 change: 0 additions & 1 deletion Networking/Sources/Networking/msquic/QuicMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ enum QuicMessageType: String, Codable {
case close
case connected
case shutdownComplete
case changeStreamType
}

public struct QuicMessage: Sendable, Equatable, Codable {
Expand Down
13 changes: 6 additions & 7 deletions Networking/Sources/Networking/msquic/QuicServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import Utils
let serverLogger: Logger = .init(label: "QuicServer")

public protocol QuicServerMessageHandler: AnyObject, Sendable {
func didReceiveMessage(server: QuicServer, messageID: Int64, message: QuicMessage) async
func didReceiveError(server: QuicServer, messageID: Int64, error: QuicError) async
func didReceiveMessage(server: QuicServer, messageID: String, message: QuicMessage) async
func didReceiveError(server: QuicServer, messageID: String, error: QuicError) async
}

public actor QuicServer: Sendable, QuicListenerMessageHandler {
Expand All @@ -17,7 +17,7 @@ public actor QuicServer: Sendable, QuicListenerMessageHandler {
private var listener: QuicListener?
private let config: QuicConfig
private weak var messageHandler: QuicServerMessageHandler?
private var pendingMessages: [Int64: (QuicConnection, QuicStream)]
private var pendingMessages: [String: (QuicConnection, QuicStream)]

init(config: QuicConfig, messageHandler: QuicServerMessageHandler? = nil) async throws {
self.config = config
Expand Down Expand Up @@ -70,14 +70,14 @@ public actor QuicServer: Sendable, QuicListenerMessageHandler {
}

// Respond to a message with a specific messageID using Data
func respondGetStatus(to messageID: Int64, with data: Data, kind: StreamKind? = nil) async
func respondGetStatus(to messageID: String, with data: Data, kind: StreamKind? = nil) async
-> QuicStatus
{
var status = QuicStatusCode.internalError.rawValue
if let (_, stream) = pendingMessages[messageID] {
let streamKind = kind ?? stream.kind
pendingMessages.removeValue(forKey: messageID)
status = stream.respond(with: data, kind: streamKind)
status = stream.send(with: data, kind: streamKind)
} else {
serverLogger.error("Message not found")
}
Expand All @@ -89,9 +89,8 @@ public actor QuicServer: Sendable, QuicListenerMessageHandler {
) async {
switch message.type {
case .received:
let messageID = Int64(Date().timeIntervalSince1970 * 1000)
let messageID = UUID().uuidString
pendingMessages[messageID] = (connection, stream)

// Call messageHandler safely in the actor context
Task { [weak self] in
guard let self else { return }
Expand Down
29 changes: 7 additions & 22 deletions Networking/Sources/Networking/msquic/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class QuicStream: @unchecked Sendable {
private var stream: HQuic?
private let api: UnsafePointer<QuicApiTable>
private let connection: HQuic?
public private(set) var kind: StreamKind
public let kind: StreamKind
private weak var messageHandler: QuicStreamMessageHandler?
private var streamCallback: StreamCallback?
// private var sendCompletion: CheckedContinuation<QuicMessage, Error>?
Expand Down Expand Up @@ -93,10 +93,6 @@ public class QuicStream: @unchecked Sendable {
streamLogger.debug("QuicStream close")
}

func changeTypeToCommon() {
kind = .commonEphemeral
}

// Sets the callback handler for the stream
func setCallbackHandler() {
guard let stream else {
Expand All @@ -114,8 +110,8 @@ public class QuicStream: @unchecked Sendable {
)
}

func respond(with data: Data, kind: StreamKind? = nil) -> QuicStatus {
streamLogger.info("[\(String(describing: stream))] Respond data...")
func send(with data: Data, kind: StreamKind? = nil) -> QuicStatus {
streamLogger.info("[\(String(describing: stream))] Sending data...")
var status = QuicStatusCode.success.rawValue
let messageLength = data.count

Expand All @@ -136,8 +132,8 @@ public class QuicStream: @unchecked Sendable {
// Use the provided kind if available, otherwise use the stream's kind
let effectiveKind = kind ?? self.kind
let flags = (effectiveKind == .uniquePersistent) ? QUIC_SEND_FLAG_NONE : QUIC_SEND_FLAG_FIN
streamLogger.info("stream response flags: \((effectiveKind == .uniquePersistent) ? "QUIC_SEND_FLAG_NONE" : "QUIC_SEND_FLAG_FIN")")
status = api.pointee.StreamSend(stream, sendBuffer, 1, flags, sendBufferRaw)
// streamLogger.info("stream response flags: \((effectiveKind == .uniquePersistent) ? "QUIC_SEND_FLAG_NONE" : "QUIC_SEND_FLAG_FIN")")
status = api.pointee.StreamSend(stream, sendBuffer, 1, QUIC_SEND_FLAG_NONE, sendBufferRaw)
if status.isFailed {
streamLogger.error("StreamSend failed, \(status)!")
let shutdown: QuicStatus =
Expand Down Expand Up @@ -181,29 +177,18 @@ extension QuicStream {
}
if event.pointee.RECEIVE.Flags.rawValue & QUIC_RECEIVE_FLAG_FIN.rawValue != 0 {
streamLogger.warning("[\(String(describing: stream))] FIN received in QUIC stream")
// quicStream.messageHandler?.didReceiveMessage(quicStream, message: QuicMessage(type: .changeStreamType, data: nil))
// quicStream.kind = .commonEphemeral
}
let messageString = String(
[UInt8](receivedData).map { Character(UnicodeScalar($0)) }
)
streamLogger.info("[\(String(describing: stream))] RECEIVE message \(messageString)")

if receivedData.count > 0 {
quicStream.messageHandler?.didReceiveMessage(
quicStream, message: QuicMessage(type: .received, data: receivedData)
)
}
// streamLogger.info("[\(String(describing: stream))] \(QuicStatus.pending.value)")

// return QuicStatus.pending

case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN:
streamLogger.warning("[\(String(describing: stream))] Peer send shutdown")
if quicStream.kind == .uniquePersistent {
status =
quicStream.api.pointee.StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0)
}
status =
quicStream.api.pointee.StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0)

case QUIC_STREAM_EVENT_PEER_SEND_ABORTED:
streamLogger.error("[\(String(describing: stream))] Peer send aborted")
Expand Down
Loading

0 comments on commit ce8f542

Please sign in to comment.