Skip to content

Commit

Permalink
[ENG-17094] Change implicit lock provider lock key scheme
Browse files Browse the repository at this point in the history
  • Loading branch information
Davis-Zhang-Onehouse committed Nov 14, 2024
1 parent 3a57591 commit fbff054
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@

import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.hash.HashID;
import org.apache.hudi.storage.StorageConfiguration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hudi.common.util.StringUtils;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;

import javax.annotation.concurrent.NotThreadSafe;

import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
import static org.apache.hudi.common.util.StringUtils.concatenateWithThreshold;
import static org.apache.hudi.config.DynamoDbBasedLockConfig.MAX_PARTITION_KEY_SIZE_BYTE;

/**
* A DynamoDB based lock.
Expand All @@ -43,27 +42,33 @@
public class DynamoDBBasedImplicitPartitionKeyLockProvider extends DynamoDBBasedLockProviderBase {
protected static final Logger LOG = LoggerFactory.getLogger(DynamoDBBasedImplicitPartitionKeyLockProvider.class);

private final String hudiTableBasePath;

/**
* Creates the lock provider without a caller-supplied DynamoDB client.
* Delegates to the three-argument constructor, passing {@code null} as the client.
*
* @param lockConfiguration lock configuration forwarded to the three-argument constructor
* @param conf storage configuration forwarded to the three-argument constructor
*/
public DynamoDBBasedImplicitPartitionKeyLockProvider(final LockConfiguration lockConfiguration, final StorageConfiguration<?> conf) {
this(lockConfiguration, conf, null);
}

/**
* Creates the lock provider, forwarding all arguments to the base-class constructor and then
* recording the Hudi table base path (read from {@code HoodieCommonConfig.BASE_PATH} and passed
* through {@code s3aToS3}) for later use.
*
* @param lockConfiguration lock configuration; must carry the table base path
* @param conf storage configuration forwarded to the base class
* @param dynamoDB DynamoDB client forwarded to the base class; the two-argument constructor passes {@code null}
*/
public DynamoDBBasedImplicitPartitionKeyLockProvider(
final LockConfiguration lockConfiguration, final StorageConfiguration<?> conf, DynamoDbClient dynamoDB) {
super(lockConfiguration, conf, dynamoDB);
// NOTE(review): this field is assigned only after super(...) returns; if the base constructor
// invokes any overridden method that reads hudiTableBasePath, it would still see null — confirm.
hudiTableBasePath = s3aToS3(lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key()));
}

public static String generatePartitionKey(String basePath, String tableName) {
String hashPart = '-' + HashID.generateXXHashAsString(basePath, HashID.Size.BITS_64);
String partitionKey = concatenateWithThreshold(tableName, hashPart, MAX_PARTITION_KEY_SIZE_BYTE);
LOG.info(String.format("The DynamoDB partition key of the lock provider for the base path %s is %s", basePath, partitionKey));
/**
* Derives the DynamoDB partition key for this lock provider from the Hudi table base path alone.
*
* <p>The base path is read from {@code HoodieCommonConfig.BASE_PATH} in the given configuration,
* passed through {@code s3aToS3} so that the same table yields the same key regardless of the
* URI scheme spelling, and then hashed with
* {@code HashID.generateXXHashAsString(..., HashID.Size.BITS_64)}.
*
* @param lockConfiguration lock configuration holding the Hudi table base path
* @return the hash string of the normalized base path, used as the partition key
*/
@Override
public String getDynamoDBPartitionKey(LockConfiguration lockConfiguration) {
// Ensure consistent format for S3 URI.
String hudiTableBasePathNormalized = s3aToS3(lockConfiguration.getConfig().getString(
HoodieCommonConfig.BASE_PATH.key()));
String partitionKey = HashID.generateXXHashAsString(hudiTableBasePathNormalized, HashID.Size.BITS_64);
// Parameterized SLF4J logging: same message text as before, but the string is only
// assembled when INFO is enabled, and no format specifiers are interpreted in the arguments.
LOG.info("The DynamoDB partition key of the lock provider for the base path {} is {}",
hudiTableBasePathNormalized, partitionKey);
return partitionKey;
}

@Override
public String getDynamoDBPartitionKey(LockConfiguration lockConfiguration) {
String hudiTableBasePath = lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key());
String hudiTableName = lockConfiguration.getConfig().getString(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
// Ensure consistent format for S3 URI.
return generatePartitionKey(s3aToS3(hudiTableBasePath), hudiTableName);
// Builds a description of this provider from the DynamoDB table name, the partition key and
// the Hudi table base path. Presumably appended to log messages; the call sites are outside
// this view.
protected String generateLogSuffixString() {
return StringUtils.join("DynamoDb table = ", tableName,
", partition key = ", dynamoDBPartitionKey,
", hudi table base path = ", hudiTableBasePath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public abstract class DynamoDBBasedLockProviderBase implements LockProvider<Lock
protected final DynamoDbBasedLockConfig dynamoDbBasedLockConfig;
protected final AmazonDynamoDBLockClient client;
protected final String tableName;
private final String dynamoDBPartitionKey;
protected final String dynamoDBPartitionKey;
protected volatile LockItem lock;

protected DynamoDBBasedLockProviderBase(final LockConfiguration lockConfiguration, final StorageConfiguration<?> conf, DynamoDbClient dynamoDB) {
Expand Down Expand Up @@ -215,7 +215,7 @@ private void createLockTableInDynamoDB(DynamoDbClient dynamoDB, String tableName
LOG.info("Created dynamoDB table " + tableName);
}

private String generateLogSuffixString() {
protected String generateLogSuffixString() {
return StringUtils.join("DynamoDb table = ", tableName, ", partition key = ", dynamoDBPartitionKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ public static DynamoDbBasedLockConfig.Builder newBuilder() {
return new DynamoDbBasedLockConfig.Builder();
}

// The max length of DDB partition key allowed.
public static final int MAX_PARTITION_KEY_SIZE_BYTE = 2048;

// configs for DynamoDb based locks
public static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX = LockConfiguration.LOCK_PREFIX + "dynamodb.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,6 @@ public static void setup() throws InterruptedException {
dynamoDb = getDynamoClientWithLocalEndpoint();
}

/**
* Argument source referenced by {@code @MethodSource("testDimensions")}: each entry pairs a
* lock configuration with a lock provider class.
*/
public static Stream<Object> testDimensions() {
return Stream.of(
// Without partition key, only table name is used.
Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG, DynamoDBBasedLockProvider.class),
Arguments.of(LOCK_CONFIGURATION, DynamoDBBasedLockProvider.class),
// Even if we have partition key set, nothing would break.
Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_WITH_PART_KEY, DynamoDBBasedImplicitPartitionKeyLockProvider.class),
Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG, DynamoDBBasedImplicitPartitionKeyLockProvider.class)
);
}

public static Stream<Object> badTestDimensions() {
return Stream.of(
Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME, DynamoDBBasedLockProvider.class),
Expand Down Expand Up @@ -165,6 +154,18 @@ void testBadConfig(LockConfiguration lockConfig, Class<?> lockProviderClass) {
Assertions.assertEquals(IllegalArgumentException.class, e.getCause().getCause().getClass());
}

/**
* Argument source referenced by {@code @MethodSource("testDimensions")}: each entry pairs a
* lock configuration with a lock provider class, matching the
* {@code (lockConfig, lockProviderClass)} parameters of the parameterized test.
*/
public static Stream<Arguments> testDimensions() {
return Stream.of(
// Without partition key, only table name is used.
Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG, DynamoDBBasedLockProvider.class),
// Even if we have partition key set, nothing would break.
Arguments.of(LOCK_CONFIGURATION, DynamoDBBasedLockProvider.class),
// Even if we have partition key set, nothing would break.
Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_WITH_PART_KEY, DynamoDBBasedImplicitPartitionKeyLockProvider.class),
Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG, DynamoDBBasedImplicitPartitionKeyLockProvider.class)
);
}

@ParameterizedTest
@MethodSource("testDimensions")
void testAcquireLock(LockConfiguration lockConfig, Class<?> lockProviderClass) {
Expand All @@ -187,11 +188,14 @@ void testAcquireLock(LockConfiguration lockConfig, Class<?> lockProviderClass) {
String tableName = (String) lockConfig.getConfig().get(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
String basePath = (String) lockConfig.getConfig().get(HoodieCommonConfig.BASE_PATH.key());
// Base path is constructed with prefix s3a, verify that for partition key calculation, s3a is replaced with s3
Assertions.assertTrue(basePath.startsWith(SCHEME_S3A));
// Verify base path only scheme partition key
Assertions.assertEquals(
tableName + '-' + HashID.generateXXHashAsString(SCHEME_S3 + URI_NO_CLOUD_PROVIDER_PREFIX, HashID.Size.BITS_64),
HashID.generateXXHashAsString(SCHEME_S3 + URI_NO_CLOUD_PROVIDER_PREFIX, HashID.Size.BITS_64),
dynamoDbBasedLockProvider.getPartitionKey());
Assertions.assertTrue(basePath.startsWith(SCHEME_S3A));
}

// Test lock acquisition and release
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfig.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
dynamoDbBasedLockProvider.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public abstract class BaseZookeeperBasedLockProvider implements LockProvider<Int
protected final String zkBasePath;
protected final String lockKey;

public static final int MAX_ZK_BASE_PATH_NUM_BYTES = 4096;

public BaseZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, final StorageConfiguration<?> conf) {
checkRequiredProps(lockConfiguration);
this.lockConfiguration = lockConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.hash.HashID;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.common.util.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.NotThreadSafe;

import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
import static org.apache.hudi.common.util.StringUtils.concatenateWithThreshold;

/**
* A zookeeper based lock. This {@link LockProvider} implementation allows to lock table operations
Expand All @@ -41,31 +42,38 @@
*/
@NotThreadSafe
public class ZookeeperBasedImplicitBasePathLockProvider extends BaseZookeeperBasedLockProvider {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperBasedImplicitBasePathLockProvider.class);

public static final String LOCK_KEY = "lock_key";
private final String hudiTableBasePath;

public static String getLockBasePath(String hudiTableBasePath, String hudiTableName) {
public static String getLockBasePath(String hudiTableBasePath) {
// Ensure consistent format for S3 URI.
String hashPart = '-' + HashID.generateXXHashAsString(s3aToS3(hudiTableBasePath), HashID.Size.BITS_64);
String folderName = concatenateWithThreshold(hudiTableName, hashPart, MAX_ZK_BASE_PATH_NUM_BYTES);
return "/tmp/" + folderName;
String lockBasePath = "/tmp/" + HashID.generateXXHashAsString(s3aToS3(hudiTableBasePath), HashID.Size.BITS_64);
LOG.info(String.format("The Zookeeper lock key for the base path %s is %s", hudiTableBasePath, lockBasePath));
return lockBasePath;
}

/**
* Creates the lock provider, forwarding both arguments to the base-class constructor and then
* recording the Hudi table base path (read from {@code HoodieCommonConfig.BASE_PATH} and passed
* through {@code s3aToS3}) for later use.
*
* @param lockConfiguration lock configuration; must carry the table base path
* @param conf storage configuration forwarded to the base class
*/
public ZookeeperBasedImplicitBasePathLockProvider(final LockConfiguration lockConfiguration, final StorageConfiguration<?> conf) {
super(lockConfiguration, conf);
// NOTE(review): this field is assigned only after super(...) returns; if the base constructor
// invokes any overridden method that reads hudiTableBasePath, it would still see null — confirm.
hudiTableBasePath = s3aToS3(lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key()));
}

@Override
protected String getZkBasePath(LockConfiguration lockConfiguration) {
String hudiTableBasePath = ConfigUtils.getStringWithAltKeys(lockConfiguration.getConfig(), HoodieCommonConfig.BASE_PATH);
String hudiTableName = ConfigUtils.getStringWithAltKeys(lockConfiguration.getConfig(), HoodieTableConfig.NAME);
String hudiTableBasePath = lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key());
ValidationUtils.checkArgument(hudiTableBasePath != null);
ValidationUtils.checkArgument(hudiTableName != null);
return getLockBasePath(hudiTableBasePath, hudiTableName);
return getLockBasePath(hudiTableBasePath);
}

/**
* Returns the fixed lock key {@link #LOCK_KEY}; the supplied configuration is not consulted.
*
* @param lockConfiguration unused by this implementation
* @return the constant {@code LOCK_KEY}
*/
@Override
protected String getLockKey(LockConfiguration lockConfiguration) {
return LOCK_KEY;
}

/**
* Builds a description of this provider from the Zookeeper base path, the lock key and the
* Hudi table base path. Presumably appended to log messages; the call sites are outside this
* view.
*
* @return the joined description string
*/
@Override
protected String generateLogSuffixString() {
return StringUtils.join("ZkBasePath = ", zkBasePath,
", lock key = ", lockKey, ", hudi table base path = ", hudiTableBasePath);
}
}

0 comments on commit fbff054

Please sign in to comment.