notes:
-- when using luaevent, never register 2 or more EV_READ at one socket, same for EV_WRITE
-- you cant even register a new EV_READ/EV_WRITE callback inside another one
- -- never call eventcallback:close( ) from inside eventcallback
-- to do some of the above, use timeout events or something what will called from outside
-- dont let garbagecollect eventcallbacks, as long they are running
-- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing
HANDSHAKE_TIMEOUT = 60, -- timeout in seconds per handshake attempt
MAX_READ_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes allowed to read from sockets
MAX_SEND_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes size of write buffer (for writing on sockets)
+ ACCEPT_QUEUE = 128, -- might influence the length of the pending sockets queue
ACCEPT_DELAY = 10, -- seconds to wait until the next attempt of a full server to accept
READ_TIMEOUT = 60 * 60 * 6, -- timeout in seconds for read data from socket
WRITE_TIMEOUT = 180, -- timeout in seconds for write data on socket
}
local function use(x) return rawget(_G, x); end
-local print = use "print"
-local pcall = use "pcall"
local ipairs = use "ipairs"
local string = use "string"
local select = use "select"
local coroutine = use "coroutine"
local setmetatable = use "setmetatable"
+local t_insert = table.insert
+local t_concat = table.concat
+
local ssl = use "ssl"
local socket = use "socket" or require "socket"
local addevent = base.addevent
local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield
- local string_len = string.len
-- Private methods
function interface_mt:_position(new_position)
self.position = new_position or self.position
return self.position;
end
- function interface_mt:_close() -- regs event to start self:_destroy()
- local callback = function( )
- self:_destroy();
- self.eventclose = nil
- return -1
- end
- self.eventclose = addevent( base, nil, EV_TIMEOUT, callback, 0 )
- return true
+ function interface_mt:_close()
+ return self:_destroy();
end
function interface_mt:_start_connection(plainssl) -- should be called from addclient
self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed
self.send = self.conn.send -- caching table lookups with new client object
self.receive = self.conn.receive
- local onsomething
if not call_onconnect then -- trigger listener
self:onstatus("ssl-handshake-complete");
end
debug( "closing client with id:", self.id, self.fatalerror )
self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions
local _
- _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks!
+ _ = self.eventread and self.eventread:close( )
if self.type == "client" then
_ = self.eventwrite and self.eventwrite:close( )
_ = self.eventhandshake and self.eventhandshake:close( )
_ = self.eventwritetimeout and self.eventwritetimeout:close( )
_ = self.eventreadtimeout and self.eventreadtimeout:close( )
_ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect)
- _ = self.conn and self.conn:close( ) -- close connection, must also be called outside of any socket registered events!
+ _ = self.conn and self.conn:close( ) -- close connection
_ = self._server and self._server:counter(-1);
self.eventread, self.eventwrite = nil, nil
self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil
function interface_mt:resume()
self:_lock(self.nointerface, false, self.nowriting);
- if not self.eventread then
+ if self.readcallback and not self.eventread then
self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
+ return true;
end
end
if self.nowriting then return nil, "locked" end
--vdebug( "try to send data to client, id/data:", self.id, data )
data = tostring( data )
- local len = string_len( data )
+ local len = #data
local total = len + self.writebufferlen
if total > cfg.MAX_SEND_LENGTH then -- check buffer length
local err = "send buffer exceeded"
debug( "error:", err ) -- to much, check your app
return nil, err
end
- self.writebuffer = self.writebuffer .. data -- new buffer
+ t_insert(self.writebuffer, data) -- new buffer
self.writebufferlen = total
if not self.eventwrite then -- register new write event
--vdebug( "register new write event" )
debug( "try to close server with id:", tostring(self.id))
self.fatalerror = "server to close"
self:_lock( true )
- self:_close( 0 ) -- add new event to remove the server interface
+ self:_close( 0 )
return true
end
end
function interface_mt:ssl()
return self._usingssl
end
+ interface_mt.clientport = interface_mt.port -- COMPAT server_select
function interface_mt:type()
return self._type or "client"
end
function interface_mt:setlistener(listener)
- self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, self.onstatus
- = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, listener.onstatus;
+ self:ondetach(); -- Notify listener that it is no longer responsible for this connection
+ self.onconnect, self.ondisconnect, self.onincoming,
+ self.ontimeout, self.onstatus, self.ondetach
+ = listener.onconnect, listener.ondisconnect, listener.onincoming,
+ listener.ontimeout, listener.onstatus, listener.ondetach;
end
-- Stub handlers
end
function interface_mt:ondrain()
end
+ function interface_mt:ondetach()
+ end
function interface_mt:onstatus()
end
end
local handleclient;
do
local string_sub = string.sub -- caching table lookups
- local string_len = string.len
local addevent = base.addevent
- local coroutine_wrap = coroutine.wrap
local socket_gettime = socket.gettime
- local coroutine_yield = coroutine.yield
function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface
--vdebug("creating client interfacce...")
local interface = {
type = "client";
conn = client;
currenttime = socket_gettime( ); -- safe the origin
- writebuffer = ""; -- writebuffer
+ writebuffer = {}; -- writebuffer
writebufferlen = 0; -- length of writebuffer
send = client.send; -- caching table lookups
receive = client.receive;
ondisconnect = listener.ondisconnect; -- will be called when client disconnects
onincoming = listener.onincoming; -- will be called when client sends data
ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs
+ ondrain = listener.ondrain; -- called when writebuffer is empty
+ ondetach = listener.ondetach; -- called when disassociating this listener from this connection
onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS)
eventread = false, eventwrite = false, eventclose = false,
eventhandshake = false, eventstarthandshake = false; -- event handler
interface.eventwritetimeout = false
end
end
- local succ, err, byte = interface.conn:send( interface.writebuffer, 1, interface.writebufferlen )
+ interface.writebuffer = { t_concat(interface.writebuffer) }
+ local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen )
--vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte )
if succ then -- writing succesful
- interface.writebuffer = ""
+ interface.writebuffer[1] = nil
interface.writebufferlen = 0
interface:ondrain();
if interface.fatalerror then
elseif interface.startsslcallback then -- start ssl connection if needed
debug "starting ssl handshake after writing"
interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 )
+ elseif interface.writebufferlen ~= 0 then
+ -- data possibly written from ondrain
+ return EV_WRITE, cfg.WRITE_TIMEOUT
elseif interface.eventreadtimeout then
- return EV_WRITE, EV_TIMEOUT
+ return EV_WRITE, cfg.WRITE_TIMEOUT
end
interface.eventwrite = nil
return -1
elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again
--vdebug( "writebuffer is not empty:", err )
- interface.writebuffer = string_sub( interface.writebuffer, byte + 1, interface.writebufferlen ) -- new buffer
+ interface.writebuffer[1] = string_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer
interface.writebufferlen = interface.writebufferlen - byte
if "wantread" == err then -- happens only with luasec
local callback = function( )
end
local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern"
--vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
- buffer = buffer or part or ""
- local len = string_len( buffer )
- if len > cfg.MAX_READ_LENGTH then -- check buffer length
+ buffer = buffer or part
+ if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length
interface.fatalerror = "receive buffer exceeded"
debug( "fatal error:", interface.fatalerror )
interface:_close()
interface.eventread = nil
return -1
end
- interface.onincoming( interface, buffer, err ) -- send new data to listener
if err and ( err ~= "timeout" and err ~= "wantread" ) then
if "wantwrite" == err then -- need to read on write event
if not interface.eventwrite then -- register new write event if needed
interface.eventread = nil
return -1
end
+ else
+ interface.onincoming( interface, buffer, err ) -- send new data to listener
end
if interface.noreading then
interface.eventread = nil;
local addclient, wrapclient
do
- function wrapclient( client, ip, port, listeners, pattern, sslctx, startssl )
+ function wrapclient( client, ip, port, listeners, pattern, sslctx )
local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx )
interface:_start_connection(sslctx)
return interface, client
local res, err = client:connect( addr, serverport ) -- connect
if res or ( err == "timeout" ) then
local ip, port = client:getsockname( )
- local server = function( )
- return nil, "this is a dummy server interface"
- end
local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, startssl )
interface:_start_connection( startssl )
debug( "new connection id:", interface.id )
sender:pause();
end
end
+ sender:set_mode("*a");
end
return {