diff --git a/CHANGELOG.md b/CHANGELOG.md index 06a54b6990..3caab1cd5f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## VNext +- [Allow Control of Documents sampling for QuickPulse telemetry](https://github.com/microsoft/ApplicationInsights-dotnet/pull/2425) ## Version 2.19.0-beta1 - [The `{OriginalFormat}` field in ILogger Scope will be emitted as `OriginalFormat` with the braces removed](https://github.com/microsoft/ApplicationInsights-dotnet/pull/2362) diff --git a/WEB/Src/PerformanceCollector/Perf.Tests/Filtering/CollectionConfigurationTests.cs b/WEB/Src/PerformanceCollector/Perf.Tests/Filtering/CollectionConfigurationTests.cs index 99485e9916..9cb5bb9263 100644 --- a/WEB/Src/PerformanceCollector/Perf.Tests/Filtering/CollectionConfigurationTests.cs +++ b/WEB/Src/PerformanceCollector/Perf.Tests/Filtering/CollectionConfigurationTests.cs @@ -8,6 +8,7 @@ namespace Microsoft.ApplicationInsights.Tests using Microsoft.ApplicationInsights.Extensibility; using Microsoft.ApplicationInsights.Extensibility.Filtering; using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.Implementation.QuickPulse; + using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.Implementation.ServiceContract; using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.QuickPulse; using Microsoft.ApplicationInsights.Web.Helpers; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -653,6 +654,222 @@ public void CollectionConfigurationCarriesOverQuotaWhenCreatingDocumentStreams() Assert.AreEqual(3, documentStreams[3].EventQuotaTracker.CurrentQuota); } + [TestMethod] + public void CollectionConfigurationWithMaxAndInitialQuotas() + { + // ARRANGE + var timeProvider = new ClockMock(); + + CollectionConfigurationError[] errors; + var oldDocumentStreamInfos = new[] + { + new DocumentStreamInfo() + { + Id = "Stream1", + DocumentFilterGroups = + new[] + { + new DocumentFilterConjunctionGroupInfo() + { + TelemetryType = TelemetryType.Request, + Filters = new FilterConjunctionGroupInfo() { Filters = new FilterInfo[0] } + } + } + }, + new DocumentStreamInfo() + { + Id = "Stream2", + DocumentFilterGroups = + new[] + { + new DocumentFilterConjunctionGroupInfo() + { + TelemetryType = TelemetryType.Dependency, + Filters = new FilterConjunctionGroupInfo() { Filters = new FilterInfo[0] } + } + } + }, + new DocumentStreamInfo() + { + Id = "Stream3", + DocumentFilterGroups = + new[] + { + new DocumentFilterConjunctionGroupInfo() + { + TelemetryType = TelemetryType.Exception, + Filters = new FilterConjunctionGroupInfo() { Filters = new FilterInfo[0] } + } + } + } + }; + + var newDocumentStreamInfos = new[] + { + new DocumentStreamInfo() + { + Id = "Stream1", + DocumentFilterGroups = + new[] + { + new DocumentFilterConjunctionGroupInfo() + { + TelemetryType = TelemetryType.Request, + Filters = new FilterConjunctionGroupInfo() { Filters = new FilterInfo[0] } + } + } + }, + new DocumentStreamInfo() + { + Id = "Stream2", + DocumentFilterGroups = + new[] + { + new DocumentFilterConjunctionGroupInfo() + { + TelemetryType = TelemetryType.Dependency, + Filters = new FilterConjunctionGroupInfo() { Filters = new FilterInfo[0] } + } + } + }, + new DocumentStreamInfo() + { + Id = "Stream3", + DocumentFilterGroups = + new[] + { + new DocumentFilterConjunctionGroupInfo() + { + TelemetryType = TelemetryType.Exception, + Filters = new FilterConjunctionGroupInfo() { Filters = new FilterInfo[0] } + } + } + }, + new DocumentStreamInfo() + { + Id = "Stream4", + DocumentFilterGroups = + new[] + { + new DocumentFilterConjunctionGroupInfo() + { + TelemetryType = TelemetryType.Event, + Filters = new FilterConjunctionGroupInfo() { Filters = new FilterInfo[0] } + } + } + }, + new DocumentStreamInfo() + { + Id = "Stream5", + DocumentFilterGroups = + new[] + { + new DocumentFilterConjunctionGroupInfo() + { + TelemetryType = TelemetryType.Trace, + Filters = new FilterConjunctionGroupInfo() { Filters = new FilterInfo[0] } + } + } + } + }; + + var oldCollectionConfigurationInfo = new CollectionConfigurationInfo() + { + DocumentStreams = oldDocumentStreamInfos, + ETag = "ETag1", + QuotaInfo = new QuotaConfigurationInfo() + { + InitialQuota = 10, + MaxQuota = 30 + } + }; + var oldCollectionConfiguration = new CollectionConfiguration(oldCollectionConfigurationInfo, out errors, timeProvider); + + // spend some quota on the old configuration + var accumulatorManager = new QuickPulseDataAccumulatorManager(oldCollectionConfiguration); + var telemetryProcessor = new QuickPulseTelemetryProcessor(new SimpleTelemetryProcessorSpy()); + ((IQuickPulseTelemetryProcessor)telemetryProcessor).StartCollection( + accumulatorManager, + new Uri("http://microsoft.com"), + new TelemetryConfiguration() { InstrumentationKey = "some ikey" }); + + // ACT + // the initial quota is 3 + telemetryProcessor.Process(new RequestTelemetry() { Context = { InstrumentationKey = "some ikey" } }); + + telemetryProcessor.Process(new DependencyTelemetry() { Context = { InstrumentationKey = "some ikey" } }); + telemetryProcessor.Process(new DependencyTelemetry() { Context = { InstrumentationKey = "some ikey" } }); + + telemetryProcessor.Process(new ExceptionTelemetry() { Context = { InstrumentationKey = "some ikey" } }); + telemetryProcessor.Process(new ExceptionTelemetry() { Context = { InstrumentationKey = "some ikey" } }); + telemetryProcessor.Process(new ExceptionTelemetry() { Context = { InstrumentationKey = "some ikey" } }); + + // ACT + // the new configuration must carry the quotas over from the old one (only for those document streams that already existed) + var newCollectionConfigurationInfo = new CollectionConfigurationInfo() { DocumentStreams = newDocumentStreamInfos, ETag = "ETag1" }; + var newCollectionConfiguration = new CollectionConfiguration( + newCollectionConfigurationInfo, + out errors, + timeProvider, + oldCollectionConfiguration.DocumentStreams); + + // ASSERT + DocumentStream[] oldDocumentStreams = oldCollectionConfiguration.DocumentStreams.ToArray(); + Assert.AreEqual(3, oldDocumentStreams.Length); + + Assert.AreEqual("Stream1", oldDocumentStreams[0].Id); + Assert.AreEqual(9, oldDocumentStreams[0].RequestQuotaTracker.CurrentQuota); + Assert.AreEqual(10, oldDocumentStreams[0].DependencyQuotaTracker.CurrentQuota); + Assert.AreEqual(10, oldDocumentStreams[0].ExceptionQuotaTracker.CurrentQuota); + Assert.AreEqual(10, oldDocumentStreams[0].EventQuotaTracker.CurrentQuota); + Assert.AreEqual(10, oldDocumentStreams[0].EventQuotaTracker.CurrentQuota); + + Assert.AreEqual("Stream2", oldDocumentStreams[1].Id); + Assert.AreEqual(10, oldDocumentStreams[1].RequestQuotaTracker.CurrentQuota); + Assert.AreEqual(8, oldDocumentStreams[1].DependencyQuotaTracker.CurrentQuota); + Assert.AreEqual(10, oldDocumentStreams[1].ExceptionQuotaTracker.CurrentQuota); + Assert.AreEqual(10, oldDocumentStreams[1].EventQuotaTracker.CurrentQuota); + Assert.AreEqual(10, oldDocumentStreams[1].EventQuotaTracker.CurrentQuota); + + Assert.AreEqual("Stream3", oldDocumentStreams[2].Id); + Assert.AreEqual(10, oldDocumentStreams[2].RequestQuotaTracker.CurrentQuota); + Assert.AreEqual(10, oldDocumentStreams[2].DependencyQuotaTracker.CurrentQuota); + Assert.AreEqual(7, oldDocumentStreams[2].ExceptionQuotaTracker.CurrentQuota); + Assert.AreEqual(10, oldDocumentStreams[2].EventQuotaTracker.CurrentQuota); + Assert.AreEqual(10, oldDocumentStreams[2].EventQuotaTracker.CurrentQuota); + + DocumentStream[] newDocumentStreams = newCollectionConfiguration.DocumentStreams.ToArray(); + Assert.AreEqual(5, newDocumentStreams.Length); + + Assert.AreEqual("Stream1", newDocumentStreams[0].Id); + Assert.AreEqual(9, newDocumentStreams[0].RequestQuotaTracker.CurrentQuota); + Assert.AreEqual(10, newDocumentStreams[0].DependencyQuotaTracker.CurrentQuota); + Assert.AreEqual(10, newDocumentStreams[0].ExceptionQuotaTracker.CurrentQuota); + Assert.AreEqual(10, newDocumentStreams[0].EventQuotaTracker.CurrentQuota); + Assert.AreEqual(10, newDocumentStreams[0].EventQuotaTracker.CurrentQuota); + + Assert.AreEqual("Stream2", newDocumentStreams[1].Id); + Assert.AreEqual(10, newDocumentStreams[1].RequestQuotaTracker.CurrentQuota); + Assert.AreEqual(8, newDocumentStreams[1].DependencyQuotaTracker.CurrentQuota); + Assert.AreEqual(10, newDocumentStreams[1].ExceptionQuotaTracker.CurrentQuota); + Assert.AreEqual(10, newDocumentStreams[1].EventQuotaTracker.CurrentQuota); + Assert.AreEqual(10, newDocumentStreams[1].EventQuotaTracker.CurrentQuota); + + Assert.AreEqual("Stream3", newDocumentStreams[2].Id); + Assert.AreEqual(10, newDocumentStreams[2].RequestQuotaTracker.CurrentQuota); + Assert.AreEqual(10, newDocumentStreams[2].DependencyQuotaTracker.CurrentQuota); + Assert.AreEqual(7, newDocumentStreams[2].ExceptionQuotaTracker.CurrentQuota); + Assert.AreEqual(10, newDocumentStreams[2].EventQuotaTracker.CurrentQuota); + Assert.AreEqual(10, newDocumentStreams[2].EventQuotaTracker.CurrentQuota); + + Assert.AreEqual("Stream4", newDocumentStreams[3].Id); + Assert.AreEqual(3, newDocumentStreams[3].RequestQuotaTracker.CurrentQuota); + Assert.AreEqual(3, newDocumentStreams[3].DependencyQuotaTracker.CurrentQuota); + Assert.AreEqual(3, newDocumentStreams[3].ExceptionQuotaTracker.CurrentQuota); + Assert.AreEqual(3, newDocumentStreams[3].EventQuotaTracker.CurrentQuota); + Assert.AreEqual(3, newDocumentStreams[3].EventQuotaTracker.CurrentQuota); + } + [TestMethod] public void CollectionConfigurationReportsDocumentStreamsWithDuplicateIds() { diff --git a/WEB/Src/PerformanceCollector/Perf.Tests/QuickPulse/QuickPulseQuotaTrackerTests.cs b/WEB/Src/PerformanceCollector/Perf.Tests/QuickPulse/QuickPulseQuotaTrackerTests.cs index 79b9ca5f49..40990bfea3 100644 --- a/WEB/Src/PerformanceCollector/Perf.Tests/QuickPulse/QuickPulseQuotaTrackerTests.cs +++ b/WEB/Src/PerformanceCollector/Perf.Tests/QuickPulse/QuickPulseQuotaTrackerTests.cs @@ -163,5 +163,28 @@ public void QuickPulseQuotaTrackerIsThreadSafe() Assert.AreEqual(correctResult, passedQuotaCount); Assert.IsFalse(quotaTracker.ApplyQuota()); } + + [TestMethod] + public void QuickPulseQuotaTrackerParameters() + { + // ARRANGE + int maxQuota = 500; + int experimentLengthInSeconds = 499; + var mockTimeProvider = new ClockMock(); + var quotaTracker = new QuickPulseQuotaTracker(mockTimeProvider, maxQuota, 900, 1); + + // ACT + for (int i = 0; i < experimentLengthInSeconds; i++) + { + mockTimeProvider.FastForward(TimeSpan.FromSeconds(1)); + + quotaTracker.ApplyQuota(); + } + + // ASSERT + Assert.AreEqual(499, quotaTracker.CurrentQuota); + Assert.AreEqual(500, quotaTracker.MaxQuota); + Assert.AreEqual(false, quotaTracker.QuotaExhausted); + } } } \ No newline at end of file diff --git a/WEB/Src/PerformanceCollector/Perf.Tests/QuickPulse/QuickPulseTelemetryModuleTests.cs b/WEB/Src/PerformanceCollector/Perf.Tests/QuickPulse/QuickPulseTelemetryModuleTests.cs index bccf71f6a2..e8249b91af 100644 --- a/WEB/Src/PerformanceCollector/Perf.Tests/QuickPulse/QuickPulseTelemetryModuleTests.cs +++ b/WEB/Src/PerformanceCollector/Perf.Tests/QuickPulse/QuickPulseTelemetryModuleTests.cs @@ -13,6 +13,7 @@ using Microsoft.ApplicationInsights.Extensibility.Implementation; using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.Implementation.QuickPulse; using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.Implementation.QuickPulse.Helpers; + using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.Implementation.ServiceContract; using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.QuickPulse; using Microsoft.ApplicationInsights.TestFramework; using Microsoft.ApplicationInsights.Web.Helpers; @@ -789,7 +790,7 @@ public void QuickPulseTelemetryModuleUpdatesCollectionConfiguration() var module = new QuickPulseTelemetryModule(collectionTimeSlotManager, null, serviceClient, performanceCollector, topCpuCollector, timings); // ACT & ASSERT - serviceClient.CollectionConfigurationInfo = new CollectionConfigurationInfo() { ETag = "ETag1" }; + serviceClient.CollectionConfigurationInfo = new CollectionConfigurationInfo() { ETag = "ETag1", QuotaInfo = new QuotaConfigurationInfo() { InitialQuota = 50, MaxQuota=60, QuotaAccrualRatePerSec=10 } }; module.Initialize(new TelemetryConfiguration() { InstrumentationKey = "some ikey" }); @@ -802,6 +803,192 @@ public void QuickPulseTelemetryModuleUpdatesCollectionConfiguration() Assert.AreEqual("ETag2", serviceClient.SnappedSamples.Last().CollectionConfigurationAccumulator.CollectionConfiguration.ETag); } + [TestMethod] + public void QuickPulseTelemetryModuleUpdatesCollectionConfigurationWithQuotaAccrualRate() + { + if (QuickPulseTelemetryModuleTests.Ignored) + { + return; + } + + // ARRANGE + var pollingInterval = TimeSpan.FromSeconds(1); + var collectionInterval = TimeSpan.FromMilliseconds(400); + var timings = new QuickPulseTimings(pollingInterval, collectionInterval); + var collectionTimeSlotManager = new QuickPulseCollectionTimeSlotManagerMock(timings); + var serviceClient = new QuickPulseServiceClientMock { ReturnValueFromPing = true, ReturnValueFromSubmitSample = true }; + var performanceCollector = new PerformanceCollectorMock(); + var topCpuCollector = new QuickPulseTopCpuCollectorMock(); + var module = new QuickPulseTelemetryModule(collectionTimeSlotManager, null, serviceClient, performanceCollector, topCpuCollector, timings); + + // ACT & ASSERT + CollectionConfigurationInfo collectionConfigurationInfo = new CollectionConfigurationInfo() + { + ETag = "ETag1", + DocumentStreams = new[] { new DocumentStreamInfo() { Id = "wx3", DocumentFilterGroups = new[] { new DocumentFilterConjunctionGroupInfo() { TelemetryType = TelemetryType.Request, Filters = new FilterConjunctionGroupInfo() { Filters = new FilterInfo[0] } } } } }, + QuotaInfo = new QuotaConfigurationInfo() + { + InitialQuota = 50, + QuotaAccrualRatePerSec = 10, + MaxQuota = 60 + } + }; + serviceClient.CollectionConfigurationInfo = collectionConfigurationInfo; + QuickPulseTelemetryProcessor telemetryProcessor = new QuickPulseTelemetryProcessor(new SimpleTelemetryProcessorSpy()); + module.RegisterTelemetryProcessor(telemetryProcessor); + module.Initialize(new TelemetryConfiguration() { InstrumentationKey = "some ikey" }); + var collectionConfiguration = new CollectionConfiguration(collectionConfigurationInfo, out _, new Clock()); + var accumulatorManager = new QuickPulseDataAccumulatorManager(collectionConfiguration); + ((IQuickPulseTelemetryProcessor)telemetryProcessor).StartCollection( + accumulatorManager, + new Uri("http://microsoft.com"), + new TelemetryConfiguration() { InstrumentationKey = "some ikey" }); + + for (int i=0;i<100;i++) + { + telemetryProcessor.Process(new RequestTelemetry() { Id = "1", Name = "Request1", Context = { InstrumentationKey = "some ikey" } }); + } + + Assert.AreEqual(50, accumulatorManager.CurrentDataAccumulator.TelemetryDocuments.Count); + Thread.Sleep(pollingInterval); + + for (int i = 0; i < 100; i++) + { + telemetryProcessor.Process(new RequestTelemetry() { Id = "1", Name = "Request1", Context = { InstrumentationKey = "some ikey" } }); + } + + Assert.AreEqual(60, accumulatorManager.CurrentDataAccumulator.TelemetryDocuments.Count); + } + + [TestMethod] + public void QuickPulseTelemetryModuleUpdatesCollectionConfigurationWithMaxQuota() + { + if (QuickPulseTelemetryModuleTests.Ignored) + { + return; + } + + // ARRANGE + var pollingInterval = TimeSpan.FromSeconds(1); + var collectionInterval = TimeSpan.FromMilliseconds(400); + var timings = new QuickPulseTimings(pollingInterval, collectionInterval); + var collectionTimeSlotManager = new QuickPulseCollectionTimeSlotManagerMock(timings); + var serviceClient = new QuickPulseServiceClientMock { ReturnValueFromPing = true, ReturnValueFromSubmitSample = true }; + var performanceCollector = new PerformanceCollectorMock(); + var topCpuCollector = new QuickPulseTopCpuCollectorMock(); + var module = new QuickPulseTelemetryModule(collectionTimeSlotManager, null, serviceClient, performanceCollector, topCpuCollector, timings); + + // ACT & ASSERT + CollectionConfigurationInfo collectionConfigurationInfo = new CollectionConfigurationInfo() + { + ETag = "ETag1", + DocumentStreams = new[] { new DocumentStreamInfo() { Id = "wx3", DocumentFilterGroups = new[] { new DocumentFilterConjunctionGroupInfo() { TelemetryType = TelemetryType.Request, Filters = new FilterConjunctionGroupInfo() { Filters = new FilterInfo[0] } } } } }, + QuotaInfo = new QuotaConfigurationInfo() + { + InitialQuota = 50, + QuotaAccrualRatePerSec = 40, + MaxQuota = 60 + } + }; + serviceClient.CollectionConfigurationInfo = collectionConfigurationInfo; + QuickPulseTelemetryProcessor telemetryProcessor = new QuickPulseTelemetryProcessor(new SimpleTelemetryProcessorSpy()); + module.RegisterTelemetryProcessor(telemetryProcessor); + module.Initialize(new TelemetryConfiguration() { InstrumentationKey = "some ikey" }); + var collectionConfiguration = new CollectionConfiguration(collectionConfigurationInfo, out _, new Clock()); + var accumulatorManager = new QuickPulseDataAccumulatorManager(collectionConfiguration); + ((IQuickPulseTelemetryProcessor)telemetryProcessor).StartCollection( + accumulatorManager, + new Uri("http://microsoft.com"), + new TelemetryConfiguration() { InstrumentationKey = "some ikey" }); + + Thread.Sleep(pollingInterval); + + for (int i = 0; i < 100; i++) + { + telemetryProcessor.Process(new RequestTelemetry() { Id = "1", Name = "Request1", Context = { InstrumentationKey = "some ikey" } }); + } + + Assert.AreEqual(60, accumulatorManager.CurrentDataAccumulator.TelemetryDocuments.Count); + } + + [TestMethod] + public void QuickPulseTelemetryModuleUpdatesGlobalCollectionConfigurationWithQuotaInfo() + { +#if !NETCOREAPP + if (QuickPulseTelemetryModuleTests.Ignored) + { + return; + } + + // ARRANGE + var pollingInterval = TimeSpan.FromSeconds(1); + var collectionInterval = TimeSpan.FromMilliseconds(400); + var timings = new QuickPulseTimings(pollingInterval, collectionInterval); + var collectionTimeSlotManager = new QuickPulseCollectionTimeSlotManagerMock(timings); + var serviceClient = new QuickPulseServiceClientMock { ReturnValueFromPing = true, ReturnValueFromSubmitSample = true }; + var performanceCollector = new PerformanceCollectorMock(); + var topCpuCollector = new QuickPulseTopCpuCollectorMock(); + var module = new QuickPulseTelemetryModule(collectionTimeSlotManager, null, serviceClient, performanceCollector, topCpuCollector, timings); + + // ACT & ASSERT + CollectionConfigurationInfo collectionConfigurationInfo = new CollectionConfigurationInfo() + { + ETag = "ETag1", + DocumentStreams = new[] { new DocumentStreamInfo() { Id = "wx3", DocumentFilterGroups = new[] { new DocumentFilterConjunctionGroupInfo() { TelemetryType = TelemetryType.Request, Filters = new FilterConjunctionGroupInfo() { Filters = new FilterInfo[0] } } } } }, + QuotaInfo = new QuotaConfigurationInfo() + { + InitialQuota = 50, + QuotaAccrualRatePerSec = 10, + MaxQuota = 60 + } + }; + serviceClient.CollectionConfigurationInfo = collectionConfigurationInfo; + QuickPulseTelemetryProcessor telemetryProcessor = new QuickPulseTelemetryProcessor(new SimpleTelemetryProcessorSpy()); + module.RegisterTelemetryProcessor(telemetryProcessor); + module.Initialize(new TelemetryConfiguration() { InstrumentationKey = "some ikey" }); + var collectionConfiguration = new CollectionConfiguration(collectionConfigurationInfo, out _, new Clock()); + var accumulatorManager = new QuickPulseDataAccumulatorManager(collectionConfiguration); + ((IQuickPulseTelemetryProcessor)telemetryProcessor).StartCollection( + accumulatorManager, + new Uri("http://microsoft.com"), + new TelemetryConfiguration() { InstrumentationKey = "some ikey" }); + + Thread.Sleep(pollingInterval); + + for (int i = 0; i < 100; i++) + { + telemetryProcessor.Process(new RequestTelemetry() { Id = "1", Name = "Request1", Context = { InstrumentationKey = "some ikey" } }); + } + + Assert.AreEqual(60, accumulatorManager.CurrentDataAccumulator.TelemetryDocuments.Count); + + CollectionConfigurationInfo collectionConfigurationInfo2 = new CollectionConfigurationInfo() + { + ETag = "ETag2", + DocumentStreams = new[] { new DocumentStreamInfo() { Id = "wx3", DocumentFilterGroups = new[] { new DocumentFilterConjunctionGroupInfo() { TelemetryType = TelemetryType.Request, Filters = new FilterConjunctionGroupInfo() { Filters = new FilterInfo[0] } } } } }, + QuotaInfo = new QuotaConfigurationInfo() + { + InitialQuota = 0, + QuotaAccrualRatePerSec = 1, + MaxQuota = 5 + } + }; + + PrivateObject quickPulseTelemetryModuleTester = new PrivateObject(module); + quickPulseTelemetryModuleTester.Invoke("OnUpdatedConfiguration", collectionConfigurationInfo2); + + Thread.Sleep(pollingInterval); + + for (int i = 0; i < 100; i++) + { + telemetryProcessor.Process(new RequestTelemetry() { Id = "1", Name = "Request1", Context = { InstrumentationKey = "some ikey" } }); + } + + Assert.IsTrue(accumulatorManager.CurrentDataAccumulator.GlobalDocumentQuotaReached); + Assert.AreEqual(61, accumulatorManager.CurrentDataAccumulator.TelemetryDocuments.Count); +#endif + } + [TestMethod] public void QuickPulseTelemetryModuleUpdatesPerformanceCollectorWhenUpdatingCollectionConfiguration() { diff --git a/WEB/Src/PerformanceCollector/PerformanceCollector/CollectionConfiguration.cs b/WEB/Src/PerformanceCollector/PerformanceCollector/CollectionConfiguration.cs index 6d635ba81e..15d1304379 100644 --- a/WEB/Src/PerformanceCollector/PerformanceCollector/CollectionConfiguration.cs +++ b/WEB/Src/PerformanceCollector/PerformanceCollector/CollectionConfiguration.cs @@ -52,15 +52,10 @@ public CollectionConfiguration( Clock timeProvider, IEnumerable previousDocumentStreams = null) { - if (info == null) - { - throw new ArgumentNullException(nameof(info)); - } - - this.info = info; + this.info = info ?? throw new ArgumentNullException(nameof(info)); // create metrics based on descriptions in info - this.CreateTelemetryMetrics(info, out CollectionConfigurationError[] metricErrors); + this.CreateTelemetryMetrics(out CollectionConfigurationError[] metricErrors); // maintain a separate collection of all (Id, AggregationType) pairs with some additional data - to allow for uniform access to all types of metrics // this includes both telemetry metrics and Metric metrics @@ -189,6 +184,9 @@ private void CreateDocumentStreams( if (this.info.DocumentStreams != null) { + float? maxQuota = this.info.QuotaInfo?.MaxQuota; + float? quotaAccrualRatePerSec = this.info.QuotaInfo?.QuotaAccrualRatePerSec; + foreach (DocumentStreamInfo documentStreamInfo in this.info.DocumentStreams) { if (documentStreamIds.Contains(documentStreamInfo.Id)) @@ -207,18 +205,24 @@ private void CreateDocumentStreams( CollectionConfigurationError[] localErrors = null; try { - Tuple initialQuotas; - previousQuotasByStreamId.TryGetValue(documentStreamInfo.Id, out initialQuotas); + previousQuotasByStreamId.TryGetValue(documentStreamInfo.Id, out Tuple previousQuotas); + float? initialQuota = this.info.QuotaInfo?.InitialQuota; var documentStream = new DocumentStream( documentStreamInfo, out localErrors, timeProvider, - initialRequestQuota: initialQuotas?.Item1, - initialDependencyQuota: initialQuotas?.Item2, - initialExceptionQuota: initialQuotas?.Item3, - initialEventQuota: initialQuotas?.Item4, - initialTraceQuota: initialQuotas?.Item5); + initialRequestQuota: initialQuota ?? previousQuotas?.Item1, + initialDependencyQuota: initialQuota ?? previousQuotas?.Item2, + initialExceptionQuota: initialQuota ?? previousQuotas?.Item3, + initialEventQuota: initialQuota ?? previousQuotas?.Item4, + initialTraceQuota: initialQuota ?? previousQuotas?.Item5, + maxRequestQuota: maxQuota, + maxDependencyQuota: maxQuota, + maxExceptionQuota: maxQuota, + maxEventQuota: maxQuota, + maxTraceQuota: maxQuota, + quotaAccrualRatePerSec: quotaAccrualRatePerSec); documentStreamIds.Add(documentStreamInfo.Id); this.documentStreams.Add(documentStream); @@ -248,12 +252,12 @@ private void CreateDocumentStreams( errors = errorList.ToArray(); } - private void CreateTelemetryMetrics(CollectionConfigurationInfo info, out CollectionConfigurationError[] errors) + private void CreateTelemetryMetrics(out CollectionConfigurationError[] errors) { var errorList = new List(); var metricIds = new HashSet(); - foreach (CalculatedMetricInfo metricInfo in info.Metrics ?? ArrayExtensions.Empty()) + foreach (CalculatedMetricInfo metricInfo in this.info.Metrics ?? ArrayExtensions.Empty()) { if (metricIds.Contains(metricInfo.Id)) { diff --git a/WEB/Src/PerformanceCollector/PerformanceCollector/DocumentStream.cs b/WEB/Src/PerformanceCollector/PerformanceCollector/DocumentStream.cs index f0f38a8a40..e6f5563ce7 100644 --- a/WEB/Src/PerformanceCollector/PerformanceCollector/DocumentStream.cs +++ b/WEB/Src/PerformanceCollector/PerformanceCollector/DocumentStream.cs @@ -14,7 +14,7 @@ /// internal class DocumentStream { - private const float MaxTelemetryQuota = 30f; + private const float DefaultMaxTelemetryQuota = 30f; private const float InitialTelemetryQuota = 3f; @@ -38,17 +38,23 @@ public DocumentStream( float? initialDependencyQuota = null, float? initialExceptionQuota = null, float? initialEventQuota = null, - float? initialTraceQuota = null) + float? initialTraceQuota = null, + float? maxRequestQuota = null, + float? maxDependencyQuota = null, + float? maxExceptionQuota = null, + float? maxEventQuota = null, + float? maxTraceQuota = null, + float? quotaAccrualRatePerSec = null) { this.info = info ?? throw new ArgumentNullException(nameof(info)); this.CreateFilters(out errors); - this.RequestQuotaTracker = new QuickPulseQuotaTracker(timeProvider, MaxTelemetryQuota, initialRequestQuota ?? InitialTelemetryQuota); - this.DependencyQuotaTracker = new QuickPulseQuotaTracker(timeProvider, MaxTelemetryQuota, initialDependencyQuota ?? InitialTelemetryQuota); - this.ExceptionQuotaTracker = new QuickPulseQuotaTracker(timeProvider, MaxTelemetryQuota, initialExceptionQuota ?? InitialTelemetryQuota); - this.EventQuotaTracker = new QuickPulseQuotaTracker(timeProvider, MaxTelemetryQuota, initialEventQuota ?? InitialTelemetryQuota); - this.TraceQuotaTracker = new QuickPulseQuotaTracker(timeProvider, MaxTelemetryQuota, initialTraceQuota ?? InitialTelemetryQuota); + this.RequestQuotaTracker = new QuickPulseQuotaTracker(timeProvider, maxRequestQuota ?? DefaultMaxTelemetryQuota, initialRequestQuota ?? InitialTelemetryQuota, quotaAccrualRatePerSec); + this.DependencyQuotaTracker = new QuickPulseQuotaTracker(timeProvider, maxDependencyQuota ?? DefaultMaxTelemetryQuota, initialDependencyQuota ?? InitialTelemetryQuota, quotaAccrualRatePerSec); + this.ExceptionQuotaTracker = new QuickPulseQuotaTracker(timeProvider, maxExceptionQuota ?? DefaultMaxTelemetryQuota, initialExceptionQuota ?? InitialTelemetryQuota, quotaAccrualRatePerSec); + this.EventQuotaTracker = new QuickPulseQuotaTracker(timeProvider, maxEventQuota ?? DefaultMaxTelemetryQuota, initialEventQuota ?? InitialTelemetryQuota, quotaAccrualRatePerSec); + this.TraceQuotaTracker = new QuickPulseQuotaTracker(timeProvider, maxTraceQuota ?? DefaultMaxTelemetryQuota, initialTraceQuota ?? InitialTelemetryQuota, quotaAccrualRatePerSec); } public QuickPulseQuotaTracker RequestQuotaTracker { get; } diff --git a/WEB/Src/PerformanceCollector/PerformanceCollector/IQuickPulseTelemetryProcessor.cs b/WEB/Src/PerformanceCollector/PerformanceCollector/IQuickPulseTelemetryProcessor.cs index 2f9024a90a..b1b1448cc1 100644 --- a/WEB/Src/PerformanceCollector/PerformanceCollector/IQuickPulseTelemetryProcessor.cs +++ b/WEB/Src/PerformanceCollector/PerformanceCollector/IQuickPulseTelemetryProcessor.cs @@ -3,6 +3,8 @@ using System; using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.Implementation.QuickPulse; + using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.Implementation.QuickPulse.Helpers; + using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.Implementation.ServiceContract; internal interface IQuickPulseTelemetryProcessor { @@ -11,5 +13,7 @@ internal interface IQuickPulseTelemetryProcessor void StartCollection(IQuickPulseDataAccumulatorManager accumulatorManager, Uri serviceEndpoint, TelemetryConfiguration configuration, bool disableFullTelemetryItems = false); void StopCollection(); + + void UpdateGlobalQuotas(Clock timeProvider, QuotaConfigurationInfo quotaInfo); } } \ No newline at end of file diff --git a/WEB/Src/PerformanceCollector/PerformanceCollector/Implementation/QuickPulse/Helpers/QuickPulseQuotaTracker.cs b/WEB/Src/PerformanceCollector/PerformanceCollector/Implementation/QuickPulse/Helpers/QuickPulseQuotaTracker.cs index dba05f0b9d..90382471e7 100644 --- a/WEB/Src/PerformanceCollector/PerformanceCollector/Implementation/QuickPulse/Helpers/QuickPulseQuotaTracker.cs +++ b/WEB/Src/PerformanceCollector/PerformanceCollector/Implementation/QuickPulse/Helpers/QuickPulseQuotaTracker.cs @@ -8,9 +8,7 @@ /// internal class QuickPulseQuotaTracker { - private readonly float inputStreamRatePerSec; - - private readonly float maxQuota; + private readonly float quotaAccrualRatePerSec; private readonly DateTimeOffset startedTrackingTime; @@ -18,14 +16,15 @@ internal class QuickPulseQuotaTracker private float currentQuota; + private float maxQuota; + private long lastQuotaAccrualFullSeconds; - public QuickPulseQuotaTracker(Clock timeProvider, float maxQuota, float startQuota) + public QuickPulseQuotaTracker(Clock timeProvider, float maxQuota, float startQuota, float? quotaAccrualRatePerSec = null) { this.timeProvider = timeProvider; this.maxQuota = maxQuota; - this.inputStreamRatePerSec = this.maxQuota / 60; - + this.quotaAccrualRatePerSec = quotaAccrualRatePerSec ?? (this.maxQuota / 60); // should not be calculated from maxQuota - Should be passed from the service this.startedTrackingTime = timeProvider.UtcNow; this.lastQuotaAccrualFullSeconds = 0; this.currentQuota = startQuota; @@ -33,6 +32,8 @@ public QuickPulseQuotaTracker(Clock timeProvider, float maxQuota, float startQuo public float CurrentQuota => Interlocked.CompareExchange(ref this.currentQuota, 0, 0); + public float MaxQuota => Interlocked.CompareExchange(ref this.maxQuota, 0, 0); + public bool QuotaExhausted => Interlocked.CompareExchange(ref this.currentQuota, 0, 0) < 1f; /// @@ -132,7 +133,7 @@ private void IncreaseQuota(long seconds) { float originalValue = Interlocked.CompareExchange(ref this.currentQuota, 0, 0); - float delta = Math.Min(this.inputStreamRatePerSec * seconds, this.maxQuota - originalValue); + float delta = Math.Min(this.quotaAccrualRatePerSec * seconds, this.maxQuota - originalValue); if (Interlocked.CompareExchange(ref this.currentQuota, originalValue + delta, originalValue) == originalValue) { diff --git a/WEB/Src/PerformanceCollector/PerformanceCollector/Implementation/Service contract/CollectionConfigurationInfo.cs b/WEB/Src/PerformanceCollector/PerformanceCollector/Implementation/Service contract/CollectionConfigurationInfo.cs index 0835802660..fab95b3de2 100644 --- a/WEB/Src/PerformanceCollector/PerformanceCollector/Implementation/Service contract/CollectionConfigurationInfo.cs +++ b/WEB/Src/PerformanceCollector/PerformanceCollector/Implementation/Service contract/CollectionConfigurationInfo.cs @@ -1,6 +1,7 @@ namespace Microsoft.ApplicationInsights.Extensibility.Filtering { using System.Runtime.Serialization; + using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.Implementation.ServiceContract; /// /// DTO that represents the collection configuration - a customizable description of performance counters, metrics, and full telemetry documents @@ -12,6 +13,9 @@ internal class CollectionConfigurationInfo [DataMember] public string ETag { get; set; } + [DataMember(IsRequired = false)] + public QuotaConfigurationInfo QuotaInfo { get; set; } + [DataMember] public CalculatedMetricInfo[] Metrics { get; set; } diff --git a/WEB/Src/PerformanceCollector/PerformanceCollector/Implementation/Service contract/QuotaConfigurationInfo.cs b/WEB/Src/PerformanceCollector/PerformanceCollector/Implementation/Service contract/QuotaConfigurationInfo.cs new file mode 100644 index 0000000000..f6c1c942ee --- /dev/null +++ b/WEB/Src/PerformanceCollector/PerformanceCollector/Implementation/Service contract/QuotaConfigurationInfo.cs @@ -0,0 +1,22 @@ +namespace Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.Implementation.ServiceContract +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Runtime.Serialization; + using System.Text; + using System.Threading.Tasks; + + [DataContract] + internal class QuotaConfigurationInfo + { + [DataMember(IsRequired = false)] + public float? InitialQuota { get; set; } + + [DataMember(IsRequired = true)] + public float MaxQuota { get; set; } + + [DataMember(IsRequired = true)] + public float QuotaAccrualRatePerSec { get; set; } + } +} diff --git a/WEB/Src/PerformanceCollector/PerformanceCollector/QuickPulseTelemetryModule.cs b/WEB/Src/PerformanceCollector/PerformanceCollector/QuickPulseTelemetryModule.cs index de79924b72..687e20d9d2 100644 --- a/WEB/Src/PerformanceCollector/PerformanceCollector/QuickPulseTelemetryModule.cs +++ b/WEB/Src/PerformanceCollector/PerformanceCollector/QuickPulseTelemetryModule.cs @@ -702,6 +702,15 @@ private CollectionConfigurationError[] OnUpdatedConfiguration(CollectionConfigur { // we need to preserve the current quota for each document stream that still exists in the new configuration CollectionConfigurationError[] errorsConfig; + + lock (this.telemetryProcessorsLock) + { + foreach (IQuickPulseTelemetryProcessor telemetryProcessor in this.TelemetryProcessors) + { + telemetryProcessor.UpdateGlobalQuotas(this.timeProvider, configurationInfo.QuotaInfo); + } + } + var newCollectionConfiguration = new CollectionConfiguration(configurationInfo, out errorsConfig, this.timeProvider, this.collectionConfiguration?.DocumentStreams); // the next accumulator that gets swapped in on the collection thread will be initialized with the new collection configuration diff --git a/WEB/Src/PerformanceCollector/PerformanceCollector/QuickPulseTelemetryProcessor.cs b/WEB/Src/PerformanceCollector/PerformanceCollector/QuickPulseTelemetryProcessor.cs index f26e0d3174..fe18c96a94 100644 --- a/WEB/Src/PerformanceCollector/PerformanceCollector/QuickPulseTelemetryProcessor.cs +++ b/WEB/Src/PerformanceCollector/PerformanceCollector/QuickPulseTelemetryProcessor.cs @@ -15,6 +15,7 @@ using Microsoft.ApplicationInsights.Extensibility.Implementation.Tracing; using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.Implementation.QuickPulse; using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.Implementation.QuickPulse.Helpers; + using Microsoft.ApplicationInsights.Extensibility.PerfCounterCollector.Implementation.ServiceContract; using Microsoft.ManagementServices.RealTimeDataProcessing.QuickPulseService; /// @@ -43,7 +44,7 @@ public class QuickPulseTelemetryProcessor : ITelemetryProcessor, ITelemetryModul /// /// An overall, cross-stream quota tracker. /// - private readonly QuickPulseQuotaTracker globalQuotaTracker; + private QuickPulseQuotaTracker globalQuotaTracker; private IQuickPulseDataAccumulatorManager dataAccumulatorManager = null; @@ -72,12 +73,14 @@ public QuickPulseTelemetryProcessor(ITelemetryProcessor next) /// Time provider. /// Max overall telemetry quota. /// Initial overall telemetry quota. + /// Quota Accrual rate per second. /// Thrown if next is null. internal QuickPulseTelemetryProcessor( ITelemetryProcessor next, Clock timeProvider, float? maxGlobalTelemetryQuota = null, - float? initialGlobalTelemetryQuota = null) + float? initialGlobalTelemetryQuota = null, + float? quotaAccrualRatePerSec = null) { this.Next = next ?? throw new ArgumentNullException(nameof(next)); @@ -86,7 +89,8 @@ internal QuickPulseTelemetryProcessor( this.globalQuotaTracker = new QuickPulseQuotaTracker( timeProvider, maxGlobalTelemetryQuota ?? MaxGlobalTelemetryQuota, - initialGlobalTelemetryQuota ?? InitialGlobalTelemetryQuota); + initialGlobalTelemetryQuota ?? InitialGlobalTelemetryQuota, + quotaAccrualRatePerSec); } /// @@ -126,6 +130,25 @@ The configuration that is being passed into this method is the configuration tha this.RegisterSelfWithQuickPulseTelemetryModule(); } + void IQuickPulseTelemetryProcessor.UpdateGlobalQuotas(Clock timeProvider, QuotaConfigurationInfo quotaInfo) + { + if (quotaInfo != null) + { + this.globalQuotaTracker = new QuickPulseQuotaTracker( + timeProvider, + quotaInfo.MaxQuota, + quotaInfo.InitialQuota ?? InitialGlobalTelemetryQuota, + quotaInfo.QuotaAccrualRatePerSec); + } + else + { + this.globalQuotaTracker = new QuickPulseQuotaTracker( + timeProvider, + MaxGlobalTelemetryQuota, + InitialGlobalTelemetryQuota); + } + } + void IQuickPulseTelemetryProcessor.StartCollection( IQuickPulseDataAccumulatorManager accumulatorManager, Uri serviceEndpoint,