Skip to content

Commit

Permalink
Merge pull request #1498 from DependencyTrack/rocksdb-memory
Browse files Browse the repository at this point in the history
Limit memory usage of RocksDB and make it more configurable
  • Loading branch information
nscuro authored Sep 11, 2024
2 parents e847e6c + d8946c3 commit f363133
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,102 @@

import io.quarkus.runtime.annotations.RegisterForReflection;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache;
import org.dependencytrack.common.config.QuarkusConfigUtil;
import org.dependencytrack.kstreams.statestore.StateStoreConfig.RocksDbConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.concurrent.locks.ReentrantLock;

/**
* An implementation of {@link RocksDBConfigSetter} for customizing RocksDB.
* <p>
* Customizations are configurable via Quarkus Config. Available configuration options are defined in {@link RocksDbConfig}.
*
* @see <a href="https://kafka.apache.org/34/documentation/streams/developer-guide/config-streams#rocksdb-config-setter">Kafka Streams Documentation</a>
*/
@RegisterForReflection
public class RocksDbConfigSetter implements RocksDBConfigSetter {

    // Shared across ALL state stores: Kafka Streams calls setConfig once per store,
    // but the cache and write buffer manager are global so total RocksDB memory
    // usage stays bounded by a single budget instead of growing per store.
    private static Cache BLOCK_CACHE;
    private static WriteBufferManager WRITE_BUFFER_MANAGER;
    private static boolean INITIALIZED = false;
    private static final ReentrantLock INIT_LOCK = new ReentrantLock();

    /**
     * Applies the configured RocksDB customizations to the given store's {@link Options}.
     * <p>
     * On first invocation, lazily creates the process-wide block cache and write
     * buffer manager from {@link RocksDbConfig}; subsequent invocations reuse them.
     *
     * @param storeName Name of the state store being configured (unused)
     * @param options   The RocksDB {@link Options} to customize
     * @param configs   Kafka Streams configuration properties (unused)
     */
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final Optional<RocksDbConfig> optionalConfig = QuarkusConfigUtil
                .getConfigMapping(StateStoreConfig.class)
                .map(StateStoreConfig::rocksDb);
        if (optionalConfig.isEmpty()) {
            // No config mapping available; leave RocksDB defaults untouched.
            return;
        }

        final RocksDbConfig config = optionalConfig.get();

        // Initialize the shared cache and write buffer manager exactly once.
        INIT_LOCK.lock();
        try {
            maybeInit(config);
        } finally {
            INIT_LOCK.unlock();
        }

        final var tableConfig = (BlockBasedTableConfigWithAccessibleCache) options.tableFormatConfig();

        // Kafka Streams configures a default cache of size 50MiB for each state store.
        // Be a good citizen and ensure that default cache is closed after overwriting it.
        try (final Cache ignoredDefaultCache = tableConfig.blockCache()) {
            tableConfig.setBlockCache(BLOCK_CACHE);
        }

        // Ensure the memory used by RocksDB is limited to the size of the block cache.
        tableConfig.setCacheIndexAndFilterBlocks(true);
        tableConfig.setBlockSize(config.blockSizeBytes());
        options.setWriteBufferManager(WRITE_BUFFER_MANAGER);

        // If a high priority pool ratio is configured, ensure that RocksDB can make use of that.
        // https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks
        if (config.highPriorityPoolRatio().isPresent()) {
            tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
            tableConfig.setPinTopLevelIndexAndFilter(true);
        }

        config.compactionStyle().ifPresent(options::setCompactionStyle);
        config.compressionType().ifPresent(options::setCompressionType);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Nothing to close here: BLOCK_CACHE and WRITE_BUFFER_MANAGER are shared
        // across stores and must outlive any individual store.
    }

    /**
     * Creates the shared block cache and write buffer manager on first call.
     * <p>
     * Callers MUST hold {@link #INIT_LOCK}; the {@code INITIALIZED} flag is not
     * volatile and is only safe under that lock.
     *
     * @param config The RocksDB configuration to size the cache and buffers from
     */
    private void maybeInit(final RocksDbConfig config) {
        if (INITIALIZED) {
            return;
        }

        final OptionalDouble highPriorityPoolRatio = config.highPriorityPoolRatio();
        if (highPriorityPoolRatio.isPresent()) {
            // The high priority pool reserves a share of the cache for index and
            // filter blocks; only the 4-arg LRUCache constructor exposes that ratio.
            BLOCK_CACHE = new LRUCache(
                    config.blockCacheSizeBytes(),
                    /* numShardBits */ -1,
                    /* strictCapacityLimit */ false,
                    highPriorityPoolRatio.getAsDouble());
        } else {
            BLOCK_CACHE = new LRUCache(config.blockCacheSizeBytes());
        }

        // Charge memtable memory against the shared block cache so write buffers
        // and cached blocks draw from one common budget.
        WRITE_BUFFER_MANAGER = new WriteBufferManager(
                config.writeBufferSize(),
                BLOCK_CACHE);

        INITIALIZED = true;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.rocksdb.CompressionType;

import java.util.Optional;
import java.util.OptionalDouble;

@ConfigMapping(prefix = "state-store")
public interface StateStoreConfig {
Expand All @@ -36,6 +37,25 @@ public interface StateStoreConfig {

interface RocksDbConfig {

/**
* The Kafka Streams default is 50MiB <em>per state store</em>.
* We use the same cache across all stores, so this should be considered.
*/
@WithDefault(/* 128MiB */ "134217728")
long blockCacheSizeBytes();

/**
* Ratio of the block cache size that shall be reserved for high priority blocks,
* such as indexes and filters, preventing them from being evicted.
*/
OptionalDouble highPriorityPoolRatio();

/**
 * Write buffer (memtable) budget in bytes, charged against the shared block
 * cache via RocksDB's {@code WriteBufferManager}.
 */
@WithDefault(/* 16MiB */ "16777216")
long writeBufferSize();

/**
 * Size of SST data blocks in bytes, applied via
 * {@code BlockBasedTableConfig#setBlockSize}.
 */
@WithDefault(/* 4KiB */ "4096")
long blockSizeBytes();

/**
 * Compaction style to use; RocksDB's default applies when absent.
 */
Optional<CompactionStyle> compactionStyle();

/**
 * Compression type to use; RocksDB's default applies when absent.
 */
Optional<CompressionType> compressionType();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ kafka-streams.exception.thresholds.production.interval=PT30M
## Kafka Streams State Stores
#
state-store.type=in_memory
# state-store.rocks-db.block-size-bytes=
# state-store.rocks-db.block-cache-size-bytes=
# state-store.rocks-db.high-priority-pool-ratio=
# state-store.rocks-db.write-buffer-size=
# state-store.rocks-db.compaction-style=
# state-store.rocks-db.compression-type=

Expand Down

0 comments on commit f363133

Please sign in to comment.