Skip to content

Commit

Permalink
cleanup lua vm in threadpool
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaozg committed Dec 29, 2021
1 parent a10dd9c commit 83c3727
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 1 deletion.
6 changes: 6 additions & 0 deletions src/luv.c
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,12 @@ LUALIB_API int luaopen_luv (lua_State* L) {
if (ret < 0) {
return luaL_error(L, "%s: %s\n", uv_err_name(ret), uv_strerror(ret));
}

/* do cleanup in main thread */
lua_getglobal(L, "_THREAD");
if (lua_isnil(L, -1))
atexit(luv_work_cleanup);
lua_pop(L, 1);
}
// pcall is NULL, luv use default callback routine
if (ctx->cb_pcall==NULL) {
Expand Down
3 changes: 3 additions & 0 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ static void luv_thread_cb(void* varg) {
lua_State* L = acquire_vm_cb();
luv_ctx_t *ctx = luv_context(L);

lua_pushboolean(L, 1);
lua_setglobal(L, "_THREAD");

//push lua function, thread entry
if (luaL_loadbuffer(L, thd->code, thd->len, "=thread") == 0) {
//push parameter for real thread function
Expand Down
74 changes: 73 additions & 1 deletion src/work.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ typedef struct {

static uv_once_t once_vmkey = UV_ONCE_INIT;
static uv_key_t tls_vmkey; /* thread local storage key for Lua state */
static uv_mutex_t vm_mutex;

static unsigned int idx_vms = 0;
static unsigned int nvms = 0;
static lua_State** vms;
static lua_State* default_vms[4];

#ifndef ARRAY_SIZE
#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))
#endif

#if LUV_UV_VERSION_GEQ(1, 30, 0)
#define MAX_THREADPOOL_SIZE 1024
#else
#define MAX_THREADPOOL_SIZE 128
#endif

static luv_work_ctx_t* luv_check_work_ctx(lua_State* L, int index) {
luv_work_ctx_t* ctx = (luv_work_ctx_t*)luaL_checkudata(L, index, "luv_work_ctx");
Expand Down Expand Up @@ -118,6 +134,13 @@ static lua_State* luv_work_acquire_vm()
{
L = acquire_vm_cb();
uv_key_set(&tls_vmkey, L);
lua_pushboolean(L, 1);
lua_setglobal(L, "_THREAD");

uv_mutex_lock(&vm_mutex);
vms[idx_vms] = L;
idx_vms += 1;
uv_mutex_unlock(&vm_mutex);
}
return L;
}
Expand Down Expand Up @@ -226,12 +249,61 @@ static const luaL_Reg luv_work_ctx_methods[] = {

static void luv_key_init_once()
{
const char* val;
int status = uv_key_create(&tls_vmkey);
if (status != 0)
{
fprintf(stderr, "*** threadpool not works\n");
fprintf(stderr, "Error to uv_key_create with %s: %s\n", uv_err_name(status), uv_strerror(status));
fprintf(stderr, "Error to uv_key_create with %s: %s\n",
uv_err_name(status), uv_strerror(status));
abort();
}
status = uv_mutex_init(&vm_mutex);
if (status != 0)
{
fprintf(stderr, "*** threadpool not works\n");
fprintf(stderr, "Error to uv_mutex_init with %s: %s\n",
uv_err_name(status), uv_strerror(status));
abort();
}

/* ref to https://github.com/libuv/libuv/blob/v1.x/src/threadpool.c init_threads */
nvms = ARRAY_SIZE(default_vms);
val = getenv("UV_THREADPOOL_SIZE");
if (val != NULL)
nvms = atoi(val);
if (nvms == 0)
nvms = 1;
if (nvms > MAX_THREADPOOL_SIZE)
nvms = MAX_THREADPOOL_SIZE;

vms = default_vms;
if (nvms > ARRAY_SIZE(default_vms)) {
vms = malloc(nvms * sizeof(vms[0]));
if (vms == NULL) {
nvms = ARRAY_SIZE(default_vms);
vms = default_vms;
}
memset(vms, 0, sizeof(vms[0]) * nvms);
}
idx_vms = 0;
}

static void luv_work_cleanup()
{
unsigned int i;

if (nvms == 0)
return;

for (i = 0; i < nvms && vms[i]; i++)
release_vm_cb(vms[i]);

if (vms != default_vms)
free(vms);

uv_mutex_destroy(&vm_mutex);
nvms = 0;
}

static void luv_work_init(lua_State* L) {
Expand Down

0 comments on commit 83c3727

Please sign in to comment.