Skip to content

Commit

Permalink
Initial framework for Aggregators (#2053)
Browse files Browse the repository at this point in the history
  • Loading branch information
victlu authored May 27, 2021
1 parent 4190b6b commit b5f8873
Show file tree
Hide file tree
Showing 28 changed files with 1,214 additions and 143 deletions.
30 changes: 25 additions & 5 deletions examples/Console/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,22 @@ public class Program
/// dotnet run -p Examples.Console.csproj prometheus -i 15 -p 9184 -d 2
/// dotnet run -p Examples.Console.csproj otlp -e "http://localhost:4317"
/// dotnet run -p Examples.Console.csproj zpages
/// dotnet run -p Examples.Console.csproj metrics -p 100 -e 500
/// dotnet run -p Examples.Console.csproj metrics --help
///
/// The above must be run from the project root folder
/// (eg: C:\repos\opentelemetry-dotnet\examples\Console\).
/// </summary>
/// <param name="args">Arguments from command line.</param>
public static void Main(string[] args)
{
bool prompt = true;

Parser.Default.ParseArguments<JaegerOptions, ZipkinOptions, PrometheusOptions, MetricsOptions, GrpcNetClientOptions, HttpClientOptions, RedisOptions, ZPagesOptions, ConsoleOptions, OpenTelemetryShimOptions, OpenTracingShimOptions, OtlpOptions, InMemoryOptions>(args)
.MapResult(
(JaegerOptions options) => TestJaegerExporter.Run(options.Host, options.Port),
(ZipkinOptions options) => TestZipkinExporter.Run(options.Uri),
(PrometheusOptions options) => TestPrometheusExporter.Run(options.Port, options.PushIntervalInSecs, options.DurationInMins),
(MetricsOptions options) => TestMetrics.Run(options.ObservationPeriodMilliseconds, options.CollectionPeriodMilliseconds),
(MetricsOptions options) => TestMetrics.Run(options, ref prompt),
(GrpcNetClientOptions options) => TestGrpcNetClient.Run(),
(HttpClientOptions options) => TestHttpClient.Run(),
(RedisOptions options) => TestRedis.Run(options.Uri),
Expand All @@ -59,7 +61,10 @@ public static void Main(string[] args)
(InMemoryOptions options) => TestInMemoryExporter.Run(options),
errs => 1);

System.Console.ReadLine();
if (prompt)
{
System.Console.ReadLine();
}
}
}

Expand Down Expand Up @@ -98,11 +103,26 @@ internal class PrometheusOptions
[Verb("metrics", HelpText = "Specify the options required to test Metrics")]
internal class MetricsOptions
{
[Option('p', "observationPeriodMilliseconds", Default = 100, HelpText = "Observation period.", Required = false)]
[Option('p', "prompt", HelpText = "Prompt for exit", Default = false)]
public bool? Prompt { get; set; }

[Option("runtime", Default = 5000, HelpText = "Run time in milliseconds.", Required = false)]
public int RunTime { get; set; }

[Option("observationPeriodMilliseconds", Default = 100, HelpText = "Observation period.", Required = false)]
public int ObservationPeriodMilliseconds { get; set; }

[Option('c', "collectionPeriodMilliseconds", Default = 500, HelpText = "Collection period.", Required = false)]
[Option("collectionPeriodMilliseconds", Default = 500, HelpText = "Collection period.", Required = false)]
public int CollectionPeriodMilliseconds { get; set; }

[Option('o', "runObservable", Default = true, HelpText = "Run observable counters.", Required = false)]
public bool? RunObservable { get; set; }

[Option('t', "numTasks", Default = 1, HelpText = "Run # of tasks.", Required = false)]
public int NumTasks { get; set; }

[Option("maxLoops", Default = 0, HelpText = "Maximum number of loops. 0 = No Limit", Required = false)]
public int MaxLoops { get; set; }
}

[Verb("grpc", HelpText = "Specify the options required to test Grpc.Net.Client")]
Expand Down
92 changes: 70 additions & 22 deletions examples/Console/TestMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry;
using OpenTelemetry.Metrics;
Expand All @@ -25,42 +26,89 @@ namespace Examples.Console
{
internal class TestMetrics
{
internal static object Run(int observationInterval, int collectionInterval)
internal static object Run(MetricsOptions options, ref bool prompt)
{
prompt = options.Prompt.Value;

using var provider = Sdk.CreateMeterProviderBuilder()
.AddSource("TestMeter") // All instruments from this meter are enabled.
.SetObservationPeriod(observationInterval)
.SetCollectionPeriod(collectionInterval)
.AddProcessor(new TagEnrichmentProcessor())
.SetObservationPeriod(options.ObservationPeriodMilliseconds)
.SetCollectionPeriod(options.CollectionPeriodMilliseconds)
.AddProcessor(new TagEnrichmentProcessor("newAttrib", "newAttribValue"))
.AddExportProcessor(new MetricConsoleExporter())
.Build();

using var meter = new Meter("TestMeter", "0.0.1");

var counter = meter.CreateCounter<int>("counter1");
counter.Add(10);

counter.Add(
100,
new KeyValuePair<string, object>("tag1", "value1"));
if (options.RunObservable ?? true)
{
var observableCounter = meter.CreateObservableGauge<int>("CurrentMemoryUsage", () =>
{
return new List<Measurement<int>>()
{
new Measurement<int>(
(int)Process.GetCurrentProcess().PrivateMemorySize64,
new KeyValuePair<string, object>("tag1", "value1")),
};
});
}

counter.Add(
200,
new KeyValuePair<string, object>("tag1", "value1"),
new KeyValuePair<string, object>("tag2", "value2"));
var cts = new CancellationTokenSource();

var observableCounter = meter.CreateObservableGauge<int>("CurrentMemoryUsage", () =>
var tasks = new List<Task>();

for (int i = 0; i < options.NumTasks; i++)
{
return new List<Measurement<int>>()
var taskno = i;

tasks.Add(Task.Run(() =>
{
new Measurement<int>(
(int)Process.GetCurrentProcess().PrivateMemorySize64,
new KeyValuePair<string, object>("tag1", "value1")),
};
});

Task.Delay(5000).Wait();
System.Console.WriteLine("Press Enter key to exit.");
System.Console.WriteLine($"Task started {taskno + 1}/{options.NumTasks}.");

var loops = 0;

while (!cts.IsCancellationRequested)
{
if (options.MaxLoops > 0 && loops >= options.MaxLoops)
{
break;
}

counter.Add(10);

counter.Add(
100,
new KeyValuePair<string, object>("tag1", "value1"));

counter.Add(
200,
new KeyValuePair<string, object>("tag1", "value2"),
new KeyValuePair<string, object>("tag2", "value2"));

counter.Add(
100,
new KeyValuePair<string, object>("tag1", "value1"));

counter.Add(
200,
new KeyValuePair<string, object>("tag2", "value2"),
new KeyValuePair<string, object>("tag1", "value2"));

loops++;
}
}));
}

cts.CancelAfter(options.RunTime);
Task.WaitAll(tasks.ToArray());

if (prompt)
{
System.Console.WriteLine("Press Enter key to exit.");
}

return null;
}
}
Expand Down
35 changes: 35 additions & 0 deletions src/OpenTelemetry/Metrics/Aggregator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// <copyright file="Aggregator.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Collections.Generic;
using System.Linq;

namespace OpenTelemetry.Metrics
{
internal abstract class Aggregator
{
internal virtual void Update<T>(DateTimeOffset dt, T value)
where T : struct
{
}

internal virtual IEnumerable<Metric> Collect()
{
return Enumerable.Empty<Metric>();
}
}
}
85 changes: 85 additions & 0 deletions src/OpenTelemetry/Metrics/Aggregator/LastValueAggregator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// <copyright file="LastValueAggregator.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Linq;

namespace OpenTelemetry.Metrics
{
internal class LastValueAggregator : Aggregator
{
private readonly Instrument instrument;
private readonly KeyValuePair<string, object>[] tags;

private readonly object lockUpdate = new object();
private int count = 0;
private DataPoint lastDataPoint;

internal LastValueAggregator(Instrument instrument, string[] names, object[] values)
{
this.instrument = instrument;

if (names.Length != values.Length)
{
throw new ArgumentException($"Length of {nameof(names)} and {nameof(values)} must match.");
}

this.tags = new KeyValuePair<string, object>[names.Length];
for (int i = 0; i < names.Length; i++)
{
this.tags[i] = new KeyValuePair<string, object>(names[i], values[i]);
}
}

internal override void Update<T>(DateTimeOffset dt, T value)
where T : struct
{
lock (this.lockUpdate)
{
this.count++;
this.lastDataPoint = DataPoint.CreateDataPoint(dt, value, this.tags);
}
}

internal override IEnumerable<Metric> Collect()
{
// TODO: Need to determine how to convert to Metric

if (this.count == 0)
{
return Enumerable.Empty<Metric>();
}

DataPoint lastValue;
lock (this.lockUpdate)
{
lastValue = this.lastDataPoint;
this.count = 0;
}

var metrics = new Metric[]
{
new Metric(
$"{this.instrument.Meter.Name}:{this.instrument.Name}:LastValue",
lastValue),
};

return metrics;
}
}
}
Loading

0 comments on commit b5f8873

Please sign in to comment.