Make consistentHashing into a generic tools class #17718

Open · wants to merge 4 commits into base: main
DoraCacheFileSystem.java
@@ -108,7 +108,7 @@ public DoraCacheFileSystem createAnInstance(FileSystem fs, FileSystemContext con
* @param context
*/
public DoraCacheFileSystem(FileSystem fs, FileSystemContext context) {
-    this(fs, context, new DoraCacheClient(context, new WorkerLocationPolicy(2000)));
+    this(fs, context, new DoraCacheClient(context, new WorkerLocationPolicy()));
}

protected DoraCacheFileSystem(FileSystem fs, FileSystemContext context,
WorkerLocationPolicy.java
@@ -11,41 +11,28 @@

package alluxio.client.file.dora;

-import static com.google.common.hash.Hashing.murmur3_32_fixed;
-import static java.lang.Math.ceil;
-import static java.lang.String.format;
-import static java.nio.charset.StandardCharsets.UTF_8;

import alluxio.client.block.BlockWorkerInfo;
+import alluxio.node.ConsistentHashingNodeProvider;
+import alluxio.node.NodeProvider;
import alluxio.wire.WorkerNetAddress;

import com.google.common.collect.ImmutableList;
-import com.google.common.hash.HashFunction;

-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
* An impl of WorkerLocationPolicy.
*/
public class WorkerLocationPolicy {
-  private static final ConsistentHashProvider HASH_PROVIDER = new ConsistentHashProvider();
-  private final int mNumVirtualNodes;
-
-  /**
-   * Constructs a new {@link WorkerLocationPolicy}.
-   *
-   * @param numVirtualNodes number of virtual nodes
-   */
-  public WorkerLocationPolicy(int numVirtualNodes) {
-    mNumVirtualNodes = numVirtualNodes;
-  }
+  private static final NodeProvider<BlockWorkerInfo> HASH_PROVIDER =
+      ConsistentHashingNodeProvider.create(
+          new ArrayList<BlockWorkerInfo>(), 2000,
+          workerInfo -> workerInfo.getNetAddress().dumpMainInfo(),
+          pair -> isWorkerInfoUpdated(pair.getFirst(), pair.getSecond()));

/**
*
@@ -60,83 +47,19 @@ public List<BlockWorkerInfo> getPreferredWorkers(List<BlockWorkerInfo> blockWork
if (blockWorkerInfos.size() == 0) {
return ImmutableList.of();
}
-    HASH_PROVIDER.refresh(blockWorkerInfos, mNumVirtualNodes);
-    return HASH_PROVIDER.getMultiple(fileId, count);
+    HASH_PROVIDER.refresh(blockWorkerInfos);
+    return HASH_PROVIDER.get(fileId, count);
}

-  private static class ConsistentHashProvider {
-    private static final HashFunction HASH_FUNCTION = murmur3_32_fixed();
-    private static final int MAX_ATTEMPTS = 100;
-    private List<BlockWorkerInfo> mLastWorkerInfos = ImmutableList.of();
-    private NavigableMap<Integer, BlockWorkerInfo> mActiveNodesByConsistentHashing;
-
-    private volatile long mLastUpdatedTimestamp = 0L;
-
-    private final AtomicBoolean mNeedUpdate = new AtomicBoolean(false);
-
-    private static final long WORKER_INFO_UPDATE_INTERVAL_MS = 1000L;
-
-    public void refresh(List<BlockWorkerInfo> workerInfos, int numVirtualNodes) {
-      // check if we need to update worker info
-      if (mLastUpdatedTimestamp <= 0L
-          || System.currentTimeMillis() - mLastUpdatedTimestamp > WORKER_INFO_UPDATE_INTERVAL_MS) {
-        mNeedUpdate.set(true);
-      }
-      // update worker info if needed
-      if (mNeedUpdate.compareAndSet(true, false)) {
-        if (isWorkerInfoUpdated(workerInfos, mLastWorkerInfos)) {
-          build(workerInfos, numVirtualNodes);
-        }
-        mLastUpdatedTimestamp = System.currentTimeMillis();
-      }
-    }
-
-    private boolean isWorkerInfoUpdated(List<BlockWorkerInfo> workerInfoList,
-        List<BlockWorkerInfo> anotherWorkerInfoList) {
-      if (workerInfoList == anotherWorkerInfoList) {
-        return false;
-      }
-      Set<WorkerNetAddress> workerAddressSet = workerInfoList.stream()
-          .map(info -> info.getNetAddress()).collect(Collectors.toSet());
-      Set<WorkerNetAddress> anotherWorkerAddressSet = anotherWorkerInfoList.stream()
-          .map(info -> info.getNetAddress()).collect(Collectors.toSet());
-      return !workerAddressSet.equals(anotherWorkerAddressSet);
-    }
-
-    public List<BlockWorkerInfo> getMultiple(String key, int count) {
-      Set<BlockWorkerInfo> workers = new HashSet<>();
-      int attempts = 0;
-      while (workers.size() < count && attempts < MAX_ATTEMPTS) {
-        attempts++;
-        workers.add(get(key, attempts));
-      }
-      return ImmutableList.copyOf(workers);
-    }
-
-    public BlockWorkerInfo get(String key, int index) {
-      int hashKey = HASH_FUNCTION.hashString(format("%s%d", key, index), UTF_8).asInt();
-      Map.Entry<Integer, BlockWorkerInfo> entry =
-          mActiveNodesByConsistentHashing.ceilingEntry(hashKey);
-      if (entry != null) {
-        return mActiveNodesByConsistentHashing.ceilingEntry(hashKey).getValue();
-      } else {
-        return mActiveNodesByConsistentHashing.firstEntry().getValue();
-      }
-    }
-
-    private void build(List<BlockWorkerInfo> workerInfos, int numVirtualNodes) {
-      NavigableMap<Integer, BlockWorkerInfo> activeNodesByConsistentHashing = new TreeMap<>();
-      int weight = (int) ceil(1.0 * numVirtualNodes / workerInfos.size());
-      for (BlockWorkerInfo workerInfo : workerInfos) {
-        for (int i = 0; i < weight; i++) {
-          activeNodesByConsistentHashing.put(
-              HASH_FUNCTION.hashString(format("%s%d", workerInfo.getNetAddress().dumpMainInfo(), i),
-                  UTF_8).asInt(),
-              workerInfo);
-        }
-      }
-      mLastWorkerInfos = workerInfos;
-      mActiveNodesByConsistentHashing = activeNodesByConsistentHashing;
-    }
-  }
+  private static boolean isWorkerInfoUpdated(Collection<BlockWorkerInfo> workerInfoList,
+      Collection<BlockWorkerInfo> anotherWorkerInfoList) {
+    if (workerInfoList == anotherWorkerInfoList) {
+      return false;
+    }
+    Set<WorkerNetAddress> workerAddressSet = workerInfoList.stream()
+        .map(info -> info.getNetAddress()).collect(Collectors.toSet());
+    Set<WorkerNetAddress> anotherWorkerAddressSet = anotherWorkerInfoList.stream()
+        .map(info -> info.getNetAddress()).collect(Collectors.toSet());
+    return !workerAddressSet.equals(anotherWorkerAddressSet);
+  }
 }
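To see how the slimmed-down policy is driven after this refactor, here is a minimal sketch; the host names, ports, and capacities are made up for illustration, while the no-arg constructor and the getPreferredWorkers signature come from this diff:

```java
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.file.dora.WorkerLocationPolicy;
import alluxio.wire.WorkerNetAddress;

import java.util.Arrays;
import java.util.List;

public class WorkerLocationPolicyExample {
  public static void main(String[] args) {
    // Two hypothetical workers; hosts, ports, and capacities are illustrative.
    List<BlockWorkerInfo> workers = Arrays.asList(
        new BlockWorkerInfo(new WorkerNetAddress().setHost("host1").setRpcPort(29999),
            1024L * 1024 * 1024, 0L),
        new BlockWorkerInfo(new WorkerNetAddress().setHost("host2").setRpcPort(29999),
            1024L * 1024 * 1024, 0L));
    WorkerLocationPolicy policy = new WorkerLocationPolicy();
    // The same fileId maps to the same preferred worker for as long as
    // cluster membership stays unchanged.
    List<BlockWorkerInfo> preferred = policy.getPreferredWorkers(workers, "fileId-1", 1);
    System.out.println(preferred.get(0).getNetAddress().getHost());
  }
}
```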
ConsistentHashingNodeProvider.java (new file)
@@ -0,0 +1,156 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.node;

import static com.google.common.hash.Hashing.murmur3_32_fixed;
import static java.lang.Math.ceil;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;

import alluxio.Constants;
import alluxio.collections.Pair;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.HashFunction;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/**
* The consistent hashing node provider implementation.
* @param <T> the type of node
*/
public class ConsistentHashingNodeProvider<T> implements NodeProvider<T> {
private static final HashFunction HASH_FUNCTION = murmur3_32_fixed();
private static final long UPDATE_INTERVAL_MS = Constants.MINUTE_MS;
private final Function<T, String> mIdentifierFunction;
  private final Function<Pair<Collection<T>, Collection<T>>, Boolean> mNodeUpdateFunction;
private Collection<T> mLastNodes = ImmutableList.of();
private NavigableMap<Integer, T> mActiveNodesByConsistentHashing;
private final int mVirtualNodes;
private volatile long mLastUpdatedTimestamp = 0L;
private final AtomicBoolean mNeedUpdate = new AtomicBoolean(false);

/**
*
* @param nodes the nodes to select
* @param numVirtualNodes number of virtual nodes
* @param identifierFunction the function to provide identifier
 * @param nodeUpdateFunction the function to check whether the node list should be updated
* @return the instance of the {@link ConsistentHashingNodeProvider}
* @param <T> the type of node
*/
  public static <T> ConsistentHashingNodeProvider<T> create(List<T> nodes, int numVirtualNodes,
      Function<T, String> identifierFunction,
      Function<Pair<Collection<T>, Collection<T>>, Boolean> nodeUpdateFunction) {
    ConsistentHashingNodeProvider<T> consistentHashingNodeProvider =
        new ConsistentHashingNodeProvider<>(numVirtualNodes, identifierFunction,
            nodeUpdateFunction);
if (nodes != null && !nodes.isEmpty()) {
consistentHashingNodeProvider.refresh(nodes);
}
return consistentHashingNodeProvider;
}

  private ConsistentHashingNodeProvider(int virtualNodes, Function<T, String> identifierFunction,
      Function<Pair<Collection<T>, Collection<T>>, Boolean> nodeUpdateFunction) {
mVirtualNodes = virtualNodes;
mIdentifierFunction = identifierFunction;
mNodeUpdateFunction = nodeUpdateFunction;
}

@Override
  public List<T> get(Object identifier, int count) {
    if (mActiveNodesByConsistentHashing == null || mActiveNodesByConsistentHashing.isEmpty()) {
      // the ring has not been built yet; nothing to return
      return ImmutableList.of();
    }
    if (count > mVirtualNodes) {
      count = mVirtualNodes;
    }
    ImmutableList.Builder<T> nodes = ImmutableList.builder();
    Set<T> unique = new HashSet<>();
    int hashKey = HASH_FUNCTION.hashString(format("%s", identifier), UTF_8).asInt();
    Map.Entry<Integer, T> entry = mActiveNodesByConsistentHashing.ceilingEntry(hashKey);
    T candidate;
    SortedMap<Integer, T> nextEntries;
    if (entry != null) {
      candidate = entry.getValue();
      nextEntries = mActiveNodesByConsistentHashing.tailMap(entry.getKey(), false);
    } else {
      candidate = mActiveNodesByConsistentHashing.firstEntry().getValue();
      nextEntries = mActiveNodesByConsistentHashing.tailMap(
          mActiveNodesByConsistentHashing.firstKey(), false);
    }
    unique.add(candidate);
    nodes.add(candidate);
    boolean scannedFullRing = false;
    while (unique.size() < count) {
      for (Map.Entry<Integer, T> next : nextEntries.entrySet()) {
        candidate = next.getValue();
        if (!unique.contains(candidate)) {
          unique.add(candidate);
          nodes.add(candidate);
          if (unique.size() == count) {
            break;
          }
        }
      }
      if (scannedFullRing) {
        // The whole ring has been scanned once, so fewer than count distinct
        // nodes exist; stop instead of looping forever.
        break;
      }
      nextEntries = mActiveNodesByConsistentHashing;
      scannedFullRing = true;
    }
return nodes.build();
}

@Override
public void refresh(List<T> nodes) {
    // check whether enough time has passed to consider refreshing the node list
    if (mLastUpdatedTimestamp <= 0L
        || System.currentTimeMillis() - mLastUpdatedTimestamp > UPDATE_INTERVAL_MS) {
      mNeedUpdate.set(true);
    }
    // rebuild the hash ring only if the node membership actually changed
    if (mNeedUpdate.compareAndSet(true, false)) {
      if (mNodeUpdateFunction.apply(new Pair<>(nodes, mLastNodes))) {
build(nodes);
}
mLastUpdatedTimestamp = System.currentTimeMillis();
}
}

private void build(Collection<T> nodes) {
NavigableMap<Integer, T> activeNodesByConsistentHashing = new TreeMap<>();
int weight = (int) ceil(1.0 * mVirtualNodes / nodes.size());
for (T node : nodes) {
for (int i = 0; i < weight; i++) {
activeNodesByConsistentHashing.put(
HASH_FUNCTION.hashString(format("%s%d", mIdentifierFunction.apply(node), i),
UTF_8).asInt(),
node);
}
}
mLastNodes = nodes;
mActiveNodesByConsistentHashing = activeNodesByConsistentHashing;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("VirtualNodes", mVirtualNodes)
.add("mActiveNodesSize",
mActiveNodesByConsistentHashing == null ? -1 : mActiveNodesByConsistentHashing.size())
.toString();
}
}
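Because the new provider is independent of worker types, it can be exercised in isolation. A minimal sketch, assuming the generic create signature above; the String nodes and the equals-based update check are illustrative, not part of the PR:

```java
import alluxio.node.ConsistentHashingNodeProvider;

import java.util.Arrays;
import java.util.List;

public class ConsistentHashingExample {
  public static void main(String[] args) {
    List<String> nodes = Arrays.asList("worker-1", "worker-2", "worker-3");
    // 2000 virtual nodes mirrors the value WorkerLocationPolicy hard-codes.
    ConsistentHashingNodeProvider<String> provider =
        ConsistentHashingNodeProvider.create(nodes, 2000,
            node -> node, // the hash-ring identifier is the node itself
            pair -> !pair.getFirst().equals(pair.getSecond()));
    // Picks up to two distinct nodes for the key by walking the ring
    // clockwise from the key's hash position.
    List<String> picked = provider.get("file-0001", 2);
    System.out.println(picked);
  }
}
```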
ModularHashingNodeProvider.java (new file)
@@ -0,0 +1,53 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.node;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* The modular hashing node provider implementation.
* @param <T> the type of node
*/
public class ModularHashingNodeProvider<T> implements NodeProvider<T> {
private List<T> mLastNodes;

/**
* Constructs a new {@link ModularHashingNodeProvider}.
*
* @param nodes the nodes to select
*/
public ModularHashingNodeProvider(List<T> nodes) {
Contributor:

What is the use case for this hash+mod algorithm?

Contributor (author):

Modular hashing is simple and fast and does not care about node ids; consistent hashing is more complex and has to maintain the node list dynamically.

We use these two algorithms to supply fake file block locations, so that the application can dispatch each split to the node we want.

Contributor:

Modular hashing does not support dynamic worker membership, so I imagine the choice is:

  1. If you don't expect dynamic worker membership, modular hashing or the jump consistent hash in Support Various Kinds of Consistent Hash #17817 may be your choice.
  2. If worker membership may change and you want consistent cache hits, consistent hashing will be your choice.

I don't think I fully understood how you fake block locations with the modular operation, but I'm sure you can implement this; let us know how it works out in your environment.

In #17817 we have extracted a WorkerLocationPolicy interface. Could you move your implementation under WorkerLocationPolicy? Thanks!

mLastNodes = nodes;
}

@Override
public List<T> get(Object identifier, int count) {
if (mLastNodes == null || mLastNodes.isEmpty()) {
return Collections.emptyList();
}
int size = mLastNodes.size();
int mod = identifier.hashCode() % size;
int position = mod < 0 ? mod + size : mod;
List<T> chosenCandidates = new ArrayList<>();
for (int i = 0; i < count && i < mLastNodes.size(); i++) {
chosenCandidates.add(mLastNodes.get((position + i) % size));
}
return chosenCandidates;
}

@Override
public void refresh(List<T> nodes) {
mLastNodes = nodes;
}
}
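To make the trade-off discussed in the review thread concrete, here is a hedged comparison sketch; the node names and the 1000-key sample are made up, and separate provider instances per membership snapshot are used to sidestep the refresh throttle in ConsistentHashingNodeProvider:

```java
import alluxio.node.ConsistentHashingNodeProvider;
import alluxio.node.ModularHashingNodeProvider;

import java.util.Arrays;
import java.util.List;

public class HashStabilityDemo {
  public static void main(String[] args) {
    List<String> before = Arrays.asList("w1", "w2", "w3", "w4");
    List<String> after = Arrays.asList("w1", "w2", "w3"); // w4 leaves the cluster

    ModularHashingNodeProvider<String> modBefore = new ModularHashingNodeProvider<>(before);
    ModularHashingNodeProvider<String> modAfter = new ModularHashingNodeProvider<>(after);
    ConsistentHashingNodeProvider<String> ringBefore = ring(before);
    ConsistentHashingNodeProvider<String> ringAfter = ring(after);

    int movedModular = 0;
    int movedConsistent = 0;
    for (int i = 0; i < 1000; i++) {
      String key = "file-" + i;
      if (!modBefore.get(key, 1).equals(modAfter.get(key, 1))) {
        movedModular++;
      }
      if (!ringBefore.get(key, 1).equals(ringAfter.get(key, 1))) {
        movedConsistent++;
      }
    }
    // Expect roughly 3/4 of keys to move under modular hashing (the divisor
    // changed), versus roughly 1/4 under consistent hashing (only keys owned
    // by the departed node move).
    System.out.printf("modular moved %d/1000, consistent moved %d/1000%n",
        movedModular, movedConsistent);
  }

  private static ConsistentHashingNodeProvider<String> ring(List<String> nodes) {
    return ConsistentHashingNodeProvider.create(nodes, 2000,
        node -> node, pair -> !pair.getFirst().equals(pair.getSecond()));
  }
}
```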