Skip to content

Commit

Permalink
update reopen up stream (#218)
Browse files Browse the repository at this point in the history
* update reopen up stream

* update reopen up stream

* update reopen up stream

* update test
  • Loading branch information
MacOMNI authored Nov 13, 2024
1 parent 123f14d commit e3879eb
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 37 deletions.
1 change: 0 additions & 1 deletion Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ private class StreamHandle {

case QUIC_STREAM_EVENT_PEER_SEND_ABORTED:
logger.trace("Peer send aborted")
// TODO: check if we need to close the stream completely

case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE:
logger.trace("Stream shutdown complete")
Expand Down
26 changes: 25 additions & 1 deletion Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,37 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
return
}
if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) {
// TODO: handle duplicated UP streams
// Check for duplicate UP streams
let existingStream = presistentStreams.read { presistentStreams in
presistentStreams[upKind]
}
if let existingStream {
if existingStream.stream.id < stream.stream.id {
// The new stream has a higher ID, so reset the existing one
existingStream.close(abort: false)
logger.debug(
"Reset older UP stream with lower ID",
metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"]
)
} else {
// The existing stream has a higher ID or is equal, so reset the new one
stream.close(abort: false)
logger.debug(
"Duplicate UP stream detected, closing new stream with lower or equal ID",
metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"]
)
return // Exit without replacing the existing stream
}
}

// Write the new stream as the active one for this UP kind
presistentStreams.write { presistentStreams in
presistentStreams[upKind] = stream
}
runPresistentStreamLoop(stream: stream, kind: upKind)
return
}

if let ceKind = Handler.EphemeralHandler.StreamKind(rawValue: byte) {
logger.debug("stream opened. kind: \(ceKind)")

Expand Down
130 changes: 96 additions & 34 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public enum PeerRole: Sendable, Hashable {
// case proxy // not yet specified
}

struct ReconnectState {
struct BackoffState {
var attempt: Int
var delay: TimeInterval

Expand All @@ -25,7 +25,6 @@ struct ReconnectState {
delay = 1
}

// Initializer with custom values
init(attempt: Int = 0, delay: TimeInterval = 1) {
self.attempt = attempt
self.delay = delay
Expand Down Expand Up @@ -234,9 +233,10 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {

fileprivate let connections: ThreadSafeContainer<ConnectionStorage> = .init(.init())
fileprivate let streams: ThreadSafeContainer<[UniqueId: Stream<Handler>]> = .init([:])
fileprivate let reconnectStates: ThreadSafeContainer<[NetAddr: ReconnectState]> = .init([:])
fileprivate let reconnectStates: ThreadSafeContainer<[NetAddr: BackoffState]> = .init([:])
fileprivate let reopenStates: ThreadSafeContainer<[UniqueId: BackoffState]> = .init([:])

let reconnectMaxRetryAttempts = 5
let maxRetryAttempts = 5
let presistentStreamHandler: Handler.PresistentHandler
let ephemeralStreamHandler: Handler.EphemeralHandler

Expand Down Expand Up @@ -297,39 +297,70 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
let state = reconnectStates.read { reconnectStates in
reconnectStates[address] ?? .init()
}
if state.attempt < reconnectMaxRetryAttempts {
reconnectStates.write { reconnectStates in
if var state = reconnectStates[address] {
state.applyBackoff()
reconnectStates[address] = state
}

guard state.attempt < maxRetryAttempts else {
logger.warning("reconnecting to \(address) exceeded max attempts")
return
}

reconnectStates.write { reconnectStates in
if var state = reconnectStates[address] {
state.applyBackoff()
reconnectStates[address] = state
}
Task {
try await Task.sleep(for: .seconds(state.delay))
try connections.write { connections in
if connections.byAddr[address] != nil {
logger.warning("reconnecting to \(address) already connected")
return
}
let quicConn = try QuicConnection(
handler: PeerEventHandler(self),
registration: clientConfiguration.registration,
configuration: clientConfiguration
)
try quicConn.connect(to: address)
let conn = Connection(
quicConn,
impl: self,
role: role,
remoteAddress: address,
initiatedByLocal: true
)
connections.byAddr[address] = conn
connections.byId[conn.id] = conn
}
Task {
try await Task.sleep(for: .seconds(state.delay))
try connections.write { connections in
if connections.byAddr[address] != nil {
logger.warning("reconnecting to \(address) already connected")
return
}
let quicConn = try QuicConnection(
handler: PeerEventHandler(self),
registration: clientConfiguration.registration,
configuration: clientConfiguration
)
try quicConn.connect(to: address)
let conn = Connection(
quicConn,
impl: self,
role: role,
remoteAddress: address,
initiatedByLocal: true
)
connections.byAddr[address] = conn
connections.byId[conn.id] = conn
}
}
}

func reopenUpStream(connection: Connection<Handler>, kind: Handler.PresistentHandler.StreamKind) {
let state = reopenStates.read { states in
states[connection.id] ?? .init()
}

guard state.attempt < maxRetryAttempts else {
logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts")
return
}

reopenStates.write { states in
if var state = states[connection.id] {
state.applyBackoff()
states[connection.id] = state
}
}

Task {
try await Task.sleep(for: .seconds(state.delay))
do {
logger.debug("Attempting to reopen UP stream of kind \(kind) for connection \(connection.id)")
try connection.createPreistentStream(kind: kind)
} catch {
logger.error("Failed to reopen UP stream for connection \(connection.id): \(error)")
reopenUpStream(connection: connection, kind: kind)
}
} else {
logger.warning("reconnect attempt exceeded max attempts")
}
}

Expand Down Expand Up @@ -546,6 +577,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}
if let conn {
conn.streamStarted(stream: stream)
// Check
impl.reopenStates.write { states in
states[conn.id] = nil
}
}
}

Expand All @@ -562,12 +597,25 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
let stream = impl.streams.read { streams in
streams[quicStream.id]
}

if let stream {
let connection = impl.connections.read { connections in
connections.byId[stream.connectionId]
}
if let connection {
connection.streamClosed(stream: stream, abort: !status.isSucceeded)
if shouldReopenStream(connection: connection, stream: stream, status: status) {
do {
if let kind = stream.kind {
// impl.reopenUpStream(connection: connection, kind: kind);
do {
try connection.createPreistentStream(kind: kind)
} catch {
logger.error("Attempt to recreate the persistent stream failed: \(error)")
}
}
}
}
} else {
logger.warning(
"Stream closed but connection is gone?", metadata: ["streamId": "\(stream.id)"]
Expand All @@ -579,4 +627,18 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
)
}
}

// TODO: Add all the cases about reopen up stream
private func shouldReopenStream(connection: Connection<Handler>, stream: Stream<Handler>, status: QuicStatus) -> Bool {
// Only reopen if the stream is a persistent UP stream and the closure was unexpected
if connection.isClosed || connection.needReconnect || stream.kind == nil {
return false
}
switch QuicStatusCode(rawValue: status.rawValue) {
case .connectionIdle, .badCert:
return false
default:
return !status.isSucceeded
}
}
}
121 changes: 120 additions & 1 deletion Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,124 @@ struct PeerTests {
typealias EphemeralHandler = MockEphemeralStreamHandler
}

@Test
func reopenUpStream() async throws {
let handler2 = MockPresentStreamHandler()
var messageData = Data("reopen up stream".utf8)
let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler2,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
try? await Task.sleep(for: .milliseconds(100))

let connection = try peer1.connect(
to: peer2.listenAddress(), role: .validator
)
try? await Task.sleep(for: .milliseconds(100))

peer1.broadcast(
kind: .uniqueA, message: .init(kind: .uniqueA, data: messageData)
)
try? await Task.sleep(for: .milliseconds(100))
let lastReceivedData = await handler2.lastReceivedData
#expect(lastReceivedData == messageData)

try? await Task.sleep(for: .milliseconds(100))
// Simulate abnormal close stream
let stream = connection.presistentStreams.read { presistentStreams in
presistentStreams[.uniqueA]
}
stream!.close(abort: true)
// Wait to simulate downtime & reopen up stream 3~5s
try? await Task.sleep(for: .milliseconds(3000))
messageData = Data("reopen up stream data".utf8)
peer1.broadcast(
kind: .uniqueA, message: .init(kind: .uniqueA, data: messageData)
)
try await Task.sleep(for: .milliseconds(1000))
let lastReceivedData2 = await handler2.lastReceivedData
#expect(lastReceivedData2 == messageData)
}

@Test
func regularClosedStream() async throws {
let handler2 = MockPresentStreamHandler()
var messageData = Data("reopen up stream".utf8)
let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler2,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
try? await Task.sleep(for: .milliseconds(100))

let connection = try peer1.connect(
to: peer2.listenAddress(), role: .validator
)
try? await Task.sleep(for: .milliseconds(100))

peer1.broadcast(
kind: .uniqueA, message: .init(kind: .uniqueA, data: messageData)
)
try? await Task.sleep(for: .milliseconds(100))
let lastReceivedData = await handler2.lastReceivedData
#expect(lastReceivedData == messageData)

try? await Task.sleep(for: .milliseconds(100))
// Simulate regular close stream
let stream = connection.presistentStreams.read { presistentStreams in
presistentStreams[.uniqueA]
}
stream!.close(abort: false)
// Wait to simulate downtime
try? await Task.sleep(for: .milliseconds(3000))
messageData = Data("close up stream".utf8)
peer1.broadcast(
kind: .uniqueA, message: .init(kind: .uniqueA, data: messageData)
)
try await Task.sleep(for: .milliseconds(1000))
let lastReceivedData2 = await handler2.lastReceivedData
#expect(lastReceivedData2 != messageData)
}

@Test
func concurrentPeerConnection() async throws {
let peer1 = try Peer(
Expand Down Expand Up @@ -611,10 +729,11 @@ struct PeerTests {
data: Data("Message from peer \(i)".utf8)
)
peer.broadcast(kind: message.kind, message: message)
try? await Task.sleep(for: .milliseconds(50))
}

// Wait for message propagation
try? await Task.sleep(for: .milliseconds(100))
try? await Task.sleep(for: .milliseconds(1000))

// everyone should receive two messages
for (idx, handler) in handlers.enumerated() {
Expand Down
4 changes: 4 additions & 0 deletions Utils/Sources/Utils/UniqueId.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ extension UniqueId: Equatable {
public static func == (lhs: UniqueId, rhs: UniqueId) -> Bool {
lhs.id == rhs.id
}

public static func < (lhs: UniqueId, rhs: UniqueId) -> Bool {
lhs.id < rhs.id
}
}

extension UniqueId: Hashable {
Expand Down

0 comments on commit e3879eb

Please sign in to comment.