From 51ec6a344592b79c6f9edfb648d15de3f9e4e952 Mon Sep 17 00:00:00 2001 From: elega <445092967@qq.com> Date: Mon, 27 Feb 2023 21:22:27 +0800 Subject: [PATCH 1/3] Cache Block Location to save memory --- .../util/proto/BlockLocationUtils.java | 87 +++++++++++++++++++ .../util/proto/BlockLocationUtilsTest.java | 49 +++++++++++ .../master/block/DefaultBlockMaster.java | 20 ++--- 3 files changed, 145 insertions(+), 11 deletions(-) create mode 100644 core/common/src/main/java/alluxio/util/proto/BlockLocationUtils.java create mode 100644 core/common/src/test/java/alluxio/util/proto/BlockLocationUtilsTest.java diff --git a/core/common/src/main/java/alluxio/util/proto/BlockLocationUtils.java b/core/common/src/main/java/alluxio/util/proto/BlockLocationUtils.java new file mode 100644 index 000000000000..a705c09ad2c9 --- /dev/null +++ b/core/common/src/main/java/alluxio/util/proto/BlockLocationUtils.java @@ -0,0 +1,87 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.util.proto; + +import alluxio.collections.IndexDefinition; +import alluxio.collections.IndexedSet; +import alluxio.proto.meta.Block.BlockLocation; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; + +import java.util.Set; + +/** + * An util class to create cached grpc block locations. + */ +public class BlockLocationUtils { + private static final IndexDefinition OBJECT_INDEX = + IndexDefinition.ofUnique((b) -> b); + + private static final IndexDefinition WORKER_ID_INDEX = + IndexDefinition.ofNonUnique(BlockLocation::getWorkerId); + + // TODO(maobaolong): Add a metric to monitor the size of mLocationCacheMap + private static final IndexedSet BLOCK_LOCATION_CACHE = + new IndexedSet<>(OBJECT_INDEX, WORKER_ID_INDEX); + + private static final Set VALID_MEDIUM_TYPE_VALUES = + Sets.newHashSet("MEM", "HDD", "SSD"); + + /** + * Get a shared grpc block location object. If it does not exist, create and cache it. + * Because the valid values of tierAlias and mediumType are only MEM, SSD and HDD, + * The size of the cache map is limited. + * + * @param workerId the worker id + * @param tierAlias the tier alias + * @param mediumType the medium type + * @return a shared block location object from the cache + */ + public static BlockLocation getCached( + long workerId, String tierAlias, String mediumType) { + BlockLocation location = BlockLocation + .newBuilder() + .setWorkerId(workerId) + .setTier(tierAlias) + .setMediumType(mediumType) + .build(); + return getCached(location); + } + + /** + * Get a shared grpc block location object. If it does not exist, create and cache it. + * Because the valid values of tierAlias and mediumType are only MEM, SSD and HDD, + * The size of the cache map is limited. + * + * @param blockLocation the block location to cache + * @return a shared block location object from the cache + */ + public static BlockLocation getCached(BlockLocation blockLocation) { + Preconditions.checkState(VALID_MEDIUM_TYPE_VALUES.contains(blockLocation.getTier()), + "TierAlias must be one of the following MEM, HDD and SSD but got %s", + blockLocation.getTier()); + Preconditions.checkState(VALID_MEDIUM_TYPE_VALUES.contains(blockLocation.getMediumType()), + "MediumType must be one of the following MEM, HDD and SSD but got %s", + blockLocation.getMediumType()); + BLOCK_LOCATION_CACHE.add(blockLocation); + return BLOCK_LOCATION_CACHE.getFirstByField(OBJECT_INDEX, blockLocation); + } + + /** + * Evict cache entries by worker id. + * @param workerId the worker id + */ + public static void evictByWorkerId(long workerId) { + BLOCK_LOCATION_CACHE.removeByField(WORKER_ID_INDEX, workerId); + } +} diff --git a/core/common/src/test/java/alluxio/util/proto/BlockLocationUtilsTest.java b/core/common/src/test/java/alluxio/util/proto/BlockLocationUtilsTest.java new file mode 100644 index 000000000000..2655aee3e36d --- /dev/null +++ b/core/common/src/test/java/alluxio/util/proto/BlockLocationUtilsTest.java @@ -0,0 +1,49 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.util.proto; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +import alluxio.proto.meta.Block.BlockLocation; + +import org.junit.Test; + +public class BlockLocationUtilsTest { + @Test + public void testBlockLocationCached() { + BlockLocation location1 = BlockLocationUtils.getCached(1, "HDD", "SSD"); + assertEquals("HDD", location1.getTier()); + assertEquals("SSD", location1.getMediumType()); + assertEquals(1, location1.getWorkerId()); + + BlockLocation location2 = BlockLocationUtils.getCached(1, "HDD", "SSD"); + assertSame(location1, location2); + assertEquals(location1, location2); + + BlockLocation location3 = BlockLocationUtils.getCached(location2); + assertSame(location1, location3); + assertEquals(location1, location3); + + BlockLocationUtils.evictByWorkerId(1); + + BlockLocation location4 = BlockLocationUtils.getCached(1, "HDD", "SSD"); + assertNotSame(location1, location4); + assertEquals(location1, location4); + } + + @Test(expected = IllegalStateException.class) + public void testInvalidValue() { + BlockLocationUtils.getCached(1, "INVALID", "SSD"); + } +} diff --git a/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java b/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java index d6f093f5f5fa..d1891d0add25 100644 --- a/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java +++ b/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java @@ -73,6 +73,7 @@ import alluxio.util.executor.ExecutorServiceFactories; import alluxio.util.executor.ExecutorServiceFactory; import alluxio.util.network.NetworkAddressUtils; +import alluxio.util.proto.BlockLocationUtils; import alluxio.wire.Address; import alluxio.wire.BlockInfo; import alluxio.wire.RegisterLease; @@ -409,11 +410,10 @@ public boolean processJournalEntry(JournalEntry entry) { return true; } // The master is running and the journal is from an existing worker - mBlockMetaStore.addLocation(blockInfoEntry.getBlockId(), BlockLocation.newBuilder() - .setWorkerId(workerId) - .setTier(blockLocation.getTierAlias()) - .setMediumType(blockLocation.getMediumType()) - .build()); + mBlockMetaStore.addLocation(blockInfoEntry.getBlockId(), BlockLocationUtils.getCached( + workerId, blockLocation.getTierAlias(), blockLocation.getMediumType()) + ); + worker.addBlock(blockInfoEntry.getBlockId()); LOG.debug("Added BlockLocation for {} to worker {}", blockInfoEntry.getBlockId(), workerId); } @@ -984,11 +984,8 @@ public void commitBlock(long workerId, long usedBytesOnTier, String tierAlias, } } // Update the block metadata with the new worker location. - mBlockMetaStore.addLocation(blockId, BlockLocation.newBuilder() - .setWorkerId(workerId) - .setTier(tierAlias) - .setMediumType(mediumType) - .build()); + mBlockMetaStore.addLocation(blockId, BlockLocationUtils.getCached( + workerId, tierAlias, mediumType)); // This worker has this block, so it is no longer lost. mLostBlocks.remove(blockId); @@ -1544,7 +1541,7 @@ private void processWorkerAddedBlocks(MasterWorkerInfo workerInfo, Preconditions.checkState(location.getWorkerId() == workerInfo.getId(), "BlockLocation has a different workerId %s from the request sender's workerId %s", location.getWorkerId(), workerInfo.getId()); - mBlockMetaStore.addLocation(blockId, location); + mBlockMetaStore.addLocation(blockId, BlockLocationUtils.getCached(location)); mLostBlocks.remove(blockId); } else { invalidBlockCount++; @@ -1752,6 +1749,7 @@ private void processLostWorker(MasterWorkerInfo worker) { // mark these blocks to-remove from the worker. // So if the worker comes back again the blocks are kept. processWorkerRemovedBlocks(worker, worker.getBlocks(), false); + BlockLocationUtils.evictByWorkerId(worker.getId()); } private void deleteWorkerMetadata(MasterWorkerInfo worker) { From b708f5a956b9fb377d43f9cda59f359a13e759f7 Mon Sep 17 00:00:00 2001 From: elega <445092967@qq.com> Date: Tue, 28 Feb 2023 11:07:07 +0800 Subject: [PATCH 2/3] fix test --- .../main/java/alluxio/stress/cli/RpcBenchPreparationUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/stress/shell/src/main/java/alluxio/stress/cli/RpcBenchPreparationUtils.java b/stress/shell/src/main/java/alluxio/stress/cli/RpcBenchPreparationUtils.java index 8a362fece565..e3f9269593fc 100644 --- a/stress/shell/src/main/java/alluxio/stress/cli/RpcBenchPreparationUtils.java +++ b/stress/shell/src/main/java/alluxio/stress/cli/RpcBenchPreparationUtils.java @@ -181,7 +181,8 @@ public static Map> generateBlockIdOnTiers( for (int i = 0; i < dirConfigs.size(); i++) { int dirNumBlocks = dirConfigs.get(i); LOG.info("Found dir on tier {} with {} blocks", tierConfig.getKey(), dirNumBlocks); - BlockStoreLocation loc = new BlockStoreLocation(tierConfig.getKey().toString(), i); + BlockStoreLocation loc = new BlockStoreLocation( + tierConfig.getKey().toString(), i, tierConfig.getKey().toString()); List blockIds = generateDecreasingNumbers(blockIdStart, dirNumBlocks); blockMap.put(loc, blockIds); blockIdStart -= dirNumBlocks; From a5b608204d5a71c261f184cd1fc332e18f6a7a55 Mon Sep 17 00:00:00 2001 From: elega <445092967@qq.com> Date: Fri, 3 Mar 2023 14:28:20 +0800 Subject: [PATCH 3/3] update --- .../src/main/java/alluxio/util/proto/BlockLocationUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/common/src/main/java/alluxio/util/proto/BlockLocationUtils.java b/core/common/src/main/java/alluxio/util/proto/BlockLocationUtils.java index a705c09ad2c9..d53ec25fda47 100644 --- a/core/common/src/main/java/alluxio/util/proto/BlockLocationUtils.java +++ b/core/common/src/main/java/alluxio/util/proto/BlockLocationUtils.java @@ -68,10 +68,10 @@ public static BlockLocation getCached( */ public static BlockLocation getCached(BlockLocation blockLocation) { Preconditions.checkState(VALID_MEDIUM_TYPE_VALUES.contains(blockLocation.getTier()), - "TierAlias must be one of the following MEM, HDD and SSD but got %s", + "TierAlias must be one of {MEM, HDD and SSD} but got %s", blockLocation.getTier()); Preconditions.checkState(VALID_MEDIUM_TYPE_VALUES.contains(blockLocation.getMediumType()), - "MediumType must be one of the following MEM, HDD and SSD but got %s", + "MediumType must be one of {MEM, HDD and SSD} but got %s", blockLocation.getMediumType()); BLOCK_LOCATION_CACHE.add(blockLocation); return BLOCK_LOCATION_CACHE.getFirstByField(OBJECT_INDEX, blockLocation);