X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=net%2Fserver_event.lua;h=b34845d63869fdf551246e4a508a13a0cb53f597;hb=cb589397c575ebef8ccd68da699c409560339314;hp=43e70a0f7f135a0e1e7d4eb9915ad5ca7f18df2b;hpb=f672c8647e3ad1a808c412e8d37ee10fe5122ab2;p=prosody.git diff --git a/net/server_event.lua b/net/server_event.lua index 43e70a0f..b34845d6 100644 --- a/net/server_event.lua +++ b/net/server_event.lua @@ -6,7 +6,6 @@ notes: -- when using luaevent, never register 2 or more EV_READ at one socket, same for EV_WRITE -- you cant even register a new EV_READ/EV_WRITE callback inside another one - -- never call eventcallback:close( ) from inside eventcallback -- to do some of the above, use timeout events or something what will called from outside -- dont let garbagecollect eventcallbacks, as long they are running -- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing @@ -20,8 +19,8 @@ local LAST_MODIFIED = "2009/11/20" local cfg = { MAX_CONNECTIONS = 100000, -- max per server connections (use "ulimit -n" on *nix) - MAX_HANDSHAKE_ATTEMPS = 1000, -- attempts to finish ssl handshake - HANDSHAKE_TIMEOUT = 60, -- timout in seconds per handshake attempt + MAX_HANDSHAKE_ATTEMPTS= 1000, -- attempts to finish ssl handshake + HANDSHAKE_TIMEOUT = 60, -- timeout in seconds per handshake attempt MAX_READ_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes allowed to read from sockets MAX_SEND_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes size of write buffer (for writing on sockets) ACCEPT_DELAY = 10, -- seconds to wait until the next attempt of a full server to accept @@ -33,8 +32,6 @@ local cfg = { } local function use(x) return rawget(_G, x); end -local print = use "print" -local pcall = use "pcall" local ipairs = use "ipairs" local string = use "string" local select = use "select" @@ -43,6 +40,9 @@ local tostring = use "tostring" local coroutine = use "coroutine" local setmetatable = use "setmetatable" +local t_insert = table.insert +local t_concat = table.concat + local ssl = use "ssl" local socket = use "socket" or require "socket" @@ -117,35 +117,28 @@ do local addevent = base.addevent local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield - local string_len = string.len -- Private methods function interface_mt:_position(new_position) self.position = new_position or self.position return self.position; end - function interface_mt:_close() -- regs event to start self:_destroy() - local callback = function( ) - self:_destroy(); - self.eventclose = nil - return -1 - end - self.eventclose = addevent( base, nil, EV_TIMEOUT, callback, 0 ) - return true + function interface_mt:_close() + return self:_destroy(); end function interface_mt:_start_connection(plainssl) -- should be called from addclient local callback = function( event ) - if EV_TIMEOUT == event then -- timout during connection + if EV_TIMEOUT == event then -- timeout during connection self.fatalerror = "connection timeout" self:ontimeout() -- call timeout listener self:_close() debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) else if plainssl and ssl then -- start ssl session - self:starttls() + self:starttls(self._sslctx, true) else -- normal connection - self:_start_session( self.listener.onconnect ) + self:_start_session(true) end debug( "new connection established. id:", self.id ) end @@ -155,13 +148,15 @@ do self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) return true end - function interface_mt:_start_session(onconnect) -- new session, for example after startssl + function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl if self.type == "client" then local callback = function( ) self:_lock( false, false, false ) --vdebug( "start listening on client socket with id:", self.id ) self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback - self:onconnect() + if call_onconnect then + self:onconnect() + end self.eventsession = nil return -1 end @@ -173,7 +168,7 @@ do end return true end - function interface_mt:_start_ssl(arg) -- old socket will be destroyed, therefore we have to close read/write events first + function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first --vdebug( "starting ssl session with client id:", self.id ) local _ _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! @@ -184,7 +179,7 @@ do if err then self.fatalerror = err self.conn = nil -- cannot be used anymore - if "onconnect" == arg then + if call_onconnect then self.ondisconnect = nil -- dont call this when client isnt really connected end self:_close() @@ -196,12 +191,12 @@ do function( event ) local _, err local attempt = 0 - local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPS + local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS while attempt < maxattempt do -- no endless loop attempt = attempt + 1 - debug( "ssl handshake of client with id:"..tostring(self).."attemp:"..attempt ) + debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt ) if attempt > maxattempt then - self.fatalerror = "max handshake attemps exceeded" + self.fatalerror = "max handshake attempts exceeded" elseif EV_TIMEOUT == event then self.fatalerror = "timeout during handshake" else @@ -210,29 +205,25 @@ do self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed self.send = self.conn.send -- caching table lookups with new client object self.receive = self.conn.receive - local onsomething - if "onconnect" == arg then -- trigger listener - onsomething = self.onconnect - else - onsomething = self.onsslconnection + if not call_onconnect then -- trigger listener + self:onstatus("ssl-handshake-complete"); end - self:_start_session( onsomething ) + self:_start_session( call_onconnect ) debug( "ssl handshake done" ) - self:onstatus("ssl-handshake-complete"); self.eventhandshake = nil return -1 end - debug( "error during ssl handshake:", err ) if err == "wantwrite" then event = EV_WRITE elseif err == "wantread" then event = EV_READ else + debug( "ssl handshake error:", err ) self.fatalerror = err end end if self.fatalerror then - if "onconnect" == arg then + if call_onconnect then self.ondisconnect = nil -- dont call this when client isnt really connected end self:_close() @@ -250,10 +241,10 @@ do return true end function interface_mt:_destroy() -- close this interface + events and call last listener - debug( "closing client with id:", self.id ) + debug( "closing client with id:", self.id, self.fatalerror ) self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions local _ - _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! + _ = self.eventread and self.eventread:close( ) if self.type == "client" then _ = self.eventwrite and self.eventwrite:close( ) _ = self.eventhandshake and self.eventhandshake:close( ) @@ -263,7 +254,7 @@ do _ = self.eventwritetimeout and self.eventwritetimeout:close( ) _ = self.eventreadtimeout and self.eventreadtimeout:close( ) _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect) - _ = self.conn and self.conn:close( ) -- close connection, must also be called outside of any socket registered events! + _ = self.conn and self.conn:close( ) -- close connection _ = self._server and self._server:counter(-1); self.eventread, self.eventwrite = nil, nil self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil @@ -296,7 +287,10 @@ do end function interface_mt:resume() - return self:_lock(self.nointerface, false, self.nowriting); + self:_lock(self.nointerface, false, self.nowriting); + if not self.eventread then + self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback + end end function interface_mt:counter(c) @@ -311,14 +305,14 @@ do if self.nowriting then return nil, "locked" end --vdebug( "try to send data to client, id/data:", self.id, data ) data = tostring( data ) - local len = string_len( data ) + local len = #data local total = len + self.writebufferlen if total > cfg.MAX_SEND_LENGTH then -- check buffer length local err = "send buffer exceeded" debug( "error:", err ) -- to much, check your app return nil, err end - self.writebuffer = self.writebuffer .. data -- new buffer + t_insert(self.writebuffer, data) -- new buffer self.writebufferlen = total if not self.eventwrite then -- register new write event --vdebug( "register new write event" ) @@ -326,42 +320,33 @@ do end return true end - function interface_mt:close(now) + function interface_mt:close() if self.nointerface then return nil, "locked"; end debug( "try to close client connection with id:", self.id ) if self.type == "client" then self.fatalerror = "client to close" - if ( not self.eventwrite ) or now then -- try to close immediately - self:_lock( true, true, true ) - self:_close() - return true - else -- wait for incomplete write request + if self.eventwrite then -- wait for incomplete write request self:_lock( true, true, false ) debug "closing delayed until writebuffer is empty" return nil, "writebuffer not empty, waiting" + else -- close now + self:_lock( true, true, true ) + self:_close() + return true end else - debug( "try to close server with id:", self.id, "args:", now ) + debug( "try to close server with id:", tostring(self.id)) self.fatalerror = "server to close" self:_lock( true ) - local count = 0 - for _, item in ipairs( interfacelist( ) ) do - if ( item.type ~= "server" ) and ( item._server == self ) then -- client/server match - if item:close( now ) then -- writebuffer was empty - count = count + 1 - end - end - end - local timeout = 0 -- dont wait for unfinished writebuffers of clients... - if not now then - timeout = cfg.WRITE_TIMEOUT -- ...or wait for it - end - self:_close( timeout ) -- add new event to remove the server interface - debug( "seconds remained until server is closed:", timeout ) - return count -- returns finished clients with empty writebuffer + self:_close( 0 ) + return true end end + function interface_mt:socket() + return self.conn + end + function interface_mt:server() return self._server or self; end @@ -414,7 +399,7 @@ do -- No-op, we always use the underlying connection's send end - function interface_mt:starttls(sslctx) + function interface_mt:starttls(sslctx, call_onconnect) debug( "try to start ssl at client id:", self.id ) local err self._sslctx = sslctx; @@ -428,7 +413,7 @@ do self._usingssl = true self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event self.startsslcallback = nil - self:_start_ssl(); + self:_start_ssl(call_onconnect); self.eventstarthandshake = nil return -1 end @@ -457,7 +442,6 @@ do -- Stub handlers function interface_mt:onconnect() - return self:onincoming(nil); end function interface_mt:onincoming() end @@ -468,7 +452,6 @@ do function interface_mt:ondrain() end function interface_mt:onstatus() - debug("server.lua: Dummy onstatus()") end end @@ -477,18 +460,15 @@ end local handleclient; do local string_sub = string.sub -- caching table lookups - local string_len = string.len local addevent = base.addevent - local coroutine_wrap = coroutine.wrap local socket_gettime = socket.gettime - local coroutine_yield = coroutine.yield - function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface + function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface --vdebug("creating client interfacce...") local interface = { type = "client"; conn = client; currenttime = socket_gettime( ); -- safe the origin - writebuffer = ""; -- writebuffer + writebuffer = {}; -- writebuffer writebufferlen = 0; -- length of writebuffer send = client.send; -- caching table lookups receive = client.receive; @@ -542,10 +522,11 @@ do interface.eventwritetimeout = false end end - local succ, err, byte = interface.conn:send( interface.writebuffer, 1, interface.writebufferlen ) + interface.writebuffer = { t_concat(interface.writebuffer) } + local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen ) --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) if succ then -- writing succesful - interface.writebuffer = "" + interface.writebuffer[1] = nil interface.writebufferlen = 0 interface:ondrain(); if interface.fatalerror then @@ -561,7 +542,7 @@ do return -1 elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again --vdebug( "writebuffer is not empty:", err ) - interface.writebuffer = string_sub( interface.writebuffer, byte + 1, interface.writebufferlen ) -- new buffer + interface.writebuffer[1] = string_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer interface.writebufferlen = interface.writebufferlen - byte if "wantread" == err then -- happens only with luasec local callback = function( ) @@ -570,7 +551,7 @@ do return -1; end interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event - debug( "wantread during write attemp, reg it in readcallback but dont know what really happens next..." ) + debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." ) -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent return -1 end @@ -611,16 +592,14 @@ do end local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) - buffer = buffer or part or "" - local len = string_len( buffer ) - if len > cfg.MAX_READ_LENGTH then -- check buffer length + buffer = buffer or part + if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length interface.fatalerror = "receive buffer exceeded" debug( "fatal error:", interface.fatalerror ) interface:_close() interface.eventread = nil return -1 end - interface.onincoming( interface, buffer, err ) -- send new data to listener if err and ( err ~= "timeout" and err ~= "wantread" ) then if "wantwrite" == err then -- need to read on write event if not interface.eventwrite then -- register new write event if needed @@ -631,7 +610,7 @@ do interface:_close() end, cfg.READ_TIMEOUT ) - debug( "wantwrite during read attemp, reg it in writecallback but dont know what really happens next..." ) + debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... else -- connection was closed or fatal error interface.fatalerror = err @@ -640,6 +619,12 @@ do interface.eventread = nil return -1 end + else + interface.onincoming( interface, buffer, err ) -- send new data to listener + end + if interface.noreading then + interface.eventread = nil; + return -1; end return EV_READ, cfg.READ_TIMEOUT end @@ -693,16 +678,16 @@ do if interface._connections >= cfg.MAX_CONNECTIONS then client:close( ) -- refuse connection debug( "maximal connections reached, refuse client connection; accept delay:", delay ) - return EV_TIMEOUT, delay -- delay for next accept attemp + return EV_TIMEOUT, delay -- delay for next accept attempt end local client_ip, client_port = client:getpeername( ) interface._connections = interface._connections + 1 -- increase connection count - local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, nil, sslctx ) + local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) --vdebug( "client id:", clientinterface, "startssl:", startssl ) if ssl and sslctx then - clientinterface:starttls(sslctx) + clientinterface:starttls(sslctx, true) else - clientinterface:_start_session( clientinterface.onconnect ) + clientinterface:_start_session( true ) end debug( "accepted incoming client connection from:", client_ip or "", client_port or "", "to", port or ""); @@ -724,7 +709,7 @@ local addserver = ( function( ) --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslcfg or "nil", startssl or "nil") local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket if not server then - debug( "creating server socket failed because:", err ) + debug( "creating server socket on "..addr.." port "..port.." failed:", err ) return nil, err end local sslctx @@ -747,10 +732,10 @@ end )( ) local addclient, wrapclient do - function wrapclient( client, ip, port, listeners, pattern, sslctx, startssl ) + function wrapclient( client, ip, port, listeners, pattern, sslctx ) local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) - interface:_start_session() - return interface + interface:_start_connection(sslctx) + return interface, client --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface end @@ -783,9 +768,6 @@ do local res, err = client:connect( addr, serverport ) -- connect if res or ( err == "timeout" ) then local ip, port = client:getsockname( ) - local server = function( ) - return nil, "this is a dummy server interface" - end local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, startssl ) interface:_start_connection( startssl ) debug( "new connection id:", interface.id ) @@ -826,14 +808,14 @@ local function setquitting(yes) end end -function get_backend() +local function get_backend() return base:method(); end -- We need to hold onto the events to stop them -- being garbage-collected local signal_events = {}; -- [signal_num] -> event object -function hook_signal(signal_num, handler) +local function hook_signal(signal_num, handler) local function _handler(event) local ret = handler(); if ret ~= false then -- Continue handling this signal? @@ -846,7 +828,6 @@ function hook_signal(signal_num, handler) end local function link(sender, receiver, buffersize) - sender:set_mode(buffersize); local sender_locked; function receiver:ondrain()