diff --git a/ethereum/core/src/jmh/java/org/hyperledger/besu/ethereum/vm/operations/OperationBenchmarkHelper.java b/ethereum/core/src/jmh/java/org/hyperledger/besu/ethereum/vm/operations/OperationBenchmarkHelper.java index a55ad27f20a..b051b0f7976 100644 --- a/ethereum/core/src/jmh/java/org/hyperledger/besu/ethereum/vm/operations/OperationBenchmarkHelper.java +++ b/ethereum/core/src/jmh/java/org/hyperledger/besu/ethereum/vm/operations/OperationBenchmarkHelper.java @@ -31,7 +31,7 @@ import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory; import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfigurationBuilder; import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.OptimisticRocksDBColumnarKeyValueStorage; -import org.hyperledger.besu.services.kvstore.SnappableSegmentedKeyValueStorageAdapter; +import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageAdapter; import java.io.IOException; import java.nio.file.Files; @@ -70,7 +70,7 @@ public static OperationBenchmarkHelper create() throws IOException { RocksDBMetricsFactory.PUBLIC_ROCKS_DB_METRICS); final KeyValueStorage keyValueStorage = - new SnappableSegmentedKeyValueStorageAdapter<>( + new SegmentedKeyValueStorageAdapter( KeyValueSegmentIdentifier.BLOCKCHAIN, optimisticRocksDBColumnarKeyValueStorage); final ExecutionContextTestFixture executionContext = diff --git a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/KeyValueStorageFactory.java b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/KeyValueStorageFactory.java index 8a983153dac..583354a4b96 100644 --- a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/KeyValueStorageFactory.java +++ b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/KeyValueStorageFactory.java @@ -24,7 +24,7 @@ /** Factory for creating key-value storage instances. */ @Unstable -public interface KeyValueStorageFactory extends Closeable { +public interface KeyValueStorageFactory extends Closeable { /** * Retrieves the identity of the key-value storage factory. diff --git a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/PrivacyKeyValueStorageFactory.java b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/PrivacyKeyValueStorageFactory.java index 64997cb10bb..b8d35a1ad7b 100644 --- a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/PrivacyKeyValueStorageFactory.java +++ b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/PrivacyKeyValueStorageFactory.java @@ -18,7 +18,7 @@ /** The interface Privacy key value storage factory. */ @Unstable -public interface PrivacyKeyValueStorageFactory extends KeyValueStorageFactory { +public interface PrivacyKeyValueStorageFactory extends KeyValueStorageFactory { /** * Retrieves the version of the key-value storage factory. * diff --git a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/SnappableKeyValueStorage.java b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/SnappableKeyValueStorage.java index 850111d02dc..8383ec36a36 100644 --- a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/SnappableKeyValueStorage.java +++ b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/storage/SnappableKeyValueStorage.java @@ -17,7 +17,6 @@ /** The interface Snappable key value storage. * - * @param the type which will be returned from takeSnapshot() */ public interface SnappableKeyValueStorage extends SegmentedKeyValueStorage { diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValuePrivacyStorageFactory.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValuePrivacyStorageFactory.java index 95801e46d5e..34fe0e1ab1e 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValuePrivacyStorageFactory.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValuePrivacyStorageFactory.java @@ -20,11 +20,13 @@ import org.hyperledger.besu.plugin.services.storage.KeyValueStorage; import org.hyperledger.besu.plugin.services.storage.PrivacyKeyValueStorageFactory; import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier; +import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage; import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.DatabaseMetadata; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; import java.util.Set; import org.slf4j.Logger; @@ -75,6 +77,23 @@ public KeyValueStorage create( return publicFactory.create(segment, commonConfiguration, metricsSystem); } + @Override + public SegmentedKeyValueStorage create( + final List segments, + final BesuConfiguration commonConfiguration, + final MetricsSystem metricsSystem) + throws StorageException { + if (databaseVersion == null) { + try { + databaseVersion = readDatabaseVersion(commonConfiguration); + } catch (final IOException e) { + throw new StorageException("Failed to retrieve the RocksDB database meta version", e); + } + } + + return publicFactory.create(segments, commonConfiguration, metricsSystem); + } + @Override public boolean isSegmentIsolationSupported() { return publicFactory.isSegmentIsolationSupported(); diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValueStorageFactory.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValueStorageFactory.java index 5b7b509791b..90644bd7703 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValueStorageFactory.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValueStorageFactory.java @@ -14,8 +14,6 @@ */ package org.hyperledger.besu.plugin.services.storage.rocksdb; -import static com.google.common.base.Preconditions.checkNotNull; - import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat; import org.hyperledger.besu.plugin.services.BesuConfiguration; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -32,7 +30,6 @@ import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.RocksDBColumnarKeyValueStorage; import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.TransactionDBRocksDBColumnarKeyValueStorage; import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageAdapter; -import org.hyperledger.besu.services.kvstore.SnappableSegmentedKeyValueStorageAdapter; import java.io.IOException; import java.nio.file.Files; @@ -45,8 +42,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** The Rocks db key value storage factory. */ -public class RocksDBKeyValueStorageFactory implements KeyValueStorageFactory { +/** + * The Rocks db key value storage factory creates segmented storage and uses a adapter to support + * unsegmented keyvalue storage. + * + */ +public class RocksDBKeyValueStorageFactory implements KeyValueStorageFactory { private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyValueStorageFactory.class); private static final int DEFAULT_VERSION = 1; @@ -151,6 +152,15 @@ public KeyValueStorage create( final BesuConfiguration commonConfiguration, final MetricsSystem metricsSystem) throws StorageException { + return new SegmentedKeyValueStorageAdapter(segment, create(List.of(segment), commonConfiguration, metricsSystem)); + } + + @Override + public SegmentedKeyValueStorage create( + final List segments, + final BesuConfiguration commonConfiguration, + final MetricsSystem metricsSystem) + throws StorageException { final boolean isForestStorageFormat = DataStorageFormat.FOREST.getDatabaseVersion() == commonConfiguration.getDatabaseVersion(); if (requiresInit()) { @@ -169,38 +179,23 @@ public KeyValueStorage create( .collect(Collectors.toList()); if (isForestStorageFormat) { LOG.debug("FOREST mode detected, using TransactionDB."); - segmentedStorage = - new TransactionDBRocksDBColumnarKeyValueStorage( - rocksDBConfiguration, - segmentsForVersion, - ignorableSegments, - metricsSystem, - rocksDBMetricsFactory); + segmentedStorage = new TransactionDBRocksDBColumnarKeyValueStorage( + rocksDBConfiguration, + segmentsForVersion, + ignorableSegments, + metricsSystem, + rocksDBMetricsFactory); } else { LOG.debug("Using OptimisticTransactionDB."); - segmentedStorage = - new OptimisticRocksDBColumnarKeyValueStorage( - rocksDBConfiguration, - segmentsForVersion, - ignorableSegments, - metricsSystem, - rocksDBMetricsFactory); + segmentedStorage = new OptimisticRocksDBColumnarKeyValueStorage( + rocksDBConfiguration, + segmentsForVersion, + ignorableSegments, + metricsSystem, + rocksDBMetricsFactory); } } - - final RocksDbSegmentIdentifier rocksSegment = - segmentedStorage.getSegmentIdentifierByName(segment); - - if (isForestStorageFormat) { - return new SegmentedKeyValueStorageAdapter<>(segment, segmentedStorage); - } else { - return new SnappableSegmentedKeyValueStorageAdapter<>( - segment, - segmentedStorage, - () -> - ((OptimisticRocksDBColumnarKeyValueStorage) segmentedStorage) - .takeSnapshot(rocksSegment)); - } + return segmentedStorage; } default -> throw new IllegalStateException( String.format( @@ -209,12 +204,6 @@ public KeyValueStorage create( } } - @Override - public SegmentedKeyValueStorage create(final List segments, final BesuConfiguration configuration, final MetricsSystem metricsSystem) throws StorageException { - //TODO: write me - return null; - } - /** * Storage path. * diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBTransaction.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBTransaction.java index b56040ff488..988839adf23 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBTransaction.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBTransaction.java @@ -17,7 +17,12 @@ import org.hyperledger.besu.plugin.services.exception.StorageException; import org.hyperledger.besu.plugin.services.metrics.OperationTimer; import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; +import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier; +import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction; +import java.util.function.Function; + +import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; import org.rocksdb.Transaction; import org.rocksdb.WriteOptions; @@ -25,13 +30,14 @@ import org.slf4j.LoggerFactory; /** The RocksDb transaction. */ -public class RocksDBTransaction implements KeyValueStorageTransaction { +public class RocksDBTransaction implements SegmentedKeyValueStorageTransaction { private static final Logger logger = LoggerFactory.getLogger(RocksDBTransaction.class); private static final String NO_SPACE_LEFT_ON_DEVICE = "No space left on device"; private final RocksDBMetrics metrics; private final Transaction innerTx; private final WriteOptions options; + private final Function columnFamilyMapper; /** * Instantiates a new RocksDb transaction. @@ -41,16 +47,18 @@ public class RocksDBTransaction implements KeyValueStorageTransaction { * @param metrics the metrics */ public RocksDBTransaction( + final Function columnFamilyMapper, final Transaction innerTx, final WriteOptions options, final RocksDBMetrics metrics) { + this.columnFamilyMapper = columnFamilyMapper; this.innerTx = innerTx; this.options = options; this.metrics = metrics; } @Override - public void put(final byte[] key, final byte[] value) { + public void put(final SegmentIdentifier segmentId, final byte[] key, final byte[] value) { try (final OperationTimer.TimingContext ignored = metrics.getWriteLatency().startTimer()) { - innerTx.put(key, value); + innerTx.put(columnFamilyMapper.apply(segmentId), key, value); } catch (final RocksDBException e) { if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) { logger.error(e.getMessage()); @@ -61,9 +69,9 @@ public void put(final byte[] key, final byte[] value) { } @Override - public void remove(final byte[] key) { + public void remove(final SegmentIdentifier segmentId, final byte[] key) { try (final OperationTimer.TimingContext ignored = metrics.getRemoveLatency().startTimer()) { - innerTx.delete(key); + innerTx.delete(columnFamilyMapper.apply(segmentId), key); } catch (final RocksDBException e) { if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) { logger.error(e.getMessage()); diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/OptimisticRocksDBColumnarKeyValueStorage.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/OptimisticRocksDBColumnarKeyValueStorage.java index 2b96e771cab..ae3b771a581 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/OptimisticRocksDBColumnarKeyValueStorage.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/OptimisticRocksDBColumnarKeyValueStorage.java @@ -16,13 +16,13 @@ import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.exception.StorageException; -import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier; +import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction; +import org.hyperledger.besu.plugin.services.storage.SnappableKeyValueStorage; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBTransaction; -import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier; import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration; -import org.hyperledger.besu.services.kvstore.KeyValueStorageTransactionValidatorDecorator; +import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageTransactionValidatorDecorator; import java.util.List; @@ -32,7 +32,7 @@ import org.rocksdb.WriteOptions; /** Optimistic RocksDB Columnar key value storage */ -public class OptimisticRocksDBColumnarKeyValueStorage extends RocksDBColumnarKeyValueStorage { +public class OptimisticRocksDBColumnarKeyValueStorage extends RocksDBColumnarKeyValueStorage implements SnappableKeyValueStorage { private final OptimisticTransactionDB db; /** @@ -78,24 +78,23 @@ RocksDB getDB() { * @throws StorageException the storage exception */ @Override - public KeyValueStorageTransaction startTransaction() throws StorageException { + public SegmentedKeyValueStorageTransaction startTransaction() throws StorageException { throwIfClosed(); final WriteOptions writeOptions = new WriteOptions(); writeOptions.setIgnoreMissingColumnFamilies(true); - return new KeyValueStorageTransactionValidatorDecorator( - new RocksDBTransaction(db.beginTransaction(writeOptions), writeOptions, this.metrics), this.closed::get); + return new SegmentedKeyValueStorageTransactionValidatorDecorator( + new RocksDBTransaction(this::safeColumnHandle, db.beginTransaction(writeOptions), writeOptions, this.metrics), this.closed::get); } /** * Take snapshot RocksDb columnar key value snapshot. * - * @param segment the segment * @return the RocksDb columnar key value snapshot * @throws StorageException the storage exception */ - public RocksDBColumnarKeyValueSnapshot takeSnapshot(final RocksDbSegmentIdentifier segment) + public RocksDBColumnarKeyValueSnapshot takeSnapshot() throws StorageException { throwIfClosed(); - return new RocksDBColumnarKeyValueSnapshot(db, segment, metrics); + return new RocksDBColumnarKeyValueSnapshot(db, metrics); } } diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueSnapshot.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueSnapshot.java index bfc780218d8..158cbab9f9b 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueSnapshot.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueSnapshot.java @@ -19,6 +19,8 @@ import org.hyperledger.besu.plugin.services.exception.StorageException; import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; +import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier; +import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage; import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction; import org.hyperledger.besu.plugin.services.storage.SnappedKeyValueStorage; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics; @@ -38,7 +40,7 @@ import org.slf4j.LoggerFactory; /** The RocksDb columnar key value snapshot. */ -public class RocksDBColumnarKeyValueSnapshot implements SnappedKeyValueStorage { +public class RocksDBColumnarKeyValueSnapshot implements SegmentedKeyValueStorage, SnappedKeyValueStorage { private static final Logger LOG = LoggerFactory.getLogger(RocksDBColumnarKeyValueSnapshot.class); @@ -60,54 +62,54 @@ public class RocksDBColumnarKeyValueSnapshot implements SnappedKeyValueStorage { final OptimisticTransactionDB db, final RocksDBMetrics metrics) { this.db = db; - this.snapTx = new RocksDBSnapshotTransaction(db, segment.get(), metrics); + this.snapTx = new RocksDBSnapshotTransaction(db, /*TODO get segment MAPPER*/ null, metrics); } @Override - public Optional get(final byte[] key) throws StorageException { + public Optional get(final SegmentIdentifier segment, final byte[] key) throws StorageException { throwIfClosed(); - return snapTx.get(key); + return snapTx.get(segment, key); } @Override - public Stream> stream() { + public Stream> stream(final SegmentIdentifier segment) { throwIfClosed(); - return snapTx.stream(); + return snapTx.stream(segment); } @Override - public Stream> streamFromKey(final byte[] startKey) { - return stream().filter(e -> Bytes.wrap(startKey).compareTo(Bytes.wrap(e.getKey())) <= 0); + public Stream> streamFromKey(final SegmentIdentifier segment, final byte[] startKey) { + return stream(segment).filter(e -> Bytes.wrap(startKey).compareTo(Bytes.wrap(e.getKey())) <= 0); } @Override - public Stream streamKeys() { + public Stream streamKeys(final SegmentIdentifier segment) { throwIfClosed(); - return snapTx.streamKeys(); + return snapTx.streamKeys(segment); } @Override - public boolean tryDelete(final byte[] key) throws StorageException { + public boolean tryDelete(final SegmentIdentifier segment, final byte[] key) throws StorageException { throwIfClosed(); - snapTx.remove(key); + snapTx.remove(segment, key); return true; } @Override - public Set getAllKeysThat(final Predicate returnCondition) { - return streamKeys().filter(returnCondition).collect(toUnmodifiableSet()); + public Set getAllKeysThat(final SegmentIdentifier segment, final Predicate returnCondition) { + return streamKeys(segment).filter(returnCondition).collect(toUnmodifiableSet()); } @Override - public Set getAllValuesFromKeysThat(final Predicate returnCondition) { - return stream() + public Set getAllValuesFromKeysThat(final SegmentIdentifier segment, final Predicate returnCondition) { + return stream(segment) .filter(pair -> returnCondition.test(pair.getKey())) .map(Pair::getValue) .collect(toUnmodifiableSet()); } @Override - public KeyValueStorageTransaction startTransaction() throws StorageException { + public SegmentedKeyValueStorageTransaction startTransaction() throws StorageException { // The use of a transaction on a transaction based key value store is dubious // at best. return our snapshot transaction instead. return snapTx; @@ -119,15 +121,15 @@ public boolean isClosed() { } @Override - public void clear() { + public void clear(final SegmentIdentifier segment) { throw new UnsupportedOperationException( "RocksDBColumnarKeyValueSnapshot does not support clear"); } @Override - public boolean containsKey(final byte[] key) throws StorageException { + public boolean containsKey(final SegmentIdentifier segment, final byte[] key) throws StorageException { throwIfClosed(); - return snapTx.get(key).isPresent(); + return snapTx.get(segment, key).isPresent(); } @Override @@ -145,7 +147,7 @@ private void throwIfClosed() { } @Override - public KeyValueStorageTransaction getSnapshotTransaction() { + public SegmentedKeyValueStorageTransaction getSnapshotTransaction() { return snapTx; } } diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java index 6012f5920e7..9e45606bba9 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorage.java @@ -38,6 +38,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -66,7 +67,7 @@ /** The RocksDb columnar key value storage. */ public abstract class RocksDBColumnarKeyValueStorage - implements SegmentedKeyValueStorage, SnappableKeyValueStorage { + implements SegmentedKeyValueStorage { private static final Logger LOG = LoggerFactory.getLogger(RocksDBColumnarKeyValueStorage.class); static final String DEFAULT_COLUMN = "default"; @@ -242,7 +243,7 @@ BlockBasedTableConfig createBlockBasedTableConfig(final RocksDBConfiguration con .setBlockSize(ROCKSDB_BLOCK_SIZE); } - ColumnFamilyHandle safeColumnHandle(final SegmentIdentifier segment) { + protected ColumnFamilyHandle safeColumnHandle(final SegmentIdentifier segment) { RocksDbSegmentIdentifier safeRef = columnHandlesBySegmentIdentifier.get(segment); if (safeRef == null) { throw new RuntimeException("Column handle not found for segment " + segment.getName()); diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBSnapshotTransaction.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBSnapshotTransaction.java index dbe460b3ffb..8c579494551 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBSnapshotTransaction.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBSnapshotTransaction.java @@ -18,11 +18,16 @@ import org.hyperledger.besu.plugin.services.exception.StorageException; import org.hyperledger.besu.plugin.services.metrics.OperationTimer; import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; +import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier; +import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbIterator; +import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier; +import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; @@ -37,12 +42,12 @@ import org.slf4j.LoggerFactory; /** The Rocks db snapshot transaction. */ -public class RocksDBSnapshotTransaction implements KeyValueStorageTransaction, AutoCloseable { +public class RocksDBSnapshotTransaction implements SegmentedKeyValueStorageTransaction, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnapshotTransaction.class); private static final String NO_SPACE_LEFT_ON_DEVICE = "No space left on device"; private final RocksDBMetrics metrics; private final OptimisticTransactionDB db; - private final ColumnFamilyHandle columnFamilyHandle; + private final Function columnFamilyMapper; private final Transaction snapTx; private final RocksDBSnapshot snapshot; private final WriteOptions writeOptions; @@ -53,16 +58,16 @@ public class RocksDBSnapshotTransaction implements KeyValueStorageTransaction, A * Instantiates a new RocksDb snapshot transaction. * * @param db the db - * @param columnFamilyHandle the column family handle + * @param columnFamilyMapper mapper from segment identifier to column family handle * @param metrics the metrics */ RocksDBSnapshotTransaction( final OptimisticTransactionDB db, - final ColumnFamilyHandle columnFamilyHandle, + final Function columnFamilyMapper, final RocksDBMetrics metrics) { this.metrics = metrics; this.db = db; - this.columnFamilyHandle = columnFamilyHandle; + this.columnFamilyMapper = columnFamilyMapper; this.snapshot = new RocksDBSnapshot(db); this.writeOptions = new WriteOptions(); this.snapTx = db.beginTransaction(writeOptions); @@ -72,14 +77,14 @@ public class RocksDBSnapshotTransaction implements KeyValueStorageTransaction, A private RocksDBSnapshotTransaction( final OptimisticTransactionDB db, - final ColumnFamilyHandle columnFamilyHandle, + final Function columnFamilyMapper, final RocksDBMetrics metrics, final RocksDBSnapshot snapshot, final Transaction snapTx, final ReadOptions readOptions) { this.metrics = metrics; this.db = db; - this.columnFamilyHandle = columnFamilyHandle; + this.columnFamilyMapper = columnFamilyMapper; this.snapshot = snapshot; this.writeOptions = new WriteOptions(); this.readOptions = readOptions; @@ -92,22 +97,22 @@ private RocksDBSnapshotTransaction( * @param key the key * @return the optional data */ - public Optional get(final byte[] key) { + public Optional get(final SegmentIdentifier segmentId, final byte[] key) { throwIfClosed(); try (final OperationTimer.TimingContext ignored = metrics.getReadLatency().startTimer()) { - return Optional.ofNullable(snapTx.get(columnFamilyHandle, readOptions, key)); + return Optional.ofNullable(snapTx.get(columnFamilyMapper.apply(segmentId), readOptions, key)); } catch (final RocksDBException e) { throw new StorageException(e); } } @Override - public void put(final byte[] key, final byte[] value) { + public void put(final SegmentIdentifier segmentId, final byte[] key, final byte[] value) { throwIfClosed(); try (final OperationTimer.TimingContext ignored = metrics.getWriteLatency().startTimer()) { - snapTx.put(columnFamilyHandle, key, value); + snapTx.put(columnFamilyMapper.apply(segmentId), key, value); } catch (final RocksDBException e) { if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) { LOG.error(e.getMessage()); @@ -118,11 +123,11 @@ public void put(final byte[] key, final byte[] value) { } @Override - public void remove(final byte[] key) { + public void remove(final SegmentIdentifier segmentId, final byte[] key) { throwIfClosed(); try (final OperationTimer.TimingContext ignored = metrics.getRemoveLatency().startTimer()) { - snapTx.delete(columnFamilyHandle, key); + snapTx.delete(columnFamilyMapper.apply(segmentId), key); } catch (final RocksDBException e) { if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) { LOG.error(e.getMessage()); @@ -137,10 +142,10 @@ public void remove(final byte[] key) { * * @return the stream */ - public Stream> stream() { + public Stream> stream(final SegmentIdentifier segmentId) { throwIfClosed(); - final RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions); + final RocksIterator rocksIterator = db.newIterator(columnFamilyMapper.apply(segmentId), readOptions); rocksIterator.seekToFirst(); return RocksDbIterator.create(rocksIterator).toStream(); } @@ -150,10 +155,10 @@ public Stream> stream() { * * @return the stream */ - public Stream streamKeys() { + public Stream streamKeys(final SegmentIdentifier segmentId) { throwIfClosed(); - final RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions); + final RocksIterator rocksIterator = db.newIterator(columnFamilyMapper.apply(segmentId), readOptions); rocksIterator.seekToFirst(); return RocksDbIterator.create(rocksIterator).toStreamKeys(); } @@ -193,7 +198,7 @@ public RocksDBSnapshotTransaction copy() { var copySnapTx = db.beginTransaction(writeOptions); copySnapTx.rebuildFromWriteBatch(snapTx.getWriteBatch().getWriteBatch()); return new RocksDBSnapshotTransaction( - db, columnFamilyHandle, metrics, snapshot, copySnapTx, copyReadOptions); + db, columnFamilyMapper, metrics, snapshot, copySnapTx, copyReadOptions); } catch (Exception ex) { LOG.error("Failed to copy snapshot transaction", ex); snapshot.unMarkSnapshot(); diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/TransactionDBRocksDBColumnarKeyValueStorage.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/TransactionDBRocksDBColumnarKeyValueStorage.java index 0798172cf9e..8a439c57c06 100644 --- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/TransactionDBRocksDBColumnarKeyValueStorage.java +++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/TransactionDBRocksDBColumnarKeyValueStorage.java @@ -18,10 +18,12 @@ import org.hyperledger.besu.plugin.services.exception.StorageException; import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier; +import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory; import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBTransaction; import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration; import org.hyperledger.besu.services.kvstore.KeyValueStorageTransactionValidatorDecorator; +import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageTransactionValidatorDecorator; import java.util.List; @@ -82,11 +84,11 @@ RocksDB getDB() { * @throws StorageException the storage exception */ @Override - public KeyValueStorageTransaction startTransaction() throws StorageException { + public SegmentedKeyValueStorageTransaction startTransaction() throws StorageException { throwIfClosed(); final WriteOptions writeOptions = new WriteOptions(); writeOptions.setIgnoreMissingColumnFamilies(true); - return new KeyValueStorageTransactionValidatorDecorator( - new RocksDBTransaction(db.beginTransaction(writeOptions), writeOptions, metrics), this.closed::get); + return new SegmentedKeyValueStorageTransactionValidatorDecorator( + new RocksDBTransaction(this::safeColumnHandle, db.beginTransaction(writeOptions), writeOptions, metrics), this.closed::get); } } diff --git a/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorageTest.java b/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorageTest.java index 3585bbfa5ae..f0fa7f5e814 100644 --- a/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorageTest.java +++ b/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/RocksDBColumnarKeyValueStorageTest.java @@ -36,7 +36,6 @@ import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier; import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage; import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage.Transaction; -import org.hyperledger.besu.services.kvstore.SnappableSegmentedKeyValueStorageAdapter; import java.nio.charset.StandardCharsets; import java.nio.file.Path; diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorageAdapter.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorageAdapter.java index 7050ec05341..d91cd910e87 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorageAdapter.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorageAdapter.java @@ -63,7 +63,7 @@ public InMemoryKeyValueStorageAdapter() { rwLock = ((SegmentedInMemoryKeyValueStorage) storage).rwLock; } - public InMemoryKeyValueStorageAdapter(Map> initialMap) { + public InMemoryKeyValueStorageAdapter(final Map> initialMap) { super(SEGMENT_IDENTIFIER, new SegmentedInMemoryKeyValueStorage(asSegmentMap(initialMap))); rwLock = ((SegmentedInMemoryKeyValueStorage) storage).rwLock; } diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryStoragePlugin.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryStoragePlugin.java index c823352e81c..87c3cd2ea6d 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryStoragePlugin.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryStoragePlugin.java @@ -89,7 +89,7 @@ private void createFactoriesAndRegisterWithStorageService() { } /** The Memory key value storage factory. */ - public static class InMemoryKeyValueStorageFactory implements KeyValueStorageFactory { + public static class InMemoryKeyValueStorageFactory implements KeyValueStorageFactory { private final String name; diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LayeredKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LayeredKeyValueStorage.java index ead9729136e..2120ba28574 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LayeredKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LayeredKeyValueStorage.java @@ -98,11 +98,11 @@ public Stream> stream(final SegmentIdentifier segmentId) { final Lock lock = rwLock.readLock(); lock.lock(); try { - // immutable copy of our in memory store to use for streaming and filtering: - var ourLayerState = ImmutableMap.copyOf(hashValueStore); + // copy of our in memory store to use for streaming and filtering: + var ourLayerState = new HashMap<>(hashValueStore); return Streams.concat( - ourLayerState.computeIfAbsent(segmentId, __ -> new HashMap()).entrySet().stream() + ourLayerState.computeIfAbsent(segmentId, __ -> new HashMap<>()).entrySet().stream() .filter(entry -> entry.getValue().isPresent()) .map( bytesEntry -> @@ -127,8 +127,8 @@ public Stream streamKeys(final SegmentIdentifier segmentId) { final Lock lock = rwLock.readLock(); lock.lock(); try { - // immutable copy of our in memory store to use for streaming and filtering: - var ourLayerState = ImmutableMap.copyOf(hashValueStore); + // copy of our in memory store to use for streaming and filtering: + var ourLayerState = new HashMap<>(hashValueStore); return Streams.concat( ourLayerState.computeIfAbsent(segmentId, __ -> new HashMap<>()).entrySet().stream() diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedInMemoryKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedInMemoryKeyValueStorage.java index 0b35ff348c6..180425f7b34 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedInMemoryKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedInMemoryKeyValueStorage.java @@ -176,7 +176,7 @@ public class SegmentedInMemoryTransaction implements SegmentedKeyValueStorageTra @Override - public void put(final SegmentIdentifier segmentIdentifier, byte[] key, final byte[] value) { + public void put(final SegmentIdentifier segmentIdentifier, final byte[] key, final byte[] value) { updatedValues.computeIfAbsent(segmentIdentifier, __ -> new HashMap<>()) .put(Bytes.wrap(key), Optional.of(value)); removedKeys.computeIfAbsent(segmentIdentifier, __ -> new HashSet<>()) diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SnappableSegmentedKeyValueStorageAdapter.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SnappableSegmentedKeyValueStorageAdapter.java deleted file mode 100644 index 9c641313246..00000000000 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SnappableSegmentedKeyValueStorageAdapter.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.services.kvstore; - -import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier; -import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage; -import org.hyperledger.besu.plugin.services.storage.SnappableKeyValueStorage; -import org.hyperledger.besu.plugin.services.storage.SnappedKeyValueStorage; - -import java.util.function.Supplier; - -/** - * The type Segmented key value storage adapter. - * - * @param the type parameter - */ -public class SnappableSegmentedKeyValueStorageAdapter extends SegmentedKeyValueStorageAdapter - implements SnappableKeyValueStorage { - private final Supplier snapshotSupplier; - - /** - * Instantiates a new Segmented key value storage adapter. - * - * @param segment the segment - * @param storage the storage - */ - public SnappableSegmentedKeyValueStorageAdapter( - final SegmentIdentifier segment, final SegmentedKeyValueStorage storage) { - this( - segment, - storage, - () -> { - throw new UnsupportedOperationException("Snapshot not supported"); - }); - } - - /** - * Instantiates a new Segmented key value storage adapter. - * - * @param segment the segment - * @param storage the storage - * @param snapshotSupplier the snapshot supplier - */ - public SnappableSegmentedKeyValueStorageAdapter( - final SegmentIdentifier segment, - final SegmentedKeyValueStorage storage, - final Supplier snapshotSupplier) { - super(segment, storage); - this.snapshotSupplier = snapshotSupplier; - } - - @Override - public SnappedKeyValueStorage takeSnapshot() { - return snapshotSupplier.get(); - } -}