Skip to content

Commit

Permalink
fix: (0.55) post-upgrade txn detection in presence of pre-upgrade eve…
Browse files Browse the repository at this point in the history
…nts (#15835)

Signed-off-by: Michael Tinker <michael.tinker@swirldslabs.com>
  • Loading branch information
tinker-michaelj authored Oct 17, 2024
1 parent 769fe44 commit e635547
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,8 @@ private List<StateChanges.Builder> onMigrate(
metrics));
kvStateChangeListener.reset();
boundaryStateChangeListener.reset();
if (isUpgrade && !trigger.equals(RECONNECT)) {
// For specifically a non-genesis upgrade, set in state that post-upgrade work is pending
if (isUpgrade && trigger != RECONNECT && trigger != GENESIS) {
unmarkMigrationRecordsStreamed(state);
migrationStateChanges.add(
StateChanges.newBuilder().stateChanges(boundaryStateChangeListener.allStateChanges()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public static Bytes rootHashFrom(@NonNull final Status penultimateStatus, @NonNu
final var maxDepth = maxDepthFor(penultimateStatus.numLeaves() + 1);
for (int i = 0; i < maxDepth; i++) {
final var rightmostHash = penultimateStatus.rightmostHashes().get(i);
if (rightmostHash == Bytes.EMPTY) {
if (rightmostHash.length() == 0) {
hash = BlockImplUtils.combine(hash, HashCombiner.EMPTY_HASHES[i]);
} else {
hash = BlockImplUtils.combine(rightmostHash.toByteArray(), hash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,11 @@ public ResponseCodeEnum update(@NonNull final Bytes bytes) {
currentSchedule = FeeSchedule.DEFAULT;
}

// Populate the map of HederaFunctionality -> FeeData for the current schedule
this.currentFeeDataMap = new HashMap<>();
populateFeeDataMap(currentFeeDataMap, currentSchedule.transactionFeeSchedule());
// Populate the map of HederaFunctionality -> FeeData for the current schedule, but avoid mutating
// the active one in-place as other threads may be using it for ingest/query fee calculations
final var newCurrentFeeDataMap = new HashMap<Entry, FeeData>();
populateFeeDataMap(newCurrentFeeDataMap, currentSchedule.transactionFeeSchedule());
this.currentFeeDataMap = newCurrentFeeDataMap;

// Get the expiration time of the current schedule
if (currentSchedule.hasExpiryTime()) {
Expand All @@ -160,9 +162,11 @@ public ResponseCodeEnum update(@NonNull final Bytes bytes) {
logger.warn("Unable to parse next fee schedule, will default to the current fee schedule.");
nextFeeDataMap = new HashMap<>(currentFeeDataMap);
} else {
// Populate the map of HederaFunctionality -> FeeData for the current schedule
this.nextFeeDataMap = new HashMap<>();
populateFeeDataMap(nextFeeDataMap, nextSchedule.transactionFeeSchedule());
// Populate the map of HederaFunctionality -> FeeData for the next schedule, but avoid mutating
// the active one in-place as other threads may be using it for ingest/query fee calculations
final var newNextFeeDataMap = new HashMap<Entry, FeeData>();
populateFeeDataMap(newNextFeeDataMap, nextSchedule.transactionFeeSchedule());
this.nextFeeDataMap = newNextFeeDataMap;
}

return SUCCESS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,6 @@ public void advanceConsensusClock(@NonNull final Instant consensusTime, @NonNull
.consTimeOfLastHandledTxn(Timestamp.newBuilder()
.seconds(consensusTime.getEpochSecond())
.nanos(consensusTime.getNano()));
if (!this.lastBlockInfo.migrationRecordsStreamed()) {
// Any records created during migration should have been published already. Now we shut off the flag to
// disallow further publishing
builder.migrationRecordsStreamed(true);
}
final var newBlockInfo = builder.build();

// Update the latest block info in state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public void migrate(@NonNull final MigrationContext ctx) {
final var isGenesis = ctx.previousVersion() == null;
if (isGenesis) {
final var blocksState = ctx.newStates().getSingleton(BLOCK_INFO_STATE_KEY);
final var blocks = new BlockInfo(-1, EPOCH, Bytes.EMPTY, EPOCH, false, EPOCH);
// Note there is by convention no post-upgrade work to do if starting from genesis
final var blocks = new BlockInfo(-1, EPOCH, Bytes.EMPTY, EPOCH, true, EPOCH);
blocksState.put(blocks);
final var runningHashState = ctx.newStates().getSingleton(RUNNING_HASHES_STATE_KEY);
final var runningHashes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ private HandleOutput execute(@NonNull final UserTxn userTxn) {
final var blockStreamConfig = userTxn.config().getConfigData(BlockStreamConfig.class);
try {
if (isOlderSoftwareEvent(userTxn)) {
advanceConsensusClock(userTxn, blockStreamConfig);
initializeBuilderInfo(userTxn.baseBuilder(), userTxn.txnInfo(), exchangeRateManager.exchangeRates())
.status(BUSY);
// Flushes the BUSY builder to the stream, no other side effects
Expand All @@ -363,12 +364,12 @@ private HandleOutput execute(@NonNull final UserTxn userTxn) {
streamBuilder.exchangeRate(exchangeRateManager.exchangeRates());
userTxn.stack().commitTransaction(streamBuilder);
}
if (blockStreamConfig.streamRecords()) {
blockRecordManager.markMigrationRecordsStreamed();
}
}
updateNodeStakes(userTxn);
if (blockStreamConfig.streamRecords()) {
blockRecordManager.advanceConsensusClock(userTxn.consensusNow(), userTxn.state());
}
expireSchedules(userTxn);
advanceConsensusClock(userTxn, blockStreamConfig);
logPreDispatch(userTxn);
final var dispatch = dispatchFor(userTxn, blockStreamConfig);
if (userTxn.type() == GENESIS_TRANSACTION) {
Expand Down Expand Up @@ -397,6 +398,20 @@ private HandleOutput execute(@NonNull final UserTxn userTxn) {
}
}

/**
* Advances the consensus clock in state when streaming records; also expires any schedules.
* @param userTxn the user transaction
* @param blockStreamConfig the block stream configuration
*/
private void advanceConsensusClock(
@NonNull final UserTxn userTxn, @NonNull final BlockStreamConfig blockStreamConfig) {
if (blockStreamConfig.streamRecords()) {
// For POST_UPGRADE_TRANSACTION, also commits to state that the post-upgrade work is done
blockRecordManager.advanceConsensusClock(userTxn.consensusNow(), userTxn.state());
}
expireSchedules(userTxn);
}

/**
* Returns a stream of a single {@link ResponseCodeEnum#FAIL_INVALID} record
* for the given user transaction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.hedera.hapi.node.base.HederaFunctionality;
import com.hedera.hapi.node.base.Key;
import com.hedera.hapi.node.state.blockrecords.BlockInfo;
import com.hedera.hapi.platform.state.PlatformState;
import com.hedera.node.app.blocks.impl.BoundaryStateChangeListener;
import com.hedera.node.app.blocks.impl.KVStateChangeListener;
import com.hedera.node.app.fees.ExchangeRateManager;
Expand Down Expand Up @@ -69,8 +68,6 @@
import com.hedera.node.config.data.ConsensusConfig;
import com.hedera.node.config.data.HederaConfig;
import com.swirlds.config.api.Configuration;
import com.swirlds.platform.state.service.PlatformStateService;
import com.swirlds.platform.state.service.schemas.V0540PlatformStateSchema;
import com.swirlds.platform.system.events.ConsensusEvent;
import com.swirlds.platform.system.transaction.ConsensusTransaction;
import com.swirlds.state.State;
Expand Down Expand Up @@ -255,6 +252,7 @@ public Dispatch newDispatch(

/**
* Returns the base stream builder for this user transaction.
*
* @return the base stream builder
*/
public StreamBuilder baseBuilder() {
Expand All @@ -263,24 +261,14 @@ public StreamBuilder baseBuilder() {

/**
* Returns whether the given state indicates this transaction is the first after an upgrade.
*
* @param state the Hedera state
* @return whether the given state indicates this transaction is the first after an upgrade
*/
private static boolean isUpgradeBoundary(@NonNull final State state) {
final var platformState = state.getReadableStates(PlatformStateService.NAME)
.<PlatformState>getSingleton(V0540PlatformStateSchema.PLATFORM_STATE_KEY)
final var blockInfo = state.getReadableStates(BlockRecordService.NAME)
.<BlockInfo>getSingleton(BLOCK_INFO_STATE_KEY)
.get();
requireNonNull(platformState);
if (platformState.freezeTime() == null
|| !platformState.freezeTimeOrThrow().equals(platformState.lastFrozenTime())) {
return false;
} else {
// Check the state directly here instead of going through BlockManager to allow us
// to manipulate this condition easily in embedded tests
final var blockInfo = state.getReadableStates(BlockRecordService.NAME)
.<BlockInfo>getSingleton(BLOCK_INFO_STATE_KEY)
.get();
return !requireNonNull(blockInfo).migrationRecordsStreamed();
}
return !requireNonNull(blockInfo).migrationRecordsStreamed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void testRegisterSchemas() {
assertEquals(
new RunningHashes(GENESIS_HASH, Bytes.EMPTY, Bytes.EMPTY, Bytes.EMPTY),
runningHashesCapture.getValue());
assertEquals(new BlockInfo(-1, EPOCH, Bytes.EMPTY, EPOCH, false, EPOCH), blockInfoCapture.getValue());
assertEquals(new BlockInfo(-1, EPOCH, Bytes.EMPTY, EPOCH, true, EPOCH), blockInfoCapture.getValue());
} else {
assertThat(schema).isInstanceOf(V0540BlockRecordSchema.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ private static class ProblemTracker {
List.of("Interrupted while waiting for signature verification"),
List.of("Could not start TLS server, will continue without it"),
List.of("Properties file", "does not exist and won't be used as configuration source"),
// Using a 1-minute staking period in CI can lead to periods with no transactions, breaking invariants
List.of("StakingRewardsHelper", "Pending rewards decreased"),
List.of("Throttle multiplier for CryptoTransfer throughput congestion has no throttle buckets"));

private int numProblems = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -565,6 +567,36 @@ public static WaitForStatusOp waitForFrozenNetwork(@NonNull final Duration timeo
return new WaitForStatusOp(NodeSelector.allNodes(), FREEZE_COMPLETE, timeout);
}

/**
* Returns an operation that initiates background traffic running until the target network's
* first node has reached {@link com.swirlds.platform.system.status.PlatformStatus#FREEZE_COMPLETE}.
* @return the operation
*/
public static SpecOperation runBackgroundTrafficUntilFreezeComplete() {
return withOpContext((spec, opLog) -> {
opLog.info("Starting background traffic until freeze complete");
final var stopTraffic = new AtomicBoolean();
CompletableFuture.runAsync(() -> {
while (!stopTraffic.get()) {
allRunFor(
spec,
cryptoTransfer(tinyBarsFromTo(GENESIS, STAKING_REWARD, 1))
.deferStatusResolution()
.hasAnyStatusAtAll()
.noLogging());
}
});
spec.targetNetworkOrThrow()
.nodes()
.getFirst()
.statusFuture(FREEZE_COMPLETE, (status) -> {})
.thenRun(() -> {
stopTraffic.set(true);
opLog.info("Stopping background traffic after freeze complete");
});
});
}

public static HapiSpecSleep sleepFor(long timeMs) {
return new HapiSpecSleep(timeMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.hedera.services.bdd.spec.utilops.lifecycle.ops;

import static com.hedera.services.bdd.junit.hedera.ExternalPath.UPGRADE_ARTIFACTS_DIR;
import static com.hedera.services.bdd.junit.hedera.utils.WorkingDirUtils.CONFIG_TXT;
import static com.hedera.services.bdd.junit.hedera.utils.WorkingDirUtils.guaranteedExtantDir;
import static com.hedera.services.bdd.junit.hedera.utils.WorkingDirUtils.rm;

Expand All @@ -33,8 +32,7 @@ public PurgeUpgradeArtifactsOp(@NonNull NodeSelector selector) {

@Override
protected void run(@NonNull final HederaNode node) {
final var upgradeArtifactsLoc =
node.getExternalPath(UPGRADE_ARTIFACTS_DIR).resolve(CONFIG_TXT);
final var upgradeArtifactsLoc = node.getExternalPath(UPGRADE_ARTIFACTS_DIR);
rm(upgradeArtifactsLoc);
guaranteedExtantDir(upgradeArtifactsLoc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@

import static com.hedera.services.bdd.junit.hedera.MarkerFile.EXEC_IMMEDIATE_MF;
import static com.hedera.services.bdd.spec.queries.QueryVerbs.getVersionInfo;
import static com.hedera.services.bdd.spec.transactions.TxnVerbs.cryptoCreate;
import static com.hedera.services.bdd.spec.transactions.TxnVerbs.cryptoTransfer;
import static com.hedera.services.bdd.spec.transactions.crypto.HapiCryptoTransfer.tinyBarsFromTo;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.blockingOrder;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.buildUpgradeZipFrom;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.doAdhoc;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.doingContextual;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.freezeOnly;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.freezeUpgrade;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.noOp;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.prepareUpgrade;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.purgeUpgradeArtifacts;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.runBackgroundTrafficUntilFreezeComplete;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.sourcing;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.updateSpecialFile;
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.waitForActive;
Expand Down Expand Up @@ -175,6 +178,7 @@ default SpecOperation upgradeToNextConfigVersion(@NonNull final SpecOperation pr
default HapiSpecOperation upgradeToConfigVersion(final int version, @NonNull final SpecOperation preRestartOp) {
requireNonNull(preRestartOp);
return blockingOrder(
runBackgroundTrafficUntilFreezeComplete(),
sourcing(() -> freezeUpgrade()
.startingIn(2)
.seconds()
Expand All @@ -184,7 +188,11 @@ default HapiSpecOperation upgradeToConfigVersion(final int version, @NonNull fin
preRestartOp,
FakeNmt.restartNetwork(version),
doAdhoc(() -> CURRENT_CONFIG_VERSION.set(version)),
waitForActiveNetwork(RESTART_TIMEOUT));
waitForActiveNetwork(RESTART_TIMEOUT),
cryptoCreate("postUpgradeAccount"),
// Ensure we have a post-upgrade transaction in a new period to trigger
// system file exports while still streaming records
doingContextual(TxnUtils::triggerAndCloseAtLeastOneFileIfNotInterrupted));
}

/**
Expand Down

0 comments on commit e635547

Please sign in to comment.