Skip to content

Commit

Permalink
Stop using fixedConnMapping
Browse files Browse the repository at this point in the history
  • Loading branch information
cbeuw committed Apr 14, 2024
1 parent de4dab6 commit 5988b43
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 31 deletions.
1 change: 1 addition & 0 deletions internal/multiplex/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func (sesh *Session) recvDataFromRemote(data []byte) error {
}

func (sesh *Session) SetTerminalMsg(msg string) {
log.Debug("terminal message set to " + msg)
sesh.terminalMsgSetter.Do(func() {
sesh.terminalMsg = msg
})
Expand Down
52 changes: 21 additions & 31 deletions internal/multiplex/switchboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package multiplex

import (
"errors"
"math/rand"
"github.com/cbeuw/Cloak/internal/common"
log "github.com/sirupsen/logrus"
"math/rand/v2"
"net"
"sync"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
)

type switchboardStrategy int
Expand Down Expand Up @@ -39,19 +38,14 @@ type switchboard struct {
}

func makeSwitchboard(sesh *Session) *switchboard {
var strategy switchboardStrategy
if sesh.Unordered {
log.Debug("Connection is unordered")
strategy = uniformSpread
} else {
strategy = fixedConnMapping
}
sb := &switchboard{
session: sesh,
strategy: strategy,
strategy: uniformSpread,
valve: sesh.Valve,
randPool: sync.Pool{New: func() interface{} {
return rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
var state [32]byte
common.CryptoRandRead(state[:])
return rand.New(rand.NewChaCha8(state))
}},
}
return sb
Expand All @@ -60,8 +54,8 @@ func makeSwitchboard(sesh *Session) *switchboard {
var errBrokenSwitchboard = errors.New("the switchboard is broken")

func (sb *switchboard) addConn(conn net.Conn) {
atomic.AddUint32(&sb.connsCount, 1)
sb.conns.Store(conn, conn)
connId := atomic.AddUint32(&sb.connsCount, 1) - 1
sb.conns.Store(connId, conn)
go sb.deplex(conn)
}

Expand All @@ -86,6 +80,9 @@ func (sb *switchboard) send(data []byte, assignedConn *net.Conn) (n int, err err
return n, err
}
case fixedConnMapping:
// FIXME: this strategy has a tendency to cause a TLS conn socket buffer to fill up,
// which is a problem when multiple streams are mapped to the same conn, resulting
// in all such streams being blocked.
conn = *assignedConn
if conn == nil {
conn, err = sb.pickRandConn()
Expand All @@ -110,7 +107,7 @@ func (sb *switchboard) send(data []byte, assignedConn *net.Conn) (n int, err err
return n, nil
}

// returns a random connId
// returns a random conn. This function can be called concurrently.
func (sb *switchboard) pickRandConn() (net.Conn, error) {
if atomic.LoadUint32(&sb.broken) == 1 {
return nil, errBrokenSwitchboard
Expand All @@ -122,33 +119,26 @@ func (sb *switchboard) pickRandConn() (net.Conn, error) {
}

randReader := sb.randPool.Get().(*rand.Rand)

r := randReader.Intn(int(connsCount))
connId := randReader.Uint32N(connsCount)
sb.randPool.Put(randReader)

var c int
var ret net.Conn
sb.conns.Range(func(_, conn interface{}) bool {
if r == c {
ret = conn.(net.Conn)
return false
}
c++
return true
})

return ret, nil
ret, ok := sb.conns.Load(connId)
if !ok {
log.Errorf("failed to get conn %d", connId)
return nil, errBrokenSwitchboard
}
return ret.(net.Conn), nil
}

// actively triggered by session.Close()
func (sb *switchboard) closeAll() {
if !atomic.CompareAndSwapUint32(&sb.broken, 0, 1) {
return
}
atomic.StoreUint32(&sb.connsCount, 0)
sb.conns.Range(func(_, conn interface{}) bool {
conn.(net.Conn).Close()
sb.conns.Delete(conn)
atomic.AddUint32(&sb.connsCount, ^uint32(0))
return true
})
}
Expand Down

0 comments on commit 5988b43

Please sign in to comment.