Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[PAN-2819] [PAN-2820] Mark Sweep #1638

Merged
merged 56 commits into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from 54 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
064409b
Mark Sweep Pruner - first pass
ajsutton May 8, 2019
ee116fb
narrow Pruner dependency: ProtocolContext -> Blockchain
RatanRSur Jun 26, 2019
0c6af0d
test scaffolding
RatanRSur Jun 26, 2019
7073fae
explainer comments and beginnings of block creation
RatanRSur Jun 27, 2019
a95ab99
make Pruner.State public and add getter
RatanRSur Jun 27, 2019
023dee9
sweeping log
RatanRSur Jun 27, 2019
aef1c1e
get test properly failing
RatanRSur Jun 27, 2019
1af66ea
gate sweeping on marked block being on canonical chain
RatanRSur Jul 2, 2019
0bccea4
privitize state and remove getter
RatanRSur Jul 2, 2019
adbc86a
add transient fork outliving
RatanRSur Jul 2, 2019
216907a
add test check that markStorage is empty after sweeping
RatanRSur Jul 3, 2019
a69bf0f
disable pruner for now
RatanRSur Jul 3, 2019
4615be3
set useColumns to false
RatanRSur Jul 3, 2019
f268c57
remove unused variable
RatanRSur Jul 3, 2019
ef2499c
remove resolved TODOs
RatanRSur Jul 5, 2019
26a5972
remove TODO that is now tracked by JIRA issue
RatanRSur Jul 5, 2019
d26b30c
unmock BlockDataGenerator
RatanRSur Jul 5, 2019
c7224cd
remove outdated comment
RatanRSur Jul 11, 2019
6eaded2
remove mark/sweep irrelevant changes
RatanRSur Jul 15, 2019
33a5b9d
make nodeAddedListenerID volatile
RatanRSur Jul 15, 2019
3641159
don't exit on pruning failure
RatanRSur Jul 15, 2019
a395a3d
add mockito runner to pruner tests
RatanRSur Jul 15, 2019
7e004b1
awaitTermination of Pruner instead of shutdownNow
RatanRSur Jul 15, 2019
e57885b
gate pruning with boolean instead of comment
RatanRSur Jul 15, 2019
89a44ad
inject ExecutorService dependency into Pruner and move MockExecutorSe…
RatanRSur Jul 15, 2019
e4632a4
add happy path testing
RatanRSur Jul 16, 2019
5a129cf
swap pruner parameters
RatanRSur Jul 16, 2019
47fdc47
add transient fork and retention period test
RatanRSur Jul 16, 2019
94062e8
add negative checks to period testing
RatanRSur Jul 16, 2019
d27d7cd
rename mayContainKey test to containsKey
RatanRSur Jul 16, 2019
ea2bc38
Pruner IllegalArgumentException testing
RatanRSur Jul 16, 2019
f4de1b4
fix exception string formatting
RatanRSur Jul 16, 2019
fcb96b9
spotless
RatanRSur Jul 16, 2019
cd2fbec
fix abort test after pruner state changing change
RatanRSur Jul 16, 2019
e57cd00
build ExecutorService on demand
RatanRSur Jul 23, 2019
18de8a9
rename test
RatanRSur Jul 23, 2019
71f2c61
add pruner stopping to tests
RatanRSur Jul 23, 2019
4367f7f
give unloading responsibility to AllNodesVisitor
RatanRSur Jul 23, 2019
4280ecc
add override
RatanRSur Jul 23, 2019
af061e9
remove more references to singleUseNodes
RatanRSur Jul 23, 2019
61645ef
remove unnecessary intellij refactoring
RatanRSur Jul 23, 2019
fcd6670
Merge branch 'master' into mark-sweep
RatanRSur Aug 1, 2019
105ff70
update to reflect WorldStatePreimage changes
RatanRSur Aug 1, 2019
2cffb78
revert ExecutorServiceBuilder change and reduce shutdown timeout
RatanRSur Aug 2, 2019
8e95369
make pruner cleanup pruning strategy on shutdown
RatanRSur Aug 2, 2019
896b5e3
nest ifs to draw attention to side-effecting checks
RatanRSur Aug 2, 2019
707ebca
add comment explaining interrupt handling
RatanRSur Aug 3, 2019
3b7a020
remove unnecessary check to pending marks when sweeping because it wi…
RatanRSur Aug 3, 2019
bc032b1
relax argument requirements
RatanRSur Aug 6, 2019
54ba517
extract block appending test util method
RatanRSur Aug 6, 2019
aaa0191
fix IllegalArgumentException error message
RatanRSur Aug 6, 2019
5f3e57a
revert cascading between states and fix tests to accomodate
RatanRSur Aug 6, 2019
1e01636
use "must" instead of "should" in exception message
RatanRSur Aug 7, 2019
320dd5e
Merge branch 'master' into mark-sweep
RatanRSur Aug 7, 2019
245433f
make executor Service final
RatanRSur Aug 7, 2019
e4d119a
Merge branch 'master' into mark-sweep
RatanRSur Aug 7, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import tech.pegasys.pantheon.ethereum.privacy.PrivateTransactionStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStatePreimageStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage;

import java.io.Closeable;

Expand All @@ -32,4 +33,6 @@ public interface StorageProvider extends Closeable {
PrivateTransactionStorage createPrivateTransactionStorage();

PrivateStateStorage createPrivateStateStorage();

KeyValueStorage createPruningStorage();
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public PrivateStateStorage createPrivateStateStorage() {
return new PrivateStateKeyValueStorage(privateStateStorage);
}

@Override
public KeyValueStorage createPruningStorage() {
return pruningStorage;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.worldstate;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.trie.StoredMerklePatriciaTrie;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.PantheonMetricCategory;
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage;
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage.Transaction;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class MarkSweepPruner {
private static final Logger LOG = LogManager.getLogger();
private static final BytesValue IN_USE = BytesValue.of(1);
private static final int MARKS_PER_TRANSACTION = 1000;
private final WorldStateStorage worldStateStorage;
private final KeyValueStorage markStorage;
private final Counter markedNodesCounter;
private final Counter markOperationCounter;
private final Counter sweepOperationCounter;
private final Counter sweptNodesCounter;
private volatile long nodeAddedListenerId;
private final ReentrantLock markLock = new ReentrantLock(true);
private final Set<BytesValue> pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>());

public MarkSweepPruner(
final WorldStateStorage worldStateStorage,
final KeyValueStorage markStorage,
final MetricsSystem metricsSystem) {
this.worldStateStorage = worldStateStorage;
this.markStorage = markStorage;

markedNodesCounter =
metricsSystem.createCounter(
PantheonMetricCategory.PRUNER,
"marked_nodes_total",
"Total number of nodes marked as in use");
markOperationCounter =
metricsSystem.createCounter(
PantheonMetricCategory.PRUNER,
"mark_operations_total",
"Total number of mark operations performed");

sweptNodesCounter =
metricsSystem.createCounter(
PantheonMetricCategory.PRUNER,
"swept_nodes_total",
"Total number of unused nodes removed");
sweepOperationCounter =
metricsSystem.createCounter(
PantheonMetricCategory.PRUNER,
"sweep_operations_total",
"Total number of sweep operations performed");
}

public void prepare() {
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); // Just in case.
nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNewNodes);
}

public void cleanup() {
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId);
}

public void mark(final Hash rootHash) {
markOperationCounter.inc();
markStorage.clear();
createStateTrie(rootHash)
.visitAll(
node -> {
if (Thread.interrupted()) {
RatanRSur marked this conversation as resolved.
Show resolved Hide resolved
// Since we don't expect to abort marking ourselves,
// our abort process consists only of handling interrupts
throw new RuntimeException("Interrupted while marking");
}
markNode(node.getHash());
node.getValue().ifPresent(this::processAccountState);
});
LOG.info("Completed marking used nodes for pruning");
}

public void sweep() {
flushPendingMarks();
sweepOperationCounter.inc();
LOG.info("Sweeping unused nodes");
final long prunedNodeCount = worldStateStorage.prune(markStorage::containsKey);
sweptNodesCounter.inc(prunedNodeCount);
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId);
markStorage.clear();
LOG.info("Completed sweeping unused nodes");
}

private MerklePatriciaTrie<Bytes32, BytesValue> createStateTrie(final Bytes32 rootHash) {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStateTrieNode,
rootHash,
Function.identity(),
Function.identity());
}

private MerklePatriciaTrie<Bytes32, BytesValue> createStorageTrie(final Bytes32 rootHash) {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStorageTrieNode,
rootHash,
Function.identity(),
Function.identity());
}

private void processAccountState(final BytesValue value) {
final StateTrieAccountValue accountValue = StateTrieAccountValue.readFrom(RLP.input(value));
markNode(accountValue.getCodeHash());

createStorageTrie(accountValue.getStorageRoot())
.visitAll(storageNode -> markNode(storageNode.getHash()));
}

private void markNode(final Bytes32 hash) {
markedNodesCounter.inc();
markLock.lock();
try {
pendingMarks.add(hash);
maybeFlushPendingMarks();
} finally {
markLock.unlock();
}
}

private void maybeFlushPendingMarks() {
if (pendingMarks.size() > MARKS_PER_TRANSACTION) {
flushPendingMarks();
}
}

void flushPendingMarks() {
markLock.lock();
try {
final Transaction transaction = markStorage.startTransaction();
pendingMarks.forEach(node -> transaction.put(node, IN_USE));
transaction.commit();
pendingMarks.clear();
} finally {
markLock.unlock();
}
}

private void markNewNodes(final Collection<Bytes32> nodeHashes) {
markedNodesCounter.inc(nodeHashes.size());
markLock.lock();
try {
pendingMarks.addAll(nodeHashes);
maybeFlushPendingMarks();
} finally {
markLock.unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.worldstate;

import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Pruner {
private static final Logger LOG = LogManager.getLogger();

private final MarkSweepPruner pruningStrategy;
private final Blockchain blockchain;
private ExecutorService executorService;
RatanRSur marked this conversation as resolved.
Show resolved Hide resolved
private final long retentionPeriodInBlocks;
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
private volatile long markBlockNumber = 0;
private volatile BlockHeader markedBlockHeader;
private long transientForkThreshold;

public Pruner(
final MarkSweepPruner pruningStrategy,
final Blockchain blockchain,
final ExecutorService executorService,
final long transientForkThreshold,
final long retentionPeriodInBlocks) {
this.pruningStrategy = pruningStrategy;
this.executorService = executorService;
this.blockchain = blockchain;
if (transientForkThreshold < 0 || retentionPeriodInBlocks < 0) {
throw new IllegalArgumentException(
RatanRSur marked this conversation as resolved.
Show resolved Hide resolved
String.format(
"TransientForkThreshold and RetentionPeriodInBlocks must be non-negative. transientForkThreshold=%d, retentionPeriodInBlocks=%d",
transientForkThreshold, retentionPeriodInBlocks));
}
this.retentionPeriodInBlocks = retentionPeriodInBlocks;
this.transientForkThreshold = transientForkThreshold;
}

public void start() {
blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event));
}

public void stop() throws InterruptedException {
pruningStrategy.cleanup();
executorService.awaitTermination(10, TimeUnit.SECONDS);
}

private void handleNewBlock(final BlockAddedEvent event) {
if (!event.isNewCanonicalHead()) {
return;
}

final long blockNumber = event.getBlock().getHeader().getNumber();
if (state.compareAndSet(State.IDLE, State.TRANSIENT_FORK_OUTLIVING)) {
RatanRSur marked this conversation as resolved.
Show resolved Hide resolved
pruningStrategy.prepare();
markBlockNumber = blockNumber;
} else if (blockNumber >= markBlockNumber + transientForkThreshold
&& state.compareAndSet(State.TRANSIENT_FORK_OUTLIVING, State.MARKING)) {
RatanRSur marked this conversation as resolved.
Show resolved Hide resolved
markedBlockHeader = blockchain.getBlockHeader(markBlockNumber).get();
mark(markedBlockHeader);
} else if (blockNumber >= markBlockNumber + retentionPeriodInBlocks
&& blockchain.blockIsOnCanonicalChain(markedBlockHeader.getHash())
&& state.compareAndSet(State.MARKING_COMPLETE, State.SWEEPING)) {
sweep();
}
}

private void mark(final BlockHeader header) {
markBlockNumber = header.getNumber();
final Hash stateRoot = header.getStateRoot();
LOG.info(
"Begin marking used nodes for pruning. Block number: {} State root: {}",
markBlockNumber,
stateRoot);
execute(
() -> {
pruningStrategy.mark(stateRoot);
state.compareAndSet(State.MARKING, State.MARKING_COMPLETE);
});
}

private void sweep() {
LOG.info(
"Begin sweeping unused nodes for pruning. Retention period: {}", retentionPeriodInBlocks);
execute(
() -> {
pruningStrategy.sweep();
state.compareAndSet(State.SWEEPING, State.IDLE);
});
}

private void execute(final Runnable action) {
try {
executorService.execute(action);
} catch (final Throwable t) {
LOG.error("Pruning failed", t);
state.set(State.IDLE);
}
}

private enum State {
IDLE,
TRANSIENT_FORK_OUTLIVING,
MARKING,
MARKING_COMPLETE,
SWEEPING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.kvstore.KeyValueStorage;

public class InMemoryStorageProvider implements StorageProvider {

Expand Down Expand Up @@ -86,6 +87,11 @@ public PrivateStateStorage createPrivateStateStorage() {
return new PrivateStateKeyValueStorage(new InMemoryKeyValueStorage());
}

@Override
public KeyValueStorage createPruningStorage() {
return new InMemoryKeyValueStorage();
}

@Override
public void close() {}
}
Loading