Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load content hash as the etag of the object when the UFS is S3 in 2.8 #18438

Open
wants to merge 6 commits into
base: branch-2.8
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand Down Expand Up @@ -90,6 +91,11 @@ public int chunkSize() {
return mChunkSize;
}

@Override
public Optional<String> getUfsContentHash() {
  // No UFS content hash is tracked by this writer, so always report absent.
  // NOTE(review): presumably this writer path never writes through to the UFS — confirm.
  return Optional.empty();
}

@Override
public void writeChunk(final ByteBuf buf) throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand Down Expand Up @@ -97,6 +98,12 @@ public static DataWriter create(FileSystemContext context, long blockId, long bl
}
}

/**
 * Returns the content hash produced by writing the file to the UFS, if any.
 *
 * @return the content hash of the file if it is written to the UFS. Will only
 *         return a non-empty value after the data writer has been closed; before
 *         then (or when no UFS write occurred) the value is empty
 */
Optional<String> getUfsContentHash();

/**
* Writes a chunk. This method takes the ownership of this chunk even if it fails to write
* the chunk.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -250,14 +251,20 @@ public void cancel() {
* Wait for server to complete the inbound stream.
*
* @param timeoutMs maximum time to wait for server response
* @return the last response of the stream
*/
public void waitForComplete(long timeoutMs) throws IOException {
public Optional<ResT> waitForComplete(long timeoutMs) throws IOException {
if (mCompleted || mCanceled) {
return;
return Optional.empty();
}
while (receive(timeoutMs) != null) {
ResT prevResponse;
ResT response = null;
do {
// wait until inbound stream is closed from server.
}
prevResponse = response;
response = receive(timeoutMs);
} while (response != null);
return Optional.ofNullable(prevResponse);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.concurrent.NotThreadSafe;

Expand Down Expand Up @@ -105,17 +106,21 @@ public void sendDataMessage(DataMessage<ReqT, DataBuffer> message, long timeoutM
}

@Override
public void waitForComplete(long timeoutMs) throws IOException {
public Optional<ResT> waitForComplete(long timeoutMs) throws IOException {
if (mResponseMarshaller == null) {
super.waitForComplete(timeoutMs);
return;
return super.waitForComplete(timeoutMs);
}
// loop until the last response is received, whose result will be returned
DataMessage<ResT, DataBuffer> message;
DataMessage<ResT, DataBuffer> prevMessage = null;
while (!isCanceled() && (message = receiveDataMessage(timeoutMs)) != null) {
if (message.getBuffer() != null) {
message.getBuffer().release();
if (prevMessage != null && prevMessage.getBuffer() != null) {
prevMessage.getBuffer().release();
}
prevMessage = message;
}
super.waitForComplete(timeoutMs);
// note that the combineData call is responsible for releasing the buffer of prevMessage
ResT result = mResponseMarshaller.combineData(prevMessage);
return Optional.ofNullable(super.waitForComplete(timeoutMs).orElse(result));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand Down Expand Up @@ -75,6 +76,9 @@ public final class GrpcDataWriter implements DataWriter {
private final GrpcBlockingStream<WriteRequest, WriteResponse> mStream;
private final WriteRequestMarshaller mMarshaller;

/** The content hash resulting from the write operation if one is available. */
private String mContentHash = null;

/**
* The next pos to queue to the buffer.
*/
Expand Down Expand Up @@ -186,6 +190,11 @@ public long pos() {
return mPosToQueue;
}

@Override
public Optional<String> getUfsContentHash() {
  // Expose the hash captured from the worker's write response; absent until one arrives.
  return mContentHash == null ? Optional.empty() : Optional.of(mContentHash);
}

@Override
public void writeChunk(final ByteBuf buf) throws IOException {
mPosToQueue += buf.readableBytes();
Expand Down Expand Up @@ -247,6 +256,9 @@ public void flush() throws IOException {
"Flush request %s to worker %s is not acked before complete.", writeRequest, mAddress));
}
posWritten = response.getOffset();
if (response.hasContentHash()) {
mContentHash = response.getContentHash();
}
} while (mPosToQueue != posWritten);
}

Expand All @@ -257,7 +269,9 @@ public void close() throws IOException {
return;
}
mStream.close();
mStream.waitForComplete(mWriterCloseTimeoutMs);
mStream.waitForComplete(mWriterCloseTimeoutMs)
.ifPresent(writeResponse -> mContentHash = writeResponse.hasContentHash()
? writeResponse.getContentHash() : null);
} finally {
mClient.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand Down Expand Up @@ -122,6 +123,11 @@ public int chunkSize() {
return (int) mChunkSize;
}

@Override
public Optional<String> getUfsContentHash() {
  // This writer does not produce a UFS content hash, so always report absent.
  // NOTE(review): presumably this writer targets local worker storage, not the UFS — confirm.
  return Optional.empty();
}

@Override
public void writeChunk(final ByteBuf buf) throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand Down Expand Up @@ -82,6 +83,11 @@ public static UfsFallbackLocalFileDataWriter create(FileSystemContext context,
mIsWritingToLocal = mLocalFileDataWriter != null;
}

@Override
public Optional<String> getUfsContentHash() {
  // UFS writes go through the gRPC writer, so delegate the hash lookup to it.
  return mGrpcDataWriter.getUfsContentHash();
}

@Override
public void writeChunk(ByteBuf chunk) throws IOException {
if (mIsWritingToLocal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
@NotThreadSafe
public class UnderFileSystemFileOutStream extends BlockOutStream {
private static final int ID_UNUSED = -1;
private final DataWriter mDataWriter;

/**
* Creates an instance of {@link UnderFileSystemFileOutStream} that writes to a UFS file.
Expand All @@ -52,6 +53,14 @@ public static UnderFileSystemFileOutStream create(FileSystemContext context,
*/
protected UnderFileSystemFileOutStream(DataWriter dataWriter, WorkerNetAddress address) {
  super(dataWriter, Long.MAX_VALUE, address);
  // Keep a direct reference so callers can query the writer (e.g. for the UFS content hash).
  mDataWriter = dataWriter;
}

/**
 * @return the data writer backing this stream
 */
public DataWriter getDataWriter() {
  return mDataWriter;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public void close() throws IOException {
} else {
mUnderStorageOutputStream.close();
optionsBuilder.setUfsLength(mBytesWritten);
mUnderStorageOutputStream.getDataWriter().getUfsContentHash().ifPresent(
optionsBuilder::setContentHash);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;

/**
* A {@link DataWriter} which writes data to a bytebuffer.
Expand All @@ -26,6 +27,11 @@ public TestDataWriter(ByteBuffer buffer) {
mBuffer = buffer;
}

@Override
public Optional<String> getUfsContentHash() {
  // Test writer keeps data in an in-memory ByteBuffer; nothing reaches a UFS, so no hash.
  return Optional.empty();
}

@Override
public void writeChunk(ByteBuf chunk) throws IOException {
try {
Expand Down
3 changes: 3 additions & 0 deletions core/common/src/main/java/alluxio/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,5 +228,8 @@ public final class Constants {
public static final String MEDIUM_HDD = "HDD";
public static final String MEDIUM_SSD = "SSD";

/* xAttr keys */
public static final String ETAG_XATTR_KEY = "s3_etag";

private Constants() {} // prevent instantiation
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/**
* Forwarder for {@link UnderFileSystem} objects that works through with ForkJoinPool's
Expand Down Expand Up @@ -288,6 +289,11 @@ public Fingerprint getParsedFingerprint(String path) {
return mUfs.getParsedFingerprint(path);
}

@Override
public Fingerprint getParsedFingerprint(String path, @Nullable String contentHash) {
  // Straight delegation to the wrapped UFS; this forwarder adds no behavior of its own.
  return mUfs.getParsedFingerprint(path, contentHash);
}

@Override
public UfsMode getOperationMode(Map<String, UfsMode> physicalUfsState) {
return mUfs.getOperationMode(physicalUfsState);
Expand Down
Loading
Loading