Skip to content

Commit

Permalink
Sh (#211)
Browse files Browse the repository at this point in the history
* fix: Remove prometheus
* fix: Remove bad logging
  • Loading branch information
SenexCrenshaw authored Feb 22, 2024
1 parent 497b721 commit bc1e9b3
Show file tree
Hide file tree
Showing 22 changed files with 331 additions and 236 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,4 @@ Dockerfile.sm
basever
buildver
smver
release.config.cjs
28 changes: 28 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,31 @@
## [0.13.2-sh.2](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.2-sh.1...v0.13.2-sh.2) (2024-02-22)


### Bug Fixes

* Client writing ([d3f4f54](https://github.com/SenexCrenshaw/StreamMaster/commit/d3f4f54dcfa39b0d9ccb65e5a974bd72f0848d62))

## [0.13.2-sh.2](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.2-sh.1...v0.13.2-sh.2) (2024-02-22)


### Bug Fixes

* Client writing ([d3f4f54](https://github.com/SenexCrenshaw/StreamMaster/commit/d3f4f54dcfa39b0d9ccb65e5a974bd72f0848d62))

## [0.13.2-sh.2](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.2-sh.1...v0.13.2-sh.2) (2024-02-22)


### Bug Fixes

* Client writing ([d3f4f54](https://github.com/SenexCrenshaw/StreamMaster/commit/d3f4f54dcfa39b0d9ccb65e5a974bd72f0848d62))

## [0.13.2-sh.1](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.1...v0.13.2-sh.1) (2024-02-21)


### Bug Fixes

* update videoinfo ([f7231ac](https://github.com/SenexCrenshaw/StreamMaster/commit/f7231ac323601b3a0387e4d48e7c3c3260eea728))

## [0.13.1](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.0...v0.13.1) (2024-02-11)


Expand Down
12 changes: 10 additions & 2 deletions Dockerfile.base
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
ARG TARGETPLATFORM
ARG TARGETOS
ARG TARGETARCH
ARG TARGETVARIANT

WORKDIR /app

RUN apt-get update -yq \
RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache

RUN \
--mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/var/cache/apt \
apt-get update -yq \
&& apt-get upgrade -yq \
&& apt-get install -yq --no-install-recommends ffmpeg gosu postgresql postgresql-common curl gnupg lsb-release \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* \
&& rm -rf /var/lib/apt/lists/* \
&& mkdir /docker-entrypoint-initdb.d
10 changes: 9 additions & 1 deletion Dockerfile.build
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
ARG TARGETPLATFORM
ARG TARGETOS
ARG TARGETARCH
ARG TARGETVARIANT

RUN apt-get update -yq \
RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache

RUN \
--mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/var/cache/apt \
apt-get update -yq \
&& apt-get upgrade -yq \
&& curl -fsSL https://deb.nodesource.com/setup_21.x | bash - \
&& apt-get install --no-install-recommends -yq nodejs \
Expand Down
6 changes: 3 additions & 3 deletions StreamMaster.API/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System.Reflection;

[assembly: AssemblyVersion("0.13.1.0")]
[assembly: AssemblyFileVersion("0.13.1.0")]
[assembly: AssemblyInformationalVersion("0.13.1.Sha.19d3cf9344080dfb25a6559125410d00a3540665")]
[assembly: AssemblyVersion("0.13.2.3")]
[assembly: AssemblyFileVersion("0.13.2.3")]
[assembly: AssemblyInformationalVersion("0.13.2-sh.3.Sha.d1570728f5c010f25cd649695fc1d1e339b3d084")]
165 changes: 96 additions & 69 deletions StreamMaster.Domain/CircularBuffer.cs
Original file line number Diff line number Diff line change
@@ -1,98 +1,125 @@
namespace StreamMaster.Domain;

public class CircularBuffer
namespace StreamMaster.Domain
{
private readonly byte[] buffer;
private int writePosition = 0;
private readonly object lockObj = new();

public CircularBuffer(int capacity)
public class CircularBuffer
{
if (capacity <= 0)
{
throw new ArgumentException("Capacity must be greater than 0", nameof(capacity));
}

Capacity = capacity;
buffer = new byte[capacity];
}
private readonly byte[] buffer;
private int writePosition = 0;
private readonly object lockObj = new();

public void Write(byte[] data)
{
if (data == null)
/// <summary>
/// Initializes a new instance of the CircularBuffer class with a specified capacity.
/// </summary>
/// <param name="capacity">The maximum amount of data the buffer can hold.</param>
/// <exception cref="ArgumentException">Thrown when capacity is less than or equal to 0.</exception>
public CircularBuffer(int capacity)
{
throw new ArgumentNullException(nameof(data));
if (capacity <= 0)
{
throw new ArgumentException("Capacity must be greater than 0.", nameof(capacity));
}

Capacity = capacity;
buffer = new byte[capacity];
}

lock (lockObj)
/// <summary>
/// Writes data into the circular buffer. If the buffer does not have enough space,
/// older data will be overwritten.
/// </summary>
/// <param name="data">The byte array to write into the buffer.</param>
/// <exception cref="ArgumentNullException">Thrown when data is null.</exception>
public void Write(byte[] data)
{
int dataLength = data.Length;
if (dataLength > Capacity)
if (data == null)
{
// If data is larger than the capacity, write only the last part of data that fits.
Array.Copy(data, dataLength - Capacity, buffer, 0, Capacity);
writePosition = 0;
AvailableData = Capacity;
throw new ArgumentNullException(nameof(data));
}
else
{
int part1Len = Math.Min(Capacity - writePosition, dataLength);
Array.Copy(data, 0, buffer, writePosition, part1Len);

if (part1Len < dataLength)
lock (lockObj)
{
int dataLength = data.Length;
if (dataLength > Capacity)
{
// If the data was split, write the remaining part at the beginning of the buffer.
Array.Copy(data, part1Len, buffer, 0, dataLength - part1Len);
// Overwrite the buffer with the last 'Capacity' bytes of 'data'
Array.Copy(data, dataLength - Capacity, buffer, 0, Capacity);
writePosition = 0;
AvailableData = Capacity;
}
else
{
// Calculate how much data can be written without wrapping
int part1Len = Math.Min(Capacity - writePosition, dataLength);
Array.Copy(data, 0, buffer, writePosition, part1Len);

if (part1Len < dataLength)
{
// Wrap and write the remaining data
Array.Copy(data, part1Len, buffer, 0, dataLength - part1Len);
}

writePosition = (writePosition + dataLength) % Capacity;
AvailableData = Math.Min(AvailableData + dataLength, Capacity);
// Update write position and available data, wrapping the write position if necessary
writePosition = (writePosition + dataLength) % Capacity;
AvailableData = Math.Min(AvailableData + dataLength, Capacity);
}
}
}
}

public byte[] ReadLatestData()
{
lock (lockObj)
/// <summary>
/// Reads the most recent data written into the buffer without removing it.
/// </summary>
/// <returns>A byte array containing the latest data. The size of the array is up to the amount of available data.</returns>
public byte[] ReadLatestData()
{
byte[] latestData = new byte[AvailableData]; // Prepare an array to hold the latest data.

if (AvailableData == 0)
lock (lockObj)
{
// No data available to read.
return latestData;
}
byte[] latestData = new byte[AvailableData];

if (writePosition == 0 || AvailableData == Capacity)
{
// If writePosition is 0, it means the buffer has just wrapped around or is exactly full,
// so the latest data is the entire buffer.
Array.Copy(buffer, 0, latestData, 0, AvailableData);
}
else
{
// Calculate the start position for the latest data that's not contiguous.
int startIdx = writePosition - AvailableData;
if (startIdx < 0)
if (AvailableData == 0)
{
// The data wraps around; copy the end segment and then the start segment.
startIdx += Capacity; // Correct the start index to a positive value.
int part1Length = Capacity - startIdx;
Array.Copy(buffer, startIdx, latestData, 0, part1Length);
Array.Copy(buffer, 0, latestData, part1Length, writePosition);
return latestData; // Early exit if no data available
}

if (writePosition == 0 || AvailableData == Capacity)
{
// Buffer is exactly full, or just wrapped around
Array.Copy(buffer, 0, latestData, 0, AvailableData);
}
else
{
// All available data is contiguous and can be copied directly.
Array.Copy(buffer, startIdx, latestData, 0, AvailableData);
// Data wraps around the buffer end; copy in two segments
int startIdx = (Capacity + writePosition - AvailableData) % Capacity;
int part1Length = Math.Min(AvailableData, Capacity - startIdx);
Array.Copy(buffer, startIdx, latestData, 0, part1Length);
if (part1Length < AvailableData)
{
Array.Copy(buffer, 0, latestData, part1Length, AvailableData - part1Length);
}
}
}

return latestData;
return latestData;
}
}
}
/// <summary>
/// Gets the capacity of the buffer.
/// </summary>
public int Capacity { get; }

/// <summary>
/// Gets the amount of data currently stored in the buffer.
/// </summary>
public int AvailableData { get; private set; } = 0;

/// <summary>
/// Clears all data from the buffer, resetting its state.
/// </summary>
public void Clear()
{
lock (lockObj)
{
writePosition = 0;
AvailableData = 0;
}
}

public int Capacity { get; }
public int AvailableData { get; private set; } = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ public ClientStreamerConfiguration(

public async Task CancelClient(bool includeAbort = true)
{
if (ReadBuffer != null)
if (Stream != null)
{
ReadBuffer.ReadChannel?.Writer.Complete();
ReadBuffer.Cancel();
ReadBuffer.Dispose();
ReadBuffer = null;
Stream.Channel?.Writer.Complete();
Stream.Cancel();
Stream.Dispose();
Stream = null;
}

try
Expand Down Expand Up @@ -71,7 +71,7 @@ public async Task CancelClient(bool includeAbort = true)
}

//Buffering
public IClientReadStream? ReadBuffer { get; set; }
public IClientReadStream? Stream { get; set; }

//Tokens
private CancellationToken ClientHTTPRequestCancellationToken { get; }
Expand Down
2 changes: 1 addition & 1 deletion StreamMaster.Infrastructure/Services/BroadcastService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void LogDebug()
//logger.LogInformation("GetStreamHandlers: {GetStreamHandlers}", streamManager.GetStreamHandlers().Count);
foreach (IClientStreamerConfiguration clientStreamerConfiguration in clientStreamer.GetAllClientStreamerConfigurations)
{
printDebug("Client: {0} {1}", clientStreamerConfiguration.ChannelName, clientStreamerConfiguration.ReadBuffer?.Id ?? Guid.Empty);
printDebug("Client: {0} {1}", clientStreamerConfiguration.ChannelName, clientStreamerConfiguration.Stream?.Id ?? Guid.Empty);
}

foreach (IStreamHandler handler in streamManager.GetStreamHandlers())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace StreamMaster.Streams.Domain.Interfaces;
public interface IClientReadStream : IDisposable
{
string VideoStreamName { get; set; }
Channel<byte[]> ReadChannel { get; }
Channel<byte[]> Channel { get; }

/// <summary>
/// Gets a value indicating whether the stream supports reading.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public interface IClientStreamerConfiguration
Guid ClientId { get; set; }
string ClientIPAddress { get; set; }
string ClientUserAgent { get; set; }
IClientReadStream? ReadBuffer { get; set; }
IClientReadStream? Stream { get; set; }
CancellationTokenSource ClientMasterToken { get; set; }
string VideoStreamName { get; set; }
}
16 changes: 7 additions & 9 deletions StreamMaster.Streams/Buffers/ClientReadStream.Main.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;

using StreamMaster.Domain.Cache;

using System.Threading.Channels;

namespace StreamMaster.Streams.Buffers;
Expand Down Expand Up @@ -37,12 +35,12 @@ public ClientReadStream(IMemoryCache memoryCache, IStatisticsManager _statistics
SingleReader = true,
SingleWriter = true
};
ReadChannel = Channel.CreateUnbounded<byte[]>(options);
Channel = System.Threading.Channels.Channel.CreateUnbounded<byte[]>(options);

logger.LogInformation("Starting client read stream for ClientId: {ClientId}", ClientId);
}

public Channel<byte[]> ReadChannel { get; private set; }
public Channel<byte[]> Channel { get; private set; }

private bool IsCancelled { get; set; }

Expand Down Expand Up @@ -75,11 +73,11 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
}
catch (Exception ex)
{
Setting setting = memoryCache.GetSetting();
if (setting.EnablePrometheus)
{
_readErrorsCounter.WithLabels(ClientId.ToString(), VideoStreamName).Inc();
}
//Setting setting = memoryCache.GetSetting();
//if (setting.EnablePrometheus)
//{
// _readErrorsCounter.WithLabels(ClientId.ToString(), VideoStreamName).Inc();
//}

logger.LogError(ex, "Error reading buffer for ClientId: {ClientId}", ClientId);
}
Expand Down
Loading

0 comments on commit bc1e9b3

Please sign in to comment.