Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simplify vm management in thread and threadpool #577

Merged
merged 2 commits into from
Dec 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/lthreadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ typedef struct {
int argc;
int flags; // control gc

lua_State *L;
luv_val_t argv[LUV_THREAD_MAXNUM_ARG];
} luv_thread_arg_t;

Expand Down
6 changes: 6 additions & 0 deletions src/luv.c
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,12 @@ LUALIB_API int luaopen_luv (lua_State* L) {
if (ret < 0) {
return luaL_error(L, "%s: %s\n", uv_err_name(ret), uv_strerror(ret));
}

/* do cleanup in main thread */
lua_getglobal(L, "_THREAD");
if (lua_isnil(L, -1))
atexit(luv_work_cleanup);
lua_pop(L, 1);
}
// pcall is NULL, luv use default callback routine
if (ctx->cb_pcall==NULL) {
Expand Down
7 changes: 4 additions & 3 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,12 @@ static int luv_thread_tostring(lua_State* L)
static void luv_thread_cb(void* varg) {
//acquire vm and get top
luv_thread_t* thd = (luv_thread_t*)varg;
lua_State* L = thd->args.L;
lua_State* L = acquire_vm_cb();
luv_ctx_t *ctx = luv_context(L);

lua_pushboolean(L, 1);
lua_setglobal(L, "_THREAD");

//push lua function, thread entry
if (luaL_loadbuffer(L, thd->code, thd->len, "=thread") == 0) {
//push parameter for real thread function
Expand Down Expand Up @@ -344,14 +347,12 @@ static int luv_new_thread(lua_State* L) {
}
thread->len = len;

thread->args.L = acquire_vm_cb();
#if LUV_UV_VERSION_GEQ(1, 26, 0)
ret = uv_thread_create_ex(&thread->handle, &options, luv_thread_cb, thread);
#else
ret = uv_thread_create(&thread->handle, luv_thread_cb, thread);
#endif
if (ret < 0) {
release_vm_cb(thread->args.L);
return luv_error(L, ret);
}

Expand Down
158 changes: 100 additions & 58 deletions src/work.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ typedef struct {
size_t len;

int after_work_cb; /* ref, run in main ,call after work cb*/
int pool_ref; /* ref of lua_State cache array */
} luv_work_ctx_t;

typedef struct {
Expand All @@ -34,27 +33,35 @@ typedef struct {
int ref; /* ref to luv_work_ctx_t, which create a new uv_work_t*/
} luv_work_t;

static uv_once_t once_vmkey = UV_ONCE_INIT;
static uv_key_t tls_vmkey; /* thread local storage key for Lua state */
static uv_mutex_t vm_mutex;

static unsigned int idx_vms = 0;
static unsigned int nvms = 0;
static lua_State** vms;
static lua_State* default_vms[4];

#ifndef ARRAY_SIZE
#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))
#endif

#if LUV_UV_VERSION_GEQ(1, 30, 0)
#define MAX_THREADPOOL_SIZE 1024
#else
#define MAX_THREADPOOL_SIZE 128
#endif

static luv_work_ctx_t* luv_check_work_ctx(lua_State* L, int index) {
luv_work_ctx_t* ctx = (luv_work_ctx_t*)luaL_checkudata(L, index, "luv_work_ctx");
return ctx;
}

static int luv_work_ctx_gc(lua_State *L) {
int i, n;
luv_work_ctx_t* ctx = luv_check_work_ctx(L, 1);
free(ctx->code);
luaL_unref(L, LUA_REGISTRYINDEX, ctx->after_work_cb);

lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->pool_ref);
n = lua_rawlen(L, -1);
for (i=1; i<=n; i++) {
lua_State *S;
lua_rawgeti(L, -1, i);
S = *(lua_State**)lua_touserdata(L, -1);
release_vm_cb(S);
lua_pop(L, 1);
}
luaL_unref(L, LUA_REGISTRYINDEX, ctx->pool_ref);
return 0;
}

Expand Down Expand Up @@ -115,15 +122,32 @@ static int luv_work_cb(lua_State* L) {
return luaL_error(L, "Uncaught Error: %s can't be work entry\n",
lua_typename(L, lua_type(L,-1)));
}
work->args.L = L;
if (top!=lua_gettop(L))
return luaL_error(L, "stack not balance in luv_work_cb, need %d but %d", top, lua_gettop(L));
return LUA_OK;
}

static lua_State* luv_work_acquire_vm()
{
lua_State* L = uv_key_get(&tls_vmkey);
if (L == NULL)
{
L = acquire_vm_cb();
uv_key_set(&tls_vmkey, L);
lua_pushboolean(L, 1);
lua_setglobal(L, "_THREAD");

uv_mutex_lock(&vm_mutex);
vms[idx_vms] = L;
idx_vms += 1;
uv_mutex_unlock(&vm_mutex);
}
return L;
}

static void luv_work_cb_wrapper(uv_work_t* req) {
luv_work_t* work = (luv_work_t*)req->data;
lua_State *L = work->args.L;
lua_State *L = luv_work_acquire_vm();
luv_ctx_t* lctx = luv_context(L);

// If exit is called on a thread in the thread pool, abort is called in
Expand All @@ -148,13 +172,6 @@ static void luv_after_work_cb(uv_work_t* req, int status) {
i = luv_thread_arg_push(L, &work->rets, LUVF_THREAD_SIDE_MAIN);
lctx->cb_pcall(L, i, 0, 0);

//cache lua_State to reuse
lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->pool_ref);
i = lua_rawlen(L, -1);
*(lua_State**)lua_newuserdata(L, sizeof(lua_State*)) = work->args.L;
lua_rawseti(L, -2, i+1);
lua_pop(L, 1);

//ref down to ctx, up in luv_queue_work()
luaL_unref(L, LUA_REGISTRYINDEX, work->ref);
work->ref = LUA_NOREF;
Expand Down Expand Up @@ -192,47 +209,18 @@ static int luv_new_work(lua_State* L) {
luaL_getmetatable(L, "luv_work_ctx");
lua_setmetatable(L, -2);

lua_newtable(L);
ctx->pool_ref = luaL_ref(L, LUA_REGISTRYINDEX);

return 1;
}

static int luv_queue_work(lua_State* L) {
int top = lua_gettop(L);
luv_work_ctx_t* ctx = luv_check_work_ctx(L, 1);
luv_work_t* work = (luv_work_t*)malloc(sizeof(*work));
int ret, n;
int ret;

memset(work, 0, sizeof(*work));
//prepare lua_State for threadpool
lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->pool_ref);
n = lua_rawlen(L, -1);
if (n > 0) {
int i;
lua_rawgeti(L, -1, 1);
work->args.L = *(lua_State **)lua_touserdata(L, -1);
lua_pop(L, 1);
for(i=1; i<n; i++) {
lua_rawgeti(L, -1, i+1);
lua_rawseti(L, -2, i);
}
lua_pushnil(L);
lua_rawseti(L, -2, n);
}
else
work->args.L = acquire_vm_cb();
lua_pop(L, 1);

ret = luv_thread_arg_set(L, &work->args, 2, top, LUVF_THREAD_SIDE_MAIN); //clear in sub threads,luv_work_cb
if (ret < 0) {
//cache lua_State to reuse
lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->pool_ref);
int i = lua_rawlen(L, -1);
*(lua_State**)lua_newuserdata(L, sizeof(lua_State*)) = work->args.L;
lua_rawseti(L, -2, i+1);
lua_pop(L, 1);

luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_MAIN);
free(work);
return luv_thread_arg_error(L);
Expand All @@ -241,13 +229,6 @@ static int luv_queue_work(lua_State* L) {
work->work.data = work;
ret = uv_queue_work(luv_loop(L), &work->work, luv_work_cb_wrapper, luv_after_work_cb);
if (ret < 0) {
//cache lua_State to reuse
lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->pool_ref);
int i = lua_rawlen(L, -1);
*(lua_State**)lua_newuserdata(L, sizeof(lua_State*)) = work->args.L;
lua_rawseti(L, -2, i+1);
lua_pop(L, 1);

luv_thread_arg_clear(L, &work->args, LUVF_THREAD_SIDE_MAIN);
free(work);
return luv_error(L, ret);
Expand All @@ -266,6 +247,65 @@ static const luaL_Reg luv_work_ctx_methods[] = {
{NULL, NULL}
};

static void luv_key_init_once()
{
const char* val;
int status = uv_key_create(&tls_vmkey);
if (status != 0)
{
fprintf(stderr, "*** threadpool not works\n");
fprintf(stderr, "Error to uv_key_create with %s: %s\n",
uv_err_name(status), uv_strerror(status));
abort();
}
status = uv_mutex_init(&vm_mutex);
if (status != 0)
{
fprintf(stderr, "*** threadpool not works\n");
fprintf(stderr, "Error to uv_mutex_init with %s: %s\n",
uv_err_name(status), uv_strerror(status));
abort();
}

/* ref to https://github.com/libuv/libuv/blob/v1.x/src/threadpool.c init_threads */
nvms = ARRAY_SIZE(default_vms);
val = getenv("UV_THREADPOOL_SIZE");
if (val != NULL)
nvms = atoi(val);
if (nvms == 0)
nvms = 1;
if (nvms > MAX_THREADPOOL_SIZE)
nvms = MAX_THREADPOOL_SIZE;

vms = default_vms;
if (nvms > ARRAY_SIZE(default_vms)) {
vms = malloc(nvms * sizeof(vms[0]));
if (vms == NULL) {
nvms = ARRAY_SIZE(default_vms);
vms = default_vms;
}
memset(vms, 0, sizeof(vms[0]) * nvms);
}
idx_vms = 0;
}

static void luv_work_cleanup()
{
unsigned int i;

if (nvms == 0)
return;

for (i = 0; i < nvms && vms[i]; i++)
release_vm_cb(vms[i]);

if (vms != default_vms)
free(vms);

uv_mutex_destroy(&vm_mutex);
nvms = 0;
}

static void luv_work_init(lua_State* L) {
luaL_newmetatable(L, "luv_work_ctx");
lua_pushcfunction(L, luv_work_ctx_tostring);
Expand All @@ -275,4 +315,6 @@ static void luv_work_init(lua_State* L) {
luaL_newlib(L, luv_work_ctx_methods);
lua_setfield(L, -2, "__index");
lua_pop(L, 1);

uv_once(&once_vmkey, luv_key_init_once);
}