Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk: Adds retry for patch operations #2390

Merged
merged 8 commits into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions Microsoft.Azure.Cosmos/src/BulkExecutionRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Microsoft.Azure.Cosmos
/// <see cref="ItemBatchOperationContext"/>
internal sealed class BulkExecutionRetryPolicy : IDocumentClientRetryPolicy
{
private const int SubstatusCodeBatchResponseSizeExceeded = 3402;
private const int MaxRetryOn410 = 10;
private readonly IDocumentClientRetryPolicy nextRetryPolicy;
private readonly OperationType operationType;
Expand Down Expand Up @@ -86,8 +87,6 @@ public void OnBeforeSendRequest(DocumentServiceRequest request)
this.nextRetryPolicy.OnBeforeSendRequest(request);
}

private bool IsReadRequest => this.operationType == OperationType.Read;

private async Task<ShouldRetryResult> ShouldRetryInternalAsync(
HttpStatusCode? statusCode,
SubStatusCodes? subStatusCode,
Expand Down Expand Up @@ -126,9 +125,9 @@ await partitionKeyRangeCache.TryGetOverlappingRangesAsync(
}

// Batch API can return 413 which means the response is bigger than 4Mb.
// Operations that exceed the 4Mb limit are returned as 413, while the operations within the 4Mb limit will be 200
if (this.IsReadRequest
&& statusCode == HttpStatusCode.RequestEntityTooLarge)
// Operations that exceed the 4Mb limit are returned as 413/3402, while the operations within the 4Mb limit will be 200
if (statusCode == HttpStatusCode.RequestEntityTooLarge
&& (int)subStatusCode == SubstatusCodeBatchResponseSizeExceeded)
{
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,62 @@ private readonly BatchAsyncBatcherExecuteDelegate ExecutorWith413
return new PartitionKeyRangeBatchExecutionResult(request.PartitionKeyRangeId, request.Operations, batchresponse);
};

private readonly BatchAsyncBatcherExecuteDelegate ExecutorWith413_3402
= async (PartitionKeyRangeServerBatchRequest request, ITrace trace, CancellationToken cancellationToken) =>
{
List<TransactionalBatchOperationResult> results = new List<TransactionalBatchOperationResult>();
ItemBatchOperation[] arrayOperations = new ItemBatchOperation[request.Operations.Count];
int index = 0;
foreach (ItemBatchOperation operation in request.Operations)
{
if (index == 0)
{
// First operation is fine
results.Add(
new TransactionalBatchOperationResult(HttpStatusCode.OK)
{
ETag = operation.Id
});
}
else
{
// second operation is too big
results.Add(
new TransactionalBatchOperationResult(HttpStatusCode.RequestEntityTooLarge)
{
SubStatusCode = (SubStatusCodes)3402,
ETag = operation.Id
});
}

arrayOperations[index++] = operation;
}

MemoryStream responseContent = await new BatchResponsePayloadWriter(results).GeneratePayloadAsync();

SinglePartitionKeyServerBatchRequest batchRequest = await SinglePartitionKeyServerBatchRequest.CreateAsync(
partitionKey: null,
operations: new ArraySegment<ItemBatchOperation>(arrayOperations),
serializerCore: MockCosmosUtil.Serializer,
trace: trace,
cancellationToken: cancellationToken);

ResponseMessage responseMessage = new ResponseMessage((HttpStatusCode)207)
{
Content = responseContent
};

TransactionalBatchResponse batchresponse = await TransactionalBatchResponse.FromResponseMessageAsync(
responseMessage,
batchRequest,
MockCosmosUtil.Serializer,
true,
trace: trace,
CancellationToken.None);

return new PartitionKeyRangeBatchExecutionResult(request.PartitionKeyRangeId, request.Operations, batchresponse);
};

// The response will include all but 2 operation responses
private readonly BatchAsyncBatcherExecuteDelegate ExecutorWithLessResponses
= async (PartitionKeyRangeServerBatchRequest request, ITrace trace, CancellationToken cancellationToken) =>
Expand Down Expand Up @@ -610,7 +666,7 @@ public async Task RetrierGetsCalledOnOverFlow()
}

[TestMethod]
public async Task RetrierGetsCalledOn413_OnRead()
public async Task RetrierGetsCalledOn413_3402()
{
IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
Expand All @@ -622,48 +678,66 @@ public async Task RetrierGetsCalledOn413_OnRead()
OperationType.Read,
new ResourceThrottleRetryPolicy(1));

IDocumentClientRetryPolicy retryPolicy3 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Create,
new ResourceThrottleRetryPolicy(1));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
ItemBatchOperation operation3 = this.CreateItemBatchOperation();
operation1.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy1));
operation2.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy2));
operation3.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy3));

Mock<BatchAsyncBatcherRetryDelegate> retryDelegate = new Mock<BatchAsyncBatcherRetryDelegate>();

BatchAsyncBatcher batchAsyncBatcher = new BatchAsyncBatcher(2, 1000, MockCosmosUtil.Serializer, this.ExecutorWith413, retryDelegate.Object, BatchAsyncBatcherTests.MockClientContext());
BatchAsyncBatcher batchAsyncBatcher = new BatchAsyncBatcher(3, 1000, MockCosmosUtil.Serializer, this.ExecutorWith413_3402, retryDelegate.Object, BatchAsyncBatcherTests.MockClientContext());
Assert.IsTrue(batchAsyncBatcher.TryAdd(operation1));
Assert.IsTrue(batchAsyncBatcher.TryAdd(operation2));
Assert.IsTrue(batchAsyncBatcher.TryAdd(operation3));
await batchAsyncBatcher.DispatchAsync(metric);
retryDelegate.Verify(a => a(It.Is<ItemBatchOperation>(o => o == operation1), It.IsAny<CancellationToken>()), Times.Never);
retryDelegate.Verify(a => a(It.Is<ItemBatchOperation>(o => o == operation2), It.IsAny<CancellationToken>()), Times.Once);
retryDelegate.Verify(a => a(It.IsAny<ItemBatchOperation>(), It.IsAny<CancellationToken>()), Times.Once);
retryDelegate.Verify(a => a(It.Is<ItemBatchOperation>(o => o == operation2), It.IsAny<CancellationToken>()), Times.Once);
retryDelegate.Verify(a => a(It.IsAny<ItemBatchOperation>(), It.IsAny<CancellationToken>()), Times.Exactly(2));
}

[TestMethod]
public async Task RetrierGetsCalledOn413_OnWrite()
public async Task RetrierGetsCalledOn413_NoSubstatus()
{
IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Create,
OperationType.Read,
new ResourceThrottleRetryPolicy(1));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));

IDocumentClientRetryPolicy retryPolicy3 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Create,
new ResourceThrottleRetryPolicy(1));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
ItemBatchOperation operation3 = this.CreateItemBatchOperation();
operation1.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy1));
operation2.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy2));
operation3.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy3));

Mock<BatchAsyncBatcherRetryDelegate> retryDelegate = new Mock<BatchAsyncBatcherRetryDelegate>();

BatchAsyncBatcher batchAsyncBatcher = new BatchAsyncBatcher(2, 1000, MockCosmosUtil.Serializer, this.ExecutorWith413, retryDelegate.Object, BatchAsyncBatcherTests.MockClientContext());
BatchAsyncBatcher batchAsyncBatcher = new BatchAsyncBatcher(3, 1000, MockCosmosUtil.Serializer, this.ExecutorWith413, retryDelegate.Object, BatchAsyncBatcherTests.MockClientContext());
Assert.IsTrue(batchAsyncBatcher.TryAdd(operation1));
Assert.IsTrue(batchAsyncBatcher.TryAdd(operation2));
Assert.IsTrue(batchAsyncBatcher.TryAdd(operation3));
await batchAsyncBatcher.DispatchAsync(metric);
retryDelegate.Verify(a => a(It.Is<ItemBatchOperation>(o => o == operation1), It.IsAny<CancellationToken>()), Times.Never);
retryDelegate.Verify(a => a(It.Is<ItemBatchOperation>(o => o == operation2), It.IsAny<CancellationToken>()), Times.Never);
retryDelegate.Verify(a => a(It.Is<ItemBatchOperation>(o => o == operation3), It.IsAny<CancellationToken>()), Times.Never);
retryDelegate.Verify(a => a(It.IsAny<ItemBatchOperation>(), It.IsAny<CancellationToken>()), Times.Never);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,21 +202,21 @@ public async Task ShouldRetry_WithPolicy_On429()
}

[TestMethod]
public async Task ShouldRetry_WithPolicy_On413_OnRead()
public async Task ShouldRetry_WithPolicy_On413_3402()
{
IDocumentClientRetryPolicy retryPolicy = new BulkExecutionRetryPolicy(
Mock.Of<ContainerInternal>(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.RequestEntityTooLarge);
TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.RequestEntityTooLarge) { SubStatusCode = (SubStatusCodes)3402 };
ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null);
operation.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy));
ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default);
Assert.IsTrue(shouldRetryResult.ShouldRetry);
}

[TestMethod]
public async Task ShouldRetry_WithPolicy_On413_OnWrite()
public async Task ShouldRetry_WithPolicy_On413_0()
{
IDocumentClientRetryPolicy retryPolicy = new BulkExecutionRetryPolicy(
Mock.Of<ContainerInternal>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,20 @@ public async Task RetriesOn429()
}

[TestMethod]
public async Task RetriesOn413_OnRead()
public async Task RetriesOn413_3204()
{
IDocumentClientRetryPolicy retryPolicy = new BulkExecutionRetryPolicy(
Mock.Of<ContainerInternal>(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));

TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.RequestEntityTooLarge);
TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.RequestEntityTooLarge) { SubStatusCode = (SubStatusCodes)3402 };
ShouldRetryResult shouldRetryResult = await retryPolicy.ShouldRetryAsync(result.ToResponseMessage(), default);
Assert.IsTrue(shouldRetryResult.ShouldRetry);
}

[TestMethod]
public async Task RetriesOn413_OnWrite()
public async Task RetriesOn413_0()
{
IDocumentClientRetryPolicy retryPolicy = new BulkExecutionRetryPolicy(
Mock.Of<ContainerInternal>(),
Expand Down