Skip to content

Commit

Permalink
Change Feed: Fixes CancellationToken support on ChangeFeedIterator (#…
Browse files Browse the repository at this point in the history
…2490)

* wiring

* tests

* fixing flow

* more tests

* validating pipeline

* With reset

* Polishing test to verify success case

Co-authored-by: j82w <j82w@users.noreply.github.com>
  • Loading branch information
ealsur and j82w authored May 24, 2021
1 parent fe1e997 commit 71db82c
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ private async Task<ResponseMessage> ReadNextInternalAsync(ITrace trace, Cancella
}

CrossPartitionChangeFeedAsyncEnumerator enumerator = monadicEnumerator.Result;
enumerator.SetCancellationToken(cancellationToken);

try
{
if (!await enumerator.MoveNextAsync(trace))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Pagination
internal sealed class CrossPartitionChangeFeedAsyncEnumerator : IAsyncEnumerator<TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>>
{
private readonly CrossPartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> crossPartitionEnumerator;
private readonly CancellationToken cancellationToken;
private CancellationToken cancellationToken;
private TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>? bufferedException;

private CrossPartitionChangeFeedAsyncEnumerator(
Expand Down Expand Up @@ -133,6 +133,12 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
}
}

public void SetCancellationToken(CancellationToken cancellationToken)
{
this.cancellationToken = cancellationToken;
this.crossPartitionEnumerator.SetCancellationToken(cancellationToken);
}

public static CrossPartitionChangeFeedAsyncEnumerator Create(
IDocumentContainer documentContainer,
CrossFeedRangeState<ChangeFeedState> state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,78 @@ public async Task ChangeFeedIteratorCore_WithFullFidelity()
Assert.IsTrue(hasDeletes, "No metadata for delete operationType found");
}

[TestMethod]
public async Task TestCancellationTokenAsync()
{
CancellationTokenRequestHandler cancellationTokenHandler = new CancellationTokenRequestHandler();

ContainerInternal itemsCore = await this.InitializeContainerAsync();
await this.CreateRandomItems(itemsCore, 100, randomPartitionKey: true);

// Inject validating handler
RequestHandler currentInnerHandler = this.cosmosClient.RequestHandler.InnerHandler;
this.cosmosClient.RequestHandler.InnerHandler = cancellationTokenHandler;
cancellationTokenHandler.InnerHandler = currentInnerHandler;

{
// Test to see if the token flows to the pipeline
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator(
ChangeFeedStartFrom.Beginning(),
ChangeFeedMode.Incremental) as ChangeFeedIteratorCore;
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);
Assert.AreEqual(cancellationTokenSource.Token, cancellationTokenHandler.LastUsedToken, "The token passed did not reach the pipeline");
}

// See if cancellation token is honored for first request
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator(
ChangeFeedStartFrom.Beginning(),
ChangeFeedMode.Incremental) as ChangeFeedIteratorCore;
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);

Assert.Fail("Expected exception.");
}
catch (OperationCanceledException)
{
}

// See if cancellation token is honored for second request
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator(
ChangeFeedStartFrom.Beginning(),
ChangeFeedMode.Incremental) as ChangeFeedIteratorCore;
await feedIterator.ReadNextAsync();
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);
Assert.Fail("Expected exception.");
}
catch (OperationCanceledException)
{
}

// See if cancellation token is honored mid draining
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator(
ChangeFeedStartFrom.Beginning(),
ChangeFeedMode.Incremental) as ChangeFeedIteratorCore;
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);
cancellationTokenSource.Cancel();
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);
Assert.Fail("Expected exception.");
}
catch (OperationCanceledException)
{
}
}

private async Task<IList<ToDoActivity>> CreateRandomItems(ContainerInternal container, int pkCount, int perPKItemCount = 1, bool randomPartitionKey = true)
{
Assert.IsFalse(!randomPartitionKey && perPKItemCount > 1);
Expand Down Expand Up @@ -771,5 +843,16 @@ public class ToDoActivityMetadata
[JsonProperty("operationType")]
public string operationType { get; set; }
}

private class CancellationTokenRequestHandler : RequestHandler
{
public CancellationToken LastUsedToken { get; private set; }

public override Task<ResponseMessage> SendAsync(RequestMessage request, CancellationToken cancellationToken)
{
this.LastUsedToken = cancellationToken;
return base.SendAsync(request, cancellationToken);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace Microsoft.Azure.Cosmos.Tests.FeedRange
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json.Interop;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
Expand Down Expand Up @@ -250,8 +249,6 @@ public async Task ChangeFeedIteratorCore_OnRetriableCosmosException_HasMoreResul
ChangeFeedStartFrom.Beginning(),
this.MockClientContext());

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
ResponseMessage responseMessage = await changeFeedIteratorCore.ReadNextAsync();
Assert.AreEqual(HttpStatusCode.TooManyRequests, responseMessage.StatusCode);
Assert.IsTrue(changeFeedIteratorCore.HasMoreResults);
Expand Down Expand Up @@ -366,6 +363,44 @@ public async Task ChangeFeedIteratorCore_OnUnhandledException_HasMoreResults()
}
}

[TestMethod]
public async Task ChangeFeedIteratorCore_CancellationToken_FlowsThrough()
{
// Generate constant 429
CosmosException exception = CosmosExceptionFactory.CreateThrottledException("retry", new Headers());
IDocumentContainer documentContainer = await CreateDocumentContainerAsync(
numItems: 0,
failureConfigs: new FlakyDocumentContainer.FailureConfigs(
inject429s: false,
injectEmptyPages: false,
returnFailure: exception));

ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(
documentContainer,
ChangeFeedMode.Incremental,
new ChangeFeedRequestOptions(),
ChangeFeedStartFrom.Beginning(),
this.MockClientContext());

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
try
{
// First request triggers initialization, we don't cancel it
ResponseMessage responseMessage = await changeFeedIteratorCore.ReadNextAsync();
Assert.AreEqual(HttpStatusCode.TooManyRequests, responseMessage.StatusCode);

// Should be initialized, let's see if cancellation flows through
await changeFeedIteratorCore.ReadNextAsync(cancellationTokenSource.Token);
Assert.Fail("Should have thrown");
}
catch (OperationCanceledException)
{
}

Assert.IsTrue(changeFeedIteratorCore.HasMoreResults);
}

private static CosmosArray GetChanges(Stream stream)
{
using (MemoryStream memoryStream = new MemoryStream())
Expand Down

0 comments on commit 71db82c

Please sign in to comment.