diff --git a/src/lib/Microsoft.Health.Fhir.Ingest.Template/MeasurementExtractor.cs b/src/lib/Microsoft.Health.Fhir.Ingest.Template/MeasurementExtractor.cs index 67d625e1..984948e9 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest.Template/MeasurementExtractor.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest.Template/MeasurementExtractor.cs @@ -126,12 +126,18 @@ protected virtual IEnumerable MatchTypeTokens(JObject token) EnsureArg.IsNotNull(token, nameof(token)); var evaluator = CreateRequiredExpressionEvaluator(Template.TypeMatchExpression, nameof(Template.TypeMatchExpression)); + JObject tokenClone = null; + foreach (var extractedToken in evaluator.SelectTokens(token)) { // Add the extracted data as an element of the original data. // This allows subsequent expressions access to data from the original event data + if (tokenClone == null) + { + tokenClone = new JObject(token); + } - var tokenClone = token.DeepClone() as JObject; + tokenClone.Remove(MatchedToken); tokenClone.Add(MatchedToken, extractedToken); yield return tokenClone; } diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementEventNormalizationService.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementEventNormalizationService.cs index 0cafa0f6..c58e2971 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementEventNormalizationService.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementEventNormalizationService.cs @@ -6,6 +6,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -117,10 +118,19 @@ private async Task StartConsumer(ISourceBlock producer, IEnumerableAs try { + var stopWatch = new Stopwatch(); + stopWatch.Start(); foreach (var measurement in _contentTemplate.GetMeasurements(token)) { measurement.IngestionTimeUtc = evt.SystemProperties.EnqueuedTimeUtc; createdMeasurements.Add((partitionId, measurement)); + + stopWatch.Stop(); + _log.LogMetric( + IomtMetrics.NormalizedEventGenerationTimeMs(partitionId), + stopWatch.ElapsedMilliseconds); + stopWatch.Reset(); + stopWatch.Start(); } } catch (Exception ex) diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetricDefinition.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetricDefinition.cs index bec58505..6e9e0411 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetricDefinition.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetricDefinition.cs @@ -24,6 +24,8 @@ private IomtMetricDefinition(string metricName) public static IomtMetricDefinition NormalizedEvent { get; } = new IomtMetricDefinition(nameof(NormalizedEvent)); + public static IomtMetricDefinition NormalizedEventGenerationTimeMs { get; } = new IomtMetricDefinition(nameof(NormalizedEventGenerationTimeMs)); + public static IomtMetricDefinition Measurement { get; } = new IomtMetricDefinition(nameof(Measurement)); public static IomtMetricDefinition MeasurementGroup { get; } = new IomtMetricDefinition(nameof(MeasurementGroup)); diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetrics.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetrics.cs index defc5c62..f180c42e 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetrics.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetrics.cs @@ -159,5 +159,16 @@ public static Metric HandledException(string exceptionName, string connectorStag { return exceptionName.ToErrorMetric(connectorStage, ErrorType.GeneralError, ErrorSeverity.Critical); } + + /// + /// The time it takes to generate a Normalized Event. + /// + /// The partition id of the events being consumed from the event hub partition + public static Metric NormalizedEventGenerationTimeMs(string partitionId = null) + { + return IomtMetricDefinition.NormalizedEventGenerationTimeMs + .CreateBaseMetric(Category.Traffic, ConnectorOperation.Normalization) + .AddDimension(_partitionDimension, partitionId); + } } }