Update how measurements are processed #196

Closed

Conversation

rogordon01
Contributor

This PR makes several enhancements to how Measurements are produced from DeviceEvents:

  • NormalizedEvents are emitted to the processing pipeline as soon as they are created. Previously, all NormalizedEvents for a single DeviceEvent were added to a collection before being emitted to the processing pipeline.
  • A single JObject clone is reused during template evaluation.
  • A metric was added to capture processing time per NormalizedEvent.

@rogordon01 requested a review from a team as a code owner on June 3, 2022 18:00
@@ -201,5 +189,70 @@ private Task<bool> ProcessErrorAsync(Exception ex, EventData data)
var handled = _exceptionTelemetryProcessor.HandleException(ex, _log);
return Task.FromResult(!handled);
}

private IEnumerable<(string, IMeasurement)> CreateMeasurementIterator(JToken token, string partitionId, ConcurrentBag<Exception> exceptions, Func<Exception, EventData, Task<bool>> errorConsumer, EventData evt)
Contributor Author

Previously, we batched all Measurements per DeviceEvent before sending them down the processing pipeline. If there was an error with one Measurement, we wouldn't send any Measurements for that DeviceEvent.

This change streams each Measurement onto the processing pipeline as soon as it is generated.

Member

So one consequence of this is that if we have bad data and spin, we could now be sending more repeat/duplicate messages downstream?

Contributor Author

That is true if we were to spin. But I thought we had decided not to spin and simply discard bad data, until we could send it over to the EMS?

Member

We will continue to spin by default, though I did add the ability to change the strategy if desired.

storeNormalizedException(ex);
shouldLoop = false;
}

Contributor Author

@dustinburson Do we skip processing the remainder of the DeviceEvent now that one Measurement produced an Exception? Or do we simply skip this Measurement, and allow the remaining Measurements to be generated?

Member

I think we would keep processing, but we would need to discuss further.

tokenClone.Add(MatchedToken, extractedToken);
yield return tokenClone;
tokenClone.Remove(MatchedToken);
Member

Does this work? We are returning the tokenClone and then updating the same reference to remove the property. I don't think we can do the remove; anything downstream of this will potentially be missing the MatchedToken.

Contributor Author

This should work as intended because this is a synchronous IEnumerable. We first yield the cloned JToken and then produce a Measurement from it. Finally, control returns here to remove the MatchedToken property and start the loop again. The code producing the Measurement does not hold onto a reference to the token it was given.

We have various unit tests that cover this scenario, such as: https://github.com/microsoft/iomt-fhir/blob/main/test/Microsoft.Health.Fhir.Ingest.Template.UnitTests/CalculatedFunctionContentTemplateTests.cs#L79
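
A minimal sketch of the reuse pattern being discussed, assuming an illustrative property name and method shape (this is not the exact code in CalculatedFunctionContentTemplate):

    using System.Collections.Generic;
    using Newtonsoft.Json.Linq;

    public static class TokenMatchingSketch
    {
        private const string MatchedToken = "matchedToken"; // illustrative property name

        // Reuse one clone across iterations: attach the current match, yield the clone,
        // then detach the match before the next iteration. This is only safe because the
        // enumerable is consumed synchronously and the consumer does not keep a reference
        // to the yielded token after producing its Measurement.
        public static IEnumerable<JToken> MatchTokens(JObject tokenClone, IEnumerable<JToken> extractedTokens)
        {
            foreach (var extractedToken in extractedTokens)
            {
                tokenClone.Add(MatchedToken, extractedToken);
                yield return tokenClone;
                tokenClone.Remove(MatchedToken);
            }
        }
    }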

@@ -45,7 +46,7 @@ public class MeasurementEventNormalizationService : IDataNormalizationService<Ev
Data.IConverter<EventData, JToken> converter,
IExceptionTelemetryProcessor exceptionTelemetryProcessor,
int maxParallelism,
int asyncCollectorBatchSize = 200)
int asyncCollectorBatchSize = 25)
Member

Do we have any data on the impact of this change?

Contributor Author

No hard numbers. I had originally put in 200 as a starting point, with the intent of tweaking it as needed. I can revert this change.

Member

That might be good for now. If we have enough time to compare just this change without the others and we see a benefit, then we can include it.

{
try
{
stopWatch.Start();
Member

We have some helper classes that simplify this. We may need to port them from PaaS to OSS.
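
As a rough sketch only (the PaaS helpers are not shown here, so everything below is hypothetical), such a helper could wrap the stopwatch in a disposable scope:

    using System;
    using System.Diagnostics;

    // Hypothetical timed-scope helper: starts a stopwatch on construction and
    // reports the elapsed time to a callback when disposed.
    public sealed class TimedScope : IDisposable
    {
        private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
        private readonly Action<TimeSpan> _onComplete;

        public TimedScope(Action<TimeSpan> onComplete) => _onComplete = onComplete;

        public void Dispose() => _onComplete(_stopwatch.Elapsed);
    }

    // Usage (illustrative): time the processing of one NormalizedEvent.
    // using (new TimedScope(elapsed => logger.LogInformation("Processed in {Ms} ms", elapsed.TotalMilliseconds)))
    // {
    //     ... process the NormalizedEvent ...
    // }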

foreach (var measurement in _contentTemplate.GetMeasurements(token))
{
measurement.IngestionTimeUtc = evt.SystemProperties.EnqueuedTimeUtc;
createdMeasurements.Add((partitionId, measurement));
Member

Is there a reason we can't just yield return here?

Contributor Author (@rogordon01, Jun 3, 2022)

Unfortunately, a yield statement cannot exist inside of a try block if there is a catch block. That is the original reason why I created the custom IEnumerable.

https://docs.microsoft.com/en-us/archive/msdn-magazine/2017/june/essential-net-custom-iterators-with-yield#yield-statement-requirements
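
For context, the usual workaround is to keep the try/catch around the enumerator calls and move the yield outside of it. A simplified sketch of that general pattern (not the exact code in this PR):

    using System;
    using System.Collections.Generic;

    public static class GuardedIteration
    {
        // yield return may not appear inside a try block that has a catch clause,
        // so the enumerator is advanced inside the guarded region and the item is
        // yielded outside of it.
        public static IEnumerable<T> EnumerateSafely<T>(IEnumerable<T> source, Action<Exception> onError)
        {
            using IEnumerator<T> enumerator = source.GetEnumerator();
            bool shouldLoop = true;

            while (shouldLoop)
            {
                bool hasItem = false;
                T current = default;

                try
                {
                    hasItem = enumerator.MoveNext();
                    if (hasItem)
                    {
                        current = enumerator.Current;
                    }
                    else
                    {
                        shouldLoop = false;
                    }
                }
                catch (Exception ex)
                {
                    // Report the failure and stop iterating instead of letting
                    // the exception escape the iterator.
                    onError(ex);
                    hasItem = false;
                    shouldLoop = false;
                }

                if (hasItem)
                {
                    yield return current;
                }
            }
        }
    }

A caller could then wrap the template output, for example GuardedIteration.EnumerateSafely(_contentTemplate.GetMeasurements(token), ex => exceptions.Add(ex)), and keep yielding Measurements while recording any failure.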


try
{
enumerable = _contentTemplate.GetMeasurements(token).GetEnumerator();
Member

Shouldn't we be able to remove the lower while loop and just yield return as we iterate through each element here?

Member

We may also want to look at simplifying this class completely. I don't know if using the TPL adds much value. We could also look at switching over to async enumerables: https://docs.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8

https://docs.microsoft.com/en-us/dotnet/csharp/whats-new/tutorials/generate-consume-asynchronous-stream
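
For reference, the basic shape of an async-enumerable producer and consumer looks roughly like this (a generic sketch, not code from this repo; note that the yield-inside-try/catch restriction mentioned earlier in this thread also applies to async iterators):

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    public static class AsyncStreamingSketch
    {
        // Produce items one at a time as an async stream instead of posting them
        // to a TPL Dataflow pipeline.
        public static async IAsyncEnumerable<string> ProduceAsync(IEnumerable<string> source)
        {
            foreach (var item in source)
            {
                await Task.Yield(); // stand-in for real asynchronous work
                yield return item;
            }
        }

        // Consume items as they become available.
        public static async Task ConsumeAsync()
        {
            await foreach (var item in ProduceAsync(new[] { "a", "b", "c" }))
            {
                Console.WriteLine(item);
            }
        }
    }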

Contributor Author

See my comment about yield statements inside of try/catch blocks.

Contributor Author

Switching this to an AsyncEnumerable could be a good idea; at the very least it would simplify things. I did make use of the pipeline's ability to batch, though; we could see if that could be refactored.
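
For illustration, the batching that the pipeline provides can be expressed with a TPL Dataflow BatchBlock, roughly like this (a generic sketch with made-up names, not the code in this PR):

    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class BatchingPipelineSketch
    {
        // Group individual items into fixed-size batches before handing them to a sink,
        // which is roughly what the batching stage of a Dataflow pipeline provides.
        public static async Task RunAsync()
        {
            var batchBlock = new BatchBlock<string>(batchSize: 25);
            var sinkBlock = new ActionBlock<string[]>(batch =>
                Console.WriteLine($"Flushing batch of {batch.Length} measurements"));

            batchBlock.LinkTo(sinkBlock, new DataflowLinkOptions { PropagateCompletion = true });

            for (int i = 0; i < 100; i++)
            {
                await batchBlock.SendAsync($"measurement-{i}");
            }

            batchBlock.Complete();
            await sinkBlock.Completion;
        }
    }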

Member

In the other perf PR we are no longer using this class. Thoughts on reverting this change? It will still be used for the Azure Function implementation but we wouldn't be taking the risk of potentially introducing a new bug for that path. The new metric is great, and we can work to port it over to the new implementation.

}
}

var data = new { Body = body, input.Properties, input.SystemProperties };
Member

From my testing this actually adds quite a bit of overhead. In my branch I tweaked it to the following:

        private const string BodyAttr = "Body";
        private const string PropertiesAttr = "Properties";
        private const string SystemPropertiesAttr = "SystemProperties";

        private readonly JsonSerializer jsonSerializer = JsonSerializer.CreateDefault();

        public JToken Convert(IEventMessage input)
        {
            EnsureArg.IsNotNull(input, nameof(input));

            JToken token = new JObject();
            JToken body = null;

            // Deserialize the event body directly from the underlying stream
            // instead of materializing an intermediate string.
            if (input.Body.Length > 0)
            {
                using StreamReader streamReader = new StreamReader(input.Body.AsStream(), Encoding.UTF8);
                using JsonReader jsonReader = new JsonTextReader(streamReader);
                body = JToken.ReadFrom(jsonReader);
            }

            // Build the composite token by adding properties to an empty JObject
            // rather than serializing an anonymous object and re-parsing it.
            token[BodyAttr] = body;
            token[PropertiesAttr] = JToken.FromObject(input.Properties, jsonSerializer);
            token[SystemPropertiesAttr] = JToken.FromObject(input.SystemProperties, jsonSerializer);

            return token;
        }

The stream processing will work a little differently in my example (I updated the code to keep the original ReadOnlyMemory and I am taking advantage of that), but the rest should be achievable in this version. I think the biggest cost is building the new JToken from the object created in line 32. Starting with an empty JObject and adding the properties to it manually resulted in a ~35% increase in the number of bytes processed per minute. I am still not applying the templates, but the increase is promising.

@dustinburson self-requested a review on June 17, 2022 01:49
@rogordon01
Contributor Author

Closing this PR as most of these perf improvements have been made redundant by other PRs. The relevant improvements have been moved into PR #199.

@rogordon01 closed this on Jun 18, 2022
@dustinburson deleted the personal/rogordon/streamMeasurementsDuringCollection branch on November 8, 2022 18:39