Skip to content

Commit

Permalink
Add CRC64 checksum check
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

![image](https://github.com/user-attachments/assets/9230d1fb-c522-4d21-b382-9e06dc9d83ee)

### Why are the changes needed?

To check the file integrity

### Does this PR introduce any user facing changes?

Add a crc64 check cli
			pr-link: #18682
			change-id: cid-4eb62e603591f3235b92e4191c5b511b534f6fc3
  • Loading branch information
elega authored Sep 5, 2024
1 parent 9e373f0 commit 1a3fbc9
Show file tree
Hide file tree
Showing 21 changed files with 1,133 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import alluxio.grpc.ClearMetricsResponse;
import alluxio.grpc.CreateLocalBlockRequest;
import alluxio.grpc.CreateLocalBlockResponse;
import alluxio.grpc.GetBlockChecksumRequest;
import alluxio.grpc.GetBlockChecksumResponse;
import alluxio.grpc.GrpcServerAddress;
import alluxio.grpc.LoadRequest;
import alluxio.grpc.LoadResponse;
Expand Down Expand Up @@ -161,4 +163,16 @@ StreamObserver<OpenLocalBlockRequest> openLocalBlock(
* @throws StatusRuntimeException if any error occurs
*/
ListenableFuture<LoadResponse> load(LoadRequest request);

/**
* Calculates the checksum of a block (currently CRC64).
* @param request the request
* @return the response
*/
ListenableFuture<GetBlockChecksumResponse> getBlockChecksum(GetBlockChecksumRequest request);

/**
* @return the grpc server address
*/
GrpcServerAddress getAddress();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import alluxio.grpc.DataMessageMarshaller;
import alluxio.grpc.DataMessageMarshallerProvider;
import alluxio.grpc.FreeWorkerRequest;
import alluxio.grpc.GetBlockChecksumRequest;
import alluxio.grpc.GetBlockChecksumResponse;
import alluxio.grpc.GrpcChannel;
import alluxio.grpc.GrpcChannelBuilder;
import alluxio.grpc.GrpcNetworkGroup;
Expand Down Expand Up @@ -248,4 +250,15 @@ public void freeWorker() {
public ListenableFuture<LoadResponse> load(LoadRequest request) {
return mRpcFutureStub.load(request);
}

@Override
public ListenableFuture<GetBlockChecksumResponse> getBlockChecksum(
GetBlockChecksumRequest request) {
return mRpcFutureStub.getBlockChecksum(request);
}

@Override
public GrpcServerAddress getAddress() {
return mAddress;
}
}
1 change: 1 addition & 0 deletions core/common/src/main/java/alluxio/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ public final class Constants {

/* xAttr keys */
public static final String ETAG_XATTR_KEY = "s3_etag";
public static final String CRC64_KEY = "crc64";

private Constants() {} // prevent instantiation
}
42 changes: 42 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -4297,6 +4297,31 @@ public String toString() {
.setDescription("The implementation of LocalBlockStore that can be instantiated.")
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_BLOCK_CHECKSUM_CALCULATION_THROUGHPUT_THRESHOLD =
dataSizeBuilder(Name.WORKER_BLOCK_CHECKSUM_CALCULATION_THROUGHPUT_THRESHOLD)
.setDefaultValue(-1)
.setDescription("The throughput threshold per second to trigger the rate limit. "
+ "-1 means no limitation. The minimum precision is 1MB.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_BLOCK_CHECKSUM_CALCULATION_THREAD_POOL_SIZE =
intBuilder(Name.WORKER_BLOCK_CHECKSUM_CALCULATION_THREAD_POOL_SIZE)
.setDefaultValue(16)
.setDescription("The thread pool size for the worker block checksum calculation. "
+ "Each thread will take up at most 8MB (the chunksize) memory")
.setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_BLOCK_CHECKSUM_CALCULATION_USE_BUFFER_POOL =
booleanBuilder(Name.WORKER_BLOCK_CHECKSUM_CALCULATION_USE_BUFFER_POOL)
.setDefaultValue(false)
.setDescription("If enabled, will use buffer pool for reading the file during "
+ "checksum calculation. This reduces GC but will introduce permanent "
+ "memory consumption.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_CONTAINER_HOSTNAME =
stringBuilder(Name.WORKER_CONTAINER_HOSTNAME)
.setDescription("The container hostname if worker is running in a container.")
Expand Down Expand Up @@ -6411,6 +6436,15 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.CLIENT)
.build();
public static final PropertyKey USER_CLIENT_CHECKSUM_CALCULATION_BATCH_SIZE =
intBuilder(Name.USER_CLIENT_CHECKSUM_CALCULATION_BATCH_SIZE)
.setDefaultValue(Integer.MAX_VALUE)
.setDescription("The batch size of block ids for the checksum calculation rpc. "
+ "by default all block ids are in a single batch. Reduce this value if "
+ "you see too many timeouts.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE)
.setScope(Scope.CLIENT)
.build();

public static final PropertyKey USER_FILE_WRITE_TYPE_DEFAULT =
enumBuilder(Name.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.class)
Expand Down Expand Up @@ -8710,6 +8744,12 @@ public static final class Name {
public static final String WORKER_BLOCK_HEARTBEAT_TIMEOUT_MS =
"alluxio.worker.block.heartbeat.timeout";
public static final String WORKER_BLOCK_STORE_TYPE = "alluxio.worker.block.store.type";
public static final String WORKER_BLOCK_CHECKSUM_CALCULATION_THROUGHPUT_THRESHOLD =
"alluxio.worker.block.checksum.calculation.throughput.threshold";
public static final String WORKER_BLOCK_CHECKSUM_CALCULATION_THREAD_POOL_SIZE =
"alluxio.worker.block.checksum.calculation.thread.pool.size";
public static final String WORKER_BLOCK_CHECKSUM_CALCULATION_USE_BUFFER_POOL =
"alluxio.worker.block.checksum.calculation.use.buffer.pool";
public static final String WORKER_CONTAINER_HOSTNAME =
"alluxio.worker.container.hostname";
public static final String WORKER_DATA_FOLDER = "alluxio.worker.data.folder";
Expand Down Expand Up @@ -9102,6 +9142,8 @@ public static final class Name {
"alluxio.user.client.cache.include.mtime";
public static final String USER_CLIENT_REPORT_VERSION_ENABLED =
"alluxio.user.client.report.version.enabled";
public static final String USER_CLIENT_CHECKSUM_CALCULATION_BATCH_SIZE =
"alluxio.useer.client.checksum.calculation.batch.size";
public static final String USER_CONF_CLUSTER_DEFAULT_ENABLED =
"alluxio.user.conf.cluster.default.enabled";
public static final String USER_CONF_SYNC_INTERVAL = "alluxio.user.conf.sync.interval";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.network.protocol.databuffer;

import alluxio.exception.runtime.ResourceExhaustedRuntimeException;
import alluxio.retry.RetryPolicy;

import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;

/**
* Heap buffer pool.
*/
public class NioHeapBufferPool {
private static final TreeMap<Integer, LinkedList<ByteBuffer>> BUF_POOL = new TreeMap<>();

/**
* @param length
* @return buffer
*/
public static synchronized ByteBuffer acquire(int length) {
Map.Entry<Integer, LinkedList<ByteBuffer>> entry = BUF_POOL.ceilingEntry(length);
if (entry == null || entry.getValue().size() == 0) {
return ByteBuffer.allocate(length);
}
ByteBuffer buffer = entry.getValue().pop();
buffer.clear();
// the buffer probably is larger than the amount of capacity being requested
// need to set the limit explicitly
buffer.limit(length);
return buffer;
}

/**
* @param length
* @param policy the retry policy to use
* @return buffer
*/
public static synchronized ByteBuffer acquire(int length, RetryPolicy policy) {
Error cause = null;
while (policy.attempt()) {
try {
return acquire(length);
} catch (OutOfMemoryError error) {
cause = error;
}
}
throw new ResourceExhaustedRuntimeException("Not enough heap memory allocated to buffer",
cause, false);
}

/**
* @param buffer
*/
public static synchronized void release(ByteBuffer buffer) {
LinkedList<ByteBuffer> bufList = BUF_POOL.get(buffer.capacity());
if (bufList == null) {
bufList = new LinkedList<>();
BUF_POOL.put(buffer.capacity(), bufList);
}
bufList.push(buffer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package alluxio.underfs;

import alluxio.AlluxioURI;
import alluxio.Constants;
import alluxio.collections.Pair;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
Expand Down Expand Up @@ -54,6 +55,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -116,20 +118,28 @@ protected static class ObjectStatus {
/** Last modified epoch time in ms, or null if it is not available. */
private final Long mLastModifiedTimeMs;
private final String mName;
private final Optional<String> mCrc64Checksum;

public ObjectStatus(String name, String contentHash, long contentLength,
@Nullable Long lastModifiedTimeMs) {
@Nullable Long lastModifiedTimeMs, @Nullable String crc64) {
mContentHash = contentHash == null ? UfsFileStatus.INVALID_CONTENT_HASH : contentHash;
mContentLength = contentLength;
mLastModifiedTimeMs = lastModifiedTimeMs;
mName = name;
mCrc64Checksum = Optional.ofNullable(crc64);
}

public ObjectStatus(String name, String contentHash, long contentLength,
@Nullable Long lastModifiedTimeMs) {
this(name, contentHash, contentLength, lastModifiedTimeMs, null);
}

public ObjectStatus(String name) {
mContentHash = UfsFileStatus.INVALID_CONTENT_HASH;
mContentLength = INVALID_CONTENT_LENGTH;
mLastModifiedTimeMs = null;
mName = name;
mCrc64Checksum = Optional.empty();
}

/**
Expand Down Expand Up @@ -166,6 +176,10 @@ public Long getLastModifiedTimeMs() {
public String getName() {
return mName;
}

public Optional<String> getCrc64Checksum() {
return mCrc64Checksum;
}
}

/**
Expand Down Expand Up @@ -527,10 +541,17 @@ public long getSpace(String path, SpaceType type) throws IOException {
public UfsFileStatus getFileStatus(String path, GetFileStatusOptions options) throws IOException {
ObjectStatus details = getObjectStatus(stripPrefixIfPresent(path));
if (details != null) {
Map<String, byte[]> xAttr = null;
if (details.getCrc64Checksum().isPresent()) {
xAttr = new HashMap<>();
xAttr.put(Constants.CRC64_KEY, details.getCrc64Checksum().get().getBytes());
}
ObjectPermissions permissions = getPermissions();
return new UfsFileStatus(path, details.getContentHash(), details.getContentLength(),
return
new UfsFileStatus(path, details.getContentHash(), details.getContentLength(),
details.getLastModifiedTimeMs(), permissions.getOwner(), permissions.getGroup(),
permissions.getMode(), mUfsConf.getBytes(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT));
permissions.getMode(), xAttr,
mUfsConf.getBytes(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT));
} else {
LOG.debug("Error fetching file status, assuming file {} does not exist", path);
throw new FileNotFoundException("Failed to fetch file status " + path);
Expand All @@ -548,11 +569,18 @@ public UfsStatus getStatus(String path) throws IOException {
return getDirectoryStatus(path);
}
ObjectStatus details = getObjectStatus(stripPrefixIfPresent(path));
ObjectPermissions permissions = getPermissions();
if (details != null) {
ObjectPermissions permissions = getPermissions();
return new UfsFileStatus(path, details.getContentHash(), details.getContentLength(),
details.getLastModifiedTimeMs(), permissions.getOwner(), permissions.getGroup(),
permissions.getMode(), mUfsConf.getBytes(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT));
Map<String, byte[]> xAttr = null;
if (details.getCrc64Checksum().isPresent()) {
xAttr = new HashMap<>();
xAttr.put(Constants.CRC64_KEY, details.getCrc64Checksum().get().getBytes());
}
return
new UfsFileStatus(path, details.getContentHash(), details.getContentLength(),
details.getLastModifiedTimeMs(), permissions.getOwner(), permissions.getGroup(),
permissions.getMode(), xAttr,
mUfsConf.getBytes(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT));
}
return getDirectoryStatus(path);
}
Expand Down
Loading

0 comments on commit 1a3fbc9

Please sign in to comment.