Skip to content

Commit

Permalink
User proper write-once semantics for GCS repository (#30438)
Browse files Browse the repository at this point in the history
There's no need for an extra blobExists() call when writing a blob to the GCS service. GCS provides
an option (with stronger consistency guarantees) on the insert method that guarantees that the
blob that's uploaded does not already exist.

Relates to #19749
  • Loading branch information
ywelsch authored May 17, 2018
1 parent 399489b commit b57d21b
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ public InputStream readBlob(String blobName) throws IOException {

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
if (blobExists(blobName)) {
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
}
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.Storage.CopyRequest;
import com.google.cloud.storage.StorageException;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
Expand All @@ -47,12 +48,15 @@
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;

class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore {

// The recommended maximum size of a blob that should be uploaded in a single
Expand Down Expand Up @@ -204,24 +208,32 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize) throws I
* @param inputStream the stream containing the blob data
*/
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> storage.writer(blobInfo));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
@Override
public boolean isOpen() {
return writeChannel.isOpen();
}
try {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(
() -> storage.writer(blobInfo, Storage.BlobWriteOption.doesNotExist()));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
@Override
public boolean isOpen() {
return writeChannel.isOpen();
}

@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
}
@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
}

@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int write(ByteBuffer src) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int write(ByteBuffer src) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
}
}));
} catch (StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
}));
throw se;
}
}

/**
Expand All @@ -238,7 +250,17 @@ private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long
assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method";
final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
Streams.copy(inputStream, baos);
SocketAccess.doPrivilegedVoidIOException(() -> storage.create(blobInfo, baos.toByteArray()));
SocketAccess.doPrivilegedVoidIOException(
() -> {
try {
storage.create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist());
} catch (StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
throw se;
}
});
}

/**
Expand Down Expand Up @@ -295,8 +317,8 @@ void deleteBlobs(Collection<String> blobNames) throws IOException {
/**
* Moves a blob within the same bucket
*
* @param sourceBlob name of the blob to move
* @param targetBlob new name of the blob in the same bucket
* @param sourceBlobName name of the blob to move
* @param targetBlobName new name of the blob in the same bucket
*/
void moveBlob(String sourceBlobName, String targetBlobName) throws IOException {
final BlobId sourceBlobId = BlobId.of(bucket, sourceBlobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* {@link MockStorage} mocks a {@link Storage} client by storing all the blobs
Expand Down Expand Up @@ -113,7 +114,14 @@ public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... option
if (bucketName.equals(blobInfo.getBucket()) == false) {
throw new StorageException(404, "Bucket not found");
}
blobs.put(blobInfo.getName(), content);
if (Stream.of(options).anyMatch(option -> option.equals(BlobTargetOption.doesNotExist()))) {
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), content);
if (existingBytes != null) {
throw new StorageException(412, "Blob already exists");
}
} else {
blobs.put(blobInfo.getName(), content);
}
return get(BlobId.of(blobInfo.getBucket(), blobInfo.getName()));
}

Expand Down Expand Up @@ -243,9 +251,16 @@ public boolean isOpen() {
}

@Override
public void close() throws IOException {
public void close() {
IOUtils.closeWhileHandlingException(writableByteChannel);
blobs.put(blobInfo.getName(), output.toByteArray());
if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
if (existingBytes != null) {
throw new StorageException(412, "Blob already exists");
}
} else {
blobs.put(blobInfo.getName(), output.toByteArray());
}
}
};
}
Expand Down

0 comments on commit b57d21b

Please sign in to comment.