Skip to content

Commit

Permalink
getting close
Browse files Browse the repository at this point in the history
Signed-off-by: garyschulte <garyschulte@gmail.com>
  • Loading branch information
garyschulte committed Jul 13, 2023
1 parent f78f690 commit 8b44070
Show file tree
Hide file tree
Showing 18 changed files with 135 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfigurationBuilder;
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.OptimisticRocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.services.kvstore.SnappableSegmentedKeyValueStorageAdapter;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageAdapter;

import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -70,7 +70,7 @@ public static OperationBenchmarkHelper create() throws IOException {
RocksDBMetricsFactory.PUBLIC_ROCKS_DB_METRICS);

final KeyValueStorage keyValueStorage =
new SnappableSegmentedKeyValueStorageAdapter<>(
new SegmentedKeyValueStorageAdapter(
KeyValueSegmentIdentifier.BLOCKCHAIN, optimisticRocksDBColumnarKeyValueStorage);

final ExecutionContextTestFixture executionContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/** Factory for creating key-value storage instances. */
@Unstable
public interface KeyValueStorageFactory<S> extends Closeable {
public interface KeyValueStorageFactory extends Closeable {

/**
* Retrieves the identity of the key-value storage factory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

/** The interface Privacy key value storage factory. */
@Unstable
public interface PrivacyKeyValueStorageFactory<S> extends KeyValueStorageFactory<S> {
public interface PrivacyKeyValueStorageFactory extends KeyValueStorageFactory {
/**
* Retrieves the version of the key-value storage factory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

/** The interface Snappable key value storage.
*
* @param <S> the type which will be returned from takeSnapshot()
*/
public interface SnappableKeyValueStorage extends SegmentedKeyValueStorage {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.PrivacyKeyValueStorageFactory;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.DatabaseMetadata;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;

import org.slf4j.Logger;
Expand Down Expand Up @@ -75,6 +77,23 @@ public KeyValueStorage create(
return publicFactory.create(segment, commonConfiguration, metricsSystem);
}

@Override
public SegmentedKeyValueStorage create(
final List<SegmentIdentifier> segments,
final BesuConfiguration commonConfiguration,
final MetricsSystem metricsSystem)
throws StorageException {
if (databaseVersion == null) {
try {
databaseVersion = readDatabaseVersion(commonConfiguration);
} catch (final IOException e) {
throw new StorageException("Failed to retrieve the RocksDB database meta version", e);
}
}

return publicFactory.create(segments, commonConfiguration, metricsSystem);
}

@Override
public boolean isSegmentIsolationSupported() {
return publicFactory.isSegmentIsolationSupported();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
*/
package org.hyperledger.besu.plugin.services.storage.rocksdb;

import static com.google.common.base.Preconditions.checkNotNull;

import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.plugin.services.BesuConfiguration;
import org.hyperledger.besu.plugin.services.MetricsSystem;
Expand All @@ -32,7 +30,6 @@
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.RocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.TransactionDBRocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageAdapter;
import org.hyperledger.besu.services.kvstore.SnappableSegmentedKeyValueStorageAdapter;

import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -45,8 +42,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** The Rocks db key value storage factory. */
public class RocksDBKeyValueStorageFactory implements KeyValueStorageFactory<RocksDbSegmentIdentifier> {
/**
* The Rocks db key value storage factory creates segmented storage and uses a adapter to support
* unsegmented keyvalue storage.
*
*/
public class RocksDBKeyValueStorageFactory implements KeyValueStorageFactory {

private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyValueStorageFactory.class);
private static final int DEFAULT_VERSION = 1;
Expand Down Expand Up @@ -151,6 +152,15 @@ public KeyValueStorage create(
final BesuConfiguration commonConfiguration,
final MetricsSystem metricsSystem)
throws StorageException {
return new SegmentedKeyValueStorageAdapter(segment, create(List.of(segment), commonConfiguration, metricsSystem));
}

@Override
public SegmentedKeyValueStorage create(
final List<SegmentIdentifier> segments,
final BesuConfiguration commonConfiguration,
final MetricsSystem metricsSystem)
throws StorageException {
final boolean isForestStorageFormat =
DataStorageFormat.FOREST.getDatabaseVersion() == commonConfiguration.getDatabaseVersion();
if (requiresInit()) {
Expand All @@ -169,38 +179,23 @@ public KeyValueStorage create(
.collect(Collectors.toList());
if (isForestStorageFormat) {
LOG.debug("FOREST mode detected, using TransactionDB.");
segmentedStorage =
new TransactionDBRocksDBColumnarKeyValueStorage(
rocksDBConfiguration,
segmentsForVersion,
ignorableSegments,
metricsSystem,
rocksDBMetricsFactory);
segmentedStorage = new TransactionDBRocksDBColumnarKeyValueStorage(
rocksDBConfiguration,
segmentsForVersion,
ignorableSegments,
metricsSystem,
rocksDBMetricsFactory);
} else {
LOG.debug("Using OptimisticTransactionDB.");
segmentedStorage =
new OptimisticRocksDBColumnarKeyValueStorage(
rocksDBConfiguration,
segmentsForVersion,
ignorableSegments,
metricsSystem,
rocksDBMetricsFactory);
segmentedStorage = new OptimisticRocksDBColumnarKeyValueStorage(
rocksDBConfiguration,
segmentsForVersion,
ignorableSegments,
metricsSystem,
rocksDBMetricsFactory);
}
}

final RocksDbSegmentIdentifier rocksSegment =
segmentedStorage.getSegmentIdentifierByName(segment);

if (isForestStorageFormat) {
return new SegmentedKeyValueStorageAdapter<>(segment, segmentedStorage);
} else {
return new SnappableSegmentedKeyValueStorageAdapter<>(
segment,
segmentedStorage,
() ->
((OptimisticRocksDBColumnarKeyValueStorage) segmentedStorage)
.takeSnapshot(rocksSegment));
}
return segmentedStorage;
}
default -> throw new IllegalStateException(
String.format(
Expand All @@ -209,12 +204,6 @@ public KeyValueStorage create(
}
}

@Override
public SegmentedKeyValueStorage<RocksDbSegmentIdentifier> create(final List<SegmentIdentifier> segments, final BesuConfiguration configuration, final MetricsSystem metricsSystem) throws StorageException {
//TODO: write me
return null;
}

/**
* Storage path.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,27 @@
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction;

import java.util.function.Function;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.Transaction;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** The RocksDb transaction. */
public class RocksDBTransaction implements KeyValueStorageTransaction {
public class RocksDBTransaction implements SegmentedKeyValueStorageTransaction {
private static final Logger logger = LoggerFactory.getLogger(RocksDBTransaction.class);
private static final String NO_SPACE_LEFT_ON_DEVICE = "No space left on device";

private final RocksDBMetrics metrics;
private final Transaction innerTx;
private final WriteOptions options;
private final Function<SegmentIdentifier, ColumnFamilyHandle> columnFamilyMapper;

/**
* Instantiates a new RocksDb transaction.
Expand All @@ -41,16 +47,18 @@ public class RocksDBTransaction implements KeyValueStorageTransaction {
* @param metrics the metrics
*/
public RocksDBTransaction(
final Function<SegmentIdentifier, ColumnFamilyHandle> columnFamilyMapper,
final Transaction innerTx, final WriteOptions options, final RocksDBMetrics metrics) {
this.columnFamilyMapper = columnFamilyMapper;
this.innerTx = innerTx;
this.options = options;
this.metrics = metrics;
}

@Override
public void put(final byte[] key, final byte[] value) {
public void put(final SegmentIdentifier segmentId, final byte[] key, final byte[] value) {
try (final OperationTimer.TimingContext ignored = metrics.getWriteLatency().startTimer()) {
innerTx.put(key, value);
innerTx.put(columnFamilyMapper.apply(segmentId), key, value);
} catch (final RocksDBException e) {
if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
logger.error(e.getMessage());
Expand All @@ -61,9 +69,9 @@ public void put(final byte[] key, final byte[] value) {
}

@Override
public void remove(final byte[] key) {
public void remove(final SegmentIdentifier segmentId, final byte[] key) {
try (final OperationTimer.TimingContext ignored = metrics.getRemoveLatency().startTimer()) {
innerTx.delete(key);
innerTx.delete(columnFamilyMapper.apply(segmentId), key);
} catch (final RocksDBException e) {
if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
logger.error(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction;
import org.hyperledger.besu.plugin.services.storage.SnappableKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBTransaction;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration;
import org.hyperledger.besu.services.kvstore.KeyValueStorageTransactionValidatorDecorator;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageTransactionValidatorDecorator;

import java.util.List;

Expand All @@ -32,7 +32,7 @@
import org.rocksdb.WriteOptions;

/** Optimistic RocksDB Columnar key value storage */
public class OptimisticRocksDBColumnarKeyValueStorage extends RocksDBColumnarKeyValueStorage {
public class OptimisticRocksDBColumnarKeyValueStorage extends RocksDBColumnarKeyValueStorage implements SnappableKeyValueStorage {
private final OptimisticTransactionDB db;

/**
Expand Down Expand Up @@ -78,24 +78,23 @@ RocksDB getDB() {
* @throws StorageException the storage exception
*/
@Override
public KeyValueStorageTransaction startTransaction() throws StorageException {
public SegmentedKeyValueStorageTransaction startTransaction() throws StorageException {
throwIfClosed();
final WriteOptions writeOptions = new WriteOptions();
writeOptions.setIgnoreMissingColumnFamilies(true);
return new KeyValueStorageTransactionValidatorDecorator(
new RocksDBTransaction(db.beginTransaction(writeOptions), writeOptions, this.metrics), this.closed::get);
return new SegmentedKeyValueStorageTransactionValidatorDecorator(
new RocksDBTransaction(this::safeColumnHandle, db.beginTransaction(writeOptions), writeOptions, this.metrics), this.closed::get);
}

/**
* Take snapshot RocksDb columnar key value snapshot.
*
* @param segment the segment
* @return the RocksDb columnar key value snapshot
* @throws StorageException the storage exception
*/
public RocksDBColumnarKeyValueSnapshot takeSnapshot(final RocksDbSegmentIdentifier segment)
public RocksDBColumnarKeyValueSnapshot takeSnapshot()
throws StorageException {
throwIfClosed();
return new RocksDBColumnarKeyValueSnapshot(db, segment, metrics);
return new RocksDBColumnarKeyValueSnapshot(db, metrics);
}
}
Loading

0 comments on commit 8b44070

Please sign in to comment.