Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor MetricReader #2385

Merged
merged 7 commits into from
Sep 20, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions docs/metrics/extending-the-sdk/MyReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,9 @@ public MyReader(string name = "MyReader")
this.name = name;
}

protected override bool OnCollect(Batch<Metric> metrics, int timeoutMilliseconds)
protected override bool ProcessMetrics(Batch<Metric> metrics, int timeoutMilliseconds)
{
Console.WriteLine($"{this.name}.OnCollect(metrics={metrics}, timeoutMilliseconds={timeoutMilliseconds})");
return true;
}

protected override bool OnForceFlush(int timeoutMilliseconds)
{
Console.WriteLine($"{this.name}.OnForceFlush(timeoutMilliseconds={timeoutMilliseconds})");
Console.WriteLine($"{this.name}.ProcessMetrics(metrics={metrics}, timeoutMilliseconds={timeoutMilliseconds})");
return true;
}

Expand Down
9 changes: 1 addition & 8 deletions src/OpenTelemetry/Metrics/BaseExportingMetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,11 @@ internal override void SetParentProvider(BaseProvider parentProvider)
}

/// <inheritdoc/>
protected override bool OnCollect(Batch<Metric> metrics, int timeoutMilliseconds)
protected override bool ProcessMetrics(Batch<Metric> metrics, int timeoutMilliseconds)
{
return this.exporter.Export(metrics) == ExportResult.Success;
}

/// <inheritdoc/>
protected override bool OnForceFlush(int timeoutMilliseconds)
{
// TODO: need to hammer this out
return true;
}

/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
Expand Down
55 changes: 13 additions & 42 deletions src/OpenTelemetry/Metrics/CompositeMetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,72 +69,43 @@ public CompositeMetricReader AddReader(MetricReader reader)
}

/// <inheritdoc/>
public override bool Collect(int timeoutMilliseconds = Timeout.Infinite)
{
var cur = this.head;
var result = true;
var sw = Stopwatch.StartNew();

while (cur != null)
{
if (timeoutMilliseconds == Timeout.Infinite)
{
result = cur.Value.Collect(Timeout.Infinite) && result;
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

// notify all the readers, even if we run overtime
result = cur.Value.Collect((int)Math.Max(timeout, 0)) && result;
}

cur = cur.Next;
}

return result;
}

protected override bool OnCollect(Batch<Metric> metrics, int timeoutMilliseconds)
protected override bool ProcessMetrics(Batch<Metric> metrics, int timeoutMilliseconds)
{
// CompositeMetricReader delegates the work to its underlying readers,
// so CompositeMetricReader.OnCollect should never be called.
// so CompositeMetricReader.ProcessMetrics should never be called.
throw new NotImplementedException();
}

/// <inheritdoc/>
protected override bool OnForceFlush(int timeoutMilliseconds)
protected override bool OnCollect(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative.");
}

var result = true;
var cur = this.head;
var sw = Stopwatch.StartNew();

while (cur != null)
{
if (timeoutMilliseconds == Timeout.Infinite)
{
_ = cur.Value.ForceFlush(Timeout.Infinite);
result = cur.Value.Collect(Timeout.Infinite) && result;
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return false;
}

var succeeded = cur.Value.ForceFlush((int)timeout);

if (!succeeded)
{
return false;
}
// notify all the readers, even if we run overtime
result = cur.Value.Collect((int)Math.Max(timeout, 0)) && result;
}

cur = cur.Next;
}

return true;
return result;
}

/// <inheritdoc/>
Expand Down
2 changes: 1 addition & 1 deletion src/OpenTelemetry/Metrics/MeterProviderSdk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ internal Batch<Metric> Collect()
/// </remarks>
internal bool OnForceFlush(int timeoutMilliseconds)
{
return this.reader?.ForceFlush(timeoutMilliseconds) ?? true;
return this.reader?.Collect(timeoutMilliseconds) ?? true;
}

/// <summary>
Expand Down
101 changes: 54 additions & 47 deletions src/OpenTelemetry/Metrics/MetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,50 +49,24 @@ public AggregationTemporality SupportedAggregationTemporality
}
}

public virtual bool Collect(int timeoutMilliseconds = Timeout.Infinite)
{
var sw = Stopwatch.StartNew();

var collectMetric = this.ParentProvider.GetMetricCollect();
var metricsCollected = collectMetric();

if (timeoutMilliseconds == Timeout.Infinite)
{
this.OnCollect(metricsCollected, Timeout.Infinite);
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return false;
}

return this.OnCollect(metricsCollected, (int)timeout);
}

return true;
}

/// <summary>
/// Flushes the processor, blocks the current thread until flush
/// completed, shutdown signaled or timed out.
/// Attempts to collect the metrics, blocks the current thread until
/// metrics collection completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush succeeded; otherwise, <c>false</c>.
/// Returns <c>true</c> when metrics collection succeeded; otherwise, <c>false</c>.
/// </returns>
/// <exception cref="System.ArgumentOutOfRangeException">
/// Thrown when the <c>timeoutMilliseconds</c> is smaller than -1.
/// </exception>
/// <remarks>
/// This function guarantees thread-safety.
/// </remarks>
public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
public bool Collect(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
Expand All @@ -101,11 +75,11 @@ public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)

try
{
return this.OnForceFlush(timeoutMilliseconds);
return this.OnCollect(timeoutMilliseconds);
}
catch (Exception)
{
// TODO: OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ForceFlush), ex);
// TODO: OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Collect), ex);
return false;
}
}
Expand All @@ -115,8 +89,8 @@ public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
/// shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
Expand Down Expand Up @@ -163,36 +137,69 @@ internal virtual void SetParentProvider(BaseProvider parentProvider)
this.ParentProvider = parentProvider;
}

protected abstract bool OnCollect(Batch<Metric> metrics, int timeoutMilliseconds);
/// <summary>
/// Processes a batch of metrics.
/// </summary>
/// <param name="metrics">Batch of metrics to be processed.</param>
/// <param name="timeoutMilliseconds">
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when metrics processing succeeded; otherwise,
/// <c>false</c>.
/// </returns>
protected abstract bool ProcessMetrics(Batch<Metric> metrics, int timeoutMilliseconds);

/// <summary>
/// Called by <c>ForceFlush</c>. This function should block the current
/// thread until flush completed, shutdown signaled or timed out.
/// Called by <c>Collect</c>. This function should block the current
/// thread until metrics collection completed, shutdown signaled or
/// timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush succeeded; otherwise, <c>false</c>.
/// Returns <c>true</c> when metrics collection succeeded; otherwise,
/// <c>false</c>.
/// </returns>
/// <remarks>
/// This function is called synchronously on the thread which called
/// <c>ForceFlush</c>. This function should be thread-safe, and should
/// <c>Collect</c>. This function should be thread-safe, and should
/// not throw exceptions.
/// </remarks>
protected virtual bool OnForceFlush(int timeoutMilliseconds)
protected virtual bool OnCollect(int timeoutMilliseconds)
{
return true;
var sw = Stopwatch.StartNew();

var collectMetric = this.ParentProvider.GetMetricCollect();
var metrics = collectMetric();

if (timeoutMilliseconds == Timeout.Infinite)
{
return this.ProcessMetrics(metrics, Timeout.Infinite);
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return false;
}

return this.ProcessMetrics(metrics, (int)timeout);
reyang marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// <summary>
/// Called by <c>Shutdown</c>. This function should block the current
/// thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
Expand Down