net.server_select: Remove some debugging code.
[prosody.git] / net / server_event.lua
index e79a57d0693a4e90b8d8a7eaf7619a12308daf5d..43e70a0f7f135a0e1e7d4eb9915ad5ca7f18df2b 100644 (file)
@@ -20,14 +20,14 @@ local LAST_MODIFIED         = "2009/11/20"
 
 local cfg = {
        MAX_CONNECTIONS       = 100000,  -- max per server connections (use "ulimit -n" on *nix)
-       MAX_HANDSHAKE_ATTEMPS = 10,  -- attemps to finish ssl handshake
-       HANDSHAKE_TIMEOUT     = 1,  -- timout in seconds per handshake attemp
+       MAX_HANDSHAKE_ATTEMPS = 1000,  -- attempts to finish ssl handshake
+       HANDSHAKE_TIMEOUT     = 60,  -- timout in seconds per handshake attempt
        MAX_READ_LENGTH       = 1024 * 1024 * 1024 * 1024,  -- max bytes allowed to read from sockets
        MAX_SEND_LENGTH       = 1024 * 1024 * 1024 * 1024,  -- max bytes size of write buffer (for writing on sockets)
-       ACCEPT_DELAY          = 10,  -- seconds to wait until the next attemp of a full server to accept
-       READ_TIMEOUT          = 60 * 30,  -- timeout in seconds for read data from socket
-       WRITE_TIMEOUT         = 30,  -- timeout in seconds for write data on socket
-       CONNECT_TIMEOUT       = 10,  -- timeout in seconds for connection attemps
+       ACCEPT_DELAY          = 10,  -- seconds to wait until the next attempt of a full server to accept
+       READ_TIMEOUT          = 60 * 60 * 6,  -- timeout in seconds for read data from socket
+       WRITE_TIMEOUT         = 180,  -- timeout in seconds for write data on socket
+       CONNECT_TIMEOUT       = 20,  -- timeout in seconds for connection attempts
        CLEAR_DELAY           = 5,  -- seconds to wait for clearing interface list (and calling ondisconnect listeners)
        DEBUG                 = true,  -- show debug messages
 }
@@ -160,7 +160,7 @@ do
                        local callback = function( )
                                self:_lock( false,  false, false )
                                --vdebug( "start listening on client socket with id:", self.id )
-                               self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT )  -- register callback
+                               self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT );  -- register callback
                                self:onconnect()
                                self.eventsession = nil
                                return -1
@@ -197,7 +197,7 @@ do
                                        local _, err
                                        local attempt = 0
                                        local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPS
-                                       while attempt < 1000 do  -- no endless loop
+                                       while attempt < maxattempt do  -- no endless loop
                                                attempt = attempt + 1
                                                debug( "ssl handshake of client with id:"..tostring(self).."attemp:"..attempt )
                                                if attempt > maxattempt then
@@ -262,7 +262,7 @@ do
                                _ = self.eventsession and self.eventsession:close( )
                                _ = self.eventwritetimeout and self.eventwritetimeout:close( )
                                _ = self.eventreadtimeout and self.eventreadtimeout:close( )
-                               _ = self.ondisconnect and self:ondisconnect( self.fatalerror )  -- call ondisconnect listener (wont be the case if handshake failed on connect)
+                               _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror)  -- call ondisconnect listener (wont be the case if handshake failed on connect)
                                _ = self.conn and self.conn:close( ) -- close connection, must also be called outside of any socket registered events!
                                _ = self._server and self._server:counter(-1);
                                self.eventread, self.eventwrite = nil, nil
@@ -281,6 +281,23 @@ do
                        self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
                        return nointerface, noreading, nowriting
        end
+       
+       --TODO: Deprecate
+       function interface_mt:lock_read(switch)
+               if switch then
+                       return self:pause();
+               else
+                       return self:resume();
+               end
+       end
+
+       function interface_mt:pause()
+               return self:_lock(self.nointerface, true, self.nowriting);
+       end
+
+       function interface_mt:resume()
+               return self:_lock(self.nointerface, false, self.nowriting);
+       end
 
        function interface_mt:counter(c)
                if c then
@@ -385,6 +402,13 @@ do
                        self.starttls = false; -- prevent starttls()
                end
        end
+
+       function interface_mt:set_mode(pattern)
+               if pattern then
+                       self._pattern = pattern;
+               end
+               return self._pattern;
+       end
        
        function interface_mt:set_send(new_send)
                -- No-op, we always use the underlying connection's send
@@ -433,6 +457,7 @@ do
        
        -- Stub handlers
        function interface_mt:onconnect()
+               return self:onincoming(nil);
        end
        function interface_mt:onincoming()
        end
@@ -440,6 +465,8 @@ do
        end
        function interface_mt:ontimeout()
        end
+       function interface_mt:ondrain()
+       end
        function interface_mt:onstatus()
                debug("server.lua: Dummy onstatus()")
        end
@@ -520,6 +547,7 @@ do
                                if succ then  -- writing succesful
                                        interface.writebuffer = ""
                                        interface.writebufferlen = 0
+                                       interface:ondrain();
                                        if interface.fatalerror then
                                                debug "closing client after writing"
                                                interface:_close()  -- close interface if needed
@@ -531,7 +559,7 @@ do
                                        end
                                        interface.eventwrite = nil
                                        return -1
-                               elseif byte then  -- want write again
+                               elseif byte and (err == "timeout" or err == "wantwrite") then  -- want write again
                                        --vdebug( "writebuffer is not empty:", err )
                                        interface.writebuffer = string_sub( interface.writebuffer, byte + 1, interface.writebufferlen )  -- new buffer
                                        interface.writebufferlen = interface.writebufferlen - byte
@@ -539,7 +567,7 @@ do
                                                local callback = function( )
                                                        interface:_close()
                                                        interface.eventwritetimeout = nil
-                                                       return evreturn, evtimeout
+                                                       return -1;
                                                end
                                                interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT )  -- reg a new timeout event
                                                debug( "wantread during write attemp, reg it in readcallback but dont know what really happens next..." )
@@ -581,7 +609,7 @@ do
                                                interface.eventreadtimeout = nil
                                        end
                                end
-                               local buffer, err, part = interface.conn:receive( pattern )  -- receive buffer with "pattern"
+                               local buffer, err, part = interface.conn:receive( interface._pattern )  -- receive buffer with "pattern"
                                --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
                                buffer = buffer or part or ""
                                local len = string_len( buffer )
@@ -667,16 +695,17 @@ do
                                        debug( "maximal connections reached, refuse client connection; accept delay:", delay )
                                        return EV_TIMEOUT, delay  -- delay for next accept attemp
                                end
-                               local ip, port = client:getpeername( )
+                               local client_ip, client_port = client:getpeername( )
                                interface._connections = interface._connections + 1  -- increase connection count
-                               local clientinterface = handleclient( client, ip, port, interface, pattern, listener, nil, sslctx )
+                               local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, nil, sslctx )
                                --vdebug( "client id:", clientinterface, "startssl:", startssl )
                                if ssl and sslctx then
                                        clientinterface:starttls(sslctx)
                                else
                                        clientinterface:_start_session( clientinterface.onconnect )
                                end
-                               debug( "accepted incoming client connection from:", ip, port )
+                               debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>");
+                               
                                client, err = server:accept()    -- try to accept again
                        end
                        return EV_READ
@@ -704,7 +733,7 @@ local addserver = ( function( )
                                debug "fatal error: luasec not found"
                                return nil, "luasec not found"
                        end
-                       sslctx, err = ssl.newcontext( sslcfg )
+                       sslctx, err = sslcfg
                        if err then
                                debug( "error while creating new ssl context for server socket:", err )
                                return nil, err
@@ -745,7 +774,7 @@ do
                                debug "need luasec, but not available"
                                return nil, "luasec not found"
                        end
-                       sslctx, err = ssl.newcontext( sslcfg )
+                       sslctx, err = sslcfg
                        if err then
                                debug( "cannot create new ssl context:", err )
                                return nil, err
@@ -757,7 +786,7 @@ do
                        local server = function( )
                                return nil, "this is a dummy server interface"
                        end
-                       local interface = wrapclient( client, ip, serverport, listeners, pattern, sslctx, startssl )
+                       local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, startssl )
                        interface:_start_connection( startssl )
                        debug( "new connection id:", interface.id )
                        return interface, err
@@ -816,11 +845,32 @@ function hook_signal(signal_num, handler)
        return signal_events[signal_num];
 end
 
+local function link(sender, receiver, buffersize)
+       sender:set_mode(buffersize);
+       local sender_locked;
+       
+       function receiver:ondrain()
+               if sender_locked then
+                       sender:resume();
+                       sender_locked = nil;
+               end
+       end
+       
+       function sender:onincoming(data)
+               receiver:write(data);
+               if receiver.writebufferlen >= buffersize then
+                       sender_locked = true;
+                       sender:pause();
+               end
+       end
+end
+
 return {
 
        cfg = cfg,
        base = base,
        loop = loop,
+       link = link,
        event = event,
        event_base = base,
        addevent = newevent,