Skip to content

Commit

Permalink
prioritize sending ACKs over sending new DATAGRAM frames (#3544)
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann authored Sep 9, 2022
1 parent 62b8278 commit c57ea6f
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 32 deletions.
2 changes: 1 addition & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func (s *connection) preSetup() {
s.creationTime = now

s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.connFlowController, s.framer.QueueControlFrame)
s.datagramQueue = newDatagramQueue(s.scheduleSending, s.logger)
s.datagramQueue = newDatagramQueue(s.scheduleSending, s.logger, s.version)
}

// run the connection main loop
Expand Down
36 changes: 28 additions & 8 deletions datagram_queue.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package quic

import (
"sync"

"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"
)

type datagramQueue struct {
mx sync.Mutex
nextFrameSize protocol.ByteCount

sendQueue chan *wire.DatagramFrame
rcvQueue chan []byte

Expand All @@ -17,17 +22,20 @@ type datagramQueue struct {

dequeued chan struct{}

logger utils.Logger
logger utils.Logger
version protocol.VersionNumber
}

func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue {
func newDatagramQueue(hasData func(), logger utils.Logger, v protocol.VersionNumber) *datagramQueue {
return &datagramQueue{
hasData: hasData,
sendQueue: make(chan *wire.DatagramFrame, 1),
rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen),
dequeued: make(chan struct{}),
closed: make(chan struct{}),
logger: logger,
hasData: hasData,
sendQueue: make(chan *wire.DatagramFrame, 1),
nextFrameSize: protocol.InvalidByteCount,
rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen),
dequeued: make(chan struct{}),
closed: make(chan struct{}),
logger: logger,
version: v,
}
}

Expand All @@ -36,6 +44,9 @@ func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue {
func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error {
select {
case h.sendQueue <- f:
h.mx.Lock()
h.nextFrameSize = f.Length(h.version)
h.mx.Unlock()
h.hasData()
case <-h.closed:
return h.closeErr
Expand All @@ -53,13 +64,22 @@ func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error {
func (h *datagramQueue) Get() *wire.DatagramFrame {
select {
case f := <-h.sendQueue:
h.mx.Lock()
h.nextFrameSize = protocol.InvalidByteCount
h.mx.Unlock()
h.dequeued <- struct{}{}
return f
default:
return nil
}
}

func (h *datagramQueue) NextFrameSize() protocol.ByteCount {
h.mx.Lock()
defer h.mx.Unlock()
return h.nextFrameSize
}

// HandleDatagramFrame handles a received DATAGRAM frame.
func (h *datagramQueue) HandleDatagramFrame(f *wire.DatagramFrame) {
data := make([]byte, len(f.Data))
Expand Down
10 changes: 8 additions & 2 deletions datagram_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package quic
import (
"errors"

"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"

Expand All @@ -18,25 +19,30 @@ var _ = Describe("Datagram Queue", func() {
queued = make(chan struct{}, 100)
queue = newDatagramQueue(func() {
queued <- struct{}{}
}, utils.DefaultLogger)
}, utils.DefaultLogger, protocol.Version1)
})

Context("sending", func() {
It("returns nil when there's no datagram to send", func() {
Expect(queue.NextFrameSize()).To(Equal(protocol.InvalidByteCount))
Expect(queue.Get()).To(BeNil())
})

It("queues a datagram", func() {
done := make(chan struct{})
frame := &wire.DatagramFrame{Data: []byte("foobar")}
go func() {
defer GinkgoRecover()
defer close(done)
Expect(queue.AddAndWait(&wire.DatagramFrame{Data: []byte("foobar")})).To(Succeed())
Expect(queue.AddAndWait(frame)).To(Succeed())
}()

Eventually(queued).Should(HaveLen(1))
Consistently(done).ShouldNot(BeClosed())
l := queue.NextFrameSize()
f := queue.Get()
Expect(l).To(Equal(f.Length(protocol.Version1)))
Expect(queue.NextFrameSize()).To(Equal(protocol.InvalidByteCount))
Expect(f).ToNot(BeNil())
Expect(f.Data).To(Equal([]byte("foobar")))
Eventually(done).Should(BeClosed())
Expand Down
43 changes: 23 additions & 20 deletions packet_packer.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,18 +555,18 @@ func (p *packetPacker) maybeGetAppDataPacketFor0RTT(sealer sealer, maxPacketSize

hdr := p.getLongHeader(protocol.Encryption0RTT)
maxPayloadSize := maxPacketSize - hdr.GetLength(p.version) - protocol.ByteCount(sealer.Overhead())
payload := p.maybeGetAppDataPacketWithEncLevel(maxPayloadSize, false)
payload := p.maybeGetAppDataPacket(maxPayloadSize, false)
return hdr, payload
}

func (p *packetPacker) maybeGetShortHeaderPacket(sealer handshake.ShortHeaderSealer, maxPacketSize, currentSize protocol.ByteCount) (*wire.ExtendedHeader, *payload) {
hdr := p.getShortHeader(sealer.KeyPhase())
maxPayloadSize := maxPacketSize - hdr.GetLength(p.version) - protocol.ByteCount(sealer.Overhead())
payload := p.maybeGetAppDataPacketWithEncLevel(maxPayloadSize, currentSize == 0)
payload := p.maybeGetAppDataPacket(maxPayloadSize, currentSize == 0)
return hdr, payload
}

func (p *packetPacker) maybeGetAppDataPacketWithEncLevel(maxPayloadSize protocol.ByteCount, ackAllowed bool) *payload {
func (p *packetPacker) maybeGetAppDataPacket(maxPayloadSize protocol.ByteCount, ackAllowed bool) *payload {
payload := p.composeNextPacket(maxPayloadSize, ackAllowed)

// check if we have anything to send
Expand All @@ -593,32 +593,35 @@ func (p *packetPacker) maybeGetAppDataPacketWithEncLevel(maxPayloadSize protocol
func (p *packetPacker) composeNextPacket(maxFrameSize protocol.ByteCount, ackAllowed bool) *payload {
payload := &payload{frames: make([]ackhandler.Frame, 0, 1)}

var hasDatagram bool
hasData := p.framer.HasData()
hasRetransmission := p.retransmissionQueue.HasAppData()

var hasAck bool
if ackAllowed {
if ack := p.acks.GetAckFrame(protocol.Encryption1RTT, !hasRetransmission && !hasData); ack != nil {
payload.ack = ack
payload.length += ack.Length(p.version)
hasAck = true
}
}

if p.datagramQueue != nil {
if datagram := p.datagramQueue.Get(); datagram != nil {
size := p.datagramQueue.NextFrameSize()
if size > 0 && size <= maxFrameSize-payload.length {
datagram := p.datagramQueue.Get()
if datagram == nil || datagram.Length(p.version) != size {
panic("packet packer BUG: inconsistent DATAGRAM frame length")
}
payload.frames = append(payload.frames, ackhandler.Frame{
Frame: datagram,
// set it to a no-op. Then we won't set the default callback, which would retransmit the frame.
OnLost: func(wire.Frame) {},
})
payload.length += datagram.Length(p.version)
hasDatagram = true
}
}

var ack *wire.AckFrame
hasData := p.framer.HasData()
hasRetransmission := p.retransmissionQueue.HasAppData()
// TODO: make sure ACKs are sent when a lot of DATAGRAMs are queued
if !hasDatagram && ackAllowed {
ack = p.acks.GetAckFrame(protocol.Encryption1RTT, !hasRetransmission && !hasData)
if ack != nil {
payload.ack = ack
payload.length += ack.Length(p.version)
}
}

if ack == nil && !hasData && !hasRetransmission {
if hasAck && !hasData && !hasRetransmission {
return payload
}

Expand Down Expand Up @@ -675,7 +678,7 @@ func (p *packetPacker) MaybePackProbePacket(encLevel protocol.EncryptionLevel) (
}
sealer = oneRTTSealer
hdr = p.getShortHeader(oneRTTSealer.KeyPhase())
payload = p.maybeGetAppDataPacketWithEncLevel(p.maxPacketSize-protocol.ByteCount(sealer.Overhead())-hdr.GetLength(p.version), true)
payload = p.maybeGetAppDataPacket(p.maxPacketSize-protocol.ByteCount(sealer.Overhead())-hdr.GetLength(p.version), true)
default:
panic("unknown encryption level")
}
Expand Down
33 changes: 32 additions & 1 deletion packet_packer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var _ = Describe("Packet packer", func() {
ackFramer = NewMockAckFrameSource(mockCtrl)
sealingManager = NewMockSealingManager(mockCtrl)
pnManager = mockackhandler.NewMockSentPacketHandler(mockCtrl)
datagramQueue = newDatagramQueue(func() {}, utils.DefaultLogger)
datagramQueue = newDatagramQueue(func() {}, utils.DefaultLogger, version)

packer = newPacketPacker(
protocol.ParseConnectionID([]byte{1, 2, 3, 4, 5, 6, 7, 8}),
Expand Down Expand Up @@ -554,6 +554,7 @@ var _ = Describe("Packet packer", func() {
})

It("packs DATAGRAM frames", func() {
ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, true)
pnManager.EXPECT().PeekPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42), protocol.PacketNumberLen2)
pnManager.EXPECT().PopPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42))
sealingManager.EXPECT().Get1RTTSealer().Return(getSealer(), nil)
Expand All @@ -580,6 +581,36 @@ var _ = Describe("Packet packer", func() {
Eventually(done).Should(BeClosed())
})

It("doesn't pack a DATAGRAM frame if the ACK frame is too large", func() {
ackFramer.EXPECT().GetAckFrame(protocol.Encryption1RTT, true).Return(&wire.AckFrame{AckRanges: []wire.AckRange{{Largest: 100}}})
pnManager.EXPECT().PeekPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42), protocol.PacketNumberLen2)
pnManager.EXPECT().PopPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42))
sealingManager.EXPECT().Get1RTTSealer().Return(getSealer(), nil)
f := &wire.DatagramFrame{
DataLenPresent: true,
Data: make([]byte, maxPacketSize-10),
}
done := make(chan struct{})
go func() {
defer GinkgoRecover()
defer close(done)
datagramQueue.AddAndWait(f)
}()
// make sure the DATAGRAM has actually been queued
time.Sleep(scaleDuration(20 * time.Millisecond))

framer.EXPECT().HasData()
p, err := packer.PackPacket()
Expect(p).ToNot(BeNil())
Expect(err).ToNot(HaveOccurred())
Expect(p.ack).ToNot(BeNil())
Expect(p.frames).To(BeEmpty())
Expect(p.buffer.Data).ToNot(BeEmpty())
Expect(done).ToNot(BeClosed())
datagramQueue.CloseWithError(nil)
Eventually(done).Should(BeClosed())
})

It("accounts for the space consumed by control frames", func() {
pnManager.EXPECT().PeekPacketNumber(protocol.Encryption1RTT).Return(protocol.PacketNumber(0x42), protocol.PacketNumberLen2)
sealingManager.EXPECT().Get1RTTSealer().Return(getSealer(), nil)
Expand Down

0 comments on commit c57ea6f

Please sign in to comment.