Merge 0.9->0.10
authorKim Alvefur <zash@zash.se>
Fri, 22 Jan 2016 13:49:05 +0000 (14:49 +0100)
committerKim Alvefur <zash@zash.se>
Fri, 22 Jan 2016 13:49:05 +0000 (14:49 +0100)
net/dns.lua
net/server.lua
net/server_select.lua
plugins/mod_s2s/s2sout.lib.lua

index d711af3499fc2eb2d4c4d475d12e658893456704..b047ec542ec3813328d8ee2aaaaa153ba98316df 100644 (file)
@@ -754,17 +754,17 @@ function resolver:query(qname, qtype, qclass)    -- - - - - - - - - - -- query
        self.active[id] = self.active[id] or {};
        self.active[id][question] = o;
 
-       -- remember which coroutine wants the answer
-       if co then
-               set(self.wanted, qclass, qtype, qname, co, true);
-       end
-
        local conn, err = self:getsocket(o.server)
        if not conn then
                return nil, err;
        end
        conn:send (o.packet)
 
+       -- remember which coroutine wants the answer
+       if co then
+               set(self.wanted, qclass, qtype, qname, co, true);
+       end
+       
        if timer and self.timeout then
                local num_servers = #self.server;
                local i = 1;
@@ -861,7 +861,7 @@ function resolver:receive(rset)    -- - - - - - - - - - - - - - - - -  receive
                                        -- retire the query
                                        local queries = self.active[response.header.id];
                                        queries[response.question.raw] = nil;
-
+                                       
                                        if not next(queries) then self.active[response.header.id] = nil; end
                                        if not next(self.active) then self:closeall(); end
 
@@ -875,7 +875,7 @@ function resolver:receive(rset)    -- - - - - - - - - - - - - - - - -  receive
                                                set(self.wanted, q.class, q.type, q.name, nil);
                                        end
                                end
-
+                               
                        end
                end
        end
index 2a0b89ae8aa60b68228d0b4c2657fd14f3e2e524..41e180faa6ce9cd5cee9518b89c0671d9a0be8ae 100644 (file)
@@ -50,7 +50,7 @@ if prosody then
                local settings = config_get("*", "network_settings") or {};
                if use_luaevent then
                        local event_settings = {
-                               ACCEPT_DELAY = settings.event_accept_retry_interval;
+                               ACCEPT_DELAY = settings.accept_retry_interval;
                                ACCEPT_QUEUE = settings.tcp_backlog;
                                CLEAR_DELAY = settings.event_clear_interval;
                                CONNECT_TIMEOUT = settings.connect_timeout;
index eccc7239f21cd13a0bfce492b239eee8dd1e51fe..52a0d5f1a1fd572d81609f76640f926886188891 100644 (file)
@@ -89,6 +89,7 @@ local _socketlist
 local _closelist
 local _readtimes
 local _writetimes
+local _fullservers
 
 --// simple data types //--
 
@@ -103,6 +104,7 @@ local _readtraffic
 local _selecttimeout
 local _sleeptime
 local _tcpbacklog
+local _accepretry
 
 local _starttime
 local _currenttime
@@ -131,6 +133,7 @@ _socketlist = { } -- key = socket, value = wrapped socket (handlers)
 _readtimes = { } -- key = handler, value = timestamp of last data reading
 _writetimes = { } -- key = handler, value = timestamp of last data writing/sending
 _closelist = { } -- handlers to close
+_fullservers = { } -- servers in a paused state while there are too many clients
 
 _readlistlen = 0 -- length of readlist
 _sendlistlen = 0 -- length of sendlist
@@ -142,6 +145,7 @@ _readtraffic = 0
 _selecttimeout = 1 -- timeout of socket.select
 _sleeptime = 0 -- time to wait at the end of every loop
 _tcpbacklog = 128 -- some kind of hint to the OS
+_accepretry = 10 -- seconds to wait until the next attempt of a full server to accept
 
 _maxsendlen = 51000 * 1024 -- max len of send buffer
 _maxreadlen = 25000 * 1024 -- max len of read buffer
@@ -210,6 +214,7 @@ wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx ) -- t
                                socket = nil;
                        end
                        handler.paused = true;
+                       out_put("server.lua: server [", ip, "]:", serverport, " paused")
                end
        end
        handler.resume = function( )
@@ -220,7 +225,9 @@ wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx ) -- t
                        end
                        _readlistlen = addsocket(_readlist, socket, _readlistlen)
                        _socketlist[ socket ] = handler
+                       _fullservers[ handler ] = nil
                        handler.paused = false;
+                       out_put("server.lua: server [", ip, "]:", serverport, " resumed")
                end
        end
        handler.ip = function( )
@@ -235,6 +242,7 @@ wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx ) -- t
        handler.readbuffer = function( )
                if _readlistlen >= _maxselectlen or _sendlistlen >= _maxselectlen then
                        handler.pause( )
+                       _fullservers[ handler ] = _currenttime
                        out_put( "server.lua: refused new client connection: server full" )
                        return false
                end
@@ -253,6 +261,8 @@ wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx ) -- t
                        return;
                elseif err then -- maybe timeout or something else
                        out_put( "server.lua: error with new client connection: ", tostring(err) )
+                       handler.pause( )
+                       _fullservers[ handler ] = _currenttime
                        return false
                end
        end
@@ -265,6 +275,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
                out_error("server.lua: Disallowed FD number: "..socket:getfd()) -- PROTIP: Switch to libevent
                socket:close( ) -- Should we send some kind of error here?
                if server then
+                       _fullservers[ server ] = _currenttime
                        server.pause( )
                end
                return nil, nil, "fd-too-large"
@@ -806,6 +817,7 @@ getsettings = function( )
                max_connections = _maxselectlen;
                max_ssl_handshake_roundtrips = _maxsslhandshake;
                highest_allowed_fd = _maxfd;
+               accept_retry_interval = _accepretry;
        }
 end
 
@@ -821,6 +833,7 @@ changesettings = function( new )
        _tcpbacklog = tonumber( new.tcp_backlog ) or _tcpbacklog
        _sendtimeout = tonumber( new.send_timeout ) or _sendtimeout
        _readtimeout = tonumber( new.read_timeout ) or _readtimeout
+       _accepretry = tonumber( new.accept_retry_interval ) or _accepretry
        _maxselectlen = new.max_connections or _maxselectlen
        _maxsslhandshake = new.max_ssl_handshake_roundtrips or _maxsslhandshake
        _maxfd = new.highest_allowed_fd or _maxfd
@@ -911,6 +924,13 @@ loop = function(once) -- this is the main loop of the program
                        next_timer_time = next_timer_time - (_currenttime - _timer);
                end
 
+               for server, paused_time in pairs( _fullservers ) do
+                       if _currenttime - paused_time > _accepretry then
+                               _fullservers[ server ] = nil;
+                               server.resume();
+                       end
+               end
+
                -- wait some time (0 by default)
                socket_sleep( _sleeptime )
        until quitting;
index 395406cd69494a2b175c24855bb7431ef18ffdb7..4241316469e35bfce1fc237fa05c251d37fec367 100644 (file)
@@ -18,31 +18,13 @@ local socket = require "socket";
 local adns = require "net.adns";
 local dns = require "net.dns";
 local t_insert, t_sort, ipairs = table.insert, table.sort, ipairs;
+local local_addresses = require "util.net".local_addresses;
 
 local s2s_destroy_session = require "core.s2smanager".destroy_session;
 
 local log = module._log;
 
-local anysource = { IPv4 = "0.0.0.0", IPv6 = "::" };
-local function get_sources(addrs)
-       local sources = {};
-       for _, IP in ipairs(addrs) do
-               local sock;
-               if IP.proto == "IPv4" then
-                       sock = socket.udp();
-               elseif IP.proto == "IPv6" then
-                       sock = socket.udp6();
-               end
-               sock:setpeername(IP.addr, 9);
-               local localaddr = sock:getsockname() or anysource[IP.proto];
-               sock:close();
-               if not sources[localaddr] then
-                       sources[localaddr] = true;
-                       t_insert(sources, new_ip(localaddr, IP.proto));
-               end
-       end
-       return sources;
-end
+local sources = {};
 local has_ipv4, has_ipv6;
 
 local dns_timeout = module:get_option_number("dns_timeout", 15);
@@ -196,7 +178,7 @@ function s2sout.try_connect(host_session, connect_host, connect_port, err)
 
                                if have_other_result then
                                        if #IPs > 0 then
-                                               rfc6724_dest(host_session.ip_hosts, get_sources(host_session.ip_hosts));
+                                               rfc6724_dest(host_session.ip_hosts, sources);
                                                for i = 1, #IPs do
                                                        IPs[i] = {ip = IPs[i], port = connect_port};
                                                end
@@ -232,7 +214,7 @@ function s2sout.try_connect(host_session, connect_host, connect_port, err)
 
                                if have_other_result then
                                        if #IPs > 0 then
-                                               rfc6724_dest(host_session.ip_hosts, get_sources(host_session.ip_hosts));
+                                               rfc6724_dest(host_session.ip_hosts, sources);
                                                for i = 1, #IPs do
                                                        IPs[i] = {ip = IPs[i], port = connect_port};
                                                end
@@ -320,12 +302,28 @@ module:hook_global("service-added", function (event)
                return;
        end
        for source, _ in pairs(s2s_sources) do
-               if source:find(":") then
-                       has_ipv6 = true;
+               if source == "*" or source == "0.0.0.0" then
+                       for _, addr in ipairs(local_addresses("ipv4", true)) do
+                               sources[#sources + 1] = new_ip(addr, "IPv4");
+                       end
+               elseif source == "::" then
+                       for _, addr in ipairs(local_addresses("ipv6", true)) do
+                               sources[#sources + 1] = new_ip(addr, "IPv6");
+                       end
                else
+                       sources[#sources + 1] = new_ip(source, (source:find(":") and "IPv6") or "IPv4");
+               end
+       end
+       for i = 1,#sources do
+               if sources[i].proto == "IPv6" then
+                       has_ipv6 = true;
+               elseif sources[i].proto == "IPv4" then
                        has_ipv4 = true;
                end
        end
+       if not (has_ipv4 or has_ipv6)  then
+               module:log("warn", "No local IPv4 or IPv6 addresses detected, outgoing connections may fail");
+       end
 end);
 
 return s2sout;