diff --git a/docs/development/modules.md b/docs/development/modules.md
index c62a6d4a086f..4fb0285085df 100644
--- a/docs/development/modules.md
+++ b/docs/development/modules.md
@@ -104,6 +104,7 @@ In addition to `DataSegmentPusher` and `URIDataPuller`, you can also bind:
 
 * [`DataSegmentKiller`](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java): Removes segments, used as part of the Kill Task to delete unused segments, i.e. perform garbage collection of segments that are either superseded by newer versions or that have been dropped from the cluster.
 * [`DataSegmentMover`](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentMover.java): Allow migrating segments from one place to another, currently this is only used as part of the MoveTask to move unused segments to a different S3 bucket or prefix, typically to reduce storage costs of unused data (e.g. move to glacier or cheaper storage)
+* [`DataSegmentCopier`](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentCopier.java): Allow copying segments from one place to another, available for use by extensions, etc.
 * [`DataSegmentArchiver`](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentArchiver.java): Just a wrapper around Mover, but comes with a preconfigured target bucket/path, so it doesn't have to be specified at runtime as part of the ArchiveTask.
 
 ### Validating your deep storage implementation
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentCopier.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentCopier.java
index c8713cfd46fb..6a62f50e111d 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentCopier.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentCopier.java
@@ -32,14 +32,6 @@ public class S3DataSegmentCopier extends S3DataSegmentTransferUtility implements
 {
   private static final Logger log = new Logger(S3DataSegmentCopier.class);
 
-  /**
-   * Any implementation of DataSegmentCopier is initialized when an ingestion job starts if the extension is loaded
-   * even when the implementation of DataSegmentCopier is not used. As a result, if we accept a s3 client instead
-   * of a supplier of it, it can cause unnecessary config validation for s3 even when it's not used at all.
-   * To perform the config validation only when it is actually used, we use a supplier.
-   *
-   * See OmniDataSegmentCopier for how DataSegmentCopiers are initialized.
-   */
   @Inject
   public S3DataSegmentCopier(
       Supplier s3ClientSupplier,
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentTransferUtility.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentTransferUtility.java
index 59c265de88e6..7fd98bc4412a 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentTransferUtility.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentTransferUtility.java
@@ -60,7 +60,7 @@ protected DataSegment transfer(DataSegment segment, Map targetLo
         throw new SegmentLoadingException("Target S3 baseKey is not specified");
       }
 
-      safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path, deleteSource);
+      safeTransfer(s3Bucket, s3Path, targetS3Bucket, targetS3Path, deleteSource);
 
       return segment.withLoadSpec(
           ImmutableMap.builder()
@@ -83,11 +83,11 @@ public boolean apply(String input)
       );
     }
     catch (AmazonServiceException e) {
-      throw new SegmentLoadingException(e, "Unable to move segment[%s]: [%s]", segment.getId(), e);
+      throw new SegmentLoadingException(e, "Unable to transfer segment[%s]: [%s]", segment.getId(), e);
     }
   }
 
-  private void safeMove(
+  private void safeTransfer(
       final String s3Bucket,
       final String s3Path,
       final String targetS3Bucket,
@@ -106,11 +106,11 @@ private void safeMove(
               targetS3Path
           );
           try {
-            selfCheckingMove(s3Bucket, targetS3Bucket, s3Path, targetS3Path, copyMsg, deleteSource);
+            selfCheckingTransfer(s3Bucket, targetS3Bucket, s3Path, targetS3Path, copyMsg, deleteSource);
             return null;
           }
           catch (AmazonServiceException | IOException | SegmentLoadingException e) {
-            log().info(e, "Error while trying to move " + copyMsg);
+            log().info(e, "Error while trying to transfer " + copyMsg);
             throw e;
           }
         }
@@ -124,12 +124,12 @@ private void safeMove(
   }
 
   /**
-   * Copies an object and after that checks that the object is present at the target location, via a separate API call.
-   * If it is not, an exception is thrown, and the object is not deleted at the old location. This "paranoic" check
-   * is added after it was observed that S3 may report a successful move, and the object is not found at the target
+   * Transfers an object and after that checks that the object is present at the target location, via a separate API call.
+   * If it is not, an exception is thrown, and (even if deleteSource=true) the object is not deleted at the old location. This "paranoic" check
+   * is added after it was observed that S3 may report a successful transfer, and the object is not found at the target
    * location.
    */
-  private void selfCheckingMove(
+  private void selfCheckingTransfer(
       String s3Bucket,
       String targetS3Bucket,
       String s3Path,
@@ -139,7 +139,7 @@
   ) throws IOException, SegmentLoadingException
   {
     if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) {
-      log().info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path);
+      log().info("No need to transfer file[s3://%s/%s] onto itself", s3Bucket, s3Path);
       return;
     }
     final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
@@ -162,13 +162,13 @@
           StorageClass.fromValue(StringUtils.toUpperCase(objectSummary.getStorageClass())).equals(StorageClass.Glacier)) {
         throw new AmazonServiceException(
             StringUtils.format(
-                "Cannot move file[s3://%s/%s] of storage class glacier, skipping.",
+                "Cannot transfer file[s3://%s/%s] of storage class glacier, skipping.",
                 s3Bucket,
                 s3Path
             )
         );
       } else {
-        log().info("Moving file %s", copyMsg);
+        log().info("Transferring file %s", copyMsg);
         final CopyObjectRequest copyRequest = new CopyObjectRequest(s3Bucket, s3Path, targetS3Bucket, targetS3Path);
         if (!config.getDisableAcl()) {
           copyRequest.setAccessControlList(S3Utils.grantFullControlToBucketOwner(s3Client, targetS3Bucket));
@@ -181,15 +181,16 @@
           );
         }
         if (deleteSource) {
+          log().info("Beginning deletion of [s3://%s/%s]", s3Bucket, s3Path);
           deleteWithRetriesSilent(s3Bucket, s3Path);
         }
-        log().debug("Finished moving file %s", copyMsg);
+        log().debug("Finished transferring file %s", copyMsg);
       }
     } else {
       // ensure object exists in target location
       if (s3Client.doesObjectExist(targetS3Bucket, targetS3Path)) {
         log().info(
-            "Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]",
+            "Not transferring file [s3://%s/%s], already present in target location [s3://%s/%s]",
             s3Bucket,
             s3Path,
             targetS3Bucket,
@@ -197,7 +198,7 @@
         );
       } else {
         throw new SegmentLoadingException(
-            "Unable to move file %s, not present in either source or target location",
+            "Unable to transfer file %s, not present in either source or target location",
             copyMsg
         );
       }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
index 2df1cb5179bb..ae58dcf823e2 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
@@ -100,6 +100,10 @@ public void configure(Binder binder)
         .addBinding(SCHEME_S3_ZIP)
         .to(S3DataSegmentKiller.class)
         .in(LazySingleton.class);
+    Binders.dataSegmentCopierBinder(binder)
+        .addBinding(SCHEME_S3_ZIP)
+        .to(S3DataSegmentCopier.class)
+        .in(LazySingleton.class);
     Binders.dataSegmentMoverBinder(binder)
         .addBinding(SCHEME_S3_ZIP)
         .to(S3DataSegmentMover.class)
diff --git a/processing/src/main/java/org/apache/druid/guice/Binders.java b/processing/src/main/java/org/apache/druid/guice/Binders.java
index a2665ad27ead..5d61872a9a4a 100644
--- a/processing/src/main/java/org/apache/druid/guice/Binders.java
+++ b/processing/src/main/java/org/apache/druid/guice/Binders.java
@@ -24,6 +24,7 @@
 import com.google.inject.multibindings.MapBinder;
 import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.segment.loading.DataSegmentArchiver;
+import org.apache.druid.segment.loading.DataSegmentCopier;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.loading.DataSegmentMover;
 import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -45,6 +46,11 @@ public static MapBinder dataSegmentMoverBinder(Binder
     return MapBinder.newMapBinder(binder, String.class, DataSegmentMover.class);
   }
 
+  public static MapBinder dataSegmentCopierBinder(Binder binder)
+  {
+    return MapBinder.newMapBinder(binder, String.class, DataSegmentCopier.class);
+  }
+
   public static MapBinder dataSegmentArchiverBinder(Binder binder)
   {
     return MapBinder.newMapBinder(binder, String.class, DataSegmentArchiver.class);
diff --git a/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentCopier.java b/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentCopier.java
index 41a0256d4c1c..8862684c75a9 100644
--- a/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentCopier.java
+++ b/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentCopier.java
@@ -24,13 +24,6 @@
 
 import java.util.Map;
 
-/**
- * DataSegmentCopier knows how to copy the segment location from one to another.
- * Since any implementation of DataSegmentCopier is initialized when an ingestion job starts
- * if a deep storage extension is loaded even when that deep storage is actually not used,
- * implementations should avoid initializing the deep storage client immediately
- * but defer it until the deep storage client is actually used.
- */
 @ExtensionPoint
 public interface DataSegmentCopier
 {
diff --git a/website/.spelling b/website/.spelling
index ed2389c5c9e5..3a0975069104 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1284,6 +1284,7 @@ ComplexMetrics
 DataSegmentArchiver
 DataSegmentKiller
 DataSegmentMover
+DataSegmentCopier
 URIDataPuller
 DataSegmentPusher
 DruidModule
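For extension authors, registering a copier follows the same MapBinder pattern as the existing killer and mover bindings. The sketch below is illustrative only and is not part of this patch: `ExampleDataSegmentCopier` and the `"example"` load-spec scheme are made-up placeholders for a class implementing the new `DataSegmentCopier` extension point, while the `Binders.dataSegmentCopierBinder` call and the `LazySingleton` scoping mirror the `S3StorageDruidModule` change above.

```java
// Hypothetical sketch only; ExampleDataSegmentCopier and the "example" scheme
// are placeholders, not part of this patch.
import com.fasterxml.jackson.databind.Module;
import com.google.inject.Binder;
import java.util.Collections;
import java.util.List;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.initialization.DruidModule;

public class ExampleStorageDruidModule implements DruidModule
{
  @Override
  public List<? extends Module> getJacksonModules()
  {
    // No Jackson modules are needed just to register a copier binding.
    return Collections.emptyList();
  }

  @Override
  public void configure(Binder binder)
  {
    // Key the binding by the load-spec scheme so an omni-style copier can
    // dispatch to this implementation for segments in "example" deep storage.
    Binders.dataSegmentCopierBinder(binder)
        .addBinding("example")
        .to(ExampleDataSegmentCopier.class)
        .in(LazySingleton.class);
  }
}
```

As with the mover and archiver bindings, keying the MapBinder by scheme is what lets the runtime select the copier that matches a segment's load spec type.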