Skip to content

Commit

Permalink
Least load balancer (#2999)
Browse files Browse the repository at this point in the history
* v5: Health Check & LeastLoad Strategy (rebased from 2c5a71490368500a982018a74a6d519c7e121816)

Some changes will be necessary to integrate it into V2Ray

* Update proto

* parse duration conf with time.Parse()

* moving health ping to observatory as a standalone component

* moving health ping to observatory as a standalone component: auto generated file

* add initialization for health ping

* incorporate changes in router implementation

* support principle target output

* add v4 json support for BurstObservatory & fix balancer reference

* update API command

* remove cancelled API

* return zero length value when observer is not found

* remove duplicated targeted dispatch

* adjust test with updated structure

* bug fix for observer

* fix strategy selector

* fix strategy least load

* Fix ticker usage

ticker.Close does not close ticker.C

* feat: Replace default Health Ping URL to HTTPS (#1991)

* fix selectLeastLoad() returns wrong number of nodes (#2083)

* Test: fix leastload strategy unit test

* fix(router): panic caused by concurrent map read and write (#2678)

* Clean up code

---------

Co-authored-by: Jebbs <qjebbs@gmail.com>
Co-authored-by: Shelikhoo <xiaokangwang@outlook.com>
Co-authored-by: 世界 <i@sekai.icu>
Co-authored-by: Bernd Eichelberger <46166740+4-FLOSS-Free-Libre-Open-Source-Software@users.noreply.github.com>
Co-authored-by: 秋のかえで <autmaple@protonmail.com>
Co-authored-by: Rinka <kujourinka@gmail.com>
  • Loading branch information
7 people authored Feb 18, 2024
1 parent bf02392 commit fa5d7a2
Show file tree
Hide file tree
Showing 105 changed files with 3,520 additions and 426 deletions.
2 changes: 1 addition & 1 deletion app/commander/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion app/dispatcher/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion app/dispatcher/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin
content = new(session.Content)
ctx = session.ContextWithContent(ctx, content)
}

sniffingRequest := content.SniffingRequest
inbound, outbound := d.getLink(ctx)
if !sniffingRequest.Enabled {
Expand Down Expand Up @@ -366,7 +367,6 @@ func sniffer(ctx context.Context, cReader *cachedReader, metadataOnly bool, netw
}
return contentResult, contentErr
}

func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.Link, destination net.Destination) {
ob := session.OutboundFromContext(ctx)
if hosts, ok := d.dns.(dns.HostsLookup); ok && destination.Address.Family().IsDomain() {
Expand Down
2 changes: 1 addition & 1 deletion app/dns/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion app/dns/fakedns/fakedns.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion app/log/command/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion app/log/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion app/metrics/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions app/observatory/burst/burst.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package burst

import (
"math"
"time"
)

//go:generate go run github.com/v2fly/v2ray-core/v4/common/errors/errorgen

const (
rttFailed = time.Duration(math.MaxInt64 - iota)
rttUntested
rttUnqualified
)
108 changes: 108 additions & 0 deletions app/observatory/burst/burstobserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package burst

import (
"context"

"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/features/extension"
"github.com/xtls/xray-core/features/outbound"
"google.golang.org/protobuf/proto"
"sync"
)

type Observer struct {
config *Config
ctx context.Context

statusLock sync.Mutex
hp *HealthPing

finished *done.Instance

ohm outbound.Manager
}

func (o *Observer) GetObservation(ctx context.Context) (proto.Message, error) {
return &observatory.ObservationResult{Status: o.createResult()}, nil
}

func (o *Observer) createResult() []*observatory.OutboundStatus {
var result []*observatory.OutboundStatus
o.hp.access.Lock()
defer o.hp.access.Unlock()
for name, value := range o.hp.Results {
status := observatory.OutboundStatus{
Alive: value.getStatistics().All != value.getStatistics().Fail,
Delay: value.getStatistics().Average.Milliseconds(),
LastErrorReason: "",
OutboundTag: name,
LastSeenTime: 0,
LastTryTime: 0,
HealthPing: &observatory.HealthPingMeasurementResult{
All: int64(value.getStatistics().All),
Fail: int64(value.getStatistics().Fail),
Deviation: int64(value.getStatistics().Deviation),
Average: int64(value.getStatistics().Average),
Max: int64(value.getStatistics().Max),
Min: int64(value.getStatistics().Min),
},
}
result = append(result, &status)
}
return result
}

func (o *Observer) Type() interface{} {
return extension.ObservatoryType()
}

func (o *Observer) Start() error {
if o.config != nil && len(o.config.SubjectSelector) != 0 {
o.finished = done.New()
o.hp.StartScheduler(func() ([]string, error) {
hs, ok := o.ohm.(outbound.HandlerSelector)
if !ok {

return nil, newError("outbound.Manager is not a HandlerSelector")
}

outbounds := hs.Select(o.config.SubjectSelector)
return outbounds, nil
})
}
return nil
}

func (o *Observer) Close() error {
if o.finished != nil {
o.hp.StopScheduler()
return o.finished.Close()
}
return nil
}

func New(ctx context.Context, config *Config) (*Observer, error) {
var outboundManager outbound.Manager
err := core.RequireFeatures(ctx, func(om outbound.Manager) {
outboundManager = om
})
if err != nil {
return nil, newError("Cannot get depended features").Base(err)
}
hp := NewHealthPing(ctx, config.PingConfig)
return &Observer{
config: config,
ctx: ctx,
ohm: outboundManager,
hp: hp,
}, nil
}

func init() {
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
return New(ctx, config.(*Config))
}))
}
Loading

0 comments on commit fa5d7a2

Please sign in to comment.