Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prioritize sending ACKs over sending new DATAGRAM frames #3544

Merged
merged 1 commit into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func (s *connection) preSetup() {
s.creationTime = now

s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.connFlowController, s.framer.QueueControlFrame)
s.datagramQueue = newDatagramQueue(s.scheduleSending, s.logger)
s.datagramQueue = newDatagramQueue(s.scheduleSending, s.logger, s.version)
}

// run the connection main loop
Expand Down
36 changes: 28 additions & 8 deletions datagram_queue.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package quic

import (
"sync"

"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"
)

type datagramQueue struct {
mx sync.Mutex
nextFrameSize protocol.ByteCount

sendQueue chan *wire.DatagramFrame
rcvQueue chan []byte

Expand All @@ -17,17 +22,20 @@ type datagramQueue struct {

dequeued chan struct{}

logger utils.Logger
logger utils.Logger
version protocol.VersionNumber
}

func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue {
func newDatagramQueue(hasData func(), logger utils.Logger, v protocol.VersionNumber) *datagramQueue {
return &datagramQueue{
hasData: hasData,
sendQueue: make(chan *wire.DatagramFrame, 1),
rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen),
dequeued: make(chan struct{}),
closed: make(chan struct{}),
logger: logger,
hasData: hasData,
sendQueue: make(chan *wire.DatagramFrame, 1),
nextFrameSize: protocol.InvalidByteCount,
rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen),
dequeued: make(chan struct{}),
closed: make(chan struct{}),
logger: logger,
version: v,
}
}

Expand All @@ -36,6 +44,9 @@ func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue {
func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error {
select {
case h.sendQueue <- f:
h.mx.Lock()
h.nextFrameSize = f.Length(h.version)
h.mx.Unlock()
h.hasData()
case <-h.closed:
return h.closeErr
Expand All @@ -53,13 +64,22 @@ func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error {
func (h *datagramQueue) Get() *wire.DatagramFrame {
select {
case f := <-h.sendQueue:
h.mx.Lock()
h.nextFrameSize = protocol.InvalidByteCount
h.mx.Unlock()
h.dequeued <- struct{}{}
return f
default:
return nil
}
}

func (h *datagramQueue) NextFrameSize() protocol.ByteCount {
h.mx.Lock()
defer h.mx.Unlock()
return h.nextFrameSize
}

// HandleDatagramFrame handles a received DATAGRAM frame.
func (h *datagramQueue) HandleDatagramFrame(f *wire.DatagramFrame) {
data := make([]byte, len(f.Data))
Expand Down
10 changes: 8 additions & 2 deletions datagram_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package quic
import (
"errors"

"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"

Expand All @@ -18,25 +19,30 @@ var _ = Describe("Datagram Queue", func() {
queued = make(chan struct{}, 100)
queue = newDatagramQueue(func() {
queued <- struct{}{}
}, utils.DefaultLogger)
}, utils.DefaultLogger, protocol.Version1)
})

Context("sending", func() {
It("returns nil when there's no datagram to send", func() {
Expect(queue.NextFrameSize()).To(Equal(protocol.InvalidByteCount))
Expect(queue.Get()).To(BeNil())
})

It("queues a datagram", func() {
done := make(chan struct{})
frame := &wire.DatagramFrame{Data: []byte("foobar")}
go func() {
defer GinkgoRecover()
defer close(done)
Expect(queue.AddAndWait(&wire.DatagramFrame{Data: []byte("foobar")})).To(Succeed())
Expect(queue.AddAndWait(frame)).To(Succeed())
}()

Eventually(queued).Should(HaveLen(1))
Consistently(done).ShouldNot(BeClosed())
l := queue.NextFrameSize()
f := queue.Get()
Expect(l).To(Equal(f.Length(protocol.Version1)))
Expect(queue.NextFrameSize()).To(Equal(protocol.InvalidByteCount))
Expect(f).ToNot(BeNil())
Expect(f.Data).To(Equal([]byte("foobar")))
Eventually(done).Should(BeClosed())
Expand Down
43 changes: 23 additions & 20 deletions packet_packer.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,18 +555,18 @@ func (p *packetPacker) maybeGetAppDataPacketFor0RTT(sealer sealer, maxPacketSize

hdr := p.getLongHeader(protocol.Encryption0RTT)
maxPayloadSize := maxPacketSize - hdr.GetLength(p.version) - protocol.ByteCount(sealer.Overhead())
payload := p.maybeGetAppDataPacketWithEncLevel(maxPayloadSize, false)
payload := p.maybeGetAppDataPacket(maxPayloadSize, false)
return hdr, payload
}

func (p *packetPacker) maybeGetShortHeaderPacket(sealer handshake.ShortHeaderSealer, maxPacketSize, currentSize protocol.ByteCount) (*wire.ExtendedHeader, *payload) {
hdr := p.getShortHeader(sealer.KeyPhase())
maxPayloadSize := maxPacketSize - hdr.GetLength(p.version) - protocol.ByteCount(sealer.Overhead())
payload := p.maybeGetAppDataPacketWithEncLevel(maxPayloadSize, currentSize == 0)
payload := p.maybeGetAppDataPacket(maxPayloadSize, currentSize == 0)
return hdr, payload
}

func (p *packetPacker) maybeGetAppDataPacketWithEncLevel(maxPayloadSize protocol.ByteCount, ackAllowed bool) *payload {
func (p *packetPacker) maybeGetAppDataPacket(maxPayloadSize protocol.ByteCount, ackAllowed bool) *payload {
payload := p.composeNextPacket(maxPayloadSize, ackAllowed)

// check if we have anything to send
Expand All @@ -593,32 +593,35 @@ func (p *packetPacker) maybeGetAppDataPacketWithEncLevel(maxPayloadSize protocol
func (p *packetPacker) composeNextPacket(maxFrameSize protocol.ByteCount, ackAllowed bool) *payload {
payload := &payload{frames: make([]ackhandler.Frame, 0, 1)}

var hasDatagram bool
hasData := p.framer.HasData()
hasRetransmission := p.retransmissionQueue.HasAppData()

var hasAck bool
if ackAllowed {
if ack := p.acks.GetAckFrame(protocol.Encryption1RTT, !hasRetransmission && !hasData); ack != nil {
payload.ack = ack
payload.length += ack.Length(p.version)
hasAck = true
}
}

if p.datagramQueue != nil {
if datagram := p.datagramQueue.Get(); datagram != nil {
size := p.datagramQueue.NextFrameSize()
if size > 0 && size <= maxFrameSize-payload.length {
datagram := p.datagramQueue.Get()
if datagram == nil || datagram.Length(p.version) != size {
panic("packet packer BUG: inconsistent DATAGRAM frame length")
}
payload.frames = append(payload.frames, ackhandler.Frame{
Frame: datagram,
// set it to a no-op. Then we won't set the default callback, which would retransmit the frame.
OnLost: func(wire.Frame) {},
})
payload.length += datagram.Length(p.version)
hasDatagram = true
}
}

var ack *wire.AckFrame
hasData := p.framer.HasData()
hasRetransmission := p.retransmissionQueue.HasAppData()
// TODO: make sure ACKs are sent when a lot of DATAGRAMs are queued
if !hasDatagram && ackAllowed {
ack = p.acks.GetAckFrame(protocol.Encryption1RTT, !hasRetransmission && !hasData)
if ack != nil {
payload.ack = ack
payload.length += ack.Length(p.version)
}
}

if ack == nil && !hasData && !hasRetransmission {
if hasAck && !hasData && !hasRetransmission {
return payload
}

Expand Down Expand Up @@ -675,7 +678,7 @@ func (p *packetPacker) MaybePackProbePacket(encLevel protocol.EncryptionLevel) (
}
sealer = oneRTTSealer
hdr = p.getShortHeader(oneRTTSealer.KeyPhase())
payload = p.maybeGetAppDataPacketWithEncLevel(p.maxPacketSize-protocol.ByteCount(sealer.Overhead())-hdr.GetLength(p.version), true)
payload = p.maybeGetAppDataPacket(p.maxPacketSize-protocol.ByteCount(sealer.Overhead())-hdr.GetLength(p.version), true)
default:
panic("unknown encryption level")
}
Expand Down
33 changes: 32 additions & 1 deletion packet_packer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var _ = Describe("Packet packer", func() {
ackFramer = NewMockAckFrameSource(mockCtrl)
sealingManager = NewMockSealingManager(mockCtrl)
pnManager = mockackhandler.NewMockSentPacketHandler(mockCtrl)
datagramQueue = newDatagramQueue(func() {}, utils.DefaultLogger)
datagramQueue = newDatagramQueue(func() {}, utils.DefaultLogger, version)

packer = newPacketPacker(
protocol.ParseConnectionID([]byte{1, 2, 3, 4, 5, 6, 7, 8}),
Expand Down Expand Up @@ -554,6 +554,7 @@ var _ = Describe("Packet packer", func() {
})

It("packs DATAGRAM frames", func() {
ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, true)
pnManager.EXPECT().PeekPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42), protocol.PacketNumberLen2)
pnManager.EXPECT().PopPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42))
sealingManager.EXPECT().Get1RTTSealer().Return(getSealer(), nil)
Expand All @@ -580,6 +581,36 @@ var _ = Describe("Packet packer", func() {
Eventually(done).Should(BeClosed())
})

It("doesn't pack a DATAGRAM frame if the ACK frame is too large", func() {
ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, true).Return(&wire.AckFrame{AckRanges: []wire.AckRange{{Largest: 100}}})
pnManager.EXPECT().PeekPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42), protocol.PacketNumberLen2)
pnManager.EXPECT().PopPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42))
sealingManager.EXPECT().Get1RTTSealer().Return(getSealer(), nil)
f := &wire.DatagramFrame{
DataLenPresent: true,
Data: make([]byte, maxPacketSize-10),
}
done := make(chan struct{})
go func() {
defer GinkgoRecover()
defer close(done)
datagramQueue.AddAndWait(f)
}()
// make sure the DATAGRAM has actually been queued
time.Sleep(scaleDuration(20 * time.Millisecond))

framer.EXPECT().HasData()
p, err := packer.PackPacket()
Expect(p).ToNot(BeNil())
Expect(err).ToNot(HaveOccurred())
Expect(p.ack).ToNot(BeNil())
Expect(p.frames).To(BeEmpty())
Expect(p.buffer.Data).ToNot(BeEmpty())
Expect(done).ToNot(BeClosed())
datagramQueue.CloseWithError(nil)
Eventually(done).Should(BeClosed())
})

It("accounts for the space consumed by control frames", func() {
pnManager.EXPECT().PeekPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42), protocol.PacketNumberLen2)
sealingManager.EXPECT().Get1RTTSealer().Return(getSealer(), nil)
Expand Down