Skip to content

Commit

Permalink
Replace AtomicRef by effective final metaclient
Browse files Browse the repository at this point in the history
  • Loading branch information
codope committed Nov 15, 2024
1 parent e4522f6 commit 546cbcd
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -911,10 +910,13 @@ public String startCommit() {
*/
public String startCommit(String actionType, HoodieTableMetaClient metaClient) {
if (needsUpgradeOrDowngrade(metaClient)) {
// Wrap metaClient in an AtomicReference to allow updating within the lambda
AtomicReference<HoodieTableMetaClient> metaClientRef = new AtomicReference<>(metaClient);
executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClientRef.get(), Option.empty()));
metaClient = metaClientRef.get();
final HoodieTableMetaClient[] updatedMetaClient = {metaClient};
executeUsingTxnManager(Option.empty(), () -> {
if(tryUpgrade(updatedMetaClient[0], Option.empty())) {
updatedMetaClient[0] = HoodieTableMetaClient.reload(updatedMetaClient[0]);
}
});
metaClient = updatedMetaClient[0];
}

CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
Expand Down Expand Up @@ -947,11 +949,13 @@ public void startCommitWithTime(String instantTime, String actionType) {
*/
private void startCommitWithTime(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
if (needsUpgradeOrDowngrade(metaClient)) {
// Wrap metaClient in an AtomicReference to allow updating within the lambda
AtomicReference<HoodieTableMetaClient> metaClientRef = new AtomicReference<>(metaClient);
// unclear what instant to use, since upgrade does have a given instant.
executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClientRef.get(), Option.empty()));
metaClient = metaClientRef.get();
final HoodieTableMetaClient[] updatedMetaClient = {metaClient};
executeUsingTxnManager(Option.empty(), () -> {
if(tryUpgrade(updatedMetaClient[0], Option.empty())) {
updatedMetaClient[0] = HoodieTableMetaClient.reload(updatedMetaClient[0]);
}
});
metaClient = updatedMetaClient[0];
}
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
HoodieTimeline.COMMIT_ACTION, () -> tableServiceClient.rollbackFailedWrites());
Expand Down Expand Up @@ -1267,14 +1271,15 @@ protected HoodieTableMetaClient initTableAndGetMetaClient(WriteOperationType ope
if (instantTime.isPresent()) {
ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
}
// Wrap metaClient in an AtomicReference to allow updating within the lambda
AtomicReference<HoodieTableMetaClient> metaClientRef = new AtomicReference<>(metaClient);
final HoodieTableMetaClient[] updatedMetaClient = {metaClient};
executeUsingTxnManager(ownerInstant, () -> {
metaClientRef.set(tryUpgrade(metaClientRef.get(), instantTime));
if (tryUpgrade(updatedMetaClient[0], instantTime)) {
updatedMetaClient[0] = HoodieTableMetaClient.reload(updatedMetaClient[0]);
}
// TODO: this also does MT table management..
initMetadataTable(instantTime, metaClientRef.get());
initMetadataTable(instantTime, updatedMetaClient[0]);
});
return metaClientRef.get();
return updatedMetaClient[0];
}

private void executeUsingTxnManager(Option<HoodieInstant> ownerInstant, Runnable r) {
Expand Down Expand Up @@ -1442,12 +1447,11 @@ public void setWriteTimer(String commitType) {
* @param metaClient instance of {@link HoodieTableMetaClient} to use.
* @param instantTime instant time of interest if we have one.
*/
protected HoodieTableMetaClient tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
protected boolean tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
UpgradeDowngrade upgradeDowngrade =
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);

if (upgradeDowngrade.needsUpgradeOrDowngrade(config.getWriteVersion())) {
metaClient = HoodieTableMetaClient.reload(metaClient);
// Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
List<String> instantsToRollback = tableServiceClient.getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime);

Expand All @@ -1457,11 +1461,10 @@ protected HoodieTableMetaClient tryUpgrade(HoodieTableMetaClient metaClient, Opt
tableServiceClient.rollbackFailedWrites(pendingRollbacks, true, true);
}

metaClient.reloadActiveTimeline();
upgradeDowngrade.setMetaClient(metaClient);
upgradeDowngrade.run(HoodieTableVersion.current(), instantTime.orElse(null));
return true;
}
return upgradeDowngrade.getMetaClient();
return false;
}

private boolean needsUpgradeOrDowngrade(HoodieTableMetaClient metaClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,6 @@ public UpgradeDowngrade(
this.upgradeDowngradeHelper = upgradeDowngradeHelper;
}

public HoodieTableMetaClient getMetaClient() {
return metaClient;
}

public void setMetaClient(HoodieTableMetaClient metaClient) {
this.metaClient = metaClient;
}

public boolean needsUpgradeOrDowngrade(HoodieTableVersion toWriteVersion) {
HoodieTableVersion fromTableVersion = metaClient.getTableConfig().getTableVersion();

Expand Down

0 comments on commit 546cbcd

Please sign in to comment.