Skip to content

Commit

Permalink
Fix for bulk operations incorrect routing on split (Azure#21420)
Browse files Browse the repository at this point in the history
* Fixing a scenario where bulk operations fail with 410 on container split.

* Splits are taking longer somehow! So increasing the timeout for QueryValidationTests::splitQueryContinuationToken

* Splits are taking longer somehow! So increasing the timeout for readFeedDocumentsAfterSplit()

* Refactoring code
Implementing PR comments
Increasing timeout on tests
  • Loading branch information
mbhaskar authored May 17, 2021
1 parent 4bae24d commit 1ab1784
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,29 +264,37 @@ private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchExec

// First check if it failed due to split, so the operations need to go in a different pk range group. So
// add it in the mainSink.
if (cosmosException.getStatusCode() == HttpResponseStatus.GONE.code() &&
itemBulkOperation.getRetryPolicy().shouldRetryForGone(
cosmosException.getStatusCode(),
cosmosException.getSubStatusCode())) {

mainSink.next(itemOperation);
return itemBulkOperation.getRetryPolicy()
.shouldRetryForGone(cosmosException.getStatusCode(), cosmosException.getSubStatusCode())
.flatMap(shouldRetryGone -> {
if (shouldRetryGone) {
mainSink.next(itemOperation);
return Mono.empty();
} else {
return retryOtherExceptions(itemOperation, exception, groupSink, cosmosException,
itemBulkOperation);
}
});
}

return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, exception, this.batchContext));
}

private Mono<CosmosBulkOperationResponse<TContext>> retryOtherExceptions(
CosmosItemOperation itemOperation, Exception exception, FluxSink<CosmosItemOperation> groupSink,
CosmosException cosmosException, ItemBulkOperation<?> itemBulkOperation) {
return itemBulkOperation.getRetryPolicy().shouldRetry(cosmosException).flatMap(result -> {
if (result.shouldRetry) {

groupSink.next(itemOperation);
return Mono.empty();
} else {
return itemBulkOperation.getRetryPolicy().shouldRetry(cosmosException).flatMap(result -> {
if (result.shouldRetry) {

groupSink.next(itemOperation);
return Mono.empty();
} else {

return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(
itemOperation, exception, this.batchContext));
}
});
return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(
itemOperation, exception, this.batchContext));
}
}

return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, exception, this.batchContext));
});
}

private Mono<TransactionalBatchResponse> executeBatchRequest(PartitionKeyRangeServerBatchRequest serverRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ static void setRetryPolicyForBulk(

BulkOperationRetryPolicy bulkRetryPolicy = new BulkOperationRetryPolicy(
docClientWrapper.getCollectionCache(),
docClientWrapper.getPartitionKeyRangeCache(),
BridgeInternal.getLink(container),
resourceThrottleRetryPolicy);
itemBulkOperation.setRetryPolicy(bulkRetryPolicy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.TransactionalBatchOperationResult;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.HttpConstants.StatusCodes;
import com.azure.cosmos.implementation.HttpConstants.SubStatusCodes;
import com.azure.cosmos.implementation.IRetryPolicy;
import com.azure.cosmos.implementation.ResourceThrottleRetryPolicy;
import com.azure.cosmos.implementation.RetryContext;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.ShouldRetryResult;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import reactor.core.publisher.Mono;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
Expand All @@ -28,16 +28,19 @@ final class BulkOperationRetryPolicy implements IRetryPolicy {
private static final int MAX_RETRIES = 1;

private final RxCollectionCache collectionCache;
private final RxPartitionKeyRangeCache partitionKeyRangeCache;
private final String collectionLink;
private final ResourceThrottleRetryPolicy resourceThrottleRetryPolicy;
private int attemptedRetries;

BulkOperationRetryPolicy(
RxCollectionCache collectionCache,
RxPartitionKeyRangeCache partitionKeyRangeCache,
String resourceFullName,
ResourceThrottleRetryPolicy resourceThrottleRetryPolicy) {

this.collectionCache = collectionCache;
this.partitionKeyRangeCache = partitionKeyRangeCache;

// Similar to PartitionKeyMismatchRetryPolicy constructor.
collectionLink = Utils.getCollectionName(resourceFullName);
Expand Down Expand Up @@ -77,25 +80,35 @@ public RetryContext getRetryContext() {
return this.resourceThrottleRetryPolicy.getRetryContext();
}

boolean shouldRetryForGone(int statusCode, int subStatusCode) {

if (statusCode == StatusCodes.GONE
&& (subStatusCode == SubStatusCodes.PARTITION_KEY_RANGE_GONE ||
subStatusCode == SubStatusCodes.NAME_CACHE_IS_STALE ||
subStatusCode == SubStatusCodes.COMPLETING_SPLIT ||
subStatusCode == SubStatusCodes.COMPLETING_PARTITION_MIGRATION)
&& this.attemptedRetries < MAX_RETRIES) {
Mono<Boolean> shouldRetryForGone(int statusCode, int subStatusCode) {
if (statusCode == StatusCodes.GONE) {
if (this.attemptedRetries++ > MAX_RETRIES) {
return Mono.just(false);
}

this.attemptedRetries++;
if ((subStatusCode == SubStatusCodes.PARTITION_KEY_RANGE_GONE ||
subStatusCode == SubStatusCodes.COMPLETING_SPLIT ||
subStatusCode == SubStatusCodes.COMPLETING_PARTITION_MIGRATION)) {
return collectionCache
.resolveByNameAsync(null, collectionLink, null)
.flatMap(collection -> this.partitionKeyRangeCache
.tryGetOverlappingRangesAsync(null /*metaDataDiagnosticsContext*/,
collection.getResourceId(),
FeedRangeEpkImpl.forFullRange()
.getRange(),
true,
null /*properties*/)
.then(Mono.just(true)));
}

if (subStatusCode == SubStatusCodes.NAME_CACHE_IS_STALE) {
refreshCollectionCache();
}

return true;
return Mono.just(true);
}

return false;
return Mono.just(false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,27 @@
// Licensed under the MIT License.
package com.azure.cosmos.implementation.caches;

import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.apachecommons.collections.CollectionUtils;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.routing.InMemoryCollectionRoutingMap;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.implementation.Exceptions;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.routing.IServerIdentity;
import com.azure.cosmos.implementation.apachecommons.collections.CollectionUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.routing.IServerIdentity;
import com.azure.cosmos.implementation.routing.InMemoryCollectionRoutingMap;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -248,4 +247,3 @@ private Mono<List<PartitionKeyRange>> getPartitionKeyRange(MetadataDiagnosticsCo
});
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos;

import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.models.ThroughputResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;

public class CosmosBulkGatewayTest extends BatchTestBase {
private final static Logger logger = LoggerFactory.getLogger(CosmosBulkAsyncTest.class);

private CosmosAsyncClient bulkClient;
private CosmosAsyncDatabase createdDatabase;

@Factory(dataProvider = "simpleClientBuilderGatewaySession")
public CosmosBulkGatewayTest(CosmosClientBuilder clientBuilder) {
super(clientBuilder);
}

@BeforeClass(groups = {"simple"}, timeOut = SETUP_TIMEOUT)
public void before_CosmosBulkAsyncTest() {
assertThat(this.bulkClient).isNull();
this.bulkClient = getClientBuilder().buildAsyncClient();
createdDatabase = getSharedCosmosDatabase(this.bulkClient);
}

@AfterClass(groups = {"simple"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
public void afterClass() {
safeCloseAsync(this.bulkClient);
}

@Test(groups = {"simple"}, timeOut = TIMEOUT * 20)
public void createItem_withBulk_split() throws InterruptedException {
String containerId = "bulksplittestcontainer_" + UUID.randomUUID();
int totalRequest = getTotalRequest();
CosmosContainerProperties containerProperties = new CosmosContainerProperties(containerId, "/mypk");
CosmosContainerResponse containerResponse = createdDatabase.createContainer(containerProperties).block();
CosmosAsyncContainer container = createdDatabase.getContainer(containerId);

Flux<CosmosItemOperation> cosmosItemOperationFlux1 = Flux.range(0, totalRequest).map(i -> {
String partitionKey = UUID.randomUUID().toString();
TestDoc testDoc = this.populateTestDoc(partitionKey);

return BulkOperations.getCreateItemOperation(testDoc, new PartitionKey(partitionKey));
});

Flux<CosmosItemOperation> cosmosItemOperationFlux2 = Flux.range(0, totalRequest).map(i -> {
String partitionKey = UUID.randomUUID().toString();
EventDoc eventDoc = new EventDoc(UUID.randomUUID().toString(), 2, 4, "type1", partitionKey);

return BulkOperations.getCreateItemOperation(eventDoc, new PartitionKey(partitionKey));
});

BulkProcessingOptions<CosmosBulkAsyncTest> bulkProcessingOptions = new BulkProcessingOptions<>();
bulkProcessingOptions.setMaxMicroBatchSize(100);
bulkProcessingOptions.setMaxMicroBatchConcurrency(5);

Flux<CosmosBulkOperationResponse<CosmosBulkAsyncTest>> responseFlux =
container.processBulkOperations(cosmosItemOperationFlux1, bulkProcessingOptions);

AtomicInteger processedDoc = new AtomicInteger(0);
responseFlux
.flatMap((CosmosBulkOperationResponse<CosmosBulkAsyncTest> cosmosBulkOperationResponse) -> {

processedDoc.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull();
assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull();
assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull();

return Mono.just(cosmosBulkItemResponse);
}).blockLast();

assertThat(processedDoc.get()).isEqualTo(totalRequest);

// introduce a split and continue bulk operations after split. The partition key range cache has to be
// refreshed and bulk processing should complete without errors
List<PartitionKeyRange> partitionKeyRanges = getPartitionKeyRanges(containerId, this.bulkClient);
// Scale up the throughput for a split
logger.info("Scaling up throughput for split");
ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(16000);
ThroughputResponse throughputResponse = container.replaceThroughput(throughputProperties).block();
logger.info("Throughput replace request submitted for {} ",
throughputResponse.getProperties().getManualThroughput());
throughputResponse = container.readThroughput().block();


// Wait for the throughput update to complete so that we get the partition split
while (true) {
assert throughputResponse != null;
if (!throughputResponse.isReplacePending()) {
break;
}
logger.info("Waiting for split to complete");
Thread.sleep(10 * 1000);
throughputResponse = container.readThroughput().block();
}

// Read number of partitions. Should be greater than one
List<PartitionKeyRange> partitionKeyRangesAfterSplit = getPartitionKeyRanges(containerId,
this.bulkClient);
assertThat(partitionKeyRangesAfterSplit.size()).isGreaterThan(partitionKeyRanges.size())
.as("Partition ranges should increase after split");
logger.info("After split num partitions = {}", partitionKeyRangesAfterSplit.size());

responseFlux = container.processBulkOperations(cosmosItemOperationFlux2, bulkProcessingOptions);

AtomicInteger processedDoc2 = new AtomicInteger(0);
responseFlux
.flatMap((CosmosBulkOperationResponse<CosmosBulkAsyncTest> cosmosBulkOperationResponse) -> {

processedDoc2.incrementAndGet();

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull();
assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull();
assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull();

return Mono.just(cosmosBulkItemResponse);
}).blockLast();

assertThat(processedDoc.get()).isEqualTo(totalRequest);
container.delete().block();
}

@NotNull
private List<PartitionKeyRange> getPartitionKeyRanges(
String containerId, CosmosAsyncClient asyncClient) {
List<PartitionKeyRange> partitionKeyRanges = new ArrayList<>();
AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(asyncClient);
List<FeedResponse<PartitionKeyRange>> partitionFeedResponseList = asyncDocumentClient
.readPartitionKeyRanges("/dbs/" + createdDatabase.getId()
+ "/colls/" + containerId,
new CosmosQueryRequestOptions())
.collectList().block();
partitionFeedResponseList.forEach(f -> partitionKeyRanges.addAll(f.getResults()));
return partitionKeyRanges;
}

private int getTotalRequest() {
int countRequest = new Random().nextInt(100) + 200;
logger.info("Total count of request for this test case: " + countRequest);

return countRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ public void staledLeaseAcquiring() throws InterruptedException {
}
}

@Test(groups = { "simple" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT)
@Test(groups = { "simple" }, timeOut = 160 * CHANGE_FEED_PROCESSOR_TIMEOUT)
public void readFeedDocumentsAfterSplit() throws InterruptedException {
CosmosAsyncContainer createdFeedCollectionForSplit = createFeedCollection(FEED_COLLECTION_THROUGHPUT_FOR_SPLIT);
CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT);
Expand Down Expand Up @@ -584,7 +584,8 @@ public void readFeedDocumentsAfterSplit() throws InterruptedException {
.retryWhen(Retry.max(40).filter(throwable -> {
try {
log.warn("Retrying...");
Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT);
// Splits are taking longer, so increasing sleep time between retries
Thread.sleep(60 * CHANGE_FEED_PROCESSOR_TIMEOUT);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted exception", e);
}
Expand Down
Loading

0 comments on commit 1ab1784

Please sign in to comment.