diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java index 4ffad84d61fb..a44b85510974 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java @@ -19,25 +19,21 @@ package org.apache.pinot.common.utils.helix; import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.objects.ObjectBooleanPair; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.collections4.CollectionUtils; -import org.apache.helix.AccessOption; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.model.ExternalView; import org.apache.helix.model.HelixConfigScope; @@ -47,12 +43,8 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; -import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; import org.apache.pinot.common.helix.ExtraInstanceConfig; import org.apache.pinot.common.metadata.ZKMetadataProvider; -import org.apache.pinot.common.metrics.ControllerMeter; -import org.apache.pinot.common.metrics.ControllerMetrics; -import org.apache.pinot.common.metrics.ControllerTimer; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -69,156 +61,38 @@ public class HelixHelper { private HelixHelper() { } - private static final int NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION = 1000; - private static final String ENABLE_COMPRESSIONS_KEY = "enableCompression"; - private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f); private static final RetryPolicy DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY = RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L); + private static final Logger LOGGER = LoggerFactory.getLogger(HelixHelper.class); private static final ZNRecordSerializer ZN_RECORD_SERIALIZER = new ZNRecordSerializer(); + private static final IdealStateGroupCommit IDEAL_STATE_GROUP_COMMIT = new IdealStateGroupCommit(); private static final String ONLINE = "ONLINE"; private static final String OFFLINE = "OFFLINE"; public static final String BROKER_RESOURCE = CommonConstants.Helix.BROKER_RESOURCE_INSTANCE; - private static int _minNumCharsInISToTurnOnCompression = -1; - - public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNumChars) { - _minNumCharsInISToTurnOnCompression = minNumChars; - } - public static IdealState cloneIdealState(IdealState idealState) { return new IdealState( (ZNRecord) ZN_RECORD_SERIALIZER.deserialize(ZN_RECORD_SERIALIZER.serialize(idealState.getRecord()))); } - /** - * Updates the ideal state, retrying if necessary in case of concurrent updates to the ideal state. - * - * @param helixManager The HelixManager used to interact with the Helix cluster - * @param resourceName The resource for which to update the ideal state - * @param updater A function that returns an updated ideal state given an input ideal state - * @return updated ideal state if successful, null if not - */ - public static IdealState updateIdealState(HelixManager helixManager, String resourceName, - Function updater, RetryPolicy policy, boolean noChangeOk) { - // NOTE: ControllerMetrics could be null because this method might be invoked by Broker. - ControllerMetrics controllerMetrics = ControllerMetrics.get(); - try { - long startTimeMs = System.currentTimeMillis(); - IdealStateWrapper idealStateWrapper = new IdealStateWrapper(); - int retries = policy.attempt(new Callable<>() { - @Override - public Boolean call() { - HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor(); - PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName); - IdealState idealState = dataAccessor.getProperty(idealStateKey); - - // Make a copy of the idealState above to pass it to the updater - // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and - // list fields - IdealState idealStateCopy = cloneIdealState(idealState); - - IdealState updatedIdealState; - try { - updatedIdealState = updater.apply(idealStateCopy); - } catch (PermanentUpdaterException e) { - LOGGER.error("Caught permanent exception while updating ideal state for resource: {}", resourceName, e); - throw e; - } catch (Exception e) { - LOGGER.error("Caught exception while updating ideal state for resource: {}", resourceName, e); - return false; - } - - // If there are changes to apply, apply them - if (updatedIdealState != null && !idealState.equals(updatedIdealState)) { - ZNRecord updatedZNRecord = updatedIdealState.getRecord(); - - // Update number of partitions - int numPartitions = updatedZNRecord.getMapFields().size(); - updatedIdealState.setNumPartitions(numPartitions); - - // If the ideal state is large enough, enable compression - boolean enableCompression = shouldCompress(updatedIdealState); - if (enableCompression) { - updatedZNRecord.setBooleanField(ENABLE_COMPRESSIONS_KEY, true); - } else { - updatedZNRecord.getSimpleFields().remove(ENABLE_COMPRESSIONS_KEY); - } - - // Check version and set ideal state - try { - if (dataAccessor.getBaseDataAccessor() - .set(idealStateKey.getPath(), updatedZNRecord, idealState.getRecord().getVersion(), - AccessOption.PERSISTENT)) { - idealStateWrapper._idealState = updatedIdealState; - return true; - } else { - LOGGER.warn("Failed to update ideal state for resource: {}", resourceName); - return false; - } - } catch (ZkBadVersionException e) { - LOGGER.warn("Version changed while updating ideal state for resource: {}", resourceName); - return false; - } catch (Exception e) { - LOGGER.warn("Caught exception while updating ideal state for resource: {} (compressed={})", resourceName, - enableCompression, e); - return false; - } - } else { - if (noChangeOk) { - LOGGER.info("Idempotent or null ideal state update for resource {}, skipping update.", resourceName); - } else { - LOGGER.warn("Idempotent or null ideal state update for resource {}, skipping update.", resourceName); - } - idealStateWrapper._idealState = idealState; - return true; - } - } - - private boolean shouldCompress(IdealState is) { - if (is.getNumPartitions() > NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION) { - return true; - } + public static ObjectBooleanPair updateIdealState(HelixManager helixManager, String resourceName, + Function updater) { + return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, + DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false); + } - // Find the number of characters in one partition in idealstate, and extrapolate - // to estimate the number of characters. - // We could serialize the znode to determine the exact size, but that would mean serializing every - // idealstate znode twice. We avoid some extra GC by estimating the size instead. Such estimations - // should be good for most installations that have similar segment and instance names. - Iterator it = is.getPartitionSet().iterator(); - if (it.hasNext()) { - String partitionName = it.next(); - int numChars = partitionName.length(); - Map stateMap = is.getInstanceStateMap(partitionName); - for (Map.Entry entry : stateMap.entrySet()) { - numChars += entry.getKey().length(); - numChars += entry.getValue().length(); - } - numChars *= is.getNumPartitions(); - return _minNumCharsInISToTurnOnCompression > 0 && numChars > _minNumCharsInISToTurnOnCompression; - } - return false; - } - }); - if (controllerMetrics != null) { - controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries); - controllerMetrics.addTimedValue(resourceName, ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS, - System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS); - } - return idealStateWrapper._idealState; - } catch (Exception e) { - if (controllerMetrics != null) { - controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L); - } - throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e); - } + public static ObjectBooleanPair updateIdealState(HelixManager helixManager, String resourceName, + Function updater, RetryPolicy retryPolicy) { + return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, retryPolicy, false); } - private static class IdealStateWrapper { - IdealState _idealState; + public static ObjectBooleanPair updateIdealState(HelixManager helixManager, String resourceName, + Function updater, RetryPolicy retryPolicy, boolean noChangeOk) { + return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, retryPolicy, noChangeOk); } /** @@ -235,16 +109,6 @@ public PermanentUpdaterException(Throwable cause) { } } - public static IdealState updateIdealState(HelixManager helixManager, String resourceName, - Function updater) { - return updateIdealState(helixManager, resourceName, updater, DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false); - } - - public static IdealState updateIdealState(final HelixManager helixManager, final String resourceName, - final Function updater, RetryPolicy policy) { - return updateIdealState(helixManager, resourceName, updater, policy, false); - } - /** * Updates broker resource ideal state for the given broker with the given broker tags. Optional {@code tablesAdded} * and {@code tablesRemoved} can be provided to track the tables added/removed during the update. diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java new file mode 100644 index 000000000000..3b55edd452ec --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.helix; + +import it.unimi.dsi.fastutil.objects.ObjectBooleanImmutablePair; +import it.unimi.dsi.fastutil.objects.ObjectBooleanPair; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import org.apache.helix.AccessOption; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.IdealState; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.ControllerTimer; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class IdealStateGroupCommit { + private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommit.class); + + private static final int NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION = 1000; + private static final String ENABLE_COMPRESSIONS_KEY = "enableCompression"; + + private static int _minNumCharsInISToTurnOnCompression = -1; + + private static class Queue { + final AtomicReference _running = new AtomicReference(); + final ConcurrentLinkedQueue _pending = new ConcurrentLinkedQueue(); + } + + private static class Entry { + final String _resourceName; + final Function _updater; + AtomicBoolean _sent = new AtomicBoolean(false); + + Entry(String resourceName, Function updater) { + _resourceName = resourceName; + _updater = updater; + } + } + + private final Queue[] _queues = new Queue[100]; + + /** + * Set up a group committer and its associated queues + */ + public IdealStateGroupCommit() { + // Don't use Arrays.fill(); + for (int i = 0; i < _queues.length; i++) { + _queues[i] = new Queue(); + } + } + + private Queue getQueue(String resourceName) { + return _queues[(resourceName.hashCode() & Integer.MAX_VALUE) % _queues.length]; + } + + public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNumChars) { + _minNumCharsInISToTurnOnCompression = minNumChars; + } + + /** + * Do a group update for idealState associated with a given resource key + * @param helixManager helixManager with the ability to pull from the current data\ + * @param resourceName the resource name to be updated + * @param updater the idealState updater to be applied + * @return true if successful, false otherwise + */ + public ObjectBooleanPair commit(HelixManager helixManager, String resourceName, + Function updater, RetryPolicy retryPolicy, boolean noChangeOk) { + Queue queue = getQueue(resourceName); + Entry entry = new Entry(resourceName, updater); + + boolean success = true; + queue._pending.add(entry); + IdealState finalIdealState = null; + while (!entry._sent.get()) { + if (queue._running.compareAndSet(null, Thread.currentThread())) { + ArrayList processed = new ArrayList<>(); + try { + if (queue._pending.peek() == null) { + return new ObjectBooleanImmutablePair<>(finalIdealState, true); + } + // remove from queue + Entry first = queue._pending.poll(); + processed.add(first); + String mergedResourceName = first._resourceName; + HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor(); + PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName); + IdealState idealState = dataAccessor.getProperty(idealStateKey); + + // Make a copy of the idealState above to pass it to the updater + // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and + // list fields + IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState); + + /** + * If the local cache does not contain a value, need to check if there is a + * value in ZK; use it as initial value if exists + */ + IdealState updatedIdealState = first._updater.apply(idealStateCopy); + Iterator it = queue._pending.iterator(); + while (it.hasNext()) { + Entry ent = it.next(); + if (!ent._resourceName.equals(mergedResourceName)) { + continue; + } + processed.add(ent); + updatedIdealState = ent._updater.apply(idealStateCopy); + // System.out.println("After merging:" + merged); + it.remove(); + } + success = false; + IdealState finalUpdatedIdealState = updatedIdealState; + finalIdealState = updatedIdealState; + updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState, + retryPolicy, noChangeOk); + } finally { + queue._running.set(null); + for (Entry e : processed) { + synchronized (e) { + e._sent.set(true); + e.notify(); + } + } + } + } else { + synchronized (entry) { + try { + entry.wait(10); + } catch (InterruptedException e) { + LOGGER.error("Interrupted while committing change, resourceName: " + resourceName + ", updater: " + updater, + e); + // Restore interrupt status + Thread.currentThread().interrupt(); + return new ObjectBooleanImmutablePair<>(finalIdealState, false); + } + } + } + } + return new ObjectBooleanImmutablePair<>(finalIdealState, success); + } + + private static class IdealStateWrapper { + IdealState _idealState; + } + + /** + * Updates the ideal state, retrying if necessary in case of concurrent updates to the ideal state. + * + * @param helixManager The HelixManager used to interact with the Helix cluster + * @param resourceName The resource for which to update the ideal state + * @param updater A function that returns an updated ideal state given an input ideal state + * @return updated ideal state if successful, null if not + */ + private static IdealState updateIdealState(HelixManager helixManager, String resourceName, + Function updater, RetryPolicy policy, boolean noChangeOk) { + // NOTE: ControllerMetrics could be null because this method might be invoked by Broker. + ControllerMetrics controllerMetrics = ControllerMetrics.get(); + try { + long startTimeMs = System.currentTimeMillis(); + IdealStateWrapper idealStateWrapper = new IdealStateWrapper(); + int retries = policy.attempt(new Callable<>() { + @Override + public Boolean call() { + HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor(); + PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName); + IdealState idealState = dataAccessor.getProperty(idealStateKey); + + // Make a copy of the idealState above to pass it to the updater + // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and + // list fields + IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState); + IdealState updatedIdealState; + try { + updatedIdealState = updater.apply(idealStateCopy); + } catch (HelixHelper.PermanentUpdaterException e) { + LOGGER.error("Caught permanent exception while updating ideal state for resource: {}", resourceName, e); + throw e; + } catch (Exception e) { + LOGGER.error("Caught exception while updating ideal state for resource: {}", resourceName, e); + return false; + } + + // If there are changes to apply, apply them + if (updatedIdealState != null && !idealState.equals(updatedIdealState)) { + ZNRecord updatedZNRecord = updatedIdealState.getRecord(); + + // Update number of partitions + int numPartitions = updatedZNRecord.getMapFields().size(); + updatedIdealState.setNumPartitions(numPartitions); + + // If the ideal state is large enough, enable compression + boolean enableCompression = shouldCompress(updatedIdealState); + if (enableCompression) { + updatedZNRecord.setBooleanField(ENABLE_COMPRESSIONS_KEY, true); + } else { + updatedZNRecord.getSimpleFields().remove(ENABLE_COMPRESSIONS_KEY); + } + + // Check version and set ideal state + try { + if (dataAccessor.getBaseDataAccessor() + .set(idealStateKey.getPath(), updatedZNRecord, idealState.getRecord().getVersion(), + AccessOption.PERSISTENT)) { + idealStateWrapper._idealState = updatedIdealState; + return true; + } else { + LOGGER.warn("Failed to update ideal state for resource: {}", resourceName); + return false; + } + } catch (ZkBadVersionException e) { + LOGGER.warn("Version changed while updating ideal state for resource: {}", resourceName); + return false; + } catch (Exception e) { + LOGGER.warn("Caught exception while updating ideal state for resource: {} (compressed={})", resourceName, + enableCompression, e); + return false; + } + } else { + if (noChangeOk) { + LOGGER.info("Idempotent or null ideal state update for resource {}, skipping update.", resourceName); + } else { + LOGGER.warn("Idempotent or null ideal state update for resource {}, skipping update.", resourceName); + } + idealStateWrapper._idealState = idealState; + return true; + } + } + + private boolean shouldCompress(IdealState is) { + if (is.getNumPartitions() > NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION) { + return true; + } + + // Find the number of characters in one partition in idealstate, and extrapolate + // to estimate the number of characters. + // We could serialize the znode to determine the exact size, but that would mean serializing every + // idealstate znode twice. We avoid some extra GC by estimating the size instead. Such estimations + // should be good for most installations that have similar segment and instance names. + Iterator it = is.getPartitionSet().iterator(); + if (it.hasNext()) { + String partitionName = it.next(); + int numChars = partitionName.length(); + Map stateMap = is.getInstanceStateMap(partitionName); + for (Map.Entry entry : stateMap.entrySet()) { + numChars += entry.getKey().length(); + numChars += entry.getValue().length(); + } + numChars *= is.getNumPartitions(); + return _minNumCharsInISToTurnOnCompression > 0 && numChars > _minNumCharsInISToTurnOnCompression; + } + return false; + } + }); + if (controllerMetrics != null) { + controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries); + controllerMetrics.addTimedValue(resourceName, ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS, + System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS); + } + return idealStateWrapper._idealState; + } catch (Exception e) { + if (controllerMetrics != null) { + controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L); + } + throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index eae4276b8422..5e4ff8751ff6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -76,6 +76,7 @@ import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.common.utils.helix.IdealStateGroupCommit; import org.apache.pinot.common.utils.helix.LeadControllerUtils; import org.apache.pinot.common.utils.log.DummyLogFileServer; import org.apache.pinot.common.utils.log.LocalLogFileServer; @@ -213,7 +214,7 @@ public void init(PinotConfiguration pinotConfiguration) CommonConstants.DEFAULT_PINOT_INSECURE_MODE))); setupHelixSystemProperties(); - HelixHelper.setMinNumCharsInISToTurnOnCompression(_config.getMinNumCharsInISToTurnOnCompression()); + IdealStateGroupCommit.setMinNumCharsInISToTurnOnCompression(_config.getMinNumCharsInISToTurnOnCompression()); _listenerConfigs = ListenerConfigUtil.buildControllerConfigs(_config); _controllerMode = _config.getControllerMode(); inferHostnameIfNeeded(_config); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index bc7d16bde6ed..edd81515989e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -129,15 +129,18 @@ import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; -@Api(tags = Constants.TABLE_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY), - @Authorization(value = DATABASE)}) +@Api(tags = Constants.TABLE_TAG, authorizations = { + @Authorization(value = SWAGGER_AUTHORIZATION_KEY), + @Authorization(value = DATABASE) +}) @SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = { @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY, description = "The format of the key is ```\"Basic \" or \"Bearer \"```"), @ApiKeyAuthDefinition(name = DATABASE, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE, description = "Database context passed through http header. If no context is provided 'default' database " - + "context will be considered.")})) + + "context will be considered.") +})) @Path("/") public class PinotTableRestletResource { /** @@ -1176,7 +1179,7 @@ public SuccessResponse setTimeBoundary( is.getRecord() .setSimpleField(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY, Long.toString(timeBoundaryMs)); return is; - }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)); + }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)).left(); if (idealState == null) { throw new ControllerApplicationException(LOGGER, "Could not update time boundary", @@ -1206,7 +1209,7 @@ public SuccessResponse deleteTimeBoundary( HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> { is.getRecord().getSimpleFields().remove(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY); return is; - }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)); + }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)).left(); if (idealState == null) { throw new ControllerApplicationException(LOGGER, "Could not remove time boundary", diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 910291ff0062..01425141f4b7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -981,7 +981,7 @@ IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName, isTablePaused(idealState) ? null : newSegmentName, segmentAssignment, instancePartitionsMap); return idealState; - }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f)); + }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f)).left(); } public static boolean isTablePaused(IdealState idealState) {