Skip to content

Commit

Permalink
Change back the commit signature to return IdealState
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Sep 14, 2024
1 parent e79b9fb commit 8e4edc7
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,18 @@ public static IdealState cloneIdealState(IdealState idealState) {
(ZNRecord) ZN_RECORD_SERIALIZER.deserialize(ZN_RECORD_SERIALIZER.serialize(idealState.getRecord())));
}

public static boolean updateIdealState(HelixManager helixManager, String resourceName,
public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater) {
return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater,
DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false);
}

public static boolean updateIdealState(HelixManager helixManager, String resourceName,
public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy retryPolicy) {
return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, retryPolicy, false);
}

public static boolean updateIdealState(HelixManager helixManager, String resourceName,
public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, boolean noChangeOk) {
return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, retryPolicy, noChangeOk);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ private static class Queue {
private static class Entry {
final String _resourceName;
final Function<IdealState, IdealState> _updater;
IdealState _updatedIdealState = null;
AtomicBoolean _sent = new AtomicBoolean(false);

Entry(String resourceName, Function<IdealState, IdealState> updater) {
Expand Down Expand Up @@ -103,21 +104,21 @@ public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNum
* @param helixManager helixManager with the ability to pull from the current data\
* @param resourceName the resource name to be updated
* @param updater the idealState updater to be applied
* @return true if successful, false otherwise
* @return IdealState if the update is successful, null if not
*/
public boolean commit(HelixManager helixManager, String resourceName,
public IdealState commit(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, boolean noChangeOk) {
Queue queue = getQueue(resourceName);
Entry entry = new Entry(resourceName, updater);

boolean success = true;
queue._pending.add(entry);
while (!entry._sent.get()) {
if (queue._running.compareAndSet(null, Thread.currentThread())) {
ArrayList<Entry> processed = new ArrayList<>();
try {
if (queue._pending.peek() == null) {
return true;
// All pending entries have been processed, the updatedIdealState should be set.
return entry._updatedIdealState;
}
// remove from queue
Entry first = queue._pending.poll();
Expand All @@ -137,6 +138,7 @@ public boolean commit(HelixManager helixManager, String resourceName,
* value in ZK; use it as initial value if exists
*/
IdealState updatedIdealState = first._updater.apply(idealStateCopy);
first._updatedIdealState = updatedIdealState;
Iterator<Entry> it = queue._pending.iterator();
while (it.hasNext()) {
Entry ent = it.next();
Expand All @@ -145,9 +147,9 @@ public boolean commit(HelixManager helixManager, String resourceName,
}
processed.add(ent);
updatedIdealState = ent._updater.apply(idealStateCopy);
ent._updatedIdealState = updatedIdealState;
it.remove();
}
success = false;
IdealState finalUpdatedIdealState = updatedIdealState;
updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState,
retryPolicy, noChangeOk);
Expand All @@ -169,12 +171,12 @@ public boolean commit(HelixManager helixManager, String resourceName,
e);
// Restore interrupt status
Thread.currentThread().interrupt();
return false;
return null;
}
}
}
}
return success;
return entry._updatedIdealState;
}

private static class IdealStateWrapper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
Expand Down Expand Up @@ -1175,15 +1174,14 @@ public SuccessResponse setTimeBoundary(
}

// Set the timeBoundary in tableIdealState
AtomicReference<IdealState> atomicIdealState = new AtomicReference<>();
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> {
is.getRecord()
.setSimpleField(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY, Long.toString(timeBoundaryMs));
atomicIdealState.set(is);
return is;
}, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));

if (atomicIdealState.get() == null) {
IdealState idealState =
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> {
is.getRecord()
.setSimpleField(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY, Long.toString(timeBoundaryMs));
return is;
}, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));

if (idealState == null) {
throw new ControllerApplicationException(LOGGER, "Could not update time boundary",
Response.Status.INTERNAL_SERVER_ERROR);
}
Expand All @@ -1207,14 +1205,13 @@ public SuccessResponse deleteTimeBoundary(
}

// Delete the timeBoundary in tableIdealState
AtomicReference<IdealState> atomicIdealState = new AtomicReference<>();
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> {
is.getRecord().getSimpleFields().remove(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY);
atomicIdealState.set(is);
return is;
}, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));

if (atomicIdealState.get() == null) {
IdealState idealState =
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> {
is.getRecord().getSimpleFields().remove(CommonConstants.IdealState.HYBRID_TABLE_TIME_BOUNDARY);
return is;
}, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));

if (idealState == null) {
throw new ControllerApplicationException(LOGGER, "Could not remove time boundary",
Response.Status.INTERNAL_SERVER_ERROR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -956,8 +955,7 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig s
IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
String newSegmentName, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
final AtomicReference<IdealState> atomicIdealState = new AtomicReference<>();
HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
return HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
assert idealState != null;
// When segment completion begins, the zk metadata is updated, followed by ideal state.
// We allow only {@link PinotLLCRealtimeSegmentManager::MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms for a segment to
Expand All @@ -976,10 +974,8 @@ IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String
}
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName,
isTablePaused(idealState) ? null : newSegmentName, segmentAssignment, instancePartitionsMap);
atomicIdealState.set(idealState);
return idealState;
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
return atomicIdealState.get();
}

public static boolean isTablePaused(IdealState idealState) {
Expand Down Expand Up @@ -1758,18 +1754,16 @@ private IdealState updatePauseStateInIdealState(String tableNameWithType, boolea
PauseState.ReasonCode reasonCode, @Nullable String comment) {
PauseState pauseState = new PauseState(pause, reasonCode, comment,
new Timestamp(System.currentTimeMillis()).toString());
AtomicReference<IdealState> updatedIdealState = new AtomicReference<>();
HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> {
IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> {
ZNRecord znRecord = idealState.getRecord();
znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString());
// maintain for backward compatibility
znRecord.setSimpleField(IS_TABLE_PAUSED, Boolean.valueOf(pause).toString());
updatedIdealState.set(new IdealState(znRecord));
return updatedIdealState.get();
return new IdealState(znRecord);
}, RetryPolicies.noDelayRetryPolicy(3));
LOGGER.info("Set 'pauseState' to {} in the Ideal State for table {}. "
+ "Also set 'isTablePaused' to {} for backward compatibility.", pauseState, tableNameWithType, pause);
return updatedIdealState.get();
return updatedIdealState;
}

private void sendForceCommitMessageToServers(String tableNameWithType, Set<String> consumingSegments) {
Expand Down

0 comments on commit 8e4edc7

Please sign in to comment.