net.server_event: Schedule another read callback if there is still data left in buffe...
[prosody.git] / net / server_event.lua
index 3cfa76f642b4333cf7f3899c180ec75502044ba2..2cb45553b9fdd552413602e66aea1a77ebf49b77 100644 (file)
@@ -30,6 +30,7 @@ local cfg = {
        WRITE_TIMEOUT         = 180,  -- timeout in seconds for write data on socket
        CONNECT_TIMEOUT       = 20,  -- timeout in seconds for connection attempts
        CLEAR_DELAY           = 5,  -- seconds to wait for clearing interface list (and calling ondisconnect listeners)
+       READ_RETRY_DELAY      = 1e-06, -- if, after reading, there is still data in buffer, wait this long and continue reading
        DEBUG                 = true,  -- show debug messages
 }
 
@@ -58,7 +59,7 @@ local log = require ("util.logger").init("socket")
 local function debug(...)
        return log("debug", ("%s "):rep(select('#', ...)), ...)
 end
-local vdebug = debug;
+-- local vdebug = debug;
 
 local bitor = ( function( ) -- thx Rici Lake
        local hasbit = function( x, p )
@@ -97,26 +98,26 @@ function interface_mt:_close()
        return self:_destroy();
 end
 
-function interface_mt:_start_connection(plainssl) -- should be called from addclient
-               local callback = function( event )
-                       if EV_TIMEOUT == event then  -- timeout during connection
-                               self.fatalerror = "connection timeout"
-                               self:ontimeout()  -- call timeout listener
-                               self:_close()
-                               debug( "new connection failed. id:", self.id, "error:", self.fatalerror )
-                       else
-                               if plainssl and has_luasec then  -- start ssl session
-                                       self:starttls(self._sslctx, true)
-                               else  -- normal connection
-                                       self:_start_session(true)
-                               end
-                               debug( "new connection established. id:", self.id )
+function interface_mt:_start_connection(plainssl) -- called from wrapclient
+       local callback = function( event )
+               if EV_TIMEOUT == event then  -- timeout during connection
+                       self.fatalerror = "connection timeout"
+                       self:ontimeout()  -- call timeout listener
+                       self:_close()
+                       debug( "new connection failed. id:", self.id, "error:", self.fatalerror )
+               else
+                       if plainssl and has_luasec then  -- start ssl session
+                               self:starttls(self._sslctx, true)
+                       else  -- normal connection
+                               self:_start_session(true)
                        end
-                       self.eventconnect = nil
-                       return -1
+                       debug( "new connection established. id:", self.id )
                end
-               self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT )
-               return true
+               self.eventconnect = nil
+               return -1
+       end
+       self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT )
+       return true
 end
 function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl
        if self.type == "client" then
@@ -139,108 +140,107 @@ function interface_mt:_start_session(call_onconnect) -- new session, for example
        return true
 end
 function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first
-               --vdebug( "starting ssl session with client id:", self.id )
-               local _
-               _ = self.eventread and self.eventread:close( )  -- close events; this must be called outside of the event callbacks!
-               _ = self.eventwrite and self.eventwrite:close( )
-               self.eventread, self.eventwrite = nil, nil
-               local err
-               self.conn, err = ssl.wrap( self.conn, self._sslctx )
-               if err then
-                       self.fatalerror = err
-                       self.conn = nil  -- cannot be used anymore
-                       if call_onconnect then
-                               self.ondisconnect = nil  -- dont call this when client isnt really connected
-                       end
-                       self:_close()
-                       debug( "fatal error while ssl wrapping:", err )
-                       return false
+       --vdebug( "starting ssl session with client id:", self.id )
+       local _
+       _ = self.eventread and self.eventread:close( )  -- close events; this must be called outside of the event callbacks!
+       _ = self.eventwrite and self.eventwrite:close( )
+       self.eventread, self.eventwrite = nil, nil
+       local err
+       self.conn, err = ssl.wrap( self.conn, self._sslctx )
+       if err then
+               self.fatalerror = err
+               self.conn = nil  -- cannot be used anymore
+               if call_onconnect then
+                       self.ondisconnect = nil  -- dont call this when client isnt really connected
                end
-               self.conn:settimeout( 0 )  -- set non blocking
-               local handshakecallback = coroutine_wrap(
-                       function( event )
-                               local _, err
-                               local attempt = 0
-                               local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS
-                               while attempt < maxattempt do  -- no endless loop
-                                       attempt = attempt + 1
-                                       debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt )
-                                       if attempt > maxattempt then
-                                               self.fatalerror = "max handshake attempts exceeded"
-                                       elseif EV_TIMEOUT == event then
-                                               self.fatalerror = "timeout during handshake"
-                                       else
-                                               _, err = self.conn:dohandshake( )
-                                               if not err then
-                                                       self:_lock( false, false, false )  -- unlock the interface; sending, closing etc allowed
-                                                       self.send = self.conn.send  -- caching table lookups with new client object
-                                                       self.receive = self.conn.receive
-                                                       if not call_onconnect then  -- trigger listener
-                                                               self:onstatus("ssl-handshake-complete");
-                                                       end
-                                                       self:_start_session( call_onconnect )
-                                                       debug( "ssl handshake done" )
-                                                       self.eventhandshake = nil
-                                                       return -1
-                                               end
-                                               if err == "wantwrite" then
-                                                       event = EV_WRITE
-                                               elseif err == "wantread" then
-                                                       event = EV_READ
-                                               else
-                                                       debug( "ssl handshake error:", err )
-                                                       self.fatalerror = err
-                                               end
-                                       end
-                                       if self.fatalerror then
-                                               if call_onconnect then
-                                                       self.ondisconnect = nil  -- dont call this when client isnt really connected
-                                               end
-                                               self:_close()
-                                               debug( "handshake failed because:", self.fatalerror )
-                                               self.eventhandshake = nil
-                                               return -1
+               self:_close()
+               debug( "fatal error while ssl wrapping:", err )
+               return false
+       end
+       self.conn:settimeout( 0 )  -- set non blocking
+       local handshakecallback = coroutine_wrap(function( event )
+               local _, err
+               local attempt = 0
+               local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS
+               while attempt < maxattempt do  -- no endless loop
+                       attempt = attempt + 1
+                       debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt )
+                       if attempt > maxattempt then
+                               self.fatalerror = "max handshake attempts exceeded"
+                       elseif EV_TIMEOUT == event then
+                               self.fatalerror = "timeout during handshake"
+                       else
+                               _, err = self.conn:dohandshake( )
+                               if not err then
+                                       self:_lock( false, false, false )  -- unlock the interface; sending, closing etc allowed
+                                       self.send = self.conn.send  -- caching table lookups with new client object
+                                       self.receive = self.conn.receive
+                                       if not call_onconnect then  -- trigger listener
+                                               self:onstatus("ssl-handshake-complete");
                                        end
-                                       event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT )  -- yield this monster...
+                                       self:_start_session( call_onconnect )
+                                       debug( "ssl handshake done" )
+                                       self.eventhandshake = nil
+                                       return -1
+                               end
+                               if err == "wantwrite" then
+                                       event = EV_WRITE
+                               elseif err == "wantread" then
+                                       event = EV_READ
+                               else
+                                       debug( "ssl handshake error:", err )
+                                       self.fatalerror = err
                                end
                        end
-               )
-               debug "starting handshake..."
-               self:_lock( false, true, true )  -- unlock read/write events, but keep interface locked
-               self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT )
-               return true
+                       if self.fatalerror then
+                               if call_onconnect then
+                                       self.ondisconnect = nil  -- dont call this when client isnt really connected
+                               end
+                               self:_close()
+                               debug( "handshake failed because:", self.fatalerror )
+                               self.eventhandshake = nil
+                               return -1
+                       end
+                       event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT )  -- yield this monster...
+               end
+       end
+       )
+       debug "starting handshake..."
+       self:_lock( false, true, true )  -- unlock read/write events, but keep interface locked
+       self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT )
+       return true
 end
 function interface_mt:_destroy()  -- close this interface + events and call last listener
-               debug( "closing client with id:", self.id, self.fatalerror )
-               self:_lock( true, true, true )  -- first of all, lock the interface to avoid further actions
-               local _
-               _ = self.eventread and self.eventread:close( )
-               if self.type == "client" then
-                       _ = self.eventwrite and self.eventwrite:close( )
-                       _ = self.eventhandshake and self.eventhandshake:close( )
-                       _ = self.eventstarthandshake and self.eventstarthandshake:close( )
-                       _ = self.eventconnect and self.eventconnect:close( )
-                       _ = self.eventsession and self.eventsession:close( )
-                       _ = self.eventwritetimeout and self.eventwritetimeout:close( )
-                       _ = self.eventreadtimeout and self.eventreadtimeout:close( )
-                       _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror)  -- call ondisconnect listener (wont be the case if handshake failed on connect)
-                       _ = self.conn and self.conn:close( ) -- close connection
-                       _ = self._server and self._server:counter(-1);
-                       self.eventread, self.eventwrite = nil, nil
-                       self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil
-                       self.readcallback, self.writecallback = nil, nil
-               else
-                       self.conn:close( )
-                       self.eventread, self.eventclose = nil, nil
-                       self.interface, self.readcallback = nil, nil
-               end
-               interfacelist[ self ] = nil
-               return true
+       debug( "closing client with id:", self.id, self.fatalerror )
+       self:_lock( true, true, true )  -- first of all, lock the interface to avoid further actions
+       local _
+       _ = self.eventread and self.eventread:close( )
+       if self.type == "client" then
+               _ = self.eventwrite and self.eventwrite:close( )
+               _ = self.eventhandshake and self.eventhandshake:close( )
+               _ = self.eventstarthandshake and self.eventstarthandshake:close( )
+               _ = self.eventconnect and self.eventconnect:close( )
+               _ = self.eventsession and self.eventsession:close( )
+               _ = self.eventwritetimeout and self.eventwritetimeout:close( )
+               _ = self.eventreadtimeout and self.eventreadtimeout:close( )
+               _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror)  -- call ondisconnect listener (wont be the case if handshake failed on connect)
+               _ = self.conn and self.conn:close( ) -- close connection
+               _ = self._server and self._server:counter(-1);
+               self.eventread, self.eventwrite = nil, nil
+               self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil
+               self.readcallback, self.writecallback = nil, nil
+       else
+               self.conn:close( )
+               self.eventread, self.eventclose = nil, nil
+               self.interface, self.readcallback = nil, nil
+       end
+       interfacelist[ self ] = nil
+       return true
 end
 
 function interface_mt:_lock(nointerface, noreading, nowriting)  -- lock or unlock this interface or events
-               self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
-               return nointerface, noreading, nowriting
+       self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
+       return nointerface, noreading, nowriting
 end
 
 --TODO: Deprecate
@@ -258,8 +258,9 @@ end
 
 function interface_mt:resume()
        self:_lock(self.nointerface, false, self.nowriting);
-       if not self.eventread then
+       if self.readcallback and not self.eventread then
                self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT );  -- register callback
+               return true;
        end
 end
 
@@ -391,7 +392,8 @@ function interface_mt:starttls(sslctx, call_onconnect)
        if not self.eventwrite then
                self:_lock( true, true, true )  -- lock the interface, to not disturb the handshake
                self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 )  -- add event to start handshake
-       else  -- wait until writebuffer is empty
+       else
+               -- wait until writebuffer is empty
                self:_lock( true, true, false )
                debug "ssl session delayed until writebuffer is empty..."
        end
@@ -408,10 +410,13 @@ end
 
 function interface_mt:setlistener(listener)
        self:ondetach(); -- Notify listener that it is no longer responsible for this connection
-       self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout,
-       self.onreadtimeout, self.onstatus, self.ondetach
-               = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout,
-                 listener.onreadtimeout, listener.onstatus, listener.ondetach;
+       self.onconnect = listener.onconnect;
+       self.ondisconnect = listener.ondisconnect;
+       self.onincoming = listener.onincoming;
+       self.ontimeout = listener.ontimeout;
+       self.onreadtimeout = listener.onreadtimeout;
+       self.onstatus = listener.onstatus;
+       self.ondetach = listener.ondetach;
 end
 
 -- Stub handlers
@@ -514,8 +519,11 @@ local function handleclient( client, ip, port, server, pattern, listener, sslctx
                                elseif interface.startsslcallback then  -- start ssl connection if needed
                                        debug "starting ssl handshake after writing"
                                        interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 )
+                               elseif interface.writebufferlen ~= 0 then
+                                       -- data possibly written from ondrain
+                                       return EV_WRITE, cfg.WRITE_TIMEOUT
                                elseif interface.eventreadtimeout then
-                                       return EV_WRITE, EV_TIMEOUT
+                                       return EV_WRITE, cfg.WRITE_TIMEOUT
                                end
                                interface.eventwrite = nil
                                return -1
@@ -552,7 +560,7 @@ local function handleclient( client, ip, port, server, pattern, listener, sslctx
                        interface.eventread = nil
                        return -1
                end
-               if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then
+               if EV_TIMEOUT == event and not interface.conn:dirty() and interface:onreadtimeout() ~= true then
                        return -1 -- took too long to get some data from client -> disconnect
                end
                if interface._usingssl then  -- handle luasec
@@ -581,10 +589,7 @@ local function handleclient( client, ip, port, server, pattern, listener, sslctx
                                        interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
                                end
                                interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
-                                       function( )
-                                               interface:_close()
-                                       end, cfg.READ_TIMEOUT
-                               )
+                                       function( ) interface:_close() end, cfg.READ_TIMEOUT)
                                debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
                                -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
                        else  -- connection was closed or fatal error
@@ -601,6 +606,9 @@ local function handleclient( client, ip, port, server, pattern, listener, sslctx
                        interface.eventread = nil;
                        return -1;
                end
+               if interface.conn:dirty() then -- still data left in buffer
+                       return EV_TIMEOUT, cfg.READ_RETRY_DELAY;
+               end
                return EV_READ, cfg.READ_TIMEOUT
        end
 
@@ -615,6 +623,7 @@ local function handleserver( server, addr, port, pattern, listener, sslctx )  --
        local interface = {
                _connections = 0;
 
+               type = "server";
                conn = server;
                onconnect = listener.onconnect;  -- will be called when new client connected
                eventread = false;  -- read event handler
@@ -725,7 +734,6 @@ local function addclient( addr, serverport, listener, pattern, sslctx, typ )
        if res or ( err == "timeout" ) then
                local ip, port = client:getsockname( )
                local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx )
-               interface:_start_connection( startssl )
                debug( "new connection id:", interface.id )
                return interface, err
        else
@@ -753,9 +761,9 @@ end
 
 local function setquitting(yes)
        if yes then
-                -- Quit now
-                closeallservers();
-                base:loopexit();
+               -- Quit now
+               closeallservers();
+               base:loopexit();
        end
 end
 
@@ -767,7 +775,7 @@ end
 -- being garbage-collected
 local signal_events = {}; -- [signal_num] -> event object
 local function hook_signal(signal_num, handler)
-       local function _handler(event)
+       local function _handler()
                local ret = handler();
                if ret ~= false then -- Continue handling this signal?
                        return EV_SIGNAL; -- Yes