WRITE_TIMEOUT = 180, -- timeout in seconds for write data on socket
CONNECT_TIMEOUT = 20, -- timeout in seconds for connection attempts
CLEAR_DELAY = 5, -- seconds to wait for clearing interface list (and calling ondisconnect listeners)
+ READ_RETRY_DELAY = 1e-06, -- if, after reading, there is still data in buffer, wait this long and continue reading
DEBUG = true, -- show debug messages
}
local has_luasec, ssl = pcall ( require , "ssl" )
local socket = require "socket"
-local event = require "luaevent.core"
+local levent = require "luaevent.core"
local socket_gettime = socket.gettime
local getaddrinfo = socket.dns.getaddrinfo
local function debug(...)
return log("debug", ("%s "):rep(select('#', ...)), ...)
end
-local vdebug = debug;
+-- local vdebug = debug;
local bitor = ( function( ) -- thx Rici Lake
local hasbit = function( x, p )
end
end )( )
-local base = event.new( )
+local base = levent.new( )
local addevent = base.addevent
-local EV_READ = event.EV_READ
-local EV_WRITE = event.EV_WRITE
-local EV_TIMEOUT = event.EV_TIMEOUT
-local EV_SIGNAL = event.EV_SIGNAL
+local EV_READ = levent.EV_READ
+local EV_WRITE = levent.EV_WRITE
+local EV_TIMEOUT = levent.EV_TIMEOUT
+local EV_SIGNAL = levent.EV_SIGNAL
local EV_READWRITE = bitor( EV_READ, EV_WRITE )
return self:_destroy();
end
-function interface_mt:_start_connection(plainssl) -- should be called from addclient
- local callback = function( event )
- if EV_TIMEOUT == event then -- timeout during connection
- self.fatalerror = "connection timeout"
- self:ontimeout() -- call timeout listener
- self:_close()
- debug( "new connection failed. id:", self.id, "error:", self.fatalerror )
- else
- if plainssl and has_luasec then -- start ssl session
- self:starttls(self._sslctx, true)
- else -- normal connection
- self:_start_session(true)
- end
- debug( "new connection established. id:", self.id )
+function interface_mt:_start_connection(plainssl) -- called from wrapclient
+ local callback = function( event )
+ if EV_TIMEOUT == event then -- timeout during connection
+ self.fatalerror = "connection timeout"
+ self:ontimeout() -- call timeout listener
+ self:_close()
+ debug( "new connection failed. id:", self.id, "error:", self.fatalerror )
+ else
+ if plainssl and has_luasec then -- start ssl session
+ self:starttls(self._sslctx, true)
+ else -- normal connection
+ self:_start_session(true)
end
- self.eventconnect = nil
- return -1
+ debug( "new connection established. id:", self.id )
end
- self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT )
- return true
+ self.eventconnect = nil
+ return -1
+ end
+ self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT )
+ return true
end
function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl
if self.type == "client" then
return true
end
function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first
- --vdebug( "starting ssl session with client id:", self.id )
- local _
- _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks!
- _ = self.eventwrite and self.eventwrite:close( )
- self.eventread, self.eventwrite = nil, nil
- local err
- self.conn, err = ssl.wrap( self.conn, self._sslctx )
- if err then
- self.fatalerror = err
- self.conn = nil -- cannot be used anymore
- if call_onconnect then
- self.ondisconnect = nil -- dont call this when client isnt really connected
- end
- self:_close()
- debug( "fatal error while ssl wrapping:", err )
- return false
+ --vdebug( "starting ssl session with client id:", self.id )
+ local _
+ _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks!
+ _ = self.eventwrite and self.eventwrite:close( )
+ self.eventread, self.eventwrite = nil, nil
+ local err
+ self.conn, err = ssl.wrap( self.conn, self._sslctx )
+ if err then
+ self.fatalerror = err
+ self.conn = nil -- cannot be used anymore
+ if call_onconnect then
+ self.ondisconnect = nil -- dont call this when client isnt really connected
end
- self.conn:settimeout( 0 ) -- set non blocking
- local handshakecallback = coroutine_wrap(
- function( event )
- local _, err
- local attempt = 0
- local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS
- while attempt < maxattempt do -- no endless loop
- attempt = attempt + 1
- debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt )
- if attempt > maxattempt then
- self.fatalerror = "max handshake attempts exceeded"
- elseif EV_TIMEOUT == event then
- self.fatalerror = "timeout during handshake"
- else
- _, err = self.conn:dohandshake( )
- if not err then
- self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed
- self.send = self.conn.send -- caching table lookups with new client object
- self.receive = self.conn.receive
- if not call_onconnect then -- trigger listener
- self:onstatus("ssl-handshake-complete");
- end
- self:_start_session( call_onconnect )
- debug( "ssl handshake done" )
- self.eventhandshake = nil
- return -1
- end
- if err == "wantwrite" then
- event = EV_WRITE
- elseif err == "wantread" then
- event = EV_READ
- else
- debug( "ssl handshake error:", err )
- self.fatalerror = err
- end
- end
- if self.fatalerror then
- if call_onconnect then
- self.ondisconnect = nil -- dont call this when client isnt really connected
- end
- self:_close()
- debug( "handshake failed because:", self.fatalerror )
- self.eventhandshake = nil
- return -1
+ self:_close()
+ debug( "fatal error while ssl wrapping:", err )
+ return false
+ end
+ self.conn:settimeout( 0 ) -- set non blocking
+ local handshakecallback = coroutine_wrap(function( event )
+ local _, err
+ local attempt = 0
+ local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS
+ while attempt < maxattempt do -- no endless loop
+ attempt = attempt + 1
+ debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt )
+ if attempt > maxattempt then
+ self.fatalerror = "max handshake attempts exceeded"
+ elseif EV_TIMEOUT == event then
+ self.fatalerror = "timeout during handshake"
+ else
+ _, err = self.conn:dohandshake( )
+ if not err then
+ self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed
+ self.send = self.conn.send -- caching table lookups with new client object
+ self.receive = self.conn.receive
+ if not call_onconnect then -- trigger listener
+ self:onstatus("ssl-handshake-complete");
end
- event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster...
+ self:_start_session( call_onconnect )
+ debug( "ssl handshake done" )
+ self.eventhandshake = nil
+ return -1
+ end
+ if err == "wantwrite" then
+ event = EV_WRITE
+ elseif err == "wantread" then
+ event = EV_READ
+ else
+ debug( "ssl handshake error:", err )
+ self.fatalerror = err
end
end
- )
- debug "starting handshake..."
- self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked
- self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT )
- return true
+ if self.fatalerror then
+ if call_onconnect then
+ self.ondisconnect = nil -- dont call this when client isnt really connected
+ end
+ self:_close()
+ debug( "handshake failed because:", self.fatalerror )
+ self.eventhandshake = nil
+ return -1
+ end
+ event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster...
+ end
+ end
+ )
+ debug "starting handshake..."
+ self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked
+ self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT )
+ return true
end
function interface_mt:_destroy() -- close this interface + events and call last listener
- debug( "closing client with id:", self.id, self.fatalerror )
- self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions
- local _
- _ = self.eventread and self.eventread:close( )
- if self.type == "client" then
- _ = self.eventwrite and self.eventwrite:close( )
- _ = self.eventhandshake and self.eventhandshake:close( )
- _ = self.eventstarthandshake and self.eventstarthandshake:close( )
- _ = self.eventconnect and self.eventconnect:close( )
- _ = self.eventsession and self.eventsession:close( )
- _ = self.eventwritetimeout and self.eventwritetimeout:close( )
- _ = self.eventreadtimeout and self.eventreadtimeout:close( )
- _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect)
- _ = self.conn and self.conn:close( ) -- close connection
- _ = self._server and self._server:counter(-1);
- self.eventread, self.eventwrite = nil, nil
- self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil
- self.readcallback, self.writecallback = nil, nil
- else
- self.conn:close( )
- self.eventread, self.eventclose = nil, nil
- self.interface, self.readcallback = nil, nil
- end
- interfacelist[ self ] = nil
- return true
+ debug( "closing client with id:", self.id, self.fatalerror )
+ self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions
+ local _
+ _ = self.eventread and self.eventread:close( )
+ if self.type == "client" then
+ _ = self.eventwrite and self.eventwrite:close( )
+ _ = self.eventhandshake and self.eventhandshake:close( )
+ _ = self.eventstarthandshake and self.eventstarthandshake:close( )
+ _ = self.eventconnect and self.eventconnect:close( )
+ _ = self.eventsession and self.eventsession:close( )
+ _ = self.eventwritetimeout and self.eventwritetimeout:close( )
+ _ = self.eventreadtimeout and self.eventreadtimeout:close( )
+ _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect)
+ _ = self.conn and self.conn:close( ) -- close connection
+ _ = self._server and self._server:counter(-1);
+ self.eventread, self.eventwrite = nil, nil
+ self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil
+ self.readcallback, self.writecallback = nil, nil
+ else
+ self.conn:close( )
+ self.eventread, self.eventclose = nil, nil
+ self.interface, self.readcallback = nil, nil
+ end
+ interfacelist[ self ] = nil
+ return true
end
function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events
- self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
- return nointerface, noreading, nowriting
+ self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
+ return nointerface, noreading, nowriting
end
--TODO: Deprecate
function interface_mt:resume()
self:_lock(self.nointerface, false, self.nowriting);
- if not self.eventread then
+ if self.readcallback and not self.eventread then
self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
+ return true;
end
end
if not self.eventwrite then
self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake
self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake
- else -- wait until writebuffer is empty
+ else
+ -- wait until writebuffer is empty
self:_lock( true, true, false )
debug "ssl session delayed until writebuffer is empty..."
end
function interface_mt:setlistener(listener)
self:ondetach(); -- Notify listener that it is no longer responsible for this connection
- self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout,
- self.onreadtimeout, self.onstatus, self.ondetach
- = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout,
- listener.onreadtimeout, listener.onstatus, listener.ondetach;
+ self.onconnect = listener.onconnect;
+ self.ondisconnect = listener.ondisconnect;
+ self.onincoming = listener.onincoming;
+ self.ontimeout = listener.ontimeout;
+ self.onreadtimeout = listener.onreadtimeout;
+ self.onstatus = listener.onstatus;
+ self.ondetach = listener.ondetach;
end
-- Stub handlers
elseif interface.startsslcallback then -- start ssl connection if needed
debug "starting ssl handshake after writing"
interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 )
+ elseif interface.writebufferlen ~= 0 then
+ -- data possibly written from ondrain
+ return EV_WRITE, cfg.WRITE_TIMEOUT
elseif interface.eventreadtimeout then
- return EV_WRITE, EV_TIMEOUT
+ return EV_WRITE, cfg.WRITE_TIMEOUT
end
interface.eventwrite = nil
return -1
interface.eventread = nil
return -1
end
- if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then
+ if EV_TIMEOUT == event and not interface.conn:dirty() and interface:onreadtimeout() ~= true then
return -1 -- took too long to get some data from client -> disconnect
end
if interface._usingssl then -- handle luasec
interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
end
interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
- function( )
- interface:_close()
- end, cfg.READ_TIMEOUT
- )
+ function( ) interface:_close() end, cfg.READ_TIMEOUT)
debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
-- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
else -- connection was closed or fatal error
interface.eventread = nil;
return -1;
end
+ if interface.conn:dirty() then -- still data left in buffer
+ return EV_TIMEOUT, cfg.READ_RETRY_DELAY;
+ end
return EV_READ, cfg.READ_TIMEOUT
end
local interface = {
_connections = 0;
+ type = "server";
conn = server;
onconnect = listener.onconnect; -- will be called when new client connected
eventread = false; -- read event handler
if res or ( err == "timeout" ) then
local ip, port = client:getsockname( )
local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx )
- interface:_start_connection( startssl )
debug( "new connection id:", interface.id )
return interface, err
else
local function setquitting(yes)
if yes then
- -- Quit now
- closeallservers();
- base:loopexit();
+ -- Quit now
+ closeallservers();
+ base:loopexit();
end
end
-- being garbage-collected
local signal_events = {}; -- [signal_num] -> event object
local function hook_signal(signal_num, handler)
- local function _handler(event)
+ local function _handler()
local ret = handler();
if ret ~= false then -- Continue handling this signal?
return EV_SIGNAL; -- Yes
base = base,
loop = loop,
link = link,
- event = event,
+ event = levent,
event_base = base,
addevent = newevent,
addserver = addserver,