Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache Block Location to save memory #16953

Merged
merged 3 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.util.proto;

import alluxio.collections.IndexDefinition;
import alluxio.collections.IndexedSet;
import alluxio.proto.meta.Block.BlockLocation;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;

import java.util.Set;

/**
* An util class to create cached grpc block locations.
*/
public class BlockLocationUtils {
private static final IndexDefinition<BlockLocation, BlockLocation> OBJECT_INDEX =
IndexDefinition.ofUnique((b) -> b);

private static final IndexDefinition<BlockLocation, Long> WORKER_ID_INDEX =
IndexDefinition.ofNonUnique(BlockLocation::getWorkerId);

// TODO(maobaolong): Add a metric to monitor the size of mLocationCacheMap
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mLocationCacheMap renamed to BLOCK_LOCATION_CACHE , please update the comment together

Copy link
Contributor Author

@elega elega Mar 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

private static final IndexedSet<BlockLocation> BLOCK_LOCATION_CACHE =
new IndexedSet<>(OBJECT_INDEX, WORKER_ID_INDEX);

private static final Set<String> VALID_MEDIUM_TYPE_VALUES =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Athough MEM, SSD, HDD is the default value of MASTER_TIERED_STORE_GLOBAL_MEDIUMTYPE, but how to handle it when MASTER_TIERED_STORE_GLOBAL_MEDIUMTYPE configured to other value?

How about get the set from MASTER_TIERED_STORE_GLOBAL_MEDIUMTYPE?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'll add a comment for this one

Sets.newHashSet("MEM", "HDD", "SSD");

/**
* Get a shared grpc block location object. If it does not exist, create and cache it.
* Because the valid values of tierAlias and mediumType are only MEM, SSD and HDD,
* The size of the cache map is limited.
*
* @param workerId the worker id
* @param tierAlias the tier alias
* @param mediumType the medium type
* @return a shared block location object from the cache
*/
public static BlockLocation getCached(
long workerId, String tierAlias, String mediumType) {
BlockLocation location = BlockLocation
.newBuilder()
.setWorkerId(workerId)
.setTier(tierAlias)
.setMediumType(mediumType)
.build();
return getCached(location);
}

/**
* Get a shared grpc block location object. If it does not exist, create and cache it.
* Because the valid values of tierAlias and mediumType are only MEM, SSD and HDD,
* The size of the cache map is limited.
*
* @param blockLocation the block location to cache
* @return a shared block location object from the cache
*/
public static BlockLocation getCached(BlockLocation blockLocation) {
Preconditions.checkState(VALID_MEDIUM_TYPE_VALUES.contains(blockLocation.getTier()),
"TierAlias must be one of {MEM, HDD and SSD} but got %s",
blockLocation.getTier());
Preconditions.checkState(VALID_MEDIUM_TYPE_VALUES.contains(blockLocation.getMediumType()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The supported mediumTypes are read from configuration too.

  PropertyKey tierDirMediumConf =
        PropertyKey.Template.WORKER_TIERED_STORE_LEVEL_DIRS_MEDIUMTYPE.format(mTierOrdinal);
    List<String> dirMedium = Configuration.getList(tierDirMediumConf);

"MediumType must be one of {MEM, HDD and SSD} but got %s",
blockLocation.getMediumType());
BLOCK_LOCATION_CACHE.add(blockLocation);
return BLOCK_LOCATION_CACHE.getFirstByField(OBJECT_INDEX, blockLocation);
}

/**
* Evict cache entries by worker id.
* @param workerId the worker id
*/
public static void evictByWorkerId(long workerId) {
BLOCK_LOCATION_CACHE.removeByField(WORKER_ID_INDEX, workerId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.util.proto;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;

import alluxio.proto.meta.Block.BlockLocation;

import org.junit.Test;

public class BlockLocationUtilsTest {
@Test
public void testBlockLocationCached() {
BlockLocation location1 = BlockLocationUtils.getCached(1, "HDD", "SSD");
assertEquals("HDD", location1.getTier());
assertEquals("SSD", location1.getMediumType());
assertEquals(1, location1.getWorkerId());

BlockLocation location2 = BlockLocationUtils.getCached(1, "HDD", "SSD");
assertSame(location1, location2);
assertEquals(location1, location2);

BlockLocation location3 = BlockLocationUtils.getCached(location2);
assertSame(location1, location3);
assertEquals(location1, location3);

BlockLocationUtils.evictByWorkerId(1);

BlockLocation location4 = BlockLocationUtils.getCached(1, "HDD", "SSD");
assertNotSame(location1, location4);
assertEquals(location1, location4);
}

@Test(expected = IllegalStateException.class)
public void testInvalidValue() {
BlockLocationUtils.getCached(1, "INVALID", "SSD");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import alluxio.util.executor.ExecutorServiceFactories;
import alluxio.util.executor.ExecutorServiceFactory;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.util.proto.BlockLocationUtils;
import alluxio.wire.Address;
import alluxio.wire.BlockInfo;
import alluxio.wire.RegisterLease;
Expand Down Expand Up @@ -409,11 +410,10 @@ public boolean processJournalEntry(JournalEntry entry) {
return true;
}
// The master is running and the journal is from an existing worker
mBlockMetaStore.addLocation(blockInfoEntry.getBlockId(), BlockLocation.newBuilder()
.setWorkerId(workerId)
.setTier(blockLocation.getTierAlias())
.setMediumType(blockLocation.getMediumType())
.build());
mBlockMetaStore.addLocation(blockInfoEntry.getBlockId(), BlockLocationUtils.getCached(
workerId, blockLocation.getTierAlias(), blockLocation.getMediumType())
);

worker.addBlock(blockInfoEntry.getBlockId());
LOG.debug("Added BlockLocation for {} to worker {}", blockInfoEntry.getBlockId(), workerId);
}
Expand Down Expand Up @@ -984,11 +984,8 @@ public void commitBlock(long workerId, long usedBytesOnTier, String tierAlias,
}
}
// Update the block metadata with the new worker location.
mBlockMetaStore.addLocation(blockId, BlockLocation.newBuilder()
.setWorkerId(workerId)
.setTier(tierAlias)
.setMediumType(mediumType)
.build());
mBlockMetaStore.addLocation(blockId, BlockLocationUtils.getCached(
workerId, tierAlias, mediumType));
// This worker has this block, so it is no longer lost.
mLostBlocks.remove(blockId);

Expand Down Expand Up @@ -1544,7 +1541,7 @@ private void processWorkerAddedBlocks(MasterWorkerInfo workerInfo,
Preconditions.checkState(location.getWorkerId() == workerInfo.getId(),
"BlockLocation has a different workerId %s from the request sender's workerId %s",
location.getWorkerId(), workerInfo.getId());
mBlockMetaStore.addLocation(blockId, location);
mBlockMetaStore.addLocation(blockId, BlockLocationUtils.getCached(location));
mLostBlocks.remove(blockId);
} else {
invalidBlockCount++;
Expand Down Expand Up @@ -1752,6 +1749,7 @@ private void processLostWorker(MasterWorkerInfo worker) {
// mark these blocks to-remove from the worker.
// So if the worker comes back again the blocks are kept.
processWorkerRemovedBlocks(worker, worker.getBlocks(), false);
BlockLocationUtils.evictByWorkerId(worker.getId());
}

private void deleteWorkerMetadata(MasterWorkerInfo worker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ public static Map<BlockStoreLocation, List<Long>> generateBlockIdOnTiers(
for (int i = 0; i < dirConfigs.size(); i++) {
int dirNumBlocks = dirConfigs.get(i);
LOG.info("Found dir on tier {} with {} blocks", tierConfig.getKey(), dirNumBlocks);
BlockStoreLocation loc = new BlockStoreLocation(tierConfig.getKey().toString(), i);
BlockStoreLocation loc = new BlockStoreLocation(
tierConfig.getKey().toString(), i, tierConfig.getKey().toString());
List<Long> blockIds = generateDecreasingNumbers(blockIdStart, dirNumBlocks);
blockMap.put(loc, blockIds);
blockIdStart -= dirNumBlocks;
Expand Down