Skip to content

Commit

Permalink
Group commit IdealState updates (#13976)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Sep 14, 2024
1 parent 4b38860 commit 717895b
Show file tree
Hide file tree
Showing 9 changed files with 524 additions and 267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
TABLE_REBALANCE_RETRY_TOO_MANY_TIMES("TableRebalanceRetryTooManyTimes", false),
NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false),
IDEAL_STATE_UPDATE_FAILURE("IdealStateUpdateFailure", false),
IDEAL_STATE_UPDATE_RETRY("IdealStateUpdateRetry", false);
IDEAL_STATE_UPDATE_RETRY("IdealStateUpdateRetry", false),
IDEAL_STATE_UPDATE_SUCCESS("IdealStateUpdateSuccess", false);


private final String _brokerMeterName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,17 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
Expand All @@ -47,12 +42,8 @@
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.pinot.common.helix.ExtraInstanceConfig;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
Expand All @@ -69,156 +60,38 @@ public class HelixHelper {
private HelixHelper() {
}

private static final int NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION = 1000;
private static final String ENABLE_COMPRESSIONS_KEY = "enableCompression";

private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
private static final RetryPolicy DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY =
RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L);

private static final Logger LOGGER = LoggerFactory.getLogger(HelixHelper.class);
private static final ZNRecordSerializer ZN_RECORD_SERIALIZER = new ZNRecordSerializer();
private static final IdealStateGroupCommit IDEAL_STATE_GROUP_COMMIT = new IdealStateGroupCommit();

private static final String ONLINE = "ONLINE";
private static final String OFFLINE = "OFFLINE";

public static final String BROKER_RESOURCE = CommonConstants.Helix.BROKER_RESOURCE_INSTANCE;

private static int _minNumCharsInISToTurnOnCompression = -1;

public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNumChars) {
_minNumCharsInISToTurnOnCompression = minNumChars;
}

public static IdealState cloneIdealState(IdealState idealState) {
return new IdealState(
(ZNRecord) ZN_RECORD_SERIALIZER.deserialize(ZN_RECORD_SERIALIZER.serialize(idealState.getRecord())));
}

/**
* Updates the ideal state, retrying if necessary in case of concurrent updates to the ideal state.
*
* @param helixManager The HelixManager used to interact with the Helix cluster
* @param resourceName The resource for which to update the ideal state
* @param updater A function that returns an updated ideal state given an input ideal state
* @return updated ideal state if successful, null if not
*/
public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy policy, boolean noChangeOk) {
// NOTE: ControllerMetrics could be null because this method might be invoked by Broker.
ControllerMetrics controllerMetrics = ControllerMetrics.get();
try {
long startTimeMs = System.currentTimeMillis();
IdealStateWrapper idealStateWrapper = new IdealStateWrapper();
int retries = policy.attempt(new Callable<>() {
@Override
public Boolean call() {
HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
IdealState idealState = dataAccessor.getProperty(idealStateKey);

// Make a copy of the idealState above to pass it to the updater
// NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
// list fields
IdealState idealStateCopy = cloneIdealState(idealState);

IdealState updatedIdealState;
try {
updatedIdealState = updater.apply(idealStateCopy);
} catch (PermanentUpdaterException e) {
LOGGER.error("Caught permanent exception while updating ideal state for resource: {}", resourceName, e);
throw e;
} catch (Exception e) {
LOGGER.error("Caught exception while updating ideal state for resource: {}", resourceName, e);
return false;
}

// If there are changes to apply, apply them
if (updatedIdealState != null && !idealState.equals(updatedIdealState)) {
ZNRecord updatedZNRecord = updatedIdealState.getRecord();

// Update number of partitions
int numPartitions = updatedZNRecord.getMapFields().size();
updatedIdealState.setNumPartitions(numPartitions);

// If the ideal state is large enough, enable compression
boolean enableCompression = shouldCompress(updatedIdealState);
if (enableCompression) {
updatedZNRecord.setBooleanField(ENABLE_COMPRESSIONS_KEY, true);
} else {
updatedZNRecord.getSimpleFields().remove(ENABLE_COMPRESSIONS_KEY);
}

// Check version and set ideal state
try {
if (dataAccessor.getBaseDataAccessor()
.set(idealStateKey.getPath(), updatedZNRecord, idealState.getRecord().getVersion(),
AccessOption.PERSISTENT)) {
idealStateWrapper._idealState = updatedIdealState;
return true;
} else {
LOGGER.warn("Failed to update ideal state for resource: {}", resourceName);
return false;
}
} catch (ZkBadVersionException e) {
LOGGER.warn("Version changed while updating ideal state for resource: {}", resourceName);
return false;
} catch (Exception e) {
LOGGER.warn("Caught exception while updating ideal state for resource: {} (compressed={})", resourceName,
enableCompression, e);
return false;
}
} else {
if (noChangeOk) {
LOGGER.info("Idempotent or null ideal state update for resource {}, skipping update.", resourceName);
} else {
LOGGER.warn("Idempotent or null ideal state update for resource {}, skipping update.", resourceName);
}
idealStateWrapper._idealState = idealState;
return true;
}
}

private boolean shouldCompress(IdealState is) {
if (is.getNumPartitions() > NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION) {
return true;
}
Function<IdealState, IdealState> updater) {
return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater,
DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false);
}

// Find the number of characters in one partition in idealstate, and extrapolate
// to estimate the number of characters.
// We could serialize the znode to determine the exact size, but that would mean serializing every
// idealstate znode twice. We avoid some extra GC by estimating the size instead. Such estimations
// should be good for most installations that have similar segment and instance names.
Iterator<String> it = is.getPartitionSet().iterator();
if (it.hasNext()) {
String partitionName = it.next();
int numChars = partitionName.length();
Map<String, String> stateMap = is.getInstanceStateMap(partitionName);
for (Map.Entry<String, String> entry : stateMap.entrySet()) {
numChars += entry.getKey().length();
numChars += entry.getValue().length();
}
numChars *= is.getNumPartitions();
return _minNumCharsInISToTurnOnCompression > 0 && numChars > _minNumCharsInISToTurnOnCompression;
}
return false;
}
});
if (controllerMetrics != null) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries);
controllerMetrics.addTimedValue(resourceName, ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS,
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
}
return idealStateWrapper._idealState;
} catch (Exception e) {
if (controllerMetrics != null) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
}
throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e);
}
public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy retryPolicy) {
return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, retryPolicy, false);
}

private static class IdealStateWrapper {
IdealState _idealState;
public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, boolean noChangeOk) {
return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, retryPolicy, noChangeOk);
}

/**
Expand All @@ -235,16 +108,6 @@ public PermanentUpdaterException(Throwable cause) {
}
}

public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater) {
return updateIdealState(helixManager, resourceName, updater, DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false);
}

public static IdealState updateIdealState(final HelixManager helixManager, final String resourceName,
final Function<IdealState, IdealState> updater, RetryPolicy policy) {
return updateIdealState(helixManager, resourceName, updater, policy, false);
}

/**
* Updates broker resource ideal state for the given broker with the given broker tags. Optional {@code tablesAdded}
* and {@code tablesRemoved} can be provided to track the tables added/removed during the update.
Expand Down Expand Up @@ -554,7 +417,6 @@ public static List<String> getInstancesWithoutTag(List<InstanceConfig> instanceC
return instancesWithoutTag.stream().map(InstanceConfig::getInstanceName).collect(Collectors.toList());
}


public static List<InstanceConfig> getInstancesConfigsWithTag(List<InstanceConfig> instanceConfigs, String tag) {
List<InstanceConfig> instancesWithTag = new ArrayList<>();
for (InstanceConfig instanceConfig : instanceConfigs) {
Expand Down
Loading

0 comments on commit 717895b

Please sign in to comment.