Skip to content

Commit

Permalink
Merge branch 'master' of github.com:AcalaNetwork/boka
Browse files Browse the repository at this point in the history
* 'master' of github.com:AcalaNetwork/boka:
  remove unused helper (#156)
  update actor (#147)
  • Loading branch information
MacOMNI committed Oct 9, 2024
2 parents 2f1e652 + 6586caf commit 8c2ebff
Show file tree
Hide file tree
Showing 14 changed files with 878 additions and 977 deletions.
148 changes: 148 additions & 0 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import Foundation
import Logging
import Utils

let peerLogger = Logger(label: "PeerServer")

public enum PeerMessageType: Sendable {
case uniquePersistent // most messages type
case commonEphemeral
}

public protocol PeerMessage: Equatable, Sendable {
func getData() -> Data
func getMessageType() -> PeerMessageType
}

public struct PeerMessageReceived: Event {
public let messageID: String
public let message: QuicMessage
}

// TODO: add error or remove it
public struct PeerErrorReceived: Event {
public let messageID: String?
public let error: QuicError
}

// Define the Peer actor
public actor Peer {
private let config: QuicConfig
private var quicServer: QuicServer!
private var clients: [NetAddr: QuicClient]
private let eventBus: EventBus

public init(config: QuicConfig, eventBus: EventBus) async throws {
self.config = config
self.eventBus = eventBus
clients = [:]
quicServer = try await QuicServer(config: config, messageHandler: self)
}

deinit {
Task { [weak self] in
guard let self else { return }

var clients = await self.clients
for client in clients.values {
await client.close()
}
clients.removeAll()
await self.quicServer.close()
}
}

// Respond to a message with a specific messageID using Data
func respond(to messageID: String, with data: Data) async -> QuicStatus {
await quicServer.respondGetStatus(to: messageID, with: data)
}

// Respond to a message with a specific messageID using PeerMessage
func respond(to messageID: String, with message: any PeerMessage) async -> QuicStatus {
let messageType = message.getMessageType()
return
await quicServer
.respondGetStatus(
to: messageID,
with: message.getData(),
kind: (messageType == .uniquePersistent) ? .uniquePersistent : .commonEphemeral
)
}

// Sends a message to another peer and returns the status
func sendMessage(to peer: NetAddr, with message: any PeerMessage) async throws -> QuicStatus {
let buffer = message.getData()
let messageType = message.getMessageType()
return try await sendDataToPeer(buffer, to: peer, messageType: messageType)
}

private func sendDataToPeer(_ data: Data, to peerAddr: NetAddr, messageType: PeerMessageType)
async throws -> QuicStatus
{
if let client = clients[peerAddr] {
// Client already exists, use it to send the data
return try await client.send(
data: data,
streamKind: messageType == .uniquePersistent ? .uniquePersistent : .commonEphemeral
)
} else {
let config = QuicConfig(
id: config.id, cert: config.cert, key: config.key, alpn: config.alpn,
ipAddress: peerAddr.ipAddress, port: peerAddr.port
)
// Client does not exist, create a new one
let client = try await QuicClient(config: config, messageHandler: self)
clients[peerAddr] = client
return try await client.send(
data: data,
streamKind: messageType == .uniquePersistent ? .uniquePersistent : .commonEphemeral
)
}
}

private func removeClient(client: QuicClient) async {
let peerAddr = await client.getNetAddr()
await client.close()
_ = clients.removeValue(forKey: peerAddr)
}

func getPeerAddr() -> String {
"\(config.ipAddress):\(config.port)"
}
}

// QuicClientMessageHandler methods
extension Peer: QuicClientMessageHandler {
public func didReceiveMessage(quicClient: QuicClient, message: QuicMessage) async {
switch message.type {
case .shutdownComplete:
await removeClient(client: quicClient)
default:
break
}
}

public func didReceiveError(quicClient _: QuicClient, error: QuicError) async {
peerLogger.error("Failed to receive message: \(error)")
await eventBus.publish(PeerErrorReceived(messageID: nil, error: error))
}
}

// QuicServerMessageHandler methods
extension Peer: QuicServerMessageHandler {
public func didReceiveMessage(server _: QuicServer, messageID: String, message: QuicMessage) async {
switch message.type {
case .received:
await eventBus.publish(PeerMessageReceived(messageID: messageID, message: message))
case .shutdownComplete:
peerLogger.info("quic server shutdown")
default:
break
}
}

public func didReceiveError(server _: QuicServer, messageID: String, error: QuicError) async {
peerLogger.error("Failed to receive message: \(error)")
await eventBus.publish(PeerErrorReceived(messageID: messageID, error: error))
}
}
130 changes: 38 additions & 92 deletions Networking/Sources/Networking/msquic/QuicClient.swift
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
import Atomics
import Foundation
import Logging
import msquic
import NIO

let clientLogger = Logger(label: "QuicClient")

public protocol QuicClientMessageHandler: AnyObject {
func didReceiveMessage(quicClient: QuicClient, message: QuicMessage)
// TODO: add error or remove it
func didReceiveError(quicClient: QuicClient, error: QuicError)
public protocol QuicClientMessageHandler: AnyObject, Sendable {
func didReceiveMessage(quicClient: QuicClient, message: QuicMessage) async
func didReceiveError(quicClient: QuicClient, error: QuicError) async
}

public class QuicClient: @unchecked Sendable {
private var api: UnsafePointer<QuicApiTable>?
public actor QuicClient: Sendable, QuicConnectionMessageHandler {
private var api: UnsafePointer<QuicApiTable>
private var registration: HQuic?
private var configuration: HQuic?
private var connection: QuicConnection?
private let config: QuicConfig
private weak var messageHandler: QuicClientMessageHandler?
private let isClosed: ManagedAtomic<Bool> = .init(false)

init(config: QuicConfig, messageHandler: QuicClientMessageHandler? = nil) throws {
public init(config: QuicConfig, messageHandler: QuicClientMessageHandler? = nil) async throws {
self.config = config
self.messageHandler = messageHandler
var rawPointer: UnsafeRawPointer?
Expand All @@ -47,101 +44,51 @@ public class QuicClient: @unchecked Sendable {

api = boundPointer
registration = registrationHandle
}

deinit {
close()
clientLogger.trace("QuicClient Deinit")
}

func start() throws -> QuicStatus {
let status = QuicStatusCode.success.rawValue
try loadConfiguration()
try config.loadConfiguration(
api: api, registration: registration, configuration: &configuration
)
connection = try QuicConnection(
api: api, registration: registration, configuration: configuration, messageHandler: self
)
try connection?.start(ipAddress: config.ipAddress, port: config.port)
return status
}

// Asynchronous send method that waits for a QuicMessage reply
func send(message: Data) async throws -> QuicMessage {
try await send(message: message, streamKind: .uniquePersistent)
}

// send method that returns a QuicStatus
func send(message: Data, streamKind: StreamKind) throws -> QuicStatus {
guard let connection else {
throw QuicError.getConnectionFailed
}
let sendStream: QuicStream
// Check if there is an existing stream of the same kind
= if streamKind == .uniquePersistent
{
// If there is, send the message to the existing stream
try connection.createOrGetUniquePersistentStream(kind: streamKind)
} else {
// If there is not, create a new stream
try connection.createCommonEphemeralStream()
}
return sendStream.send(buffer: message, kind: streamKind)
}

// Asynchronous send method that waits for a QuicMessage reply
func send(message: Data, streamKind: StreamKind = .uniquePersistent) async throws -> QuicMessage {
// Send method that returns a QuicStatus
public func send(data: Data, streamKind: StreamKind) async throws -> QuicStatus {
guard let connection else {
throw QuicError.getConnectionFailed
}
let sendStream: QuicStream
// Check if there is an existing stream of the same kind
= if streamKind == .uniquePersistent
{
// If there is, send the message to the existing stream
try connection.createOrGetUniquePersistentStream(kind: streamKind)
} else {
// If there is not, create a new stream
try connection.createCommonEphemeralStream()
}
return try await sendStream.send(buffer: message)
let sendStream: QuicStream =
if streamKind == .uniquePersistent {
try await connection.createOrGetUniquePersistentStream(kind: streamKind)
} else {
try await connection.createCommonEphemeralStream()
}
return sendStream.send(with: data, kind: streamKind)
}

func getNetAddr() -> NetAddr {
NetAddr(ipAddress: config.ipAddress, port: config.port)
}

func close() {
if isClosed.compareExchange(expected: false, desired: true, ordering: .acquiring).exchanged {
if let connection {
connection.close()
self.connection = nil
}

if let configuration {
api?.pointee.ConfigurationClose(configuration)
self.configuration = nil
}

if let registration {
api?.pointee.RegistrationClose(registration)
self.registration = nil
}
public func close() async {
if let connection {
await connection.close()
self.connection = nil
}

if api != nil {
MsQuicClose(api)
api = nil
}
if let configuration {
api.pointee.ConfigurationClose(configuration)
self.configuration = nil
}

if let messageHandler {
messageHandler.didReceiveMessage(
quicClient: self, message: QuicMessage(type: .close, data: nil)
)
}
clientLogger.debug("QuicClient Close")
if let registration {
api.pointee.RegistrationClose(registration)
self.registration = nil
}
MsQuicClose(api)
}
}

extension QuicClient: QuicConnectionMessageHandler {
public func didReceiveMessage(
connection _: QuicConnection, stream _: QuicStream?, message: QuicMessage
) {
Expand All @@ -153,10 +100,15 @@ extension QuicClient: QuicConnectionMessageHandler {
)

case .shutdownComplete:
// Use [weak self] to avoid strong reference cycle
clientLogger.info(
"Client[\(getNetAddr())] shutdown"
)
// Call messageHandler safely in the actor context
Task { [weak self] in
guard let self else { return }
close()
await messageHandler?.didReceiveMessage(
quicClient: self, message: QuicMessage(type: .shutdownComplete, data: nil)
)
}

default:
Expand All @@ -170,9 +122,3 @@ extension QuicClient: QuicConnectionMessageHandler {
clientLogger.error("Failed to receive message: \(error)")
}
}

extension QuicClient {
private func loadConfiguration() throws {
try config.loadConfiguration(api: api, registration: registration, configuration: &configuration)
}
}
14 changes: 7 additions & 7 deletions Networking/Sources/Networking/msquic/QuicConfig.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Foundation
import msquic

public struct QuicConfig {
public struct QuicConfig: Sendable {
public let id: String
public let cert: String
public let key: String
Expand All @@ -20,7 +20,7 @@ public struct QuicConfig {
settings.IsSet.IdleTimeoutMs = 1
settings.ServerResumptionLevel = 2 // QUIC_SERVER_RESUME_AND_ZERORTT
settings.IsSet.ServerResumptionLevel = 1
settings.PeerBidiStreamCount = 1
settings.PeerBidiStreamCount = 100
settings.IsSet.PeerBidiStreamCount = 1

// Use withCString to avoid manual memory management
Expand All @@ -38,11 +38,11 @@ public struct QuicConfig {
QuicCredentialConfig.__Unnamed_union___Anonymous_field2(
CertificateFile: certFilePointer
),
Principal: nil, // Not needed in this context
Reserved: nil, // Not needed in this context
AsyncHandler: nil, // Not needed in this context
AllowedCipherSuites: QUIC_ALLOWED_CIPHER_SUITE_NONE, // Default value
CaCertificateFile: nil // Not needed in this context
Principal: nil,
Reserved: nil,
AsyncHandler: nil,
AllowedCipherSuites: QUIC_ALLOWED_CIPHER_SUITE_NONE,
CaCertificateFile: nil
)

// Convert ALPN to data buffer
Expand Down
Loading

0 comments on commit 8c2ebff

Please sign in to comment.