Skip to content

Commit

Permalink
golang http fitler: fix race when waiting callback from Envoy (envoyp…
Browse files Browse the repository at this point in the history
…roxy#32081)

fix envoyproxy#31654 and envoyproxy#29496

Thanks @spacewander for founding this race:
envoyGoRequestSemaDec may be invoked eariler than r.sema.Wait, then, there is no one to resume the r.sema.Wait.

Commit Message:
Additional Description:
Risk Level:
Testing:
Docs Changes:
Release Notes:
Platform Specific Features:
Fixes: envoyproxy#31654, envoyproxy#29496


Signed-off-by: doujiang24 <doujiang24@gmail.com>
  • Loading branch information
doujiang24 authored and spacewander committed Jun 25, 2024
1 parent d13ed04 commit 0f03ee4
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 29 deletions.
22 changes: 9 additions & 13 deletions contrib/golang/filters/http/source/go/pkg/http/capi_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"errors"
"runtime"
"strings"
"sync/atomic"
"unsafe"

"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -321,17 +320,16 @@ func (c *httpCApiImpl) HttpGetDynamicMetadata(rr unsafe.Pointer, filterName stri
r := (*httpRequest)(rr)
r.mutex.Lock()
defer r.mutex.Unlock()
r.sema.Add(1)
r.markMayWaitingCallback()

var valueData C.uint64_t
var valueLen C.int
res := C.envoyGoFilterHttpGetDynamicMetadata(unsafe.Pointer(r.req),
unsafe.Pointer(unsafe.StringData(filterName)), C.int(len(filterName)), &valueData, &valueLen)
if res == C.CAPIYield {
atomic.AddInt32(&r.waitingOnEnvoy, 1)
r.sema.Wait()
r.checkOrWaitCallback()
} else {
r.sema.Done()
r.markNoWaitingCallback()
handleCApiStatus(res)
}
buf := unsafe.Slice((*byte)(unsafe.Pointer(uintptr(valueData))), int(valueLen))
Expand Down Expand Up @@ -394,14 +392,13 @@ func (c *httpCApiImpl) HttpGetStringFilterState(rr unsafe.Pointer, key string) s
var valueLen C.int
r.mutex.Lock()
defer r.mutex.Unlock()
r.sema.Add(1)
r.markMayWaitingCallback()
res := C.envoyGoFilterHttpGetStringFilterState(unsafe.Pointer(r.req),
unsafe.Pointer(unsafe.StringData(key)), C.int(len(key)), &valueData, &valueLen)
if res == C.CAPIYield {
atomic.AddInt32(&r.waitingOnEnvoy, 1)
r.sema.Wait()
r.checkOrWaitCallback()
} else {
r.sema.Done()
r.markNoWaitingCallback()
handleCApiStatus(res)
}

Expand All @@ -416,15 +413,14 @@ func (c *httpCApiImpl) HttpGetStringProperty(rr unsafe.Pointer, key string) (str
var rc C.int
r.mutex.Lock()
defer r.mutex.Unlock()
r.sema.Add(1)
r.markMayWaitingCallback()
res := C.envoyGoFilterHttpGetStringProperty(unsafe.Pointer(r.req),
unsafe.Pointer(unsafe.StringData(key)), C.int(len(key)), &valueData, &valueLen, &rc)
if res == C.CAPIYield {
atomic.AddInt32(&r.waitingOnEnvoy, 1)
r.sema.Wait()
r.checkOrWaitCallback()
res = C.CAPIStatus(rc)
} else {
r.sema.Done()
r.markNoWaitingCallback()
handleCApiStatus(res)
}

Expand Down
63 changes: 57 additions & 6 deletions contrib/golang/filters/http/source/go/pkg/http/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"unsafe"

"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
Expand All @@ -47,6 +48,11 @@ const (
HTTP30 = "HTTP/3.0"
)

const (
NoWaitingCallback = 0
MayWaitingCallback = 1
)

var protocolsIdToName = map[uint64]string{
0: HTTP10,
1: HTTP11,
Expand All @@ -59,12 +65,57 @@ type panicInfo struct {
details string
}
type httpRequest struct {
req *C.httpRequest
httpFilter api.StreamFilter
pInfo panicInfo
sema sync.WaitGroup
waitingOnEnvoy int32
mutex sync.Mutex
req *C.httpRequest
httpFilter api.StreamFilter
pInfo panicInfo
waitingLock sync.Mutex // protect waitingCallback
cond sync.Cond
waitingCallback int32

// protect multiple cases:
// 1. protect req_->strValue in the C++ side from being used concurrently.
// 2. protect waitingCallback from being modified in markMayWaitingCallback concurrently.
mutex sync.Mutex
}

// markWaitingOnEnvoy marks the request may be waiting a callback from envoy.
// Must be the NoWaitingCallback state since it's invoked under the r.mutex lock.
// We do not do lock waitingCallback here, to reduce lock contention.
func (r *httpRequest) markMayWaitingCallback() {
if !atomic.CompareAndSwapInt32(&r.waitingCallback, NoWaitingCallback, MayWaitingCallback) {
panic("markWaitingCallback: unexpected state")
}
}

// markNoWaitingOnEnvoy marks the request is not waiting a callback from envoy.
// Can not make sure it's in the MayWaitingCallback state, since the state maybe changed by OnDestroy.
func (r *httpRequest) markNoWaitingCallback() {
atomic.StoreInt32(&r.waitingCallback, NoWaitingCallback)
}

// checkOrWaitCallback checks if we need to wait a callback from envoy, and wait it.
func (r *httpRequest) checkOrWaitCallback() {
// need acquire the lock, since there might be concurrency race with resumeWaitCallback.
r.cond.L.Lock()
defer r.cond.L.Unlock()

// callback or OnDestroy already called, no need to wait.
if atomic.LoadInt32(&r.waitingCallback) == NoWaitingCallback {
return
}
r.cond.Wait()
}

// resumeWaitCallback resumes the goroutine that waiting for the callback from envoy.
func (r *httpRequest) resumeWaitCallback() {
// need acquire the lock, since there might be concurrency race with checkOrWaitCallback.
r.cond.L.Lock()
defer r.cond.L.Unlock()

if atomic.CompareAndSwapInt32(&r.waitingCallback, MayWaitingCallback, NoWaitingCallback) {
// Broadcast is safe even there is no waiters.
r.cond.Broadcast()
}
}

func (r *httpRequest) pluginName() string {
Expand Down
14 changes: 4 additions & 10 deletions contrib/golang/filters/http/source/go/pkg/http/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"fmt"
"runtime"
"sync"
"sync/atomic"

"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
)
Expand Down Expand Up @@ -83,6 +82,7 @@ func createRequest(r *C.httpRequest) *httpRequest {
req := &httpRequest{
req: r,
}
req.cond.L = &req.waitingLock
// NP: make sure filter will be deleted.
runtime.SetFinalizer(req, requestFinalize)

Expand Down Expand Up @@ -214,9 +214,6 @@ func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) {
}

defer req.RecoverPanic()
if atomic.CompareAndSwapInt32(&req.waitingOnEnvoy, 1, 0) {
req.sema.Done()
}

v := api.AccessLogType(logType)

Expand All @@ -238,9 +235,8 @@ func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) {
req := getRequest(r)
// do nothing even when req.panic is true, since filter is already destroying.
defer req.RecoverPanic()
if atomic.CompareAndSwapInt32(&req.waitingOnEnvoy, 1, 0) {
req.sema.Done()
}

req.resumeWaitCallback()

v := api.DestroyReason(reason)

Expand All @@ -259,7 +255,5 @@ func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) {
func envoyGoRequestSemaDec(r *C.httpRequest) {
req := getRequest(r)
defer req.RecoverPanic()
if atomic.CompareAndSwapInt32(&req.waitingOnEnvoy, 1, 0) {
req.sema.Done()
}
req.resumeWaitCallback()
}

0 comments on commit 0f03ee4

Please sign in to comment.