Skip to content

Commit

Permalink
Improve socketchannel, try the next host in backup list when auth fai…
Browse files Browse the repository at this point in the history
…led. See issue #1145
  • Loading branch information
cloudwu committed Jan 10, 2020
1 parent d3a6b8d commit 3e5280c
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 106 deletions.
37 changes: 7 additions & 30 deletions lualib/skynet/db/mongo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -112,38 +112,15 @@ local function mongo_auth(mongoc)
mongoc.__sock:changebackup(backup)
end
if rs_data.ismaster then
if rawget(mongoc, "__pickserver") then
rawset(mongoc, "__pickserver", nil)
end
return
elseif rs_data.primary then
local host, port = __parse_addr(rs_data.primary)
mongoc.host = host
mongoc.port = port
mongoc.__sock:changehost(host, port)
else
if rs_data.primary then
local host, port = __parse_addr(rs_data.primary)
mongoc.host = host
mongoc.port = port
mongoc.__sock:changehost(host, port)
else
skynet.error("WARNING: NO PRIMARY RETURN " .. rs_data.me)
-- determine the primary db using hosts
local pickserver = {}
if rawget(mongoc, "__pickserver") == nil then
for _, v in ipairs(rs_data.hosts) do
if v ~= rs_data.me then
table.insert(pickserver, v)
end
rawset(mongoc, "__pickserver", pickserver)
end
end
if #mongoc.__pickserver <= 0 then
error("CAN NOT DETERMINE THE PRIMARY DB")
end
skynet.error("INFO: TRY TO CONNECT " .. mongoc.__pickserver[1])
local host, port = __parse_addr(mongoc.__pickserver[1])
table.remove(mongoc.__pickserver, 1)
mongoc.host = host
mongoc.port = port
mongoc.__sock:changehost(host, port)
end
-- socketchannel would try the next host in backup list
error ("No primary return : " .. tostring(rs_data.me))
end
end
end
Expand Down
185 changes: 109 additions & 76 deletions lualib/skynet/socketchannel.lua
Original file line number Diff line number Diff line change
Expand Up @@ -201,27 +201,6 @@ local function dispatch_function(self)
end
end

-- Walk the channel's backup address list in order and try to open each one.
-- An entry is either a table carrying `host`/`port` fields or a bare host
-- value, in which case the channel's current port (self.__port) is reused.
-- On the first truthy result from socket.open, the channel's __host/__port
-- are switched to that address and the result is returned.
-- Returns nothing when there is no backup list or no entry could be opened.
local function connect_backup(self)
	local candidates = self.__backup
	if not candidates then
		return
	end
	for _, entry in ipairs(candidates) do
		local target_host, target_port
		if type(entry) ~= "table" then
			-- bare host: keep using the port the channel already has
			target_host = entry
			target_port = self.__port
		else
			target_host, target_port = entry.host, entry.port
		end
		skynet.error("socket: connect to backup host", target_host, target_port)
		local fd = socket.open(target_host, target_port)
		if fd then
			-- remember which address actually answered
			self.__host = target_host
			self.__port = target_port
			return fd
		end
	end
end

local function term_dispatch_thread(self)
if not self.__response and self.__dispatch_thread then
-- dispatch by order, send close signal to dispatch thread
Expand All @@ -233,78 +212,132 @@ local function connect_once(self)
if self.__closed then
return false
end
assert(not self.__sock and not self.__authcoroutine)
-- term current dispatch thread (send a signal)
term_dispatch_thread(self)

local fd,err = socket.open(self.__host, self.__port)
if not fd then
fd = connect_backup(self)
if not fd then
return false, err
local addr_list = {}
local addr_set = {}

-- Append the entries of self.__backup to addr_list (an upvalue), skipping any
-- "host:port" pair that addr_set (an upvalue) has already recorded.
-- An entry is either a table carrying `host`/`port` fields or a bare host
-- value, in which case the channel's current port (self.__port) is used.
-- NOTE(review): the concatenation below raises if host or port is nil --
-- confirm every backup entry supplies both.
local function _add_backup()
	local backups = self.__backup
	if not backups then
		return
	end
	for _, entry in ipairs(backups) do
		local host, port
		if type(entry) ~= "table" then
			host = entry
			port = self.__port
		else
			host, port = entry.host, entry.port
		end

		-- dedupe on the combined "host:port" key
		local key = host..":"..port
		if not addr_set[key] then
			addr_set[key] = true
			addr_list[#addr_list + 1] = { host = host, port = port }
		end
	end
end
if self.__nodelay then
socketdriver.nodelay(fd)

-- Pop the first pending address off addr_list (an upvalue) and return it.
-- Logs the address that is about to be tried; returns nil without logging
-- when the list is empty.
local function _next_addr()
	local candidate = table.remove(addr_list, 1)
	if not candidate then
		return candidate
	end
	skynet.error("socket: connect to backup host", candidate.host, candidate.port)
	return candidate
end

-- register overload warning
local function _connect_once(self, addr)
local fd,err = socket.open(addr.host, addr.port)
if not fd then
-- try next one
addr = _next_addr()
if addr == nil then
return false, err
end
return _connect_once(self, addr)
end

local overload = self.__overload_notify
if overload then
local function overload_trigger(id, size)
if id == self.__sock[1] then
if size == 0 then
if self.__overload then
self.__overload = false
overload(false)
end
else
if not self.__overload then
self.__overload = true
overload(true)
self.__host = addr.host
self.__port = addr.port

assert(not self.__sock and not self.__authcoroutine)
-- term current dispatch thread (send a signal)
term_dispatch_thread(self)

if self.__nodelay then
socketdriver.nodelay(fd)
end

-- register overload warning

local overload = self.__overload_notify
if overload then
local function overload_trigger(id, size)
if id == self.__sock[1] then
if size == 0 then
if self.__overload then
self.__overload = false
overload(false)
end
else
skynet.error(string.format("WARNING: %d K bytes need to send out (fd = %d %s:%s)", size, id, self.__host, self.__port))
if not self.__overload then
self.__overload = true
overload(true)
else
skynet.error(string.format("WARNING: %d K bytes need to send out (fd = %d %s:%s)", size, id, self.__host, self.__port))
end
end
end
end
end

skynet.fork(overload_trigger, fd, 0)
socket.warning(fd, overload_trigger)
end
skynet.fork(overload_trigger, fd, 0)
socket.warning(fd, overload_trigger)
end

while self.__dispatch_thread do
-- wait for dispatch thread exit
skynet.yield()
end
while self.__dispatch_thread do
-- wait for dispatch thread exit
skynet.yield()
end

self.__sock = setmetatable( {fd} , channel_socket_meta )
self.__dispatch_thread = skynet.fork(function()
pcall(dispatch_function(self), self)
-- clear dispatch_thread
self.__dispatch_thread = nil
end)

if self.__auth then
self.__authcoroutine = coroutine.running()
local ok , message = pcall(self.__auth, self)
if not ok then
close_channel_socket(self)
if message ~= socket_error then
self.__authcoroutine = false
skynet.error("socket: auth failed", message)
self.__sock = setmetatable( {fd} , channel_socket_meta )
self.__dispatch_thread = skynet.fork(function()
pcall(dispatch_function(self), self)
-- clear dispatch_thread
self.__dispatch_thread = nil
end)

if self.__auth then
self.__authcoroutine = coroutine.running()
local ok , message = pcall(self.__auth, self)
if not ok then
close_channel_socket(self)
if message ~= socket_error then
self.__authcoroutine = false
skynet.error("socket: auth failed", message)
end
end
self.__authcoroutine = false
if ok then
if not self.__sock then
-- auth may change host, so connect again
return connect_once(self)
end
-- auth succ, go through
else
-- auth failed, try next addr
_add_backup() -- auth may add new backup hosts
addr = _next_addr()
if addr == nil then
return false, "no more backup host"
end
return _connect_once(self, addr)
end
end
self.__authcoroutine = false
if ok and not self.__sock then
-- auth may change host, so connect again
return connect_once(self)
end
return ok

return true
end

return true
_add_backup()
return _connect_once(self, { host = self.__host, port = self.__port })
end

local function try_connect(self , once)
Expand Down

0 comments on commit 3e5280c

Please sign in to comment.