diff --git a/commons-kstreams/src/main/java/org/dependencytrack/kstreams/statestore/RocksDbConfigSetter.java b/commons-kstreams/src/main/java/org/dependencytrack/kstreams/statestore/RocksDbConfigSetter.java
index 25c834d50..6865183e1 100644
--- a/commons-kstreams/src/main/java/org/dependencytrack/kstreams/statestore/RocksDbConfigSetter.java
+++ b/commons-kstreams/src/main/java/org/dependencytrack/kstreams/statestore/RocksDbConfigSetter.java
@@ -20,29 +20,74 @@
import io.quarkus.runtime.annotations.RegisterForReflection;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
+import org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache;
import org.dependencytrack.common.config.QuarkusConfigUtil;
+import org.dependencytrack.kstreams.statestore.StateStoreConfig.RocksDbConfig;
+import org.rocksdb.Cache;
+import org.rocksdb.LRUCache;
import org.rocksdb.Options;
+import org.rocksdb.WriteBufferManager;
import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.concurrent.locks.ReentrantLock;
/**
* An implementation of {@link RocksDBConfigSetter} for customizing RocksDB.
*
- * Customizations are configurable via Quarkus Config. Available configuration options are defined in {@link StateStoreConfig.RocksDbConfig}.
+ * Customizations are configurable via Quarkus Config. Available configuration options are defined in {@link RocksDbConfig}.
*
* @see Kafka Streams Documentation
*/
@RegisterForReflection
public class RocksDbConfigSetter implements RocksDBConfigSetter {
+ private static Cache BLOCK_CACHE;
+ private static WriteBufferManager WRITE_BUFFER_MANAGER;
+ private static boolean INITIALIZED = false;
+ private static final ReentrantLock INIT_LOCK = new ReentrantLock();
+
@Override
public void setConfig(final String storeName, final Options options, final Map configs) {
- QuarkusConfigUtil.getConfigMapping(StateStoreConfig.class)
- .map(StateStoreConfig::rocksDb)
- .ifPresent(config -> {
- config.compactionStyle().ifPresent(options::setCompactionStyle);
- config.compressionType().ifPresent(options::setCompressionType);
- });
+ final Optional optionalConfig = QuarkusConfigUtil
+ .getConfigMapping(StateStoreConfig.class)
+ .map(StateStoreConfig::rocksDb);
+ if (optionalConfig.isEmpty()) {
+ return;
+ }
+
+ final RocksDbConfig config = optionalConfig.get();
+
+ INIT_LOCK.lock();
+ try {
+ maybeInit(config);
+ } finally {
+ INIT_LOCK.unlock();
+ }
+
+ final var tableConfig = (BlockBasedTableConfigWithAccessibleCache) options.tableFormatConfig();
+
+ // Kafka Streams configures a default cache of size 50MiB for each state store.
+ // Be a good citizen and ensure that default cache is closed after overwriting it.
+ try (final Cache ignoredDefaultCache = tableConfig.blockCache()) {
+ tableConfig.setBlockCache(BLOCK_CACHE);
+ }
+
+ // Ensure the memory used by RocksDB is limited to the size of the block cache.
+ tableConfig.setCacheIndexAndFilterBlocks(true);
+ tableConfig.setBlockSize(config.blockSizeBytes());
+ options.setWriteBufferManager(WRITE_BUFFER_MANAGER);
+
+ // If a high priority pool ratio is configured, ensure that RocksDB can make use of that.
+ // https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks
+ if (config.highPriorityPoolRatio().isPresent()) {
+ tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
+ tableConfig.setPinTopLevelIndexAndFilter(true);
+ }
+
+ config.compactionStyle().ifPresent(options::setCompactionStyle);
+ config.compressionType().ifPresent(options::setCompressionType);
}
@Override
@@ -50,4 +95,27 @@ public void close(final String storeName, final Options options) {
// Nothing to close here.
}
+ private void maybeInit(final RocksDbConfig config) {
+ if (INITIALIZED) {
+ return;
+ }
+
+ final OptionalDouble highPriorityPoolRatio = config.highPriorityPoolRatio();
+ if (highPriorityPoolRatio.isPresent()) {
+ BLOCK_CACHE = new LRUCache(
+ config.blockCacheSizeBytes(),
+ /* numShardBits */ -1,
+ /* strictCapacityLimit */ false,
+ highPriorityPoolRatio.getAsDouble());
+ } else {
+ BLOCK_CACHE = new LRUCache(config.blockCacheSizeBytes());
+ }
+
+ WRITE_BUFFER_MANAGER = new WriteBufferManager(
+ config.writeBufferSize(),
+ BLOCK_CACHE);
+
+ INITIALIZED = true;
+ }
+
}
diff --git a/commons-kstreams/src/main/java/org/dependencytrack/kstreams/statestore/StateStoreConfig.java b/commons-kstreams/src/main/java/org/dependencytrack/kstreams/statestore/StateStoreConfig.java
index 0696a50f6..9281810f2 100644
--- a/commons-kstreams/src/main/java/org/dependencytrack/kstreams/statestore/StateStoreConfig.java
+++ b/commons-kstreams/src/main/java/org/dependencytrack/kstreams/statestore/StateStoreConfig.java
@@ -25,6 +25,7 @@
import org.rocksdb.CompressionType;
import java.util.Optional;
+import java.util.OptionalDouble;
@ConfigMapping(prefix = "state-store")
public interface StateStoreConfig {
@@ -36,6 +37,25 @@ public interface StateStoreConfig {
interface RocksDbConfig {
+ /**
+ * The Kafka Streams default is 50MiB per state store.
+ * We use the same cache across all stores, so this should be considered.
+ */
+ @WithDefault(/* 128MiB */ "134217728")
+ long blockCacheSizeBytes();
+
+ /**
+ * Ratio of the block cache size that shall be reserved for high priority blocks,
+ * such as indexes and filters, preventing them from being evicted.
+ */
+ OptionalDouble highPriorityPoolRatio();
+
+ @WithDefault(/* 16MiB */ "16777216")
+ long writeBufferSize();
+
+ @WithDefault(/* 4KiB */ "4096")
+ long blockSizeBytes();
+
Optional compactionStyle();
Optional compressionType();
diff --git a/commons-kstreams/src/test/java/org/dependencytrack/kstreams/statestore/RocksDbConfigSetterTest.java b/commons-kstreams/src/test/java/org/dependencytrack/kstreams/statestore/RocksDbConfigSetterTest.java
deleted file mode 100644
index 04cb28d23..000000000
--- a/commons-kstreams/src/test/java/org/dependencytrack/kstreams/statestore/RocksDbConfigSetterTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * This file is part of Dependency-Track.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * Copyright (c) OWASP Foundation. All Rights Reserved.
- */
-package org.dependencytrack.kstreams.statestore;
-
-import io.quarkus.test.junit.QuarkusTest;
-import io.quarkus.test.junit.QuarkusTestProfile;
-import io.quarkus.test.junit.TestProfile;
-import jakarta.inject.Inject;
-import org.junit.jupiter.api.Test;
-import org.junit.platform.suite.api.SelectClasses;
-import org.junit.platform.suite.api.Suite;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.Options;
-
-import java.util.Collections;
-import java.util.Map;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-@Suite
-@SelectClasses(value = {
- RocksDbConfigSetterTest.WithoutQuarkusConfigTest.class,
- RocksDbConfigSetterTest.WithQuarkusConfigTest.class
-})
-class RocksDbConfigSetterTest {
-
- static class WithoutQuarkusConfigTest {
-
- @Test
- void testSetConfig() {
- final var rocksDbOptions = new Options();
-
- final var configSetter = new RocksDbConfigSetter();
- configSetter.setConfig("storeName", rocksDbOptions, Collections.emptyMap());
-
- assertThat(rocksDbOptions)
- .usingRecursiveComparison()
- .ignoringFields("nativeHandle_")
- .isEqualTo(new Options());
- }
-
- }
-
- @QuarkusTest
- @TestProfile(WithQuarkusConfigTest.TestProfile.class)
- static class WithQuarkusConfigTest {
-
- public static class TestProfile implements QuarkusTestProfile {
-
- @Override
- public Map getConfigOverrides() {
- return Map.of(
- "state-store.rocks-db.compaction-style", "universal",
- "state-store.rocks-db.compression-type", "zstd_compression"
- );
- }
- }
-
- @Inject // Force injection, otherwise Quarkus will not discover the config mapping.
- @SuppressWarnings("unused")
- StateStoreConfig stateStoreConfig;
-
- @Test
- void testSetConfig() {
- final var rocksDbOptions = new Options();
-
- final var configSetter = new RocksDbConfigSetter();
- configSetter.setConfig("storeName", rocksDbOptions, Collections.emptyMap());
-
- assertThat(rocksDbOptions.compactionStyle()).isEqualTo(CompactionStyle.UNIVERSAL);
- assertThat(rocksDbOptions.compressionType()).isEqualTo(CompressionType.ZSTD_COMPRESSION);
- }
-
- }
-
-}
\ No newline at end of file
diff --git a/vulnerability-analyzer/src/main/resources/application.properties b/vulnerability-analyzer/src/main/resources/application.properties
index 77b7d3051..88a84cdc6 100644
--- a/vulnerability-analyzer/src/main/resources/application.properties
+++ b/vulnerability-analyzer/src/main/resources/application.properties
@@ -57,6 +57,10 @@ kafka-streams.exception.thresholds.production.interval=PT30M
## Kafka Streams State Stores
#
state-store.type=in_memory
+# state-store.rocks-db.block-size-bytes=
+# state-store.rocks-db.block-cache-size-bytes=
+# state-store.rocks-db.high-priority-pool-ratio=
+# state-store.rocks-db.write-buffer-size=
# state-store.rocks-db.compaction-style=
# state-store.rocks-db.compression-type=