Skip to content

Commit

Permalink
use concurrentHashMap explicitly
Browse files Browse the repository at this point in the history
Signed-off-by: Kiran Prakash <awskiran@amazon.com>
  • Loading branch information
kiranprakash154 committed Jun 17, 2024
1 parent 21d3aaa commit 53fb91c
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Removed

### Fixed
- Used ConcurrentHashMap explicitly in IndicesRequestCache ([#14409](https://github.com/opensearch-project/OpenSearch/pull/14409))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -506,15 +507,15 @@ public int hashCode() {
* */
class IndicesRequestCacheCleanupManager implements Closeable {
private final Set<CleanupKey> keysToClean;
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
private final ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap;
private final AtomicInteger staleKeysCount;
private volatile double stalenessThreshold;
private final IndicesRequestCacheCleaner cacheCleaner;

IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) {
this.stalenessThreshold = stalenessThreshold;
this.keysToClean = ConcurrentCollections.newConcurrentSet();
this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap();
this.cleanupKeyToCountMap = new ConcurrentHashMap<>();
this.staleKeysCount = new AtomicInteger(0);
this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval);
threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME);
Expand Down Expand Up @@ -572,8 +573,13 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {

// pkg-private for testing
void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
.merge(readerCacheKeyId, 1, Integer::sum);
cleanupKeyToCountMap.compute(shardId, (currentShardId, readerCacheKeyMap) -> {
if (readerCacheKeyMap == null) {
readerCacheKeyMap = new ConcurrentHashMap<>();
}
readerCacheKeyMap.compute(readerCacheKeyId, (currentReaderCacheKeyId, count) -> (count == null) ? 1 : count + 1);
return readerCacheKeyMap;
});
}

/**
Expand Down Expand Up @@ -831,7 +837,7 @@ public void close() {
}

// for testing
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> getCleanupKeyToCountMap() {
return cleanupKeyToCountMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -491,7 +490,8 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
indexShard.hashCode()
);
// test the mapping
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -554,7 +554,8 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
);

// test the mapping
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -722,7 +723,8 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
assertEquals(1, cache.count());
// test the mappings
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));

cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
Expand Down

0 comments on commit 53fb91c

Please sign in to comment.