X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;ds=sidebyside;f=net%2Fserver_select.lua;h=a0574f33eb2b67b3780829522898e9a425acdbcb;hb=fd40329f79c503efb5c79926964d272a08c83177;hp=4a36617cf2f34bfaea531621b3a4f0dcfed8f339;hpb=701fa72ca55909888b3fd428724128c654d8b543;p=prosody.git diff --git a/net/server_select.lua b/net/server_select.lua index 4a36617c..a0574f33 100644 --- a/net/server_select.lua +++ b/net/server_select.lua @@ -38,10 +38,10 @@ local coroutine = use "coroutine" --// lua lib methods //-- -local os_difftime = os.difftime local math_min = math.min local math_huge = math.huge local table_concat = table.concat +local table_insert = table.insert local string_sub = string.sub local coroutine_wrap = coroutine.wrap local coroutine_yield = coroutine.yield @@ -57,7 +57,6 @@ local getaddrinfo = luasocket.dns.getaddrinfo local ssl_wrap = ( has_luasec and luasec.wrap ) local socket_bind = luasocket.bind -local socket_sleep = luasocket.sleep local socket_select = luasocket.select --// functions //-- @@ -101,7 +100,6 @@ local _sendtraffic local _readtraffic local _selecttimeout -local _sleeptime local _tcpbacklog local _starttime @@ -114,8 +112,6 @@ local _checkinterval local _sendtimeout local _readtimeout -local _timer - local _maxselectlen local _maxfd @@ -140,7 +136,6 @@ _sendtraffic = 0 -- some stats _readtraffic = 0 _selecttimeout = 1 -- timeout of socket.select -_sleeptime = 0 -- time to wait at the end of every loop _tcpbacklog = 128 -- some kind of hint to the OS _maxsendlen = 51000 * 1024 -- max len of send buffer @@ -286,6 +281,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport local disconnect = listeners.ondisconnect local drain = listeners.ondrain local onreadtimeout = listeners.onreadtimeout; + local detach = listeners.ondetach local bufferqueue = { } -- buffer array local bufferqueuelen = 0 -- end of buffer array @@ -317,11 +313,15 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport handler.onreadtimeout = onreadtimeout; handler.setlistener = function( self, listeners ) + if detach then + detach(self) -- Notify listener that it is no longer responsible for this connection + end dispatch = listeners.onincoming disconnect = listeners.ondisconnect status = listeners.onstatus drain = listeners.ondrain handler.onreadtimeout = listeners.onreadtimeout + detach = listeners.ondetach end handler.getstats = function( ) return readtraffic, sendtraffic @@ -564,6 +564,9 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport _ = status and status( handler, "ssl-handshake-complete" ) if self.autostart_ssl and listeners.onconnect then listeners.onconnect(self); + if bufferqueuelen ~= 0 then + _sendlistlen = addsocket(_sendlist, client, _sendlistlen) + end end _readlistlen = addsocket(_readlist, client, _readlistlen) return true @@ -710,6 +713,7 @@ local function link(sender, receiver, buffersize) sender:lock_read(true); end end + sender:set_mode("*a"); end ----------------------------------// PUBLIC //-- @@ -783,7 +787,6 @@ end getsettings = function( ) return { select_timeout = _selecttimeout; - select_sleep_time = _sleeptime; tcp_backlog = _tcpbacklog; max_send_buffer_size = _maxsendlen; max_receive_buffer_size = _maxreadlen; @@ -801,7 +804,6 @@ changesettings = function( new ) return nil, "invalid settings table" end _selecttimeout = tonumber( new.select_timeout ) or _selecttimeout - _sleeptime = tonumber( new.select_sleep_time ) or _sleeptime _maxsendlen = tonumber( new.max_send_buffer_size ) or _maxsendlen _maxreadlen = tonumber( new.max_receive_buffer_size ) or _maxreadlen _checkinterval = tonumber( new.select_idle_check_interval ) or _checkinterval @@ -823,6 +825,49 @@ addtimer = function( listener ) return true end +local add_task do + local data = {}; + local new_data = {}; + + function add_task(delay, callback) + local current_time = luasocket_gettime(); + delay = delay + current_time; + if delay >= current_time then + table_insert(new_data, {delay, callback}); + else + local r = callback(current_time); + if r and type(r) == "number" then + return add_task(r, callback); + end + end + end + + addtimer(function(current_time) + if #new_data > 0 then + for _, d in pairs(new_data) do + table_insert(data, d); + end + new_data = {}; + end + + local next_time = math_huge; + for i, d in pairs(data) do + local t, callback = d[1], d[2]; + if t <= current_time then + data[i] = nil; + local r = callback(current_time); + if type(r) == "number" then + add_task(r, callback); + next_time = math_min(next_time, r); + end + else + next_time = math_min(next_time, t - current_time); + end + end + return next_time; + end); +end + stats = function( ) return _readtraffic, _sendtraffic, _readlistlen, _sendlistlen, _timerlistlen end @@ -836,8 +881,15 @@ end loop = function(once) -- this is the main loop of the program if quitting then return "quitting"; end if once then quitting = "once"; end - local next_timer_time = math_huge; + _currenttime = luasocket_gettime( ) repeat + -- Fire timers + local next_timer_time = math_huge; + for i = 1, _timerlistlen do + local t = _timerlist[ i ]( _currenttime ) -- fire timers + if t then next_timer_time = math_min(next_timer_time, t); end + end + local read, write, err = socket_select( _readlist, _sendlist, math_min(_selecttimeout, next_timer_time) ) for i, socket in ipairs( write ) do -- send data waiting in writequeues local handler = _socketlist[ socket ] @@ -865,39 +917,25 @@ loop = function(once) -- this is the main loop of the program _currenttime = luasocket_gettime( ) -- Check for socket timeouts - local difftime = os_difftime( _currenttime - _starttime ) - if difftime > _checkinterval then + if _currenttime - _starttime > _checkinterval then _starttime = _currenttime for handler, timestamp in pairs( _writetimes ) do - if os_difftime( _currenttime - timestamp ) > _sendtimeout then + if _currenttime - timestamp > _sendtimeout then handler.disconnect( )( handler, "send timeout" ) handler:force_close() -- forced disconnect end end for handler, timestamp in pairs( _readtimes ) do - if os_difftime( _currenttime - timestamp ) > _readtimeout then + if _currenttime - timestamp > _readtimeout then if not(handler.onreadtimeout) or handler:onreadtimeout() ~= true then handler.disconnect( )( handler, "read timeout" ) handler:close( ) -- forced disconnect? + else + _readtimes[ handler ] = _currenttime -- reset timer end end end end - - -- Fire timers - if _currenttime - _timer >= math_min(next_timer_time, 1) then - next_timer_time = math_huge; - for i = 1, _timerlistlen do - local t = _timerlist[ i ]( _currenttime ) -- fire timers - if t then next_timer_time = math_min(next_timer_time, t); end - end - _timer = _currenttime - else - next_timer_time = next_timer_time - (_currenttime - _timer); - end - - -- wait some time (0 by default) - socket_sleep( _sleeptime ) until quitting; if once and quitting == "once" then quitting = nil; return; end return "quitting" @@ -966,7 +1004,7 @@ local addclient = function( address, port, listeners, pattern, sslctx, typ ) end client:settimeout( 0 ) local ok, err = client:connect( address, port ) - if ok or err == "timeout" then + if ok or err == "timeout" or err == "Operation already in progress" then return wrapclient( client, address, port, listeners, pattern, sslctx ) else return nil, err @@ -981,7 +1019,6 @@ use "setmetatable" ( _socketlist, { __mode = "k" } ) use "setmetatable" ( _readtimes, { __mode = "k" } ) use "setmetatable" ( _writetimes, { __mode = "k" } ) -_timer = luasocket_gettime( ) _starttime = luasocket_gettime( ) local function setlogger(new_logger) @@ -996,6 +1033,7 @@ end return { _addtimer = addtimer, + add_task = add_task; addclient = addclient, wrapclient = wrapclient,