Skip to content

Commit

Permalink
added flush
Browse files Browse the repository at this point in the history
  • Loading branch information
IamRanjeetSingh committed Sep 12, 2024
1 parent 1a3ec78 commit dbcacdf
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 30 deletions.
12 changes: 8 additions & 4 deletions Ginger/GingerCoreNET/Telemetry/BlockingBufferQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ public void Enqueue(T item)
}
finally
{
_syncEnqueueEvent.Set();
if (!_isDisposed)
{
_syncEnqueueEvent.Set();
}
}
}

Expand Down Expand Up @@ -90,14 +93,15 @@ public IEnumerable<T> Dequeue()
}
finally
{
_syncDequeueEvent.Set();
if (!_isDisposed)
{
_syncDequeueEvent.Set();
}
}
}

public void Flush()
{
ThrowIfDisposed();

_syncBufferSizeSemaphoreCTS?.Cancel();
}

Expand Down
47 changes: 21 additions & 26 deletions Ginger/GingerCoreNET/Telemetry/TelemetryQueue.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using Microsoft.CodeAnalysis.Scripting.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.VisualStudio.Services.Common;
using Microsoft.VisualStudio.Services.WebApi;
using NPOI.HSSF.Record.Aggregates;
using NPOI.OpenXmlFormats.Dml;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
Expand All @@ -16,6 +18,8 @@ namespace Amdocs.Ginger.CoreNET.Telemetry
{
internal sealed class TelemetryQueue<TRecord> : IDisposable
{
private static readonly TimeSpan QueueFlushWaitTime = TimeSpan.FromSeconds(1);

Check warning on line 21 in Ginger/GingerCoreNET/Telemetry/TelemetryQueue.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

Ginger/GingerCoreNET/Telemetry/TelemetryQueue.cs#L21

A static field in a generic type is not shared among instances of different close constructed types.

internal sealed class Config
{
internal required int BufferSize { get; init; }
Expand All @@ -39,7 +43,7 @@ internal sealed class Config

private bool _isDisposed;

private string QueueName { get; } = $"{typeof(TRecord).FullName}-Telemetry-Queue";
private string QueueName { get; } = $"{typeof(TRecord).Name}-Telemetry-Queue";

internal TelemetryQueue(Config config)
{
Expand All @@ -49,41 +53,31 @@ internal TelemetryQueue(Config config)
_retryPollingSize = config.RetryPollingSize;
_retryPollingInterval = config.RetryPollingInterval;
_cancellationTokenSource = new();

Check failure on line 55 in Ginger/GingerCoreNET/Telemetry/TelemetryQueue.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

Ginger/GingerCoreNET/Telemetry/TelemetryQueue.cs#L55

Dispose '_cancellationTokenSource' when it is no longer needed.
_consumerTask = CreateConsumerTask();
_retryTask = CreateRetryTask();
_consumerTask = StartConsumerTask();
_retryTask = StartRetryService();
_recordsBeingProcessed = [];
_logger = config.Logger;

_isDisposed = false;

_consumerTask.Start();
_retryTask.Start();
}

private Task CreateConsumerTask()
private Task StartConsumerTask()
{
_logger?.LogTrace("creating new consumer task in {queueName}", QueueName);

return new Task(async () =>
return Task.Run(async () =>
{
_logger?.LogDebug("started consumer task in {queueName}", QueueName);
while (!_cancellationTokenSource.IsCancellationRequested)
while (!_isDisposed)
{
string corrId = NewCorrelationId();
IEnumerable<TRecord> records = Dequeue(corrId);
if (_cancellationTokenSource.IsCancellationRequested)
{
break;
}
try
{
await ProcessAsync(records, corrId);
}
catch (Exception ex)
catch (Exception ex)
{
_logger?.LogError("error while processing records in {queueName}\n{ex}", QueueName, ex);
}
Expand All @@ -93,19 +87,17 @@ private Task CreateConsumerTask()
});
}

private Task CreateRetryTask()
private Task StartRetryService()
{
_logger?.LogTrace("creating new retry task in {queueName}", QueueName);

return new Task(async () =>
return Task.Run(async () =>
{
_logger?.LogDebug("started retry task in in {queueName}", QueueName);
while (!_cancellationTokenSource.IsCancellationRequested)
while (!_cancellationTokenSource.Token.IsCancellationRequested)
{
string corrId = NewCorrelationId();
await Task.Delay(_retryPollingInterval);
await Task.Delay(_retryPollingInterval, _cancellationTokenSource.Token);
if (_cancellationTokenSource.IsCancellationRequested)
{
Expand All @@ -118,13 +110,13 @@ private Task CreateRetryTask()
{
await ReprocessAsync(records, corrId);
}
catch (Exception ex)
catch (Exception ex)
{
_logger?.LogError("{corrId} error while reprocessing records in {queueName}\n{ex}", corrId, QueueName, ex);
}
}
_logger?.LogDebug("stopped consumer task in {queueName}", QueueName);
_logger?.LogDebug("stopped retry task in {queueName}", QueueName);
});
}

Expand Down Expand Up @@ -298,6 +290,9 @@ public void Dispose()

_isDisposed = true;
_cancellationTokenSource.Cancel();
_queue.Dispose();

_consumerTask.Wait(QueueFlushWaitTime);
}
}
}
1 change: 1 addition & 0 deletions Ginger/GingerCoreNET/WorkSpaceLib/WorkSpace.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public void Close()
Reporter.TelemetryQueueManager.Dispose();
}

Reporter.TelemetryQueueManager.Dispose();
//WorkSpace.Instance.Telemetry.SessionEnd();
mWorkSpace = null;
}
Expand Down

0 comments on commit dbcacdf

Please sign in to comment.