diff --git a/internal/core/src/segcore/segcore_init_c.cpp b/internal/core/src/segcore/segcore_init_c.cpp index 38ffbe6bc6547..dbb74db5ab7b1 100644 --- a/internal/core/src/segcore/segcore_init_c.cpp +++ b/internal/core/src/segcore/segcore_init_c.cpp @@ -9,6 +9,7 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License +#include "pthread.h" #include "config/ConfigKnowhere.h" #include "fmt/core.h" #include "log/Log.h" @@ -105,4 +106,13 @@ GetMinimalIndexVersion() { return milvus::config::GetMinimalIndexVersion(); } +extern "C" void +SetThreadName(const char* name) { +#ifdef __linux__ + pthread_setname_np(pthread_self(), name); +#elif __APPLE__ + pthread_setname_np(name); +#endif +} + } // namespace milvus::segcore diff --git a/internal/core/src/segcore/segcore_init_c.h b/internal/core/src/segcore/segcore_init_c.h index d617d796a8406..13e1dfcc25754 100644 --- a/internal/core/src/segcore/segcore_init_c.h +++ b/internal/core/src/segcore/segcore_init_c.h @@ -56,6 +56,9 @@ GetCurrentIndexVersion(); int32_t GetMinimalIndexVersion(); +void +SetThreadName(const char*); + #ifdef __cplusplus } #endif diff --git a/internal/querynodev2/segments/pool.go b/internal/querynodev2/segments/pool.go index 7bddca6169e83..578a9c37ef423 100644 --- a/internal/querynodev2/segments/pool.go +++ b/internal/querynodev2/segments/pool.go @@ -16,6 +16,16 @@ package segments +/* +#cgo pkg-config: milvus_core + +#include +#include +#include "common/init_c.h" +#include "segcore/segcore_init_c.h" +*/ +import "C" + import ( "context" "math" @@ -51,6 +61,12 @@ var ( bfPool atomic.Pointer[conc.Pool[any]] bfApplyOnce sync.Once + + // intentionally leaked CGO tag names + cgoTagSQ = C.CString("CGO_SQ") + cgoTagLoad = C.CString("CGO_LOAD") + cgoTagDynamic = C.CString("CGO_DYN") + cgoTagWarmup = C.CString("CGO_WARMUP") ) // initSQPool initialize @@ -63,7 +79,10 @@ func initSQPool() { conc.WithPreAlloc(false), // pre alloc must be false to resize pool dynamically, use warmup to alloc worker here conc.WithDisablePurge(true), ) - conc.WarmupPool(pool, runtime.LockOSThread) + conc.WarmupPool(pool, func() { + runtime.LockOSThread() + C.SetThreadName(cgoTagSQ) + }) sqp.Store(pool) pt.Watch(pt.QueryNodeCfg.MaxReadConcurrency.Key, config.NewHandler("qn.sqpool.maxconc", ResizeSQPool)) @@ -74,15 +93,19 @@ func initSQPool() { func initDynamicPool() { dynOnce.Do(func() { + size := hardware.GetCPUNum() pool := conc.NewPool[any]( - hardware.GetCPUNum(), + size, conc.WithPreAlloc(false), conc.WithDisablePurge(false), - conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal + conc.WithPreHandler(func() { + runtime.LockOSThread() + C.SetThreadName(cgoTagDynamic) + }), // lock os thread for cgo thread disposal ) dp.Store(pool) - log.Info("init dynamicPool done", zap.Int("size", hardware.GetCPUNum())) + log.Info("init dynamicPool done", zap.Int("size", size)) }) } @@ -94,7 +117,10 @@ func initLoadPool() { poolSize, conc.WithPreAlloc(false), conc.WithDisablePurge(false), - conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal + conc.WithPreHandler(func() { + runtime.LockOSThread() + C.SetThreadName(cgoTagLoad) + }), // lock os thread for cgo thread disposal ) loadPool.Store(pool) @@ -112,8 +138,11 @@ func initWarmupPool() { poolSize, conc.WithPreAlloc(false), conc.WithDisablePurge(false), - conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal - conc.WithNonBlocking(true), // make warming up non blocking + conc.WithPreHandler(func() { + runtime.LockOSThread() + C.SetThreadName(cgoTagWarmup) + }), // lock os thread for cgo thread disposal + conc.WithNonBlocking(true), // make warming up non blocking ) warmupPool.Store(pool)