diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java index 4604083439c..43e6210e18d 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java @@ -78,18 +78,18 @@ public static IdealState cloneIdealState(IdealState idealState) { (ZNRecord) ZN_RECORD_SERIALIZER.deserialize(ZN_RECORD_SERIALIZER.serialize(idealState.getRecord()))); } - public static boolean updateIdealState(HelixManager helixManager, String resourceName, + public static IdealState updateIdealState(HelixManager helixManager, String resourceName, Function updater) { return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false); } - public static boolean updateIdealState(HelixManager helixManager, String resourceName, + public static IdealState updateIdealState(HelixManager helixManager, String resourceName, Function updater, RetryPolicy retryPolicy) { return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, retryPolicy, false); } - public static boolean updateIdealState(HelixManager helixManager, String resourceName, + public static IdealState updateIdealState(HelixManager helixManager, String resourceName, Function updater, RetryPolicy retryPolicy, boolean noChangeOk) { return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, retryPolicy, noChangeOk); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java index 1e86401e5bc..ea74fb18e27 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java @@ -70,6 +70,7 @@ private static class Queue { private static class Entry { final String _resourceName; final Function _updater; + IdealState _updatedIdealState = null; AtomicBoolean _sent = new AtomicBoolean(false); Entry(String resourceName, Function updater) { @@ -103,21 +104,21 @@ public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNum * @param helixManager helixManager with the ability to pull from the current data\ * @param resourceName the resource name to be updated * @param updater the idealState updater to be applied - * @return true if successful, false otherwise + * @return IdealState if the update is successful, null if not */ - public boolean commit(HelixManager helixManager, String resourceName, + public IdealState commit(HelixManager helixManager, String resourceName, Function updater, RetryPolicy retryPolicy, boolean noChangeOk) { Queue queue = getQueue(resourceName); Entry entry = new Entry(resourceName, updater); - boolean success = true; queue._pending.add(entry); while (!entry._sent.get()) { if (queue._running.compareAndSet(null, Thread.currentThread())) { ArrayList processed = new ArrayList<>(); try { if (queue._pending.peek() == null) { - return true; + // All pending entries have been processed, the updatedIdealState should be set. + return entry._updatedIdealState; } // remove from queue Entry first = queue._pending.poll(); @@ -137,6 +138,7 @@ public boolean commit(HelixManager helixManager, String resourceName, * value in ZK; use it as initial value if exists */ IdealState updatedIdealState = first._updater.apply(idealStateCopy); + first._updatedIdealState = updatedIdealState; Iterator it = queue._pending.iterator(); while (it.hasNext()) { Entry ent = it.next(); @@ -145,9 +147,9 @@ public boolean commit(HelixManager helixManager, String resourceName, } processed.add(ent); updatedIdealState = ent._updater.apply(idealStateCopy); + ent._updatedIdealState = updatedIdealState; it.remove(); } - success = false; IdealState finalUpdatedIdealState = updatedIdealState; updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState, retryPolicy, noChangeOk); @@ -169,12 +171,12 @@ public boolean commit(HelixManager helixManager, String resourceName, e); // Restore interrupt status Thread.currentThread().interrupt(); - return false; + return null; } } } } - return success; + return entry._updatedIdealState; } private static class IdealStateWrapper { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 78b4256f9e4..dee89efd64b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -48,7 +48,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.inject.Inject; @@ -1175,15 +1174,14 @@ public SuccessResponse setTimeBoundary( } // Set the timeBoundary in tableIdealState - AtomicReference atomicIdealState = new AtomicReference<>(); - HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> { - is.getRecord() - .setSimpleField(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY, Long.toString(timeBoundaryMs)); - atomicIdealState.set(is); - return is; - }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)); - - if (atomicIdealState.get() == null) { + IdealState idealState = + HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> { + is.getRecord() + .setSimpleField(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY, Long.toString(timeBoundaryMs)); + return is; + }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)); + + if (idealState == null) { throw new ControllerApplicationException(LOGGER, "Could not update time boundary", Response.Status.INTERNAL_SERVER_ERROR); } @@ -1207,14 +1205,13 @@ public SuccessResponse deleteTimeBoundary( } // Delete the timeBoundary in tableIdealState - AtomicReference atomicIdealState = new AtomicReference<>(); - HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> { - is.getRecord().getSimpleFields().remove(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY); - atomicIdealState.set(is); - return is; - }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)); - - if (atomicIdealState.get() == null) { + IdealState idealState = + HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> { + is.getRecord().getSimpleFields().remove(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY); + return is; + }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)); + + if (idealState == null) { throw new ControllerApplicationException(LOGGER, "Could not remove time boundary", Response.Status.INTERNAL_SERVER_ERROR); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 2f8a7afa32e..76bef151a01 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -39,7 +39,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; @@ -956,8 +955,7 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig s IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment, Map instancePartitionsMap) { - final AtomicReference atomicIdealState = new AtomicReference<>(); - HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { + return HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { assert idealState != null; // When segment completion begins, the zk metadata is updated, followed by ideal state. // We allow only {@link PinotLLCRealtimeSegmentManager::MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms for a segment to @@ -976,10 +974,8 @@ IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String } updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName, isTablePaused(idealState) ? null : newSegmentName, segmentAssignment, instancePartitionsMap); - atomicIdealState.set(idealState); return idealState; }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f)); - return atomicIdealState.get(); } public static boolean isTablePaused(IdealState idealState) { @@ -1758,18 +1754,16 @@ private IdealState updatePauseStateInIdealState(String tableNameWithType, boolea PauseState.ReasonCode reasonCode, @Nullable String comment) { PauseState pauseState = new PauseState(pause, reasonCode, comment, new Timestamp(System.currentTimeMillis()).toString()); - AtomicReference updatedIdealState = new AtomicReference<>(); - HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> { + IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> { ZNRecord znRecord = idealState.getRecord(); znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString()); // maintain for backward compatibility znRecord.setSimpleField(IS_TABLE_PAUSED, Boolean.valueOf(pause).toString()); - updatedIdealState.set(new IdealState(znRecord)); - return updatedIdealState.get(); + return new IdealState(znRecord); }, RetryPolicies.noDelayRetryPolicy(3)); LOGGER.info("Set 'pauseState' to {} in the Ideal State for table {}. " + "Also set 'isTablePaused' to {} for backward compatibility.", pauseState, tableNameWithType, pause); - return updatedIdealState.get(); + return updatedIdealState; } private void sendForceCommitMessageToServers(String tableNameWithType, Set consumingSegments) {