Skip to content

Commit

Permalink
Merge branch 'master' into dev_actor_adjust
Browse files Browse the repository at this point in the history
* master:
  able to custimize timing details for services (#154)
  add handler id for event bus (#149)
  • Loading branch information
MacOMNI committed Oct 8, 2024
2 parents ce8f542 + 2f1e652 commit 0195852
Show file tree
Hide file tree
Showing 20 changed files with 208 additions and 166 deletions.
11 changes: 5 additions & 6 deletions Blockchain/Sources/Blockchain/Blockchain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ import Foundation
import TracingUtils
import Utils

private let logger = Logger(label: "Blockchain")

public final class Blockchain: ServiceBase, @unchecked Sendable {
public let dataProvider: BlockchainDataProvider
public let timeProvider: TimeProvider
Expand All @@ -17,9 +15,9 @@ public final class Blockchain: ServiceBase, @unchecked Sendable {
self.dataProvider = dataProvider
self.timeProvider = timeProvider

super.init(config, eventBus)
super.init(logger: Logger(label: "Blockchain"), config: config, eventBus: eventBus)

await subscribe(RuntimeEvents.BlockAuthored.self) { [weak self] event in
await subscribe(RuntimeEvents.BlockAuthored.self, id: "Blockchain.BlockAuthored") { [weak self] event in
try await self?.on(blockAuthored: event)
}
}
Expand All @@ -36,8 +34,9 @@ public final class Blockchain: ServiceBase, @unchecked Sendable {

let runtime = Runtime(config: config)
let parent = try await dataProvider.getState(hash: block.header.parentHash)
let timeslot = timeProvider.getTimeslot()
let state = try runtime.apply(block: block, state: parent, context: .init(timeslot: timeslot))
let timeslot = timeProvider.getTime().timeToTimeslot(config: config)
// TODO: figure out what is the best way to deal with block received a bit too early
let state = try runtime.apply(block: block, state: parent, context: .init(timeslot: timeslot + 1))

try await dataProvider.blockImported(block: block, state: state)

Expand Down
38 changes: 38 additions & 0 deletions Blockchain/Sources/Blockchain/Config/ProtocolConfig+Timing.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import Foundation
import Utils

extension ProtocolConfigRef {
// TODO: determine appropriate values
public var authoringStartTimeDelta: TimeInterval {
-TimeInterval(value.slotPeriodSeconds)
}

public var authoringDeadline: TimeInterval {
TimeInterval(value.slotPeriodSeconds) / 3
}

public var guaranteeingStartTimeDelta: TimeInterval {
authoringStartTimeDelta - TimeInterval(value.slotPeriodSeconds)
}

public var guaranteeingDeadline: TimeInterval {
TimeInterval(value.slotPeriodSeconds) / 3 * 2
}

public var prepareEpochStartTimeDelta: TimeInterval {
-TimeInterval(value.slotPeriodSeconds) * 3
}

public func scheduleTimeForAuthoring(timeslot: TimeslotIndex) -> TimeInterval {
TimeInterval(timeslot.timeslotToTime(config: self)) + authoringStartTimeDelta
}

public func scheduleTimeForGuaranteeing(timeslot: TimeslotIndex) -> TimeInterval {
TimeInterval(timeslot.timeslotToTime(config: self)) + guaranteeingStartTimeDelta
}

public func scheduleTimeForPrepareEpoch(epoch: EpochIndex) -> TimeInterval {
let timeslot = epoch.epochToTimeslotIndex(config: self)
return TimeInterval(timeslot.timeslotToTime(config: self)) + prepareEpochStartTimeDelta
}
}
4 changes: 2 additions & 2 deletions Blockchain/Sources/Blockchain/Scheduler/Date+Extension.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ extension Date {
1_704_110_400
}

public var timeIntervalSinceJamCommonEra: UInt32 {
public var timeIntervalSinceJamCommonEra: TimeInterval {
let beginning = Double(Date.jamCommonEraBeginning)
let now = timeIntervalSince1970
return UInt32(now - beginning)
return now - beginning
}
}
30 changes: 7 additions & 23 deletions Blockchain/Sources/Blockchain/Scheduler/Scheduler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,36 +42,20 @@ extension Scheduler {
task: @escaping @Sendable () async -> Void,
onCancel: (@Sendable () -> Void)? = nil
) -> Cancellable {
logger.trace("scheduling task: \(id)", metadata: ["delay": "\(delay)", "repeats": "\(repeats)"])
guard delay >= 0 else {
logger.error("scheduling task with negative delay", metadata: ["id": "\(id)", "delay": "\(delay)", "repeats": "\(repeats)"])
return Cancellable {}
}
logger.trace("scheduling task", metadata: ["id": "\(id)", "delay": "\(delay)", "repeats": "\(repeats)"])
let cancellable = scheduleImpl(delay: delay, repeats: repeats, task: {
logger.trace("executing task: \(id)")
logger.trace("executing task", metadata: ["id": "\(id)"])
await task()
}, onCancel: onCancel)
return Cancellable {
logger.trace("cancelling task: \(id)")
logger.trace("cancelling task", metadata: ["id": "\(id)"])
cancellable.cancel()
}
}

public func schedule(
id: UniqueId = "",
at timeslot: TimeslotIndex,
task: @escaping @Sendable () async -> Void,
onCancel: (@Sendable () -> Void)? = nil
) -> Cancellable {
let nowTimeslot = timeProvider.getTimeslot()
if timeslot == nowTimeslot {
return schedule(id: id, delay: 0, repeats: false, task: task, onCancel: onCancel)
}

let deadline = timeProvider.timeslotToTime(timeslot)
let now = timeProvider.getTime()
if deadline < now {
logger.error("scheduling task in the past", metadata: ["deadline": "\(deadline)", "now": "\(now)"])
return Cancellable {}
}
return schedule(id: id, delay: TimeInterval(deadline - now), repeats: false, task: task, onCancel: onCancel)
}
}

extension Scheduler {
Expand Down
36 changes: 10 additions & 26 deletions Blockchain/Sources/Blockchain/Scheduler/TimeProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,39 @@ import Foundation
import Utils

public protocol TimeProvider: Sendable {
var slotPeriodSeconds: UInt32 { get }

func getTime() -> UInt32
func getTimeInterval() -> TimeInterval
}

extension TimeProvider {
public func getTimeslot() -> TimeslotIndex {
timeToTimeslot(getTime())
}

public func timeslotToTime(_ timeslot: TimeslotIndex) -> UInt32 {
timeslot * slotPeriodSeconds
}

public func timeToTimeslot(_ time: UInt32) -> TimeslotIndex {
time / slotPeriodSeconds
public func getTime() -> UInt32 {
UInt32(getTimeInterval())
}
}

public final class SystemTimeProvider: TimeProvider {
public let slotPeriodSeconds: UInt32

public init(slotPeriodSeconds: UInt32) {
self.slotPeriodSeconds = slotPeriodSeconds
}
public init() {}

public func getTime() -> UInt32 {
public func getTimeInterval() -> TimeInterval {
Date().timeIntervalSinceJamCommonEra
}
}

public final class MockTimeProvider: TimeProvider {
public let slotPeriodSeconds: UInt32
public let time: ThreadSafeContainer<UInt32>
public let time: ThreadSafeContainer<TimeInterval>

public init(slotPeriodSeconds: UInt32, time: UInt32 = 0) {
self.slotPeriodSeconds = slotPeriodSeconds
public init(time: TimeInterval = 0) {
self.time = ThreadSafeContainer(time)
}

public func getTime() -> UInt32 {
public func getTimeInterval() -> TimeInterval {
time.value
}

public func advance(by interval: UInt32) {
public func advance(by interval: TimeInterval) {
time.write { $0 += interval }
}

public func advance(to: UInt32) {
public func advance(to: TimeInterval) {
time.value = to
}
}
14 changes: 10 additions & 4 deletions Blockchain/Sources/Blockchain/Types/primitives.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Utils

public typealias Balance = Utils.Balance
public typealias ServiceIndex = UInt32
public typealias TimeslotIndex = UInt32
public typealias TimeslotIndex = UInt32 // TODO: use new type
public typealias Gas = Utils.Gas
public typealias DataLength = UInt32

Expand All @@ -19,14 +19,20 @@ public typealias BandersnatchRingVRFProof = Data784
public typealias BandersnatchRingVRFRoot = Data144
public typealias BLSKey = Data144

extension TimeslotIndex {
extension UInt32 {
public func timeslotToEpochIndex(config: ProtocolConfigRef) -> EpochIndex {
self / EpochIndex(config.value.epochLength)
}
}

extension EpochIndex {
public func epochToTimeslotIndex(config: ProtocolConfigRef) -> TimeslotIndex {
self * TimeslotIndex(config.value.epochLength)
}

public func timeslotToTime(config: ProtocolConfigRef) -> UInt32 {
self * UInt32(config.value.slotPeriodSeconds)
}

public func timeToTimeslot(config: ProtocolConfigRef) -> UInt32 {
self / UInt32(config.value.slotPeriodSeconds)
}
}
55 changes: 34 additions & 21 deletions Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import Foundation
import TracingUtils
import Utils

private let logger = Logger(label: "BlockAuthor")

public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
private let dataProvider: BlockchainDataProvider
private let keystore: KeyStore
Expand All @@ -24,27 +22,32 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
self.keystore = keystore
self.extrinsicPool = extrinsicPool

super.init(config, eventBus, scheduler)
super.init(logger: Logger(label: "BlockAuthor"), config: config, eventBus: eventBus, scheduler: scheduler)

await subscribe(RuntimeEvents.SafroleTicketsGenerated.self) { [weak self] event in
await subscribe(RuntimeEvents.SafroleTicketsGenerated.self, id: "BlockAuthor.SafroleTicketsGenerated") { [weak self] event in
try await self?.on(safroleTicketsGenerated: event)
}

scheduleForNextEpoch("BlockAuthor.scheduleForNextEpoch") { [weak self] timeslot in
await self?.onBeforeEpoch(timeslot: timeslot)
scheduleForNextEpoch("BlockAuthor.scheduleForNextEpoch") { [weak self] epoch in
await self?.onBeforeEpoch(epoch: epoch)
}
}

public func on(genesis state: StateRef) async {
await scheduleNewBlocks(ticketsOrKeys: state.value.safroleState.ticketsOrKeys, timeslot: timeProvider.getTimeslot())
await scheduleNewBlocks(
ticketsOrKeys: state.value.safroleState.ticketsOrKeys,
timeslot: timeProvider.getTime().timeToTimeslot(config: config)
)
}

public func createNewBlock(claim: Either<(TicketItemAndOutput, Bandersnatch.PublicKey), Bandersnatch.PublicKey>) async throws
public func createNewBlock(
timeslot: TimeslotIndex,
claim: Either<(TicketItemAndOutput, Bandersnatch.PublicKey), Bandersnatch.PublicKey>
) async throws
-> BlockRef
{
let parentHash = dataProvider.bestHead
let state = try await dataProvider.getState(hash: parentHash)
let timeslot = timeProvider.getTimeslot()
let epoch = timeslot.timeslotToEpochIndex(config: config)

let pendingTickets = await extrinsicPool.getPendingTickets(epoch: epoch)
Expand Down Expand Up @@ -146,9 +149,13 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
return BlockRef(block)
}

private func newBlock(claim: Either<(TicketItemAndOutput, Bandersnatch.PublicKey), Bandersnatch.PublicKey>) async {
private func newBlock(
timeslot: TimeslotIndex,
claim: Either<(TicketItemAndOutput, Bandersnatch.PublicKey), Bandersnatch.PublicKey>
) async {
await withSpan("BlockAuthor.newBlock", logger: logger) { _ in
let block = try await createNewBlock(claim: claim)
// TODO: add timeout
let block = try await createNewBlock(timeslot: timeslot, claim: claim)
logger.info("New block created: #\(block.header.timeslot) \(block.hash)")
publish(RuntimeEvents.BlockAuthored(block: block))
}
Expand All @@ -158,10 +165,11 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
tickets.write { $0.append(event) }
}

private func onBeforeEpoch(timeslot: TimeslotIndex) async {
logger.debug("scheduling new blocks for epoch \(timeslot.timeslotToEpochIndex(config: config))")
private func onBeforeEpoch(epoch: EpochIndex) async {
logger.debug("scheduling new blocks for epoch \(epoch)")
await withSpan("BlockAuthor.onBeforeEpoch", logger: logger) { _ in
tickets.value = []
let timeslot = epoch.epochToTimeslotIndex(config: config)

let bestHead = dataProvider.bestHead
let state = try await dataProvider.getState(hash: bestHead)
Expand All @@ -179,10 +187,11 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
}
}

private func scheduleNewBlocks(ticketsOrKeys: SafroleTicketsOrKeys, timeslot now: TimeslotIndex) async {
private func scheduleNewBlocks(ticketsOrKeys: SafroleTicketsOrKeys, timeslot base: TimeslotIndex) async {
let selfTickets = tickets.value
let epochBase = now.timeslotToEpochIndex(config: config)
let epochBase = base.timeslotToEpochIndex(config: config)
let timeslotBase = epochBase.epochToTimeslotIndex(config: config)
let now = timeProvider.getTimeInterval()
switch ticketsOrKeys {
case let .left(tickets):
if selfTickets.isEmpty {
Expand All @@ -191,13 +200,15 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
for (idx, ticket) in tickets.enumerated() {
if let claim = selfTickets.first(withOutput: ticket.id) {
let timeslot = timeslotBase + TimeslotIndex(idx)
if timeslot <= now {
let time = config.scheduleTimeForAuthoring(timeslot: timeslot)
let delay = time - now
if delay < 0 {
continue
}
logger.debug("Scheduling new block task at timeslot \(timeslot)")
schedule(id: "BlockAuthor.newBlock", at: timeslot) { [weak self] in
schedule(id: "BlockAuthor.newBlock", delay: delay) { [weak self] in
if let self {
await newBlock(claim: .left(claim))
await newBlock(timeslot: timeslot, claim: .left(claim))
}
}
}
Expand All @@ -207,13 +218,15 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
let pubkey = try? Bandersnatch.PublicKey(data: key)
if let pubkey, await keystore.contains(publicKey: pubkey) {
let timeslot = timeslotBase + TimeslotIndex(idx)
if timeslot < now {
let time = config.scheduleTimeForAuthoring(timeslot: timeslot)
let delay = time - now
if delay < 0 {
continue
}
logger.debug("Scheduling new block task at timeslot \(timeslot)")
schedule(id: "BlockAuthor.newBlock", at: timeslot) { [weak self] in
schedule(id: "BlockAuthor.newBlock", delay: delay) { [weak self] in
if let self {
await newBlock(claim: .right(pubkey))
await newBlock(timeslot: timeslot, claim: .right(pubkey))
}
}
}
Expand Down
Loading

0 comments on commit 0195852

Please sign in to comment.