Skip to content

Commit

Permalink
Update ForceFlush behavior to meet with the latest spec requirements (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
reyang authored Sep 20, 2021
1 parent 1d63b31 commit 47e1c5e
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 80 deletions.
10 changes: 5 additions & 5 deletions src/OpenTelemetry/BaseExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public abstract class BaseExporter<T> : IDisposable
/// shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
Expand All @@ -79,7 +79,7 @@ public bool Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative.");
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite.");
}

if (Interlocked.Increment(ref this.shutdownCount) > 1)
Expand Down Expand Up @@ -110,8 +110,8 @@ public void Dispose()
/// thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
Expand Down
20 changes: 10 additions & 10 deletions src/OpenTelemetry/BaseProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public virtual void OnEnd(T data)
/// completed, shutdown signaled or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush succeeded; otherwise, <c>false</c>.
Expand All @@ -84,7 +84,7 @@ public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative.");
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite.");
}

try
Expand All @@ -103,8 +103,8 @@ public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
/// shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
Expand All @@ -120,7 +120,7 @@ public bool Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative.");
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite.");
}

if (Interlocked.Increment(ref this.shutdownCount) > 1)
Expand Down Expand Up @@ -156,8 +156,8 @@ internal virtual void SetParentProvider(BaseProvider parentProvider)
/// thread until flush completed, shutdown signaled or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush succeeded; otherwise, <c>false</c>.
Expand All @@ -177,8 +177,8 @@ protected virtual bool OnForceFlush(int timeoutMilliseconds)
/// thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
Expand Down
5 changes: 5 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## Unreleased

* Changed `CompositeProcessor<T>.OnForceFlush` to meet with the spec
requirement. Now the SDK will invoke `ForceFlush` on all registered
processors, even if there is a timeout.
([#2388](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2388))

## 1.2.0-alpha3

Released 2021-Sep-13
Expand Down
19 changes: 5 additions & 14 deletions src/OpenTelemetry/CompositeProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,37 +95,28 @@ public override void OnStart(T data)
/// <inheritdoc/>
protected override bool OnForceFlush(int timeoutMilliseconds)
{
var result = true;
var cur = this.head;

var sw = Stopwatch.StartNew();

while (cur != null)
{
if (timeoutMilliseconds == Timeout.Infinite)
{
_ = cur.Value.ForceFlush(Timeout.Infinite);
result = cur.Value.ForceFlush(Timeout.Infinite) && result;
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return false;
}

var succeeded = cur.Value.ForceFlush((int)timeout);

if (!succeeded)
{
return false;
}
// notify all the processors, even if we run overtime
result = cur.Value.ForceFlush((int)Math.Max(timeout, 0)) && result;
}

cur = cur.Next;
}

return true;
return result;
}

/// <inheritdoc/>
Expand Down
5 changes: 0 additions & 5 deletions src/OpenTelemetry/Metrics/CompositeMetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,6 @@ protected override bool ProcessMetrics(Batch<Metric> metrics, int timeoutMillise
/// <inheritdoc/>
protected override bool OnCollect(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative.");
}

var result = true;
var cur = this.head;
var sw = Stopwatch.StartNew();
Expand Down
28 changes: 14 additions & 14 deletions src/OpenTelemetry/Metrics/MeterProviderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public static class MeterProviderExtensions
/// </summary>
/// <param name="provider">MeterProviderSdk instance on which ForceFlush will be called.</param>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when force flush succeeded; otherwise, <c>false</c>.
Expand All @@ -48,13 +48,13 @@ public static bool ForceFlush(this MeterProvider provider, int timeoutMillisecon
throw new ArgumentNullException(nameof(provider));
}

if (provider is MeterProviderSdk meterProviderSdk)
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative.");
}
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite.");
}

if (provider is MeterProviderSdk meterProviderSdk)
{
try
{
return meterProviderSdk.OnForceFlush(timeoutMilliseconds);
Expand All @@ -76,8 +76,8 @@ public static bool ForceFlush(this MeterProvider provider, int timeoutMillisecon
/// </summary>
/// <param name="provider">MeterProviderSdk instance on which Shutdown will be called.</param>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
Expand All @@ -96,13 +96,13 @@ public static bool Shutdown(this MeterProvider provider, int timeoutMilliseconds
throw new ArgumentNullException(nameof(provider));
}

if (provider is MeterProviderSdk meterProviderSdk)
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative.");
}
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite.");
}

if (provider is MeterProviderSdk meterProviderSdk)
{
if (Interlocked.Increment(ref meterProviderSdk.ShutdownCount) > 1)
{
return false; // shutdown already called
Expand Down
8 changes: 4 additions & 4 deletions src/OpenTelemetry/Metrics/MeterProviderSdk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ internal Batch<Metric> Collect()
/// thread until flush completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush succeeded; otherwise, <c>false</c>.
Expand All @@ -208,8 +208,8 @@ internal bool OnForceFlush(int timeoutMilliseconds)
/// thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
Expand Down
4 changes: 2 additions & 2 deletions src/OpenTelemetry/Metrics/MetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public bool Collect(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative.");
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite.");
}

try
Expand Down Expand Up @@ -106,7 +106,7 @@ public bool Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative.");
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite.");
}

if (Interlocked.Increment(ref this.shutdownCount) > 1)
Expand Down
28 changes: 14 additions & 14 deletions src/OpenTelemetry/Trace/TracerProviderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public static TracerProvider AddProcessor(this TracerProvider provider, BaseProc
/// </summary>
/// <param name="provider">TracerProviderSdk instance on which ForceFlush will be called.</param>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when force flush succeeded; otherwise, <c>false</c>.
Expand All @@ -68,13 +68,13 @@ public static bool ForceFlush(this TracerProvider provider, int timeoutMilliseco
throw new ArgumentNullException(nameof(provider));
}

if (provider is TracerProviderSdk tracerProviderSdk)
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative.");
}
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite.");
}

if (provider is TracerProviderSdk tracerProviderSdk)
{
try
{
return tracerProviderSdk.OnForceFlush(timeoutMilliseconds);
Expand All @@ -95,8 +95,8 @@ public static bool ForceFlush(this TracerProvider provider, int timeoutMilliseco
/// </summary>
/// <param name="provider">TracerProviderSdk instance on which Shutdown will be called.</param>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
Expand All @@ -115,13 +115,13 @@ public static bool Shutdown(this TracerProvider provider, int timeoutMillisecond
throw new ArgumentNullException(nameof(provider));
}

if (provider is TracerProviderSdk tracerProviderSdk)
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative.");
}
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite.");
}

if (provider is TracerProviderSdk tracerProviderSdk)
{
if (Interlocked.Increment(ref tracerProviderSdk.ShutdownCount) > 1)
{
return false; // shutdown already called
Expand Down
4 changes: 2 additions & 2 deletions src/OpenTelemetry/Trace/TracerProviderSdk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ internal bool OnForceFlush(int timeoutMilliseconds)
/// thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
Expand Down
12 changes: 2 additions & 10 deletions test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,8 @@ public void CompositeActivityProcessor_ForceFlush(int timeout)
{
processor.ForceFlush(timeout);

if (timeout != 0)
{
Assert.True(p1.ForceFlushCalled);
Assert.True(p2.ForceFlushCalled);
}
else
{
Assert.False(p1.ForceFlushCalled);
Assert.False(p2.ForceFlushCalled);
}
Assert.True(p1.ForceFlushCalled);
Assert.True(p2.ForceFlushCalled);
}
}
}
Expand Down

0 comments on commit 47e1c5e

Please sign in to comment.