Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChangeFeedProcessor: Refactors AVAD metadata Contract #4380

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ public override Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
#if SDKPROJECTREF
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate)
ChangeFeedHandler<ChangeFeedItem<T>> onChangesDelegate)
{
return this.container.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(
processorName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ public override Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
#if SDKPROJECTREF
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate)
ChangeFeedHandler<ChangeFeedItem<T>> onChangesDelegate)
{
throw new NotImplementedException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ internal sealed class ChangeFeedObserverFactoryCore<T> : ChangeFeedObserverFacto
private readonly ChangesHandler<T> legacyOnChanges;
private readonly ChangeFeedHandler<T> onChanges;
private readonly ChangeFeedHandlerWithManualCheckpoint<T> onChangesWithManualCheckpoint;
private readonly ChangeFeedHandler<ChangeFeedItemChange<T>> onAllVersionsAndDeletesChanges;
private readonly ChangeFeedHandler<ChangeFeedItem<T>> onAllVersionsAndDeletesChanges;
private readonly CosmosSerializerCore serializerCore;

public ChangeFeedObserverFactoryCore(
Expand All @@ -72,7 +72,7 @@ public ChangeFeedObserverFactoryCore(
}

public ChangeFeedObserverFactoryCore(
ChangeFeedHandler<ChangeFeedItemChange<T>> onChanges,
ChangeFeedHandler<ChangeFeedItem<T>> onChanges,
CosmosSerializerCore serializerCore)
: this(serializerCore)
{
Expand Down Expand Up @@ -130,7 +130,7 @@ private Task AllVersionsAndDeletesStreamHandlerAsync(
Stream stream,
CancellationToken cancellationToken)
{
IReadOnlyCollection<ChangeFeedItemChange<T>> changes = this.AllVersionsAsIReadOnlyCollection(stream, context);
IReadOnlyCollection<ChangeFeedItem<T>> changes = this.AllVersionsAsIReadOnlyCollection(stream, context);
if (changes.Count == 0)
{
return Task.CompletedTask;
Expand All @@ -156,13 +156,13 @@ private IReadOnlyCollection<T> AsIReadOnlyCollection(
}
}

private IReadOnlyCollection<ChangeFeedItemChange<T>> AllVersionsAsIReadOnlyCollection(
private IReadOnlyCollection<ChangeFeedItem<T>> AllVersionsAsIReadOnlyCollection(
Stream stream,
ChangeFeedObserverContextCore context)
{
try
{
return CosmosFeedResponseSerializer.FromFeedResponseStream<ChangeFeedItemChange<T>>(
return CosmosFeedResponseSerializer.FromFeedResponseStream<ChangeFeedItem<T>>(
this.serializerCore,
stream);
}
Expand Down
6 changes: 3 additions & 3 deletions Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1697,15 +1697,15 @@ public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
/// ManualResetEvent allProcessedDocumentsEvent = new ManualResetEvent(false);
///
/// ChangeFeedProcessor changeFeedProcessor = this.Container
/// .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItemChange<dynamic>> documents, CancellationToken token) =>
/// .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<dynamic>> documents, CancellationToken token) =>
/// {
/// Console.WriteLine($"number of documents processed: {documents.Count}");
///
/// string id = default;
/// string pk = default;
/// string description = default;
///
/// foreach (ChangeFeedItemChange<dynamic> changeFeedItem in documents)
/// foreach (ChangeFeedItem<dynamic> changeFeedItem in documents)
/// {
/// if (changeFeedItem.Metadata.OperationType != ChangeFeedOperationType.Delete)
/// {
Expand Down Expand Up @@ -1754,7 +1754,7 @@ public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
/// <returns>An instance of <see cref="ChangeFeedProcessorBuilder"/></returns>
public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate);
ChangeFeedHandler<ChangeFeedItem<T>> onChangesDelegate);
#endif
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ public Task<ResponseMessage> PatchItemStreamAsync(

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate)
ChangeFeedHandler<ChangeFeedItem<T>> onChangesDelegate)
{
if (processorName == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate);
ChangeFeedHandler<ChangeFeedItem<T>> onChangesDelegate);
#endif

public abstract class TryExecuteQueryResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ namespace Microsoft.Azure.Cosmos
/// PartitionKey partitionKey = new PartitionKey(@"learning");
/// ChangeFeedStartFrom changeFeedStartFrom = ChangeFeedStartFrom.Now(FeedRange.FromPartitionKey(partitionKey));
///
/// using (FeedIterator<ChangeFeedItemChanges<ToDoActivity>> feedIterator = container.GetChangeFeedIterator<ChangeFeedItemChanges<ToDoActivity>>(
/// using (FeedIterator<ChangeFeedItem<ToDoActivity>> feedIterator = container.GetChangeFeedIterator<ChangeFeedItemChanges<ToDoActivity>>(
/// changeFeedStartFrom: changeFeedStartFrom,
/// changeFeedMode: changeFeedMode))
/// {
/// while (feedIterator.HasMoreResults)
/// {
/// FeedResponse<ChangeFeedItemChanges<ToDoActivity>> feedResponse = await feedIterator.ReadNextAsync();
/// FeedResponse<ChangeFeedItem<ToDoActivity>> feedResponse = await feedIterator.ReadNextAsync();
///
/// if (feedResponse.StatusCode != HttpStatusCode.NotModified)
/// {
/// IEnumerable<ChangeFeedItemChanges<ToDoActivity>> feedResource = feedResponse.Resource;
/// IEnumerable<ChangeFeedItem<ToDoActivity>> feedResource = feedResponse.Resource;
///
/// foreach(ChangeFeedItemChanges<ToDoActivity> itemChanges in feedResource)
/// foreach(ChangeFeedItem<ToDoActivity> itemChanges in feedResource)
/// {
/// ToDoActivity currentToDoActivity = itemChanges.Current;
/// ToDoActivity previousToDoActivity = itemChanges.Previous;
Expand All @@ -47,13 +47,13 @@ namespace Microsoft.Azure.Cosmos
/// ]]>
/// </code>
/// </example>
/// <remarks><see cref="ChangeFeedItemChange{T}"/> is an optional helper class that uses Newtonsoft serialization libraries. Users are welcome to create their own custom helper class.</remarks>
/// <remarks><see cref="ChangeFeedItem{T}"/> is an optional helper class that uses Newtonsoft serialization libraries. Users are welcome to create their own custom helper class.</remarks>
#if PREVIEW
public
#else
internal
#endif
class ChangeFeedItemChange<T>
class ChangeFeedItem<T>
{
/// <summary>
/// The full fidelity change feed current item.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace Microsoft.Azure.Cosmos
class ChangeFeedMetadata
{
/// <summary>
/// New instance of meta data for <see cref="ChangeFeedItemChange{T}"/> created.
/// New instance of meta data for <see cref="ChangeFeedItem{T}"/> created.
/// </summary>
/// <param name="conflictResolutionTimestamp"></param>
/// <param name="lsn"></param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private CosmosSerializer GetSerializer<T>()
string directAssemblyName = typeof(Documents.PartitionKeyRange).Assembly.GetName().Name;
string inputAssemblyName = inputType.Assembly.GetName().Name;
bool inputIsClientOrDirect = string.Equals(inputAssemblyName, clientAssemblyName) || string.Equals(inputAssemblyName, directAssemblyName);
bool typeIsWhiteListed = inputType == typeof(Document) || (inputType.IsGenericType && inputType.GetGenericTypeDefinition() == typeof(ChangeFeedItemChange<>));
bool typeIsWhiteListed = inputType == typeof(Document) || (inputType.IsGenericType && inputType.GetGenericTypeDefinition() == typeof(ChangeFeedItem<>));

if (!typeIsWhiteListed && inputIsClientOrDirect)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
Exception exception = default;

ChangeFeedProcessor processor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItemChange<dynamic>> docs, CancellationToken token) =>
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<dynamic>> docs, CancellationToken token) =>
{
string id = default;
string pk = default;
string description = default;

foreach (ChangeFeedItemChange<dynamic> change in docs)
foreach (ChangeFeedItem<dynamic> change in docs)
{
if (change.Metadata.OperationType != ChangeFeedOperationType.Delete)
{
Expand Down Expand Up @@ -75,7 +75,7 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
Assert.IsTrue(context.Diagnostics.ToString().Contains("Change Feed Processor Read Next Async"));
Assert.AreEqual(expected: 3, actual: docs.Count);

ChangeFeedItemChange<dynamic> createChange = docs.ElementAt(0);
ChangeFeedItem<dynamic> createChange = docs.ElementAt(0);
Assert.IsNotNull(createChange.Current);
Assert.AreEqual(expected: "1", actual: createChange.Current.id.ToString());
Assert.AreEqual(expected: "1", actual: createChange.Current.pk.ToString());
Expand All @@ -84,7 +84,7 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
Assert.AreEqual(expected: createChange.Metadata.PreviousLsn, actual: 0);
Assert.IsNull(createChange.Previous);

ChangeFeedItemChange<dynamic> replaceChange = docs.ElementAt(1);
ChangeFeedItem<dynamic> replaceChange = docs.ElementAt(1);
Assert.IsNotNull(replaceChange.Current);
Assert.AreEqual(expected: "1", actual: replaceChange.Current.id.ToString());
Assert.AreEqual(expected: "1", actual: replaceChange.Current.pk.ToString());
Expand All @@ -93,7 +93,7 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
Assert.AreEqual(expected: createChange.Metadata.Lsn, actual: replaceChange.Metadata.PreviousLsn);
Assert.IsNull(replaceChange.Previous);

ChangeFeedItemChange<dynamic> deleteChange = docs.ElementAt(2);
ChangeFeedItem<dynamic> deleteChange = docs.ElementAt(2);
Assert.IsNull(deleteChange.Current.id);
Assert.AreEqual(expected: deleteChange.Metadata.OperationType, actual: ChangeFeedOperationType.Delete);
Assert.AreEqual(expected: replaceChange.Metadata.Lsn, actual: deleteChange.Metadata.PreviousLsn);
Expand Down Expand Up @@ -341,7 +341,7 @@ private static async Task BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync
ChangeFeedProcessor allVersionsAndDeletesProcessorAtomic = null;

ChangeFeedProcessorBuilder allVersionsAndDeletesProcessorBuilder = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: $"processorName", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItemChange<dynamic>> documents, CancellationToken token) => Task.CompletedTask)
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: $"processorName", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<dynamic>> documents, CancellationToken token) => Task.CompletedTask)
.WithInstanceName(Guid.NewGuid().ToString())
.WithMaxItems(1)
.WithLeaseContainer(leaseContainer)
Expand Down
Loading