Skip to content

Commit

Permalink
happy with InMemory implementations
Browse files Browse the repository at this point in the history
todo:
  get rocks happy
  remove InMemoryKeyValueStorage in favor of the adapter

Signed-off-by: garyschulte <garyschulte@gmail.com>
  • Loading branch information
garyschulte committed Jul 12, 2023
1 parent 8241fcb commit f78f690
Show file tree
Hide file tree
Showing 20 changed files with 433 additions and 236 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ KeyValueStorage create(
* @return the storage instance reserved for the given segment.
* @exception StorageException problem encountered when creating storage for the segment.
*/
SegmentedKeyValueStorage<S> create(
SegmentedKeyValueStorage create(
List<SegmentIdentifier> segments, BesuConfiguration configuration, MetricsSystem metricsSystem)
throws StorageException;

Expand All @@ -90,7 +90,5 @@ SegmentedKeyValueStorage<S> create(
* @return <code>true</code> when the created storage supports snapshots <code>false</code> when
* it does not.
*/
default boolean isSnapshotIsolationSupported() {
return false;
}
boolean isSnapshotIsolationSupported();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
package org.hyperledger.besu.plugin.services.storage;

import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;

import java.io.Closeable;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
Expand All @@ -28,20 +26,8 @@

/**
* Service provided by Besu to facilitate persistent data storage.
*
* @param <S> the segment identifier type
*/
public interface SegmentedKeyValueStorage<S> extends Closeable {

/**
* Gets segment identifier by name.
*
* @param segment the segment
* @return the segment identifier by name
*/
S getSegmentIdentifierByName(SegmentIdentifier segment);

SegmentedKeyValueStorage<S> getComposedSegmentStorage(List<SegmentIdentifier> segments);
public interface SegmentedKeyValueStorage extends Closeable {

/**
* Get the value from the associated segment and key.
Expand All @@ -51,7 +37,7 @@ public interface SegmentedKeyValueStorage<S> extends Closeable {
* @return The value persisted at the key index.
* @throws StorageException the storage exception
*/
Optional<byte[]> get(S segment, byte[] key) throws StorageException;
Optional<byte[]> get(SegmentIdentifier segment, byte[] key) throws StorageException;

/**
* Contains key.
Expand All @@ -61,7 +47,7 @@ public interface SegmentedKeyValueStorage<S> extends Closeable {
* @return the boolean
* @throws StorageException the storage exception
*/
default boolean containsKey(final S segment, final byte[] key) throws StorageException {
default boolean containsKey(final SegmentIdentifier segment, final byte[] key) throws StorageException {
return get(segment, key).isPresent();
}

Expand All @@ -71,71 +57,71 @@ default boolean containsKey(final S segment, final byte[] key) throws StorageExc
* @return An object representing the transaction.
* @throws StorageException the storage exception
*/
KeyValueStorageTransaction startTransaction() throws StorageException;
SegmentedKeyValueStorageTransaction startTransaction() throws StorageException;

/**
* Returns a stream of all keys for the segment.
*
* @param segmentHandle The segment handle whose keys we want to stream.
* @param segmentIdentifier The segment identifier whose keys we want to stream.
* @return A stream of all keys in the specified segment.
*/
Stream<Pair<byte[], byte[]>> stream(final S segmentHandle);
Stream<Pair<byte[], byte[]>> stream(final SegmentIdentifier segmentIdentifier);

/**
* Returns a stream of key-value pairs starting from the specified key. This method is used to
* retrieve a stream of data from the storage, starting from the given key. If no data is
* available from the specified key onwards, an empty stream is returned.
*
* @param segmentHandle The segment handle whose keys we want to stream.
* @param segmentIdentifier The segment identifier whose keys we want to stream.
* @param startKey The key from which the stream should start.
* @return A stream of key-value pairs starting from the specified key.
*/
Stream<Pair<byte[], byte[]>> streamFromKey(final S segmentHandle, final byte[] startKey);
Stream<Pair<byte[], byte[]>> streamFromKey(final SegmentIdentifier segmentIdentifier, final byte[] startKey);

/**
* Stream keys.
*
* @param segmentHandle the segment handle
* @param segmentIdentifier the segment identifier
* @return the stream
*/
Stream<byte[]> streamKeys(final S segmentHandle);
Stream<byte[]> streamKeys(final SegmentIdentifier segmentIdentifier);

/**
* Delete the value corresponding to the given key in the given segment if a write lock can be
* instantly acquired on the underlying storage. Do nothing otherwise.
*
* @param segmentHandle The segment handle whose keys we want to stream.
* @param segmentIdentifier The segment identifier whose keys we want to stream.
* @param key The key to delete.
* @return false if the lock on the underlying storage could not be instantly acquired, true
* otherwise
* @throws StorageException any problem encountered during the deletion attempt.
*/
boolean tryDelete(S segmentHandle, byte[] key) throws StorageException;
boolean tryDelete(SegmentIdentifier segmentIdentifier, byte[] key) throws StorageException;

/**
* Gets all keys that matches condition.
*
* @param segmentHandle the segment handle
* @param segmentIdentifier the segment identifier
* @param returnCondition the return condition
* @return set of result
*/
Set<byte[]> getAllKeysThat(S segmentHandle, Predicate<byte[]> returnCondition);
Set<byte[]> getAllKeysThat(SegmentIdentifier segmentIdentifier, Predicate<byte[]> returnCondition);

/**
* Gets all values from keys that matches condition.
*
* @param segmentHandle the segment handle
* @param segmentIdentifier the segment identifier
* @param returnCondition the return condition
* @return the set of result
*/
Set<byte[]> getAllValuesFromKeysThat(final S segmentHandle, Predicate<byte[]> returnCondition);
Set<byte[]> getAllValuesFromKeysThat(final SegmentIdentifier segmentIdentifier, Predicate<byte[]> returnCondition);

/**
* Clear.
*
* @param segmentHandle the segment handle
* @param segmentIdentifier the segment identifier
*/
void clear(S segmentHandle);
void clear(SegmentIdentifier segmentIdentifier);

/**
* Whether the underlying storage is closed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.plugin.services.storage;

import org.hyperledger.besu.plugin.Unstable;
import org.hyperledger.besu.plugin.services.exception.StorageException;

/** A transaction that can atomically commit a sequence of operations to a segmented key-value store. */
@Unstable
public interface SegmentedKeyValueStorageTransaction {

/**
* Associates the specified value with the specified key.
*
* <p>If a previously value had been store against the given key, the old value is replaced by the
* given value.
*
* @param segmentIdentifier the segment identifier
* @param key the given value is to be associated with.
* @param value associated with the specified key.
*/
void put(SegmentIdentifier segmentIdentifier, byte[] key, byte[] value);

/**
* When the given key is present, the key and mapped value will be removed from storage.
*
* @param segmentIdentifier the segment identifier
* @param key the key and mapped value that will be removed.
*/
void remove(SegmentIdentifier segmentIdentifier, byte[] key);

/**
* Performs an atomic commit of all the operations queued in the transaction.
*
* @throws StorageException problem was encountered preventing the commit
*/
void commit() throws StorageException;

/** Reset the transaction to a state prior to any operations being queued. */
void rollback();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
package org.hyperledger.besu.plugin.services.storage;

/** The interface Snappable key value storage. */
public interface SnappableKeyValueStorage extends KeyValueStorage {
/** The interface Snappable key value storage.
*
* @param <S> the type which will be returned from takeSnapshot()
*/
public interface SnappableKeyValueStorage extends SegmentedKeyValueStorage {

/**
* Take snapshot.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
package org.hyperledger.besu.plugin.services.storage;

/** The interface Snapped key value storage. */
public interface SnappedKeyValueStorage extends KeyValueStorage {
public interface SnappedKeyValueStorage extends SegmentedKeyValueStorage {

/**
* Gets snapshot transaction.
*
* @return the snapshot transaction
*/
KeyValueStorageTransaction getSnapshotTransaction();
SegmentedKeyValueStorageTransaction getSnapshotTransaction();

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public boolean isSegmentIsolationSupported() {
return publicFactory.isSegmentIsolationSupported();
}

@Override
public boolean isSnapshotIsolationSupported() {
return publicFactory.isSegmentIsolationSupported();
}

@Override
public void close() throws IOException {
publicFactory.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public class RocksDBKeyValueStorageFactory implements KeyValueStorageFactory<Roc

private final int defaultVersion;
private Integer databaseVersion;
private Boolean isSegmentIsolationSupported;
private RocksDBColumnarKeyValueStorage segmentedStorage;
private RocksDBConfiguration rocksDBConfiguration;

Expand Down Expand Up @@ -236,7 +235,6 @@ private void init(final BesuConfiguration commonConfiguration) {
+ " could not be found. You may not have the appropriate permission to access the item.";
throw new StorageException(message, e);
}
isSegmentIsolationSupported = databaseVersion >= 1;
rocksDBConfiguration =
RocksDBConfigurationBuilder.from(configuration.get())
.databaseDir(storagePath(commonConfiguration))
Expand Down Expand Up @@ -285,9 +283,7 @@ public void close() throws IOException {

@Override
public boolean isSegmentIsolationSupported() {
return checkNotNull(
isSegmentIsolationSupported,
"Whether segment isolation is supported will be determined during creation. Call a creation method first");
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public OptimisticRocksDBColumnarKeyValueStorage(
OptimisticTransactionDB.open(
options, configuration.getDatabaseDir().toString(), columnDescriptors, columnHandles);
initMetrics();
initColumnHandler();
initColumnHandles();

} catch (final RocksDBException e) {
throw new StorageException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction;
import org.hyperledger.besu.plugin.services.storage.SnappedKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier;
Expand Down Expand Up @@ -53,19 +54,17 @@ public class RocksDBColumnarKeyValueSnapshot implements SnappedKeyValueStorage {
* Instantiates a new RocksDb columnar key value snapshot.
*
* @param db the db
* @param segment the segment
* @param metrics the metrics
*/
RocksDBColumnarKeyValueSnapshot(
final OptimisticTransactionDB db,
final RocksDbSegmentIdentifier segment,
final RocksDBMetrics metrics) {
this.db = db;
this.snapTx = new RocksDBSnapshotTransaction(db, segment.get(), metrics);
}

@Override
public Optional<byte[]> get(final byte[] key) throws StorageException {
public Optional<SegmentIdentifier segmentIdentifier, byte[]> get(final byte[] key) throws StorageException {
throwIfClosed();
return snapTx.get(key);
}
Expand Down
Loading

0 comments on commit f78f690

Please sign in to comment.