add more binding support and adjust log wording
jtuglu-netflix committed Oct 30, 2024
1 parent 49c7941 commit ec80d2f
Showing 7 changed files with 28 additions and 30 deletions.
1 change: 1 addition & 0 deletions docs/development/modules.md
@@ -104,6 +104,7 @@ In addition to `DataSegmentPusher` and `URIDataPuller`, you can also bind:

* [`DataSegmentKiller`](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java): Removes segments, used as part of the Kill Task to delete unused segments, i.e. perform garbage collection of segments that are either superseded by newer versions or that have been dropped from the cluster.
* [`DataSegmentMover`](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentMover.java): Allow migrating segments from one place to another, currently this is only used as part of the MoveTask to move unused segments to a different S3 bucket or prefix, typically to reduce storage costs of unused data (e.g. move to glacier or cheaper storage)
+* [`DataSegmentCopier`](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentCopier.java): Allow copying segments from one place to another, available for use by extensions, etc.
* [`DataSegmentArchiver`](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentArchiver.java): Just a wrapper around Mover, but comes with a preconfigured target bucket/path, so it doesn't have to be specified at runtime as part of the ArchiveTask.

### Validating your deep storage implementation
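As a sketch of how this new extension point is meant to be consumed, a hypothetical extension module could register its own copier through the `dataSegmentCopierBinder` helper added in `Binders.java` below. `ExampleStorageModule`, `ExampleDataSegmentCopier`, and the `"example"` scheme are illustrative names, not part of this commit:

```java
import com.google.inject.Binder;
import com.google.inject.Module;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.LazySingleton;

public class ExampleStorageModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // Register a copier for load specs whose "type" is "example"; the map
    // binding pattern mirrors the S3 registration later in this commit.
    Binders.dataSegmentCopierBinder(binder)
           .addBinding("example")
           .to(ExampleDataSegmentCopier.class) // hypothetical implementation
           .in(LazySingleton.class);
  }
}
```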
8 changes: 0 additions & 8 deletions extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentCopier.java
@@ -32,14 +32,6 @@ public class S3DataSegmentCopier extends S3DataSegmentTransferUtility implements
{
private static final Logger log = new Logger(S3DataSegmentCopier.class);

-/**
- * Any implementation of DataSegmentCopier is initialized when an ingestion job starts if the extension is loaded
- * even when the implementation of DataSegmentCopier is not used. As a result, if we accept a s3 client instead
- * of a supplier of it, it can cause unnecessary config validation for s3 even when it's not used at all.
- * To perform the config validation only when it is actually used, we use a supplier.
- *
- * See OmniDataSegmentCopier for how DataSegmentCopiers are initialized.
- */
@Inject
public S3DataSegmentCopier(
Supplier<ServerSideEncryptingAmazonS3> s3ClientSupplier,
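The removed javadoc above describes a deferred-initialization pattern worth keeping in mind when writing any copier: take a `Supplier` of the client rather than the client itself, so that client construction and config validation only happen on first real use. A minimal generic sketch of that pattern (names other than `Supplier`/`Suppliers` are illustrative):

```java
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

public class LazyClientHolder<C>
{
  // Injecting the supplier is cheap: no client is built, no config validated.
  private final Supplier<C> clientSupplier;

  public LazyClientHolder(Supplier<C> clientSupplier)
  {
    // Memoize so the (possibly expensive) construction runs at most once.
    this.clientSupplier = Suppliers.memoize(clientSupplier);
  }

  public C client()
  {
    // Construction/validation is deferred to the first call that needs it.
    return clientSupplier.get();
  }
}
```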
31 changes: 16 additions & 15 deletions extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentTransferUtility.java
@@ -60,7 +60,7 @@ protected DataSegment transfer(DataSegment segment, Map<String, Object> targetLo
throw new SegmentLoadingException("Target S3 baseKey is not specified");
}

-safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path, deleteSource);
+safeTransfer(s3Bucket, s3Path, targetS3Bucket, targetS3Path, deleteSource);

return segment.withLoadSpec(
ImmutableMap.<String, Object>builder()
@@ -83,11 +83,11 @@ public boolean apply(String input)
);
}
catch (AmazonServiceException e) {
throw new SegmentLoadingException(e, "Unable to move segment[%s]: [%s]", segment.getId(), e);
throw new SegmentLoadingException(e, "Unable to transfer segment[%s]: [%s]", segment.getId(), e);
}
}

-private void safeMove(
+private void safeTransfer(
final String s3Bucket,
final String s3Path,
final String targetS3Bucket,
@@ -106,11 +106,11 @@ private void safeMove(
targetS3Path
);
try {
-selfCheckingMove(s3Bucket, targetS3Bucket, s3Path, targetS3Path, copyMsg, deleteSource);
+selfCheckingTransfer(s3Bucket, targetS3Bucket, s3Path, targetS3Path, copyMsg, deleteSource);
return null;
}
catch (AmazonServiceException | IOException | SegmentLoadingException e) {
log().info(e, "Error while trying to move " + copyMsg);
log().info(e, "Error while trying to transfer " + copyMsg);
throw e;
}
}
@@ -124,12 +124,12 @@ private void safeMove(
}

/**
- * Copies an object and after that checks that the object is present at the target location, via a separate API call.
- * If it is not, an exception is thrown, and the object is not deleted at the old location. This "paranoic" check
- * is added after it was observed that S3 may report a successful move, and the object is not found at the target
+ * Transfers an object and after that checks that the object is present at the target location, via a separate API call.
+ * If it is not, an exception is thrown, and (even if deleteSource=true) the object is not deleted at the old location. This "paranoic" check
+ * is added after it was observed that S3 may report a successful transfer, and the object is not found at the target
* location.
*/
-private void selfCheckingMove(
+private void selfCheckingTransfer(
String s3Bucket,
String targetS3Bucket,
String s3Path,
@@ -139,7 +139,7 @@ private void selfCheckingMove(
) throws IOException, SegmentLoadingException
{
if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) {
log().info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path);
log().info("No need to transfer file[s3://%s/%s] onto itself", s3Bucket, s3Path);
return;
}
final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
@@ -162,13 +162,13 @@ private void selfCheckingMove(
StorageClass.fromValue(StringUtils.toUpperCase(objectSummary.getStorageClass())).equals(StorageClass.Glacier)) {
throw new AmazonServiceException(
StringUtils.format(
"Cannot move file[s3://%s/%s] of storage class glacier, skipping.",
"Cannot transfer file[s3://%s/%s] of storage class glacier, skipping.",
s3Bucket,
s3Path
)
);
} else {
log().info("Moving file %s", copyMsg);
log().info("Transfering file %s", copyMsg);
final CopyObjectRequest copyRequest = new CopyObjectRequest(s3Bucket, s3Path, targetS3Bucket, targetS3Path);
if (!config.getDisableAcl()) {
copyRequest.setAccessControlList(S3Utils.grantFullControlToBucketOwner(s3Client, targetS3Bucket));
@@ -181,23 +181,24 @@ private void safeMove(
);
}
if (deleteSource) {
log().info("Beginning deletion of [s3://%s/%s]", s3Bucket, s3Path);
deleteWithRetriesSilent(s3Bucket, s3Path);
}
log().debug("Finished moving file %s", copyMsg);
log().debug("Finished transfering file %s", copyMsg);
}
} else {
// ensure object exists in target location
if (s3Client.doesObjectExist(targetS3Bucket, targetS3Path)) {
log().info(
"Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]",
"Not transfering file [s3://%s/%s], already present in target location [s3://%s/%s]",
s3Bucket,
s3Path,
targetS3Bucket,
targetS3Path
);
} else {
throw new SegmentLoadingException(
"Unable to move file %s, not present in either source or target location",
"Unable to transfer file %s, not present in either source or target location",
copyMsg
);
}
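Condensed into plain AWS SDK v1 calls, the self-checking pattern implemented above amounts to the following sketch. ACL handling, the Glacier storage-class check, retries, and Druid's logging are omitted, and the class and method names here are illustrative rather than taken from the commit:

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import java.io.IOException;

public final class SelfCheckingCopy
{
  public static void copy(
      AmazonS3 s3,
      String srcBucket, String srcKey,
      String dstBucket, String dstKey,
      boolean deleteSource
  ) throws IOException
  {
    if (srcBucket.equals(dstBucket) && srcKey.equals(dstKey)) {
      return; // nothing to do: source and target are the same object
    }
    s3.copyObject(new CopyObjectRequest(srcBucket, srcKey, dstBucket, dstKey));
    // "Paranoid" check: S3 has been observed to report a successful copy while
    // the object is absent at the target, so verify before touching the source.
    if (!s3.doesObjectExist(dstBucket, dstKey)) {
      throw new IOException("copy reported success but target object is missing");
    }
    if (deleteSource) {
      s3.deleteObject(srcBucket, srcKey);
    }
  }
}
```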
4 changes: 4 additions & 0 deletions extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
@@ -100,6 +100,10 @@ public void configure(Binder binder)
.addBinding(SCHEME_S3_ZIP)
.to(S3DataSegmentKiller.class)
.in(LazySingleton.class);
+Binders.dataSegmentCopierBinder(binder)
+.addBinding(SCHEME_S3_ZIP)
+.to(S3DataSegmentCopier.class)
+.in(LazySingleton.class);
Binders.dataSegmentMoverBinder(binder)
.addBinding(SCHEME_S3_ZIP)
.to(S3DataSegmentMover.class)
6 changes: 6 additions & 0 deletions processing/src/main/java/org/apache/druid/guice/Binders.java
@@ -24,6 +24,7 @@
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.segment.loading.DataSegmentArchiver;
+import org.apache.druid.segment.loading.DataSegmentCopier;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -45,6 +46,11 @@ public static MapBinder<String, DataSegmentMover> dataSegmentMoverBinder(Binder
return MapBinder.newMapBinder(binder, String.class, DataSegmentMover.class);
}

+public static MapBinder<String, DataSegmentCopier> dataSegmentCopierBinder(Binder binder)
+{
+return MapBinder.newMapBinder(binder, String.class, DataSegmentCopier.class);
+}
+
public static MapBinder<String, DataSegmentArchiver> dataSegmentArchiverBinder(Binder binder)
{
return MapBinder.newMapBinder(binder, String.class, DataSegmentArchiver.class);
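The `MapBinder` returned here lets every loaded module contribute copiers under its own scheme key, and Guice then injects the aggregate as a single map. The removed javadoc's mention of `OmniDataSegmentCopier` suggests dispatch roughly like the following sketch; the class and method names below are illustrative:

```java
import com.google.inject.Inject;
import java.util.Map;
import org.apache.druid.segment.loading.DataSegmentCopier;

public class SchemeDispatchingCopier
{
  // Guice aggregates every addBinding(...) from all loaded modules into this map.
  private final Map<String, DataSegmentCopier> copiers;

  @Inject
  public SchemeDispatchingCopier(Map<String, DataSegmentCopier> copiers)
  {
    this.copiers = copiers;
  }

  public DataSegmentCopier getCopier(String scheme)
  {
    final DataSegmentCopier copier = copiers.get(scheme);
    if (copier == null) {
      throw new IllegalArgumentException("No DataSegmentCopier bound for scheme: " + scheme);
    }
    return copier;
  }
}
```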
7 changes: 0 additions & 7 deletions processing/src/main/java/org/apache/druid/segment/loading/DataSegmentCopier.java
@@ -24,13 +24,6 @@

import java.util.Map;

-/**
- * DataSegmentCopier knows how to copy the segment location from one to another.
- * Since any implementation of DataSegmentCopier is initialized when an ingestion job starts
- * if a deep storage extension is loaded even when that deep storage is actually not used,
- * implementations should avoid initializing the deep storage client immediately
- * but defer it until the deep storage client is actually used.
- */
@ExtensionPoint
public interface DataSegmentCopier
{
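The interface body is elided in this diff; by symmetry with `DataSegmentMover` (and with the `transfer` method shown earlier, which returns a `DataSegment`), a copy method along the following lines seems likely. The signature and the no-op class below are assumptions for illustration, not taken from the source:

```java
import java.util.Map;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;

public class NoopDataSegmentCopier
{
  // Assumed shape, mirroring DataSegmentMover.move: copy the segment files to
  // the location described by targetLoadSpec and return the segment with an
  // updated load spec.
  public DataSegment copy(DataSegment segment, Map<String, Object> targetLoadSpec)
      throws SegmentLoadingException
  {
    // A real implementation would perform the copy; this stub just echoes the
    // segment back unchanged.
    return segment;
  }
}
```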
1 change: 1 addition & 0 deletions website/.spelling
@@ -1284,6 +1284,7 @@ ComplexMetrics
DataSegmentArchiver
DataSegmentKiller
DataSegmentMover
+DataSegmentCopier
URIDataPuller
DataSegmentPusher
DruidModule
