Commit
Fix some mark sweep pruner bugs where nodes that should be kept were being swept (Re-merge of hyperledger#38) (hyperledger#50)

Adds integration tests to catch a bug where the mark storage was being cleared once marking began, discarding marks recorded beforehand. Instead, the mark storage is now cleared when the mark sweep pruner is prepared.

Fixes a bug where the pending marks were not being checked while pruning was occurring. By removing the flush in sweepBefore, the existing tests catch the bug.

Signed-off-by: Ratan Rai Sur <ratan.r.sur@gmail.com>
Signed-off-by: edwardmack <ed@edwardmack.com>
RatanRSur authored and edwardmack committed Nov 4, 2019
1 parent 497e825 commit d94cfdd
Showing 6 changed files with 324 additions and 135 deletions.
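
In outline, the commit makes two behavioral changes, both visible in the MarkSweepPruner diff further down: the mark storage and the in-memory pending marks are now cleared in prepare() rather than at the start of mark(), so marks recorded by the node-added listener before marking begins are no longer discarded; and sweeping now consults the pending marks as well as the persisted mark storage, since the listener keeps marking nodes while a sweep is in progress. The snippet below is a simplified paraphrase of that change, reusing the field and method names from the diff but omitting the rest of the class; it is a sketch, not the literal Besu source.

  // Sketch: mark state is reset when the pruner is prepared, not when marking starts.
  public void prepare() {
    worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); // Just in case.
    markStorage.clear();
    pendingMarks.clear();
    nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes);
  }

  // Sketch: a node counts as marked if it is still pending in memory or already
  // flushed to mark storage; sweeping checks both, closing the race with the listener.
  private boolean isMarked(final Bytes32 key) {
    return pendingMarks.contains(key) || markStorage.containsKey(key.getArrayUnsafe());
  }
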
PrunerIntegrationTest.java (new file)
@@ -0,0 +1,255 @@
/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.worldstate;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain;

import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator.BlockOptions;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.WorldState;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStatePreimageKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.MockExecutorService;
import org.hyperledger.besu.util.bytes.Bytes32;
import org.hyperledger.besu.util.bytes.BytesValue;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.junit.Test;

public class PrunerIntegrationTest {

  private final BlockDataGenerator gen = new BlockDataGenerator();
  private final NoOpMetricsSystem metricsSystem = new NoOpMetricsSystem();
  private final Map<BytesValue, byte[]> hashValueStore = new HashMap<>();
  private final InMemoryKeyValueStorage stateStorage = new TestInMemoryStorage(hashValueStore);
  private final WorldStateStorage worldStateStorage = new WorldStateKeyValueStorage(stateStorage);
  private final WorldStateArchive worldStateArchive =
      new WorldStateArchive(
          worldStateStorage, new WorldStatePreimageKeyValueStorage(new InMemoryKeyValueStorage()));
  private final InMemoryKeyValueStorage markStorage = new InMemoryKeyValueStorage();
  private final Block genesisBlock = gen.genesisBlock();
  private final MutableBlockchain blockchain = createInMemoryBlockchain(genesisBlock);

  @Test
  public void pruner_smallState_manyOpsPerTx() throws InterruptedException {
    testPruner(3, 1, 1, 4, 1000);
  }

  @Test
  public void pruner_largeState_fewOpsPerTx() throws InterruptedException {
    testPruner(2, 5, 5, 6, 5);
  }

  @Test
  public void pruner_emptyBlocks() throws InterruptedException {
    testPruner(5, 0, 2, 5, 10);
  }

  @Test
  public void pruner_markChainhead() throws InterruptedException {
    testPruner(4, 2, 1, 10, 20);
  }

  @Test
  public void pruner_lowRelativeBlockConfirmations() throws InterruptedException {
    testPruner(3, 2, 1, 4, 20);
  }

  @Test
  public void pruner_highRelativeBlockConfirmations() throws InterruptedException {
    testPruner(3, 2, 9, 10, 20);
  }

  private void testPruner(
      final int numCycles,
      final int accountsPerBlock,
      final long blockConfirmations,
      final int numBlocksToKeep,
      final int opsPerTransaction)
      throws InterruptedException {

    final var markSweepPruner =
        new MarkSweepPruner(
            worldStateStorage, blockchain, markStorage, metricsSystem, opsPerTransaction);
    final var pruner =
        new Pruner(
            markSweepPruner,
            blockchain,
            new MockExecutorService(),
            new PruningConfiguration(blockConfirmations, numBlocksToKeep));

    pruner.start();

    for (int cycle = 0; cycle < numCycles; ++cycle) {
      int numBlockInCycle =
          numBlocksToKeep
              + 1; // +1 to get it to switch from MARKING_COMPLETE TO SWEEPING on each cycle
      var fullyMarkedBlockNum = cycle * numBlockInCycle + 1;

      // This should cause a full mark and sweep cycle
      assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE);
      generateBlockchainData(numBlockInCycle, accountsPerBlock);
      assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE);

      // Collect the nodes we expect to keep
      final Set<BytesValue> expectedNodes = new HashSet<>();
      for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) {
        final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot();
        collectWorldStateNodes(stateRoot, expectedNodes);
      }

      if (accountsPerBlock != 0) {
        assertThat(hashValueStore.size())
            .isGreaterThanOrEqualTo(expectedNodes.size()); // Sanity check
      }

      // Assert that blocks from mark point onward are still accessible
      for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) {
        final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot();
        assertThat(worldStateArchive.get(stateRoot)).isPresent();
        final WorldState markedState = worldStateArchive.get(stateRoot).get();
        // Traverse accounts and make sure all are accessible
        final int expectedAccounts = accountsPerBlock * i;
        final long accounts =
            markedState.streamAccounts(Bytes32.ZERO, expectedAccounts * 2).count();
        assertThat(accounts).isEqualTo(expectedAccounts);
        // Traverse storage to ensure that all storage is accessible
        markedState
            .streamAccounts(Bytes32.ZERO, expectedAccounts * 2)
            .forEach(a -> a.storageEntriesFrom(Bytes32.ZERO, 1000));
      }

      // All other state roots should have been removed
      for (int i = 0; i < fullyMarkedBlockNum; i++) {
        final BlockHeader curHeader = blockchain.getBlockHeader(i).get();
        if (!curHeader.getStateRoot().equals(Hash.EMPTY_TRIE_HASH)) {
          assertThat(worldStateArchive.get(curHeader.getStateRoot())).isEmpty();
        }
      }

      // Check that storage contains only the values we expect
      assertThat(hashValueStore.size()).isEqualTo(expectedNodes.size());
      assertThat(hashValueStore.values())
          .containsExactlyInAnyOrderElementsOf(
              expectedNodes.stream().map(BytesValue::getArrayUnsafe).collect(Collectors.toSet()));
    }

    pruner.stop();
  }

  private void generateBlockchainData(final int numBlocks, final int numAccounts) {
    Block parentBlock = blockchain.getChainHeadBlock();
    for (int i = 0; i < numBlocks; i++) {
      final MutableWorldState worldState =
          worldStateArchive.getMutable(parentBlock.getHeader().getStateRoot()).get();
      gen.createRandomContractAccountsWithNonEmptyStorage(worldState, numAccounts);
      final Hash stateRoot = worldState.rootHash();

      final Block block =
          gen.block(
              BlockOptions.create()
                  .setStateRoot(stateRoot)
                  .setBlockNumber(parentBlock.getHeader().getNumber() + 1L)
                  .setParentHash(parentBlock.getHash()));
      final List<TransactionReceipt> receipts = gen.receipts(block);
      blockchain.appendBlock(block, receipts);
      parentBlock = block;
    }
  }

  private Set<BytesValue> collectWorldStateNodes(
      final Hash stateRootHash, final Set<BytesValue> collector) {
    final List<Hash> storageRoots = new ArrayList<>();
    final MerklePatriciaTrie<Bytes32, BytesValue> stateTrie = createStateTrie(stateRootHash);

    // Collect storage roots and code
    stateTrie
        .entriesFrom(Bytes32.ZERO, 1000)
        .forEach(
            (key, val) -> {
              final StateTrieAccountValue accountValue =
                  StateTrieAccountValue.readFrom(RLP.input(val));
              stateStorage
                  .get(accountValue.getCodeHash().getArrayUnsafe())
                  .ifPresent(v -> collector.add(BytesValue.wrap(v)));
              storageRoots.add(accountValue.getStorageRoot());
            });

    // Collect state nodes
    collectTrieNodes(stateTrie, collector);
    // Collect storage nodes
    for (Hash storageRoot : storageRoots) {
      final MerklePatriciaTrie<Bytes32, BytesValue> storageTrie = createStorageTrie(storageRoot);
      collectTrieNodes(storageTrie, collector);
    }

    return collector;
  }

  private void collectTrieNodes(
      final MerklePatriciaTrie<Bytes32, BytesValue> trie, final Set<BytesValue> collector) {
    final Bytes32 rootHash = trie.getRootHash();
    trie.visitAll(
        (node) -> {
          if (node.isReferencedByHash() || node.getHash().equals(rootHash)) {
            collector.add(node.getRlp());
          }
        });
  }

  private MerklePatriciaTrie<Bytes32, BytesValue> createStateTrie(final Bytes32 rootHash) {
    return new StoredMerklePatriciaTrie<>(
        worldStateStorage::getAccountStateTrieNode,
        rootHash,
        Function.identity(),
        Function.identity());
  }

  private MerklePatriciaTrie<Bytes32, BytesValue> createStorageTrie(final Bytes32 rootHash) {
    return new StoredMerklePatriciaTrie<>(
        worldStateStorage::getAccountStorageTrieNode,
        rootHash,
        Function.identity(),
        Function.identity());
  }

  // Proxy class so that we have access to the constructor that takes our own map
  private static class TestInMemoryStorage extends InMemoryKeyValueStorage {

    public TestInMemoryStorage(final Map<BytesValue, byte[]> hashValueStore) {
      super(hashValueStore);
    }
  }
}
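
A note on the parameters of testPruner above, for readers extending the suite: the names come from the method signature, and the interpretation below is a reading of how the test wires them up rather than a statement of Pruner's documented API.

  // testPruner(numCycles, accountsPerBlock, blockConfirmations, numBlocksToKeep, opsPerTransaction)
  // e.g. the first test case:
  testPruner(3, 1, 1, 4, 1000);
  // numCycles = 3            -> three full mark/sweep rounds are driven through the Pruner
  // accountsPerBlock = 1     -> each generated block adds one random contract account with storage
  // blockConfirmations = 1   -> passed to PruningConfiguration; roughly, how far behind the chain
  //                             head the block chosen for marking sits
  // numBlocksToKeep = 4      -> passed to PruningConfiguration; state for this many recent blocks
  //                             must survive each sweep (the test generates numBlocksToKeep + 1
  //                             blocks per cycle to force the MARKING_COMPLETE -> SWEEPING step)
  // opsPerTransaction = 1000 -> MarkSweepPruner's operationsPerTransaction, i.e. how many
  //                             mark/sweep storage operations are batched before committing
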
MarkSweepPruner.java
@@ -54,7 +54,7 @@ public class MarkSweepPruner {
   private final Counter sweptNodesCounter;
   private volatile long nodeAddedListenerId;
   private final ReentrantLock markLock = new ReentrantLock(true);
-  private final Set<BytesValue> pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>());
+  private final Set<Bytes32> pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>());

   public MarkSweepPruner(
       final WorldStateStorage worldStateStorage,
@@ -98,7 +98,9 @@ public MarkSweepPruner(

   public void prepare() {
     worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); // Just in case.
-    nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNewNodes);
+    markStorage.clear();
+    pendingMarks.clear();
+    nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes);
   }

   public void cleanup() {
@@ -107,7 +109,6 @@ public void cleanup() {

   public void mark(final Hash rootHash) {
     markOperationCounter.inc();
-    markStorage.clear();
     createStateTrie(rootHash)
         .visitAll(
             node -> {
@@ -119,13 +120,12 @@ public void mark(final Hash rootHash) {
               markNode(node.getHash());
               node.getValue().ifPresent(this::processAccountState);
             });
-    LOG.info("Completed marking used nodes for pruning");
+    LOG.debug("Completed marking used nodes for pruning");
   }

   public void sweepBefore(final long markedBlockNumber) {
-    flushPendingMarks();
     sweepOperationCounter.inc();
-    LOG.info("Sweeping unused nodes");
+    LOG.debug("Sweeping unused nodes");
     // Sweep state roots first, walking backwards until we get to a state root that isn't in the
     // storage
     long prunedNodeCount = 0;
@@ -138,7 +138,7 @@ public void sweepBefore(final long markedBlockNumber) {
         break;
       }

-      if (!markStorage.containsKey(candidateStateRootHash.getArrayUnsafe())) {
+      if (!isMarked(candidateStateRootHash)) {
         updater.removeAccountStateTrieNode(candidateStateRootHash);
         prunedNodeCount++;
         if (prunedNodeCount % operationsPerTransaction == 0) {
@@ -149,11 +149,19 @@ public void sweepBefore(final long markedBlockNumber) {
     }
     updater.commit();
     // Sweep non-state-root nodes
-    prunedNodeCount += worldStateStorage.prune(markStorage::containsKey);
+    prunedNodeCount += worldStateStorage.prune(this::isMarked);
     sweptNodesCounter.inc(prunedNodeCount);
     worldStateStorage.removeNodeAddedListener(nodeAddedListenerId);
     markStorage.clear();
-    LOG.info("Completed sweeping unused nodes");
+    LOG.debug("Completed sweeping unused nodes");
   }

+  private boolean isMarked(final Bytes32 key) {
+    return pendingMarks.contains(key) || markStorage.containsKey(key.getArrayUnsafe());
+  }
+
+  private boolean isMarked(final byte[] key) {
+    return pendingMarks.contains(Bytes32.wrap(key)) || markStorage.containsKey(key);
+  }
+
   private MerklePatriciaTrie<Bytes32, BytesValue> createStateTrie(final Bytes32 rootHash) {
@@ -182,10 +190,14 @@ private void processAccountState(final BytesValue value) {

   @VisibleForTesting
   void markNode(final Bytes32 hash) {
-    markedNodesCounter.inc();
+    markNodes(Collections.singleton(hash));
+  }
+
+  private void markNodes(final Collection<Bytes32> nodeHashes) {
+    markedNodesCounter.inc(nodeHashes.size());
     markLock.lock();
     try {
-      pendingMarks.add(hash);
+      pendingMarks.addAll(nodeHashes);
       maybeFlushPendingMarks();
     } finally {
       markLock.unlock();
@@ -209,15 +221,4 @@ void flushPendingMarks() {
       markLock.unlock();
     }
   }
-
-  private void markNewNodes(final Collection<Bytes32> nodeHashes) {
-    markedNodesCounter.inc(nodeHashes.size());
-    markLock.lock();
-    try {
-      pendingMarks.addAll(nodeHashes);
-      maybeFlushPendingMarks();
-    } finally {
-      markLock.unlock();
-    }
-  }
 }