Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

quic: prioritise listen connections for reuse #2262

Merged
merged 9 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 48 additions & 15 deletions p2p/transport/quicreuse/reuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,19 @@ type reuse struct {
unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn
// global contains connections that are listening on 0.0.0.0 / ::
global map[int]*reuseConn
sukunrt marked this conversation as resolved.
Show resolved Hide resolved
// globalFallback contains connections that we've dialed out from. connections
// are reused from this map if no connection is available in the global
// map. These connections are listening on 0.0.0.0 / ::
globalFallback map[int]*reuseConn
sukunrt marked this conversation as resolved.
Show resolved Hide resolved
}

func newReuse() *reuse {
r := &reuse{
unicast: make(map[string]map[int]*reuseConn),
global: make(map[int]*reuseConn),
closeChan: make(chan struct{}),
gcStopChan: make(chan struct{}),
unicast: make(map[string]map[int]*reuseConn),
global: make(map[int]*reuseConn),
globalFallback: make(map[int]*reuseConn),
closeChan: make(chan struct{}),
gcStopChan: make(chan struct{}),
}
go r.gc()
return r
Expand All @@ -95,6 +100,9 @@ func (r *reuse) gc() {
for _, conn := range r.global {
conn.Close()
}
for _, conn := range r.globalFallback {
conn.Close()
}
for _, conns := range r.unicast {
for _, conn := range conns {
conn.Close()
Expand All @@ -119,6 +127,12 @@ func (r *reuse) gc() {
delete(r.global, key)
}
}
for key, conn := range r.globalFallback {
if conn.ShouldGarbageCollect(now) {
conn.Close()
delete(r.globalFallback, key)
}
}
for ukey, conns := range r.unicast {
for key, conn := range conns {
if conn.ShouldGarbageCollect(now) {
Expand Down Expand Up @@ -189,6 +203,11 @@ func (r *reuse) dialLocked(network string, source *net.IP) (*reuseConn, error) {
return conn, nil
}

// Use a connection we've previously dialed from
for _, conn := range r.globalFallback {
return conn, nil
}

// We don't have a connection that we can use for dialing.
// Dial a new connection from a random port.
var addr *net.UDPAddr
Expand All @@ -203,29 +222,43 @@ func (r *reuse) dialLocked(network string, source *net.IP) (*reuseConn, error) {
return nil, err
}
rconn := newReuseConn(conn)
r.global[conn.LocalAddr().(*net.UDPAddr).Port] = rconn
r.globalFallback[conn.LocalAddr().(*net.UDPAddr).Port] = rconn
return rconn, nil
}

func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) {
conn, err := net.ListenUDP(network, laddr)
if err != nil {
return nil, err
r.mutex.Lock()
defer r.mutex.Unlock()

var rconn *reuseConn
var localAddr *net.UDPAddr

// reuse the fallback connection if we've dialed out from this port already
if laddr.IP.IsUnspecified() {
if _, ok := r.globalFallback[laddr.Port]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to handle listening on port 0?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. An entry in the map(listeners or dialers) will never have port 0 because we add the finally assigned listener port to the map and not 0. So if a listen call happens with port 0 and there is an entry in globalDialers map, we will not find a match and then net.ListenUDP will give use a new port which is not in the globalDialers map.

Do you think this makes sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if a listen call happens with port 0 and there is an entry in globalDialers map, we will not find a match and then net.ListenUDP will give use a new port which is not in the globalDialers map.

That's what I meant. If the port is 0, the user doesn't care which port is assigned. So we might as well reuse a connection that we used before, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, that's a nice idea. will fix.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed this. updated the test TestReuseConnectionWhenDialBeforeListen

rconn = r.globalFallback[laddr.Port]
localAddr = rconn.UDPConn.LocalAddr().(*net.UDPAddr)
}
}
if rconn == nil {
conn, err := net.ListenUDP(network, laddr)
if err != nil {
return nil, err
}
localAddr = conn.LocalAddr().(*net.UDPAddr)
rconn = newReuseConn(conn)
}
localAddr := conn.LocalAddr().(*net.UDPAddr)

rconn := newReuseConn(conn)
rconn.IncreaseCount()

r.mutex.Lock()
defer r.mutex.Unlock()

// Deal with listen on a global address
if localAddr.IP.IsUnspecified() {
// The kernel already checked that the laddr is not already listen
// so we need not check here (when we create ListenUDP).
r.global[localAddr.Port] = rconn
return rconn, err
// delete the entry from fallback map in case we are reusing this connection
delete(r.globalFallback, localAddr.Port)
return rconn, nil
sukunrt marked this conversation as resolved.
Show resolved Hide resolved
}

// Deal with listen on a unicast address
Expand All @@ -239,7 +272,7 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) {
// The kernel already checked that the laddr is not already listen
// so we need not check here (when we create ListenUDP).
r.unicast[localAddr.IP.String()][localAddr.Port] = rconn
return rconn, err
return rconn, nil
}

func (r *reuse) Close() error {
Expand Down
44 changes: 44 additions & 0 deletions p2p/transport/quicreuse/reuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ func closeAllConns(reuse *reuse) {
conn.DecreaseCount()
}
}
for _, conn := range reuse.globalFallback {
for conn.GetCount() > 0 {
conn.DecreaseCount()
}
}
for _, conns := range reuse.unicast {
for _, conn := range conns {
for conn.GetCount() > 0 {
Expand Down Expand Up @@ -110,6 +115,45 @@ func TestReuseConnectionWhenDialing(t *testing.T) {
require.Equal(t, conn.GetCount(), 2)
}

func TestReuseConnectionWhenListening(t *testing.T) {
reuse := newReuse()
cleanup(t, reuse)

raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
require.NoError(t, err)
conn, err := reuse.Dial("udp4", raddr)
require.NoError(t, err)
laddr := &net.UDPAddr{IP: net.IPv4zero, Port: conn.UDPConn.LocalAddr().(*net.UDPAddr).Port}
lconn, err := reuse.Listen("udp4", laddr)
require.NoError(t, err)
require.Equal(t, lconn.GetCount(), 2)
require.Equal(t, conn.GetCount(), 2)
}

func TestReuseConnectionWhenDialBeforeListen(t *testing.T) {
reuse := newReuse()
cleanup(t, reuse)

// dial any address
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
require.NoError(t, err)
_, err = reuse.Dial("udp4", raddr)
require.NoError(t, err)

// open a listener
laddr := &net.UDPAddr{IP: net.IPv4zero, Port: 10000}
lconn, err := reuse.Listen("udp4", laddr)
require.NoError(t, err)

// new dials should go via the listener connection
raddr, err = net.ResolveUDPAddr("udp4", "1.1.1.1:1235")
require.NoError(t, err)
conn, err := reuse.Dial("udp4", raddr)
require.NoError(t, err)
require.Equal(t, conn, lconn)
require.Equal(t, conn.GetCount(), 2)
}

func TestReuseListenOnSpecificInterface(t *testing.T) {
if platformHasRoutingTables() {
t.Skip("this test only works on platforms that support routing tables")
Expand Down