diff --git a/job/server/src/main/java/alluxio/job/plan/replicate/SetReplicaDefinition.java b/job/server/src/main/java/alluxio/job/plan/replicate/SetReplicaDefinition.java index 819402db7569..8db1e861fffd 100644 --- a/job/server/src/main/java/alluxio/job/plan/replicate/SetReplicaDefinition.java +++ b/job/server/src/main/java/alluxio/job/plan/replicate/SetReplicaDefinition.java @@ -20,6 +20,7 @@ import alluxio.conf.Configuration; import alluxio.exception.status.NotFoundException; import alluxio.grpc.RemoveBlockRequest; +import alluxio.grpc.SetAttributePOptions; import alluxio.job.RunTaskContext; import alluxio.job.SelectExecutorsContext; import alluxio.job.plan.AbstractVoidPlanDefinition; @@ -37,6 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Set; @@ -168,8 +170,25 @@ private void replicate(SetReplicaConfig config, RunTaskContext context) throws E // to avoid the the race between "replicate" and "rename", so that even a file to replicate is // renamed, the job is still working on the correct file. URIStatus status = context.getFileSystem().getStatus(new AlluxioURI(config.getPath())); - - JobUtils.loadBlock(status, context.getFsContext(), config.getBlockId(), null, false); + try { + JobUtils.loadBlock(status, context.getFsContext(), config.getBlockId(), null, false); + } catch (IOException e) { + // This will remove the file from the pinlist if it fails to replicate, there can be false + // positives because replication can fail transiently and this would unpin it. However, + // compared to repeatedly replicating, this is a more acceptable result. + LOG.warn("Replication of {} failed, reduce min replication to 0 and unpin. Reason: {} ", + status.getPath(), e.getMessage()); + SetAttributePOptions.Builder optionsBuilder = + SetAttributePOptions.newBuilder(); + try { + context.getFileSystem().setAttribute(new AlluxioURI(config.getPath()), + optionsBuilder.setReplicationMin(0).setPinned(false).build()); + } catch (Throwable e2) { + e.addSuppressed(e2); + LOG.warn("Attempt to set min replication to 0 and unpin failed due to ", e2); + } + throw e; + } LOG.info("Replicated file " + config.getPath() + " block " + config.getBlockId()); } }