Skip to content

Commit

Permalink
JAMNP (#172)
Browse files Browse the repository at this point in the history
* basic done

* avoid dispatch queue
  • Loading branch information
xlc authored Oct 16, 2024
1 parent 63206b9 commit 1024df2
Show file tree
Hide file tree
Showing 19 changed files with 538 additions and 66 deletions.
8 changes: 6 additions & 2 deletions Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,17 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable {
let cancellables = cancellables
let cancellable = scheduler.schedule(id: id, delay: delay, repeats: repeats) {
if !repeats {
cancellables.write { $0.remove(IdCancellable(id: id, cancellable: nil)) }
cancellables.write {
$0.remove(IdCancellable(id: id, cancellable: nil))
}
}
await task()
} onCancel: {
cancellables.write { $0.remove(IdCancellable(id: id, cancellable: nil)) }
}
cancellables.write { $0.insert(IdCancellable(id: id, cancellable: cancellable)) }
cancellables.write {
$0.insert(IdCancellable(id: id, cancellable: cancellable))
}
return cancellable
}

Expand Down
4 changes: 2 additions & 2 deletions Blockchain/Tests/BlockchainTests/MockScheduler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ final class MockScheduler: Scheduler, Sendable {
storage.tasks.insert(task)
}
return Cancellable {
self.storage.mutate { storage in
self.storage.write { storage in
if let index = storage.tasks.array.firstIndex(where: { $0.id == id }) {
let task = storage.tasks.remove(at: index)
task.cancel?()
Expand All @@ -80,7 +80,7 @@ final class MockScheduler: Scheduler, Sendable {
}

func advanceNext(to time: TimeInterval) async -> Bool {
let task: SchedulerTask? = storage.mutate { storage in
let task: SchedulerTask? = storage.write { storage in
if let task = storage.tasks.array.first, task.scheduleTime <= time {
storage.tasks.remove(at: 0)
return task
Expand Down
2 changes: 1 addition & 1 deletion Boka/Sources/BokaLogger.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public struct BokaLogger<T: LoggerFragment>: LogHandler, Sendable {
}

let defaultLevel = defaultLevel
return filters.mutate { filters in
return filters.write { filters in
for (key, value) in filters where label.hasPrefix(key) {
filters[label] = value
return value
Expand Down
2 changes: 2 additions & 0 deletions Networking/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ let package = Package(
.package(url: "https://github.com/apple/swift-log.git", from: "1.6.0"),
.package(url: "https://github.com/apple/swift-certificates.git", from: "1.5.0"),
.package(url: "https://github.com/apple/swift-testing.git", branch: "0.10.0"),
.package(url: "https://github.com/gh123man/Async-Channels.git", from: "1.0.0"),
],
targets: [
.target(
name: "Networking",
dependencies: [
"MsQuicSwift",
.product(name: "AsyncChannels", package: "Async-Channels"),
.product(name: "Logging", package: "swift-log"),
.product(name: "X509", package: "swift-certificates"),
]
Expand Down
1 change: 1 addition & 0 deletions Networking/Sources/MsQuicSwift/NetAddr.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public struct NetAddr: Hashable, Sendable {
self.ipAddress = ipAddress
self.port = port
self.ipv4 = ipv4
// TODO: automatically determine the ip address family
}

public init(quicAddr: QUIC_ADDR) {
Expand Down
14 changes: 12 additions & 2 deletions Networking/Sources/MsQuicSwift/QuicConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public final class QuicConnection: Sendable {

public func connect(to address: NetAddr) throws {
logger.debug("connecting to \(address)")
try storage.mutate { storage in
try storage.write { storage in
guard var storage2 = storage else {
throw QuicError.alreadyClosed
}
Expand All @@ -122,7 +122,7 @@ public final class QuicConnection: Sendable {

public func shutdown(errorCode: QuicErrorCode = .success) throws {
logger.debug("closing connection")
try storage.mutate { storage in
try storage.write { storage in
guard let storage2 = storage else {
throw QuicError.alreadyClosed
}
Expand Down Expand Up @@ -285,3 +285,13 @@ private func connectionCallback(

return handle.callbackHandler(event: event!).rawValue
}

extension QuicConnection: Hashable {
public func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self))
}

public static func == (lhs: QuicConnection, rhs: QuicConnection) -> Bool {
ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
}
}
2 changes: 1 addition & 1 deletion Networking/Sources/MsQuicSwift/QuicStatus.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public enum QuicStatus: Equatable, Sendable, Codable, RawRepresentable {
}

extension QuicStatus {
var isSucceeded: Bool {
public var isSucceeded: Bool {
switch self {
case let .code(code):
Int32(bitPattern: code.rawValue) <= 0
Expand Down
14 changes: 12 additions & 2 deletions Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public final class QuicStream: Sendable {
public func shutdown(errorCode: QuicErrorCode = .success) throws {
logger.debug("closing stream \(errorCode)")

try storage.mutate { storage in
try storage.write { storage in
guard let storage2 = storage else {
throw QuicError.alreadyClosed
}
Expand All @@ -88,7 +88,7 @@ public final class QuicStream: Sendable {
}
}

public func send(with data: Data, startStream: Bool = false, closeStream: Bool = false) throws {
public func send(data: Data, startStream: Bool = false, closeStream: Bool = false) throws {
logger.trace("Sending \(data.count) bytes")

try storage.read { storage in
Expand Down Expand Up @@ -241,3 +241,13 @@ private func streamCallback(

return handle.callbackHandler(event: event!).rawValue
}

extension QuicStream: Hashable {
public func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self))
}

public static func == (lhs: QuicStream, rhs: QuicStream) -> Bool {
ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
}
}
7 changes: 5 additions & 2 deletions Networking/Sources/Networking/Alpn.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import Utils
public struct Alpn: Sendable {
public let data: Data
private static let headerPrefixLength = 8
init(_ protocolName: String = "jamnp-s", version: String = "0", genesisHeader: Data32) {
data = Data("\(protocolName)/\(version)/\(genesisHeader.toHexString().prefix(Alpn.headerPrefixLength))".utf8)
init(protocolName: String = "jamnp-s", version: String = "0", genesisHeader: Data32, builder: Bool) {
let header: String.SubSequence = genesisHeader.toHexString().prefix(Alpn.headerPrefixLength)
data = Data(
"\(protocolName)/\(version)/\(header)\(builder ? "/builder" : "")".utf8
)
}
}
68 changes: 68 additions & 0 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import Foundation
import MsQuicSwift
import TracingUtils
import Utils

private let logger = Logger(label: "Connection")

public final class Connection: Sendable {
let connection: QuicConnection
let impl: PeerImpl
let mode: PeerMode
let remoteAddress: NetAddr
let presistentStreams: ThreadSafeContainer<[UniquePresistentStreamKind: Stream]> = .init([:])

init(_ connection: QuicConnection, impl: PeerImpl, mode: PeerMode, remoteAddress: NetAddr) {
self.connection = connection
self.impl = impl
self.mode = mode
self.remoteAddress = remoteAddress
}

public func getStream(kind: UniquePresistentStreamKind) throws -> Stream {
let stream = presistentStreams.read { presistentStreams in
presistentStreams[kind]
}
return try stream ?? presistentStreams.write { presistentStreams in
if let stream = presistentStreams[kind] {
return stream
}
let stream = try self.createStream(kind: kind.rawValue)
presistentStreams[kind] = stream
return stream
}
}

private func createStream(kind: UInt8) throws -> Stream {
let stream = try Stream(connection.createStream(), impl: impl)
impl.addStream(stream)
try stream.send(data: Data([kind]))
return stream
}

public func createStream(kind: CommonEphemeralStreamKind) throws -> Stream {
try createStream(kind: kind.rawValue)
}

func streamStarted(stream: QuicStream) {
let stream = Stream(stream, impl: impl)
impl.addStream(stream)
Task {
guard let byte = await stream.receiveByte() else {
logger.debug("stream closed without receiving kind. status: \(stream.status)")
return
}
if let upKind = UniquePresistentStreamKind(rawValue: byte) {
// TODO: handle duplicated UP streams
presistentStreams.write { presistentStreams in
presistentStreams[upKind] = stream
}
return
}
if let ceKind = CommonEphemeralStreamKind(rawValue: byte) {
logger.debug("stream opened. kind: \(ceKind)")
// TODO: handle requests
}
}
}
}
Loading

0 comments on commit 1024df2

Please sign in to comment.