Skip to content

Commit

Permalink
Merge branch 'master' into users/ealsur/batchsizeretry
Browse files Browse the repository at this point in the history
  • Loading branch information
ealsur committed May 24, 2021
2 parents 2a425ea + 71db82c commit e7e675b
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos.Encryption" Version="1.0.0-previewV14" />
<PackageReference Include="Microsoft.Azure.Cosmos.Encryption" Version="1.0.0-previewV15" />
<PackageReference Include="Microsoft.Data.Encryption.AzureKeyVaultProvider" Version="0.2.0-pre" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.2.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ private async Task<ResponseMessage> ReadNextInternalAsync(ITrace trace, Cancella
}

CrossPartitionChangeFeedAsyncEnumerator enumerator = monadicEnumerator.Result;
enumerator.SetCancellationToken(cancellationToken);

try
{
if (!await enumerator.MoveNextAsync(trace))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Pagination
internal sealed class CrossPartitionChangeFeedAsyncEnumerator : IAsyncEnumerator<TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>>
{
private readonly CrossPartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> crossPartitionEnumerator;
private readonly CancellationToken cancellationToken;
private CancellationToken cancellationToken;
private TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>? bufferedException;

private CrossPartitionChangeFeedAsyncEnumerator(
Expand Down Expand Up @@ -133,6 +133,12 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
}
}

public void SetCancellationToken(CancellationToken cancellationToken)
{
this.cancellationToken = cancellationToken;
this.crossPartitionEnumerator.SetCancellationToken(cancellationToken);
}

public static CrossPartitionChangeFeedAsyncEnumerator Create(
IDocumentContainer documentContainer,
CrossFeedRangeState<ChangeFeedState> state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,78 @@ public async Task ChangeFeedIteratorCore_WithFullFidelity()
Assert.IsTrue(hasDeletes, "No metadata for delete operationType found");
}

[TestMethod]
public async Task TestCancellationTokenAsync()
{
CancellationTokenRequestHandler cancellationTokenHandler = new CancellationTokenRequestHandler();

ContainerInternal itemsCore = await this.InitializeContainerAsync();
await this.CreateRandomItems(itemsCore, 100, randomPartitionKey: true);

// Inject validating handler
RequestHandler currentInnerHandler = this.cosmosClient.RequestHandler.InnerHandler;
this.cosmosClient.RequestHandler.InnerHandler = cancellationTokenHandler;
cancellationTokenHandler.InnerHandler = currentInnerHandler;

{
// Test to see if the token flows to the pipeline
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator(
ChangeFeedStartFrom.Beginning(),
ChangeFeedMode.Incremental) as ChangeFeedIteratorCore;
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);
Assert.AreEqual(cancellationTokenSource.Token, cancellationTokenHandler.LastUsedToken, "The token passed did not reach the pipeline");
}

// See if cancellation token is honored for first request
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator(
ChangeFeedStartFrom.Beginning(),
ChangeFeedMode.Incremental) as ChangeFeedIteratorCore;
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);

Assert.Fail("Expected exception.");
}
catch (OperationCanceledException)
{
}

// See if cancellation token is honored for second request
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator(
ChangeFeedStartFrom.Beginning(),
ChangeFeedMode.Incremental) as ChangeFeedIteratorCore;
await feedIterator.ReadNextAsync();
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);
Assert.Fail("Expected exception.");
}
catch (OperationCanceledException)
{
}

// See if cancellation token is honored mid draining
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator(
ChangeFeedStartFrom.Beginning(),
ChangeFeedMode.Incremental) as ChangeFeedIteratorCore;
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);
cancellationTokenSource.Cancel();
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);
Assert.Fail("Expected exception.");
}
catch (OperationCanceledException)
{
}
}

private async Task<IList<ToDoActivity>> CreateRandomItems(ContainerInternal container, int pkCount, int perPKItemCount = 1, bool randomPartitionKey = true)
{
Assert.IsFalse(!randomPartitionKey && perPKItemCount > 1);
Expand Down Expand Up @@ -771,5 +843,16 @@ public class ToDoActivityMetadata
[JsonProperty("operationType")]
public string operationType { get; set; }
}

private class CancellationTokenRequestHandler : RequestHandler
{
public CancellationToken LastUsedToken { get; private set; }

public override Task<ResponseMessage> SendAsync(RequestMessage request, CancellationToken cancellationToken)
{
this.LastUsedToken = cancellationToken;
return base.SendAsync(request, cancellationToken);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace Microsoft.Azure.Cosmos.Tests.FeedRange
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json.Interop;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
Expand Down Expand Up @@ -250,8 +249,6 @@ public async Task ChangeFeedIteratorCore_OnRetriableCosmosException_HasMoreResul
ChangeFeedStartFrom.Beginning(),
this.MockClientContext());

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
ResponseMessage responseMessage = await changeFeedIteratorCore.ReadNextAsync();
Assert.AreEqual(HttpStatusCode.TooManyRequests, responseMessage.StatusCode);
Assert.IsTrue(changeFeedIteratorCore.HasMoreResults);
Expand Down Expand Up @@ -366,6 +363,44 @@ public async Task ChangeFeedIteratorCore_OnUnhandledException_HasMoreResults()
}
}

[TestMethod]
public async Task ChangeFeedIteratorCore_CancellationToken_FlowsThrough()
{
// Generate constant 429
CosmosException exception = CosmosExceptionFactory.CreateThrottledException("retry", new Headers());
IDocumentContainer documentContainer = await CreateDocumentContainerAsync(
numItems: 0,
failureConfigs: new FlakyDocumentContainer.FailureConfigs(
inject429s: false,
injectEmptyPages: false,
returnFailure: exception));

ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(
documentContainer,
ChangeFeedMode.Incremental,
new ChangeFeedRequestOptions(),
ChangeFeedStartFrom.Beginning(),
this.MockClientContext());

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
try
{
// First request triggers initialization, we don't cancel it
ResponseMessage responseMessage = await changeFeedIteratorCore.ReadNextAsync();
Assert.AreEqual(HttpStatusCode.TooManyRequests, responseMessage.StatusCode);

// Should be initialized, let's see if cancellation flows through
await changeFeedIteratorCore.ReadNextAsync(cancellationTokenSource.Token);
Assert.Fail("Should have thrown");
}
catch (OperationCanceledException)
{
}

Assert.IsTrue(changeFeedIteratorCore.HasMoreResults);
}

private static CosmosArray GetChanges(Stream stream)
{
using (MemoryStream memoryStream = new MemoryStream())
Expand Down

0 comments on commit e7e675b

Please sign in to comment.