Add datasegment copier interface and s3 impl #17430

Draft. Wants to merge 2 commits into base: master
docs/development/modules.md (1 addition, 0 deletions)
@@ -104,6 +104,7 @@ In addition to `DataSegmentPusher` and `URIDataPuller`, you can also bind:

* [`DataSegmentKiller`](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java): Removes segments, used as part of the Kill Task to delete unused segments, i.e. perform garbage collection of segments that are either superseded by newer versions or that have been dropped from the cluster.
* [`DataSegmentMover`](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentMover.java): Allows migrating segments from one place to another. Currently this is only used as part of the MoveTask to move unused segments to a different S3 bucket or prefix, typically to reduce the storage costs of unused data (e.g. by moving it to Glacier or other cheaper storage)
* [`DataSegmentCopier`](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentCopier.java): Allows copying segments from one place to another without removing the source; available for use by extensions (see the binding sketch after this list)
* [`DataSegmentArchiver`](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentArchiver.java): Just a wrapper around Mover, but comes with a preconfigured target bucket/path, so it doesn't have to be specified at runtime as part of the ArchiveTask.
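For illustration only, here is a minimal sketch of how an extension module might bind the new copier for S3. It uses a plain Guice `MapBinder`, mirroring how the other segment handlers are registered; the `"s3_zip"` scheme key, the module name, and whether the copier is actually bound this way (e.g. via a future `Binders.dataSegmentCopierBinder` helper) are assumptions, not taken from this PR.

```java
import java.util.Collections;
import java.util.List;

import com.fasterxml.jackson.databind.Module;
import com.google.inject.Binder;
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.segment.loading.DataSegmentCopier;
import org.apache.druid.storage.s3.S3DataSegmentCopier;

// Hypothetical extension module; the "s3_zip" key and the decision to bind the
// copier through a String-keyed MapBinder are assumptions for illustration.
public class ExampleS3CopierModule implements DruidModule
{
  @Override
  public void configure(Binder binder)
  {
    MapBinder.newMapBinder(binder, String.class, DataSegmentCopier.class)
             .addBinding("s3_zip")
             .to(S3DataSegmentCopier.class)
             .in(LazySingleton.class);
  }

  @Override
  public List<? extends Module> getJacksonModules()
  {
    return Collections.emptyList();
  }
}
```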

### Validating your deep storage implementation
New file: S3DataSegmentCopier.java (package org.apache.druid.storage.s3)
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.storage.s3;

import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentCopier;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;

import java.util.Map;

/**
 * S3 implementation of {@link DataSegmentCopier}. It shares its transfer logic with
 * {@link S3DataSegmentMover} via the common S3DataSegmentTransferUtility base class,
 * but unlike the mover it leaves the source object in place.
 */
public class S3DataSegmentCopier extends S3DataSegmentTransferUtility implements DataSegmentCopier
{
  private static final Logger log = new Logger(S3DataSegmentCopier.class);

  @Inject
  public S3DataSegmentCopier(
      Supplier<ServerSideEncryptingAmazonS3> s3ClientSupplier,
      S3DataSegmentPusherConfig config
  )
  {
    super(config, s3ClientSupplier);
  }

  @Override
  public DataSegment copy(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
  {
    // false: copy the object to the target load spec without deleting the source.
    return this.transfer(segment, targetLoadSpec, false);
  }

  @Override
  protected Logger log()
  {
    return log;
  }
}
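For context, a minimal, hypothetical usage sketch of the new interface follows. The `bucket`/`baseKey` keys match what the mover below reads from its target load spec; everything else (the helper class, the bucket and key values, and how the copier is obtained) is assumed and not part of this PR.

```java
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.segment.loading.DataSegmentCopier;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;

public class CopierUsageExample
{
  // Hypothetical helper; in practice the copier would be injected, e.g. via an
  // "omni" wrapper analogous to OmniDataSegmentMover.
  static DataSegment copyToBackupLocation(DataSegmentCopier copier, DataSegment segment)
      throws SegmentLoadingException
  {
    Map<String, Object> targetLoadSpec = ImmutableMap.of(
        "bucket", "example-backup-bucket",   // assumed target bucket
        "baseKey", "druid/segments-copy"     // assumed target base key
    );
    // Returns a segment whose load spec points at the new location; the source
    // object is left in place (unlike move()).
    return copier.copy(segment, targetLoadSpec);
  }
}
```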
S3DataSegmentMover.java
@@ -19,33 +19,16 @@

package org.apache.druid.storage.s3;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;

import java.io.IOException;
import java.util.Map;

public class S3DataSegmentMover implements DataSegmentMover
public class S3DataSegmentMover extends S3DataSegmentTransferUtility implements DataSegmentMover
{
private static final Logger log = new Logger(S3DataSegmentMover.class);

@@ -57,207 +40,23 @@ public class S3DataSegmentMover implements DataSegmentMover
*
* See OmniDataSegmentMover for how DataSegmentMovers are initialized.
*/
private final Supplier<ServerSideEncryptingAmazonS3> s3ClientSupplier;
private final S3DataSegmentPusherConfig config;

@Inject
public S3DataSegmentMover(
Supplier<ServerSideEncryptingAmazonS3> s3ClientSupplier,
S3DataSegmentPusherConfig config
)
{
this.s3ClientSupplier = s3ClientSupplier;
this.config = config;
super(config, s3ClientSupplier);
}

@Override
public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
{
try {
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");

final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket");
final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey");

final String targetS3Path = S3Utils.constructSegmentPath(
targetS3BaseKey,
DataSegmentPusher.getDefaultStorageDir(segment, false)
);

if (targetS3Bucket.isEmpty()) {
throw new SegmentLoadingException("Target S3 bucket is not specified");
}
if (targetS3Path.isEmpty()) {
throw new SegmentLoadingException("Target S3 baseKey is not specified");
}

safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path);

return segment.withLoadSpec(
ImmutableMap.<String, Object>builder()
.putAll(
Maps.filterKeys(
loadSpec,
new Predicate<String>()
{
@Override
public boolean apply(String input)
{
return !("bucket".equals(input) || "key".equals(input));
}
}
)
)
.put("bucket", targetS3Bucket)
.put("key", targetS3Path)
.build()
);
}
catch (AmazonServiceException e) {
throw new SegmentLoadingException(e, "Unable to move segment[%s]: [%s]", segment.getId(), e);
}
}

private void safeMove(
final String s3Bucket,
final String s3Path,
final String targetS3Bucket,
final String targetS3Path
) throws SegmentLoadingException
{
try {
S3Utils.retryS3Operation(
() -> {
final String copyMsg = StringUtils.format(
"[s3://%s/%s] to [s3://%s/%s]",
s3Bucket,
s3Path,
targetS3Bucket,
targetS3Path
);
try {
selfCheckingMove(s3Bucket, targetS3Bucket, s3Path, targetS3Path, copyMsg);
return null;
}
catch (AmazonServiceException | IOException | SegmentLoadingException e) {
log.info(e, "Error while trying to move " + copyMsg);
throw e;
}
}
);
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e, AmazonServiceException.class);
Throwables.propagateIfInstanceOf(e, SegmentLoadingException.class);
throw new RuntimeException(e);
}
return this.transfer(segment, targetLoadSpec, true);
}

/**
* Copies an object and after that checks that the object is present at the target location, via a separate API call.
* If it is not, an exception is thrown, and the object is not deleted at the old location. This "paranoid" check
* was added after it was observed that S3 may report a successful move even though the object is not found at the
* target location.
*/
private void selfCheckingMove(
String s3Bucket,
String targetS3Bucket,
String s3Path,
String targetS3Path,
String copyMsg
) throws IOException, SegmentLoadingException
{
if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) {
log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path);
return;
}
final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
if (s3Client.doesObjectExist(s3Bucket, s3Path)) {
final ListObjectsV2Result listResult = s3Client.listObjectsV2(
new ListObjectsV2Request()
.withBucketName(s3Bucket)
.withPrefix(s3Path)
.withMaxKeys(1)
);
// Using getObjectSummaries().size() instead of getKeyCount as, in some cases
// it is observed that even though the getObjectSummaries returns some data
// keyCount is still zero.
if (listResult.getObjectSummaries().size() == 0) {
// should never happen
throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path);
}
final S3ObjectSummary objectSummary = listResult.getObjectSummaries().get(0);
if (objectSummary.getStorageClass() != null &&
StorageClass.fromValue(StringUtils.toUpperCase(objectSummary.getStorageClass())).equals(StorageClass.Glacier)) {
throw new AmazonServiceException(
StringUtils.format(
"Cannot move file[s3://%s/%s] of storage class glacier, skipping.",
s3Bucket,
s3Path
)
);
} else {
log.info("Moving file %s", copyMsg);
final CopyObjectRequest copyRequest = new CopyObjectRequest(s3Bucket, s3Path, targetS3Bucket, targetS3Path);
if (!config.getDisableAcl()) {
copyRequest.setAccessControlList(S3Utils.grantFullControlToBucketOwner(s3Client, targetS3Bucket));
}
s3Client.copyObject(copyRequest);
if (!s3Client.doesObjectExist(targetS3Bucket, targetS3Path)) {
throw new IOE(
"After copy was reported as successful the file doesn't exist in the target location [%s]",
copyMsg
);
}
deleteWithRetriesSilent(s3Bucket, s3Path);
log.debug("Finished moving file %s", copyMsg);
}
} else {
// ensure object exists in target location
if (s3Client.doesObjectExist(targetS3Bucket, targetS3Path)) {
log.info(
"Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]",
s3Bucket,
s3Path,
targetS3Bucket,
targetS3Path
);
} else {
throw new SegmentLoadingException(
"Unable to move file %s, not present in either source or target location",
copyMsg
);
}
}
}

private void deleteWithRetriesSilent(final String s3Bucket, final String s3Path)
{
try {
deleteWithRetries(s3Bucket, s3Path);
}
catch (Exception e) {
log.error(e, "Failed to delete file [s3://%s/%s], giving up", s3Bucket, s3Path);
}
}

private void deleteWithRetries(final String s3Bucket, final String s3Path) throws Exception
{
RetryUtils.retry(
() -> {
try {
s3ClientSupplier.get().deleteObject(s3Bucket, s3Path);
return null;
}
catch (Exception e) {
log.info(e, "Error while trying to delete [s3://%s/%s]", s3Bucket, s3Path);
throw e;
}
},
S3Utils.S3RETRY,
3
);
@Override
protected Logger log()
{
return log;
}
}
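The logic removed from the mover above evidently moves into the new S3DataSegmentTransferUtility base class, which is not shown in this diff view. As a rough orientation only, here is a sketch of what such a base class could look like, reconstructed from the removed code and the `transfer(segment, targetLoadSpec, deleteSource)` calls; every name, signature, and behavior below is an inference, and the sketch omits the retries, Glacier checks, and empty-target validation present in the original mover code.

```java
// Sketch only: the real S3DataSegmentTransferUtility is part of this PR but not
// visible here. Details are inferred from the code removed from S3DataSegmentMover.
package org.apache.druid.storage.s3;

import java.io.IOException;
import java.util.Map;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;

public abstract class S3DataSegmentTransferUtility
{
  protected final S3DataSegmentPusherConfig config;
  protected final Supplier<ServerSideEncryptingAmazonS3> s3ClientSupplier;

  protected S3DataSegmentTransferUtility(
      S3DataSegmentPusherConfig config,
      Supplier<ServerSideEncryptingAmazonS3> s3ClientSupplier
  )
  {
    this.config = config;
    this.s3ClientSupplier = s3ClientSupplier;
  }

  /** Subclasses supply their own logger so messages are attributed to the right class. */
  protected abstract Logger log();

  /**
   * Copies the segment object to the location described by targetLoadSpec and, when
   * deleteSource is true, removes the original afterwards (move semantics); when false,
   * the original is left in place (copy semantics).
   */
  protected DataSegment transfer(DataSegment segment, Map<String, Object> targetLoadSpec, boolean deleteSource)
      throws SegmentLoadingException
  {
    try {
      final Map<String, Object> loadSpec = segment.getLoadSpec();
      final String s3Bucket = MapUtils.getString(loadSpec, "bucket");
      final String s3Path = MapUtils.getString(loadSpec, "key");
      final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket");
      final String targetS3Path = S3Utils.constructSegmentPath(
          MapUtils.getString(targetLoadSpec, "baseKey"),
          DataSegmentPusher.getDefaultStorageDir(segment, false)
      );

      final ServerSideEncryptingAmazonS3 s3Client = s3ClientSupplier.get();
      final CopyObjectRequest copyRequest = new CopyObjectRequest(s3Bucket, s3Path, targetS3Bucket, targetS3Path);
      if (!config.getDisableAcl()) {
        copyRequest.setAccessControlList(S3Utils.grantFullControlToBucketOwner(s3Client, targetS3Bucket));
      }
      s3Client.copyObject(copyRequest);
      // Same self-check as the old mover: verify the copy really landed before touching the source.
      if (!s3Client.doesObjectExist(targetS3Bucket, targetS3Path)) {
        throw new IOE("Object not found at target [s3://%s/%s] after copy", targetS3Bucket, targetS3Path);
      }
      if (deleteSource) {
        s3Client.deleteObject(s3Bucket, s3Path);
      }

      // Return the segment with its load spec rewritten to point at the new location.
      return segment.withLoadSpec(
          ImmutableMap.<String, Object>builder()
                      .putAll(Maps.filterKeys(loadSpec, k -> !("bucket".equals(k) || "key".equals(k))))
                      .put("bucket", targetS3Bucket)
                      .put("key", targetS3Path)
                      .build()
      );
    }
    catch (AmazonServiceException | IOException e) {
      throw new SegmentLoadingException(e, "Unable to transfer segment[%s]: [%s]", segment.getId(), e);
    }
  }
}
```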