-
Notifications
You must be signed in to change notification settings - Fork 650
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make Counter and MinMaxSumCount aggregators thread safe #439
Make Counter and MinMaxSumCount aggregators thread safe #439
Conversation
It's possible tha future.running() returns BEFORE the task is scheduled.
Codecov Report
@@ Coverage Diff @@
## master #439 +/- ##
==========================================
+ Coverage 88.25% 88.56% +0.31%
==========================================
Files 41 41
Lines 2078 2082 +4
Branches 238 238
==========================================
+ Hits 1834 1844 +10
+ Misses 172 169 -3
+ Partials 72 69 -3
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One comment about a missing lock, and a few other minor comments, but otherwise this looks great.
Also very nice that the concurrency tests fail if you remove the locks.
self.current = 0 | ||
with self._lock: | ||
self.checkpoint = self.current | ||
self.current = 0 | ||
|
||
def merge(self, other): | ||
self.checkpoint += other.checkpoint |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't you need to lock here too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to my understanding merge()
and take_checkpoint()
will be never called concurrently. Those methods are invoked in Batcher::process()
that is invoked from Meter::collect()
that is never concurrent. @lzchen could you please check this reasoning?
opentelemetry-python/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py
Lines 87 to 103 in ed25287
def process(self, record): | |
# Checkpoints the current aggregator value to be collected for export | |
record.aggregator.take_checkpoint() | |
batch_key = (record.metric, record.label_set) | |
batch_value = self._batch_map.get(batch_key) | |
aggregator = record.aggregator | |
if batch_value: | |
# Update the stored checkpointed value if exists. The call to merge | |
# here combines only identical records (same key). | |
batch_value.merge(aggregator) | |
return | |
if self.stateful: | |
# if stateful batcher, create a copy of the aggregator and update | |
# it with the current checkpointed value for long-term storage | |
aggregator = self.aggregator_for(record.metric.__class__) | |
aggregator.merge(record.aggregator) | |
self._batch_map[batch_key] = aggregator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even still, I think it's better not to rely on the behavior of the caller here. Even better: if they're never called concurrently, there will never be lock contention to slow this down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, I added locks to merge too but I didn't added tests for that. We can revisit this later on when considering the use of atomics libraries here.
@@ -63,46 +67,45 @@ class MinMaxSumCountAggregator(Aggregator): | |||
"""Agregator for Measure metrics that keeps min, max, sum and count.""" | |||
|
|||
_TYPE = namedtuple("minmaxsumcount", "min max sum count") | |||
_EMPTY = _TYPE(None, None, None, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't sum be 0
too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question, I don't have a strong feeling about that, if you have please let me know and I'll change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should be because of https://en.wikipedia.org/wiki/Identity_element, also not a strong opinion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that, min
should be inf
and max -inf
. The point is, do we want to have a special value to indicate that it hasn't been computed?. i.e, None
. I don't have a strong opinion, the go implementation returns an error in such case, even if I don't understand the details of the race condition that is mentioned there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either way's fine with me, but I'd expect sum and count to either both be None
or both be 0
. As for +/-inf
I think the min/max of an empty set is literally undefined, but the sum and count are defined and 0
.
But this is getting dangerously close to philosophy. LGTM as is.
self._max(self.checkpoint.max, other.checkpoint.max), | ||
self._sum(self.checkpoint.sum, other.checkpoint.sum), | ||
self.checkpoint.count + other.checkpoint.count, | ||
self.checkpoint = self._merge_checkpoint( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you still need to lock here:
- thread 1 calls
merge
, loadsself.checkpoint
- thread 2 calls
take_checkpoint
, updatesself.checkpoint
- thread 1 updates
self.checkpoint
by merging the value it loaded in step 1, undoing step 2
This is based on:
In [9]: dis.dis(MinMaxSumCountAggregator.merge)
109 0 LOAD_FAST 0 (self)
2 LOAD_METHOD 0 (_merge_checkpoint)
110 4 LOAD_FAST 0 (self)
6 LOAD_ATTR 1 (checkpoint)
8 LOAD_FAST 1 (other) << thread 2 start
10 LOAD_ATTR 1 (checkpoint)
12 CALL_METHOD 2
14 LOAD_FAST 0 (self)
16 STORE_ATTR 1 (checkpoint)
18 LOAD_CONST 0 (None)
20 RETURN_VALUE
In [10]: dis.dis(MinMaxSumCountAggregator.take_checkpoint)
104 0 LOAD_FAST 0 (self)
2 LOAD_ATTR 0 (_lock)
4 SETUP_WITH 22 (to 28)
6 POP_TOP
105 8 LOAD_FAST 0 (self)
10 LOAD_ATTR 1 (current)
12 LOAD_FAST 0 (self)
14 STORE_ATTR 2 (checkpoint)
106 16 LOAD_FAST 0 (self) << thread 1 overwrite
18 LOAD_ATTR 3 (_EMPTY)
20 LOAD_FAST 0 (self)
22 STORE_ATTR 1 (current)
24 POP_BLOCK
26 LOAD_CONST 0 (None)
>> 28 WITH_CLEANUP_START
30 WITH_CLEANUP_FINISH
32 END_FINALLY
34 LOAD_CONST 0 (None)
36 RETURN_VALUE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please see #439 (comment).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM after addressing @ocelotl's comments.
self.current = 0 | ||
with self._lock: | ||
self.checkpoint = self.current | ||
self.current = 0 | ||
|
||
def merge(self, other): | ||
self.checkpoint += other.checkpoint |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even still, I think it's better not to rely on the behavior of the caller here. Even better: if they're never called concurrently, there will never be lock contention to slow this down.
@@ -63,46 +67,45 @@ class MinMaxSumCountAggregator(Aggregator): | |||
"""Agregator for Measure metrics that keeps min, max, sum and count.""" | |||
|
|||
_TYPE = namedtuple("minmaxsumcount", "min max sum count") | |||
_EMPTY = _TYPE(None, None, None, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should be because of https://en.wikipedia.org/wiki/Identity_element, also not a strong opinion.
Rename TracerSource to TracerProvider (open-telemetry#441) Following discussion in open-telemetry#434, align the name with the specification. Co-authored-by: Chris Kleinknecht <libc@google.com> Fix new ext READMEs (open-telemetry#444) Some of the new ext packages had ReStructuredText errors. PyPI rejected the uploads for these packages with: HTTPError: 400 Client Error: The description failed to render for 'text/x-rst'. See https://pypi.org/help/#description-content-type for more information. for url: https://upload.pypi.org/legacy/ Adding attach/detach methods as per spec (open-telemetry#429) This change updates the Context API with the following: - removes the remove_value method - removes the set_current method - adds attach and detach methods Fixes open-telemetry#420 Co-authored-by: Chris Kleinknecht <libc@google.com> Make Counter and MinMaxSumCount aggregators thread safe (open-telemetry#439) OT Collector trace exporter (open-telemetry#405) Based on the OpenCensus agent exporter. Fixes open-telemetry#343 Co-authored-by: Chris Kleinknecht <libc@google.com> API: Renaming TraceOptions to TraceFlags (open-telemetry#450) Renaming TraceOptions to TraceFlags, which is the term used to describe the flags associated with the trace in the OpenTelemetry specification. Closes open-telemetry#434 api: Implement "named" meters + Remove "Batcher" from Meter constructor (open-telemetry#431) Implements open-telemetry#221. Also fixes open-telemetry#394. Stateful.py and stateless.py in metrics example folder are not changed to use the new loader in anticipation of open-telemetry#422 being merged first and removing them. Lastly, moves InstrumentationInfo from trace.py in the sdk to utils. Prepare to host on readthedocs.org (open-telemetry#452) sdk: Implement observer instrument (open-telemetry#425) Observer instruments are used to capture a current set of values at a point in time [1]. This commit extends the Meter interface to allow to register an observer instrument by pasing a callback that will be executed at collection time. The logic inside collection is updated to consider these instruments and a new ObserverAggregator is implemented. [1] https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/api-metrics.md#observer-instruments sdk: fix ConsoleSpanExporter (open-telemetry#455) 19d573a ("Add io and formatter options to console exporter (open-telemetry#412)") changed the way spans are printed by using write() instead of print(). In Python 3.x sys.stdout is line-buffered, so the spans were not being printed to the console at the right timing. This commit fixes that by adding an explicit flush() call at the end of the export function , it also changes the default formatter to include a line break. To be precise, only one of the changes was needed to solve the problem, but as a matter of completness both are included, i.e, to handle the case where the formatter chosen by the user doesn't append a line break. jaeger: Usage README Update for opentelemetry-ext-jaeger (open-telemetry#459) Usage docs for opentelemetry-ext-jaeger need to be updated after the change to `TracerSource` with v0.4. Looks like it was partially updated already. Users following the usage docs will currently run into this error: `AttributeError: 'Tracer' object has no attribute 'add_span_processor'` api: Implementing Propagators API to use Context (open-telemetry#446) Implementing Propagators API to use Context. Moving tracecontexthttptextformat to trace/propagation, as TraceContext is specific to trace rather that broader context propagation. Using attach/detach for wsgi and flask extensions, enabling activation of the full context rather that activating of a sub component such as traces. Adding a composite propagator. Co-authored-by: Mauricio Vásquez <mauricio@kinvolk.io>
Total Changelog: Documentations has been significantly overhauled, including: * a getting started guide * examples has been consolidated to an docs/examples folder * several minor improvements to the examples in each extension's readme. - Adding Correlation Context API and propagator ([open-telemetry#471](open-telemetry#471)) - Adding a global configuration module to simplify setting and getting globals ([open-telemetry#466](open-telemetry#466)) - Rename metric handle to bound metric ([open-telemetry#470](open-telemetry#470)) - Moving resources to sdk ([open-telemetry#464](open-telemetry#464)) - Implementing propagators to API to use context ([open-telemetry#446](open-telemetry#446)) - Implement observer instrument for metrics ([open-telemetry#425](open-telemetry#425)) - Adding named meters, removing batchers ([open-telemetry#431](open-telemetry#431)) - Renaming TraceOptions to TraceFlags ([open-telemetry#450](open-telemetry#450)) - Renaming TracerSource to TraceProvider ([open-telemetry#441](open-telemetry#441)) - Adding Correlation Context SDK and propagator ([open-telemetry#471](open-telemetry#471)) - Adding OT Collector metrics exporter ([open-telemetry#454](open-telemetry#454)) - Improve validation of attributes ([open-telemetry#460](open-telemetry#460)) - Re-raise errors caught in opentelemetry.sdk.trace.Tracer.use_span() (open-telemetry#469) ([open-telemetry#469](open-telemetry#469)) - Adding named meters, removing batchers ([open-telemetry#431](open-telemetry#431)) - Make counter and MinMaxSumCount aggregators thread safe ([open-telemetry#439](open-telemetry#439)) - Initial release. Support is included for both trace and metrics.
Total Changelog: Documentations has been significantly overhauled, including: * a getting started guide * examples has been consolidated to an docs/examples folder * several minor improvements to the examples in each extension's readme. - Adding Correlation Context API and propagator ([open-telemetry#471](open-telemetry#471)) - Adding a global configuration module to simplify setting and getting globals ([open-telemetry#466](open-telemetry#466)) - Rename metric handle to bound metric ([open-telemetry#470](open-telemetry#470)) - Moving resources to sdk ([open-telemetry#464](open-telemetry#464)) - Implementing propagators to API to use context ([open-telemetry#446](open-telemetry#446)) - Implement observer instrument for metrics ([open-telemetry#425](open-telemetry#425)) - Adding named meters, removing batchers ([open-telemetry#431](open-telemetry#431)) - Renaming TraceOptions to TraceFlags ([open-telemetry#450](open-telemetry#450)) - Renaming TracerSource to TraceProvider ([open-telemetry#441](open-telemetry#441)) - Adding Correlation Context SDK and propagator ([open-telemetry#471](open-telemetry#471)) - Adding OT Collector metrics exporter ([open-telemetry#454](open-telemetry#454)) - Improve validation of attributes ([open-telemetry#460](open-telemetry#460)) - Re-raise errors caught in opentelemetry.sdk.trace.Tracer.use_span() (open-telemetry#469) ([open-telemetry#469](open-telemetry#469)) - Adding named meters, removing batchers ([open-telemetry#431](open-telemetry#431)) - Make counter and MinMaxSumCount aggregators thread safe ([open-telemetry#439](open-telemetry#439)) - Initial release. Support is included for both trace and metrics.
Rename TracerSource to TracerProvider (open-telemetry#441) Following discussion in open-telemetry#434, align the name with the specification. Co-authored-by: Chris Kleinknecht <libc@google.com> Fix new ext READMEs (open-telemetry#444) Some of the new ext packages had ReStructuredText errors. PyPI rejected the uploads for these packages with: HTTPError: 400 Client Error: The description failed to render for 'text/x-rst'. See https://pypi.org/help/#description-content-type for more information. for url: https://upload.pypi.org/legacy/ Adding attach/detach methods as per spec (open-telemetry#429) This change updates the Context API with the following: - removes the remove_value method - removes the set_current method - adds attach and detach methods Fixes open-telemetry#420 Co-authored-by: Chris Kleinknecht <libc@google.com> Make Counter and MinMaxSumCount aggregators thread safe (open-telemetry#439) OT Collector trace exporter (open-telemetry#405) Based on the OpenCensus agent exporter. Fixes open-telemetry#343 Co-authored-by: Chris Kleinknecht <libc@google.com> API: Renaming TraceOptions to TraceFlags (open-telemetry#450) Renaming TraceOptions to TraceFlags, which is the term used to describe the flags associated with the trace in the OpenTelemetry specification. Closes open-telemetry#434 api: Implement "named" meters + Remove "Batcher" from Meter constructor (open-telemetry#431) Implements open-telemetry#221. Also fixes open-telemetry#394. Stateful.py and stateless.py in metrics example folder are not changed to use the new loader in anticipation of open-telemetry#422 being merged first and removing them. Lastly, moves InstrumentationInfo from trace.py in the sdk to utils. Prepare to host on readthedocs.org (open-telemetry#452) sdk: Implement observer instrument (open-telemetry#425) Observer instruments are used to capture a current set of values at a point in time [1]. This commit extends the Meter interface to allow to register an observer instrument by pasing a callback that will be executed at collection time. The logic inside collection is updated to consider these instruments and a new ObserverAggregator is implemented. [1] https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/api-metrics.md#observer-instruments sdk: fix ConsoleSpanExporter (open-telemetry#455) 19d573a ("Add io and formatter options to console exporter (open-telemetry#412)") changed the way spans are printed by using write() instead of print(). In Python 3.x sys.stdout is line-buffered, so the spans were not being printed to the console at the right timing. This commit fixes that by adding an explicit flush() call at the end of the export function , it also changes the default formatter to include a line break. To be precise, only one of the changes was needed to solve the problem, but as a matter of completness both are included, i.e, to handle the case where the formatter chosen by the user doesn't append a line break. jaeger: Usage README Update for opentelemetry-ext-jaeger (open-telemetry#459) Usage docs for opentelemetry-ext-jaeger need to be updated after the change to `TracerSource` with v0.4. Looks like it was partially updated already. Users following the usage docs will currently run into this error: `AttributeError: 'Tracer' object has no attribute 'add_span_processor'` api: Implementing Propagators API to use Context (open-telemetry#446) Implementing Propagators API to use Context. Moving tracecontexthttptextformat to trace/propagation, as TraceContext is specific to trace rather that broader context propagation. Using attach/detach for wsgi and flask extensions, enabling activation of the full context rather that activating of a sub component such as traces. Adding a composite propagator. Co-authored-by: Mauricio Vásquez <mauricio@kinvolk.io>
Rename TracerSource to TracerProvider (open-telemetry#441) Following discussion in open-telemetry#434, align the name with the specification. Co-authored-by: Chris Kleinknecht <libc@google.com> Fix new ext READMEs (open-telemetry#444) Some of the new ext packages had ReStructuredText errors. PyPI rejected the uploads for these packages with: HTTPError: 400 Client Error: The description failed to render for 'text/x-rst'. See https://pypi.org/help/#description-content-type for more information. for url: https://upload.pypi.org/legacy/ Adding attach/detach methods as per spec (open-telemetry#429) This change updates the Context API with the following: - removes the remove_value method - removes the set_current method - adds attach and detach methods Fixes open-telemetry#420 Co-authored-by: Chris Kleinknecht <libc@google.com> Make Counter and MinMaxSumCount aggregators thread safe (open-telemetry#439) OT Collector trace exporter (open-telemetry#405) Based on the OpenCensus agent exporter. Fixes open-telemetry#343 Co-authored-by: Chris Kleinknecht <libc@google.com> API: Renaming TraceOptions to TraceFlags (open-telemetry#450) Renaming TraceOptions to TraceFlags, which is the term used to describe the flags associated with the trace in the OpenTelemetry specification. Closes open-telemetry#434 api: Implement "named" meters + Remove "Batcher" from Meter constructor (open-telemetry#431) Implements open-telemetry#221. Also fixes open-telemetry#394. Stateful.py and stateless.py in metrics example folder are not changed to use the new loader in anticipation of open-telemetry#422 being merged first and removing them. Lastly, moves InstrumentationInfo from trace.py in the sdk to utils. Prepare to host on readthedocs.org (open-telemetry#452) sdk: Implement observer instrument (open-telemetry#425) Observer instruments are used to capture a current set of values at a point in time [1]. This commit extends the Meter interface to allow to register an observer instrument by pasing a callback that will be executed at collection time. The logic inside collection is updated to consider these instruments and a new ObserverAggregator is implemented. [1] https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/api-metrics.md#observer-instruments sdk: fix ConsoleSpanExporter (open-telemetry#455) 19d573a ("Add io and formatter options to console exporter (open-telemetry#412)") changed the way spans are printed by using write() instead of print(). In Python 3.x sys.stdout is line-buffered, so the spans were not being printed to the console at the right timing. This commit fixes that by adding an explicit flush() call at the end of the export function , it also changes the default formatter to include a line break. To be precise, only one of the changes was needed to solve the problem, but as a matter of completness both are included, i.e, to handle the case where the formatter chosen by the user doesn't append a line break. jaeger: Usage README Update for opentelemetry-ext-jaeger (open-telemetry#459) Usage docs for opentelemetry-ext-jaeger need to be updated after the change to `TracerSource` with v0.4. Looks like it was partially updated already. Users following the usage docs will currently run into this error: `AttributeError: 'Tracer' object has no attribute 'add_span_processor'` api: Implementing Propagators API to use Context (open-telemetry#446) Implementing Propagators API to use Context. Moving tracecontexthttptextformat to trace/propagation, as TraceContext is specific to trace rather that broader context propagation. Using attach/detach for wsgi and flask extensions, enabling activation of the full context rather that activating of a sub component such as traces. Adding a composite propagator. Co-authored-by: Mauricio Vásquez <mauricio@kinvolk.io>
* chore: cleanup closed issues * chore: update basic tracer * chore: clear open-telemetry#59 * chore: clear todo in DEFAULT_CONFIG
The
update()
and thetake_checkpoint()
functions on these aggregators can be called concurrently,update()
is called from the user context whiletake_checkpoint()
is invoked from the controller. It is also possible to receive concurrent calls to theupdate()
function from the user. For these reasons this PR adds a locking mechanism to guarantee that those functions are thread safe. This PR also implements some concurrency tests for those aggregators.As a note, it could be a temporal solution as in the future we want to explore atomic to avoid the cost of locking.