diff --git a/Blockchain/Sources/Blockchain/Blockchain.swift b/Blockchain/Sources/Blockchain/Blockchain.swift index 3bc70206..b7b64e48 100644 --- a/Blockchain/Sources/Blockchain/Blockchain.swift +++ b/Blockchain/Sources/Blockchain/Blockchain.swift @@ -2,8 +2,6 @@ import Foundation import TracingUtils import Utils -private let logger = Logger(label: "Blockchain") - public final class Blockchain: ServiceBase, @unchecked Sendable { public let dataProvider: BlockchainDataProvider public let timeProvider: TimeProvider @@ -17,9 +15,9 @@ public final class Blockchain: ServiceBase, @unchecked Sendable { self.dataProvider = dataProvider self.timeProvider = timeProvider - super.init(config, eventBus) + super.init(logger: Logger(label: "Blockchain"), config: config, eventBus: eventBus) - await subscribe(RuntimeEvents.BlockAuthored.self) { [weak self] event in + await subscribe(RuntimeEvents.BlockAuthored.self, id: "Blockchain.BlockAuthored") { [weak self] event in try await self?.on(blockAuthored: event) } } @@ -36,8 +34,9 @@ public final class Blockchain: ServiceBase, @unchecked Sendable { let runtime = Runtime(config: config) let parent = try await dataProvider.getState(hash: block.header.parentHash) - let timeslot = timeProvider.getTimeslot() - let state = try runtime.apply(block: block, state: parent, context: .init(timeslot: timeslot)) + let timeslot = timeProvider.getTime().timeToTimeslot(config: config) + // TODO: figure out what is the best way to deal with block received a bit too early + let state = try runtime.apply(block: block, state: parent, context: .init(timeslot: timeslot + 1)) try await dataProvider.blockImported(block: block, state: state) diff --git a/Blockchain/Sources/Blockchain/Config/ProtocolConfig+Timing.swift b/Blockchain/Sources/Blockchain/Config/ProtocolConfig+Timing.swift new file mode 100644 index 00000000..c7528702 --- /dev/null +++ b/Blockchain/Sources/Blockchain/Config/ProtocolConfig+Timing.swift @@ -0,0 +1,38 @@ +import Foundation +import Utils + +extension ProtocolConfigRef { + // TODO: determine appropriate values + public var authoringStartTimeDelta: TimeInterval { + -TimeInterval(value.slotPeriodSeconds) + } + + public var authoringDeadline: TimeInterval { + TimeInterval(value.slotPeriodSeconds) / 3 + } + + public var guaranteeingStartTimeDelta: TimeInterval { + authoringStartTimeDelta - TimeInterval(value.slotPeriodSeconds) + } + + public var guaranteeingDeadline: TimeInterval { + TimeInterval(value.slotPeriodSeconds) / 3 * 2 + } + + public var prepareEpochStartTimeDelta: TimeInterval { + -TimeInterval(value.slotPeriodSeconds) * 3 + } + + public func scheduleTimeForAuthoring(timeslot: TimeslotIndex) -> TimeInterval { + TimeInterval(timeslot.timeslotToTime(config: self)) + authoringStartTimeDelta + } + + public func scheduleTimeForGuaranteeing(timeslot: TimeslotIndex) -> TimeInterval { + TimeInterval(timeslot.timeslotToTime(config: self)) + guaranteeingStartTimeDelta + } + + public func scheduleTimeForPrepareEpoch(epoch: EpochIndex) -> TimeInterval { + let timeslot = epoch.epochToTimeslotIndex(config: self) + return TimeInterval(timeslot.timeslotToTime(config: self)) + prepareEpochStartTimeDelta + } +} diff --git a/Blockchain/Sources/Blockchain/Scheduler/Date+Extension.swift b/Blockchain/Sources/Blockchain/Scheduler/Date+Extension.swift index 9921e95c..0fbce54b 100644 --- a/Blockchain/Sources/Blockchain/Scheduler/Date+Extension.swift +++ b/Blockchain/Sources/Blockchain/Scheduler/Date+Extension.swift @@ -7,9 +7,9 @@ extension Date { 1_704_110_400 } - public var timeIntervalSinceJamCommonEra: UInt32 { + public var timeIntervalSinceJamCommonEra: TimeInterval { let beginning = Double(Date.jamCommonEraBeginning) let now = timeIntervalSince1970 - return UInt32(now - beginning) + return now - beginning } } diff --git a/Blockchain/Sources/Blockchain/Scheduler/Scheduler.swift b/Blockchain/Sources/Blockchain/Scheduler/Scheduler.swift index 72311973..89ad9b33 100644 --- a/Blockchain/Sources/Blockchain/Scheduler/Scheduler.swift +++ b/Blockchain/Sources/Blockchain/Scheduler/Scheduler.swift @@ -42,36 +42,20 @@ extension Scheduler { task: @escaping @Sendable () async -> Void, onCancel: (@Sendable () -> Void)? = nil ) -> Cancellable { - logger.trace("scheduling task: \(id)", metadata: ["delay": "\(delay)", "repeats": "\(repeats)"]) + guard delay >= 0 else { + logger.error("scheduling task with negative delay", metadata: ["id": "\(id)", "delay": "\(delay)", "repeats": "\(repeats)"]) + return Cancellable {} + } + logger.trace("scheduling task", metadata: ["id": "\(id)", "delay": "\(delay)", "repeats": "\(repeats)"]) let cancellable = scheduleImpl(delay: delay, repeats: repeats, task: { - logger.trace("executing task: \(id)") + logger.trace("executing task", metadata: ["id": "\(id)"]) await task() }, onCancel: onCancel) return Cancellable { - logger.trace("cancelling task: \(id)") + logger.trace("cancelling task", metadata: ["id": "\(id)"]) cancellable.cancel() } } - - public func schedule( - id: UniqueId = "", - at timeslot: TimeslotIndex, - task: @escaping @Sendable () async -> Void, - onCancel: (@Sendable () -> Void)? = nil - ) -> Cancellable { - let nowTimeslot = timeProvider.getTimeslot() - if timeslot == nowTimeslot { - return schedule(id: id, delay: 0, repeats: false, task: task, onCancel: onCancel) - } - - let deadline = timeProvider.timeslotToTime(timeslot) - let now = timeProvider.getTime() - if deadline < now { - logger.error("scheduling task in the past", metadata: ["deadline": "\(deadline)", "now": "\(now)"]) - return Cancellable {} - } - return schedule(id: id, delay: TimeInterval(deadline - now), repeats: false, task: task, onCancel: onCancel) - } } extension Scheduler { diff --git a/Blockchain/Sources/Blockchain/Scheduler/TimeProvider.swift b/Blockchain/Sources/Blockchain/Scheduler/TimeProvider.swift index 9eea360c..8a29fd02 100644 --- a/Blockchain/Sources/Blockchain/Scheduler/TimeProvider.swift +++ b/Blockchain/Sources/Blockchain/Scheduler/TimeProvider.swift @@ -2,55 +2,39 @@ import Foundation import Utils public protocol TimeProvider: Sendable { - var slotPeriodSeconds: UInt32 { get } - - func getTime() -> UInt32 + func getTimeInterval() -> TimeInterval } extension TimeProvider { - public func getTimeslot() -> TimeslotIndex { - timeToTimeslot(getTime()) - } - - public func timeslotToTime(_ timeslot: TimeslotIndex) -> UInt32 { - timeslot * slotPeriodSeconds - } - - public func timeToTimeslot(_ time: UInt32) -> TimeslotIndex { - time / slotPeriodSeconds + public func getTime() -> UInt32 { + UInt32(getTimeInterval()) } } public final class SystemTimeProvider: TimeProvider { - public let slotPeriodSeconds: UInt32 - - public init(slotPeriodSeconds: UInt32) { - self.slotPeriodSeconds = slotPeriodSeconds - } + public init() {} - public func getTime() -> UInt32 { + public func getTimeInterval() -> TimeInterval { Date().timeIntervalSinceJamCommonEra } } public final class MockTimeProvider: TimeProvider { - public let slotPeriodSeconds: UInt32 - public let time: ThreadSafeContainer + public let time: ThreadSafeContainer - public init(slotPeriodSeconds: UInt32, time: UInt32 = 0) { - self.slotPeriodSeconds = slotPeriodSeconds + public init(time: TimeInterval = 0) { self.time = ThreadSafeContainer(time) } - public func getTime() -> UInt32 { + public func getTimeInterval() -> TimeInterval { time.value } - public func advance(by interval: UInt32) { + public func advance(by interval: TimeInterval) { time.write { $0 += interval } } - public func advance(to: UInt32) { + public func advance(to: TimeInterval) { time.value = to } } diff --git a/Blockchain/Sources/Blockchain/Types/primitives.swift b/Blockchain/Sources/Blockchain/Types/primitives.swift index 0a0ec673..30c914fa 100644 --- a/Blockchain/Sources/Blockchain/Types/primitives.swift +++ b/Blockchain/Sources/Blockchain/Types/primitives.swift @@ -2,7 +2,7 @@ import Utils public typealias Balance = Utils.Balance public typealias ServiceIndex = UInt32 -public typealias TimeslotIndex = UInt32 +public typealias TimeslotIndex = UInt32 // TODO: use new type public typealias Gas = Utils.Gas public typealias DataLength = UInt32 @@ -19,14 +19,20 @@ public typealias BandersnatchRingVRFProof = Data784 public typealias BandersnatchRingVRFRoot = Data144 public typealias BLSKey = Data144 -extension TimeslotIndex { +extension UInt32 { public func timeslotToEpochIndex(config: ProtocolConfigRef) -> EpochIndex { self / EpochIndex(config.value.epochLength) } -} -extension EpochIndex { public func epochToTimeslotIndex(config: ProtocolConfigRef) -> TimeslotIndex { self * TimeslotIndex(config.value.epochLength) } + + public func timeslotToTime(config: ProtocolConfigRef) -> UInt32 { + self * UInt32(config.value.slotPeriodSeconds) + } + + public func timeToTimeslot(config: ProtocolConfigRef) -> UInt32 { + self / UInt32(config.value.slotPeriodSeconds) + } } diff --git a/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift b/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift index bce6e58e..4a31c64a 100644 --- a/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift +++ b/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift @@ -3,8 +3,6 @@ import Foundation import TracingUtils import Utils -private let logger = Logger(label: "BlockAuthor") - public final class BlockAuthor: ServiceBase2, @unchecked Sendable { private let dataProvider: BlockchainDataProvider private let keystore: KeyStore @@ -24,27 +22,32 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { self.keystore = keystore self.extrinsicPool = extrinsicPool - super.init(config, eventBus, scheduler) + super.init(logger: Logger(label: "BlockAuthor"), config: config, eventBus: eventBus, scheduler: scheduler) - await subscribe(RuntimeEvents.SafroleTicketsGenerated.self) { [weak self] event in + await subscribe(RuntimeEvents.SafroleTicketsGenerated.self, id: "BlockAuthor.SafroleTicketsGenerated") { [weak self] event in try await self?.on(safroleTicketsGenerated: event) } - scheduleForNextEpoch("BlockAuthor.scheduleForNextEpoch") { [weak self] timeslot in - await self?.onBeforeEpoch(timeslot: timeslot) + scheduleForNextEpoch("BlockAuthor.scheduleForNextEpoch") { [weak self] epoch in + await self?.onBeforeEpoch(epoch: epoch) } } public func on(genesis state: StateRef) async { - await scheduleNewBlocks(ticketsOrKeys: state.value.safroleState.ticketsOrKeys, timeslot: timeProvider.getTimeslot()) + await scheduleNewBlocks( + ticketsOrKeys: state.value.safroleState.ticketsOrKeys, + timeslot: timeProvider.getTime().timeToTimeslot(config: config) + ) } - public func createNewBlock(claim: Either<(TicketItemAndOutput, Bandersnatch.PublicKey), Bandersnatch.PublicKey>) async throws + public func createNewBlock( + timeslot: TimeslotIndex, + claim: Either<(TicketItemAndOutput, Bandersnatch.PublicKey), Bandersnatch.PublicKey> + ) async throws -> BlockRef { let parentHash = dataProvider.bestHead let state = try await dataProvider.getState(hash: parentHash) - let timeslot = timeProvider.getTimeslot() let epoch = timeslot.timeslotToEpochIndex(config: config) let pendingTickets = await extrinsicPool.getPendingTickets(epoch: epoch) @@ -146,9 +149,13 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { return BlockRef(block) } - private func newBlock(claim: Either<(TicketItemAndOutput, Bandersnatch.PublicKey), Bandersnatch.PublicKey>) async { + private func newBlock( + timeslot: TimeslotIndex, + claim: Either<(TicketItemAndOutput, Bandersnatch.PublicKey), Bandersnatch.PublicKey> + ) async { await withSpan("BlockAuthor.newBlock", logger: logger) { _ in - let block = try await createNewBlock(claim: claim) + // TODO: add timeout + let block = try await createNewBlock(timeslot: timeslot, claim: claim) logger.info("New block created: #\(block.header.timeslot) \(block.hash)") publish(RuntimeEvents.BlockAuthored(block: block)) } @@ -158,10 +165,11 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { tickets.write { $0.append(event) } } - private func onBeforeEpoch(timeslot: TimeslotIndex) async { - logger.debug("scheduling new blocks for epoch \(timeslot.timeslotToEpochIndex(config: config))") + private func onBeforeEpoch(epoch: EpochIndex) async { + logger.debug("scheduling new blocks for epoch \(epoch)") await withSpan("BlockAuthor.onBeforeEpoch", logger: logger) { _ in tickets.value = [] + let timeslot = epoch.epochToTimeslotIndex(config: config) let bestHead = dataProvider.bestHead let state = try await dataProvider.getState(hash: bestHead) @@ -179,10 +187,11 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { } } - private func scheduleNewBlocks(ticketsOrKeys: SafroleTicketsOrKeys, timeslot now: TimeslotIndex) async { + private func scheduleNewBlocks(ticketsOrKeys: SafroleTicketsOrKeys, timeslot base: TimeslotIndex) async { let selfTickets = tickets.value - let epochBase = now.timeslotToEpochIndex(config: config) + let epochBase = base.timeslotToEpochIndex(config: config) let timeslotBase = epochBase.epochToTimeslotIndex(config: config) + let now = timeProvider.getTimeInterval() switch ticketsOrKeys { case let .left(tickets): if selfTickets.isEmpty { @@ -191,13 +200,15 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { for (idx, ticket) in tickets.enumerated() { if let claim = selfTickets.first(withOutput: ticket.id) { let timeslot = timeslotBase + TimeslotIndex(idx) - if timeslot <= now { + let time = config.scheduleTimeForAuthoring(timeslot: timeslot) + let delay = time - now + if delay < 0 { continue } logger.debug("Scheduling new block task at timeslot \(timeslot)") - schedule(id: "BlockAuthor.newBlock", at: timeslot) { [weak self] in + schedule(id: "BlockAuthor.newBlock", delay: delay) { [weak self] in if let self { - await newBlock(claim: .left(claim)) + await newBlock(timeslot: timeslot, claim: .left(claim)) } } } @@ -207,13 +218,15 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { let pubkey = try? Bandersnatch.PublicKey(data: key) if let pubkey, await keystore.contains(publicKey: pubkey) { let timeslot = timeslotBase + TimeslotIndex(idx) - if timeslot < now { + let time = config.scheduleTimeForAuthoring(timeslot: timeslot) + let delay = time - now + if delay < 0 { continue } logger.debug("Scheduling new block task at timeslot \(timeslot)") - schedule(id: "BlockAuthor.newBlock", at: timeslot) { [weak self] in + schedule(id: "BlockAuthor.newBlock", delay: delay) { [weak self] in if let self { - await newBlock(claim: .right(pubkey)) + await newBlock(timeslot: timeslot, claim: .right(pubkey)) } } } diff --git a/Blockchain/Sources/Blockchain/Validator/ExtrinsicPoolService.swift b/Blockchain/Sources/Blockchain/Validator/ExtrinsicPoolService.swift index 2f13a5e6..6a9f0522 100644 --- a/Blockchain/Sources/Blockchain/Validator/ExtrinsicPoolService.swift +++ b/Blockchain/Sources/Blockchain/Validator/ExtrinsicPoolService.swift @@ -3,9 +3,9 @@ import Utils private typealias TicketItem = ExtrinsicTickets.TicketItem -private let logger = Logger(label: "ExtrinsicPoolService") - private actor ServiceStorage { + let logger: Logger + // sorted array ordered by output var pendingTickets: SortedUniqueArray = .init() var epoch: EpochIndex = 0 @@ -13,7 +13,8 @@ private actor ServiceStorage { var entropy: Data32 = .init() let ringContext: Bandersnatch.RingContext - init(ringContext: Bandersnatch.RingContext) { + init(logger: Logger, ringContext: Bandersnatch.RingContext) { + self.logger = logger self.ringContext = ringContext } @@ -76,20 +77,22 @@ public final class ExtrinsicPoolService: ServiceBase, @unchecked Sendable { ) async { self.dataProvider = dataProvider + let logger = Logger(label: "ExtrinsicPoolService") + let ringContext = try! Bandersnatch.RingContext(size: UInt(config.value.totalNumberOfValidators)) - storage = ServiceStorage(ringContext: ringContext) + storage = ServiceStorage(logger: logger, ringContext: ringContext) - super.init(config, eventBus) + super.init(logger: logger, config: config, eventBus: eventBus) - await subscribe(RuntimeEvents.SafroleTicketsGenerated.self) { [weak self] event in + await subscribe(RuntimeEvents.SafroleTicketsGenerated.self, id: "ExtrinsicPool.SafroleTicketsGenerated") { [weak self] event in try await self?.on(safroleTicketsGenerated: event) } - await subscribe(RuntimeEvents.BlockFinalized.self) { [weak self] event in + await subscribe(RuntimeEvents.BlockFinalized.self, id: "ExtrinsicPool.BlockFinalized") { [weak self] event in try await self?.on(blockFinalized: event) } - await subscribe(RuntimeEvents.SafroleTicketsReceived.self) { [weak self] event in + await subscribe(RuntimeEvents.SafroleTicketsReceived.self, id: "ExtrinsicPool.SafroleTicketsReceived") { [weak self] event in try await self?.on(safroleTicketsReceived: event) } } diff --git a/Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift b/Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift index 51969e31..2c58c0ac 100644 --- a/Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift +++ b/Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift @@ -2,8 +2,6 @@ import Foundation import TracingUtils import Utils -private let logger = Logger(label: "BlockAuthor") - // find out when and which core we are guaranteeing for and schedule a task for it // get work package from the pool // try to guarantee the work package @@ -30,7 +28,7 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable { self.runtime = runtime self.extrinsicPool = extrinsicPool - super.init(config, eventBus, scheduler) + super.init(logger: Logger(label: "BlockAuthor"), config: config, eventBus: eventBus, scheduler: scheduler) } public func on(genesis _: StateRef) async {} diff --git a/Blockchain/Sources/Blockchain/Validator/SafroleService.swift b/Blockchain/Sources/Blockchain/Validator/SafroleService.swift index a58febf4..22d59a90 100644 --- a/Blockchain/Sources/Blockchain/Validator/SafroleService.swift +++ b/Blockchain/Sources/Blockchain/Validator/SafroleService.swift @@ -2,8 +2,6 @@ import Foundation import TracingUtils import Utils -private let logger = Logger(label: "SafroleService") - public struct TicketItemAndOutput: Comparable, Sendable, Codable { public let ticket: ExtrinsicTickets.TicketItem public let output: Data32 @@ -29,9 +27,9 @@ public final class SafroleService: ServiceBase, @unchecked Sendable { self.keystore = keystore ringContext = try! Bandersnatch.RingContext(size: UInt(config.value.totalNumberOfValidators)) - super.init(config, eventBus) + super.init(logger: Logger(label: "SafroleService"), config: config, eventBus: eventBus) - await subscribe(RuntimeEvents.BlockImported.self) { [weak self] event in + await subscribe(RuntimeEvents.BlockImported.self, id: "SafroleService.BlockImported") { [weak self] event in try await self?.on(blockImported: event) } } diff --git a/Blockchain/Sources/Blockchain/Validator/ServiceBase.swift b/Blockchain/Sources/Blockchain/Validator/ServiceBase.swift index 3bf5fc5c..84effa38 100644 --- a/Blockchain/Sources/Blockchain/Validator/ServiceBase.swift +++ b/Blockchain/Sources/Blockchain/Validator/ServiceBase.swift @@ -1,17 +1,30 @@ +import TracingUtils import Utils public class ServiceBase { + public let logger: Logger public let config: ProtocolConfigRef private let eventBus: EventBus private let subscriptionTokens: ThreadSafeContainer<[EventBus.SubscriptionToken]> = .init([]) - init(_ config: ProtocolConfigRef, _ eventBus: EventBus) { + init(logger: Logger, config: ProtocolConfigRef, eventBus: EventBus) { + self.logger = logger self.config = config self.eventBus = eventBus } + deinit { + let eventBus = self.eventBus + let subscriptionTokens = self.subscriptionTokens + Task { + for token in subscriptionTokens.value { + await eventBus.unsubscribe(token: token) + } + } + } + @discardableResult - func subscribe(_ eventType: T.Type, handler: @escaping @Sendable (T) async throws -> Void) async -> EventBus + func subscribe(_ eventType: T.Type, id _: UniqueId, handler: @escaping @Sendable (T) async throws -> Void) async -> EventBus .SubscriptionToken { let token = await eventBus.subscribe(eventType, handler: handler) @@ -29,14 +42,4 @@ public class ServiceBase { func publish(_ event: some Event) { eventBus.publish(event) } - - deinit { - let eventBus = self.eventBus - let subscriptionTokens = self.subscriptionTokens - Task { - for token in subscriptionTokens.value { - await eventBus.unsubscribe(token: token) - } - } - } } diff --git a/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift b/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift index a2dc6c55..3de4f700 100644 --- a/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift +++ b/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift @@ -1,5 +1,6 @@ import Atomics import Foundation +import TracingUtils import Utils private struct IdCancellable: Hashable, Sendable { @@ -19,9 +20,9 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable { private let scheduler: Scheduler private let cancellables: ThreadSafeContainer> = .init([]) - public init(_ config: ProtocolConfigRef, _ eventBus: EventBus, _ scheduler: Scheduler) { + public init(logger: Logger, config: ProtocolConfigRef, eventBus: EventBus, scheduler: Scheduler) { self.scheduler = scheduler - super.init(config, eventBus) + super.init(logger: logger, config: config, eventBus: eventBus) } deinit { @@ -35,9 +36,12 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable { } @discardableResult - public func schedule(id: UniqueId, delay: TimeInterval, repeats: Bool = false, - task: @escaping @Sendable () async -> Void) -> Cancellable - { + public func schedule( + id: UniqueId, + delay: TimeInterval, + repeats: Bool = false, + task: @escaping @Sendable () async -> Void + ) -> Cancellable { let cancellables = cancellables let cancellable = scheduler.schedule(id: id, delay: delay, repeats: repeats) { if !repeats { @@ -52,30 +56,29 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable { } @discardableResult - public func schedule(id: UniqueId, at timeslot: TimeslotIndex, task: @escaping @Sendable () async -> Void) -> Cancellable { - let cancellables = cancellables - let cancellable = scheduler.schedule(id: id, at: timeslot) { - cancellables.write { $0.remove(IdCancellable(id: id, cancellable: nil)) } - await task() - } onCancel: { - cancellables.write { $0.remove(IdCancellable(id: id, cancellable: nil)) } - } - cancellables.write { $0.insert(IdCancellable(id: id, cancellable: cancellable)) } - return cancellable + public func scheduleForNextEpoch(_ id: UniqueId, fn: @escaping @Sendable (EpochIndex) async -> Void) -> Cancellable { + let now = timeProvider.getTimeInterval() + let nowTimeslot = UInt32(now).timeToTimeslot(config: config) + let nextEpoch = (nowTimeslot + 1).timeslotToEpochIndex(config: config) + 1 + return scheduleFor(epoch: nextEpoch, id: id, fn: fn) } @discardableResult - public func scheduleForNextEpoch(_ id: UniqueId, fn: @escaping @Sendable (TimeslotIndex) async -> Void) -> Cancellable { - let now = timeProvider.getTimeslot() - let nextEpoch = (now + 1).timeslotToEpochIndex(config: config) + 1 - let timeslot = nextEpoch.epochToTimeslotIndex(config: config) - - // at end of an epoch, try to determine the block author of next epoch - // and schedule new block task - return schedule(id: id, at: timeslot - 1) { [weak self] in + private func scheduleFor(epoch: EpochIndex, id: UniqueId, fn: @escaping @Sendable (EpochIndex) async -> Void) -> Cancellable { + let scheduleTime = config.scheduleTimeForPrepareEpoch(epoch: epoch) + let now = timeProvider.getTimeInterval() + let delay = scheduleTime - now + if delay < 0 { + // too late / current epoch is about to end + // schedule for the one after + logger.debug("\(id): skipping epoch \(epoch) because it is too late") + return scheduleFor(epoch: epoch + 1, id: id, fn: fn) + } + logger.trace("\(id): scheduling epoch \(epoch) in \(delay)") + return schedule(id: id, delay: delay) { [weak self] in if let self { - scheduleForNextEpoch(id, fn: fn) - await fn(timeslot) + scheduleFor(epoch: epoch + 1, id: id, fn: fn) + await fn(epoch) } } } diff --git a/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift b/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift index ac6c41e4..7861814b 100644 --- a/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift +++ b/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift @@ -20,7 +20,7 @@ struct BlockAuthorTests { // setupTestLogger() config = ProtocolConfigRef.dev - timeProvider = MockTimeProvider(slotPeriodSeconds: UInt32(config.value.slotPeriodSeconds), time: 1000) + timeProvider = MockTimeProvider(time: 988) dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesis: StateRef(State.devGenesis(config: config)))) @@ -47,15 +47,17 @@ struct BlockAuthorTests { func createNewBlockWithFallbackKey() async throws { let genesisState = try await dataProvider.getState(hash: Data32()) + let timeslot = timeProvider.getTime().timeToTimeslot(config: config) + // get the validator key - let idx = scheduler.timeProvider.getTimeslot() % UInt32(config.value.totalNumberOfValidators) + let idx = timeslot % UInt32(config.value.totalNumberOfValidators) let devKey = try DevKeyStore.getDevKey(seed: idx) // Create a new block - let block = try await blockAuthor.createNewBlock(claim: .right(devKey.bandersnatch)) + let block = try await blockAuthor.createNewBlock(timeslot: timeslot, claim: .right(devKey.bandersnatch)) // Verify block - try _ = runtime.apply(block: block, state: genesisState, context: .init(timeslot: timeProvider.getTimeslot() + 1)) + try _ = runtime.apply(block: block, state: genesisState, context: .init(timeslot: timeslot + 1)) } @Test @@ -68,8 +70,10 @@ struct BlockAuthorTests { ctx: Bandersnatch.RingContext(size: UInt(config.value.totalNumberOfValidators)) ).data + let timeslot = timeProvider.getTime().timeToTimeslot(config: config) + // get the validator key - let idx = scheduler.timeProvider.getTimeslot() % UInt32(config.value.epochLength) + let idx = timeslot % UInt32(config.value.epochLength) let devKey = try DevKeyStore.getDevKey(seed: idx % UInt32(config.value.totalNumberOfValidators)) let secretKey = await keystore.get(Bandersnatch.self, publicKey: devKey.bandersnatch)! @@ -96,10 +100,10 @@ struct BlockAuthorTests { try await dataProvider.add(state: newStateRef) // Create a new block - let block = try await blockAuthor.createNewBlock(claim: .left((ticket, devKey.bandersnatch))) + let block = try await blockAuthor.createNewBlock(timeslot: timeslot, claim: .left((ticket, devKey.bandersnatch))) // Verify block - try _ = runtime.apply(block: block, state: newStateRef, context: .init(timeslot: timeProvider.getTimeslot() + 1)) + try _ = runtime.apply(block: block, state: newStateRef, context: .init(timeslot: timeslot + 1)) } @Test @@ -110,7 +114,7 @@ struct BlockAuthorTests { #expect(scheduler.storage.value.tasks.count > 0) - await scheduler.advance(by: 1) + await scheduler.advance(by: 2) let events = await storeMiddleware.wait() #expect(events.count == 1) @@ -118,8 +122,10 @@ struct BlockAuthorTests { let block = events.first as! RuntimeEvents.BlockAuthored + let timeslot = timeProvider.getTime().timeToTimeslot(config: config) + // Verify block - try _ = runtime.apply(block: block.block, state: genesisState, context: .init(timeslot: timeProvider.getTimeslot() + 1)) + try _ = runtime.apply(block: block.block, state: genesisState, context: .init(timeslot: timeslot + 1)) } // TODO: test including extrinsic tickets from extrinsic pool diff --git a/Blockchain/Tests/BlockchainTests/DispatchQueueSchedulerTests.swift b/Blockchain/Tests/BlockchainTests/DispatchQueueSchedulerTests.swift index 0cf89fd4..1084be1b 100644 --- a/Blockchain/Tests/BlockchainTests/DispatchQueueSchedulerTests.swift +++ b/Blockchain/Tests/BlockchainTests/DispatchQueueSchedulerTests.swift @@ -6,7 +6,7 @@ import Utils struct DispatchQueueSchedulerTests { let scheduler = DispatchQueueScheduler( - timeProvider: SystemTimeProvider(slotPeriodSeconds: 6), + timeProvider: SystemTimeProvider(), queue: .global(qos: .userInteractive) // to get higher priority so results are more deterministic ) diff --git a/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift b/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift index 07a61394..b3d7ae9b 100644 --- a/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift +++ b/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift @@ -19,7 +19,7 @@ struct ExtrinsicPoolServiceTests { config = ProtocolConfigRef.dev.mutate { config in config.ticketEntriesPerValidator = 4 } - timeProvider = MockTimeProvider(slotPeriodSeconds: UInt32(config.value.slotPeriodSeconds), time: 1000) + timeProvider = MockTimeProvider(time: 1000) dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesis: StateRef(State.devGenesis(config: config)))) diff --git a/Blockchain/Tests/BlockchainTests/MockScheduler.swift b/Blockchain/Tests/BlockchainTests/MockScheduler.swift index 0ac87683..f116e5a3 100644 --- a/Blockchain/Tests/BlockchainTests/MockScheduler.swift +++ b/Blockchain/Tests/BlockchainTests/MockScheduler.swift @@ -5,14 +5,14 @@ import Utils final class SchedulerTask: Sendable, Comparable { let id: Int - let scheduleTime: UInt32 + let scheduleTime: TimeInterval let repeats: TimeInterval? let task: @Sendable () async -> Void let cancel: (@Sendable () -> Void)? init( id: Int, - scheduleTime: UInt32, + scheduleTime: TimeInterval, repeats: TimeInterval?, task: @escaping @Sendable () async -> Void, cancel: (@Sendable () -> Void)? @@ -57,8 +57,8 @@ final class MockScheduler: Scheduler, Sendable { task: @escaping @Sendable () async -> Void, onCancel: (@Sendable () -> Void)? ) -> Cancellable { - let now = timeProvider.getTime() - let scheduleTime = now + UInt32(delay) + let now = timeProvider.getTimeInterval() + let scheduleTime = now + delay let id = Self.idGenerator.loadThenWrappingIncrement(ordering: .relaxed) let task = SchedulerTask(id: id, scheduleTime: scheduleTime, repeats: repeats ? delay : nil, task: task, cancel: onCancel) storage.write { storage in @@ -74,12 +74,12 @@ final class MockScheduler: Scheduler, Sendable { } } - func advance(by interval: UInt32) async { - let to = timeProvider.getTime() + interval + func advance(by interval: TimeInterval) async { + let to = timeProvider.getTimeInterval() + interval while await advanceNext(to: to) {} } - func advanceNext(to time: UInt32) async -> Bool { + func advanceNext(to time: TimeInterval) async -> Bool { let task: SchedulerTask? = storage.mutate { storage in if let task = storage.tasks.array.first, task.scheduleTime <= time { storage.tasks.remove(at: 0) diff --git a/Blockchain/Tests/BlockchainTests/SafroleServiceTests.swift b/Blockchain/Tests/BlockchainTests/SafroleServiceTests.swift index 45c858d2..f0c4e71c 100644 --- a/Blockchain/Tests/BlockchainTests/SafroleServiceTests.swift +++ b/Blockchain/Tests/BlockchainTests/SafroleServiceTests.swift @@ -19,7 +19,7 @@ struct SafroleServiceTests { config = ProtocolConfigRef.dev.mutate { config in config.ticketEntriesPerValidator = 4 } - timeProvider = MockTimeProvider(slotPeriodSeconds: UInt32(config.value.slotPeriodSeconds), time: 1000) + timeProvider = MockTimeProvider(time: 1000) genesisState = try StateRef(State.devGenesis(config: config)) diff --git a/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift b/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift index bf148af5..a918a604 100644 --- a/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift +++ b/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift @@ -19,7 +19,7 @@ struct ValidatorServiceTests { // setupTestLogger() config = ProtocolConfigRef.dev - timeProvider = MockTimeProvider(slotPeriodSeconds: UInt32(config.value.slotPeriodSeconds), time: 1000) + timeProvider = MockTimeProvider(time: 988) dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesis: StateRef(State.devGenesis(config: config)))) @@ -69,7 +69,7 @@ struct ValidatorServiceTests { await validatorService.on(genesis: genesisState) // Advance time to trigger block production - await scheduler.advance(by: UInt32(config.value.slotPeriodSeconds)) + await scheduler.advance(by: TimeInterval(config.value.slotPeriodSeconds)) let events = await storeMiddleware.wait() @@ -80,7 +80,8 @@ struct ValidatorServiceTests { let blockEvent = blockAuthoredEvent as! RuntimeEvents.BlockAuthored // Verify the produced block let block = blockEvent.block - #expect(block.header.timeslot == timeProvider.getTimeslot()) + // we produce block before the timeslot starts + #expect(block.header.timeslot == timeProvider.getTime().timeToTimeslot(config: config) + 1) #expect(block.header.parentHash == genesisState.value.lastBlockHash) // Check if the block author is one of the validators @@ -106,12 +107,12 @@ struct ValidatorServiceTests { await validatorService.on(genesis: genesisState) - await scheduler.advance(by: UInt32(config.value.slotPeriodSeconds) * 20) + await scheduler.advance(by: TimeInterval(config.value.slotPeriodSeconds) * 20) let events = await storeMiddleware.wait() let blockAuthoredEvents = events.filter { $0 is RuntimeEvents.BlockAuthored } - #expect(blockAuthoredEvents.count == 21) + #expect(blockAuthoredEvents.count == 20) } } diff --git a/Node/Sources/Node/Node.swift b/Node/Sources/Node/Node.swift index ff1532ef..d6a4b57b 100644 --- a/Node/Sources/Node/Node.swift +++ b/Node/Sources/Node/Node.swift @@ -30,7 +30,7 @@ public class Node { let (genesisState, protocolConfig) = try await genesis.load() dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesis: genesisState)) - timeProvider = SystemTimeProvider(slotPeriodSeconds: UInt32(protocolConfig.value.slotPeriodSeconds)) + timeProvider = SystemTimeProvider() blockchain = try await Blockchain( config: protocolConfig, dataProvider: dataProvider, diff --git a/Utils/Sources/Utils/EventBus/EventBus.swift b/Utils/Sources/Utils/EventBus/EventBus.swift index ba05d8c5..9972ba38 100644 --- a/Utils/Sources/Utils/EventBus/EventBus.swift +++ b/Utils/Sources/Utils/EventBus/EventBus.swift @@ -5,10 +5,8 @@ import TracingUtils private let logger = Logger(label: "EventBus") public actor EventBus: Sendable { - private static let idGenerator = ManagedAtomic(0) - public struct SubscriptionToken: Hashable, Sendable { - fileprivate let id: Int + fileprivate let id: UniqueId fileprivate let eventTypeId: ObjectIdentifier public func hash(into hasher: inout Hasher) { @@ -46,9 +44,13 @@ public actor EventBus: Sendable { self.handlerMiddleware = handlerMiddleware } - public func subscribe(_ eventType: T.Type, handler: @escaping @Sendable (T) async throws -> Void) -> SubscriptionToken { + public func subscribe( + _ eventType: T.Type, + id: UniqueId = "", + handler: @escaping @Sendable (T) async throws -> Void + ) -> SubscriptionToken { let key = ObjectIdentifier(eventType) - let token = SubscriptionToken(id: EventBus.idGenerator.loadThenWrappingIncrement(ordering: .relaxed), eventTypeId: key) + let token = SubscriptionToken(id: id, eventTypeId: key) handlers[key, default: []].append(AnyEventHandler(token, handler)) @@ -83,8 +85,12 @@ public actor EventBus: Sendable { return } for handler in eventHandlers { - try await handlerMiddleware.handle(event) { evt in - try await handler.handle(evt) + do { + try await handlerMiddleware.handle(event) { evt in + try await handler.handle(evt) + } + } catch { + logger.warning("Unhandled error for event: \(event) with error: \(error) and handler: \(handler.token.id)") } } }