local cfg = {
MAX_CONNECTIONS = 100000, -- max per server connections (use "ulimit -n" on *nix)
- MAX_HANDSHAKE_ATTEMPS = 10, -- attemps to finish ssl handshake
- HANDSHAKE_TIMEOUT = 1, -- timout in seconds per handshake attemp
+ MAX_HANDSHAKE_ATTEMPTS= 1000, -- attempts to finish ssl handshake
+ HANDSHAKE_TIMEOUT = 60, -- timeout in seconds per handshake attempt
MAX_READ_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes allowed to read from sockets
MAX_SEND_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes size of write buffer (for writing on sockets)
- ACCEPT_DELAY = 10, -- seconds to wait until the next attemp of a full server to accept
- READ_TIMEOUT = 60 * 30, -- timeout in seconds for read data from socket
- WRITE_TIMEOUT = 30, -- timeout in seconds for write data on socket
- CONNECT_TIMEOUT = 10, -- timeout in seconds for connection attemps
- CLEAR_DELAY = 5, -- seconds to wait for clearing interface list (and calling ondisconnect listeners)
+ ACCEPT_DELAY = 10, -- seconds to wait until the next attempt of a full server to accept
+ READ_TIMEOUT = 60 * 60 * 6, -- timeout in seconds for read data from socket
+ WRITE_TIMEOUT = 180, -- timeout in seconds for write data on socket
+ CONNECT_TIMEOUT = 20, -- timeout in seconds for connection attempts
+ CLEAR_DELAY = 5, -- seconds to wait for clearing interface list (and calling ondisconnect listeners)
DEBUG = true, -- show debug messages
}
local coroutine = use "coroutine"
local setmetatable = use "setmetatable"
-local ssl = use "ssl" or require "ssl"
+local ssl = use "ssl"
local socket = use "socket" or require "socket"
local log = require ("util.logger").init("socket")
local hasbit = function( x, p )
return x % ( p + p ) >= p
end
- return function( x, y )
+ return function( x, y )
local p = 1
local z = 0
local limit = x > y and x or y
- while p <= limit do
+ while p <= limit do
if hasbit( x, p ) or hasbit( y, p ) then
z = z + p
end
array[ len ] = nil
end
len = len - 1
- return len
+ return len
else
return array
end
function interface_mt:_start_connection(plainssl) -- should be called from addclient
local callback = function( event )
- if EV_TIMEOUT == event then -- timout during connection
+ if EV_TIMEOUT == event then -- timeout during connection
self.fatalerror = "connection timeout"
self:ontimeout() -- call timeout listener
self:_close()
debug( "new connection failed. id:", self.id, "error:", self.fatalerror )
else
- if plainssl then -- start ssl session
- self:starttls()
+ if plainssl and ssl then -- start ssl session
+ self:starttls(nil, true)
else -- normal connection
- self:_start_session( self.listener.onconnect )
+ self:_start_session(true)
end
debug( "new connection established. id:", self.id )
end
self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT )
return true
end
- function interface_mt:_start_session(onconnect) -- new session, for example after startssl
+ function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl
if self.type == "client" then
local callback = function( )
self:_lock( false, false, false )
- --vdebug( "start listening on client socket with id:", self.id )
- self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ) -- register callback
- self:onconnect()
+ --vdebug( "start listening on client socket with id:", self.id )
+ self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
+ if call_onconnect then
+ self:onconnect()
+ end
self.eventsession = nil
return -1
end
end
return true
end
- function interface_mt:_start_ssl(arg) -- old socket will be destroyed, therefore we have to close read/write events first
+ function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first
--vdebug( "starting ssl session with client id:", self.id )
local _
_ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks!
if err then
self.fatalerror = err
self.conn = nil -- cannot be used anymore
- if "onconnect" == arg then
+ if call_onconnect then
self.ondisconnect = nil -- dont call this when client isnt really connected
end
self:_close()
function( event )
local _, err
local attempt = 0
- local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPS
- while attempt < 1000 do -- no endless loop
+ local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS
+ while attempt < maxattempt do -- no endless loop
attempt = attempt + 1
- debug( "ssl handshake of client with id:"..tostring(self).."attemp:"..attempt )
+ debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt )
if attempt > maxattempt then
- self.fatalerror = "max handshake attemps exceeded"
+ self.fatalerror = "max handshake attempts exceeded"
elseif EV_TIMEOUT == event then
self.fatalerror = "timeout during handshake"
else
self.send = self.conn.send -- caching table lookups with new client object
self.receive = self.conn.receive
local onsomething
- if "onconnect" == arg then -- trigger listener
- onsomething = self.onconnect
- else
- onsomething = self.onsslconnection
+ if not call_onconnect then -- trigger listener
+ self:onstatus("ssl-handshake-complete");
end
- self:_start_session( onsomething )
+ self:_start_session( call_onconnect )
debug( "ssl handshake done" )
self.eventhandshake = nil
return -1
end
- debug( "error during ssl handshake:", err )
if err == "wantwrite" then
event = EV_WRITE
elseif err == "wantread" then
event = EV_READ
else
+ debug( "ssl handshake error:", err )
self.fatalerror = err
- end
+ end
end
if self.fatalerror then
- if "onconnect" == arg then
+ if call_onconnect then
self.ondisconnect = nil -- dont call this when client isnt really connected
end
self:_close()
end
)
debug "starting handshake..."
- self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked
+ self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked
self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT )
return true
end
_ = self.eventsession and self.eventsession:close( )
_ = self.eventwritetimeout and self.eventwritetimeout:close( )
_ = self.eventreadtimeout and self.eventreadtimeout:close( )
- _ = self.ondisconnect and self:ondisconnect( self.fatalerror ) -- call ondisconnect listener (wont be the case if handshake failed on connect)
+ _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect)
_ = self.conn and self.conn:close( ) -- close connection, must also be called outside of any socket registered events!
_ = self._server and self._server:counter(-1);
self.eventread, self.eventwrite = nil, nil
self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
return nointerface, noreading, nowriting
end
+
+ --TODO: Deprecate
+ function interface_mt:lock_read(switch)
+ if switch then
+ return self:pause();
+ else
+ return self:resume();
+ end
+ end
+
+ function interface_mt:pause()
+ return self:_lock(self.nointerface, true, self.nowriting);
+ end
+
+ function interface_mt:resume()
+ self:_lock(self.nointerface, false, self.nowriting);
+ if not self.eventread then
+ self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
+ end
+ end
function interface_mt:counter(c)
if c then
local err = "send buffer exceeded"
debug( "error:", err ) -- to much, check your app
return nil, err
- end
+ end
self.writebuffer = self.writebuffer .. data -- new buffer
self.writebufferlen = total
if not self.eventwrite then -- register new write event
return nil, "writebuffer not empty, waiting"
end
else
- debug( "try to close server with id:", self.id, "args:", now )
+ debug( "try to close server with id:", tostring(self.id), "args:", tostring(now) )
self.fatalerror = "server to close"
self:_lock( true )
- local count = 0
- for _, item in ipairs( interfacelist( ) ) do
- if ( item.type ~= "server" ) and ( item._server == self ) then -- client/server match
- if item:close( now ) then -- writebuffer was empty
- count = count + 1
- end
- end
- end
- local timeout = 0 -- dont wait for unfinished writebuffers of clients...
- if not now then
- timeout = cfg.WRITE_TIMEOUT -- ...or wait for it
- end
- self:_close( timeout ) -- add new event to remove the server interface
- debug( "seconds remained until server is closed:", timeout )
- return count -- returns finished clients with empty writebuffer
+ self:_close( 0 ) -- add new event to remove the server interface
+ return true
end
end
+ function interface_mt:socket()
+ return self.conn
+ end
+
function interface_mt:server()
return self._server or self;
end
self.starttls = false; -- prevent starttls()
end
end
+
+ function interface_mt:set_mode(pattern)
+ if pattern then
+ self._pattern = pattern;
+ end
+ return self._pattern;
+ end
function interface_mt:set_send(new_send)
-- No-op, we always use the underlying connection's send
end
- function interface_mt:starttls()
+ function interface_mt:starttls(sslctx, call_onconnect)
debug( "try to start ssl at client id:", self.id )
local err
- if not self._sslctx then -- no ssl available
- err = "no ssl context available"
- elseif self._usingssl then -- startssl was already called
+ self._sslctx = sslctx;
+ if self._usingssl then -- startssl was already called
err = "ssl already active"
end
if err then
debug( "error:", err )
- return nil, err
+ return nil, err
end
self._usingssl = true
self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event
self.startsslcallback = nil
- self:_start_ssl();
+ self:_start_ssl(call_onconnect);
self.eventstarthandshake = nil
return -1
end
return false, "setoption not implemented";
end
+ function interface_mt:setlistener(listener)
+ self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, self.onstatus
+ = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, listener.onstatus;
+ end
+
-- Stub handlers
function interface_mt:onconnect()
end
end
function interface_mt:ontimeout()
end
-end
+ function interface_mt:ondrain()
+ end
+ function interface_mt:onstatus()
+ end
+end
-- End of client interface methods
ondisconnect = listener.ondisconnect; -- will be called when client disconnects
onincoming = listener.onincoming; -- will be called when client sends data
ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs
+ onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS)
eventread = false, eventwrite = false, eventclose = false,
eventhandshake = false, eventstarthandshake = false; -- event handler
eventconnect = false, eventsession = false; -- more event handler...
_sslctx = sslctx; -- parameters
_usingssl = false; -- client is using ssl;
}
- if not sslctx then
- interface.starttls = false -- don't allow TLS
- end
+ if not ssl then interface.starttls = false; end
interface.id = tostring(interface):match("%x+$");
interface.writecallback = function( event ) -- called on write events
--vdebug( "new client write event, id/ip/port:", interface, ip, port )
end
if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect
interface.fatalerror = "timeout during writing"
- debug( "writing failed:", interface.fatalerror )
+ debug( "writing failed:", interface.fatalerror )
interface:_close()
interface.eventwrite = false
return -1
if succ then -- writing succesful
interface.writebuffer = ""
interface.writebufferlen = 0
+ interface:ondrain();
if interface.fatalerror then
debug "closing client after writing"
interface:_close() -- close interface if needed
end
interface.eventwrite = nil
return -1
- elseif byte then -- want write again
+ elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again
--vdebug( "writebuffer is not empty:", err )
interface.writebuffer = string_sub( interface.writebuffer, byte + 1, interface.writebufferlen ) -- new buffer
- interface.writebufferlen = interface.writebufferlen - byte
+ interface.writebufferlen = interface.writebufferlen - byte
if "wantread" == err then -- happens only with luasec
local callback = function( )
interface:_close()
interface.eventwritetimeout = nil
- return evreturn, evtimeout
+ return -1;
end
interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event
- debug( "wantread during write attemp, reg it in readcallback but dont know what really happens next..." )
+ debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." )
-- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent
return -1
end
- return EV_WRITE, cfg.WRITE_TIMEOUT
+ return EV_WRITE, cfg.WRITE_TIMEOUT
else -- connection was closed during writing or fatal error
interface.fatalerror = err or "fatal error"
- debug( "connection failed in write event:", interface.fatalerror )
+ debug( "connection failed in write event:", interface.fatalerror )
interface:_close()
interface.eventwrite = nil
return -1
end
if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect
interface.fatalerror = "timeout during receiving"
- debug( "connection failed:", interface.fatalerror )
+ debug( "connection failed:", interface.fatalerror )
interface:_close()
interface.eventread = nil
return -1
interface.eventreadtimeout = nil
end
end
- local buffer, err, part = interface.conn:receive( pattern ) -- receive buffer with "pattern"
- --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
+ local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern"
+ --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
buffer = buffer or part or ""
local len = string_len( buffer )
if len > cfg.MAX_READ_LENGTH then -- check buffer length
function( )
interface:_close()
end, cfg.READ_TIMEOUT
- )
- debug( "wantwrite during read attemp, reg it in writecallback but dont know what really happens next..." )
+ )
+ debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
-- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
- else -- connection was closed or fatal error
+ else -- connection was closed or fatal error
interface.fatalerror = err
- debug( "connection failed in read event:", interface.fatalerror )
+ debug( "connection failed in read event:", interface.fatalerror )
interface:_close()
interface.eventread = nil
return -1
end
end
+ if interface.noreading then
+ interface.eventread = nil;
+ return -1;
+ end
return EV_READ, cfg.READ_TIMEOUT
end
end
local handleserver
do
- function handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- creates an server interface
+ function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface
debug "creating server interface..."
local interface = {
_connections = 0;
if interface._connections >= cfg.MAX_CONNECTIONS then
client:close( ) -- refuse connection
debug( "maximal connections reached, refuse client connection; accept delay:", delay )
- return EV_TIMEOUT, delay -- delay for next accept attemp
+ return EV_TIMEOUT, delay -- delay for next accept attempt
end
- local ip, port = client:getpeername( )
+ local client_ip, client_port = client:getpeername( )
interface._connections = interface._connections + 1 -- increase connection count
- local clientinterface = handleclient( client, ip, port, interface, pattern, listener, nil, sslctx )
+ local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, nil, sslctx )
--vdebug( "client id:", clientinterface, "startssl:", startssl )
- if startssl then
- clientinterface:starttls()
+ if ssl and sslctx then
+ clientinterface:starttls(sslctx, true)
else
- clientinterface:_start_session( clientinterface.onconnect )
+ clientinterface:_start_session( true )
end
- debug( "accepted incoming client connection from:", ip, port )
+ debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>");
+
client, err = server:accept() -- try to accept again
end
return EV_READ
--vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslcfg or "nil", startssl or "nil")
local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket
if not server then
- debug( "creating server socket failed because:", err )
+ debug( "creating server socket on "..addr.." port "..port.." failed:", err )
return nil, err
end
local sslctx
debug "fatal error: luasec not found"
return nil, "luasec not found"
end
- sslctx, err = ssl.newcontext( sslcfg )
+ sslctx, err = sslcfg
if err then
debug( "error while creating new ssl context for server socket:", err )
return nil, err
end
- end
+ end
local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler
debug( "new server created with id:", tostring(interface))
return interface
function wrapclient( client, ip, port, listeners, pattern, sslctx, startssl )
local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx )
interface:_start_session()
- return interface
+ return interface, client
--function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface
end
function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl )
local client, err = socket.tcp() -- creating new socket
if not client then
- debug( "cannot create socket:", err )
+ debug( "cannot create socket:", err )
return nil, err
end
client:settimeout( 0 ) -- set nonblocking
local sslctx
if sslcfg then -- handle ssl/new context
if not ssl then
- debug "need luasec, but not available"
+ debug "need luasec, but not available"
return nil, "luasec not found"
end
- sslctx, err = ssl.newcontext( sslcfg )
+ sslctx, err = sslcfg
if err then
debug( "cannot create new ssl context:", err )
return nil, err
local server = function( )
return nil, "this is a dummy server interface"
end
- local interface = wrapclient( client, ip, serverport, listeners, pattern, sslctx, startssl )
+ local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, startssl )
interface:_start_connection( startssl )
debug( "new connection id:", interface.id )
return interface, err
end
end
-function get_backend()
+local function get_backend()
return base:method();
end
-- We need to hold onto the events to stop them
-- being garbage-collected
local signal_events = {}; -- [signal_num] -> event object
-function hook_signal(signal_num, handler)
+local function hook_signal(signal_num, handler)
local function _handler(event)
local ret = handler();
if ret ~= false then -- Continue handling this signal?
return signal_events[signal_num];
end
+local function link(sender, receiver, buffersize)
+ local sender_locked;
+
+ function receiver:ondrain()
+ if sender_locked then
+ sender:resume();
+ sender_locked = nil;
+ end
+ end
+
+ function sender:onincoming(data)
+ receiver:write(data);
+ if receiver.writebufferlen >= buffersize then
+ sender_locked = true;
+ sender:pause();
+ end
+ end
+end
+
return {
cfg = cfg,
base = base,
loop = loop,
+ link = link,
event = event,
event_base = base,
addevent = newevent,