Merge 0.10->trunk
[prosody.git] / net / server_select.lua
index 85730e73c4d2d6c216d9ec062ab97926b6cf56a2..f70f81d02335fa06b44b6429fc1c4034e8e01f26 100644 (file)
@@ -31,7 +31,6 @@ local tostring = use "tostring"
 
 --// lua libs //--
 
-local os = use "os"
 local table = use "table"
 local string = use "string"
 local coroutine = use "coroutine"
@@ -41,6 +40,7 @@ local coroutine = use "coroutine"
 local math_min = math.min
 local math_huge = math.huge
 local table_concat = table.concat
+local table_insert = table.insert
 local string_sub = string.sub
 local coroutine_wrap = coroutine.wrap
 local coroutine_yield = coroutine.yield
@@ -56,7 +56,6 @@ local getaddrinfo = luasocket.dns.getaddrinfo
 
 local ssl_wrap = ( has_luasec and luasec.wrap )
 local socket_bind = luasocket.bind
-local socket_sleep = luasocket.sleep
 local socket_select = luasocket.select
 
 --// functions //--
@@ -101,7 +100,6 @@ local _sendtraffic
 local _readtraffic
 
 local _selecttimeout
-local _sleeptime
 local _tcpbacklog
 local _accepretry
 
@@ -115,8 +113,6 @@ local _checkinterval
 local _sendtimeout
 local _readtimeout
 
-local _timer
-
 local _maxselectlen
 local _maxfd
 
@@ -142,7 +138,6 @@ _sendtraffic = 0 -- some stats
 _readtraffic = 0
 
 _selecttimeout = 1 -- timeout of socket.select
-_sleeptime = 0 -- time to wait at the end of every loop
 _tcpbacklog = 128 -- some kind of hint to the OS
 _accepretry = 10 -- seconds to wait until the next attempt of a full server to accept
 
@@ -302,7 +297,6 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
        local bufferqueuelen = 0        -- end of buffer array
 
        local toclose
-       local fatalerror
        local needtls
 
        local bufferlen = 0
@@ -517,7 +511,6 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
                        return dispatch( handler, buffer, err )
                else    -- connections was closed or fatal error
                        out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " read error: ", tostring(err) )
-                       fatalerror = true
                        _ = handler and handler:force_close( err )
                        return false
                end
@@ -557,7 +550,6 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
                        return true
                else    -- connection was closed during sending or fatal error
                        out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " write error: ", tostring(err) )
-                       fatalerror = true
                        _ = handler and handler:force_close( err )
                        return false
                end
@@ -806,7 +798,6 @@ end
 getsettings = function( )
        return {
                select_timeout = _selecttimeout;
-               select_sleep_time = _sleeptime;
                tcp_backlog = _tcpbacklog;
                max_send_buffer_size = _maxsendlen;
                max_receive_buffer_size = _maxreadlen;
@@ -825,7 +816,6 @@ changesettings = function( new )
                return nil, "invalid settings table"
        end
        _selecttimeout = tonumber( new.select_timeout ) or _selecttimeout
-       _sleeptime = tonumber( new.select_sleep_time ) or _sleeptime
        _maxsendlen = tonumber( new.max_send_buffer_size ) or _maxsendlen
        _maxreadlen = tonumber( new.max_receive_buffer_size ) or _maxreadlen
        _checkinterval = tonumber( new.select_idle_check_interval ) or _checkinterval
@@ -848,6 +838,49 @@ addtimer = function( listener )
        return true
 end
 
+local add_task do
+       local data = {};
+       local new_data = {};
+
+       function add_task(delay, callback)
+               local current_time = luasocket_gettime();
+               delay = delay + current_time;
+               if delay >= current_time then
+                       table_insert(new_data, {delay, callback});
+               else
+                       local r = callback(current_time);
+                       if r and type(r) == "number" then
+                               return add_task(r, callback);
+                       end
+               end
+       end
+
+       addtimer(function(current_time)
+               if #new_data > 0 then
+                       for _, d in pairs(new_data) do
+                               table_insert(data, d);
+                       end
+                       new_data = {};
+               end
+
+               local next_time = math_huge;
+               for i, d in pairs(data) do
+                       local t, callback = d[1], d[2];
+                       if t <= current_time then
+                               data[i] = nil;
+                               local r = callback(current_time);
+                               if type(r) == "number" then
+                                       add_task(r, callback);
+                                       next_time = math_min(next_time, r);
+                               end
+                       else
+                               next_time = math_min(next_time, t - current_time);
+                       end
+               end
+               return next_time;
+       end);
+end
+
 stats = function( )
        return _readtraffic, _sendtraffic, _readlistlen, _sendlistlen, _timerlistlen
 end
@@ -861,8 +894,15 @@ end
 loop = function(once) -- this is the main loop of the program
        if quitting then return "quitting"; end
        if once then quitting = "once"; end
-       local next_timer_time = math_huge;
+       _currenttime = luasocket_gettime( )
        repeat
+               -- Fire timers
+       local next_timer_time = math_huge;
+               for i = 1, _timerlistlen do
+                       local t = _timerlist[ i ]( _currenttime ) -- fire timers
+                       if t then next_timer_time = math_min(next_timer_time, t); end
+               end
+
                local read, write, err = socket_select( _readlist, _sendlist, math_min(_selecttimeout, next_timer_time) )
                for _, socket in ipairs( write ) do -- send data waiting in writequeues
                        local handler = _socketlist[ socket ]
@@ -910,27 +950,12 @@ loop = function(once) -- this is the main loop of the program
                        end
                end
 
-               -- Fire timers
-               if _currenttime - _timer >= math_min(next_timer_time, 1) then
-                       next_timer_time = math_huge;
-                       for i = 1, _timerlistlen do
-                               local t = _timerlist[ i ]( _currenttime ) -- fire timers
-                               if t then next_timer_time = math_min(next_timer_time, t); end
-                       end
-                       _timer = _currenttime
-               else
-                       next_timer_time = next_timer_time - (_currenttime - _timer);
-               end
-
                for server, paused_time in pairs( _fullservers ) do
                        if _currenttime - paused_time > _accepretry then
                                _fullservers[ server ] = nil;
                                server.resume();
                        end
                end
-
-               -- wait some time (0 by default)
-               socket_sleep( _sleeptime )
        until quitting;
        if once and quitting == "once" then quitting = nil; return; end
        closeall();
@@ -977,16 +1002,14 @@ local addclient = function( address, port, listeners, pattern, sslctx, typ )
        elseif sslctx and not has_luasec then
                err = "luasec not found"
        end
-       if not typ then
+       if getaddrinfo and not typ then
                local addrinfo, err = getaddrinfo(address)
                if not addrinfo then return nil, err end
                if addrinfo[1] and addrinfo[1].family == "inet6" then
                        typ = "tcp6"
-               else
-                       typ = "tcp"
                end
        end
-       local create = luasocket[typ]
+       local create = luasocket[typ or "tcp"]
        if type( create ) ~= "function"  then
                err = "invalid socket type"
        end
@@ -1002,22 +1025,19 @@ local addclient = function( address, port, listeners, pattern, sslctx, typ )
        end
        client:settimeout( 0 )
        local ok, err = client:connect( address, port )
-       if ok or err == "timeout" then
+       if ok or err == "timeout" or err == "Operation already in progress" then
                return wrapclient( client, address, port, listeners, pattern, sslctx )
        else
                return nil, err
        end
 end
 
---// EXPERIMENTAL //--
-
 ----------------------------------// BEGIN //--
 
 use "setmetatable" ( _socketlist, { __mode = "k" } )
 use "setmetatable" ( _readtimes, { __mode = "k" } )
 use "setmetatable" ( _writetimes, { __mode = "k" } )
 
-_timer = luasocket_gettime( )
 _starttime = luasocket_gettime( )
 
 local function setlogger(new_logger)
@@ -1032,6 +1052,7 @@ end
 
 return {
        _addtimer = addtimer,
+       add_task = add_task;
 
        addclient = addclient,
        wrapclient = wrapclient,