Skip to content

Commit

Permalink
Update expiration time based on ttl on the source coordinator item.
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <souvbose@amazon.com>
  • Loading branch information
sbose2k21 committed Dec 14, 2024
1 parent 956a89a commit 2cb110d
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ public Optional<SourcePartitionStoreItem> getAvailablePartition(final String own
final Duration ownershipTimeout,
final SourcePartitionStatus sourcePartitionStatus,
final String sourceStatusCombinationKey,
final int pageLimit) {
final int pageLimit,
final Duration ttl) {
try {

final DynamoDbIndex<DynamoDbSourcePartitionItem> sourceStatusIndex = table.index(SOURCE_STATUS_COMBINATION_KEY_GLOBAL_SECONDARY_INDEX);
Expand Down Expand Up @@ -273,8 +274,11 @@ public Optional<SourcePartitionStoreItem> getAvailablePartition(final String own
item.setSourcePartitionStatus(SourcePartitionStatus.ASSIGNED);
item.setSourceStatusCombinationKey(String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, item.getSourceIdentifier(), SourcePartitionStatus.ASSIGNED));
item.setPartitionPriority(partitionOwnershipTimeout.toString());
final boolean acquired = this.tryAcquirePartitionItem(item);
if (Objects.nonNull(ttl)) {
item.setExpirationTime(Instant.now().plus(ttl).getEpochSecond());
}

final boolean acquired = this.tryAcquirePartitionItem(item);
if (acquired) {
return Optional.of(item);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,23 @@ public boolean tryCreatePartitionItem(final String sourceIdentifier,
public Optional<SourcePartitionStoreItem> tryAcquireAvailablePartition(final String sourceIdentifier, final String ownerId, final Duration ownershipTimeout) {
final Optional<SourcePartitionStoreItem> acquiredAssignedItem = dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), 1);
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), 1, dynamoStoreSettings.getTtl());

if (acquiredAssignedItem.isPresent()) {
return acquiredAssignedItem;
}

final Optional<SourcePartitionStoreItem> acquiredUnassignedItem = dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5);
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5, dynamoStoreSettings.getTtl());

if (acquiredUnassignedItem.isPresent()) {
return acquiredUnassignedItem;
}

return dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1);
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1, dynamoStoreSettings.getTtl());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -464,8 +465,10 @@ void getAvailablePartition_with_no_items_from_query_returns_empty_optional(final

final int pageLimit = new Random().nextInt(20);

final Duration ttl = Duration.ofSeconds(new Random().nextInt());

final Optional<SourcePartitionStoreItem> result = objectUnderTest.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, pageLimit);
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, pageLimit, ttl);

assertThat(result.isEmpty(), equalTo(true));

Expand Down Expand Up @@ -513,8 +516,10 @@ void getAvailablePartition_will_continue_until_tryAcquirePartition_succeeds(fina
final DynamoDbClientWrapper objectUnderTest = createObjectUnderTest();
reflectivelySetField(objectUnderTest, "table", table);

final Duration ttl = Duration.ofSeconds(new Random().nextInt());

final Optional<SourcePartitionStoreItem> result = objectUnderTest.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20));
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl);

assertThat(result.isPresent(), equalTo(true));
assertThat(result.get(), equalTo(acquiredItem));
Expand All @@ -531,6 +536,10 @@ void getAvailablePartition_will_continue_until_tryAcquirePartition_succeeds(fina

assertThat(newPartitionOwnershipTimeout.isAfter(now.plus(ownershipTimeout)), equalTo(true));

final ArgumentCaptor<Long> expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class);
verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture());
assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond()));

verify(acquiredItem).setPartitionPriority(newPartitionOwnershipTimeout.toString());
}

Expand Down Expand Up @@ -574,8 +583,10 @@ void getAvailablePartition_with_multiple_pages_continue_until_tryAcquirePartitio
final DynamoDbClientWrapper objectUnderTest = createObjectUnderTest();
reflectivelySetField(objectUnderTest, "table", table);

final Duration ttl = Duration.ofSeconds(new Random().nextInt());

final Optional<SourcePartitionStoreItem> result = objectUnderTest.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20));
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl);

assertThat(result.isPresent(), equalTo(true));
assertThat(result.get(), equalTo(acquiredItem));
Expand All @@ -593,6 +604,10 @@ void getAvailablePartition_with_multiple_pages_continue_until_tryAcquirePartitio
assertThat(newPartitionOwnershipTimeout.isAfter(now.plus(ownershipTimeout)), equalTo(true));

verify(acquiredItem).setPartitionPriority(newPartitionOwnershipTimeout.toString());

final ArgumentCaptor<Long> expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class);
verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture());
assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond()));
}

@ParameterizedTest
Expand Down Expand Up @@ -635,8 +650,10 @@ void getAvailablePartition_with_multiple_pages_will_iterate_through_all_items_wi
final DynamoDbClientWrapper objectUnderTest = createObjectUnderTest();
reflectivelySetField(objectUnderTest, "table", table);

final Duration ttl = Duration.ofSeconds(new Random().nextInt());

final Optional<SourcePartitionStoreItem> result = objectUnderTest.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20));
ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl);

assertThat(result.isEmpty(), equalTo(true));

Expand All @@ -653,6 +670,10 @@ void getAvailablePartition_with_multiple_pages_will_iterate_through_all_items_wi
assertThat(newPartitionOwnershipTimeout.isAfter(now.plus(ownershipTimeout)), equalTo(true));

verify(acquiredItem).setPartitionPriority(newPartitionOwnershipTimeout.toString());

final ArgumentCaptor<Long> expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class);
verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture());
assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond()));
}

@Test
Expand Down Expand Up @@ -681,7 +702,7 @@ void getAvailablePartition_with_assigned_partition_with_unexpired_partitionOwner
reflectivelySetField(objectUnderTest, "table", table);

final Optional<SourcePartitionStoreItem> result = objectUnderTest.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, sourceStatusCombinationKey, new Random().nextInt(20));
ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, sourceStatusCombinationKey, new Random().nextInt(20), Duration.ofSeconds(new Random().nextInt()));

assertThat(result.isEmpty(), equalTo(true));

Expand Down Expand Up @@ -716,7 +737,7 @@ void getAvailablePartition_with_closed_partition_with_unreached_reOpenAt_time_re
reflectivelySetField(objectUnderTest, "table", table);

final Optional<SourcePartitionStoreItem> result = objectUnderTest.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, sourceStatusCombinationKey, new Random().nextInt(20));
ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, sourceStatusCombinationKey, new Random().nextInt(20), Duration.ofSeconds(new Random().nextInt()));

assertThat(result.isEmpty(), equalTo(true));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,21 +245,22 @@ void getAvailablePartition_with_no_item_acquired_returns_empty_optional() {
final String ownerId = UUID.randomUUID().toString();
final String sourceIdentifier = UUID.randomUUID().toString();
final Duration ownershipTimeout = Duration.ofMinutes(2);
final Duration ttl = Duration.ofSeconds(new Random().nextInt());

given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.ASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
1, ttl))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED),
1))
1, ttl))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED),
5))
5, ttl))
.willReturn(Optional.empty());

final Optional<SourcePartitionStoreItem> result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout);
Expand All @@ -272,13 +273,14 @@ void getAvailablePartition_with_acquired_ASSIGNED_partition_returns_the_partitio
final String ownerId = UUID.randomUUID().toString();
final String sourceIdentifier = UUID.randomUUID().toString();
final Duration ownershipTimeout = Duration.ofMinutes(2);
final Duration ttl = Duration.ofSeconds(new Random().nextInt());

final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class);

given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.ASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
1, ttl))
.willReturn(Optional.of(acquiredItem));

final Optional<SourcePartitionStoreItem> result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout);
Expand All @@ -294,23 +296,24 @@ void getAvailablePartition_with_acquired_CLOSED_partition_returns_the_partition(
final String ownerId = UUID.randomUUID().toString();
final String sourceIdentifier = UUID.randomUUID().toString();
final Duration ownershipTimeout = Duration.ofMinutes(2);
final Duration ttl = Duration.ofSeconds(new Random().nextInt());

final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class);

given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.ASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
1, ttl))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED),
5))
5, ttl))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED),
1))
1, ttl))
.willReturn(Optional.of(acquiredItem));

final Optional<SourcePartitionStoreItem> result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout);
Expand All @@ -326,18 +329,19 @@ void getAvailablePartition_with_acquired_UNASSIGNED_partition_returns_the_partit
final String ownerId = UUID.randomUUID().toString();
final String sourceIdentifier = UUID.randomUUID().toString();
final Duration ownershipTimeout = Duration.ofMinutes(2);
final Duration ttl = Duration.ofSeconds(new Random().nextInt());

final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class);

given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.ASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
1, ttl))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED),
5))
5, ttl))
.willReturn(Optional.of(acquiredItem));

final Optional<SourcePartitionStoreItem> result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout);
Expand Down

0 comments on commit 2cb110d

Please sign in to comment.