Skip to content

Commit

Permalink
When testing the delay through REST API, determine whether to store t…
Browse files Browse the repository at this point in the history
…he delay data based on certain conditions instead of discarding it directly (MetaCubeX#609)
  • Loading branch information
wzdnzd authored Jun 7, 2023
1 parent ad11a2b commit 767aa18
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 16 deletions.
35 changes: 32 additions & 3 deletions adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Proxy struct {
C.ProxyAdapter
history *queue.Queue[C.DelayHistory]
alive *atomic.Bool
url string
extra map[string]*extraProxyState
}

Expand Down Expand Up @@ -112,14 +113,14 @@ func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
func (p *Proxy) ExtraDelayHistory() map[string][]C.DelayHistory {
extra := map[string][]C.DelayHistory{}
if p.extra != nil && len(p.extra) != 0 {
for url, option := range p.extra {
for testUrl, option := range p.extra {
histories := []C.DelayHistory{}
queueM := option.history.Copy()
for _, item := range queueM {
histories = append(histories, item)
}

extra[url] = histories
extra[testUrl] = histories
}
}
return extra
Expand Down Expand Up @@ -187,6 +188,8 @@ func (p *Proxy) MarshalJSON() ([]byte, error) {
func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16], store C.DelayHistoryStoreType) (t uint16, err error) {
defer func() {
alive := err == nil
store = p.determineFinalStoreType(store, url)

switch store {
case C.OriginalHistory:
p.alive.Store(alive)
Expand All @@ -198,6 +201,11 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In
if p.history.Len() > defaultHistoriesNum {
p.history.Pop()
}

// test URL configured by the proxy provider
if len(p.url) == 0 {
p.url = url
}
case C.ExtraHistory:
record := C.DelayHistory{Time: time.Now()}
if alive {
Expand Down Expand Up @@ -297,7 +305,7 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In
}

func NewProxy(adapter C.ProxyAdapter) *Proxy {
return &Proxy{adapter, queue.New[C.DelayHistory](defaultHistoriesNum), atomic.NewBool(true), map[string]*extraProxyState{}}
return &Proxy{adapter, queue.New[C.DelayHistory](defaultHistoriesNum), atomic.NewBool(true), "", map[string]*extraProxyState{}}
}

func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
Expand Down Expand Up @@ -326,3 +334,24 @@ func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
}
return
}

func (p *Proxy) determineFinalStoreType(store C.DelayHistoryStoreType, url string) C.DelayHistoryStoreType {
if store != C.DropHistory {
return store
}

if len(p.url) == 0 || url == p.url {
return C.OriginalHistory
}

if p.extra == nil {
store = C.ExtraHistory
} else {
if _, ok := p.extra[url]; ok {
store = C.ExtraHistory
} else if len(p.extra) < 2*C.DefaultMaxHealthCheckUrlNum {
store = C.ExtraHistory
}
}
return store
}
17 changes: 10 additions & 7 deletions adapter/outboundgroup/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ var (
errFormat = errors.New("format error")
errType = errors.New("unsupported type")
errMissProxy = errors.New("`use` or `proxies` missing")
errMissHealthCheck = errors.New("`url` or `interval` missing")
errDuplicateProvider = errors.New("duplicate provider name")
)

Expand Down Expand Up @@ -81,11 +80,8 @@ func ParseProxyGroup(config map[string]any, proxyMap map[string]C.Proxy, provide
return nil, fmt.Errorf("%s: %w", groupName, errDuplicateProvider)
}

hc := provider.NewHealthCheck(ps, "", 0, true, nil)
pd, err := provider.NewCompatibleProvider(groupName, ps, hc)
if err != nil {
return nil, fmt.Errorf("%s: %w", groupName, err)
}
var url string
var interval uint

// select don't need health check
if groupOption.Type != "select" && groupOption.Type != "relay" {
Expand All @@ -97,7 +93,14 @@ func ParseProxyGroup(config map[string]any, proxyMap map[string]C.Proxy, provide
groupOption.Interval = 300
}

pd.RegisterHealthCheckTask(groupOption.URL, expectedStatus, "", uint(groupOption.Interval))
url = groupOption.URL
interval = uint(groupOption.Interval)
}

hc := provider.NewHealthCheck(ps, url, interval, true, expectedStatus)
pd, err := provider.NewCompatibleProvider(groupName, ps, hc)
if err != nil {
return nil, fmt.Errorf("%s: %w", groupName, err)
}

providers = append(providers, pd)
Expand Down
10 changes: 7 additions & 3 deletions adapter/provider/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

const (
defaultURLTestTimeout = time.Second * 5
defaultMaxTestUrlNum = 6
)

type HealthCheckOption struct {
Expand Down Expand Up @@ -105,8 +104,8 @@ func (hc *HealthCheck) registerHealthCheckTask(url string, expectedStatus utils.
}

// due to the time-consuming nature of health checks, a maximum of defaultMaxTestURLNum URLs can be set for testing
if len(hc.extra) > defaultMaxTestUrlNum {
log.Debugln("skip add url: %s to health check because it has reached the maximum limit: %d", url, defaultMaxTestUrlNum)
if len(hc.extra) > C.DefaultMaxHealthCheckUrlNum {
log.Debugln("skip add url: %s to health check because it has reached the maximum limit: %d", url, C.DefaultMaxHealthCheckUrlNum)
return
}

Expand Down Expand Up @@ -220,6 +219,11 @@ func (hc *HealthCheck) close() {
}

func NewHealthCheck(proxies []C.Proxy, url string, interval uint, lazy bool, expectedStatus utils.IntRanges[uint16]) *HealthCheck {
if len(url) == 0 {
interval = 0
expectedStatus = nil
}

return &HealthCheck{
proxies: proxies,
url: url,
Expand Down
7 changes: 4 additions & 3 deletions constant/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ const (
)

const (
DefaultTCPTimeout = 5 * time.Second
DefaultUDPTimeout = DefaultTCPTimeout
DefaultTLSTimeout = DefaultTCPTimeout
DefaultTCPTimeout = 5 * time.Second
DefaultUDPTimeout = DefaultTCPTimeout
DefaultTLSTimeout = DefaultTCPTimeout
DefaultMaxHealthCheckUrlNum = 16
)

var ErrNotSupport = errors.New("no support")
Expand Down

0 comments on commit 767aa18

Please sign in to comment.