Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v5: Health Check & LeastLoad Strategy #589

Merged
merged 72 commits into from
Jan 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
112ca27
generate .pb.go
qjebbs Dec 31, 2020
b86a575
health checker conf
qjebbs Jan 3, 2021
1ca57bd
check logic
qjebbs Jan 4, 2021
33f39fd
implement ping
qjebbs Jan 4, 2021
a081a7b
fix check interval
qjebbs Jan 4, 2021
7ea4295
improve check results
qjebbs Jan 4, 2021
9ffb51b
health check on add outbounds
qjebbs Jan 4, 2021
c49cd65
fix tests
qjebbs Jan 4, 2021
e7d2628
fix ping handler
qjebbs Jan 4, 2021
555247a
fix min rtt < 0
qjebbs Jan 5, 2021
503e002
random alive
qjebbs Jan 5, 2021
5fdbd9b
fix check all on add outbounds
qjebbs Jan 5, 2021
edbb7c0
least load strategy
qjebbs Jan 5, 2021
44bc05d
conf codes optimize
qjebbs Jan 6, 2021
f60d7ac
improve least load strategy
qjebbs Jan 6, 2021
0e3eeda
improve health check on AddOutboundHandler
qjebbs Jan 6, 2021
3a57984
cleanup results with scheduler
qjebbs Jan 6, 2021
336ca09
health ping timeout default 5s
qjebbs Jan 6, 2021
1f23322
remove config of health ping round
qjebbs Jan 6, 2021
ac6125d
fix TestSimpleBalancer
qjebbs Jan 6, 2021
dda8911
add TestLeastLoadBalancer
qjebbs Jan 6, 2021
c762def
add todos
qjebbs Jan 6, 2021
69c8540
lint and test fix
qjebbs Jan 6, 2021
28f350e
balancer fallback
qjebbs Jan 7, 2021
6076ce2
api health stats command
qjebbs Jan 7, 2021
6a82d57
add hc cmd to perform health checks
qjebbs Jan 7, 2021
e9237e5
fix typo
qjebbs Jan 7, 2021
e2460f3
select none if no match for baselines only config
qjebbs Jan 8, 2021
f446794
add LeastLoadStrategy tests
qjebbs Jan 8, 2021
b4710ef
don't select alive on no match, go to fallback
qjebbs Jan 8, 2021
abb374f
api hci refactor
qjebbs Jan 8, 2021
f630d13
apply lint style
qjebbs Jan 8, 2021
8cae953
refactor: strategies don't need ref of balancer
qjebbs Jan 11, 2021
e1c3173
change check interval unit to seconds
qjebbs Jan 11, 2021
b94ee89
fix test
qjebbs Jan 11, 2021
50c8d49
RouterService->RoutingService
qjebbs Jan 11, 2021
74a5c90
Revert "generate .pb.go"
qjebbs Jan 11, 2021
c1b0fd9
make checks distributed
qjebbs Jan 12, 2021
20b939c
BalancingStrategy interface optimize
qjebbs Jan 12, 2021
8cf2c86
fix random selects unchecked
qjebbs Jan 12, 2021
5911694
upgrade cmd hci to bi & rename hc to bc
qjebbs Jan 12, 2021
078eb8a
fix test
qjebbs Jan 12, 2021
9830d3d
api bi sort output
qjebbs Jan 13, 2021
b61b8bf
update according to review
qjebbs Jan 15, 2021
64e1008
remove checks on add outbound
qjebbs Jan 15, 2021
101eccf
refactor: move health checker inside to strategy
qjebbs Jan 18, 2021
02c8bf2
apply lint style
qjebbs Jan 18, 2021
acfd969
code optimize
qjebbs Jan 19, 2021
4bb9d51
fix typo
qjebbs Jan 19, 2021
a7d3c3d
update desc of bc bi
qjebbs Jan 21, 2021
3d0b08e
ping with head
qjebbs Jan 22, 2021
68dcbf2
force rouds to 1 if checks not distributed
qjebbs Jan 23, 2021
d4f38cb
leatload: select by standard deviations
qjebbs Jan 25, 2021
724af9d
health ping refactor
qjebbs Jan 25, 2021
c693c8d
add maxRTT config to filter away high delay nodes
qjebbs Jan 25, 2021
5285af6
apply lint
qjebbs Jan 25, 2021
4d4aed9
cost for leastload
qjebbs Jan 27, 2021
777f51d
api bo to override balancer selecting
qjebbs Jan 27, 2021
abfdeaa
fix health ping statistics & fix test
qjebbs Jan 28, 2021
a40ba6c
check connectivity if ping fail
qjebbs Jan 28, 2021
55825b4
add tolerance setting & more detailed bi output
qjebbs Jan 28, 2021
182e138
fix connectivity check
qjebbs Jan 28, 2021
649a32e
optimize bi output
qjebbs Jan 28, 2021
cf4acc7
should not put results when network is down
qjebbs Jan 28, 2021
8e98502
fixes @_@
qjebbs Jan 28, 2021
6943b8b
mux optimize
qjebbs Jan 28, 2021
73bdb36
remove pause option of selecting overriding
qjebbs Jan 28, 2021
5de62fd
update bo desc
qjebbs Jan 28, 2021
a9cc48e
fix potential racing
qjebbs Jan 28, 2021
209f41f
simplify locking
qjebbs Jan 29, 2021
6274d74
code optimize
qjebbs Jan 29, 2021
540ae10
fix connectivity check when url not set
qjebbs Jan 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions app/dispatcher/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin
if !destination.IsValid() {
panic("Dispatcher: Invalid destination.")
}

ob := &session.Outbound{
Target: destination,
}
Expand All @@ -200,9 +201,15 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin
content = new(session.Content)
ctx = session.ContextWithContent(ctx, content)
}

handler := session.HandlerFromContext(ctx)
sniffingRequest := content.SniffingRequest
if destination.Network != net.Network_TCP || !sniffingRequest.Enabled {
go d.routedDispatch(ctx, outbound, destination)
if handler != nil {
go d.targetedDispatch(ctx, outbound, handler.Tag)
} else {
go d.routedDispatch(ctx, outbound, destination)
}
} else {
go func() {
cReader := &cachedReader{
Expand All @@ -219,7 +226,11 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin
destination.Address = net.ParseAddress(domain)
ob.Target = destination
}
d.routedDispatch(ctx, outbound, destination)
if handler != nil {
d.targetedDispatch(ctx, outbound, handler.Tag)
} else {
d.routedDispatch(ctx, outbound, destination)
}
}()
}
return inbound, nil
Expand Down Expand Up @@ -255,6 +266,24 @@ func sniffer(ctx context.Context, cReader *cachedReader) (SniffResult, error) {
}
}

func (d *DefaultDispatcher) targetedDispatch(ctx context.Context, link *transport.Link, tag string) {
handler := d.ohm.GetHandler(tag)
if handler == nil {
newError("outbound handler [", tag, "] not exist").AtError().WriteToLog(session.ExportIDToError(ctx))
common.Close(link.Writer)
common.Interrupt(link.Reader)
return
}
if accessMessage := log.AccessMessageFromContext(ctx); accessMessage != nil {
if tag := handler.Tag(); tag != "" {
accessMessage.Detour = tag
}
log.Record(accessMessage)
}

handler.Dispatch(ctx, link)
}

func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.Link, destination net.Destination) {
var handler outbound.Handler

Expand Down
63 changes: 36 additions & 27 deletions app/router/balancing.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,53 @@
package router

import (
"v2ray.com/core/common/dice"
"v2ray.com/core/features/outbound"
"v2ray.com/core/features/routing"
)

type BalancingStrategy interface {
PickOutbound([]string) string
}

type RandomStrategy struct {
}

func (s *RandomStrategy) PickOutbound(tags []string) string {
n := len(tags)
if n == 0 {
panic("0 tags")
}

return tags[dice.Roll(n)]
}

// Balancer represents a balancer
type Balancer struct {
selectors []string
strategy BalancingStrategy
ohm outbound.Manager
selectors []string
strategy routing.BalancingStrategy
ohm outbound.Manager
fallbackTag string

override overridden
}

// PickOutbound picks the tag of a outbound
func (b *Balancer) PickOutbound() (string, error) {
hs, ok := b.ohm.(outbound.HandlerSelector)
if !ok {
return "", newError("outbound.Manager is not a HandlerSelector")
candidates, err := b.SelectOutbounds()
if err != nil {
if b.fallbackTag != "" {
newError("fallback to [", b.fallbackTag, "], due to error: ", err).AtInfo().WriteToLog()
return b.fallbackTag, nil
}
return "", err
}
tags := hs.Select(b.selectors)
if len(tags) == 0 {
return "", newError("no available outbounds selected")
var tag string
if o := b.override.Get(); o != nil {
tag = b.strategy.Pick(o.selects)
} else {
tag = b.strategy.SelectAndPick(candidates)
}
tag := b.strategy.PickOutbound(tags)
if tag == "" {
if b.fallbackTag != "" {
newError("fallback to [", b.fallbackTag, "], due to empty tag returned").AtInfo().WriteToLog()
return b.fallbackTag, nil
}
// will use default handler
return "", newError("balancing strategy returns empty tag")
}
return tag, nil
}

// SelectOutbounds select outbounds with selectors of the Balancer
func (b *Balancer) SelectOutbounds() ([]string, error) {
hs, ok := b.ohm.(outbound.HandlerSelector)
if !ok {
return nil, newError("outbound.Manager is not a HandlerSelector")
}
tags := hs.Select(b.selectors)
return tags, nil
}
83 changes: 83 additions & 0 deletions app/router/balancing_override.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package router

import (
sync "sync"
"time"

"v2ray.com/core/features/outbound"
)

func (b *Balancer) overrideSelecting(selects []string, validity time.Duration) error {
if validity <= 0 {
b.override.Clear()
return nil
}
hs, ok := b.ohm.(outbound.HandlerSelector)
if !ok {
return newError("outbound.Manager is not a HandlerSelector")
}
tags := hs.Select(selects)
if len(tags) == 0 {
return newError("no outbound selected")
}
b.override.Put(tags, time.Now().Add(validity))
return nil
}

// OverrideSelecting implements routing.BalancingOverrider
func (r *Router) OverrideSelecting(balancer string, selects []string, validity time.Duration) error {
var b *Balancer
for tag, bl := range r.balancers {
if tag == balancer {
b = bl
break
}
}
if b == nil {
return newError("balancer '", balancer, "' not found")
}
err := b.overrideSelecting(selects, validity)
if err != nil {
return err
}
return nil
}

type overriddenSettings struct {
selects []string
until time.Time
}

type overridden struct {
access sync.RWMutex
settings overriddenSettings
}

// Get gets the overridden settings
func (o *overridden) Get() *overriddenSettings {
o.access.RLock()
defer o.access.RUnlock()
if len(o.settings.selects) == 0 || time.Now().After(o.settings.until) {
return nil
}
return &overriddenSettings{
selects: o.settings.selects,
until: o.settings.until,
}
}

// Put updates the overridden settings
func (o *overridden) Put(selects []string, until time.Time) {
o.access.Lock()
defer o.access.Unlock()
o.settings.selects = selects
o.settings.until = until
}

// Clear clears the overridden settings
func (o *overridden) Clear() {
o.access.Lock()
defer o.access.Unlock()
o.settings.selects = nil
o.settings.until = time.Time{}
}
76 changes: 76 additions & 0 deletions app/router/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/features/routing"
Expand Down Expand Up @@ -72,6 +74,80 @@ func (s *routingServer) SubscribeRoutingStats(request *SubscribeRoutingStatsRequ
}
}

func (s *routingServer) GetBalancers(ctx context.Context, request *GetBalancersRequest) (*GetBalancersResponse, error) {
h, ok := s.router.(routing.RouterChecker)
if !ok {
return nil, status.Errorf(codes.Unavailable, "current router is not a health checker")
}
results, err := h.GetBalancersInfo(request.BalancerTags)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
rsp := &GetBalancersResponse{
Balancers: make([]*BalancerMsg, 0),
}
for _, result := range results {
var override *OverrideSelectingMsg
if result.Override != nil {
override = &OverrideSelectingMsg{
Until: result.Override.Until.Local().String(),
Selects: result.Override.Selects,
}
}
stat := &BalancerMsg{
Tag: result.Tag,
StrategySettings: result.Strategy.Settings,
Titles: result.Strategy.ValueTitles,
Override: override,
Selects: make([]*OutboundMsg, 0),
Others: make([]*OutboundMsg, 0),
}
for _, item := range result.Strategy.Selects {
stat.Selects = append(stat.Selects, &OutboundMsg{
Tag: item.Tag,
Values: item.Values,
})
}
for _, item := range result.Strategy.Others {
stat.Others = append(stat.Others, &OutboundMsg{
Tag: item.Tag,
Values: item.Values,
})
}
rsp.Balancers = append(rsp.Balancers, stat)
}
return rsp, nil
}
func (s *routingServer) CheckBalancers(ctx context.Context, request *CheckBalancersRequest) (*CheckBalancersResponse, error) {
h, ok := s.router.(routing.RouterChecker)
if !ok {
return nil, status.Errorf(codes.Unavailable, "current router is not a health checker")
}
go func() {
err := h.CheckBalancers(request.BalancerTags)
if err != nil {
newError("CheckBalancers error:", err).AtInfo().WriteToLog()
}
}()
return &CheckBalancersResponse{}, nil
}

func (s *routingServer) OverrideSelecting(ctx context.Context, request *OverrideSelectingRequest) (*OverrideSelectingResponse, error) {
bo, ok := s.router.(routing.BalancingOverrider)
if !ok {
return nil, status.Errorf(codes.Unavailable, "current router doesn't support balancing override")
}
err := bo.OverrideSelecting(
request.BalancerTag,
request.Selectors,
time.Duration(request.Validity),
)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
return &OverrideSelectingResponse{}, nil
}

func (s *routingServer) mustEmbedUnimplementedRoutingServiceServer() {}

type service struct {
Expand Down
Loading