From 9cedfbcdd9b4c168ba64de1e2af71d348422ed0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Santos=20Garrido?= Date: Sat, 18 Nov 2023 21:52:48 +0100 Subject: [PATCH] Use Discord heartbeat interval in audio. (#2765) Reduce heartbeat interval to ensure Discord receives it within range. Refactor some AudioClient code. Co-authored-by: Quin Lynch <49576606+quinchs@users.noreply.github.com> --- src/Discord.Net.Core/DiscordConfig.cs | 9 ++ .../Audio/AudioClient.cs | 86 +++++++++++-------- .../DiscordSocketClient.cs | 5 +- 3 files changed, 62 insertions(+), 38 deletions(-) diff --git a/src/Discord.Net.Core/DiscordConfig.cs b/src/Discord.Net.Core/DiscordConfig.cs index 7386464f92..ac703a0015 100644 --- a/src/Discord.Net.Core/DiscordConfig.cs +++ b/src/Discord.Net.Core/DiscordConfig.cs @@ -238,6 +238,15 @@ public class DiscordConfig /// public const int MaxApplicationTagCount = 5; + /// + /// Returns the factor to reduce the heartbeat interval. + /// + /// + /// If a heartbeat takes longer than the interval estimated by Discord, the connection will be closed. + /// This factor is used to reduce the interval and ensure that Discord will get the heartbeat within the estimated interval. + /// + internal const double HeartbeatIntervalFactor = 0.9; + /// /// Returns the maximum length of a voice channel status. /// diff --git a/src/Discord.Net.WebSocket/Audio/AudioClient.cs b/src/Discord.Net.WebSocket/Audio/AudioClient.cs index 61d1a0f01f..1bd4d434e5 100644 --- a/src/Discord.Net.WebSocket/Audio/AudioClient.cs +++ b/src/Discord.Net.WebSocket/Audio/AudioClient.cs @@ -18,7 +18,10 @@ namespace Discord.Audio //TODO: Add audio reconnecting internal partial class AudioClient : IAudioClient { - internal struct StreamPair + private static readonly int ConnectionTimeoutMs = 30000; // 30 seconds + private static readonly int KeepAliveIntervalMs = 5000; // 5 seconds + + private struct StreamPair { public AudioInStream Reader; public AudioOutStream Writer; @@ -40,6 +43,7 @@ public StreamPair(AudioInStream reader, AudioOutStream writer) private readonly ConcurrentDictionary _streams; private Task _heartbeatTask, _keepaliveTask; + private int _heartbeatInterval; private long _lastMessageTime; private string _url, _sessionId, _token; private ulong _userId; @@ -71,7 +75,7 @@ internal AudioClient(SocketGuild guild, int clientId, ulong channelId) ApiClient.ReceivedPacket += ProcessPacketAsync; _stateLock = new SemaphoreSlim(1, 1); - _connection = new ConnectionManager(_stateLock, _audioLogger, 30000, + _connection = new ConnectionManager(_stateLock, _audioLogger, ConnectionTimeoutMs, OnConnectingAsync, OnDisconnectingAsync, x => ApiClient.Disconnected += x); _connection.Connected += () => _connectedEvent.InvokeAsync(); _connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex); @@ -113,8 +117,8 @@ public async Task StopAsync() private async Task OnConnectingAsync() { await _audioLogger.DebugAsync("Connecting ApiClient").ConfigureAwait(false); - await ApiClient.ConnectAsync("wss://" + _url + "?v=" + DiscordConfig.VoiceAPIVersion).ConfigureAwait(false); - await _audioLogger.DebugAsync("Listening on port " + ApiClient.UdpPort).ConfigureAwait(false); + await ApiClient.ConnectAsync($"wss://{_url}?v={DiscordConfig.VoiceAPIVersion}").ConfigureAwait(false); + await _audioLogger.DebugAsync($"Listening on port {ApiClient.UdpPort}").ConfigureAwait(false); await _audioLogger.DebugAsync("Sending Identity").ConfigureAwait(false); await ApiClient.SendIdentityAsync(_userId, _sessionId, _token).ConfigureAwait(false); @@ -128,13 +132,13 @@ private async Task OnDisconnectingAsync(Exception ex) //Wait for tasks to complete await _audioLogger.DebugAsync("Waiting for heartbeater").ConfigureAwait(false); - var heartbeatTask = _heartbeatTask; - if (heartbeatTask != null) - await heartbeatTask.ConfigureAwait(false); + + if (_heartbeatTask != null) + await _heartbeatTask.ConfigureAwait(false); _heartbeatTask = null; - var keepaliveTask = _keepaliveTask; - if (keepaliveTask != null) - await keepaliveTask.ConfigureAwait(false); + + if (_keepaliveTask != null) + await _keepaliveTask.ConfigureAwait(false); _keepaliveTask = null; while (_heartbeatTimes.TryDequeue(out _)) @@ -194,11 +198,12 @@ internal AudioInStream GetInputStream(ulong id) { if (_streams.TryGetValue(id, out StreamPair streamPair)) return streamPair.Reader; + return null; } internal async Task RemoveInputStreamAsync(ulong userId) { - if (_streams.TryRemove(userId, out var pair)) + if (_streams.TryRemove(userId, out StreamPair pair)) { await _streamDestroyedEvent.InvokeAsync(userId).ConfigureAwait(false); pair.Reader.Dispose(); @@ -236,8 +241,7 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload) ApiClient.SetUdpEndpoint(data.Ip, data.Port); await ApiClient.SendDiscoveryAsync(_ssrc).ConfigureAwait(false); - - _heartbeatTask = RunHeartbeatAsync(41250, _connection.CancelToken); + _heartbeatTask = RunHeartbeatAsync(_heartbeatInterval, _connection.CancelToken); } break; case VoiceOpCode.SessionDescription: @@ -250,10 +254,10 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload) SecretKey = data.SecretKey; _isSpeaking = false; - await ApiClient.SendSetSpeaking(false).ConfigureAwait(false); - _keepaliveTask = RunKeepaliveAsync(5000, _connection.CancelToken); + await ApiClient.SendSetSpeaking(_isSpeaking).ConfigureAwait(false); + _keepaliveTask = RunKeepaliveAsync(_connection.CancelToken); - var _ = _connection.CompleteAsync(); + _ = _connection.CompleteAsync(); } break; case VoiceOpCode.HeartbeatAck: @@ -270,6 +274,14 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload) } } break; + case VoiceOpCode.Hello: + { + await _audioLogger.DebugAsync("Received Hello").ConfigureAwait(false); + var data = (payload as JToken).ToObject(_serializer); + + _heartbeatInterval = data.HeartbeatInterval; + } + break; case VoiceOpCode.Speaking: { await _audioLogger.DebugAsync("Received Speaking").ConfigureAwait(false); @@ -291,13 +303,12 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload) break; default: await _audioLogger.WarningAsync($"Unknown OpCode ({opCode})").ConfigureAwait(false); - return; + break; } } catch (Exception ex) { await _audioLogger.ErrorAsync($"Error handling {opCode}", ex).ConfigureAwait(false); - return; } } private async Task ProcessPacketAsync(byte[] packet) @@ -358,29 +369,28 @@ private async Task ProcessPacketAsync(byte[] packet) } else { - if (!RTPReadStream.TryReadSsrc(packet, 0, out var ssrc)) + if (!RTPReadStream.TryReadSsrc(packet, 0, out uint ssrc)) { await _audioLogger.DebugAsync("Malformed Frame").ConfigureAwait(false); - return; } - if (!_ssrcMap.TryGetValue(ssrc, out var userId)) + else if (!_ssrcMap.TryGetValue(ssrc, out ulong userId)) { await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false); - return; } - if (!_streams.TryGetValue(userId, out var pair)) + else if (!_streams.TryGetValue(userId, out StreamPair pair)) { await _audioLogger.DebugAsync($"Unknown User {userId}").ConfigureAwait(false); - return; } - try + else { - await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false); - } - catch (Exception ex) - { - await _audioLogger.DebugAsync("Malformed Frame", ex).ConfigureAwait(false); - return; + try + { + await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false); + } + catch (Exception ex) + { + await _audioLogger.DebugAsync("Malformed Frame", ex).ConfigureAwait(false); + } } //await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false); } @@ -389,19 +399,20 @@ private async Task ProcessPacketAsync(byte[] packet) catch (Exception ex) { await _audioLogger.WarningAsync("Failed to process UDP packet", ex).ConfigureAwait(false); - return; } } private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken) { + int delayInterval = (int)(intervalMillis * DiscordConfig.HeartbeatIntervalFactor); + // TODO: Clean this up when Discord's session patch is live try { await _audioLogger.DebugAsync("Heartbeat Started").ConfigureAwait(false); while (!cancelToken.IsCancellationRequested) { - var now = Environment.TickCount; + int now = Environment.TickCount; //Did server respond to our last heartbeat? if (_heartbeatTimes.Count != 0 && (now - _lastMessageTime) > intervalMillis && @@ -421,7 +432,8 @@ private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cance await _audioLogger.WarningAsync("Failed to send heartbeat", ex).ConfigureAwait(false); } - await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false); + int delay = Math.Max(0, delayInterval - Latency); + await Task.Delay(delay, cancelToken).ConfigureAwait(false); } await _audioLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false); } @@ -434,14 +446,14 @@ private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cance await _audioLogger.ErrorAsync("Heartbeat Errored", ex).ConfigureAwait(false); } } - private async Task RunKeepaliveAsync(int intervalMillis, CancellationToken cancelToken) + private async Task RunKeepaliveAsync(CancellationToken cancelToken) { try { await _audioLogger.DebugAsync("Keepalive Started").ConfigureAwait(false); while (!cancelToken.IsCancellationRequested) { - var now = Environment.TickCount; + int now = Environment.TickCount; try { @@ -454,7 +466,7 @@ private async Task RunKeepaliveAsync(int intervalMillis, CancellationToken cance await _audioLogger.WarningAsync("Failed to send keepalive", ex).ConfigureAwait(false); } - await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false); + await Task.Delay(KeepAliveIntervalMs, cancelToken).ConfigureAwait(false); } await _audioLogger.DebugAsync("Keepalive Stopped").ConfigureAwait(false); } diff --git a/src/Discord.Net.WebSocket/DiscordSocketClient.cs b/src/Discord.Net.WebSocket/DiscordSocketClient.cs index 00aaa7d602..d96ca54945 100644 --- a/src/Discord.Net.WebSocket/DiscordSocketClient.cs +++ b/src/Discord.Net.WebSocket/DiscordSocketClient.cs @@ -3200,6 +3200,8 @@ private async Task ProcessMessageAsync(GatewayOpCode opCode, int? seq, string ty private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken) { + int delayInterval = (int)(intervalMillis * DiscordConfig.HeartbeatIntervalFactor); + try { await _gatewayLogger.DebugAsync("Heartbeat Started").ConfigureAwait(false); @@ -3227,7 +3229,8 @@ private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cance await _gatewayLogger.WarningAsync("Heartbeat Errored", ex).ConfigureAwait(false); } - await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false); + int delay = Math.Max(0, delayInterval - Latency); + await Task.Delay(delay, cancelToken).ConfigureAwait(false); } await _gatewayLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false); }