diff --git a/.gitignore b/.gitignore index 74fa0986c..274fdf364 100644 --- a/.gitignore +++ b/.gitignore @@ -370,3 +370,4 @@ Dockerfile.sm basever buildver smver +release.config.cjs diff --git a/CHANGELOG.md b/CHANGELOG.md index aaa6b9fdf..314b95297 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,31 @@ +## [0.13.2-sh.2](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.2-sh.1...v0.13.2-sh.2) (2024-02-22) + + +### Bug Fixes + +* Client writing ([d3f4f54](https://github.com/SenexCrenshaw/StreamMaster/commit/d3f4f54dcfa39b0d9ccb65e5a974bd72f0848d62)) + +## [0.13.2-sh.2](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.2-sh.1...v0.13.2-sh.2) (2024-02-22) + + +### Bug Fixes + +* Client writing ([d3f4f54](https://github.com/SenexCrenshaw/StreamMaster/commit/d3f4f54dcfa39b0d9ccb65e5a974bd72f0848d62)) + +## [0.13.2-sh.2](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.2-sh.1...v0.13.2-sh.2) (2024-02-22) + + +### Bug Fixes + +* Client writing ([d3f4f54](https://github.com/SenexCrenshaw/StreamMaster/commit/d3f4f54dcfa39b0d9ccb65e5a974bd72f0848d62)) + +## [0.13.2-sh.1](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.1...v0.13.2-sh.1) (2024-02-21) + + +### Bug Fixes + +* update videoinfo ([f7231ac](https://github.com/SenexCrenshaw/StreamMaster/commit/f7231ac323601b3a0387e4d48e7c3c3260eea728)) + ## [0.13.1](https://github.com/SenexCrenshaw/StreamMaster/compare/v0.13.0...v0.13.1) (2024-02-11) diff --git a/Dockerfile.base b/Dockerfile.base index abbccde6f..aefeda527 100644 --- a/Dockerfile.base +++ b/Dockerfile.base @@ -1,10 +1,18 @@ FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base +ARG TARGETPLATFORM +ARG TARGETOS +ARG TARGETARCH +ARG TARGETVARIANT WORKDIR /app -RUN apt-get update -yq \ +RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache + +RUN \ + --mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/var/cache/apt \ + apt-get update -yq \ && apt-get upgrade -yq \ && apt-get install -yq --no-install-recommends ffmpeg gosu postgresql postgresql-common curl gnupg lsb-release \ && apt-get clean \ - && rm -rf /var/lib/apt/lists/* \ + && rm -rf /var/lib/apt/lists/* \ && mkdir /docker-entrypoint-initdb.d \ No newline at end of file diff --git a/Dockerfile.build b/Dockerfile.build index 01911e094..e2cf3d17a 100644 --- a/Dockerfile.build +++ b/Dockerfile.build @@ -1,6 +1,14 @@ FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build +ARG TARGETPLATFORM +ARG TARGETOS +ARG TARGETARCH +ARG TARGETVARIANT -RUN apt-get update -yq \ +RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache + +RUN \ + --mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/var/cache/apt \ + apt-get update -yq \ && apt-get upgrade -yq \ && curl -fsSL https://deb.nodesource.com/setup_21.x | bash - \ && apt-get install --no-install-recommends -yq nodejs \ diff --git a/StreamMaster.API/AssemblyInfo.cs b/StreamMaster.API/AssemblyInfo.cs index 12a20a013..87d051734 100644 --- a/StreamMaster.API/AssemblyInfo.cs +++ b/StreamMaster.API/AssemblyInfo.cs @@ -1,5 +1,5 @@ using System.Reflection; -[assembly: AssemblyVersion("0.13.1.0")] -[assembly: AssemblyFileVersion("0.13.1.0")] -[assembly: AssemblyInformationalVersion("0.13.1.Sha.19d3cf9344080dfb25a6559125410d00a3540665")] \ No newline at end of file +[assembly: AssemblyVersion("0.13.2.3")] +[assembly: AssemblyFileVersion("0.13.2.3")] +[assembly: AssemblyInformationalVersion("0.13.2-sh.3.Sha.d1570728f5c010f25cd649695fc1d1e339b3d084")] \ No newline at end of file diff --git a/StreamMaster.Domain/CircularBuffer.cs b/StreamMaster.Domain/CircularBuffer.cs index fb3aa2a9a..394c55331 100644 --- a/StreamMaster.Domain/CircularBuffer.cs +++ b/StreamMaster.Domain/CircularBuffer.cs @@ -1,98 +1,125 @@ -namespace StreamMaster.Domain; - -public class CircularBuffer +namespace StreamMaster.Domain { - private readonly byte[] buffer; - private int writePosition = 0; - private readonly object lockObj = new(); - - public CircularBuffer(int capacity) + public class CircularBuffer { - if (capacity <= 0) - { - throw new ArgumentException("Capacity must be greater than 0", nameof(capacity)); - } - - Capacity = capacity; - buffer = new byte[capacity]; - } + private readonly byte[] buffer; + private int writePosition = 0; + private readonly object lockObj = new(); - public void Write(byte[] data) - { - if (data == null) + /// + /// Initializes a new instance of the CircularBuffer class with a specified capacity. + /// + /// The maximum amount of data the buffer can hold. + /// Thrown when capacity is less than or equal to 0. + public CircularBuffer(int capacity) { - throw new ArgumentNullException(nameof(data)); + if (capacity <= 0) + { + throw new ArgumentException("Capacity must be greater than 0.", nameof(capacity)); + } + + Capacity = capacity; + buffer = new byte[capacity]; } - lock (lockObj) + /// + /// Writes data into the circular buffer. If the buffer does not have enough space, + /// older data will be overwritten. + /// + /// The byte array to write into the buffer. + /// Thrown when data is null. + public void Write(byte[] data) { - int dataLength = data.Length; - if (dataLength > Capacity) + if (data == null) { - // If data is larger than the capacity, write only the last part of data that fits. - Array.Copy(data, dataLength - Capacity, buffer, 0, Capacity); - writePosition = 0; - AvailableData = Capacity; + throw new ArgumentNullException(nameof(data)); } - else - { - int part1Len = Math.Min(Capacity - writePosition, dataLength); - Array.Copy(data, 0, buffer, writePosition, part1Len); - if (part1Len < dataLength) + lock (lockObj) + { + int dataLength = data.Length; + if (dataLength > Capacity) { - // If the data was split, write the remaining part at the beginning of the buffer. - Array.Copy(data, part1Len, buffer, 0, dataLength - part1Len); + // Overwrite the buffer with the last 'Capacity' bytes of 'data' + Array.Copy(data, dataLength - Capacity, buffer, 0, Capacity); + writePosition = 0; + AvailableData = Capacity; } + else + { + // Calculate how much data can be written without wrapping + int part1Len = Math.Min(Capacity - writePosition, dataLength); + Array.Copy(data, 0, buffer, writePosition, part1Len); + + if (part1Len < dataLength) + { + // Wrap and write the remaining data + Array.Copy(data, part1Len, buffer, 0, dataLength - part1Len); + } - writePosition = (writePosition + dataLength) % Capacity; - AvailableData = Math.Min(AvailableData + dataLength, Capacity); + // Update write position and available data, wrapping the write position if necessary + writePosition = (writePosition + dataLength) % Capacity; + AvailableData = Math.Min(AvailableData + dataLength, Capacity); + } } } - } - public byte[] ReadLatestData() - { - lock (lockObj) + /// + /// Reads the most recent data written into the buffer without removing it. + /// + /// A byte array containing the latest data. The size of the array is up to the amount of available data. + public byte[] ReadLatestData() { - byte[] latestData = new byte[AvailableData]; // Prepare an array to hold the latest data. - - if (AvailableData == 0) + lock (lockObj) { - // No data available to read. - return latestData; - } + byte[] latestData = new byte[AvailableData]; - if (writePosition == 0 || AvailableData == Capacity) - { - // If writePosition is 0, it means the buffer has just wrapped around or is exactly full, - // so the latest data is the entire buffer. - Array.Copy(buffer, 0, latestData, 0, AvailableData); - } - else - { - // Calculate the start position for the latest data that's not contiguous. - int startIdx = writePosition - AvailableData; - if (startIdx < 0) + if (AvailableData == 0) { - // The data wraps around; copy the end segment and then the start segment. - startIdx += Capacity; // Correct the start index to a positive value. - int part1Length = Capacity - startIdx; - Array.Copy(buffer, startIdx, latestData, 0, part1Length); - Array.Copy(buffer, 0, latestData, part1Length, writePosition); + return latestData; // Early exit if no data available + } + + if (writePosition == 0 || AvailableData == Capacity) + { + // Buffer is exactly full, or just wrapped around + Array.Copy(buffer, 0, latestData, 0, AvailableData); } else { - // All available data is contiguous and can be copied directly. - Array.Copy(buffer, startIdx, latestData, 0, AvailableData); + // Data wraps around the buffer end; copy in two segments + int startIdx = (Capacity + writePosition - AvailableData) % Capacity; + int part1Length = Math.Min(AvailableData, Capacity - startIdx); + Array.Copy(buffer, startIdx, latestData, 0, part1Length); + if (part1Length < AvailableData) + { + Array.Copy(buffer, 0, latestData, part1Length, AvailableData - part1Length); + } } - } - return latestData; + return latestData; + } } - } + /// + /// Gets the capacity of the buffer. + /// + public int Capacity { get; } + /// + /// Gets the amount of data currently stored in the buffer. + /// + public int AvailableData { get; private set; } = 0; + + /// + /// Clears all data from the buffer, resetting its state. + /// + public void Clear() + { + lock (lockObj) + { + writePosition = 0; + AvailableData = 0; + } + } - public int Capacity { get; } - public int AvailableData { get; private set; } = 0; + } } diff --git a/StreamMaster.Infrastructure/Clients/ClientStreamerConfiguration.cs b/StreamMaster.Infrastructure/Clients/ClientStreamerConfiguration.cs index bc94fa36e..81e7576de 100644 --- a/StreamMaster.Infrastructure/Clients/ClientStreamerConfiguration.cs +++ b/StreamMaster.Infrastructure/Clients/ClientStreamerConfiguration.cs @@ -34,12 +34,12 @@ public ClientStreamerConfiguration( public async Task CancelClient(bool includeAbort = true) { - if (ReadBuffer != null) + if (Stream != null) { - ReadBuffer.ReadChannel?.Writer.Complete(); - ReadBuffer.Cancel(); - ReadBuffer.Dispose(); - ReadBuffer = null; + Stream.Channel?.Writer.Complete(); + Stream.Cancel(); + Stream.Dispose(); + Stream = null; } try @@ -71,7 +71,7 @@ public async Task CancelClient(bool includeAbort = true) } //Buffering - public IClientReadStream? ReadBuffer { get; set; } + public IClientReadStream? Stream { get; set; } //Tokens private CancellationToken ClientHTTPRequestCancellationToken { get; } diff --git a/StreamMaster.Infrastructure/Services/BroadcastService.cs b/StreamMaster.Infrastructure/Services/BroadcastService.cs index ef3df8e73..3de0028f8 100644 --- a/StreamMaster.Infrastructure/Services/BroadcastService.cs +++ b/StreamMaster.Infrastructure/Services/BroadcastService.cs @@ -71,7 +71,7 @@ public void LogDebug() //logger.LogInformation("GetStreamHandlers: {GetStreamHandlers}", streamManager.GetStreamHandlers().Count); foreach (IClientStreamerConfiguration clientStreamerConfiguration in clientStreamer.GetAllClientStreamerConfigurations) { - printDebug("Client: {0} {1}", clientStreamerConfiguration.ChannelName, clientStreamerConfiguration.ReadBuffer?.Id ?? Guid.Empty); + printDebug("Client: {0} {1}", clientStreamerConfiguration.ChannelName, clientStreamerConfiguration.Stream?.Id ?? Guid.Empty); } foreach (IStreamHandler handler in streamManager.GetStreamHandlers()) diff --git a/StreamMaster.Streams.Domain/Interfaces/IClientReadStream.cs b/StreamMaster.Streams.Domain/Interfaces/IClientReadStream.cs index 1b9bd3caf..1eb23cf29 100644 --- a/StreamMaster.Streams.Domain/Interfaces/IClientReadStream.cs +++ b/StreamMaster.Streams.Domain/Interfaces/IClientReadStream.cs @@ -8,7 +8,7 @@ namespace StreamMaster.Streams.Domain.Interfaces; public interface IClientReadStream : IDisposable { string VideoStreamName { get; set; } - Channel ReadChannel { get; } + Channel Channel { get; } /// /// Gets a value indicating whether the stream supports reading. diff --git a/StreamMaster.Streams.Domain/Interfaces/IClientStreamerConfiguration.cs b/StreamMaster.Streams.Domain/Interfaces/IClientStreamerConfiguration.cs index eb46a4f51..9aaa54fc5 100644 --- a/StreamMaster.Streams.Domain/Interfaces/IClientStreamerConfiguration.cs +++ b/StreamMaster.Streams.Domain/Interfaces/IClientStreamerConfiguration.cs @@ -9,7 +9,7 @@ public interface IClientStreamerConfiguration Guid ClientId { get; set; } string ClientIPAddress { get; set; } string ClientUserAgent { get; set; } - IClientReadStream? ReadBuffer { get; set; } + IClientReadStream? Stream { get; set; } CancellationTokenSource ClientMasterToken { get; set; } string VideoStreamName { get; set; } } \ No newline at end of file diff --git a/StreamMaster.Streams/Buffers/ClientReadStream.Main.cs b/StreamMaster.Streams/Buffers/ClientReadStream.Main.cs index b946b776a..61a39d891 100644 --- a/StreamMaster.Streams/Buffers/ClientReadStream.Main.cs +++ b/StreamMaster.Streams/Buffers/ClientReadStream.Main.cs @@ -1,8 +1,6 @@ using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.Logging; -using StreamMaster.Domain.Cache; - using System.Threading.Channels; namespace StreamMaster.Streams.Buffers; @@ -37,12 +35,12 @@ public ClientReadStream(IMemoryCache memoryCache, IStatisticsManager _statistics SingleReader = true, SingleWriter = true }; - ReadChannel = Channel.CreateUnbounded(options); + Channel = System.Threading.Channels.Channel.CreateUnbounded(options); logger.LogInformation("Starting client read stream for ClientId: {ClientId}", ClientId); } - public Channel ReadChannel { get; private set; } + public Channel Channel { get; private set; } private bool IsCancelled { get; set; } @@ -75,11 +73,11 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, } catch (Exception ex) { - Setting setting = memoryCache.GetSetting(); - if (setting.EnablePrometheus) - { - _readErrorsCounter.WithLabels(ClientId.ToString(), VideoStreamName).Inc(); - } + //Setting setting = memoryCache.GetSetting(); + //if (setting.EnablePrometheus) + //{ + // _readErrorsCounter.WithLabels(ClientId.ToString(), VideoStreamName).Inc(); + //} logger.LogError(ex, "Error reading buffer for ClientId: {ClientId}", ClientId); } diff --git a/StreamMaster.Streams/Buffers/ClientReadStream.Stats.cs b/StreamMaster.Streams/Buffers/ClientReadStream.Stats.cs index 212d8e26e..511af488d 100644 --- a/StreamMaster.Streams/Buffers/ClientReadStream.Stats.cs +++ b/StreamMaster.Streams/Buffers/ClientReadStream.Stats.cs @@ -1,6 +1,5 @@ using Prometheus; -using StreamMaster.Domain.Cache; using StreamMaster.Domain.Extensions; namespace StreamMaster.Streams.Buffers; @@ -31,23 +30,23 @@ public sealed partial class ClientReadStream ); private readonly PerformanceBpsMetrics metrics = new(); - private DateTime _lastUpdateTime = SMDT.UtcNow; + private readonly DateTime _lastUpdateTime = SMDT.UtcNow; private int acculmativeBytesRead = 0; private void SetMetrics(int bytesRead) { - DateTime currentTime = SMDT.UtcNow; - Setting setting = memoryCache.GetSetting(); + //DateTime currentTime = SMDT.UtcNow; + //Setting setting = memoryCache.GetSetting(); - if (setting.EnablePrometheus && (currentTime - _lastUpdateTime > TimeSpan.FromSeconds(5))) - { - double bps = metrics.GetBitsPerSecond(); + //if (setting.EnablePrometheus && (currentTime - _lastUpdateTime > TimeSpan.FromSeconds(5))) + //{ + // double bps = metrics.GetBitsPerSecond(); - _bitsPerSecond.WithLabels(ClientId.ToString(), VideoStreamName).Set(bps); - _bytesReadCounter.WithLabels(ClientId.ToString(), VideoStreamName).Inc(acculmativeBytesRead); + // _bitsPerSecond.WithLabels(ClientId.ToString(), VideoStreamName).Set(bps); + // _bytesReadCounter.WithLabels(ClientId.ToString(), VideoStreamName).Inc(acculmativeBytesRead); - acculmativeBytesRead = 0; - _lastUpdateTime = currentTime; - } + // acculmativeBytesRead = 0; + // _lastUpdateTime = currentTime; + //} acculmativeBytesRead += bytesRead; } diff --git a/StreamMaster.Streams/Buffers/ClientReadStream.cs b/StreamMaster.Streams/Buffers/ClientReadStream.cs index 8e9efd49c..f239581b3 100644 --- a/StreamMaster.Streams/Buffers/ClientReadStream.cs +++ b/StreamMaster.Streams/Buffers/ClientReadStream.cs @@ -9,9 +9,6 @@ public sealed partial class ClientReadStream : Stream, IClientReadStream { private readonly CancellationTokenSource _readCancel = new(); private readonly ILogger _readLogger; - //private int currentPosition = 0; - //private int GetVideoInfoCount = 0; - //private readonly Memory test = new(new byte[2 * 1024 * 1000]); public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) { @@ -31,7 +28,7 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation CancellationTokenSource timedToken = new(TimeSpan.FromSeconds(30)); using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_readCancel.Token, timedToken.Token, cancellationToken); - byte[] read = await ReadChannel.Reader.ReadAsync(linkedCts.Token); + byte[] read = await Channel.Reader.ReadAsync(linkedCts.Token); bytesRead = read.Length; _readLogger.LogDebug("End bytesRead: {bytesRead}", bytesRead); if (bytesRead == 0) @@ -39,21 +36,6 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation return 0; } read[..bytesRead].CopyTo(buffer); - //if (GetVideoInfoCount < 1) - //{ - // if (bytesRead + currentPosition > test.Length) - // { - // string filePath = Path.Combine(BuildInfo.AppDataFolder, $"streamread_{GetVideoInfoCount}.mp4"); - // await File.WriteAllBytesAsync(filePath, test.ToArray()).ConfigureAwait(false); - // ++GetVideoInfoCount; - // currentPosition = 0; - // } - // else - // { - // read[..bytesRead].CopyTo(test[currentPosition..]); - // currentPosition += bytesRead; - // } - //} metrics.RecordBytesProcessed(bytesRead); diff --git a/StreamMaster.Streams/Channels/ChannelManager.cs b/StreamMaster.Streams/Channels/ChannelManager.cs index 25da53cce..378ed691c 100644 --- a/StreamMaster.Streams/Channels/ChannelManager.cs +++ b/StreamMaster.Streams/Channels/ChannelManager.cs @@ -209,7 +209,7 @@ public async Task SimulateStreamFailureForAll() IChannelStatus? channelStatus = await RegisterWithChannelManager(config); - if (channelStatus is null || config.ReadBuffer is null) + if (channelStatus is null || config.Stream is null) { channelService.UnRegisterChannel(config.ChannelVideoStreamId); logger.LogInformation("Exiting Register Client with null due to channelStatus or Read Buffer being null"); @@ -217,7 +217,7 @@ public async Task SimulateStreamFailureForAll() } logger.LogInformation("Finished Register Client"); - return (Stream)config.ReadBuffer; + return (Stream)config.Stream; } private async Task RegisterWithChannelManager(IClientStreamerConfiguration config) diff --git a/StreamMaster.Streams/Clients/ClientStreamerManager.cs b/StreamMaster.Streams/Clients/ClientStreamerManager.cs index 006ff7cc3..d4c53d38d 100644 --- a/StreamMaster.Streams/Clients/ClientStreamerManager.cs +++ b/StreamMaster.Streams/Clients/ClientStreamerManager.cs @@ -57,9 +57,9 @@ public async Task AddClientToHandler(Guid clientId, IStreamHandler streamHandler if (streamerConfiguration != null) { streamerConfiguration.VideoStreamName = streamHandler.VideoStreamName; - streamerConfiguration.ReadBuffer ??= new ClientReadStream(memoryCache, statisticsManager, loggerFactory, streamerConfiguration); + streamerConfiguration.Stream ??= new ClientReadStream(memoryCache, statisticsManager, loggerFactory, streamerConfiguration); - logger.LogDebug("Adding client {ClientId} {ReaderID} ", clientId, streamerConfiguration.ReadBuffer?.Id ?? Guid.NewGuid()); + logger.LogDebug("Adding client {ClientId} {ReaderID} ", clientId, streamerConfiguration.Stream?.Id ?? Guid.NewGuid()); streamHandler.RegisterClientStreamer(streamerConfiguration); } diff --git a/StreamMaster.Streams/Streams/StreamHandler.Stats.cs b/StreamMaster.Streams/Streams/StreamHandler.Stats.cs index 58f2e2e76..1f9de2b48 100644 --- a/StreamMaster.Streams/Streams/StreamHandler.Stats.cs +++ b/StreamMaster.Streams/Streams/StreamHandler.Stats.cs @@ -1,5 +1,4 @@ -using StreamMaster.Domain.Cache; -using StreamMaster.Domain.Extensions; +using StreamMaster.Domain.Extensions; namespace StreamMaster.Streams.Streams; @@ -13,16 +12,16 @@ public sealed partial class StreamHandler private int acculmativeBytesWritten = 0; private void SetMetrics(int bytesWritten) { - DateTime currentTime = SMDT.UtcNow; + //DateTime currentTime = SMDT.UtcNow; - Setting setting = memoryCache.GetSetting(); + //Setting setting = memoryCache.GetSetting(); - if (setting.EnablePrometheus && (currentTime - _lastUpdateTime > TimeSpan.FromSeconds(5))) - { - inputStreamStatistics.AddBytesWritten(acculmativeBytesWritten); - _lastUpdateTime = currentTime; - acculmativeBytesWritten = 0; - } + //if (setting.EnablePrometheus && (currentTime - _lastUpdateTime > TimeSpan.FromSeconds(5))) + //{ + // inputStreamStatistics.AddBytesWritten(acculmativeBytesWritten); + // _lastUpdateTime = currentTime; + // acculmativeBytesWritten = 0; + //} acculmativeBytesWritten += bytesWritten; } diff --git a/StreamMaster.Streams/Streams/StreamHandler.VideoInfo.cs b/StreamMaster.Streams/Streams/StreamHandler.VideoInfo.cs index fc8d85647..8d213c67a 100644 --- a/StreamMaster.Streams/Streams/StreamHandler.VideoInfo.cs +++ b/StreamMaster.Streams/Streams/StreamHandler.VideoInfo.cs @@ -7,7 +7,6 @@ using System.Diagnostics; using System.Runtime.InteropServices; using System.Text.Json; -using System.Threading.Channels; namespace StreamMaster.Streams.Streams; @@ -27,43 +26,6 @@ public VideoInfo GetVideoInfo() return _videoInfo ?? new(); } - - private async Task FillVideoMemoryAsync(byte[] videoMemory, ChannelReader> videoChannelReader, CancellationToken cancellationToken) - { - int position = 0; - - try - { - while (await videoChannelReader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) - { - while (videoChannelReader.TryRead(out Memory mem) && mem.Length > 0) - { - if (position + mem.Length > videoMemory.Length) - { - mem = mem[..(videoMemory.Length - position)]; - } - - mem.Span.CopyTo(new Span(videoMemory, position, mem.Length)); - position += mem.Length; - - if (position >= videoMemory.Length) - { - return; - } - } - } - } - catch (OperationCanceledException) - { - logger.LogInformation("Operation was cancelled."); - } - catch (Exception ex) - { - logger.LogError(ex, "An error occurred while filling video memory."); - throw; - } - } - public async Task BuildVideoInfoAsync(byte[] videoMemory) { bool isLocked = false; @@ -127,13 +89,6 @@ public async Task BuildVideoInfoAsync(byte[] videoMemory) } } - //private async Task PrepareVideoMemoryAsync() - //{ - // byte[] videoMemory = new byte[videoBufferSize]; - // await FillVideoMemoryAsync(videoMemory, videoChannel.Reader, cancellationTokenSource.Token).ConfigureAwait(false); - // return videoMemory; - //} - private string GetFFProbeExecutablePath(Setting settings) { string ffprobeExec = Path.Combine(BuildInfo.AppDataFolder, settings.FFProbeExecutable); diff --git a/StreamMaster.Streams/Streams/StreamHandler.cs b/StreamMaster.Streams/Streams/StreamHandler.cs index 6fbcfbcc8..c6bfafa66 100644 --- a/StreamMaster.Streams/Streams/StreamHandler.cs +++ b/StreamMaster.Streams/Streams/StreamHandler.cs @@ -23,6 +23,29 @@ public sealed partial class StreamHandler private DateTime LastVideoInfoRun = DateTime.MinValue; + // Write to all clients with separate buffers + private async Task WriteToAllClientsAsync(byte[] data, CancellationToken cancellationToken) + { + IEnumerable tasks = clientStreamerConfigs.Values + .Where(c => c.Stream != null) + .Select(async clientStreamerConfig => + { + if (clientStreamerConfig.Stream != null) + { + try + { + await clientStreamerConfig.Stream.Channel.Writer.WriteAsync(data, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + logger.LogError(ex, "Failed to write to client {ClientId}", clientStreamerConfig.ClientId); + } + } + }); + + await Task.WhenAll(tasks); + } + public async Task StartVideoStreamingAsync(Stream stream) { @@ -50,10 +73,8 @@ public async Task StartVideoStreamingAsync(Stream stream) bool ran = false; int accumulatedBytes = 0; Stopwatch testSw = Stopwatch.StartNew(); + Memory bufferMemory = new byte[ChunkSize]; - Memory test = new(new byte[videoBufferSize]); - - byte[] firstByte = new byte[videoBufferSize]; using (stream) { while (!linkedToken.IsCancellationRequested) @@ -71,10 +92,9 @@ public async Task StartVideoStreamingAsync(Stream stream) try { - Memory bufferMemory = new byte[ChunkSize]; _writeLogger.LogDebug("-------------------{VideoStreamName}-----------------------------", VideoStreamName); int readBytes = await stream.ReadAsync(bufferMemory, linkedToken.Token); - _writeLogger.LogDebug("End bytes written: {byteswritten}", readBytes); + _writeLogger.LogDebug("End bytes read from input stream: {byteswritten}", readBytes); if (readBytes == 0) { throw new EndOfStreamException(); @@ -89,49 +109,22 @@ public async Task StartVideoStreamingAsync(Stream stream) byte[] clientDataToSend = new byte[readBytes]; bufferMemory[..readBytes].CopyTo(clientDataToSend); + await WriteToAllClientsAsync(clientDataToSend, linkedToken.Token).ConfigureAwait(false); - foreach (IClientStreamerConfiguration clientStreamerConfig in clientStreamerConfigs.Values) - { - if (clientStreamerConfig.ReadBuffer != null) - { - await clientStreamerConfig.ReadBuffer.ReadChannel.Writer.WriteAsync(clientDataToSend); - } - else - { - logger.LogError("ClientStreamerConfig ReadBuffer is null for {ClientId}", clientStreamerConfig.ClientId); - } - - } + videoBuffer.Write(clientDataToSend); + accumulatedBytes += readBytes; TimeSpan lastRun = SMDT.UtcNow - LastVideoInfoRun; - if (lastRun.TotalMinutes >= 30 && accumulatedBytes + readBytes > videoBufferSize) + if (lastRun.TotalMinutes >= 10) { - int overAge = accumulatedBytes + readBytes - videoBufferSize; - int toRead = readBytes - overAge; - if (toRead < 0) - { - logger.LogError(overAge, "toRead is less than {overAge}", overAge); - logger.LogError(overAge, "accumulatedBytes {accumulatedBytes}", accumulatedBytes); - logger.LogError(overAge, "readBytes {readBytes}", readBytes); - logger.LogError(overAge, "readBytes {videoBufferSize}", videoBufferSize); - } - else + if (accumulatedBytes > videoBufferSize) { - videoBuffer.Write(clientDataToSend[..toRead]); + byte[] processData = videoBuffer.ReadLatestData(); + _ = BuildVideoInfoAsync(processData); - byte[] videoMemory = videoBuffer.ReadLatestData(); - Task task = BuildVideoInfoAsync(videoMemory); - - ++toRead; - accumulatedBytes = readBytes - toRead; - videoBuffer.Write(clientDataToSend[toRead..readBytes]); + accumulatedBytes = 0; } } - else - { - videoBuffer.Write(clientDataToSend); - accumulatedBytes += readBytes; - } } catch (TaskCanceledException) { @@ -169,7 +162,7 @@ public async Task StartVideoStreamingAsync(Stream stream) //foreach (IClientStreamerConfiguration clientStreamerConfig in clientStreamerConfigs.Values) //{ - // clientStreamerConfig.ReadBuffer?.ReadChannel.Writer.Complete(); + // clientStreamerConfig.Stream?.Channel.Writer.Complete(); //} IsFailed = true; stream.Close(); diff --git a/StreamMaster.Streams/Streams/StreamManager.cs b/StreamMaster.Streams/Streams/StreamManager.cs index cc384049b..8630ed515 100644 --- a/StreamMaster.Streams/Streams/StreamManager.cs +++ b/StreamMaster.Streams/Streams/StreamManager.cs @@ -119,14 +119,14 @@ private async void StreamHandler_OnStreamingStoppedEvent(object? sender, StreamH // //foreach (Guid clientId in streamHandler.GetClientStreamerClientIds()) // //{ // // IClientStreamerConfiguration? clientStreamerConfiguration = await clientStreamerManager.GetClientStreamerConfiguration(clientId); - // // if (clientStreamerConfiguration != null && clientStreamerConfiguration.ReadBuffer != null) + // // if (clientStreamerConfiguration != null && clientStreamerConfiguration.Stream != null) // // { // // long _lastReadIndex = streamHandler.CircularRingBuffer.GetNextReadIndex(); // // //if (_lastReadIndex > StreamHandler.ChunkSize) // // //{ // // // _lastReadIndex -= StreamHandler.ChunkSize; // // //} - // // clientStreamerConfiguration.ReadBuffer.SetLastIndex(_lastReadIndex); + // // clientStreamerConfiguration.Stream.SetLastIndex(_lastReadIndex); // // } // //} //} diff --git a/build_docker.ps1 b/build_docker.ps1 index f270c0b16..9c0ac00d8 100644 --- a/build_docker.ps1 +++ b/build_docker.ps1 @@ -25,6 +25,7 @@ function Write-StringToFile { Write-Host "An error occurred: $_" } } + function Read-StringFromFile { param ( [string]$Path @@ -40,6 +41,30 @@ function Read-StringFromFile { } } +function Copy-File { + param ( + [Parameter(Mandatory=$true)] + [string]$sourcePath, + + [Parameter(Mandatory=$true)] + [string]$destinationPath + ) + + try { + # Check if the source file exists + if (-Not (Test-Path -Path $sourcePath -PathType Leaf)) { + Write-Host "Source file does not exist: '$sourcePath'" + return + } + + # Copy the file to the destination + Copy-Item -Path $sourcePath -Destination $destinationPath -ErrorAction Stop + Write-Host "File copied successfully from '$sourcePath' to '$destinationPath'." + } catch { + Write-Host "An error occurred while copying the file: $_" + } +} + function Main { Set-EnvironmentVariables @@ -52,18 +77,21 @@ function Main { } if (-not $SkipRelease -and -not $PrintCommands) { - if ( $BuildProd -and -not $SkipMainBuild) { - npx semantic-release # -e release.gh + + if ( $BuildProd ) { #} -and -not $SkipMainBuild) { + Copy-File -sourcePath "release.config.release.cjs" -destinationPath "release.config.cjs" } else { - npx semantic-release + Copy-File -sourcePath "release.config.norelease.cjs" -destinationPath "release.config.cjs" } + + npx semantic-release } # DownloadFiles $imageName = "docker.io/senexcrenshaw/streammaster" - $buildName = $imageName + "-builds" + $buildName = "streammaster-builds" $result = Get-AssemblyInfo -assemblyInfoPath "./StreamMaster.API/AssemblyInfo.cs" $processedAssemblyInfo = ProcessAssemblyInfo $result @@ -71,14 +99,14 @@ function Main { if ($BuildBase -or $BuildAll) { $dockerFile = "Dockerfile.base" $global:tags = @("$("${buildName}:"+$processedAssemblyInfo.BranchNameRevision)-base") - BuildImage -result $processedAssemblyInfo -imageName $buildName -dockerFile $dockerFile + BuildImage -result $processedAssemblyInfo -imageName $buildName -dockerFile $dockerFile -pull $true Write-StringToFile -Path "basever" -Content $processedAssemblyInfo.BranchNameRevision } if ($BuildBuild -or $BuildAll) { $dockerFile = "Dockerfile.build" $global:tags = @("$("${buildName}:"+$processedAssemblyInfo.BranchNameRevision)-build") - BuildImage -result $processedAssemblyInfo -imageName $buildName -dockerFile $dockerFile + BuildImage -result $processedAssemblyInfo -imageName $buildName -dockerFile $dockerFile -pull $true Write-StringToFile -Path "buildver" -Content $processedAssemblyInfo.BranchNameRevision } @@ -110,7 +138,7 @@ function Main { Add-ContentAtTop -filePath $dockerFile -contentArray $contentArray $global:tags = DetermineTags -result $processedAssemblyInfo -imageName $imageName - BuildImage -result $processedAssemblyInfo -imageName $imageName -dockerFile $dockerFile + BuildImage -result $processedAssemblyInfo -imageName $imageName -dockerFile $dockerFile -push $true } } Function Add-ContentAtTop { @@ -214,7 +242,13 @@ function BuildImage { $dockerFile, [Parameter(Mandatory = $true)] - [string]$imageName + [string]$imageName, + + [Parameter()] + [bool]$push = $false, + + [Parameter()] + [bool]$pull = $false ) @@ -223,7 +257,18 @@ function BuildImage { $global:tags | ForEach-Object { Write-Host $_ } # Construct the Docker build command using the tags and the specified Dockerfile - $buildCommand = "docker buildx build --pull --platform ""linux/amd64,linux/arm64"" -f ./$dockerFile . --push" + $buildCommand = "docker buildx build " + if ( $pull) { + $buildCommand += " --pull" + } + $buildCommand += " --platform ""linux/amd64,linux/arm64"" -f ./$dockerFile ." + + if ( $push) { + $buildCommand += " --push" + } + else { + $buildCommand += " --load" + } foreach ($tag in $global:tags) { $buildCommand += " --tag=$tag" } diff --git a/release.config.norelease.cjs b/release.config.norelease.cjs new file mode 100644 index 000000000..b4cb58590 --- /dev/null +++ b/release.config.norelease.cjs @@ -0,0 +1,51 @@ +/** + * @type {import('semantic-release').GlobalConfig} + */ + +module.exports = { + branches: [ + // "+([0-9])?(.{+([0-9]),x}).x", + { + channel: "main", + name: "main", + prerelease: false + }, + { + name: "!main", + prerelease: true + } + ], + ci: false, + debug: false, + dryRun: false, + plugins: [ + [ + "@semantic-release/commit-analyzer", + { + preset: "angular", + releaseRules: [ + { type: "docs", scope: "README", release: "patch" }, + { type: "refactor", release: "minor" }, + { type: "style", release: "patch" }, + // { type: "build", release: "false" }, + { scope: "no-release", release: false }, + { type: "update", release: "patch" } + ], + parserOpts: { + noteKeywords: ["BREAKING CHANGE", "BREAKING CHANGES"] + } + } + ], + [ + "@semantic-release/exec", + { + verifyConditionsCmd: ":", + publishCmd: [ + "node updateAssemblyInfo.js ${nextRelease.version} ${nextRelease.gitHead} ${nextRelease.channel}", + "git add ./StreamMaster.API/AssemblyInfo.cs", + 'git diff-index --quiet HEAD || git commit -m "chore: update AssemblyInfo.cs to version ${nextRelease.version}"' + ].join(" && ") + } + ] + ] +}; diff --git a/release.config.cjs b/release.config.release.cjs similarity index 93% rename from release.config.cjs rename to release.config.release.cjs index 0597b3886..0cb960606 100644 --- a/release.config.cjs +++ b/release.config.release.cjs @@ -27,7 +27,8 @@ module.exports = { { type: "docs", scope: "README", release: "patch" }, { type: "refactor", release: "minor" }, { type: "style", release: "patch" }, - { type: "build", release: "false" }, + // { type: "build", release: "false" }, + { scope: "no-release", release: false }, { type: "update", release: "patch" } ], parserOpts: {