local default_timeout = 15;
-------------------------------------------------- module dns
-module('dns')
-local dns = _M;
+local _ENV = nil;
+local dns = {};
-- dns type & class codes ------------------------------ dns type & class codes
end
-function resolver:new() -- - - - - - - - - - - - - - - - - - - - - resolver
- local r = { active = {}, cache = {}, unsorted = {} };
- setmetatable(r, resolver);
- setmetatable(r.cache, cache_metatable);
- setmetatable(r.unsorted, { __mode = 'kv' });
- return r;
-end
-
-
-- packet layer -------------------------------------------------- packet layer
if peer:find(":") then
sock, err = socket.udp6();
else
- sock, err = socket.udp();
+ sock, err = (socket.udp4 or socket.udp)();
end
if sock and self.socket_wrapper then sock, err = self.socket_wrapper(sock, self); end
if not sock then
self.active[id] = self.active[id] or {};
self.active[id][question] = o;
- -- remember which coroutine wants the answer
- if co then
- set(self.wanted, qclass, qtype, qname, co, true);
- end
-
local conn, err = self:getsocket(o.server)
if not conn then
return nil, err;
end
conn:send (o.packet)
+ -- remember which coroutine wants the answer
+ if co then
+ set(self.wanted, qclass, qtype, qname, co, true);
+ end
+
if timer and self.timeout then
local num_servers = #self.server;
local i = 1;
-- retire the query
local queries = self.active[response.header.id];
queries[response.question.raw] = nil;
-
+
if not next(queries) then self.active[response.header.id] = nil; end
if not next(self.active) then self:closeall(); end
set(self.wanted, q.class, q.type, q.name, nil);
end
end
-
+
end
end
end
function dns.resolver () -- - - - - - - - - - - - - - - - - - - - - resolver
- -- this function seems to be redundant with resolver.new ()
-
local r = { active = {}, cache = {}, unsorted = {}, wanted = {}, best_server = 1 };
setmetatable (r, resolver);
setmetatable (r.cache, cache_metatable);
---
+--
-- server.lua by blastbeat of the luadch project
-- Re-used here under the MIT/X Consortium License
---
+--
-- Modifications (C) 2008-2010 Matthew Wild, Waqas Hussain
--
--// extern libs //--
-local luasec = use "ssl"
+local has_luasec, luasec = pcall ( require , "ssl" )
local luasocket = use "socket" or require "socket"
local luasocket_gettime = luasocket.gettime
+local getaddrinfo = luasocket.dns.getaddrinfo
--// extern lib methods //--
-local ssl_wrap = ( luasec and luasec.wrap )
+local ssl_wrap = ( has_luasec and luasec.wrap )
local socket_bind = luasocket.bind
local socket_sleep = luasocket.sleep
local socket_select = luasocket.select
local _closelist
local _readtimes
local _writetimes
+ local _fullservers
--// simple data types //--
local _selecttimeout
local _sleeptime
local _tcpbacklog
+ local _accepretry
local _starttime
local _currenttime
_readtimes = { } -- key = handler, value = timestamp of last data reading
_writetimes = { } -- key = handler, value = timestamp of last data writing/sending
_closelist = { } -- handlers to close
+ _fullservers = { } -- servers in a paused state while there are too many clients
_readlistlen = 0 -- length of readlist
_sendlistlen = 0 -- length of sendlist
_selecttimeout = 1 -- timeout of socket.select
_sleeptime = 0 -- time to wait at the end of every loop
_tcpbacklog = 128 -- some kind of hint to the OS
+ _accepretry = 10 -- seconds to wait until the next attempt of a full server to accept
_maxsendlen = 51000 * 1024 -- max len of send buffer
_maxreadlen = 25000 * 1024 -- max len of read buffer
-_checkinterval = 1200000 -- interval in secs to check idle clients
+_checkinterval = 30 -- interval in secs to check idle clients
_sendtimeout = 60000 -- allowed send idle time in secs
_readtimeout = 6 * 60 * 60 -- allowed read idle time in secs
socket = nil;
end
handler.paused = true;
+ out_put("server.lua: server [", ip, "]:", serverport, " paused")
end
end
handler.resume = function( )
end
_readlistlen = addsocket(_readlist, socket, _readlistlen)
_socketlist[ socket ] = handler
+ _fullservers[ handler ] = nil
handler.paused = false;
+ out_put("server.lua: server [", ip, "]:", serverport, " resumed")
end
end
handler.ip = function( )
handler.readbuffer = function( )
if _readlistlen >= _maxselectlen or _sendlistlen >= _maxselectlen then
handler.pause( )
+ _fullservers[ handler ] = _currenttime
out_put( "server.lua: refused new client connection: server full" )
return false
end
return;
elseif err then -- maybe timeout or something else
out_put( "server.lua: error with new client connection: ", tostring(err) )
+ handler.pause( )
+ _fullservers[ handler ] = _currenttime
return false
end
end
out_error("server.lua: Disallowed FD number: "..socket:getfd()) -- PROTIP: Switch to libevent
socket:close( ) -- Should we send some kind of error here?
if server then
+ _fullservers[ server ] = _currenttime
server.pause( )
end
return nil, nil, "fd-too-large"
local status = listeners.onstatus
local disconnect = listeners.ondisconnect
local drain = listeners.ondrain
+ local onreadtimeout = listeners.onreadtimeout;
local detach = listeners.ondetach
local bufferqueue = { } -- buffer array
handler.disconnect = function( )
return disconnect
end
+ handler.onreadtimeout = onreadtimeout;
+
handler.setlistener = function( self, listeners )
if detach then
detach(self) -- Notify listener that it is no longer responsible for this connection
disconnect = listeners.ondisconnect
status = listeners.onstatus
drain = listeners.ondrain
+ handler.onreadtimeout = listeners.onreadtimeout
detach = listeners.ondetach
end
handler.getstats = function( )
out_put "server.lua: closed client handler and removed socket from list"
return true
end
+ handler.server = function ( )
+ return server
+ end
handler.ip = function( )
return ip
end
_ = status and status( handler, "ssl-handshake-complete" )
if self.autostart_ssl and listeners.onconnect then
listeners.onconnect(self);
+ if bufferqueuelen ~= 0 then
+ _sendlistlen = addsocket(_sendlist, client, _sendlistlen)
+ end
end
_readlistlen = addsocket(_readlist, client, _readlistlen)
return true
coroutine_yield( ) -- handshake not finished
end
end
- out_put( "server.lua: ssl handshake error: ", tostring(err or "handshake too long") )
- _ = handler and handler:force_close("ssl handshake failed")
+ err = "ssl handshake error: " .. ( err or "handshake too long" );
+ out_put( "server.lua: ", err );
+ _ = handler and handler:force_close(err)
return false, err -- handshake failed
end
)
end
- if luasec then
+ if has_luasec then
handler.starttls = function( self, _sslctx)
if _sslctx then
handler:set_sslctx(_sslctx);
shutdown = id
_socketlist[ socket ] = handler
_readlistlen = addsocket(_readlist, socket, _readlistlen)
-
+
-- remove traces of the old socket
_readlistlen = removesocket( _readlist, oldsocket, _readlistlen )
_sendlistlen = removesocket( _sendlist, oldsocket, _sendlistlen )
_socketlist[ socket ] = handler
_readlistlen = addsocket(_readlist, socket, _readlistlen)
- if sslctx and luasec then
+ if sslctx and has_luasec then
out_put "server.lua: auto-starting ssl negotiation..."
handler.autostart_ssl = true;
local ok, err = handler:starttls(sslctx);
sender_locked = nil;
end
end
-
+
local _readbuffer = sender.readbuffer;
function sender.readbuffer()
_readbuffer();
----------------------------------// PUBLIC //--
addserver = function( addr, port, listeners, pattern, sslctx ) -- this function provides a way for other scripts to reg a server
+ addr = addr or "*"
local err
if type( listeners ) ~= "table" then
err = "invalid listener table"
- end
- if type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then
+ elseif type ( addr ) ~= "string" then
+ err = "invalid address"
+ elseif type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then
err = "invalid port"
elseif _server[ addr..":"..port ] then
err = "listeners on '[" .. addr .. "]:" .. port .. "' already exist"
- elseif sslctx and not luasec then
+ elseif sslctx and not has_luasec then
err = "luasec not found"
end
if err then
out_error( "server.lua, [", addr, "]:", port, ": ", err )
return nil, err
end
- addr = addr or "*"
local server, err = socket_bind( addr, port, _tcpbacklog )
if err then
out_error( "server.lua, [", addr, "]:", port, ": ", err )
max_connections = _maxselectlen;
max_ssl_handshake_roundtrips = _maxsslhandshake;
highest_allowed_fd = _maxfd;
+ accept_retry_interval = _accepretry;
}
end
_tcpbacklog = tonumber( new.tcp_backlog ) or _tcpbacklog
_sendtimeout = tonumber( new.send_timeout ) or _sendtimeout
_readtimeout = tonumber( new.read_timeout ) or _readtimeout
+ _accepretry = tonumber( new.accept_retry_interval ) or _accepretry
_maxselectlen = new.max_connections or _maxselectlen
_maxsslhandshake = new.max_ssl_handshake_roundtrips or _maxsslhandshake
_maxfd = new.highest_allowed_fd or _maxfd
_starttime = _currenttime
for handler, timestamp in pairs( _writetimes ) do
if os_difftime( _currenttime - timestamp ) > _sendtimeout then
- --_writetimes[ handler ] = nil
handler.disconnect( )( handler, "send timeout" )
handler:force_close() -- forced disconnect
end
end
for handler, timestamp in pairs( _readtimes ) do
if os_difftime( _currenttime - timestamp ) > _readtimeout then
- --_readtimes[ handler ] = nil
- handler.disconnect( )( handler, "read timeout" )
- handler:close( ) -- forced disconnect?
+ if not(handler.onreadtimeout) or handler:onreadtimeout() ~= true then
+ handler.disconnect( )( handler, "read timeout" )
+ handler:close( ) -- forced disconnect?
+ else
+ _readtimes[ handler ] = _currenttime -- reset timer
+ end
end
end
end
next_timer_time = next_timer_time - (_currenttime - _timer);
end
+ for server, paused_time in pairs( _fullservers ) do
+ if _currenttime - paused_time > _accepretry then
+ _fullservers[ server ] = nil;
+ server.resume();
+ end
+ end
+
-- wait some time (0 by default)
socket_sleep( _sleeptime )
until quitting;
if once and quitting == "once" then quitting = nil; return; end
+ closeall();
return "quitting"
end
return handler, socket
end
-local addclient = function( address, port, listeners, pattern, sslctx )
- local client, err = luasocket.tcp( )
+local addclient = function( address, port, listeners, pattern, sslctx, typ )
+ local err
+ if type( listeners ) ~= "table" then
+ err = "invalid listener table"
+ elseif type ( address ) ~= "string" then
+ err = "invalid address"
+ elseif type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then
+ err = "invalid port"
+ elseif sslctx and not has_luasec then
+ err = "luasec not found"
+ end
+ if not typ then
+ local addrinfo, err = getaddrinfo(address)
+ if not addrinfo then return nil, err end
+ if addrinfo[1] and addrinfo[1].family == "inet6" then
+ typ = "tcp6"
+ else
+ typ = "tcp"
+ end
+ end
+ local create = luasocket[typ]
+ if type( create ) ~= "function" then
+ err = "invalid socket type"
+ end
+
+ if err then
+ out_error( "server.lua, addclient: ", err )
+ return nil, err
+ end
+
+ local client, err = create( )
if err then
return nil, err
end
client:settimeout( 0 )
- _, err = client:connect( address, port )
- if err then -- try again
- local handler = wrapclient( client, address, port, listeners )
+ local ok, err = client:connect( address, port )
+ if ok or err == "timeout" then
+ return wrapclient( client, address, port, listeners, pattern, sslctx )
else
- wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx )
+ return nil, err
end
end
addclient = addclient,
wrapclient = wrapclient,
-
+
loop = loop,
link = link,
step = step,
-- Prosody IM
-- Copyright (C) 2008-2010 Matthew Wild
-- Copyright (C) 2008-2010 Waqas Hussain
---
+--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
local adns = require "net.adns";
local dns = require "net.dns";
local t_insert, t_sort, ipairs = table.insert, table.sort, ipairs;
+ local local_addresses = require "util.net".local_addresses;
local s2s_destroy_session = require "core.s2smanager".destroy_session;
local log = module._log;
- local anysource = { IPv4 = "0.0.0.0", IPv6 = "::" };
- local function get_sources(addrs)
- local sources = {};
- for _, IP in ipairs(addrs) do
- local sock;
- if IP.proto == "IPv4" then
- sock = socket.udp();
- elseif IP.proto == "IPv6" then
- sock = socket.udp6();
- end
- sock:setpeername(IP.addr, 9);
- local localaddr = sock:getsockname() or anysource[IP.proto];
- sock:close();
- if not sources[localaddr] then
- sources[localaddr] = true;
- t_insert(sources, new_ip(localaddr, IP.proto));
- end
- end
- return sources;
- end
+ local sources = {};
local has_ipv4, has_ipv6;
local dns_timeout = module:get_option_number("dns_timeout", 15);
function s2sout.initiate_connection(host_session)
initialize_filters(host_session);
host_session.version = 1;
-
+
-- Kick the connection attempting machine into life
if not s2sout.attempt_connection(host_session) then
-- Intentionally not returning here, the
-- session is needed, connected or not
s2s_destroy_session(host_session);
end
-
+
if not host_session.sends2s then
-- A sends2s which buffers data (until the stream is opened)
-- note that data in this buffer will be sent before the stream is authed
function s2sout.attempt_connection(host_session, err)
local to_host = host_session.to_host;
local connect_host, connect_port = to_host and idna_to_ascii(to_host), 5269;
-
+
if not connect_host then
return false;
end
-
+
if not err then -- This is our first attempt
log("debug", "First attempt to connect to %s, starting with SRV lookup...", to_host);
host_session.connecting = true;
local handle;
handle = adns.lookup(function (answer)
handle = nil;
+ local srv_hosts = { answer = answer };
+ host_session.srv_hosts = srv_hosts;
+ host_session.srv_choice = 0;
host_session.connecting = nil;
if answer and #answer > 0 then
log("debug", "%s has SRV records, handling...", to_host);
- local srv_hosts = { answer = answer };
- host_session.srv_hosts = srv_hosts;
for _, record in ipairs(answer) do
t_insert(srv_hosts, record.srv);
end
return;
end
t_sort(srv_hosts, compare_srv_priorities);
-
+
local srv_choice = srv_hosts[1];
host_session.srv_choice = 1;
if srv_choice then
end
end
end, "_xmpp-server._tcp."..connect_host..".", "SRV");
-
+
return true; -- Attempt in progress
elseif host_session.ip_hosts then
return s2sout.try_connect(host_session, connect_host, connect_port, err);
connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
host_session.log("info", "Connection failed (%s). Attempt #%d: This time to %s:%d", tostring(err), host_session.srv_choice, connect_host, connect_port);
else
- host_session.log("info", "Out of connection options, can't connect to %s", tostring(host_session.to_host));
+ host_session.log("info", "Failed in all attempts to connect to %s", tostring(host_session.to_host));
-- We're out of options
return false;
end
-
+
if not (connect_host and connect_port) then
-- Likely we couldn't resolve DNS
log("warn", "Hmm, we're without a host (%s) and port (%s) to connect to for %s, giving up :(", tostring(connect_host), tostring(connect_port), tostring(to_host));
if have_other_result then
if #IPs > 0 then
- rfc6724_dest(host_session.ip_hosts, get_sources(host_session.ip_hosts));
+ rfc6724_dest(host_session.ip_hosts, sources);
for i = 1, #IPs do
IPs[i] = {ip = IPs[i], port = connect_port};
end
if have_other_result then
if #IPs > 0 then
- rfc6724_dest(host_session.ip_hosts, get_sources(host_session.ip_hosts));
+ rfc6724_dest(host_session.ip_hosts, sources);
for i = 1, #IPs do
IPs[i] = {ip = IPs[i], port = connect_port};
end
end
function s2sout.make_connect(host_session, connect_host, connect_port)
- (host_session.log or log)("info", "Beginning new connection attempt to %s ([%s]:%d)", host_session.to_host, connect_host.addr, connect_port);
+ (host_session.log or log)("debug", "Beginning new connection attempt to %s ([%s]:%d)", host_session.to_host, connect_host.addr, connect_port);
-- Reset secure flag in case this is another
-- connection attempt after a failed STARTTLS
host_session.secure = nil;
+ host_session.encrypted = nil;
local conn, handler;
local proto = connect_host.proto;
else
handler = "Unsupported protocol: "..tostring(proto);
end
-
+
if not conn then
log("warn", "Failed to create outgoing connection, system error: %s", handler);
return false, handler;
log("warn", "s2s connect() to %s (%s:%d) failed: %s", host_session.to_host, connect_host.addr, connect_port, err);
return false, err;
end
-
+
conn = wrapclient(conn, connect_host.addr, connect_port, s2s_listener, "*a");
host_session.conn = conn;
-
- local filter = initialize_filters(host_session);
- local w, log = conn.write, host_session.log;
- host_session.sends2s = function (t)
- log("debug", "sending: %s", (t.top_tag and t:top_tag()) or t:match("^[^>]*>?"));
- if t.name then
- t = filter("stanzas/out", t);
- end
- if t then
- t = filter("bytes/out", tostring(t));
- if t then
- return w(conn, tostring(t));
- end
- end
- end
-
+
-- Register this outgoing connection so that xmppserver_listener knows about it
-- otherwise it will assume it is a new incoming connection
s2s_listener.register_outgoing(conn, host_session);
-
+
log("debug", "Connection attempt in progress...");
return true;
end
return;
end
for source, _ in pairs(s2s_sources) do
- if source:find(":") then
- has_ipv6 = true;
+ if source == "*" or source == "0.0.0.0" then
+ for _, addr in ipairs(local_addresses("ipv4", true)) do
+ sources[#sources + 1] = new_ip(addr, "IPv4");
+ end
+ elseif source == "::" then
+ for _, addr in ipairs(local_addresses("ipv6", true)) do
+ sources[#sources + 1] = new_ip(addr, "IPv6");
+ end
else
+ sources[#sources + 1] = new_ip(source, (source:find(":") and "IPv6") or "IPv4");
+ end
+ end
+ for i = 1,#sources do
+ if sources[i].proto == "IPv6" then
+ has_ipv6 = true;
+ elseif sources[i].proto == "IPv4" then
has_ipv4 = true;
end
end
+ if not (has_ipv4 or has_ipv6) then
+ module:log("warn", "No local IPv4 or IPv6 addresses detected, outgoing connections may fail");
+ end
end);
return s2sout;