Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple TrieLogManager and CachedWorldStorageManager #6072

Merged
merged 7 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class BonsaiWorldStateProvider implements WorldStateArchive {

private final Blockchain blockchain;

private final CachedWorldStorageManager cachedWorldStorageManager;
private final TrieLogManager trieLogManager;
private final BonsaiWorldState persistedState;
private final BonsaiWorldStateKeyValueStorage worldStateStorage;
Expand Down Expand Up @@ -91,15 +92,12 @@ public BonsaiWorldStateProvider(
final ObservableMetricsSystem metricsSystem,
final BesuContext pluginContext) {

this.cachedWorldStorageManager =
new CachedWorldStorageManager(this, worldStateStorage, metricsSystem);
// TODO: de-dup constructors
this.trieLogManager =
new CachedWorldStorageManager(
this,
blockchain,
worldStateStorage,
metricsSystem,
maxLayersToLoad.orElse(RETAINED_LAYERS),
pluginContext);
new TrieLogManager(
blockchain, worldStateStorage, maxLayersToLoad.orElse(RETAINED_LAYERS), pluginContext);
this.blockchain = blockchain;
this.worldStateStorage = worldStateStorage;
this.cachedMerkleTrieLoader = cachedMerkleTrieLoader;
Expand All @@ -108,16 +106,18 @@ public BonsaiWorldStateProvider(
.getBlockHeader(persistedState.getWorldStateBlockHash())
.ifPresent(
blockHeader ->
this.trieLogManager.addCachedLayer(
this.cachedWorldStorageManager.addCachedLayer(
blockHeader, persistedState.getWorldStateRootHash(), persistedState));
}

@VisibleForTesting
BonsaiWorldStateProvider(
final CachedWorldStorageManager cachedWorldStorageManager,
final TrieLogManager trieLogManager,
final BonsaiWorldStateKeyValueStorage worldStateStorage,
final Blockchain blockchain,
final CachedMerkleTrieLoader cachedMerkleTrieLoader) {
this.cachedWorldStorageManager = cachedWorldStorageManager;
this.trieLogManager = trieLogManager;
this.blockchain = blockchain;
this.worldStateStorage = worldStateStorage;
Expand All @@ -127,13 +127,13 @@ public BonsaiWorldStateProvider(
.getBlockHeader(persistedState.getWorldStateBlockHash())
.ifPresent(
blockHeader ->
this.trieLogManager.addCachedLayer(
this.cachedWorldStorageManager.addCachedLayer(
blockHeader, persistedState.getWorldStateRootHash(), persistedState));
}

@Override
public Optional<WorldState> get(final Hash rootHash, final Hash blockHash) {
return trieLogManager
return cachedWorldStorageManager
.getWorldState(blockHash)
.or(
() -> {
Expand All @@ -148,7 +148,7 @@ public Optional<WorldState> get(final Hash rootHash, final Hash blockHash) {

@Override
public boolean isWorldStateAvailable(final Hash rootHash, final Hash blockHash) {
return trieLogManager.containWorldStateStorage(blockHash)
return cachedWorldStorageManager.containWorldStateStorage(blockHash)
|| persistedState.blockHash().equals(blockHash)
|| worldStateStorage.isWorldStateAvailable(rootHash, blockHash);
}
Expand All @@ -167,10 +167,10 @@ public Optional<MutableWorldState> getMutable(
trieLogManager.getMaxLayersToLoad());
return Optional.empty();
}
return trieLogManager
return cachedWorldStorageManager
.getWorldState(blockHeader.getHash())
.or(() -> trieLogManager.getNearestWorldState(blockHeader))
.or(() -> trieLogManager.getHeadWorldState(blockchain::getBlockHeader))
.or(() -> cachedWorldStorageManager.getNearestWorldState(blockHeader))
.or(() -> cachedWorldStorageManager.getHeadWorldState(blockchain::getBlockHeader))
.flatMap(
bonsaiWorldState ->
rollMutableStateToBlockHash(bonsaiWorldState, blockHeader.getHash()))
Expand Down Expand Up @@ -354,11 +354,15 @@ public TrieLogManager getTrieLogManager() {
return trieLogManager;
}

public CachedWorldStorageManager getCachedWorldStorageManager() {
return cachedWorldStorageManager;
}

@Override
public void resetArchiveStateTo(final BlockHeader blockHeader) {
persistedState.resetWorldStateTo(blockHeader);
this.trieLogManager.reset();
this.trieLogManager.addCachedLayer(
this.cachedWorldStorageManager.reset();
this.cachedWorldStorageManager.addCachedLayer(
blockHeader, persistedState.getWorldStateRootHash(), persistedState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,9 @@
import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateKeyValueStorage.BonsaiStorageSubscriber;
import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateLayerStorage;
import org.hyperledger.besu.ethereum.bonsai.trielog.AbstractTrieLogManager;
import org.hyperledger.besu.ethereum.bonsai.trielog.TrieLogFactoryImpl;
import org.hyperledger.besu.ethereum.bonsai.worldview.BonsaiWorldState;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.metrics.ObservableMetricsSystem;
import org.hyperledger.besu.plugin.BesuContext;
import org.hyperledger.besu.plugin.services.TrieLogService;
import org.hyperledger.besu.plugin.services.trielogs.TrieLog;
import org.hyperledger.besu.plugin.services.trielogs.TrieLogFactory;
import org.hyperledger.besu.plugin.services.trielogs.TrieLogProvider;

import java.util.ArrayList;
import java.util.Comparator;
Expand All @@ -39,52 +31,39 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CachedWorldStorageManager extends AbstractTrieLogManager
implements BonsaiStorageSubscriber {
public class CachedWorldStorageManager implements BonsaiStorageSubscriber {
public static final long RETAINED_LAYERS = 512; // at least 256 + typical rollbacks
private static final Logger LOG = LoggerFactory.getLogger(CachedWorldStorageManager.class);
private final BonsaiWorldStateProvider archive;
private final ObservableMetricsSystem metricsSystem;

CachedWorldStorageManager(
private final BonsaiWorldStateKeyValueStorage rootWorldStateStorage;
private final Map<Bytes32, CachedBonsaiWorldView> cachedWorldStatesByHash;

private CachedWorldStorageManager(
final BonsaiWorldStateProvider archive,
final Blockchain blockchain,
final BonsaiWorldStateKeyValueStorage worldStateStorage,
final long maxLayersToLoad,
final Map<Bytes32, CachedBonsaiWorldView> cachedWorldStatesByHash,
final BesuContext pluginContext,
final ObservableMetricsSystem metricsSystem) {
super(blockchain, worldStateStorage, maxLayersToLoad, cachedWorldStatesByHash, pluginContext);
worldStateStorage.subscribe(this);
this.rootWorldStateStorage = worldStateStorage;
this.cachedWorldStatesByHash = cachedWorldStatesByHash;
this.archive = archive;
this.metricsSystem = metricsSystem;
}

public CachedWorldStorageManager(
final BonsaiWorldStateProvider archive,
final Blockchain blockchain,
final BonsaiWorldStateKeyValueStorage worldStateStorage,
final ObservableMetricsSystem metricsSystem,
final long maxLayersToLoad,
final BesuContext pluginContext) {
this(
archive,
blockchain,
worldStateStorage,
maxLayersToLoad,
new ConcurrentHashMap<>(),
pluginContext,
metricsSystem);
final ObservableMetricsSystem metricsSystem) {
this(archive, worldStateStorage, new ConcurrentHashMap<>(), metricsSystem);
}

@Override
public synchronized void addCachedLayer(
final BlockHeader blockHeader,
final Hash worldStateRootHash,
Expand Down Expand Up @@ -132,7 +111,20 @@ public synchronized void addCachedLayer(
scrubCachedLayers(blockHeader.getNumber());
}

@Override
private synchronized void scrubCachedLayers(final long newMaxHeight) {
if (cachedWorldStatesByHash.size() > RETAINED_LAYERS) {
final long waterline = newMaxHeight - RETAINED_LAYERS;
cachedWorldStatesByHash.values().stream()
.filter(layer -> layer.getBlockNumber() < waterline)
.toList()
.forEach(
layer -> {
cachedWorldStatesByHash.remove(layer.getBlockHash());
layer.close();
});
}
}

public Optional<BonsaiWorldState> getWorldState(final Hash blockHash) {
if (cachedWorldStatesByHash.containsKey(blockHash)) {
// return a new worldstate using worldstate storage and an isolated copy of the updater
Expand All @@ -150,7 +142,6 @@ public Optional<BonsaiWorldState> getWorldState(final Hash blockHash) {
return Optional.empty();
}

@Override
public Optional<BonsaiWorldState> getNearestWorldState(final BlockHeader blockHeader) {
LOG.atDebug()
.setMessage("getting nearest worldstate for {}")
Expand Down Expand Up @@ -183,7 +174,6 @@ public Optional<BonsaiWorldState> getNearestWorldState(final BlockHeader blockHe
archive, new BonsaiWorldStateLayerStorage(storage)));
}

@Override
public Optional<BonsaiWorldState> getHeadWorldState(
final Function<Hash, Optional<BlockHeader>> hashBlockHeaderFunction) {

Expand All @@ -203,7 +193,10 @@ public Optional<BonsaiWorldState> getHeadWorldState(
});
}

@Override
public boolean containWorldStateStorage(final Hash blockHash) {
return cachedWorldStatesByHash.containsKey(blockHash);
}

public void reset() {
this.cachedWorldStatesByHash.clear();
}
Expand All @@ -227,76 +220,4 @@ public void onClearTrieLog() {
public void onCloseStorage() {
this.cachedWorldStatesByHash.clear();
}

@VisibleForTesting
@Override
protected TrieLogFactory setupTrieLogFactory(final BesuContext pluginContext) {
// if we have a TrieLogService from pluginContext, use it.
var trieLogServicez =
Optional.ofNullable(pluginContext)
.flatMap(context -> context.getService(TrieLogService.class));

if (trieLogServicez.isPresent()) {
var trieLogService = trieLogServicez.get();
// push the TrieLogProvider into the TrieLogService
trieLogService.configureTrieLogProvider(getTrieLogProvider());

// configure plugin observers:
trieLogService.getObservers().forEach(trieLogObservers::subscribe);

// return the TrieLogFactory implementation from the TrieLogService
return trieLogService.getTrieLogFactory();
} else {
// Otherwise default to TrieLogFactoryImpl
return new TrieLogFactoryImpl();
}
}

@VisibleForTesting
TrieLogProvider getTrieLogProvider() {
return new TrieLogProvider() {
@Override
public Optional<TrieLog> getTrieLogLayer(final Hash blockHash) {
return CachedWorldStorageManager.this.getTrieLogLayer(blockHash);
}

@Override
public Optional<TrieLog> getTrieLogLayer(final long blockNumber) {
return CachedWorldStorageManager.this
.blockchain
.getBlockHeader(blockNumber)
.map(BlockHeader::getHash)
.flatMap(CachedWorldStorageManager.this::getTrieLogLayer);
}

@Override
public List<TrieLogRangeTuple> getTrieLogsByRange(
final long fromBlockNumber, final long toBlockNumber) {
return rangeAsStream(fromBlockNumber, toBlockNumber)
.map(blockchain::getBlockHeader)
.map(
headerOpt ->
headerOpt.flatMap(
header ->
CachedWorldStorageManager.this
.getTrieLogLayer(header.getBlockHash())
.map(
layer ->
new TrieLogRangeTuple(
header.getBlockHash(), header.getNumber(), layer))))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
}

Stream<Long> rangeAsStream(final long fromBlockNumber, final long toBlockNumber) {
if (Math.abs(toBlockNumber - fromBlockNumber) > LOG_RANGE_LIMIT) {
throw new IllegalArgumentException("Requested Range too large");
}
long left = Math.min(fromBlockNumber, toBlockNumber);
long right = Math.max(fromBlockNumber, toBlockNumber);
return LongStream.range(left, right).boxed();
}
};
}
}
Loading
Loading