Skip to content

Commit

Permalink
Use Discord heartbeat interval in audio. (#2765)
Browse files Browse the repository at this point in the history
Reduce heartbeat interval to ensure Discord receives it within range.
Refactor some AudioClient code.

Co-authored-by: Quin Lynch <49576606+quinchs@users.noreply.github.com>
  • Loading branch information
josago97 and quinchs committed Nov 18, 2023
1 parent 8d5022a commit 9cedfbc
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 38 deletions.
9 changes: 9 additions & 0 deletions src/Discord.Net.Core/DiscordConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,15 @@ public class DiscordConfig
/// </summary>
public const int MaxApplicationTagCount = 5;

/// <summary>
/// Returns the factor to reduce the heartbeat interval.
/// </summary>
/// <remarks>
/// If a heartbeat takes longer than the interval estimated by Discord, the connection will be closed.
/// This factor is used to reduce the interval and ensure that Discord will get the heartbeat within the estimated interval.
/// </remarks>
internal const double HeartbeatIntervalFactor = 0.9;

/// <summary>
/// Returns the maximum length of a voice channel status.
/// </summary>
Expand Down
86 changes: 49 additions & 37 deletions src/Discord.Net.WebSocket/Audio/AudioClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ namespace Discord.Audio
//TODO: Add audio reconnecting
internal partial class AudioClient : IAudioClient
{
internal struct StreamPair
private static readonly int ConnectionTimeoutMs = 30000; // 30 seconds
private static readonly int KeepAliveIntervalMs = 5000; // 5 seconds

private struct StreamPair
{
public AudioInStream Reader;
public AudioOutStream Writer;
Expand All @@ -40,6 +43,7 @@ public StreamPair(AudioInStream reader, AudioOutStream writer)
private readonly ConcurrentDictionary<ulong, StreamPair> _streams;

private Task _heartbeatTask, _keepaliveTask;
private int _heartbeatInterval;
private long _lastMessageTime;
private string _url, _sessionId, _token;
private ulong _userId;
Expand Down Expand Up @@ -71,7 +75,7 @@ internal AudioClient(SocketGuild guild, int clientId, ulong channelId)
ApiClient.ReceivedPacket += ProcessPacketAsync;

_stateLock = new SemaphoreSlim(1, 1);
_connection = new ConnectionManager(_stateLock, _audioLogger, 30000,
_connection = new ConnectionManager(_stateLock, _audioLogger, ConnectionTimeoutMs,
OnConnectingAsync, OnDisconnectingAsync, x => ApiClient.Disconnected += x);
_connection.Connected += () => _connectedEvent.InvokeAsync();
_connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex);
Expand Down Expand Up @@ -113,8 +117,8 @@ public async Task StopAsync()
private async Task OnConnectingAsync()
{
await _audioLogger.DebugAsync("Connecting ApiClient").ConfigureAwait(false);
await ApiClient.ConnectAsync("wss://" + _url + "?v=" + DiscordConfig.VoiceAPIVersion).ConfigureAwait(false);
await _audioLogger.DebugAsync("Listening on port " + ApiClient.UdpPort).ConfigureAwait(false);
await ApiClient.ConnectAsync($"wss://{_url}?v={DiscordConfig.VoiceAPIVersion}").ConfigureAwait(false);
await _audioLogger.DebugAsync($"Listening on port {ApiClient.UdpPort}").ConfigureAwait(false);
await _audioLogger.DebugAsync("Sending Identity").ConfigureAwait(false);
await ApiClient.SendIdentityAsync(_userId, _sessionId, _token).ConfigureAwait(false);

Expand All @@ -128,13 +132,13 @@ private async Task OnDisconnectingAsync(Exception ex)

//Wait for tasks to complete
await _audioLogger.DebugAsync("Waiting for heartbeater").ConfigureAwait(false);
var heartbeatTask = _heartbeatTask;
if (heartbeatTask != null)
await heartbeatTask.ConfigureAwait(false);

if (_heartbeatTask != null)
await _heartbeatTask.ConfigureAwait(false);
_heartbeatTask = null;
var keepaliveTask = _keepaliveTask;
if (keepaliveTask != null)
await keepaliveTask.ConfigureAwait(false);

if (_keepaliveTask != null)
await _keepaliveTask.ConfigureAwait(false);
_keepaliveTask = null;

while (_heartbeatTimes.TryDequeue(out _))
Expand Down Expand Up @@ -194,11 +198,12 @@ internal AudioInStream GetInputStream(ulong id)
{
if (_streams.TryGetValue(id, out StreamPair streamPair))
return streamPair.Reader;

return null;
}
internal async Task RemoveInputStreamAsync(ulong userId)
{
if (_streams.TryRemove(userId, out var pair))
if (_streams.TryRemove(userId, out StreamPair pair))
{
await _streamDestroyedEvent.InvokeAsync(userId).ConfigureAwait(false);
pair.Reader.Dispose();
Expand Down Expand Up @@ -236,8 +241,7 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
ApiClient.SetUdpEndpoint(data.Ip, data.Port);
await ApiClient.SendDiscoveryAsync(_ssrc).ConfigureAwait(false);


_heartbeatTask = RunHeartbeatAsync(41250, _connection.CancelToken);
_heartbeatTask = RunHeartbeatAsync(_heartbeatInterval, _connection.CancelToken);
}
break;
case VoiceOpCode.SessionDescription:
Expand All @@ -250,10 +254,10 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)

SecretKey = data.SecretKey;
_isSpeaking = false;
await ApiClient.SendSetSpeaking(false).ConfigureAwait(false);
_keepaliveTask = RunKeepaliveAsync(5000, _connection.CancelToken);
await ApiClient.SendSetSpeaking(_isSpeaking).ConfigureAwait(false);
_keepaliveTask = RunKeepaliveAsync(_connection.CancelToken);

var _ = _connection.CompleteAsync();
_ = _connection.CompleteAsync();
}
break;
case VoiceOpCode.HeartbeatAck:
Expand All @@ -270,6 +274,14 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
}
}
break;
case VoiceOpCode.Hello:
{
await _audioLogger.DebugAsync("Received Hello").ConfigureAwait(false);
var data = (payload as JToken).ToObject<HelloEvent>(_serializer);

_heartbeatInterval = data.HeartbeatInterval;
}
break;
case VoiceOpCode.Speaking:
{
await _audioLogger.DebugAsync("Received Speaking").ConfigureAwait(false);
Expand All @@ -291,13 +303,12 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
break;
default:
await _audioLogger.WarningAsync($"Unknown OpCode ({opCode})").ConfigureAwait(false);
return;
break;
}
}
catch (Exception ex)
{
await _audioLogger.ErrorAsync($"Error handling {opCode}", ex).ConfigureAwait(false);
return;
}
}
private async Task ProcessPacketAsync(byte[] packet)
Expand Down Expand Up @@ -358,29 +369,28 @@ private async Task ProcessPacketAsync(byte[] packet)
}
else
{
if (!RTPReadStream.TryReadSsrc(packet, 0, out var ssrc))
if (!RTPReadStream.TryReadSsrc(packet, 0, out uint ssrc))
{
await _audioLogger.DebugAsync("Malformed Frame").ConfigureAwait(false);
return;
}
if (!_ssrcMap.TryGetValue(ssrc, out var userId))
else if (!_ssrcMap.TryGetValue(ssrc, out ulong userId))
{
await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false);
return;
}
if (!_streams.TryGetValue(userId, out var pair))
else if (!_streams.TryGetValue(userId, out StreamPair pair))
{
await _audioLogger.DebugAsync($"Unknown User {userId}").ConfigureAwait(false);
return;
}
try
else
{
await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false);
}
catch (Exception ex)
{
await _audioLogger.DebugAsync("Malformed Frame", ex).ConfigureAwait(false);
return;
try
{
await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false);
}
catch (Exception ex)
{
await _audioLogger.DebugAsync("Malformed Frame", ex).ConfigureAwait(false);
}
}
//await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false);
}
Expand All @@ -389,19 +399,20 @@ private async Task ProcessPacketAsync(byte[] packet)
catch (Exception ex)
{
await _audioLogger.WarningAsync("Failed to process UDP packet", ex).ConfigureAwait(false);
return;
}
}

private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken)
{
int delayInterval = (int)(intervalMillis * DiscordConfig.HeartbeatIntervalFactor);

// TODO: Clean this up when Discord's session patch is live
try
{
await _audioLogger.DebugAsync("Heartbeat Started").ConfigureAwait(false);
while (!cancelToken.IsCancellationRequested)
{
var now = Environment.TickCount;
int now = Environment.TickCount;

//Did server respond to our last heartbeat?
if (_heartbeatTimes.Count != 0 && (now - _lastMessageTime) > intervalMillis &&
Expand All @@ -421,7 +432,8 @@ private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cance
await _audioLogger.WarningAsync("Failed to send heartbeat", ex).ConfigureAwait(false);
}

await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
int delay = Math.Max(0, delayInterval - Latency);
await Task.Delay(delay, cancelToken).ConfigureAwait(false);
}
await _audioLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false);
}
Expand All @@ -434,14 +446,14 @@ private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cance
await _audioLogger.ErrorAsync("Heartbeat Errored", ex).ConfigureAwait(false);
}
}
private async Task RunKeepaliveAsync(int intervalMillis, CancellationToken cancelToken)
private async Task RunKeepaliveAsync(CancellationToken cancelToken)
{
try
{
await _audioLogger.DebugAsync("Keepalive Started").ConfigureAwait(false);
while (!cancelToken.IsCancellationRequested)
{
var now = Environment.TickCount;
int now = Environment.TickCount;

try
{
Expand All @@ -454,7 +466,7 @@ private async Task RunKeepaliveAsync(int intervalMillis, CancellationToken cance
await _audioLogger.WarningAsync("Failed to send keepalive", ex).ConfigureAwait(false);
}

await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
await Task.Delay(KeepAliveIntervalMs, cancelToken).ConfigureAwait(false);
}
await _audioLogger.DebugAsync("Keepalive Stopped").ConfigureAwait(false);
}
Expand Down
5 changes: 4 additions & 1 deletion src/Discord.Net.WebSocket/DiscordSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3200,6 +3200,8 @@ private async Task ProcessMessageAsync(GatewayOpCode opCode, int? seq, string ty

private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken)
{
int delayInterval = (int)(intervalMillis * DiscordConfig.HeartbeatIntervalFactor);

try
{
await _gatewayLogger.DebugAsync("Heartbeat Started").ConfigureAwait(false);
Expand Down Expand Up @@ -3227,7 +3229,8 @@ private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cance
await _gatewayLogger.WarningAsync("Heartbeat Errored", ex).ConfigureAwait(false);
}

await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
int delay = Math.Max(0, delayInterval - Latency);
await Task.Delay(delay, cancelToken).ConfigureAwait(false);
}
await _gatewayLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false);
}
Expand Down

0 comments on commit 9cedfbc

Please sign in to comment.