Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support least connections #98

Merged
merged 9 commits into from
Dec 13, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 22 additions & 27 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (b *Backend) getServerStatus() string {

// BackendStats holds server stats for backend
type BackendStats struct {
sync.RWMutex
sync.Mutex
LastDowntime time.Duration
CumDowntime time.Duration
TotCalls int64
Expand All @@ -195,8 +195,8 @@ type BackendStats struct {
Tx int64
UpSince time.Time
DowntimeStart time.Time
LastFinished time.Time
CurrentCalls int
LastFinished int64
CurrentCalls int64
jiuker marked this conversation as resolved.
Show resolved Hide resolved
}

const errMessage = `<?xml version="1.0" encoding="UTF-8"?><Error><Code>BackendDown</Code><Message>The remote server returned an error (%v)</Message><Resource>%s</Resource></Error>`
Expand Down Expand Up @@ -447,42 +447,38 @@ func (s *site) upBackends() []*Backend {
}

// Returns the next backend the request should go to.
func (s *site) nextProxy() *Backend {
func (s *site) nextProxy() (*Backend, func()) {
backends := s.upBackends()
if len(backends) == 0 {
return nil
return nil, func() {}
}
min := math.MaxInt32
earliest := time.Now().Add(time.Second)
min := int64(math.MaxInt32)
earliest := time.Now().Add(time.Second).UnixNano()
idx := 0
for i, backend := range backends {
backend.Stats.RLock()
if backend.Stats.CurrentCalls < min {
min = backend.Stats.CurrentCalls
if backend.Stats.LastFinished.Before(earliest) {
earliest = backend.Stats.LastFinished
currentCalls := atomic.LoadInt64(&backend.Stats.CurrentCalls)
if currentCalls < min {
min = currentCalls
lastFinished := atomic.LoadInt64(&backend.Stats.LastFinished)
if lastFinished < earliest {
earliest = lastFinished
idx = i
}
}
backend.Stats.RUnlock()
}
// random backend from a list of available backends.
backend := backends[idx]
jiuker marked this conversation as resolved.
Show resolved Hide resolved
backend.Stats.Lock()
backend.Stats.CurrentCalls++
backend.Stats.Unlock()
return backend
atomic.AddInt64(&backend.Stats.CurrentCalls, 1)
return backend, func() {
atomic.AddInt64(&backend.Stats.CurrentCalls, -1)
atomic.StoreInt64(&backend.Stats.LastFinished, time.Now().UnixNano())
}
}

// ServeHTTP - LoadBalancer implements http.Handler
func (s *site) ServeHTTP(w http.ResponseWriter, r *http.Request) {
backend := s.nextProxy()
defer func() {
backend.Stats.Lock()
backend.Stats.CurrentCalls--
backend.Stats.LastFinished = time.Now()
backend.Stats.Unlock()
}()
backend, done := s.nextProxy()
defer done()
if backend != nil && backend.Online() {
httpTraceHdrs(backend.proxy.ServeHTTP, w, r, backend)
return
Expand Down Expand Up @@ -743,7 +739,7 @@ func configureSite(ctx *cli.Context, siteNum int, siteStrs []string, healthCheck
var backends []*Backend
var prevScheme string
var transport http.RoundTripper
for i, endpoint := range endpoints {
for _, endpoint := range endpoints {
endpoint = strings.TrimSuffix(endpoint, slashSeparator)
target, err := url.Parse(endpoint)
if err != nil {
Expand Down Expand Up @@ -788,8 +784,7 @@ func configureSite(ctx *cli.Context, siteNum int, siteStrs []string, healthCheck
Transport: transport,
ModifyResponse: modifyResponse(),
}
off := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(len(endpoints))
stats := BackendStats{MinLatency: 24 * time.Hour, MaxLatency: 0, LastFinished: time.Now().Add(time.Duration(i + off%len(endpoints)))}
stats := BackendStats{MinLatency: 24 * time.Hour, MaxLatency: 0}
healthCheckURL, err := getHealthCheckURL(endpoint, healthCheckPath, healthCheckPort)
if err != nil {
console.Fatalln(err)
Expand Down
Loading