Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update reopen up stream #218

Merged
merged 4 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ private class StreamHandle {

case QUIC_STREAM_EVENT_PEER_SEND_ABORTED:
logger.trace("Peer send aborted")
// TODO: check if we need to close the stream completely

case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE:
logger.trace("Stream shutdown complete")
Expand Down
26 changes: 25 additions & 1 deletion Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,37 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
return
}
if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) {
// TODO: handle duplicated UP streams
// Check for duplicate UP streams
let existingStream = presistentStreams.read { presistentStreams in
presistentStreams[upKind]
}
if let existingStream {
if existingStream.stream.id < stream.stream.id {
// The new stream has a higher ID, so reset the existing one
existingStream.close(abort: false)
logger.debug(
"Reset older UP stream with lower ID",
metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"]
)
} else {
// The existing stream has a higher ID or is equal, so reset the new one
stream.close(abort: false)
logger.debug(
"Duplicate UP stream detected, closing new stream with lower or equal ID",
metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"]
)
return // Exit without replacing the existing stream
}
}

// Write the new stream as the active one for this UP kind
presistentStreams.write { presistentStreams in
presistentStreams[upKind] = stream
}
runPresistentStreamLoop(stream: stream, kind: upKind)
return
}

if let ceKind = Handler.EphemeralHandler.StreamKind(rawValue: byte) {
logger.debug("stream opened. kind: \(ceKind)")

Expand Down
130 changes: 96 additions & 34 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public enum PeerRole: Sendable, Hashable {
// case proxy // not yet specified
}

struct ReconnectState {
struct BackoffState {
var attempt: Int
var delay: TimeInterval

Expand All @@ -25,7 +25,6 @@ struct ReconnectState {
delay = 1
}

// Initializer with custom values
init(attempt: Int = 0, delay: TimeInterval = 1) {
self.attempt = attempt
self.delay = delay
Expand Down Expand Up @@ -234,9 +233,10 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {

fileprivate let connections: ThreadSafeContainer<ConnectionStorage> = .init(.init())
fileprivate let streams: ThreadSafeContainer<[UniqueId: Stream<Handler>]> = .init([:])
fileprivate let reconnectStates: ThreadSafeContainer<[NetAddr: ReconnectState]> = .init([:])
fileprivate let reconnectStates: ThreadSafeContainer<[NetAddr: BackoffState]> = .init([:])
fileprivate let reopenStates: ThreadSafeContainer<[UniqueId: BackoffState]> = .init([:])

let reconnectMaxRetryAttempts = 5
let maxRetryAttempts = 5
let presistentStreamHandler: Handler.PresistentHandler
let ephemeralStreamHandler: Handler.EphemeralHandler

Expand Down Expand Up @@ -297,39 +297,70 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
let state = reconnectStates.read { reconnectStates in
reconnectStates[address] ?? .init()
}
if state.attempt < reconnectMaxRetryAttempts {
reconnectStates.write { reconnectStates in
if var state = reconnectStates[address] {
state.applyBackoff()
reconnectStates[address] = state
}

guard state.attempt < maxRetryAttempts else {
logger.warning("reconnecting to \(address) exceeded max attempts")
return
}

reconnectStates.write { reconnectStates in
if var state = reconnectStates[address] {
state.applyBackoff()
reconnectStates[address] = state
}
Task {
try await Task.sleep(for: .seconds(state.delay))
try connections.write { connections in
if connections.byAddr[address] != nil {
logger.warning("reconnecting to \(address) already connected")
return
}
let quicConn = try QuicConnection(
handler: PeerEventHandler(self),
registration: clientConfiguration.registration,
configuration: clientConfiguration
)
try quicConn.connect(to: address)
let conn = Connection(
quicConn,
impl: self,
role: role,
remoteAddress: address,
initiatedByLocal: true
)
connections.byAddr[address] = conn
connections.byId[conn.id] = conn
}
Task {
try await Task.sleep(for: .seconds(state.delay))
try connections.write { connections in
if connections.byAddr[address] != nil {
logger.warning("reconnecting to \(address) already connected")
return
}
let quicConn = try QuicConnection(
handler: PeerEventHandler(self),
registration: clientConfiguration.registration,
configuration: clientConfiguration
)
try quicConn.connect(to: address)
let conn = Connection(
quicConn,
impl: self,
role: role,
remoteAddress: address,
initiatedByLocal: true
)
connections.byAddr[address] = conn
connections.byId[conn.id] = conn
}
}
}

func reopenUpStream(connection: Connection<Handler>, kind: Handler.PresistentHandler.StreamKind) {
let state = reopenStates.read { states in
states[connection.id] ?? .init()
}

guard state.attempt < maxRetryAttempts else {
logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts")
return
}

reopenStates.write { states in
if var state = states[connection.id] {
state.applyBackoff()
states[connection.id] = state
}
}

Task {
try await Task.sleep(for: .seconds(state.delay))
do {
logger.debug("Attempting to reopen UP stream of kind \(kind) for connection \(connection.id)")
try connection.createPreistentStream(kind: kind)
} catch {
logger.error("Failed to reopen UP stream for connection \(connection.id): \(error)")
reopenUpStream(connection: connection, kind: kind)
}
} else {
logger.warning("reconnect attempt exceeded max attempts")
}
}

Expand Down Expand Up @@ -546,6 +577,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}
if let conn {
conn.streamStarted(stream: stream)
// Check
impl.reopenStates.write { states in
states[conn.id] = nil
}
}
}

Expand All @@ -562,12 +597,25 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
let stream = impl.streams.read { streams in
streams[quicStream.id]
}

if let stream {
let connection = impl.connections.read { connections in
connections.byId[stream.connectionId]
}
if let connection {
connection.streamClosed(stream: stream, abort: !status.isSucceeded)
if shouldReopenStream(connection: connection, stream: stream, status: status) {
do {
if let kind = stream.kind {
// impl.reopenUpStream(connection: connection, kind: kind);
do {
try connection.createPreistentStream(kind: kind)
} catch {
logger.error("Attempt to recreate the persistent stream failed: \(error)")
}
}
}
}
} else {
logger.warning(
"Stream closed but connection is gone?", metadata: ["streamId": "\(stream.id)"]
Expand All @@ -579,4 +627,18 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
)
}
}

// TODO: Add all the cases about reopen up stream
private func shouldReopenStream(connection: Connection<Handler>, stream: Stream<Handler>, status: QuicStatus) -> Bool {
// Only reopen if the stream is a persistent UP stream and the closure was unexpected
if connection.isClosed || connection.needReconnect || stream.kind == nil {
return false
}
switch QuicStatusCode(rawValue: status.rawValue) {
case .connectionIdle, .badCert:
return false
default:
return !status.isSucceeded
}
}
}
121 changes: 120 additions & 1 deletion Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,124 @@ struct PeerTests {
typealias EphemeralHandler = MockEphemeralStreamHandler
}

@Test
func reopenUpStream() async throws {
let handler2 = MockPresentStreamHandler()
var messageData = Data("reopen up stream".utf8)
let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler2,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
try? await Task.sleep(for: .milliseconds(100))

let connection = try peer1.connect(
to: peer2.listenAddress(), role: .validator
)
try? await Task.sleep(for: .milliseconds(100))

peer1.broadcast(
kind: .uniqueA, message: .init(kind: .uniqueA, data: messageData)
)
try? await Task.sleep(for: .milliseconds(100))
let lastReceivedData = await handler2.lastReceivedData
#expect(lastReceivedData == messageData)

try? await Task.sleep(for: .milliseconds(100))
// Simulate abnormal close stream
let stream = connection.presistentStreams.read { presistentStreams in
presistentStreams[.uniqueA]
}
stream!.close(abort: true)
// Wait to simulate downtime & reopen up stream 3~5s
try? await Task.sleep(for: .milliseconds(3000))
messageData = Data("reopen up stream data".utf8)
peer1.broadcast(
kind: .uniqueA, message: .init(kind: .uniqueA, data: messageData)
)
try await Task.sleep(for: .milliseconds(1000))
let lastReceivedData2 = await handler2.lastReceivedData
#expect(lastReceivedData2 == messageData)
}

@Test
func regularClosedStream() async throws {
let handler2 = MockPresentStreamHandler()
var messageData = Data("reopen up stream".utf8)
let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler2,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
try? await Task.sleep(for: .milliseconds(100))

let connection = try peer1.connect(
to: peer2.listenAddress(), role: .validator
)
try? await Task.sleep(for: .milliseconds(100))

peer1.broadcast(
kind: .uniqueA, message: .init(kind: .uniqueA, data: messageData)
)
try? await Task.sleep(for: .milliseconds(100))
let lastReceivedData = await handler2.lastReceivedData
#expect(lastReceivedData == messageData)

try? await Task.sleep(for: .milliseconds(100))
// Simulate regular close stream
let stream = connection.presistentStreams.read { presistentStreams in
presistentStreams[.uniqueA]
}
stream!.close(abort: false)
// Wait to simulate downtime
try? await Task.sleep(for: .milliseconds(3000))
messageData = Data("close up stream".utf8)
peer1.broadcast(
kind: .uniqueA, message: .init(kind: .uniqueA, data: messageData)
)
try await Task.sleep(for: .milliseconds(1000))
let lastReceivedData2 = await handler2.lastReceivedData
#expect(lastReceivedData2 != messageData)
}

@Test
func concurrentPeerConnection() async throws {
let peer1 = try Peer(
Expand Down Expand Up @@ -611,10 +729,11 @@ struct PeerTests {
data: Data("Message from peer \(i)".utf8)
)
peer.broadcast(kind: message.kind, message: message)
try? await Task.sleep(for: .milliseconds(50))
}

// Wait for message propagation
try? await Task.sleep(for: .milliseconds(100))
try? await Task.sleep(for: .milliseconds(1000))

// everyone should receive two messages
for (idx, handler) in handlers.enumerated() {
Expand Down
4 changes: 4 additions & 0 deletions Utils/Sources/Utils/UniqueId.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ extension UniqueId: Equatable {
public static func == (lhs: UniqueId, rhs: UniqueId) -> Bool {
lhs.id == rhs.id
}

public static func < (lhs: UniqueId, rhs: UniqueId) -> Bool {
lhs.id < rhs.id
}
}

extension UniqueId: Hashable {
Expand Down