Skip to content

Commit

Permalink
[controller] Check only parent colo store defered swap config (linked…
Browse files Browse the repository at this point in the history
…in#994)

During batch pushes controller queries all child controller to fetch store config to validate the deferred version swap flag. This could be flaky if the some colo fails to respond in timely manner (it does 5 retries). Since the parent colo has the required store config, this PR checks only that instead of doing a expensive remote call to the child controller.
  • Loading branch information
majisourav99 authored May 21, 2024
1 parent 2ecfa83 commit d0f931b
Showing 1 changed file with 7 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1154,11 +1154,10 @@ Optional<String> getTopicForCurrentPushJob(
*/
if (latestTopic.isPresent()) {
LOGGER.debug("Latest kafka topic for store: {} is {}", storeName, latestTopic.get());

final String latestTopicName = latestTopic.get().getName();
int versionNumber = Version.parseVersionFromKafkaTopicName(latestTopicName);
boolean deferredSwap = isDeferredSwap(clusterName, storeName, versionNumber);
if (deferredSwap) {
Store store = getStore(clusterName, storeName);
if (store.getVersion(versionNumber).map(Version::isVersionSwapDeferred).orElse(false)) {
LOGGER.error(
"There is already future version {} exists for store {}, please wait till the future version is made current.",
versionNumber,
Expand Down Expand Up @@ -3541,13 +3540,13 @@ private OfflinePushStatusInfo getOffLineJobStatus(
if (currentReturnStatus.isTerminal()) {
String storeName = Version.parseStoreFromKafkaTopicName(kafkaTopic);
int versionNum = Version.parseVersionFromKafkaTopicName(kafkaTopic);
boolean deferredSwap = isDeferredSwap(clusterName, storeName, versionNum);
HelixVeniceClusterResources resources = getVeniceHelixAdmin().getHelixVeniceClusterResources(clusterName);

if (!deferredSwap) {
try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreWriteLock(storeName)) {
ReadWriteStoreRepository repository = resources.getStoreMetadataRepository();
Store parentStore = repository.getStore(storeName);
try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreWriteLock(storeName)) {
ReadWriteStoreRepository repository = resources.getStoreMetadataRepository();
Store parentStore = repository.getStore(storeName);
boolean isDeferredSwap = parentStore.getVersion(versionNum).map(Version::isVersionSwapDeferred).orElse(false);
if (!isDeferredSwap) {
// targetedRegions is non-empty for target region push of batch store
boolean isTargetRegionPush = !StringUtils.isEmpty(targetedRegions);
Version storeVersion = parentStore.getVersion(versionNum);
Expand Down Expand Up @@ -3591,22 +3590,6 @@ private OfflinePushStatusInfo getOffLineJobStatus(
extraInfoUpdateTimestamp);
}

boolean isDeferredSwap(String clusterName, String storeName, int versionNum) {
Map<String, String> futureVersions = getFutureVersionsForMultiColos(clusterName, storeName);
for (Map.Entry<String, String> entry: futureVersions.entrySet()) {
ControllerClient controllerClient = getVeniceHelixAdmin().getControllerClientMap(clusterName).get(entry.getKey());
StoreResponse storeResponse = ControllerClient.retryableRequest(controllerClient, 5, c -> c.getStore(storeName));
StoreInfo store = storeResponse.getStore();
Optional<Version> version = store.getVersion(versionNum);
if (version.isPresent()) {
if (version.get().isVersionSwapDeferred() && store.getCurrentVersion() < versionNum) {
return true;
}
}
}
return false;
}

/**
* Based on the global information, start determining the final status to return
* @param sortedStatuses
Expand Down

0 comments on commit d0f931b

Please sign in to comment.