From 83c3727a6291d1c5e67086275e7c9d1af0026cd0 Mon Sep 17 00:00:00 2001 From: George Zhao Date: Wed, 29 Dec 2021 20:44:29 +0800 Subject: [PATCH] cleanup lua vm in threadpool --- src/luv.c | 6 +++++ src/thread.c | 3 +++ src/work.c | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 82 insertions(+), 1 deletion(-) diff --git a/src/luv.c b/src/luv.c index 7cc2277f..1d23de02 100644 --- a/src/luv.c +++ b/src/luv.c @@ -822,6 +822,12 @@ LUALIB_API int luaopen_luv (lua_State* L) { if (ret < 0) { return luaL_error(L, "%s: %s\n", uv_err_name(ret), uv_strerror(ret)); } + + /* do cleanup in main thread */ + lua_getglobal(L, "_THREAD"); + if (lua_isnil(L, -1)) + atexit(luv_work_cleanup); + lua_pop(L, 1); } // pcall is NULL, luv use default callback routine if (ctx->cb_pcall==NULL) { diff --git a/src/thread.c b/src/thread.c index 17351e8f..4ba87697 100644 --- a/src/thread.c +++ b/src/thread.c @@ -280,6 +280,9 @@ static void luv_thread_cb(void* varg) { lua_State* L = acquire_vm_cb(); luv_ctx_t *ctx = luv_context(L); + lua_pushboolean(L, 1); + lua_setglobal(L, "_THREAD"); + //push lua function, thread entry if (luaL_loadbuffer(L, thd->code, thd->len, "=thread") == 0) { //push parameter for real thread function diff --git a/src/work.c b/src/work.c index adaeaf40..b6b7561c 100644 --- a/src/work.c +++ b/src/work.c @@ -35,6 +35,22 @@ typedef struct { static uv_once_t once_vmkey = UV_ONCE_INIT; static uv_key_t tls_vmkey; /* thread local storage key for Lua state */ +static uv_mutex_t vm_mutex; + +static unsigned int idx_vms = 0; +static unsigned int nvms = 0; +static lua_State** vms; +static lua_State* default_vms[4]; + +#ifndef ARRAY_SIZE +#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0])) +#endif + +#if LUV_UV_VERSION_GEQ(1, 30, 0) +#define MAX_THREADPOOL_SIZE 1024 +#else +#define MAX_THREADPOOL_SIZE 128 +#endif static luv_work_ctx_t* luv_check_work_ctx(lua_State* L, int index) { luv_work_ctx_t* ctx = (luv_work_ctx_t*)luaL_checkudata(L, index, "luv_work_ctx"); @@ -118,6 +134,13 @@ static lua_State* luv_work_acquire_vm() { L = acquire_vm_cb(); uv_key_set(&tls_vmkey, L); + lua_pushboolean(L, 1); + lua_setglobal(L, "_THREAD"); + + uv_mutex_lock(&vm_mutex); + vms[idx_vms] = L; + idx_vms += 1; + uv_mutex_unlock(&vm_mutex); } return L; } @@ -226,12 +249,61 @@ static const luaL_Reg luv_work_ctx_methods[] = { static void luv_key_init_once() { + const char* val; int status = uv_key_create(&tls_vmkey); if (status != 0) { fprintf(stderr, "*** threadpool not works\n"); - fprintf(stderr, "Error to uv_key_create with %s: %s\n", uv_err_name(status), uv_strerror(status)); + fprintf(stderr, "Error to uv_key_create with %s: %s\n", + uv_err_name(status), uv_strerror(status)); + abort(); + } + status = uv_mutex_init(&vm_mutex); + if (status != 0) + { + fprintf(stderr, "*** threadpool not works\n"); + fprintf(stderr, "Error to uv_mutex_init with %s: %s\n", + uv_err_name(status), uv_strerror(status)); + abort(); + } + + /* ref to https://github.com/libuv/libuv/blob/v1.x/src/threadpool.c init_threads */ + nvms = ARRAY_SIZE(default_vms); + val = getenv("UV_THREADPOOL_SIZE"); + if (val != NULL) + nvms = atoi(val); + if (nvms == 0) + nvms = 1; + if (nvms > MAX_THREADPOOL_SIZE) + nvms = MAX_THREADPOOL_SIZE; + + vms = default_vms; + if (nvms > ARRAY_SIZE(default_vms)) { + vms = malloc(nvms * sizeof(vms[0])); + if (vms == NULL) { + nvms = ARRAY_SIZE(default_vms); + vms = default_vms; + } + memset(vms, 0, sizeof(vms[0]) * nvms); } + idx_vms = 0; +} + +static void luv_work_cleanup() +{ + unsigned int i; + + if (nvms == 0) + return; + + for (i = 0; i < nvms && vms[i]; i++) + release_vm_cb(vms[i]); + + if (vms != default_vms) + free(vms); + + uv_mutex_destroy(&vm_mutex); + nvms = 0; } static void luv_work_init(lua_State* L) {