Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group commit IdealState updates #13976

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
TABLE_REBALANCE_RETRY_TOO_MANY_TIMES("TableRebalanceRetryTooManyTimes", false),
NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false),
IDEAL_STATE_UPDATE_FAILURE("IdealStateUpdateFailure", false),
IDEAL_STATE_UPDATE_RETRY("IdealStateUpdateRetry", false);
IDEAL_STATE_UPDATE_RETRY("IdealStateUpdateRetry", false),
IDEAL_STATE_UPDATE_SUCCESS("IdealStateUpdateSuccess", false);


private final String _brokerMeterName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,17 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
Expand All @@ -47,12 +42,8 @@
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.pinot.common.helix.ExtraInstanceConfig;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
Expand All @@ -69,156 +60,38 @@ public class HelixHelper {
private HelixHelper() {
}

private static final int NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION = 1000;
private static final String ENABLE_COMPRESSIONS_KEY = "enableCompression";

private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
private static final RetryPolicy DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY =
RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L);

private static final Logger LOGGER = LoggerFactory.getLogger(HelixHelper.class);
private static final ZNRecordSerializer ZN_RECORD_SERIALIZER = new ZNRecordSerializer();
private static final IdealStateGroupCommit IDEAL_STATE_GROUP_COMMIT = new IdealStateGroupCommit();

private static final String ONLINE = "ONLINE";
private static final String OFFLINE = "OFFLINE";

public static final String BROKER_RESOURCE = CommonConstants.Helix.BROKER_RESOURCE_INSTANCE;

private static int _minNumCharsInISToTurnOnCompression = -1;

public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNumChars) {
_minNumCharsInISToTurnOnCompression = minNumChars;
}

public static IdealState cloneIdealState(IdealState idealState) {
return new IdealState(
(ZNRecord) ZN_RECORD_SERIALIZER.deserialize(ZN_RECORD_SERIALIZER.serialize(idealState.getRecord())));
}

/**
* Updates the ideal state, retrying if necessary in case of concurrent updates to the ideal state.
*
* @param helixManager The HelixManager used to interact with the Helix cluster
* @param resourceName The resource for which to update the ideal state
* @param updater A function that returns an updated ideal state given an input ideal state
* @return updated ideal state if successful, null if not
*/
public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy policy, boolean noChangeOk) {
// NOTE: ControllerMetrics could be null because this method might be invoked by Broker.
ControllerMetrics controllerMetrics = ControllerMetrics.get();
try {
long startTimeMs = System.currentTimeMillis();
IdealStateWrapper idealStateWrapper = new IdealStateWrapper();
int retries = policy.attempt(new Callable<>() {
@Override
public Boolean call() {
HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
IdealState idealState = dataAccessor.getProperty(idealStateKey);

// Make a copy of the idealState above to pass it to the updater
// NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
// list fields
IdealState idealStateCopy = cloneIdealState(idealState);

IdealState updatedIdealState;
try {
updatedIdealState = updater.apply(idealStateCopy);
} catch (PermanentUpdaterException e) {
LOGGER.error("Caught permanent exception while updating ideal state for resource: {}", resourceName, e);
throw e;
} catch (Exception e) {
LOGGER.error("Caught exception while updating ideal state for resource: {}", resourceName, e);
return false;
}

// If there are changes to apply, apply them
if (updatedIdealState != null && !idealState.equals(updatedIdealState)) {
ZNRecord updatedZNRecord = updatedIdealState.getRecord();

// Update number of partitions
int numPartitions = updatedZNRecord.getMapFields().size();
updatedIdealState.setNumPartitions(numPartitions);

// If the ideal state is large enough, enable compression
boolean enableCompression = shouldCompress(updatedIdealState);
if (enableCompression) {
updatedZNRecord.setBooleanField(ENABLE_COMPRESSIONS_KEY, true);
} else {
updatedZNRecord.getSimpleFields().remove(ENABLE_COMPRESSIONS_KEY);
}

// Check version and set ideal state
try {
if (dataAccessor.getBaseDataAccessor()
.set(idealStateKey.getPath(), updatedZNRecord, idealState.getRecord().getVersion(),
AccessOption.PERSISTENT)) {
idealStateWrapper._idealState = updatedIdealState;
return true;
} else {
LOGGER.warn("Failed to update ideal state for resource: {}", resourceName);
return false;
}
} catch (ZkBadVersionException e) {
LOGGER.warn("Version changed while updating ideal state for resource: {}", resourceName);
return false;
} catch (Exception e) {
LOGGER.warn("Caught exception while updating ideal state for resource: {} (compressed={})", resourceName,
enableCompression, e);
return false;
}
} else {
if (noChangeOk) {
LOGGER.info("Idempotent or null ideal state update for resource {}, skipping update.", resourceName);
} else {
LOGGER.warn("Idempotent or null ideal state update for resource {}, skipping update.", resourceName);
}
idealStateWrapper._idealState = idealState;
return true;
}
}

private boolean shouldCompress(IdealState is) {
if (is.getNumPartitions() > NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION) {
return true;
}
Function<IdealState, IdealState> updater) {
return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater,
DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false);
}

// Find the number of characters in one partition in idealstate, and extrapolate
// to estimate the number of characters.
// We could serialize the znode to determine the exact size, but that would mean serializing every
// idealstate znode twice. We avoid some extra GC by estimating the size instead. Such estimations
// should be good for most installations that have similar segment and instance names.
Iterator<String> it = is.getPartitionSet().iterator();
if (it.hasNext()) {
String partitionName = it.next();
int numChars = partitionName.length();
Map<String, String> stateMap = is.getInstanceStateMap(partitionName);
for (Map.Entry<String, String> entry : stateMap.entrySet()) {
numChars += entry.getKey().length();
numChars += entry.getValue().length();
}
numChars *= is.getNumPartitions();
return _minNumCharsInISToTurnOnCompression > 0 && numChars > _minNumCharsInISToTurnOnCompression;
}
return false;
}
});
if (controllerMetrics != null) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries);
controllerMetrics.addTimedValue(resourceName, ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS,
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
}
return idealStateWrapper._idealState;
} catch (Exception e) {
if (controllerMetrics != null) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
}
throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e);
}
public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy retryPolicy) {
return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, retryPolicy, false);
}

private static class IdealStateWrapper {
IdealState _idealState;
public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, boolean noChangeOk) {
return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, retryPolicy, noChangeOk);
}

/**
Expand All @@ -235,16 +108,6 @@ public PermanentUpdaterException(Throwable cause) {
}
}

public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater) {
return updateIdealState(helixManager, resourceName, updater, DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false);
}

public static IdealState updateIdealState(final HelixManager helixManager, final String resourceName,
final Function<IdealState, IdealState> updater, RetryPolicy policy) {
return updateIdealState(helixManager, resourceName, updater, policy, false);
}

/**
* Updates broker resource ideal state for the given broker with the given broker tags. Optional {@code tablesAdded}
* and {@code tablesRemoved} can be provided to track the tables added/removed during the update.
Expand Down Expand Up @@ -554,7 +417,6 @@ public static List<String> getInstancesWithoutTag(List<InstanceConfig> instanceC
return instancesWithoutTag.stream().map(InstanceConfig::getInstanceName).collect(Collectors.toList());
}


public static List<InstanceConfig> getInstancesConfigsWithTag(List<InstanceConfig> instanceConfigs, String tag) {
List<InstanceConfig> instancesWithTag = new ArrayList<>();
for (InstanceConfig instanceConfig : instanceConfigs) {
Expand Down
Loading
Loading