From bc1e9b3dc522d0b3d492deda92fe0009e889937f Mon Sep 17 00:00:00 2001
From: Senex Crenshaw <35600301+SenexCrenshaw@users.noreply.github.com>
Date: Thu, 22 Feb 2024 12:30:57 -0500
Subject: [PATCH] Sh (#211)
* fix: Remove prometheus
* fix: Remove bad logging
---
.gitignore | 1 +
CHANGELOG.md | 28 +++
Dockerfile.base | 12 +-
Dockerfile.build | 10 +-
StreamMaster.API/AssemblyInfo.cs | 6 +-
StreamMaster.Domain/CircularBuffer.cs | 165 ++++++++++--------
.../Clients/ClientStreamerConfiguration.cs | 12 +-
.../Services/BroadcastService.cs | 2 +-
.../Interfaces/IClientReadStream.cs | 2 +-
.../IClientStreamerConfiguration.cs | 2 +-
.../Buffers/ClientReadStream.Main.cs | 16 +-
.../Buffers/ClientReadStream.Stats.cs | 23 ++-
.../Buffers/ClientReadStream.cs | 20 +--
.../Channels/ChannelManager.cs | 4 +-
.../Clients/ClientStreamerManager.cs | 4 +-
.../Streams/StreamHandler.Stats.cs | 19 +-
.../Streams/StreamHandler.VideoInfo.cs | 45 -----
StreamMaster.Streams/Streams/StreamHandler.cs | 75 ++++----
StreamMaster.Streams/Streams/StreamManager.cs | 4 +-
build_docker.ps1 | 63 ++++++-
release.config.norelease.cjs | 51 ++++++
...e.config.cjs => release.config.release.cjs | 3 +-
22 files changed, 331 insertions(+), 236 deletions(-)
create mode 100644 release.config.norelease.cjs
rename release.config.cjs => release.config.release.cjs (93%)
diff --git a/.gitignore b/.gitignore
index 74fa0986c..274fdf364 100644
--- a/.gitignore
+++ b/.gitignore
@@ -370,3 +370,4 @@ Dockerfile.sm
basever
buildver
smver
+release.config.cjs
diff --git a/CHANGELOG.md b/CHANGELOG.md
index aaa6b9fdf..314b95297 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,31 @@
+## [0.13.2-sh.2](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.2-sh.1...v0.13.2-sh.2) (2024-02-22)
+
+
+### Bug Fixes
+
+* Client writing ([d3f4f54](https://github.com/SenexCrenshaw/StreamMaster/commit/d3f4f54dcfa39b0d9ccb65e5a974bd72f0848d62))
+
+## [0.13.2-sh.2](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.2-sh.1...v0.13.2-sh.2) (2024-02-22)
+
+
+### Bug Fixes
+
+* Client writing ([d3f4f54](https://github.com/SenexCrenshaw/StreamMaster/commit/d3f4f54dcfa39b0d9ccb65e5a974bd72f0848d62))
+
+## [0.13.2-sh.2](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.2-sh.1...v0.13.2-sh.2) (2024-02-22)
+
+
+### Bug Fixes
+
+* Client writing ([d3f4f54](https://github.com/SenexCrenshaw/StreamMaster/commit/d3f4f54dcfa39b0d9ccb65e5a974bd72f0848d62))
+
+## [0.13.2-sh.1](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.1...v0.13.2-sh.1) (2024-02-21)
+
+
+### Bug Fixes
+
+* update videoinfo ([f7231ac](https://github.com/SenexCrenshaw/StreamMaster/commit/f7231ac323601b3a0387e4d48e7c3c3260eea728))
+
## [0.13.1](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.0...v0.13.1) (2024-02-11)
diff --git a/Dockerfile.base b/Dockerfile.base
index abbccde6f..aefeda527 100644
--- a/Dockerfile.base
+++ b/Dockerfile.base
@@ -1,10 +1,18 @@
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
+ARG TARGETPLATFORM
+ARG TARGETOS
+ARG TARGETARCH
+ARG TARGETVARIANT
WORKDIR /app
-RUN apt-get update -yq \
+RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache
+
+RUN \
+ --mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/var/cache/apt \
+ apt-get update -yq \
&& apt-get upgrade -yq \
&& apt-get install -yq --no-install-recommends ffmpeg gosu postgresql postgresql-common curl gnupg lsb-release \
&& apt-get clean \
- && rm -rf /var/lib/apt/lists/* \
+ && rm -rf /var/lib/apt/lists/* \
&& mkdir /docker-entrypoint-initdb.d
\ No newline at end of file
diff --git a/Dockerfile.build b/Dockerfile.build
index 01911e094..e2cf3d17a 100644
--- a/Dockerfile.build
+++ b/Dockerfile.build
@@ -1,6 +1,14 @@
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
+ARG TARGETPLATFORM
+ARG TARGETOS
+ARG TARGETARCH
+ARG TARGETVARIANT
-RUN apt-get update -yq \
+RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache
+
+RUN \
+ --mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/var/cache/apt \
+ apt-get update -yq \
&& apt-get upgrade -yq \
&& curl -fsSL https://deb.nodesource.com/setup_21.x | bash - \
&& apt-get install --no-install-recommends -yq nodejs \
diff --git a/StreamMaster.API/AssemblyInfo.cs b/StreamMaster.API/AssemblyInfo.cs
index 12a20a013..87d051734 100644
--- a/StreamMaster.API/AssemblyInfo.cs
+++ b/StreamMaster.API/AssemblyInfo.cs
@@ -1,5 +1,5 @@
using System.Reflection;
-[assembly: AssemblyVersion("0.13.1.0")]
-[assembly: AssemblyFileVersion("0.13.1.0")]
-[assembly: AssemblyInformationalVersion("0.13.1.Sha.19d3cf9344080dfb25a6559125410d00a3540665")]
\ No newline at end of file
+[assembly: AssemblyVersion("0.13.2.3")]
+[assembly: AssemblyFileVersion("0.13.2.3")]
+[assembly: AssemblyInformationalVersion("0.13.2-sh.3.Sha.d1570728f5c010f25cd649695fc1d1e339b3d084")]
\ No newline at end of file
diff --git a/StreamMaster.Domain/CircularBuffer.cs b/StreamMaster.Domain/CircularBuffer.cs
index fb3aa2a9a..394c55331 100644
--- a/StreamMaster.Domain/CircularBuffer.cs
+++ b/StreamMaster.Domain/CircularBuffer.cs
@@ -1,98 +1,125 @@
-namespace StreamMaster.Domain;
-
-public class CircularBuffer
+namespace StreamMaster.Domain
{
- private readonly byte[] buffer;
- private int writePosition = 0;
- private readonly object lockObj = new();
-
- public CircularBuffer(int capacity)
+ public class CircularBuffer
{
- if (capacity <= 0)
- {
- throw new ArgumentException("Capacity must be greater than 0", nameof(capacity));
- }
-
- Capacity = capacity;
- buffer = new byte[capacity];
- }
+ private readonly byte[] buffer;
+ private int writePosition = 0;
+ private readonly object lockObj = new();
- public void Write(byte[] data)
- {
- if (data == null)
+ ///
+ /// Initializes a new instance of the CircularBuffer class with a specified capacity.
+ ///
+ /// The maximum amount of data the buffer can hold.
+ /// Thrown when capacity is less than or equal to 0.
+ public CircularBuffer(int capacity)
{
- throw new ArgumentNullException(nameof(data));
+ if (capacity <= 0)
+ {
+ throw new ArgumentException("Capacity must be greater than 0.", nameof(capacity));
+ }
+
+ Capacity = capacity;
+ buffer = new byte[capacity];
}
- lock (lockObj)
+ ///
+ /// Writes data into the circular buffer. If the buffer does not have enough space,
+ /// older data will be overwritten.
+ ///
+ /// The byte array to write into the buffer.
+ /// Thrown when data is null.
+ public void Write(byte[] data)
{
- int dataLength = data.Length;
- if (dataLength > Capacity)
+ if (data == null)
{
- // If data is larger than the capacity, write only the last part of data that fits.
- Array.Copy(data, dataLength - Capacity, buffer, 0, Capacity);
- writePosition = 0;
- AvailableData = Capacity;
+ throw new ArgumentNullException(nameof(data));
}
- else
- {
- int part1Len = Math.Min(Capacity - writePosition, dataLength);
- Array.Copy(data, 0, buffer, writePosition, part1Len);
- if (part1Len < dataLength)
+ lock (lockObj)
+ {
+ int dataLength = data.Length;
+ if (dataLength > Capacity)
{
- // If the data was split, write the remaining part at the beginning of the buffer.
- Array.Copy(data, part1Len, buffer, 0, dataLength - part1Len);
+ // Overwrite the buffer with the last 'Capacity' bytes of 'data'
+ Array.Copy(data, dataLength - Capacity, buffer, 0, Capacity);
+ writePosition = 0;
+ AvailableData = Capacity;
}
+ else
+ {
+ // Calculate how much data can be written without wrapping
+ int part1Len = Math.Min(Capacity - writePosition, dataLength);
+ Array.Copy(data, 0, buffer, writePosition, part1Len);
+
+ if (part1Len < dataLength)
+ {
+ // Wrap and write the remaining data
+ Array.Copy(data, part1Len, buffer, 0, dataLength - part1Len);
+ }
- writePosition = (writePosition + dataLength) % Capacity;
- AvailableData = Math.Min(AvailableData + dataLength, Capacity);
+ // Update write position and available data, wrapping the write position if necessary
+ writePosition = (writePosition + dataLength) % Capacity;
+ AvailableData = Math.Min(AvailableData + dataLength, Capacity);
+ }
}
}
- }
- public byte[] ReadLatestData()
- {
- lock (lockObj)
+ ///
+ /// Reads the most recent data written into the buffer without removing it.
+ ///
+ /// A byte array containing the latest data. The size of the array is up to the amount of available data.
+ public byte[] ReadLatestData()
{
- byte[] latestData = new byte[AvailableData]; // Prepare an array to hold the latest data.
-
- if (AvailableData == 0)
+ lock (lockObj)
{
- // No data available to read.
- return latestData;
- }
+ byte[] latestData = new byte[AvailableData];
- if (writePosition == 0 || AvailableData == Capacity)
- {
- // If writePosition is 0, it means the buffer has just wrapped around or is exactly full,
- // so the latest data is the entire buffer.
- Array.Copy(buffer, 0, latestData, 0, AvailableData);
- }
- else
- {
- // Calculate the start position for the latest data that's not contiguous.
- int startIdx = writePosition - AvailableData;
- if (startIdx < 0)
+ if (AvailableData == 0)
{
- // The data wraps around; copy the end segment and then the start segment.
- startIdx += Capacity; // Correct the start index to a positive value.
- int part1Length = Capacity - startIdx;
- Array.Copy(buffer, startIdx, latestData, 0, part1Length);
- Array.Copy(buffer, 0, latestData, part1Length, writePosition);
+ return latestData; // Early exit if no data available
+ }
+
+ if (writePosition == 0 || AvailableData == Capacity)
+ {
+ // Buffer is exactly full, or just wrapped around
+ Array.Copy(buffer, 0, latestData, 0, AvailableData);
}
else
{
- // All available data is contiguous and can be copied directly.
- Array.Copy(buffer, startIdx, latestData, 0, AvailableData);
+ // Data wraps around the buffer end; copy in two segments
+ int startIdx = (Capacity + writePosition - AvailableData) % Capacity;
+ int part1Length = Math.Min(AvailableData, Capacity - startIdx);
+ Array.Copy(buffer, startIdx, latestData, 0, part1Length);
+ if (part1Length < AvailableData)
+ {
+ Array.Copy(buffer, 0, latestData, part1Length, AvailableData - part1Length);
+ }
}
- }
- return latestData;
+ return latestData;
+ }
}
- }
+ ///
+ /// Gets the capacity of the buffer.
+ ///
+ public int Capacity { get; }
+ ///
+ /// Gets the amount of data currently stored in the buffer.
+ ///
+ public int AvailableData { get; private set; } = 0;
+
+ ///
+ /// Clears all data from the buffer, resetting its state.
+ ///
+ public void Clear()
+ {
+ lock (lockObj)
+ {
+ writePosition = 0;
+ AvailableData = 0;
+ }
+ }
- public int Capacity { get; }
- public int AvailableData { get; private set; } = 0;
+ }
}
diff --git a/StreamMaster.Infrastructure/Clients/ClientStreamerConfiguration.cs b/StreamMaster.Infrastructure/Clients/ClientStreamerConfiguration.cs
index bc94fa36e..81e7576de 100644
--- a/StreamMaster.Infrastructure/Clients/ClientStreamerConfiguration.cs
+++ b/StreamMaster.Infrastructure/Clients/ClientStreamerConfiguration.cs
@@ -34,12 +34,12 @@ public ClientStreamerConfiguration(
public async Task CancelClient(bool includeAbort = true)
{
- if (ReadBuffer != null)
+ if (Stream != null)
{
- ReadBuffer.ReadChannel?.Writer.Complete();
- ReadBuffer.Cancel();
- ReadBuffer.Dispose();
- ReadBuffer = null;
+ Stream.Channel?.Writer.Complete();
+ Stream.Cancel();
+ Stream.Dispose();
+ Stream = null;
}
try
@@ -71,7 +71,7 @@ public async Task CancelClient(bool includeAbort = true)
}
//Buffering
- public IClientReadStream? ReadBuffer { get; set; }
+ public IClientReadStream? Stream { get; set; }
//Tokens
private CancellationToken ClientHTTPRequestCancellationToken { get; }
diff --git a/StreamMaster.Infrastructure/Services/BroadcastService.cs b/StreamMaster.Infrastructure/Services/BroadcastService.cs
index ef3df8e73..3de0028f8 100644
--- a/StreamMaster.Infrastructure/Services/BroadcastService.cs
+++ b/StreamMaster.Infrastructure/Services/BroadcastService.cs
@@ -71,7 +71,7 @@ public void LogDebug()
//logger.LogInformation("GetStreamHandlers: {GetStreamHandlers}", streamManager.GetStreamHandlers().Count);
foreach (IClientStreamerConfiguration clientStreamerConfiguration in clientStreamer.GetAllClientStreamerConfigurations)
{
- printDebug("Client: {0} {1}", clientStreamerConfiguration.ChannelName, clientStreamerConfiguration.ReadBuffer?.Id ?? Guid.Empty);
+ printDebug("Client: {0} {1}", clientStreamerConfiguration.ChannelName, clientStreamerConfiguration.Stream?.Id ?? Guid.Empty);
}
foreach (IStreamHandler handler in streamManager.GetStreamHandlers())
diff --git a/StreamMaster.Streams.Domain/Interfaces/IClientReadStream.cs b/StreamMaster.Streams.Domain/Interfaces/IClientReadStream.cs
index 1b9bd3caf..1eb23cf29 100644
--- a/StreamMaster.Streams.Domain/Interfaces/IClientReadStream.cs
+++ b/StreamMaster.Streams.Domain/Interfaces/IClientReadStream.cs
@@ -8,7 +8,7 @@ namespace StreamMaster.Streams.Domain.Interfaces;
public interface IClientReadStream : IDisposable
{
string VideoStreamName { get; set; }
- Channel ReadChannel { get; }
+ Channel Channel { get; }
///
/// Gets a value indicating whether the stream supports reading.
diff --git a/StreamMaster.Streams.Domain/Interfaces/IClientStreamerConfiguration.cs b/StreamMaster.Streams.Domain/Interfaces/IClientStreamerConfiguration.cs
index eb46a4f51..9aaa54fc5 100644
--- a/StreamMaster.Streams.Domain/Interfaces/IClientStreamerConfiguration.cs
+++ b/StreamMaster.Streams.Domain/Interfaces/IClientStreamerConfiguration.cs
@@ -9,7 +9,7 @@ public interface IClientStreamerConfiguration
Guid ClientId { get; set; }
string ClientIPAddress { get; set; }
string ClientUserAgent { get; set; }
- IClientReadStream? ReadBuffer { get; set; }
+ IClientReadStream? Stream { get; set; }
CancellationTokenSource ClientMasterToken { get; set; }
string VideoStreamName { get; set; }
}
\ No newline at end of file
diff --git a/StreamMaster.Streams/Buffers/ClientReadStream.Main.cs b/StreamMaster.Streams/Buffers/ClientReadStream.Main.cs
index b946b776a..61a39d891 100644
--- a/StreamMaster.Streams/Buffers/ClientReadStream.Main.cs
+++ b/StreamMaster.Streams/Buffers/ClientReadStream.Main.cs
@@ -1,8 +1,6 @@
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
-using StreamMaster.Domain.Cache;
-
using System.Threading.Channels;
namespace StreamMaster.Streams.Buffers;
@@ -37,12 +35,12 @@ public ClientReadStream(IMemoryCache memoryCache, IStatisticsManager _statistics
SingleReader = true,
SingleWriter = true
};
- ReadChannel = Channel.CreateUnbounded(options);
+ Channel = System.Threading.Channels.Channel.CreateUnbounded(options);
logger.LogInformation("Starting client read stream for ClientId: {ClientId}", ClientId);
}
- public Channel ReadChannel { get; private set; }
+ public Channel Channel { get; private set; }
private bool IsCancelled { get; set; }
@@ -75,11 +73,11 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count,
}
catch (Exception ex)
{
- Setting setting = memoryCache.GetSetting();
- if (setting.EnablePrometheus)
- {
- _readErrorsCounter.WithLabels(ClientId.ToString(), VideoStreamName).Inc();
- }
+ //Setting setting = memoryCache.GetSetting();
+ //if (setting.EnablePrometheus)
+ //{
+ // _readErrorsCounter.WithLabels(ClientId.ToString(), VideoStreamName).Inc();
+ //}
logger.LogError(ex, "Error reading buffer for ClientId: {ClientId}", ClientId);
}
diff --git a/StreamMaster.Streams/Buffers/ClientReadStream.Stats.cs b/StreamMaster.Streams/Buffers/ClientReadStream.Stats.cs
index 212d8e26e..511af488d 100644
--- a/StreamMaster.Streams/Buffers/ClientReadStream.Stats.cs
+++ b/StreamMaster.Streams/Buffers/ClientReadStream.Stats.cs
@@ -1,6 +1,5 @@
using Prometheus;
-using StreamMaster.Domain.Cache;
using StreamMaster.Domain.Extensions;
namespace StreamMaster.Streams.Buffers;
@@ -31,23 +30,23 @@ public sealed partial class ClientReadStream
);
private readonly PerformanceBpsMetrics metrics = new();
- private DateTime _lastUpdateTime = SMDT.UtcNow;
+ private readonly DateTime _lastUpdateTime = SMDT.UtcNow;
private int acculmativeBytesRead = 0;
private void SetMetrics(int bytesRead)
{
- DateTime currentTime = SMDT.UtcNow;
- Setting setting = memoryCache.GetSetting();
+ //DateTime currentTime = SMDT.UtcNow;
+ //Setting setting = memoryCache.GetSetting();
- if (setting.EnablePrometheus && (currentTime - _lastUpdateTime > TimeSpan.FromSeconds(5)))
- {
- double bps = metrics.GetBitsPerSecond();
+ //if (setting.EnablePrometheus && (currentTime - _lastUpdateTime > TimeSpan.FromSeconds(5)))
+ //{
+ // double bps = metrics.GetBitsPerSecond();
- _bitsPerSecond.WithLabels(ClientId.ToString(), VideoStreamName).Set(bps);
- _bytesReadCounter.WithLabels(ClientId.ToString(), VideoStreamName).Inc(acculmativeBytesRead);
+ // _bitsPerSecond.WithLabels(ClientId.ToString(), VideoStreamName).Set(bps);
+ // _bytesReadCounter.WithLabels(ClientId.ToString(), VideoStreamName).Inc(acculmativeBytesRead);
- acculmativeBytesRead = 0;
- _lastUpdateTime = currentTime;
- }
+ // acculmativeBytesRead = 0;
+ // _lastUpdateTime = currentTime;
+ //}
acculmativeBytesRead += bytesRead;
}
diff --git a/StreamMaster.Streams/Buffers/ClientReadStream.cs b/StreamMaster.Streams/Buffers/ClientReadStream.cs
index 8e9efd49c..f239581b3 100644
--- a/StreamMaster.Streams/Buffers/ClientReadStream.cs
+++ b/StreamMaster.Streams/Buffers/ClientReadStream.cs
@@ -9,9 +9,6 @@ public sealed partial class ClientReadStream : Stream, IClientReadStream
{
private readonly CancellationTokenSource _readCancel = new();
private readonly ILogger _readLogger;
- //private int currentPosition = 0;
- //private int GetVideoInfoCount = 0;
- //private readonly Memory test = new(new byte[2 * 1024 * 1000]);
public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken)
{
@@ -31,7 +28,7 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation
CancellationTokenSource timedToken = new(TimeSpan.FromSeconds(30));
using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_readCancel.Token, timedToken.Token, cancellationToken);
- byte[] read = await ReadChannel.Reader.ReadAsync(linkedCts.Token);
+ byte[] read = await Channel.Reader.ReadAsync(linkedCts.Token);
bytesRead = read.Length;
_readLogger.LogDebug("End bytesRead: {bytesRead}", bytesRead);
if (bytesRead == 0)
@@ -39,21 +36,6 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation
return 0;
}
read[..bytesRead].CopyTo(buffer);
- //if (GetVideoInfoCount < 1)
- //{
- // if (bytesRead + currentPosition > test.Length)
- // {
- // string filePath = Path.Combine(BuildInfo.AppDataFolder, $"streamread_{GetVideoInfoCount}.mp4");
- // await File.WriteAllBytesAsync(filePath, test.ToArray()).ConfigureAwait(false);
- // ++GetVideoInfoCount;
- // currentPosition = 0;
- // }
- // else
- // {
- // read[..bytesRead].CopyTo(test[currentPosition..]);
- // currentPosition += bytesRead;
- // }
- //}
metrics.RecordBytesProcessed(bytesRead);
diff --git a/StreamMaster.Streams/Channels/ChannelManager.cs b/StreamMaster.Streams/Channels/ChannelManager.cs
index 25da53cce..378ed691c 100644
--- a/StreamMaster.Streams/Channels/ChannelManager.cs
+++ b/StreamMaster.Streams/Channels/ChannelManager.cs
@@ -209,7 +209,7 @@ public async Task SimulateStreamFailureForAll()
IChannelStatus? channelStatus = await RegisterWithChannelManager(config);
- if (channelStatus is null || config.ReadBuffer is null)
+ if (channelStatus is null || config.Stream is null)
{
channelService.UnRegisterChannel(config.ChannelVideoStreamId);
logger.LogInformation("Exiting Register Client with null due to channelStatus or Read Buffer being null");
@@ -217,7 +217,7 @@ public async Task SimulateStreamFailureForAll()
}
logger.LogInformation("Finished Register Client");
- return (Stream)config.ReadBuffer;
+ return (Stream)config.Stream;
}
private async Task RegisterWithChannelManager(IClientStreamerConfiguration config)
diff --git a/StreamMaster.Streams/Clients/ClientStreamerManager.cs b/StreamMaster.Streams/Clients/ClientStreamerManager.cs
index 006ff7cc3..d4c53d38d 100644
--- a/StreamMaster.Streams/Clients/ClientStreamerManager.cs
+++ b/StreamMaster.Streams/Clients/ClientStreamerManager.cs
@@ -57,9 +57,9 @@ public async Task AddClientToHandler(Guid clientId, IStreamHandler streamHandler
if (streamerConfiguration != null)
{
streamerConfiguration.VideoStreamName = streamHandler.VideoStreamName;
- streamerConfiguration.ReadBuffer ??= new ClientReadStream(memoryCache, statisticsManager, loggerFactory, streamerConfiguration);
+ streamerConfiguration.Stream ??= new ClientReadStream(memoryCache, statisticsManager, loggerFactory, streamerConfiguration);
- logger.LogDebug("Adding client {ClientId} {ReaderID} ", clientId, streamerConfiguration.ReadBuffer?.Id ?? Guid.NewGuid());
+ logger.LogDebug("Adding client {ClientId} {ReaderID} ", clientId, streamerConfiguration.Stream?.Id ?? Guid.NewGuid());
streamHandler.RegisterClientStreamer(streamerConfiguration);
}
diff --git a/StreamMaster.Streams/Streams/StreamHandler.Stats.cs b/StreamMaster.Streams/Streams/StreamHandler.Stats.cs
index 58f2e2e76..1f9de2b48 100644
--- a/StreamMaster.Streams/Streams/StreamHandler.Stats.cs
+++ b/StreamMaster.Streams/Streams/StreamHandler.Stats.cs
@@ -1,5 +1,4 @@
-using StreamMaster.Domain.Cache;
-using StreamMaster.Domain.Extensions;
+using StreamMaster.Domain.Extensions;
namespace StreamMaster.Streams.Streams;
@@ -13,16 +12,16 @@ public sealed partial class StreamHandler
private int acculmativeBytesWritten = 0;
private void SetMetrics(int bytesWritten)
{
- DateTime currentTime = SMDT.UtcNow;
+ //DateTime currentTime = SMDT.UtcNow;
- Setting setting = memoryCache.GetSetting();
+ //Setting setting = memoryCache.GetSetting();
- if (setting.EnablePrometheus && (currentTime - _lastUpdateTime > TimeSpan.FromSeconds(5)))
- {
- inputStreamStatistics.AddBytesWritten(acculmativeBytesWritten);
- _lastUpdateTime = currentTime;
- acculmativeBytesWritten = 0;
- }
+ //if (setting.EnablePrometheus && (currentTime - _lastUpdateTime > TimeSpan.FromSeconds(5)))
+ //{
+ // inputStreamStatistics.AddBytesWritten(acculmativeBytesWritten);
+ // _lastUpdateTime = currentTime;
+ // acculmativeBytesWritten = 0;
+ //}
acculmativeBytesWritten += bytesWritten;
}
diff --git a/StreamMaster.Streams/Streams/StreamHandler.VideoInfo.cs b/StreamMaster.Streams/Streams/StreamHandler.VideoInfo.cs
index fc8d85647..8d213c67a 100644
--- a/StreamMaster.Streams/Streams/StreamHandler.VideoInfo.cs
+++ b/StreamMaster.Streams/Streams/StreamHandler.VideoInfo.cs
@@ -7,7 +7,6 @@
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Text.Json;
-using System.Threading.Channels;
namespace StreamMaster.Streams.Streams;
@@ -27,43 +26,6 @@ public VideoInfo GetVideoInfo()
return _videoInfo ?? new();
}
-
- private async Task FillVideoMemoryAsync(byte[] videoMemory, ChannelReader> videoChannelReader, CancellationToken cancellationToken)
- {
- int position = 0;
-
- try
- {
- while (await videoChannelReader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
- {
- while (videoChannelReader.TryRead(out Memory mem) && mem.Length > 0)
- {
- if (position + mem.Length > videoMemory.Length)
- {
- mem = mem[..(videoMemory.Length - position)];
- }
-
- mem.Span.CopyTo(new Span(videoMemory, position, mem.Length));
- position += mem.Length;
-
- if (position >= videoMemory.Length)
- {
- return;
- }
- }
- }
- }
- catch (OperationCanceledException)
- {
- logger.LogInformation("Operation was cancelled.");
- }
- catch (Exception ex)
- {
- logger.LogError(ex, "An error occurred while filling video memory.");
- throw;
- }
- }
-
public async Task BuildVideoInfoAsync(byte[] videoMemory)
{
bool isLocked = false;
@@ -127,13 +89,6 @@ public async Task BuildVideoInfoAsync(byte[] videoMemory)
}
}
- //private async Task PrepareVideoMemoryAsync()
- //{
- // byte[] videoMemory = new byte[videoBufferSize];
- // await FillVideoMemoryAsync(videoMemory, videoChannel.Reader, cancellationTokenSource.Token).ConfigureAwait(false);
- // return videoMemory;
- //}
-
private string GetFFProbeExecutablePath(Setting settings)
{
string ffprobeExec = Path.Combine(BuildInfo.AppDataFolder, settings.FFProbeExecutable);
diff --git a/StreamMaster.Streams/Streams/StreamHandler.cs b/StreamMaster.Streams/Streams/StreamHandler.cs
index 6fbcfbcc8..c6bfafa66 100644
--- a/StreamMaster.Streams/Streams/StreamHandler.cs
+++ b/StreamMaster.Streams/Streams/StreamHandler.cs
@@ -23,6 +23,29 @@ public sealed partial class StreamHandler
private DateTime LastVideoInfoRun = DateTime.MinValue;
+ // Write to all clients with separate buffers
+ private async Task WriteToAllClientsAsync(byte[] data, CancellationToken cancellationToken)
+ {
+ IEnumerable tasks = clientStreamerConfigs.Values
+ .Where(c => c.Stream != null)
+ .Select(async clientStreamerConfig =>
+ {
+ if (clientStreamerConfig.Stream != null)
+ {
+ try
+ {
+ await clientStreamerConfig.Stream.Channel.Writer.WriteAsync(data, cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "Failed to write to client {ClientId}", clientStreamerConfig.ClientId);
+ }
+ }
+ });
+
+ await Task.WhenAll(tasks);
+ }
+
public async Task StartVideoStreamingAsync(Stream stream)
{
@@ -50,10 +73,8 @@ public async Task StartVideoStreamingAsync(Stream stream)
bool ran = false;
int accumulatedBytes = 0;
Stopwatch testSw = Stopwatch.StartNew();
+ Memory bufferMemory = new byte[ChunkSize];
- Memory test = new(new byte[videoBufferSize]);
-
- byte[] firstByte = new byte[videoBufferSize];
using (stream)
{
while (!linkedToken.IsCancellationRequested)
@@ -71,10 +92,9 @@ public async Task StartVideoStreamingAsync(Stream stream)
try
{
- Memory bufferMemory = new byte[ChunkSize];
_writeLogger.LogDebug("-------------------{VideoStreamName}-----------------------------", VideoStreamName);
int readBytes = await stream.ReadAsync(bufferMemory, linkedToken.Token);
- _writeLogger.LogDebug("End bytes written: {byteswritten}", readBytes);
+ _writeLogger.LogDebug("End bytes read from input stream: {byteswritten}", readBytes);
if (readBytes == 0)
{
throw new EndOfStreamException();
@@ -89,49 +109,22 @@ public async Task StartVideoStreamingAsync(Stream stream)
byte[] clientDataToSend = new byte[readBytes];
bufferMemory[..readBytes].CopyTo(clientDataToSend);
+ await WriteToAllClientsAsync(clientDataToSend, linkedToken.Token).ConfigureAwait(false);
- foreach (IClientStreamerConfiguration clientStreamerConfig in clientStreamerConfigs.Values)
- {
- if (clientStreamerConfig.ReadBuffer != null)
- {
- await clientStreamerConfig.ReadBuffer.ReadChannel.Writer.WriteAsync(clientDataToSend);
- }
- else
- {
- logger.LogError("ClientStreamerConfig ReadBuffer is null for {ClientId}", clientStreamerConfig.ClientId);
- }
-
- }
+ videoBuffer.Write(clientDataToSend);
+ accumulatedBytes += readBytes;
TimeSpan lastRun = SMDT.UtcNow - LastVideoInfoRun;
- if (lastRun.TotalMinutes >= 30 && accumulatedBytes + readBytes > videoBufferSize)
+ if (lastRun.TotalMinutes >= 10)
{
- int overAge = accumulatedBytes + readBytes - videoBufferSize;
- int toRead = readBytes - overAge;
- if (toRead < 0)
- {
- logger.LogError(overAge, "toRead is less than {overAge}", overAge);
- logger.LogError(overAge, "accumulatedBytes {accumulatedBytes}", accumulatedBytes);
- logger.LogError(overAge, "readBytes {readBytes}", readBytes);
- logger.LogError(overAge, "readBytes {videoBufferSize}", videoBufferSize);
- }
- else
+ if (accumulatedBytes > videoBufferSize)
{
- videoBuffer.Write(clientDataToSend[..toRead]);
+ byte[] processData = videoBuffer.ReadLatestData();
+ _ = BuildVideoInfoAsync(processData);
- byte[] videoMemory = videoBuffer.ReadLatestData();
- Task task = BuildVideoInfoAsync(videoMemory);
-
- ++toRead;
- accumulatedBytes = readBytes - toRead;
- videoBuffer.Write(clientDataToSend[toRead..readBytes]);
+ accumulatedBytes = 0;
}
}
- else
- {
- videoBuffer.Write(clientDataToSend);
- accumulatedBytes += readBytes;
- }
}
catch (TaskCanceledException)
{
@@ -169,7 +162,7 @@ public async Task StartVideoStreamingAsync(Stream stream)
//foreach (IClientStreamerConfiguration clientStreamerConfig in clientStreamerConfigs.Values)
//{
- // clientStreamerConfig.ReadBuffer?.ReadChannel.Writer.Complete();
+ // clientStreamerConfig.Stream?.Channel.Writer.Complete();
//}
IsFailed = true;
stream.Close();
diff --git a/StreamMaster.Streams/Streams/StreamManager.cs b/StreamMaster.Streams/Streams/StreamManager.cs
index cc384049b..8630ed515 100644
--- a/StreamMaster.Streams/Streams/StreamManager.cs
+++ b/StreamMaster.Streams/Streams/StreamManager.cs
@@ -119,14 +119,14 @@ private async void StreamHandler_OnStreamingStoppedEvent(object? sender, StreamH
// //foreach (Guid clientId in streamHandler.GetClientStreamerClientIds())
// //{
// // IClientStreamerConfiguration? clientStreamerConfiguration = await clientStreamerManager.GetClientStreamerConfiguration(clientId);
- // // if (clientStreamerConfiguration != null && clientStreamerConfiguration.ReadBuffer != null)
+ // // if (clientStreamerConfiguration != null && clientStreamerConfiguration.Stream != null)
// // {
// // long _lastReadIndex = streamHandler.CircularRingBuffer.GetNextReadIndex();
// // //if (_lastReadIndex > StreamHandler.ChunkSize)
// // //{
// // // _lastReadIndex -= StreamHandler.ChunkSize;
// // //}
- // // clientStreamerConfiguration.ReadBuffer.SetLastIndex(_lastReadIndex);
+ // // clientStreamerConfiguration.Stream.SetLastIndex(_lastReadIndex);
// // }
// //}
//}
diff --git a/build_docker.ps1 b/build_docker.ps1
index f270c0b16..9c0ac00d8 100644
--- a/build_docker.ps1
+++ b/build_docker.ps1
@@ -25,6 +25,7 @@ function Write-StringToFile {
Write-Host "An error occurred: $_"
}
}
+
function Read-StringFromFile {
param (
[string]$Path
@@ -40,6 +41,30 @@ function Read-StringFromFile {
}
}
+function Copy-File {
+ param (
+ [Parameter(Mandatory=$true)]
+ [string]$sourcePath,
+
+ [Parameter(Mandatory=$true)]
+ [string]$destinationPath
+ )
+
+ try {
+ # Check if the source file exists
+ if (-Not (Test-Path -Path $sourcePath -PathType Leaf)) {
+ Write-Host "Source file does not exist: '$sourcePath'"
+ return
+ }
+
+ # Copy the file to the destination
+ Copy-Item -Path $sourcePath -Destination $destinationPath -ErrorAction Stop
+ Write-Host "File copied successfully from '$sourcePath' to '$destinationPath'."
+ } catch {
+ Write-Host "An error occurred while copying the file: $_"
+ }
+}
+
function Main {
Set-EnvironmentVariables
@@ -52,18 +77,21 @@ function Main {
}
if (-not $SkipRelease -and -not $PrintCommands) {
- if ( $BuildProd -and -not $SkipMainBuild) {
- npx semantic-release # -e release.gh
+
+ if ( $BuildProd ) { #} -and -not $SkipMainBuild) {
+ Copy-File -sourcePath "release.config.release.cjs" -destinationPath "release.config.cjs"
}
else {
- npx semantic-release
+ Copy-File -sourcePath "release.config.norelease.cjs" -destinationPath "release.config.cjs"
}
+
+ npx semantic-release
}
# DownloadFiles
$imageName = "docker.io/senexcrenshaw/streammaster"
- $buildName = $imageName + "-builds"
+ $buildName = "streammaster-builds"
$result = Get-AssemblyInfo -assemblyInfoPath "./StreamMaster.API/AssemblyInfo.cs"
$processedAssemblyInfo = ProcessAssemblyInfo $result
@@ -71,14 +99,14 @@ function Main {
if ($BuildBase -or $BuildAll) {
$dockerFile = "Dockerfile.base"
$global:tags = @("$("${buildName}:"+$processedAssemblyInfo.BranchNameRevision)-base")
- BuildImage -result $processedAssemblyInfo -imageName $buildName -dockerFile $dockerFile
+ BuildImage -result $processedAssemblyInfo -imageName $buildName -dockerFile $dockerFile -pull $true
Write-StringToFile -Path "basever" -Content $processedAssemblyInfo.BranchNameRevision
}
if ($BuildBuild -or $BuildAll) {
$dockerFile = "Dockerfile.build"
$global:tags = @("$("${buildName}:"+$processedAssemblyInfo.BranchNameRevision)-build")
- BuildImage -result $processedAssemblyInfo -imageName $buildName -dockerFile $dockerFile
+ BuildImage -result $processedAssemblyInfo -imageName $buildName -dockerFile $dockerFile -pull $true
Write-StringToFile -Path "buildver" -Content $processedAssemblyInfo.BranchNameRevision
}
@@ -110,7 +138,7 @@ function Main {
Add-ContentAtTop -filePath $dockerFile -contentArray $contentArray
$global:tags = DetermineTags -result $processedAssemblyInfo -imageName $imageName
- BuildImage -result $processedAssemblyInfo -imageName $imageName -dockerFile $dockerFile
+ BuildImage -result $processedAssemblyInfo -imageName $imageName -dockerFile $dockerFile -push $true
}
}
Function Add-ContentAtTop {
@@ -214,7 +242,13 @@ function BuildImage {
$dockerFile,
[Parameter(Mandatory = $true)]
- [string]$imageName
+ [string]$imageName,
+
+ [Parameter()]
+ [bool]$push = $false,
+
+ [Parameter()]
+ [bool]$pull = $false
)
@@ -223,7 +257,18 @@ function BuildImage {
$global:tags | ForEach-Object { Write-Host $_ }
# Construct the Docker build command using the tags and the specified Dockerfile
- $buildCommand = "docker buildx build --pull --platform ""linux/amd64,linux/arm64"" -f ./$dockerFile . --push"
+ $buildCommand = "docker buildx build "
+ if ( $pull) {
+ $buildCommand += " --pull"
+ }
+ $buildCommand += " --platform ""linux/amd64,linux/arm64"" -f ./$dockerFile ."
+
+ if ( $push) {
+ $buildCommand += " --push"
+ }
+ else {
+ $buildCommand += " --load"
+ }
foreach ($tag in $global:tags) {
$buildCommand += " --tag=$tag"
}
diff --git a/release.config.norelease.cjs b/release.config.norelease.cjs
new file mode 100644
index 000000000..b4cb58590
--- /dev/null
+++ b/release.config.norelease.cjs
@@ -0,0 +1,51 @@
+/**
+ * @type {import('semantic-release').GlobalConfig}
+ */
+
+module.exports = {
+ branches: [
+ // "+([0-9])?(.{+([0-9]),x}).x",
+ {
+ channel: "main",
+ name: "main",
+ prerelease: false
+ },
+ {
+ name: "!main",
+ prerelease: true
+ }
+ ],
+ ci: false,
+ debug: false,
+ dryRun: false,
+ plugins: [
+ [
+ "@semantic-release/commit-analyzer",
+ {
+ preset: "angular",
+ releaseRules: [
+ { type: "docs", scope: "README", release: "patch" },
+ { type: "refactor", release: "minor" },
+ { type: "style", release: "patch" },
+ // { type: "build", release: "false" },
+ { scope: "no-release", release: false },
+ { type: "update", release: "patch" }
+ ],
+ parserOpts: {
+ noteKeywords: ["BREAKING CHANGE", "BREAKING CHANGES"]
+ }
+ }
+ ],
+ [
+ "@semantic-release/exec",
+ {
+ verifyConditionsCmd: ":",
+ publishCmd: [
+ "node updateAssemblyInfo.js ${nextRelease.version} ${nextRelease.gitHead} ${nextRelease.channel}",
+ "git add ./StreamMaster.API/AssemblyInfo.cs",
+ 'git diff-index --quiet HEAD || git commit -m "chore: update AssemblyInfo.cs to version ${nextRelease.version}"'
+ ].join(" && ")
+ }
+ ]
+ ]
+};
diff --git a/release.config.cjs b/release.config.release.cjs
similarity index 93%
rename from release.config.cjs
rename to release.config.release.cjs
index 0597b3886..0cb960606 100644
--- a/release.config.cjs
+++ b/release.config.release.cjs
@@ -27,7 +27,8 @@ module.exports = {
{ type: "docs", scope: "README", release: "patch" },
{ type: "refactor", release: "minor" },
{ type: "style", release: "patch" },
- { type: "build", release: "false" },
+ // { type: "build", release: "false" },
+ { scope: "no-release", release: false },
{ type: "update", release: "patch" }
],
parserOpts: {