Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal Pagination: Fixes APIs to be cleaner. #2144

Merged
merged 26 commits into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
79d3929
wired through request options
bchong95 Jan 21, 2021
f7d25e7
wiring up additonal headers through response page
bchong95 Jan 22, 2021
db7672d
resolved iteration comments
bchong95 Jan 22, 2021
a7f9c25
Merge branch 'master' into users/brchon/Pagination/Options
bchong95 Jan 23, 2021
a9cfeb1
fixed infinite loop typo
bchong95 Jan 29, 2021
ce68d5c
fixed some tests
bchong95 Jan 29, 2021
3a35316
Merge branch 'master' into users/brchon/Pagination/Options
sboshra Jan 30, 2021
050d108
fixed tests
bchong95 Jan 30, 2021
a1ce10b
Merge branch 'users/brchon/Pagination/Options' of https://github.com/…
bchong95 Jan 30, 2021
996c66e
wired up options
bchong95 Jan 30, 2021
4934040
fixed tests
bchong95 Jan 30, 2021
d1f0b9f
fixed some infinite loops
bchong95 Jan 31, 2021
bd42e6e
added tests
bchong95 Feb 1, 2021
059bea1
removed unused test
bchong95 Feb 1, 2021
454e2dc
utfanystring
bchong95 Feb 1, 2021
65c4927
Merge branch 'master' into users/brchon/Pagination/Options
bchong95 Feb 3, 2021
188539f
taking the union
bchong95 Feb 3, 2021
210ad70
seperated out the headers
bchong95 Feb 3, 2021
3683e97
did the same for ChangeFeedIteratorCore
bchong95 Feb 3, 2021
7d0a8b4
enabled stream iterator also
bchong95 Feb 3, 2021
9041f35
Merge branch 'master' into users/brchon/Pagination/Options
bchong95 Feb 3, 2021
747257d
page size limit
bchong95 Feb 4, 2021
05f0c2b
Merge branch 'users/brchon/Pagination/Options' of https://github.com/…
bchong95 Feb 4, 2021
f13dce2
fixed CF processor headers
bchong95 Feb 4, 2021
b4d25de
Merge branch 'master' into users/brchon/Pagination/Options
sboshra Feb 5, 2021
34a3b9d
Merge branch 'master' into users/brchon/Pagination/Options
sboshra Feb 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,39 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using Microsoft.Azure.Cosmos.ChangeFeed.Pagination;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Serializer;

internal sealed class ChangeFeedCrossFeedRangeAsyncEnumerable : IAsyncEnumerable<TryCatch<ChangeFeedPage>>
{
private readonly IDocumentContainer documentContainer;
private readonly ChangeFeedMode changeFeedMode;
private readonly ChangeFeedRequestOptions changeFeedRequestOptions;
private readonly ChangeFeedPaginationOptions changeFeedPaginationOptions;
private readonly ChangeFeedCrossFeedRangeState state;
private readonly JsonSerializationFormatOptions jsonSerializationFormatOptions;

public ChangeFeedCrossFeedRangeAsyncEnumerable(
IDocumentContainer documentContainer,
ChangeFeedMode changeFeedMode,
ChangeFeedRequestOptions changeFeedRequestOptions,
ChangeFeedCrossFeedRangeState state)
ChangeFeedCrossFeedRangeState state,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
JsonSerializationFormatOptions jsonSerializationFormatOptions = null)
{
this.documentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer));
this.changeFeedMode = changeFeedMode ?? throw new ArgumentNullException(nameof(changeFeedMode));
this.changeFeedRequestOptions = changeFeedRequestOptions;
this.changeFeedPaginationOptions = changeFeedPaginationOptions ?? ChangeFeedPaginationOptions.Default;
this.state = state;
this.jsonSerializationFormatOptions = jsonSerializationFormatOptions;
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
}

public IAsyncEnumerator<TryCatch<ChangeFeedPage>> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
CrossFeedRangeState<ChangeFeedState> innerState = new CrossFeedRangeState<ChangeFeedState>(this.state.FeedRangeStates);
CrossPartitionChangeFeedAsyncEnumerator innerEnumerator = CrossPartitionChangeFeedAsyncEnumerator.Create(
this.documentContainer,
this.changeFeedMode,
this.changeFeedRequestOptions,
innerState,
this.changeFeedPaginationOptions,
cancellationToken);

return new ChangeFeedCrossFeedRangeAsyncEnumerator(
innerEnumerator,
this.changeFeedRequestOptions?.JsonSerializationFormatOptions);
innerEnumerator,
this.jsonSerializationFormatOptions);
}
}
}
33 changes: 30 additions & 3 deletions Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal sealed class ChangeFeedIteratorCore : FeedIteratorInternal
public ChangeFeedIteratorCore(
IDocumentContainer documentContainer,
ChangeFeedMode changeFeedMode,
ChangeFeedRequestOptions changeFeedRequestOptions,
ChangeFeedRequestOptions changeFeedRequestOptions,
ChangeFeedStartFrom changeFeedStartFrom)
{
if (changeFeedStartFrom == null)
Expand Down Expand Up @@ -153,11 +153,38 @@ public ChangeFeedIteratorCore(
innerException: monadicChangeFeedCrossFeedRangeState.Exception));
}

Dictionary<string, string> additionalHeaders;
if (changeFeedRequestOptions?.Properties != null)
{
additionalHeaders = new Dictionary<string, string>();
Dictionary<string, object> nonStringHeaders = new Dictionary<string, object>();
foreach (KeyValuePair<string, object> keyValuePair in changeFeedRequestOptions.Properties)
{
if (keyValuePair.Value is string stringValue)
{
additionalHeaders[keyValuePair.Key] = stringValue;
vivekr20 marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
nonStringHeaders[keyValuePair.Key] = keyValuePair.Value;
}
}

changeFeedRequestOptions.Properties = nonStringHeaders;
}
else
{
additionalHeaders = null;
}

CrossPartitionChangeFeedAsyncEnumerator enumerator = CrossPartitionChangeFeedAsyncEnumerator.Create(
documentContainer,
changeFeedMode,
changeFeedRequestOptions,
new CrossFeedRangeState<ChangeFeedState>(monadicChangeFeedCrossFeedRangeState.Result.FeedRangeStates),
new ChangeFeedPaginationOptions(
changeFeedMode,
changeFeedRequestOptions?.PageSizeHint,
changeFeedRequestOptions?.JsonSerializationFormatOptions?.JsonSerializationFormat,
additionalHeaders),
cancellationToken: default);

TryCatch<CrossPartitionChangeFeedAsyncEnumerator> monadicEnumerator = TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromResult(enumerator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
Expand Down Expand Up @@ -106,6 +107,17 @@ public override async Task<ResponseMessage> ReadNextAsync(ITrace trace, Cancella
// Set time headers if any
ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(requestMessage);
this.changeFeedStartFrom.Accept(visitor);

if (this.changeFeedOptions.PageSizeHint.HasValue)
{
requestMessage.Headers.Add(
HttpConstants.HttpHeaders.PageSize,
this.changeFeedOptions.PageSizeHint.Value.ToString(CultureInfo.InvariantCulture));
}

requestMessage.Headers.Add(
HttpConstants.HttpHeaders.A_IM,
HttpConstants.A_IMHeaderValues.IncrementalFeed);
},
feedRange: this.changeFeedStartFrom.FeedRange,
streamPayload: default,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed.Pagination
{
using System.Collections.Generic;
using System.Collections.Immutable;

internal sealed class ChangeFeedNotModifiedPage : ChangeFeedPage
{
private static readonly ImmutableHashSet<string> bannedHeaders = new HashSet<string>().ToImmutableHashSet();

public ChangeFeedNotModifiedPage(
double requestCharge,
string activityId,
IReadOnlyDictionary<string, string> additionalHeaders,
ChangeFeedState state)
: base(requestCharge, activityId, state)
: base(requestCharge, activityId, additionalHeaders, state)
{
}

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => bannedHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,28 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed.Pagination
{
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Documents;

internal abstract class ChangeFeedPage : Page<ChangeFeedState>
{
public static readonly ImmutableHashSet<string> BannedHeaders = new HashSet<string>()
{
HttpConstants.HttpHeaders.ETag,
}
.Concat(Page<ChangeFeedState>.BannedHeadersBase)
.ToImmutableHashSet();

protected ChangeFeedPage(
double requestCharge,
string activityId,
IReadOnlyDictionary<string, string> additionalHeaders,
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
ChangeFeedState state)
: base(state)
: base(requestCharge, activityId, additionalHeaders, state)
{
this.RequestCharge = requestCharge < 0 ? throw new ArgumentOutOfRangeException(nameof(requestCharge)) : requestCharge;
this.ActivityId = activityId ?? throw new ArgumentNullException(nameof(activityId));
}

public double RequestCharge { get; }

public string ActivityId { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.ChangeFeed.Pagination
{
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Documents;

internal sealed class ChangeFeedPaginationOptions : PaginationOptions
{
public static readonly ChangeFeedPaginationOptions Default = new ChangeFeedPaginationOptions(
mode: ChangeFeedMode.Incremental);

public static readonly ImmutableHashSet<string> BannedHeaders = new HashSet<string>()
{
HttpConstants.HttpHeaders.A_IM,
HttpConstants.HttpHeaders.IfModifiedSince,
HttpConstants.HttpHeaders.IfNoneMatch,
}
.Concat(PaginationOptions.bannedAdditionalHeaders)
.ToImmutableHashSet();

public ChangeFeedPaginationOptions(
ChangeFeedMode mode,
int? pageSizeHint = null,
JsonSerializationFormat? jsonSerializationFormat = null,
Dictionary<string, string> additionalHeaders = null)
: base(pageSizeHint, jsonSerializationFormat, additionalHeaders)
{
this.Mode = mode ?? throw new ArgumentNullException(nameof(mode));
}

public ChangeFeedMode Mode { get; }

protected override ImmutableHashSet<string> BannedAdditionalHeaders => BannedHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,33 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Pagination
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

internal sealed class ChangeFeedPartitionRangePageAsyncEnumerator : PartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState>
{
private readonly IChangeFeedDataSource changeFeedDataSource;
private readonly int pageSize;
private readonly ChangeFeedMode changeFeedMode;
private readonly JsonSerializationFormat? jsonSerializationFormat;
private readonly ChangeFeedPaginationOptions changeFeedPaginationOptions;

public ChangeFeedPartitionRangePageAsyncEnumerator(
IChangeFeedDataSource changeFeedDataSource,
FeedRangeInternal range,
int pageSize,
ChangeFeedMode changeFeedMode,
JsonSerializationFormat? jsonSerializationFormat,
ChangeFeedState state,
FeedRangeState<ChangeFeedState> feedRangeState,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
CancellationToken cancellationToken)
: base(range, cancellationToken, state)
: base(feedRangeState, cancellationToken)
{
this.changeFeedDataSource = changeFeedDataSource ?? throw new ArgumentNullException(nameof(changeFeedDataSource));
this.changeFeedMode = changeFeedMode ?? throw new ArgumentNullException(nameof(changeFeedMode));
this.pageSize = pageSize;
this.jsonSerializationFormat = jsonSerializationFormat;
this.changeFeedPaginationOptions = changeFeedPaginationOptions ?? throw new ArgumentNullException(nameof(changeFeedPaginationOptions));
}

public override ValueTask DisposeAsync() => default;

protected override Task<TryCatch<ChangeFeedPage>> GetNextPageAsync(
ITrace trace,
CancellationToken cancellationToken) => this.changeFeedDataSource.MonadicChangeFeedAsync(
this.State,
this.Range,
this.pageSize,
this.changeFeedMode,
this.jsonSerializationFormat,
this.FeedRangeState,
this.changeFeedPaginationOptions,
trace,
cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,27 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed.Pagination
{
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;

internal sealed class ChangeFeedSuccessPage : ChangeFeedPage
{
private static readonly ImmutableHashSet<string> bannedHeaders = new HashSet<string>().ToImmutableHashSet();

public ChangeFeedSuccessPage(
Stream content,
double requestCharge,
string activityId,
IReadOnlyDictionary<string, string> additionalHeaders,
ChangeFeedState state)
: base(requestCharge, activityId, state)
: base(requestCharge, activityId, additionalHeaders, state)
{
this.Content = content ?? throw new ArgumentNullException(nameof(content));
}

public Stream Content { get; }

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => bannedHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Pagination
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Tracing;
Expand Down Expand Up @@ -107,13 +106,15 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
changeFeedSuccessPage.Content,
totalRequestCharge,
changeFeedSuccessPage.ActivityId,
changeFeedSuccessPage.AdditionalHeaders,
changeFeedSuccessPage.State);
}
else
{
backendPage = new ChangeFeedNotModifiedPage(
totalRequestCharge,
backendPage.ActivityId,
backendPage.AdditionalHeaders,
backendPage.State);
}
}
Expand All @@ -130,30 +131,22 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)

public static CrossPartitionChangeFeedAsyncEnumerator Create(
IDocumentContainer documentContainer,
ChangeFeedMode changeFeedMode,
ChangeFeedRequestOptions changeFeedRequestOptions,
CrossFeedRangeState<ChangeFeedState> state,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
CancellationToken cancellationToken)
{
changeFeedRequestOptions ??= new ChangeFeedRequestOptions();
changeFeedPaginationOptions ??= ChangeFeedPaginationOptions.Default;

if (documentContainer == null)
{
throw new ArgumentNullException(nameof(documentContainer));
}

if (changeFeedMode == null)
{
throw new ArgumentNullException(nameof(changeFeedMode));
}

CrossPartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> crossPartitionEnumerator = new CrossPartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState>(
documentContainer,
CrossPartitionChangeFeedAsyncEnumerator.MakeCreateFunction(
documentContainer,
changeFeedRequestOptions.PageSizeHint.GetValueOrDefault(int.MaxValue),
changeFeedMode,
changeFeedRequestOptions?.JsonSerializationFormatOptions?.JsonSerializationFormat,
changeFeedPaginationOptions,
cancellationToken),
comparer: default /* this uses a regular queue instead of prioirty queue */,
maxConcurrency: default,
Expand All @@ -169,16 +162,11 @@ public static CrossPartitionChangeFeedAsyncEnumerator Create(

private static CreatePartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> MakeCreateFunction(
IChangeFeedDataSource changeFeedDataSource,
int pageSize,
ChangeFeedMode changeFeedMode,
JsonSerializationFormat? jsonSerializationFormat,
CancellationToken cancellationToken) => (FeedRangeInternal range, ChangeFeedState state) => new ChangeFeedPartitionRangePageAsyncEnumerator(
ChangeFeedPaginationOptions changeFeedPaginationOptions,
CancellationToken cancellationToken) => (FeedRangeState<ChangeFeedState> feedRangeState) => new ChangeFeedPartitionRangePageAsyncEnumerator(
changeFeedDataSource,
range,
pageSize,
changeFeedMode,
jsonSerializationFormat,
state,
feedRangeState,
changeFeedPaginationOptions,
cancellationToken);
}
}
Loading