From 8c805bd037257889416df02c9c4da2ae13671593 Mon Sep 17 00:00:00 2001 From: Kiritow <1362050620@qq.com> Date: Wed, 24 Jun 2020 18:47:46 +0800 Subject: [PATCH] LuaEngine v0.5 Better Thread and Channel inspired by Golang. Minor bug fix. --- Channel.cpp | 259 ++++++++++++++++++++++++++++++++++++++ LuaEngine.cpp | 9 ++ LuaEngine.h | 2 +- Thread.cpp | 336 ++++++++++++++++++-------------------------------- include.h | 1 + main.cpp | 17 ++- 6 files changed, 401 insertions(+), 223 deletions(-) create mode 100644 Channel.cpp diff --git a/Channel.cpp b/Channel.cpp new file mode 100644 index 0000000..8899753 --- /dev/null +++ b/Channel.cpp @@ -0,0 +1,259 @@ +#include "include.h" +#include +#include +#include +#include +#include +#include +using namespace std; + +class Channel +{ +public: + int capacity; + bool closed; + queue> bus; + mutex m; + condition_variable cond; + + Channel(int cap) : capacity(cap), closed(false) + { + + } +}; + +class LuaChannel +{ +public: + shared_ptr sp; +}; + +int channel_dtor(lua_State* L) +{ + auto c = lua_checkblock(L, 1, "LuaChannel"); + c->~LuaChannel(); + return 0; +} + +// Get a value from channel +int channel_get(lua_State* L) +{ + auto c = lua_checkblock(L, 1, "LuaChannel"); + int type; + string value; + + printf("Call get on LuaChannel %p\n", c); + + { + unique_lock ulk(c->sp->m); + if (c->sp->closed) + { + lua_pushnil(L); + lua_pushboolean(L, 1); + return 2; + } + c->sp->cond.wait(ulk, [&]() { return !c->sp->bus.empty(); }); + // Un-serialize from string to Lua object + + tie(type, value) = c->sp->bus.front(); + c->sp->bus.pop(); + c->sp->cond.notify_all(); + } + + switch (type) + { + case LUA_TNIL: + lua_pushnil(L); + break; + case LUA_TNUMBER: + lua_stringtonumber(L, value.c_str()); + break; + case LUA_TBOOLEAN: + lua_pushboolean(L, value.empty()); + break; + case LUA_TSTRING: + lua_pushlstring(L, value.c_str(), value.size()); + break; + default: + return luaL_error(L, "channel_get: unsupported type %s", lua_typename(L, type)); + } + + return 1; +} + +// Push a value to channel. Types are limited to nil, boolean, number, string. +int channel_put(lua_State* L) +{ + auto c = lua_checkblock(L, 1, "LuaChannel"); + int type = lua_type(L, 2); + string value; + switch (type) + { + case LUA_TNIL: + break; + case LUA_TNUMBER: + value = lua_tostring(L, 2); + break; + case LUA_TBOOLEAN: + if (lua_toboolean(L, 2)) + { + value = "true"; + } + break; + case LUA_TSTRING: + value = lua_tostring(L, 2); + break; + default: + return luaL_error(L, "channel_put: unsupported type %s", lua_typename(L, type)); + } + + printf("Call put on LuaChannel %p\n", c); + + { + unique_lock ulk(c->sp->m); + if (c->sp->closed) + { + return luaL_error(L, "channel_put: cannot put to closed channel."); + } + c->sp->cond.wait(ulk, [&]() { return (c->sp->capacity > 0 && c->sp->bus.size() < c->sp->capacity) || (c->sp->capacity == 0 && c->sp->bus.empty()); }); + c->sp->bus.emplace(type, value); + c->sp->cond.notify_all(); + } + + return 0; +} + +// This will push a LuaChannel userdata onto the lvm stack. +void put_channel(lua_State* L, const shared_ptr& spChan) +{ + auto c = new (lua_newblock(L)) LuaChannel; + c->sp = spChan; + printf("LuaChannel %p pushed, underlying channel: %p\n", c, c->sp.get()); + + if (luaL_newmetatable(L, "LuaChannel")) + { + lua_setfield_function(L, "__gc", channel_dtor); + lua_newtable(L); + lua_setfield_function(L, "get", channel_get); + lua_setfield_function(L, "put", channel_put); + lua_setfield(L, -2, "__index"); + } + lua_setmetatable(L, -2); +} + +// Global Variables for managing LuaChannel openning and creating +mutex gChannelNSLock; +map> gChannelNS; + +int channel_open(lua_State* L) +{ + size_t namelen; + const char* name = luaL_checklstring(L, 1, &namelen); + string channelName(name, namelen); + + { + unique_lock ulk(gChannelNSLock); + auto iter = gChannelNS.find(channelName); + if (iter == gChannelNS.end()) + { + // not found. + lua_pushnil(L); + return 1; + } + else + { + auto spChan = iter->second.lock(); + if (spChan) + { + printf("channel %p(%s) opened.\n", spChan.get(), iter->first.c_str()); + put_channel(L, spChan); + return 1; + } + else + { + printf("channel %s expired.\n", iter->first.c_str()); + gChannelNS.erase(iter); + // found but expired. + lua_pushnil(L); + return 1; + } + } + } +} + +int channel_create(lua_State* L) +{ + size_t namelen; + const char* name = luaL_checklstring(L, 1, &namelen); + string channelName(name, namelen); + int cap = 0; + if (lua_isinteger(L, 2)) + { + cap = lua_tointeger(L, 2); + } + + { + unique_lock ulk(gChannelNSLock); + auto iter = gChannelNS.find(channelName); + if (iter != gChannelNS.end()) + { + auto spChan = iter->second.lock(); + if (spChan) + { + // found. + printf("channel %p(%s) skip create and opened.\n", spChan.get(), iter->first.c_str()); + + put_channel(L, spChan); + lua_pushboolean(L, 0); + return 2; + } + else + { + // found but expired. + printf("channel %s expired.\n", iter->first.c_str()); + gChannelNS.erase(iter); + } + } + + // not found, create one + shared_ptr spChan(new Channel(cap)); + put_channel(L, spChan); + lua_pushboolean(L, 1); + gChannelNS.emplace(channelName, spChan); + + printf("channel %p(%s) created.\n", spChan.get(), channelName.c_str()); + + return 2; + } +} + +int channel_collect(lua_State* L) +{ + unique_lock ulk(gChannelNSLock); + for (auto iter = gChannelNS.begin(); iter != gChannelNS.end(); ) + { + auto sp = iter->second.lock(); + if (!sp) + { + // expired. + iter = gChannelNS.erase(iter); + } + else + { + ++iter; + } + } + return 0; +} + +void InitChannel(lua_State* L) +{ + lua_getglobal(L, "package"); + lua_getfield(L, -1, "loaded"); + lua_newtable(L); + lua_setfield_function(L, "create", channel_create); + lua_setfield_function(L, "open", channel_open); + lua_setfield_function(L, "collect", channel_collect); + lua_setfield(L, -2, "Channel"); + lua_pop(L, 2); +} diff --git a/LuaEngine.cpp b/LuaEngine.cpp index 8cb45e3..a7fd300 100644 --- a/LuaEngine.cpp +++ b/LuaEngine.cpp @@ -15,6 +15,15 @@ void InitLuaEngine(lua_State* L) InitNetwork(L); InitSocketSelector(L); InitThread(L); + InitChannel(L); +} + +lua_State* CreateLuaEngine() +{ + lua_State* L = luaL_newstate(); + luaL_openlibs(L); + InitLuaEngine(L); + return L; } void InitEngine() diff --git a/LuaEngine.h b/LuaEngine.h index df2fa79..8889217 100644 --- a/LuaEngine.h +++ b/LuaEngine.h @@ -11,5 +11,5 @@ void InitEngine(); void InitLuaEngine(lua_State* L); - +lua_State* CreateLuaEngine(); void CloseEngine(); diff --git a/Thread.cpp b/Thread.cpp index 80f3efb..b496a3d 100644 --- a/Thread.cpp +++ b/Thread.cpp @@ -1,271 +1,175 @@ #include "include.h" #include -#include -#include -#include +#include +#include using namespace std; -class SubLuaVM +class LuaThread { +private: + void _WorkerEntry() + { + status = 1; + if (lua_pcall(L, nArgs, LUA_MULTRET, 0)) + { + status = 3; + } + else + { + status = 2; + } + } public: lua_State* L; - unique_ptr td; - mutex mLock; - condition_variable cond; - string inBuffer; - string outBuffer; - int status; // -1 Not started 0 Running 1 Exited 2 Exited with exception + int nArgs; + // 0 Not started 1 Running 2 Finished 3 Errored + atomic status; + thread td; - SubLuaVM() + LuaThread(lua_State* subLVM, int subArgs) : L(subLVM), nArgs(subArgs), status(0), td(&LuaThread::_WorkerEntry, this) { - status = -1; - L = luaL_newstate(); - luaL_openlibs(L); - InitTCPSocket(L); + } - ~SubLuaVM() + ~LuaThread() { - if (td && td->joinable()) td->join(); - if(L) lua_close(L); + if (td.joinable()) + { + td.join(); + } + lua_close(L); } }; -static void SubLuaVMEntry(SubLuaVM* vm) -{ - { - unique_lock ulk(vm->mLock); - if (luaL_loadstring(vm->L, vm->inBuffer.c_str())) - { - vm->outBuffer = lua_tostring(vm->L, -1); - vm->status = 2; - vm->cond.notify_all(); - return; - } - vm->inBuffer.clear(); - } - - if(lua_pcall(vm->L, 0, LUA_MULTRET, 0)) - { - unique_lock ulk(vm->mLock); - vm->outBuffer = lua_tostring(vm->L, -1); - vm->status = 2; - vm->cond.notify_all(); - return; - } - else - { - vm->status = 1; - vm->cond.notify_all(); - } -} - int thread_dtor(lua_State* L) { - auto vm = lua_checkblock(L, 1, "LuaEngineThread"); - vm->~SubLuaVM(); + auto t = lua_checkblock(L, 1, "LuaThread"); + t->~LuaThread(); return 0; } -int thread_put(lua_State* L) +int thread_wait(lua_State* L) { - auto vm = lua_checkblock(L, 1, "LuaEngineThread"); - size_t datasz; - const char* rawdata = luaL_checklstring(L, 2, &datasz); - string data(rawdata, datasz); - - int timeout_ms = -1; - if (!lua_isnone(L, 3)) + auto t = lua_checkblock(L, 1, "LuaThread"); + if (t->td.joinable()) { - timeout_ms = luaL_checkinteger(L, 3); - } - if (timeout_ms >= 0) - { - unique_lock ulk(vm->mLock); - while (!vm->inBuffer.empty()) - { - auto ret = vm->cond.wait_for(ulk, chrono::milliseconds(timeout_ms)); - if (ret == cv_status::timeout) - { - lua_pushboolean(L, false); - return 1; - } - else continue; - } - vm->inBuffer = data; - lua_pushboolean(L, true); - return 1; + t->td.join(); + lua_pushboolean(L, 1); } else { - unique_lock ulk(vm->mLock); - while (!vm->inBuffer.empty()) - { - vm->cond.wait(ulk); - } - vm->inBuffer = data; - return 0; + lua_pushboolean(L, 0); } + return 1; } int thread_get(lua_State* L) { - auto vm = lua_checkblock(L, 1, "LuaEngineThread"); - int timeout_ms = -1; - if (!lua_isnone(L, 2)) + auto t = lua_checkblock(L, 1, "LuaThread"); + if (t->td.joinable()) { - timeout_ms = luaL_checkinteger(L, 2); + t->td.join(); } - if (timeout_ms >= 0) + + if (t->status == 2) { - unique_lock ulk(vm->mLock); - while (vm->outBuffer.empty()) - { - auto ret = vm->cond.wait_for(ulk, chrono::milliseconds(timeout_ms)); - if (ret == cv_status::timeout) - { - lua_pushboolean(L, false); - return 1; - } - else continue; - } - string temp = vm->outBuffer; - vm->outBuffer.clear(); - lua_pushboolean(L, true); - lua_pushlstring(L, temp.data(), temp.size()); - return 2; + lua_pushboolean(L, 1); } else { - unique_lock ulk(vm->mLock); - while (vm->outBuffer.empty()) - { - vm->cond.wait(ulk); - } - string temp = vm->outBuffer; - vm->outBuffer.clear(); - lua_pushlstring(L, temp.data(), temp.size()); - return 1; + lua_pushboolean(L, 0); } + + int stackTop = lua_gettop(t->L); + for (int i = 1; i <= stackTop; i++) + { + switch (lua_type(t->L, i)) + { + case LUA_TNIL: + lua_pushnil(L); + break; + case LUA_TNUMBER: + lua_pushnumber(L, lua_tonumber(t->L, i)); + break; + case LUA_TBOOLEAN: + lua_pushboolean(L, lua_toboolean(t->L, i)); + break; + case LUA_TSTRING: + lua_pushstring(L, lua_tostring(t->L, i)); + break; + default: + return luaL_error(L, "thread_get: return value #%d has unsupported type: %s", i, lua_typename(t->L, lua_type(t->L, i))); + } + } + + return stackTop + 1; } -int thread_inner_get(lua_State* L) +int thread_status(lua_State* L) { - auto vm = lua_checkpointer(L, 1, "LuaEngineThreadWorker"); - int timeout_ms = -1; - if (!lua_isnone(L, 2)) - { - timeout_ms = luaL_checkinteger(L, 2); - } - if (timeout_ms >= 0) - { - unique_lock ulk(vm->mLock); - while (vm->inBuffer.empty()) - { - auto ret = vm->cond.wait_for(ulk, chrono::milliseconds(timeout_ms)); - if (ret == cv_status::timeout) - { - lua_pushboolean(L, false); - return 1; - } - else continue; - } - string temp = vm->inBuffer; - vm->inBuffer.clear(); - lua_pushboolean(L, true); - lua_pushlstring(L, temp.data(), temp.size()); - return 2; - } - else - { - unique_lock ulk(vm->mLock); - while (vm->inBuffer.empty()) - { - vm->cond.wait(ulk); - } - string temp = vm->inBuffer; - vm->inBuffer.clear(); - lua_pushlstring(L, temp.data(), temp.size()); - return 1; - } -} - -int thread_inner_put(lua_State* L) -{ - auto vm = lua_checkblock(L, 1, "LuaEngineThreadWorker"); - size_t datasz; - const char* rawdata = luaL_checklstring(L, 2, &datasz); - string data(rawdata, datasz); - - int timeout_ms = -1; - if (!lua_isnone(L, 3)) - { - timeout_ms = luaL_checkinteger(L, 3); - } - if (timeout_ms >= 0) - { - unique_lock ulk(vm->mLock); - while (!vm->outBuffer.empty()) - { - auto ret = vm->cond.wait_for(ulk, chrono::milliseconds(timeout_ms)); - if (ret == cv_status::timeout) - { - lua_pushboolean(L, false); - return 1; - } - else continue; - } - vm->outBuffer = data; - lua_pushboolean(L, true); - return 1; - } - else - { - unique_lock ulk(vm->mLock); - while (!vm->outBuffer.empty()) - { - vm->cond.wait(ulk); - } - vm->outBuffer = data; - return 0; - } + auto t = lua_checkblock(L, 1, "LuaThread"); + lua_pushinteger(L, t->status); + return 1; } int thread_new(lua_State* L) { - size_t codesz; - const char* rawcode = luaL_checklstring(L, 1, &codesz); - string code(rawcode, codesz); + size_t codelen; + const char* code = luaL_checklstring(L, 1, &codelen); + + lua_State* subL = CreateLuaEngine(); + + // compile + if (luaL_loadbuffer(subL, code, codelen, "ThreadMain")) + { + // Compile error, cannot load. Return error message to caller. + lua_pushnil(L); + lua_pushstring(L, lua_tostring(subL, 1)); + lua_close(subL); + return 2; + } - // 在主端这一侧添加全部内容, 包扩gc. - auto vm = new (lua_newblock(L)) SubLuaVM; - if (luaL_newmetatable(L, "LuaEngineThread")) + // push args + int stackTop = lua_gettop(L); + for (int idx = 2; idx <= stackTop; idx++) + { + switch (lua_type(L, idx)) + { + case LUA_TNIL: + lua_pushnil(subL); + break; + case LUA_TNUMBER: + lua_pushnumber(subL, lua_tonumber(L, idx)); + break; + case LUA_TBOOLEAN: + lua_pushboolean(subL, lua_toboolean(L, idx)); + break; + case LUA_TSTRING: + { + size_t datalen; + const char* data = lua_tolstring(L, idx, &datalen); + lua_pushlstring(subL, data, datalen); + break; + } + default: + lua_pushnil(L); + lua_pushfstring(L, "thread_create: parameter #%d has unsupported type: %s", idx, lua_typename(L, lua_type(L, idx))); + lua_close(subL); + return 2; + } + } + + auto c = new (lua_newblock(L)) LuaThread(subL, stackTop - 1); + if (luaL_newmetatable(L, "LuaThread")) { lua_setfield_function(L, "__gc", thread_dtor); lua_newtable(L); + lua_setfield_function(L, "wait", thread_wait); lua_setfield_function(L, "get", thread_get); - lua_setfield_function(L, "put", thread_put); + lua_setfield_function(L, "status", thread_status); lua_setfield(L, -2, "__index"); } lua_setmetatable(L, -2); - - // 在从端这一侧添加this_thread, 只有get和set. - lua_getglobal(vm->L, "package"); - lua_getfield(vm->L, -1, "loaded"); - lua_newpointer(vm->L, vm); - if (luaL_newmetatable(vm->L, "LuaEngineThreadWorker")) - { - lua_newtable(vm->L); - lua_setfield_function(vm->L, "get", thread_inner_get); - lua_setfield_function(vm->L, "put", thread_inner_put); - lua_setfield(vm->L, -2, "__index"); - } - lua_setmetatable(vm->L, -2); - lua_setfield(vm->L, -2, "this_thread"); - lua_pop(vm->L, 2); - - vm->inBuffer = code; - vm->td.reset(new thread(SubLuaVMEntry, vm)); return 1; } diff --git a/include.h b/include.h index 3fb46bc..c071042 100644 --- a/include.h +++ b/include.h @@ -54,6 +54,7 @@ void InitTCPSocket(lua_State* L); void InitUDPSocket(lua_State* L); void InitNetwork(lua_State* L); void InitThread(lua_State* L); +void InitChannel(lua_State* L); void InitSocketSelector(lua_State* L); void PlatInit(); diff --git a/main.cpp b/main.cpp index e1c27ff..b660a28 100644 --- a/main.cpp +++ b/main.cpp @@ -8,7 +8,7 @@ string LoadFile(const string& filename) { string temp; int ret; - char buff[1024]; + char buff[1024] = { 0 }; SDL_RWops* io = SDL_RWFromFile(filename.c_str(), "rb"); if (!io) { @@ -18,6 +18,7 @@ string LoadFile(const string& filename) while ((ret=SDL_RWread(io, buff, 1, 1024))) { temp.append(buff, ret); + memset(buff, 0, 1024); } SDL_RWclose(io); return temp; @@ -31,15 +32,19 @@ int main() InitLuaEngine(L); _chdir("game"); string code = LoadFile("app.lua"); - if (luaL_loadstring(L, code.c_str())) + if (luaL_loadbufferx(L, code.c_str(), code.size(), "ProgramMain", "t")) { - cout << lua_tostring(L, -1) << endl; - SDL_Log("[LuaAppSyntaxError] %s\n", lua_tostring(L, -1)); + size_t errlen; + const char* err = lua_tolstring(L, -1, &errlen); + string errmsg(err, errlen); + SDL_LogError(SDL_LOG_CATEGORY_APPLICATION, "[LuaAppSyntaxError] %s", errmsg.c_str()); } else if(lua_pcall(L, 0, LUA_MULTRET, 0)) { - cout << lua_tostring(L, -1) << endl; - SDL_Log("[LuaAppRuntimeError] %s\n", lua_tostring(L, -1)); + size_t errlen; + const char* err = lua_tolstring(L, -1, &errlen); + string errmsg(err, errlen); + SDL_LogError(SDL_LOG_CATEGORY_APPLICATION, "[LuaAppRuntimeError] %s", errmsg.c_str()); } lua_close(L); CloseEngine();