diff --git a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutorCache.cs b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutorCache.cs new file mode 100644 index 0000000000..3ca150d48f --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutorCache.cs @@ -0,0 +1,55 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using Microsoft.Azure.Documents; + + /// + /// Cache to create and share Executor instances across the client's lifetime. + /// + internal class BatchAsyncContainerExecutorCache : IDisposable + { + private ConcurrentDictionary executorsPerContainer = new ConcurrentDictionary(); + + public BatchAsyncContainerExecutor GetExecutorForContainer( + ContainerCore container, + CosmosClientContext cosmosClientContext) + { + if (!cosmosClientContext.ClientOptions.AllowBulkExecution) + { + throw new InvalidOperationException("AllowBulkExecution is not currently enabled."); + } + + string containerLink = container.LinkUri.ToString(); + if (this.executorsPerContainer.TryGetValue(containerLink, out BatchAsyncContainerExecutor executor)) + { + return executor; + } + + BatchAsyncContainerExecutor newExecutor = new BatchAsyncContainerExecutor( + container, + cosmosClientContext, + Constants.MaxOperationsInDirectModeBatchRequest, + Constants.MaxDirectModeBatchRequestBodySizeInBytes); + if (!this.executorsPerContainer.TryAdd(containerLink, newExecutor)) + { + newExecutor.Dispose(); + } + + return this.executorsPerContainer[containerLink]; + } + + public void Dispose() + { + foreach (KeyValuePair cacheEntry in this.executorsPerContainer) + { + cacheEntry.Value.Dispose(); + } + } + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/CosmosClient.cs b/Microsoft.Azure.Cosmos/src/CosmosClient.cs index af3c2c136e..5c2c8df8d6 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClient.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClient.cs @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos using System.Text; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Common; using Microsoft.Azure.Cosmos.Handlers; using Microsoft.Azure.Cosmos.Query; using Microsoft.Azure.Documents; @@ -296,6 +297,7 @@ internal CosmosClient( internal RequestInvokerHandler RequestHandler { get; private set; } internal CosmosResponseFactory ResponseFactory { get; private set; } internal CosmosClientContext ClientContext { get; private set; } + internal BatchAsyncContainerExecutorCache BatchExecutorCache { get; private set; } = new BatchAsyncContainerExecutorCache(); /// /// Read Azure Cosmos DB account properties @@ -743,6 +745,12 @@ protected virtual void Dispose(bool disposing) this.DocumentClient.Dispose(); this.DocumentClient = null; } + + if (this.BatchExecutorCache != null) + { + this.BatchExecutorCache.Dispose(); + this.BatchExecutorCache = null; + } } } } diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs index 74af72cc43..785f93b2c0 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs @@ -294,11 +294,7 @@ internal virtual BatchAsyncContainerExecutor InitializeBatchExecutorForContainer return null; } - return new BatchAsyncContainerExecutor( - this, - this.ClientContext, - Constants.MaxOperationsInDirectModeBatchRequest, - Constants.MaxDirectModeBatchRequestBodySizeInBytes); + return this.ClientContext.Client.BatchExecutorCache.GetExecutorForContainer(this, this.ClientContext); } private Task ReplaceStreamInternalAsync( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorCacheTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorCacheTests.cs new file mode 100644 index 0000000000..32e44fedc5 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorCacheTests.cs @@ -0,0 +1,115 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Tests +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Documents; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + public class BatchAsyncContainerExecutorCacheTests + { + [TestMethod] + public async Task ConcurrentGet_ReturnsSameExecutorInstance() + { + Mock mockClient = new Mock(); + mockClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost")); + + CosmosClientContext context = new ClientContextCore( + client: mockClient.Object, + clientOptions: new CosmosClientOptions() { AllowBulkExecution = true }, + userJsonSerializer: null, + defaultJsonSerializer: null, + sqlQuerySpecSerializer: null, + cosmosResponseFactory: null, + requestHandler: null, + documentClient: null); + + DatabaseCore db = new DatabaseCore(context, "test"); + + List> tasks = new List>(); + for (int i = 0; i < 20; i++) + { + tasks.Add(Task.Run(() => Task.FromResult((ContainerCore)db.GetContainer("test")))); + } + + await Task.WhenAll(tasks); + + BatchAsyncContainerExecutor firstExecutor = tasks[0].Result.BatchExecutor; + Assert.IsNotNull(firstExecutor); + for (int i = 1; i < 20; i++) + { + BatchAsyncContainerExecutor otherExecutor = tasks[i].Result.BatchExecutor; + Assert.AreEqual(firstExecutor, otherExecutor); + } + } + + [TestMethod] + [Timeout(60000)] + public async Task SingleTaskScheduler_ExecutorTest() + { + Mock mockClient = new Mock(); + mockClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost")); + + CosmosClientContext context = new ClientContextCore( + client: mockClient.Object, + clientOptions: new CosmosClientOptions() { AllowBulkExecution = true }, + userJsonSerializer: null, + defaultJsonSerializer: null, + sqlQuerySpecSerializer: null, + cosmosResponseFactory: null, + requestHandler: null, + documentClient: null); + + DatabaseCore db = new DatabaseCore(context, "test"); + + List> tasks = new List>(); + for (int i = 0; i < 20; i++) + { + tasks.Add( + Task.Factory.StartNew(() => (ContainerCore)db.GetContainer("test"), + CancellationToken.None, + TaskCreationOptions.None, + new SingleTaskScheduler())); + } + + await Task.WhenAll(tasks); + + BatchAsyncContainerExecutor firstExecutor = tasks[0].Result.BatchExecutor; + Assert.IsNotNull(firstExecutor); + for (int i = 1; i < 20; i++) + { + BatchAsyncContainerExecutor otherExecutor = tasks[i].Result.BatchExecutor; + Assert.AreEqual(firstExecutor, otherExecutor); + } + } + + [TestMethod] + public void Null_When_OptionsOff() + { + Mock mockClient = new Mock(); + mockClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost")); + + CosmosClientContext context = new ClientContextCore( + client: mockClient.Object, + clientOptions: new CosmosClientOptions() { }, + userJsonSerializer: null, + defaultJsonSerializer: null, + sqlQuerySpecSerializer: null, + cosmosResponseFactory: null, + requestHandler: null, + documentClient: null); + + DatabaseCore db = new DatabaseCore(context, "test"); + ContainerCore container = (ContainerCore)db.GetContainer("test"); + Assert.IsNull(container.BatchExecutor); + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientResourceUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientResourceUnitTests.cs index f2198540f8..2c78de337e 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientResourceUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientResourceUnitTests.cs @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos.Core.Tests using System.Net.Http; using Microsoft.Azure.Documents; using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; [TestClass] public class CosmosClientResourceUnitTests @@ -19,8 +20,11 @@ public void ValidateUriGenerationForResources() string databaseId = "db1234"; string crId = "cr42"; + Mock mockClient = new Mock(); + mockClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost")); + CosmosClientContext context = new ClientContextCore( - client: null, + client: mockClient.Object, clientOptions: new CosmosClientOptions(), userJsonSerializer: null, defaultJsonSerializer: null, @@ -120,8 +124,11 @@ public void InitializeBatchExecutorForContainer_Null_WhenAllowBulk_False() string databaseId = "db1234"; string crId = "cr42"; + Mock mockClient = new Mock(); + mockClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost")); + CosmosClientContext context = new ClientContextCore( - client: null, + client: mockClient.Object, clientOptions: new CosmosClientOptions(), userJsonSerializer: null, defaultJsonSerializer: null, @@ -141,8 +148,11 @@ public void InitializeBatchExecutorForContainer_NotNull_WhenAllowBulk_True() string databaseId = "db1234"; string crId = "cr42"; + Mock mockClient = new Mock(); + mockClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost")); + CosmosClientContext context = new ClientContextCore( - client: null, + client: mockClient.Object, clientOptions: new CosmosClientOptions() { AllowBulkExecution = true }, userJsonSerializer: null, defaultJsonSerializer: null, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DotNetSDKAPI.json b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DotNetSDKAPI.json index be96ace12a..834ef8546f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DotNetSDKAPI.json +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DotNetSDKAPI.json @@ -1192,7 +1192,7 @@ "Attributes": [], "MethodInfo": "System.Threading.Tasks.Task`1[Microsoft.Azure.Cosmos.DatabaseResponse] CreateDatabaseAsync(System.String, System.Nullable`1[System.Int32], Microsoft.Azure.Cosmos.RequestOptions, System.Threading.CancellationToken)" }, - "System.Threading.Tasks.Task`1[Microsoft.Azure.Cosmos.DatabaseResponse] CreateDatabaseIfNotExistsAsync(System.String, System.Nullable`1[System.Int32], Microsoft.Azure.Cosmos.RequestOptions, System.Threading.CancellationToken)[System.Runtime.CompilerServices.AsyncStateMachineAttribute(typeof(Microsoft.Azure.Cosmos.CosmosClient+d__37))]": { + "System.Threading.Tasks.Task`1[Microsoft.Azure.Cosmos.DatabaseResponse] CreateDatabaseIfNotExistsAsync(System.String, System.Nullable`1[System.Int32], Microsoft.Azure.Cosmos.RequestOptions, System.Threading.CancellationToken)[System.Runtime.CompilerServices.AsyncStateMachineAttribute(typeof(Microsoft.Azure.Cosmos.CosmosClient+d__41))]": { "Type": "Method", "Attributes": [ "AsyncStateMachineAttribute" diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Microsoft.Azure.Cosmos.Tests.csproj b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Microsoft.Azure.Cosmos.Tests.csproj index 2cf0faa53b..2aec2d109f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Microsoft.Azure.Cosmos.Tests.csproj +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Microsoft.Azure.Cosmos.Tests.csproj @@ -224,9 +224,6 @@ PreserveNewest - - - true diff --git a/changelog.md b/changelog.md index bc7bf14fc8..42343dabe8 100644 --- a/changelog.md +++ b/changelog.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [#835](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/835) Fixed a bug that caused sortedRanges exceptions - [#846](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/846) Statistics not getting populated correctly on CosmosException. +- [#857](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/857) Fixed reusability of the Bulk support across Container instances ## [3.2.0](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.2.0) - 2019-09-17