Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

quic: prioritise listen connections for reuse #2262

Merged
merged 9 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 42 additions & 12 deletions p2p/transport/quicreuse/reuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ type reuse struct {
routes routing.Router
unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn
// global contains connections that are listening on 0.0.0.0 / ::
global map[int]*reuseConn
global map[int]*reuseConn
globalDial map[int]*reuseConn
}

func newReuse() *reuse {
r := &reuse{
unicast: make(map[string]map[int]*reuseConn),
global: make(map[int]*reuseConn),
globalDial: make(map[int]*reuseConn),
closeChan: make(chan struct{}),
gcStopChan: make(chan struct{}),
}
Expand All @@ -95,6 +97,9 @@ func (r *reuse) gc() {
for _, conn := range r.global {
conn.Close()
}
for _, conn := range r.globalDial {
conn.Close()
}
for _, conns := range r.unicast {
for _, conn := range conns {
conn.Close()
Expand All @@ -119,6 +124,12 @@ func (r *reuse) gc() {
delete(r.global, key)
}
}
for key, conn := range r.globalDial {
if conn.ShouldGarbageCollect(now) {
conn.Close()
delete(r.globalDial, key)
}
}
for ukey, conns := range r.unicast {
for key, conn := range conns {
if conn.ShouldGarbageCollect(now) {
Expand Down Expand Up @@ -189,6 +200,11 @@ func (r *reuse) dialLocked(network string, source *net.IP) (*reuseConn, error) {
return conn, nil
}

// Use a connection we've previously dialed from
for _, conn := range r.globalDial {
return conn, nil
}

// We don't have a connection that we can use for dialing.
// Dial a new connection from a random port.
var addr *net.UDPAddr
Expand All @@ -203,29 +219,43 @@ func (r *reuse) dialLocked(network string, source *net.IP) (*reuseConn, error) {
return nil, err
}
rconn := newReuseConn(conn)
r.global[conn.LocalAddr().(*net.UDPAddr).Port] = rconn
r.globalDial[conn.LocalAddr().(*net.UDPAddr).Port] = rconn
return rconn, nil
}

func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) {
conn, err := net.ListenUDP(network, laddr)
if err != nil {
return nil, err
r.mutex.Lock()
defer r.mutex.Unlock()

var rconn *reuseConn
var localAddr *net.UDPAddr

// reuse the connection if we've dialed out of this port already
if laddr.IP.IsUnspecified() {
if _, ok := r.globalDial[laddr.Port]; ok {
rconn = r.globalDial[laddr.Port]
localAddr = rconn.UDPConn.LocalAddr().(*net.UDPAddr)
}
}
if rconn == nil {
conn, err := net.ListenUDP(network, laddr)
if err != nil {
return nil, err
}
localAddr = conn.LocalAddr().(*net.UDPAddr)
rconn = newReuseConn(conn)
}
localAddr := conn.LocalAddr().(*net.UDPAddr)

rconn := newReuseConn(conn)
rconn.IncreaseCount()

r.mutex.Lock()
defer r.mutex.Unlock()

// Deal with listen on a global address
if localAddr.IP.IsUnspecified() {
// The kernel already checked that the laddr is not already listen
// so we need not check here (when we create ListenUDP).
r.global[localAddr.Port] = rconn
return rconn, err
// delete the entry from dial map in case we are reusing this connection
delete(r.globalDial, localAddr.Port)
return rconn, nil
}

// Deal with listen on a unicast address
Expand All @@ -239,7 +269,7 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) {
// The kernel already checked that the laddr is not already listen
// so we need not check here (when we create ListenUDP).
r.unicast[localAddr.IP.String()][localAddr.Port] = rconn
return rconn, err
return rconn, nil
}

func (r *reuse) Close() error {
Expand Down
20 changes: 20 additions & 0 deletions p2p/transport/quicreuse/reuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ func closeAllConns(reuse *reuse) {
conn.DecreaseCount()
}
}
for _, conn := range reuse.globalDial {
for conn.GetCount() > 0 {
conn.DecreaseCount()
}
}
for _, conns := range reuse.unicast {
for _, conn := range conns {
for conn.GetCount() > 0 {
Expand Down Expand Up @@ -110,6 +115,21 @@ func TestReuseConnectionWhenDialing(t *testing.T) {
require.Equal(t, conn.GetCount(), 2)
}

func TestReuseConnectionWhenListening(t *testing.T) {
reuse := newReuse()
cleanup(t, reuse)

raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
require.NoError(t, err)
conn, err := reuse.Dial("udp4", raddr)
require.NoError(t, err)
laddr := &net.UDPAddr{IP: net.IPv4zero, Port: conn.UDPConn.LocalAddr().(*net.UDPAddr).Port}
lconn, err := reuse.Listen("udp4", laddr)
require.NoError(t, err)
require.Equal(t, lconn.GetCount(), 2)
require.Equal(t, conn.GetCount(), 2)
}

func TestReuseListenOnSpecificInterface(t *testing.T) {
if platformHasRoutingTables() {
t.Skip("this test only works on platforms that support routing tables")
Expand Down