X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=net%2Fserver_event.lua;h=e320b15c3b759539e4d8af00dcd3aacb06dc9f89;hb=84c934632367eac822fb6af2663032b67fd0a596;hp=8d6f5597b945d91eaed1d742afdcc58caf0528b0;hpb=2dd2707cf32ebe48ac5af9fcabd1dfd9e4d216c8;p=prosody.git diff --git a/net/server_event.lua b/net/server_event.lua index 8d6f5597..e320b15c 100644 --- a/net/server_event.lua +++ b/net/server_event.lua @@ -6,7 +6,6 @@ notes: -- when using luaevent, never register 2 or more EV_READ at one socket, same for EV_WRITE -- you cant even register a new EV_READ/EV_WRITE callback inside another one - -- never call eventcallback:close( ) from inside eventcallback -- to do some of the above, use timeout events or something what will called from outside -- dont let garbagecollect eventcallbacks, as long they are running -- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing @@ -24,6 +23,7 @@ local cfg = { HANDSHAKE_TIMEOUT = 60, -- timeout in seconds per handshake attempt MAX_READ_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes allowed to read from sockets MAX_SEND_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes size of write buffer (for writing on sockets) + ACCEPT_QUEUE = 128, -- might influence the length of the pending sockets queue ACCEPT_DELAY = 10, -- seconds to wait until the next attempt of a full server to accept READ_TIMEOUT = 60 * 60 * 6, -- timeout in seconds for read data from socket WRITE_TIMEOUT = 180, -- timeout in seconds for write data on socket @@ -33,8 +33,6 @@ local cfg = { } local function use(x) return rawget(_G, x); end -local print = use "print" -local pcall = use "pcall" local ipairs = use "ipairs" local string = use "string" local select = use "select" @@ -43,6 +41,9 @@ local tostring = use "tostring" local coroutine = use "coroutine" local setmetatable = use "setmetatable" +local t_insert = table.insert +local t_concat = table.concat + local ssl = use "ssl" local socket = use "socket" or require "socket" @@ -117,21 +118,14 @@ do local addevent = base.addevent local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield - local string_len = string.len -- Private methods function interface_mt:_position(new_position) self.position = new_position or self.position return self.position; end - function interface_mt:_close() -- regs event to start self:_destroy() - local callback = function( ) - self:_destroy(); - self.eventclose = nil - return -1 - end - self.eventclose = addevent( base, nil, EV_TIMEOUT, callback, 0 ) - return true + function interface_mt:_close() + return self:_destroy(); end function interface_mt:_start_connection(plainssl) -- should be called from addclient @@ -143,7 +137,7 @@ do debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) else if plainssl and ssl then -- start ssl session - self:starttls(nil, true) + self:starttls(self._sslctx, true) else -- normal connection self:_start_session(true) end @@ -212,7 +206,6 @@ do self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed self.send = self.conn.send -- caching table lookups with new client object self.receive = self.conn.receive - local onsomething if not call_onconnect then -- trigger listener self:onstatus("ssl-handshake-complete"); end @@ -249,10 +242,10 @@ do return true end function interface_mt:_destroy() -- close this interface + events and call last listener - debug( "closing client with id:", self.id ) + debug( "closing client with id:", self.id, self.fatalerror ) self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions local _ - _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! + _ = self.eventread and self.eventread:close( ) if self.type == "client" then _ = self.eventwrite and self.eventwrite:close( ) _ = self.eventhandshake and self.eventhandshake:close( ) @@ -262,7 +255,7 @@ do _ = self.eventwritetimeout and self.eventwritetimeout:close( ) _ = self.eventreadtimeout and self.eventreadtimeout:close( ) _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect) - _ = self.conn and self.conn:close( ) -- close connection, must also be called outside of any socket registered events! + _ = self.conn and self.conn:close( ) -- close connection _ = self._server and self._server:counter(-1); self.eventread, self.eventwrite = nil, nil self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil @@ -313,14 +306,14 @@ do if self.nowriting then return nil, "locked" end --vdebug( "try to send data to client, id/data:", self.id, data ) data = tostring( data ) - local len = string_len( data ) + local len = #data local total = len + self.writebufferlen if total > cfg.MAX_SEND_LENGTH then -- check buffer length local err = "send buffer exceeded" debug( "error:", err ) -- to much, check your app return nil, err end - self.writebuffer = self.writebuffer .. data -- new buffer + t_insert(self.writebuffer, data) -- new buffer self.writebufferlen = total if not self.eventwrite then -- register new write event --vdebug( "register new write event" ) @@ -328,25 +321,25 @@ do end return true end - function interface_mt:close(now) + function interface_mt:close() if self.nointerface then return nil, "locked"; end debug( "try to close client connection with id:", self.id ) if self.type == "client" then self.fatalerror = "client to close" - if ( not self.eventwrite ) or now then -- try to close immediately - self:_lock( true, true, true ) - self:_close() - return true - else -- wait for incomplete write request + if self.eventwrite then -- wait for incomplete write request self:_lock( true, true, false ) debug "closing delayed until writebuffer is empty" return nil, "writebuffer not empty, waiting" + else -- close now + self:_lock( true, true, true ) + self:_close() + return true end else - debug( "try to close server with id:", tostring(self.id), "args:", tostring(now) ) + debug( "try to close server with id:", tostring(self.id)) self.fatalerror = "server to close" self:_lock( true ) - self:_close( 0 ) -- add new event to remove the server interface + self:_close( 0 ) return true end end @@ -468,18 +461,15 @@ end local handleclient; do local string_sub = string.sub -- caching table lookups - local string_len = string.len local addevent = base.addevent - local coroutine_wrap = coroutine.wrap local socket_gettime = socket.gettime - local coroutine_yield = coroutine.yield - function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface + function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface --vdebug("creating client interfacce...") local interface = { type = "client"; conn = client; currenttime = socket_gettime( ); -- safe the origin - writebuffer = ""; -- writebuffer + writebuffer = {}; -- writebuffer writebufferlen = 0; -- length of writebuffer send = client.send; -- caching table lookups receive = client.receive; @@ -487,6 +477,7 @@ do ondisconnect = listener.ondisconnect; -- will be called when client disconnects onincoming = listener.onincoming; -- will be called when client sends data ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs + ondrain = listener.ondrain; -- called when writebuffer is empty onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) eventread = false, eventwrite = false, eventclose = false, eventhandshake = false, eventstarthandshake = false; -- event handler @@ -533,10 +524,11 @@ do interface.eventwritetimeout = false end end - local succ, err, byte = interface.conn:send( interface.writebuffer, 1, interface.writebufferlen ) + interface.writebuffer = { t_concat(interface.writebuffer) } + local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen ) --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) if succ then -- writing succesful - interface.writebuffer = "" + interface.writebuffer[1] = nil interface.writebufferlen = 0 interface:ondrain(); if interface.fatalerror then @@ -552,7 +544,7 @@ do return -1 elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again --vdebug( "writebuffer is not empty:", err ) - interface.writebuffer = string_sub( interface.writebuffer, byte + 1, interface.writebufferlen ) -- new buffer + interface.writebuffer[1] = string_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer interface.writebufferlen = interface.writebufferlen - byte if "wantread" == err then -- happens only with luasec local callback = function( ) @@ -602,16 +594,14 @@ do end local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) - buffer = buffer or part or "" - local len = string_len( buffer ) - if len > cfg.MAX_READ_LENGTH then -- check buffer length + buffer = buffer or part + if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length interface.fatalerror = "receive buffer exceeded" debug( "fatal error:", interface.fatalerror ) interface:_close() interface.eventread = nil return -1 end - interface.onincoming( interface, buffer, err ) -- send new data to listener if err and ( err ~= "timeout" and err ~= "wantread" ) then if "wantwrite" == err then -- need to read on write event if not interface.eventwrite then -- register new write event if needed @@ -631,6 +621,8 @@ do interface.eventread = nil return -1 end + else + interface.onincoming( interface, buffer, err ) -- send new data to listener end if interface.noreading then interface.eventread = nil; @@ -692,7 +684,7 @@ do end local client_ip, client_port = client:getpeername( ) interface._connections = interface._connections + 1 -- increase connection count - local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, nil, sslctx ) + local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) --vdebug( "client id:", clientinterface, "startssl:", startssl ) if ssl and sslctx then clientinterface:starttls(sslctx, true) @@ -742,9 +734,9 @@ end )( ) local addclient, wrapclient do - function wrapclient( client, ip, port, listeners, pattern, sslctx, startssl ) + function wrapclient( client, ip, port, listeners, pattern, sslctx ) local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) - interface:_start_session() + interface:_start_connection(sslctx) return interface, client --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface end @@ -778,9 +770,6 @@ do local res, err = client:connect( addr, serverport ) -- connect if res or ( err == "timeout" ) then local ip, port = client:getsockname( ) - local server = function( ) - return nil, "this is a dummy server interface" - end local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, startssl ) interface:_start_connection( startssl ) debug( "new connection id:", interface.id )