Merge 0.9->0.10
authorKim Alvefur <zash@zash.se>
Tue, 2 Sep 2014 20:33:11 +0000 (22:33 +0200)
committerKim Alvefur <zash@zash.se>
Tue, 2 Sep 2014 20:33:11 +0000 (22:33 +0200)
1  2 
net/http.lua
net/http/server.lua
net/server_event.lua
net/server_select.lua
plugins/mod_admin_telnet.lua
plugins/mod_c2s.lua
plugins/mod_component.lua
plugins/mod_s2s/mod_s2s.lua

diff --combined net/http.lua
index ab9ec7b6c86f6b4e45c8d8457d0b74bc16ca1b1e,9dde6062d22ebf1c9df245cbc95f7f7148eef00c..0c0ef967eb734dc55a5dd7616d1112c023d32fe0
@@@ -1,7 -1,7 +1,7 @@@
  -- Prosody IM
  -- Copyright (C) 2008-2010 Matthew Wild
  -- Copyright (C) 2008-2010 Waqas Hussain
 --- 
 +--
  -- This project is MIT/X11 licensed. Please see the
  -- COPYING file in the source package for more information.
  --
@@@ -37,7 -37,7 +37,7 @@@ function listener.onconnect(conn
        if req.query then
                t_insert(request_line, 4, "?"..req.query);
        end
 -      
 +
        conn:write(t_concat(request_line));
        local t = { [2] = ": ", [4] = "\r\n" };
        for k, v in pairs(req.headers) do
@@@ -45,7 -45,7 +45,7 @@@
                conn:write(t_concat(t));
        end
        conn:write("\r\n");
 -      
 +
        if req.body then
                conn:write(req.body);
        end
@@@ -72,6 -72,10 +72,10 @@@ function listener.ondisconnect(conn, er
        requests[conn] = nil;
  end
  
+ function listener.ondetach(conn)
+       requests[conn] = nil;
+ end
  local function request_reader(request, data, err)
        if not request.parser then
                local function error_cb(reason)
                        end
                        destroy_request(request);
                end
 -              
 +
                if not data then
                        error_cb(err);
                        return;
                end
 -              
 +
                local function success_cb(r)
                        if request.callback then
                                request.callback(r.body, r.code, r, request);
@@@ -105,18 -109,18 +109,18 @@@ en
  local function handleerr(err) log("error", "Traceback[http]: %s", traceback(tostring(err), 2)); end
  function request(u, ex, callback)
        local req = url.parse(u);
 -      
 +
        if not (req and req.host) then
                callback(nil, 0, req);
                return nil, "invalid-url";
        end
 -      
 +
        if not req.path then
                req.path = "/";
        end
 -      
 +
        local method, headers, body;
 -      
 +
        local host, port = req.host, req.port;
        local host_header = host;
        if (port == "80" and req.scheme == "http")
                ["Host"] = host_header;
                ["User-Agent"] = "Prosody XMPP Server";
        };
 -      
 +
        if req.userinfo then
                headers["Authorization"] = "Basic "..b64(req.userinfo);
        end
                        end
                end
        end
 -      
 +
        -- Attach to request object
        req.method, req.headers, req.body = method, headers, body;
 -      
 +
        local using_https = req.scheme == "https";
        if using_https and not ssl_available then
                error("SSL not available, unable to contact https URL");
        end
        local port_number = port and tonumber(port) or (using_https and 443 or 80);
 -      
 +
        -- Connect the socket, and wrap it with net.server
        local conn = socket.tcp();
        conn:settimeout(10);
                callback(nil, 0, req);
                return nil, err;
        end
 -      
 +
        local sslctx = false;
        if using_https then
                sslctx = ex and ex.sslctx or { mode = "client", protocol = "sslv23", options = { "no_sslv2" } };
  
        req.handler, req.conn = assert(server.wrapclient(conn, host, port_number, listener, "*a", sslctx));
        req.write = function (...) return req.handler:write(...); end
 -      
 +
        req.callback = function (content, code, request, response) log("debug", "Calling callback, status %s", code or "---"); return select(2, xpcall(function () return callback(content, code, request, response) end, handleerr)); end
        req.reader = request_reader;
        req.state = "status";
diff --combined net/http/server.lua
index 09f8d2a486283456a1454866641cfdadbc18d805,7937f87c8277e22608612b6f688ca232d43f4825..be870c51c91e22762d988e450006a0d6f5bd575e
@@@ -142,6 -142,10 +142,10 @@@ function listener.ondisconnect(conn
        sessions[conn] = nil;
  end
  
+ function listener.ondetach(conn)
+       sessions[conn] = nil;
+ end
  function listener.onincoming(conn, data)
        sessions[conn]:feed(data);
  end
@@@ -185,7 -189,6 +189,7 @@@ function handle_request(conn, request, 
                persistent = persistent;
                conn = conn;
                send = _M.send_response;
 +              done = _M.finish_response;
                finish_cb = finish_cb;
        };
        conn._http_open_response = response;
                        err_code, err = 400, "Missing or invalid 'Host' header";
                end
        end
 -      
 +
        if err then
                response.status_code = err_code;
                response:send(events.fire_event("http-error", { code = err_code, message = err }));
        response.status_code = 404;
        response:send(events.fire_event("http-error", { code = 404 }));
  end
 -function _M.send_response(response, body)
 -      if response.finished then return; end
 -      response.finished = true;
 -      response.conn._http_open_response = nil;
 -      
 +local function prepare_header(response)
        local status_line = "HTTP/"..response.request.httpversion.." "..(response.status or codes[response.status_code]);
        local headers = response.headers;
 -      body = body or response.body or "";
 -      headers.content_length = #body;
 -
        local output = { status_line };
        for k,v in pairs(headers) do
                t_insert(output, headerfix[k]..v);
        end
        t_insert(output, "\r\n\r\n");
 +      return output;
 +end
 +_M.prepare_header = prepare_header;
 +function _M.send_response(response, body)
 +      if response.finished then return; end
 +      body = body or response.body or "";
 +      response.headers.content_length = #body;
 +      local output = prepare_header(response);
        t_insert(output, body);
 -
        response.conn:write(t_concat(output));
 +      response:done();
 +end
 +function _M.finish_response(response)
 +      if response.finished then return; end
 +      response.finished = true;
 +      response.conn._http_open_response = nil;
        if response.on_destroy then
                response:on_destroy();
                response.on_destroy = nil;
diff --combined net/server_event.lua
index e10606dded0dd079f17ecd176cc40ef585274b4a,45938a13b46c94f718c3fcf8c6259dd3d1e31220..756e9837b43c7ffa13de2c2ba1c6fe03916519f6
@@@ -115,10 -115,10 +115,10 @@@ end )( 
  local interface_mt
  do
        interface_mt = {}; interface_mt.__index = interface_mt;
 -      
 +
        local addevent = base.addevent
        local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield
 -      
 +
        -- Private methods
        function interface_mt:_position(new_position)
                        self.position = new_position or self.position
        function interface_mt:_close()
                return self:_destroy();
        end
 -      
 +
        function interface_mt:_start_connection(plainssl) -- should be called from addclient
                        local callback = function( event )
                                if EV_TIMEOUT == event then  -- timeout during connection
                        interfacelist( "delete", self )
                        return true
        end
 -      
 +
        function interface_mt:_lock(nointerface, noreading, nowriting)  -- lock or unlock this interface or events
                        self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
                        return nointerface, noreading, nowriting
        end
 -      
 +
        --TODO: Deprecate
        function interface_mt:lock_read(switch)
                if switch then
                end
                return self._connections
        end
 -      
 +
        -- Public methods
        function interface_mt:write(data)
                if self.nowriting then return nil, "locked" end
                        return true
                end
        end
 -      
 +
        function interface_mt:socket()
                return self.conn
        end
 -      
 +
        function interface_mt:server()
                return self._server or self;
        end
 -      
 +
        function interface_mt:port()
                return self._port
        end
 -      
 +
        function interface_mt:serverport()
                return self._serverport
        end
 -      
 +
        function interface_mt:ip()
                return self._ip
        end
 -      
 +
        function interface_mt:ssl()
                return self._usingssl
        end
        function interface_mt:type()
                return self._type or "client"
        end
 -      
 +
        function interface_mt:connections()
                return self._connections
        end
 -      
 +
        function interface_mt:address()
                return self.addr
        end
 -      
 +
        function interface_mt:set_sslctx(sslctx)
                self._sslctx = sslctx;
                if sslctx then
                end
                return self._pattern;
        end
 -      
 +
        function interface_mt:set_send(new_send)
                -- No-op, we always use the underlying connection's send
        end
 -      
 +
        function interface_mt:starttls(sslctx, call_onconnect)
                debug( "try to start ssl at client id:", self.id )
                local err
                self.starttls = false;
                return true
        end
 -      
 +
        function interface_mt:setoption(option, value)
                if self.conn.setoption then
                        return self.conn:setoption(option, value);
                end
                return false, "setoption not implemented";
        end
 -      
 +
        function interface_mt:setlistener(listener)
-               self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, self.onreadtimeout, self.onstatus
-                       = listener.onconnect, listener.ondisconnect, listener.onincoming,
-                         listener.ontimeout, listener.onreadtimeout, listener.onstatus;
+               self:ondetach(); -- Notify listener that it is no longer responsible for this connection
 -              self.onconnect, self.ondisconnect, self.onincoming,
 -              self.ontimeout, self.onstatus, self.ondetach
 -                      = listener.onconnect, listener.ondisconnect, listener.onincoming,
 -                      listener.ontimeout, listener.onstatus, listener.ondetach;
++              self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout,
++              self.onreadtimeout, self.onstatus, self.ondetach
++                      = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout,
++                        listener.onreadtimeout, listener.onstatus, listener.ondetach;
        end
 -      
 +
        -- Stub handlers
        function interface_mt:onconnect()
        end
        end
        function interface_mt:ontimeout()
        end
 +      function interface_mt:onreadtimeout()
 +              self.fatalerror = "timeout during receiving"
 +              debug( "connection failed:", self.fatalerror )
 +              self:_close()
 +              self.eventread = nil
 +      end
        function interface_mt:ondrain()
        end
+       function interface_mt:ondetach()
+       end
        function interface_mt:onstatus()
        end
  end
@@@ -485,8 -483,8 +489,9 @@@ d
                        ondisconnect = listener.ondisconnect;  -- will be called when client disconnects
                        onincoming = listener.onincoming;  -- will be called when client sends data
                        ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs
 +                      onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs
                        ondrain = listener.ondrain; -- called when writebuffer is empty
+                       ondetach = listener.ondetach; -- called when disassociating this listener from this connection
                        onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS)
                        eventread = false, eventwrite = false, eventclose = false,
                        eventhandshake = false, eventstarthandshake = false;  -- event handler
                        noreading = false, nowriting = false;  -- locks of the read/writecallback
                        startsslcallback = false;  -- starting handshake callback
                        position = false;  -- position of client in interfacelist
 -                      
 +
                        -- Properties
                        _ip = ip, _port = port, _server = server, _pattern = pattern,
                        _serverport = (server and server:port() or nil),
                                end
                        end
                end
 -              
 +
                interface.readcallback = function( event )  -- called on read events
                        --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) )
                        if interface.noreading or interface.fatalerror then  -- leave this event
                                interface.eventread = nil
                                return -1
                        end
 -                      if EV_TIMEOUT == event then  -- took too long to get some data from client -> disconnect
 -                              interface.fatalerror = "timeout during receiving"
 -                              debug( "connection failed:", interface.fatalerror )
 +                      if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then
 +                              return -1 -- took too long to get some data from client -> disconnect
 +                      end
 +                      if interface._usingssl then  -- handle luasec
 +                              if interface.eventwritetimeout then  -- ok, in the past writecallback was regged
 +                                      local ret = interface.writecallback( )  -- call it
 +                                      --vdebug( "tried to write in readcallback, result:", tostring(ret) )
 +                              end
 +                              if interface.eventreadtimeout then
 +                                      interface.eventreadtimeout:close( )
 +                                      interface.eventreadtimeout = nil
 +                              end
 +                      end
 +                      local buffer, err, part = interface.conn:receive( interface._pattern )  -- receive buffer with "pattern"
 +                      --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
 +                      buffer = buffer or part
 +                      if buffer and #buffer > cfg.MAX_READ_LENGTH then  -- check buffer length
 +                              interface.fatalerror = "receive buffer exceeded"
 +                              debug( "fatal error:", interface.fatalerror )
                                interface:_close()
                                interface.eventread = nil
                                return -1
 -                      else -- can read
 -                              if interface._usingssl then  -- handle luasec
 -                                      if interface.eventwritetimeout then  -- ok, in the past writecallback was regged
 -                                              local ret = interface.writecallback( )  -- call it
 -                                              --vdebug( "tried to write in readcallback, result:", tostring(ret) )
 -                                      end
 -                                      if interface.eventreadtimeout then
 -                                              interface.eventreadtimeout:close( )
 -                                              interface.eventreadtimeout = nil
 +                      end
 +                      if err and ( err ~= "timeout" and err ~= "wantread" ) then
 +                              if "wantwrite" == err then -- need to read on write event
 +                                      if not interface.eventwrite then  -- register new write event if needed
 +                                              interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
                                        end
 -                              end
 -                              local buffer, err, part = interface.conn:receive( interface._pattern )  -- receive buffer with "pattern"
 -                              --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
 -                              buffer = buffer or part
 -                              if buffer and #buffer > cfg.MAX_READ_LENGTH then  -- check buffer length
 -                                      interface.fatalerror = "receive buffer exceeded"
 -                                      debug( "fatal error:", interface.fatalerror )
 +                                      interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
 +                                              function( )
 +                                                      interface:_close()
 +                                              end, cfg.READ_TIMEOUT
 +                                      )
 +                                      debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
 +                                      -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
 +                              else  -- connection was closed or fatal error
 +                                      interface.fatalerror = err
 +                                      debug( "connection failed in read event:", interface.fatalerror )
                                        interface:_close()
                                        interface.eventread = nil
                                        return -1
                                end
 -                              if err and ( err ~= "timeout" and err ~= "wantread" ) then
 -                                      if "wantwrite" == err then -- need to read on write event
 -                                              if not interface.eventwrite then  -- register new write event if needed
 -                                                      interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
 -                                              end
 -                                              interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
 -                                                      function( )
 -                                                              interface:_close()
 -                                                      end, cfg.READ_TIMEOUT
 -                                              )
 -                                              debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
 -                                              -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
 -                                      else  -- connection was closed or fatal error
 -                                              interface.fatalerror = err
 -                                              debug( "connection failed in read event:", interface.fatalerror )
 -                                              interface:_close()
 -                                              interface.eventread = nil
 -                                              return -1
 -                                      end
 -                              else
 -                                      interface.onincoming( interface, buffer, err )  -- send new data to listener
 -                              end
 -                              if interface.noreading then
 -                                      interface.eventread = nil;
 -                                      return -1;
 -                              end
 -                              return EV_READ, cfg.READ_TIMEOUT
 +                      else
 +                              interface.onincoming( interface, buffer, err )  -- send new data to listener
                        end
 +                      if interface.noreading then
 +                              interface.eventread = nil;
 +                              return -1;
 +                      end
 +                      return EV_READ, cfg.READ_TIMEOUT
                end
  
                client:settimeout( 0 )  -- set non blocking
@@@ -649,7 -652,7 +654,7 @@@ d
                debug "creating server interface..."
                local interface = {
                        _connections = 0;
 -                      
 +
                        conn = server;
                        onconnect = listener.onconnect;  -- will be called when new client connected
                        eventread = false;  -- read event handler
                        readcallback = false; -- read event callback
                        fatalerror = false; -- error message
                        nointerface = true;  -- lock/unlock parameter
 -                      
 +
                        _ip = addr, _port = port, _pattern = pattern,
                        _sslctx = sslctx;
                }
                                        clientinterface:_start_session( true )
                                end
                                debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>");
 -                              
 +
                                client, err = server:accept()    -- try to accept again
                        end
                        return EV_READ
                end
 -              
 +
                server:settimeout( 0 )
                setmetatable(interface, interface_mt)
                interfacelist( "add", interface )
@@@ -744,7 -747,7 +749,7 @@@ d
                return interface, client
                --function handleclient( client, ip, port, server, pattern, listener, _, sslctx )  -- creates an client interface
        end
 -      
 +
        function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl )
                local client, err = socket.tcp()  -- creating new socket
                if not client then
@@@ -835,14 -838,14 +840,14 @@@ en
  
  local function link(sender, receiver, buffersize)
        local sender_locked;
 -      
 +
        function receiver:ondrain()
                if sender_locked then
                        sender:resume();
                        sender_locked = nil;
                end
        end
 -      
 +
        function sender:onincoming(data)
                receiver:write(data);
                if receiver.writebufferlen >= buffersize then
diff --combined net/server_select.lua
index 6308e62fa4485b623243ab6d29b24edfce3a831f,7ac4152314cca2068f711b3f8865fbe5371b73d3..d2192f0abd827bea81de3746f06bde11d5b3ea30
@@@ -1,7 -1,7 +1,7 @@@
 --- 
 +--
  -- server.lua by blastbeat of the luadch project
  -- Re-used here under the MIT/X Consortium License
 --- 
 +--
  -- Modifications (C) 2008-2010 Matthew Wild, Waqas Hussain
  --
  
@@@ -145,7 -145,7 +145,7 @@@ _tcpbacklog = 128 -- some kind of hint 
  _maxsendlen = 51000 * 1024 -- max len of send buffer
  _maxreadlen = 25000 * 1024 -- max len of read buffer
  
 -_checkinterval = 1200000 -- interval in secs to check idle clients
 +_checkinterval = 30 -- interval in secs to check idle clients
  _sendtimeout = 60000 -- allowed send idle time in secs
  _readtimeout = 6 * 60 * 60 -- allowed read idle time in secs
  
@@@ -284,7 -284,7 +284,8 @@@ wrapconnection = function( server, list
        local status = listeners.onstatus
        local disconnect = listeners.ondisconnect
        local drain = listeners.ondrain
 +      local onreadtimeout = listeners.onreadtimeout;
+       local detach = listeners.ondetach
  
        local bufferqueue = { } -- buffer array
        local bufferqueuelen = 0        -- end of buffer array
        handler.disconnect = function( )
                return disconnect
        end
 +      handler.onreadtimeout = onreadtimeout;
 +
        handler.setlistener = function( self, listeners )
+               if detach then
+                       detach(self) -- Notify listener that it is no longer responsible for this connection
+               end
                dispatch = listeners.onincoming
                disconnect = listeners.ondisconnect
                status = listeners.onstatus
                drain = listeners.ondrain
 +              handler.onreadtimeout = listeners.onreadtimeout
+               detach = listeners.ondetach
        end
        handler.getstats = function( )
                return readtraffic, sendtraffic
                        shutdown = id
                        _socketlist[ socket ] = handler
                        _readlistlen = addsocket(_readlist, socket, _readlistlen)
 -                      
 +
                        -- remove traces of the old socket
                        _readlistlen = removesocket( _readlist, oldsocket, _readlistlen )
                        _sendlistlen = removesocket( _sendlist, oldsocket, _sendlistlen )
@@@ -700,7 -701,7 +705,7 @@@ local function link(sender, receiver, b
                        sender_locked = nil;
                end
        end
 -      
 +
        local _readbuffer = sender.readbuffer;
        function sender.readbuffer()
                _readbuffer();
@@@ -869,16 -870,16 +874,16 @@@ loop = function(once) -- this is the ma
                        _starttime = _currenttime
                        for handler, timestamp in pairs( _writetimes ) do
                                if os_difftime( _currenttime - timestamp ) > _sendtimeout then
 -                                      --_writetimes[ handler ] = nil
                                        handler.disconnect( )( handler, "send timeout" )
                                        handler:force_close()    -- forced disconnect
                                end
                        end
                        for handler, timestamp in pairs( _readtimes ) do
                                if os_difftime( _currenttime - timestamp ) > _readtimeout then
 -                                      --_readtimes[ handler ] = nil
 -                                      handler.disconnect( )( handler, "read timeout" )
 -                                      handler:close( )        -- forced disconnect?
 +                                      if not(handler.onreadtimeout) or handler:onreadtimeout() ~= true then
 +                                              handler.disconnect( )( handler, "read timeout" )
 +                                              handler:close( )        -- forced disconnect?
 +                                      end
                                end
                        end
                end
@@@ -939,9 -940,9 +944,9 @@@ local addclient = function( address, po
        client:settimeout( 0 )
        _, err = client:connect( address, port )
        if err then -- try again
 -              local handler = wrapclient( client, address, port, listeners )
 +              return wrapclient( client, address, port, listeners, pattern, sslctx )
        else
 -              wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx )
 +              return wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx )
        end
  end
  
@@@ -971,7 -972,7 +976,7 @@@ return 
  
        addclient = addclient,
        wrapclient = wrapclient,
 -      
 +
        loop = loop,
        link = link,
        step = step,
index 5388a0e606f316bc55b6434b740bcbf498e88daf,e4b5a045ccbab2308b758c4f1f87556c4326929a..a17b1c57856cf263c3c9be0f07151aa9b585a2f4
@@@ -1,7 -1,7 +1,7 @@@
  -- Prosody IM
  -- Copyright (C) 2008-2010 Matthew Wild
  -- Copyright (C) 2008-2010 Waqas Hussain
 --- 
 +--
  -- This project is MIT/X11 licensed. Please see the
  -- COPYING file in the source package for more information.
  --
@@@ -17,6 -17,7 +17,6 @@@ local _G = _G
  
  local prosody = _G.prosody;
  local hosts = prosody.hosts;
 -local incoming_s2s = prosody.incoming_s2s;
  
  local console_listener = { default_port = 5582; default_mode = "*a"; interface = "127.0.0.1" };
  
@@@ -59,20 -60,20 +59,20 @@@ function console:new_session(conn
                        disconnect = function () conn:close(); end;
                        };
        session.env = setmetatable({}, default_env_mt);
 -      
 +
        -- Load up environment with helper objects
        for name, t in pairs(def_env) do
                if type(t) == "table" then
                        session.env[name] = setmetatable({ session = session }, { __index = t });
                end
        end
 -      
 +
        return session;
  end
  
  function console:process_line(session, line)
        local useglobalenv;
 -      
 +
        if line:match("^>") then
                line = line:gsub("^>", "");
                useglobalenv = true;
@@@ -86,9 -87,9 +86,9 @@@
                        return;
                end
        end
 -      
 +
        session.env._ = line;
 -      
 +
        local chunkname = "=console";
        local env = (useglobalenv and redirect_output(_G, session)) or session.env or nil
        local chunk, err = envload("return "..line, chunkname, env);
                        return;
                end
        end
 -      
 +
        local ranok, taskok, message = pcall(chunk);
 -      
 +
        if not (ranok or message or useglobalenv) and commands[line:lower()] then
                commands[line:lower()](session, line);
                return;
        end
 -      
 +
        if not ranok then
                session.print("Fatal error while running command, it did not complete");
                session.print("Error: "..taskok);
                return;
        end
 -      
 +
        if not message then
                session.print("Result: "..tostring(taskok));
                return;
                session.print("Message: "..tostring(message));
                return;
        end
 -      
 +
        session.print("OK: "..tostring(message));
  end
  
@@@ -154,14 -155,6 +154,14 @@@ function console_listener.onincoming(co
        session.partial_data = data:match("[^\n]+$");
  end
  
 +function console_listener.onreadtimeout(conn)
 +      local session = sessions[conn];
 +      if session then
 +              session.send("\0");
 +              return true;
 +      end
 +end
 +
  function console_listener.ondisconnect(conn, err)
        local session = sessions[conn];
        if session then
        end
  end
  
+ function console_listener.ondetach(conn)
+       sessions[conn] = nil;
+ end
  -- Console commands --
  -- These are simple commands, not valid standalone in Lua
  
@@@ -220,11 -217,9 +224,11 @@@ function commands.help(session, data
                print [[c2s:show(jid) - Show all client sessions with the specified JID (or all if no JID given)]]
                print [[c2s:show_insecure() - Show all unencrypted client connections]]
                print [[c2s:show_secure() - Show all encrypted client connections]]
 +              print [[c2s:show_tls() - Show TLS cipher info for encrypted sessions]]
                print [[c2s:close(jid) - Close all sessions for the specified JID]]
        elseif section == "s2s" then
                print [[s2s:show(domain) - Show all s2s connections for the given domain (or all if no domain given)]]
 +              print [[s2s:show_tls(domain) - Show TLS cipher info for encrypted sessions]]
                print [[s2s:close(from, to) - Close a connection from one domain to another]]
                print [[s2s:closeall(host) - Close all the incoming/outgoing s2s sessions to specified host]]
        elseif section == "module" then
@@@ -353,9 -348,9 +357,9 @@@ en
  
  function def_env.module:load(name, hosts, config)
        local mm = require "modulemanager";
 -      
 +
        hosts = get_hosts_set(hosts);
 -      
 +
        -- Load the module for each host
        local ok, err, count, mod = true, nil, 0, nil;
        for host in hosts do
                        end
                end
        end
 -      
 -      return ok, (ok and "Module loaded onto "..count.." host"..(count ~= 1 and "s" or "")) or ("Last error: "..tostring(err));       
 +
 +      return ok, (ok and "Module loaded onto "..count.." host"..(count ~= 1 and "s" or "")) or ("Last error: "..tostring(err));
  end
  
  function def_env.module:unload(name, hosts)
        local mm = require "modulemanager";
  
        hosts = get_hosts_set(hosts, name);
 -      
 +
        -- Unload the module for each host
        local ok, err, count = true, nil, 0;
        for host in hosts do
@@@ -442,7 -437,7 +446,7 @@@ function def_env.module:list(hosts
        if type(hosts) ~= "table" then
                return false, "Please supply a host or a list of hosts you would like to see";
        end
 -      
 +
        local print = self.session.print;
        for _, host in ipairs(hosts) do
                print((host == "*" and "Global" or host)..":");
@@@ -481,57 -476,15 +485,57 @@@ function def_env.config:reload(
        return ok, (ok and "Config reloaded (you may need to reload modules to take effect)") or tostring(err);
  end
  
 -def_env.hosts = {};
 -function def_env.hosts:list()
 -      for host, host_session in pairs(hosts) do
 -              self.session.print(host);
 +local function common_info(session, line)
 +      if session.id then
 +              line[#line+1] = "["..session.id.."]"
 +      else
 +              line[#line+1] = "["..session.type..(tostring(session):match("%x*$")).."]"
        end
 -      return true, "Done";
  end
  
 -function def_env.hosts:add(name)
 +local function session_flags(session, line)
 +      line = line or {};
 +      common_info(session, line);
 +      if session.type == "c2s" then
 +              local status, priority = "unavailable", tostring(session.priority or "-");
 +              if session.presence then
 +                      status = session.presence:get_child_text("show") or "available";
 +              end
 +              line[#line+1] = status.."("..priority..")";
 +      end
 +      if session.cert_identity_status == "valid" then
 +              line[#line+1] = "(authenticated)";
 +      end
 +      if session.secure then
 +              line[#line+1] = "(encrypted)";
 +      end
 +      if session.compressed then
 +              line[#line+1] = "(compressed)";
 +      end
 +      if session.smacks then
 +              line[#line+1] = "(sm)";
 +      end
 +      if session.ip and session.ip:match(":") then
 +              line[#line+1] = "(IPv6)";
 +      end
 +      return table.concat(line, " ");
 +end
 +
 +local function tls_info(session, line)
 +      line = line or {};
 +      common_info(session, line);
 +      if session.secure then
 +              local sock = session.conn and session.conn.socket and session.conn:socket();
 +              if sock and sock.info then
 +                      local info = sock:info();
 +                      line[#line+1] = ("(%s with %s)"):format(info.protocol, info.cipher);
 +              else
 +                      line[#line+1] = "(cipher info unavailable)";
 +              end
 +      else
 +              line[#line+1] = "(insecure)";
 +      end
 +      return table.concat(line, " ");
  end
  
  def_env.c2s = {};
@@@ -552,14 -505,13 +556,14 @@@ function def_env.c2s:count(match_jid
        show_c2s(function (jid, session)
                if (not match_jid) or jid:match(match_jid) then
                        count = count + 1;
 -              end             
 +              end
        end);
        return true, "Total: "..count.." clients";
  end
  
 -function def_env.c2s:show(match_jid)
 +function def_env.c2s:show(match_jid, annotate)
        local print, count = self.session.print, 0;
 +      annotate = annotate or session_flags;
        local curr_host;
        show_c2s(function (jid, session)
                if curr_host ~= session.host then
                end
                if (not match_jid) or jid:match(match_jid) then
                        count = count + 1;
 -                      local status, priority = "unavailable", tostring(session.priority or "-");
 -                      if session.presence then
 -                              status = session.presence:child_with_name("show");
 -                              if status then
 -                                      status = status:get_text() or "[invalid!]";
 -                              else
 -                                      status = "available";
 -                              end
 -                      end
 -                      print("   "..jid.." - "..status.."("..priority..")");
 -              end             
 +                      print(annotate(session, { "  ", jid }));
 +              end
        end);
        return true, "Total: "..count.." clients";
  end
@@@ -580,7 -541,7 +584,7 @@@ function def_env.c2s:show_insecure(matc
                if ((not match_jid) or jid:match(match_jid)) and not session.secure then
                        count = count + 1;
                        print(jid);
 -              end             
 +              end
        end);
        return true, "Total: "..count.." insecure client connections";
  end
@@@ -591,15 -552,11 +595,15 @@@ function def_env.c2s:show_secure(match_
                if ((not match_jid) or jid:match(match_jid)) and session.secure then
                        count = count + 1;
                        print(jid);
 -              end             
 +              end
        end);
        return true, "Total: "..count.." secure client connections";
  end
  
 +function def_env.c2s:show_tls(match_jid)
 +      return self:show(match_jid, tls_info);
 +end
 +
  function def_env.c2s:close(match_jid)
        local count = 0;
        show_c2s(function (jid, session)
        return true, "Total: "..count.." sessions closed";
  end
  
 -local function session_flags(session, line)
 -      if session.cert_identity_status == "valid" then
 -              line[#line+1] = "(secure)";
 -      elseif session.secure then
 -              line[#line+1] = "(encrypted)";
 -      end
 -      if session.compressed then
 -              line[#line+1] = "(compressed)";
 -      end
 -      if session.smacks then
 -              line[#line+1] = "(sm)";
 -      end
 -      if session.conn and session.conn:ip():match(":") then
 -              line[#line+1] = "(IPv6)";
 -      end
 -      return table.concat(line, " ");
 -end
  
  def_env.s2s = {};
 -function def_env.s2s:show(match_jid)
 -      local _print = self.session.print;
 +function def_env.s2s:show(match_jid, annotate)
        local print = self.session.print;
 -      
 +      annotate = annotate or session_flags;
 +
        local count_in, count_out = 0,0;
 -      
 -      for host, host_session in pairs(hosts) do
 -              print = function (...) _print(host); _print(...); print = _print; end
 -              for remotehost, session in pairs(host_session.s2sout) do
 -                      if (not match_jid) or remotehost:match(match_jid) or host:match(match_jid) then
 -                              count_out = count_out + 1;
 -                              print(session_flags(session, {"   ", host, "->", remotehost}));
 -                              if session.sendq then
 -                                      print("        There are "..#session.sendq.." queued outgoing stanzas for this connection");
 -                              end
 -                              if session.type == "s2sout_unauthed" then
 -                                      if session.connecting then
 -                                              print("        Connection not yet established");
 -                                              if not session.srv_hosts then
 -                                                      if not session.conn then
 -                                                              print("        We do not yet have a DNS answer for this host's SRV records");
 -                                                      else
 -                                                              print("        This host has no SRV records, using A record instead");
 -                                                      end
 -                                              elseif session.srv_choice then
 -                                                      print("        We are on SRV record "..session.srv_choice.." of "..#session.srv_hosts);
 -                                                      local srv_choice = session.srv_hosts[session.srv_choice];
 -                                                      print("        Using "..(srv_choice.target or ".")..":"..(srv_choice.port or 5269));
 +      local s2s_list = { };
 +
 +      local s2s_sessions = module:shared"/*/s2s/sessions";
 +      for _, session in pairs(s2s_sessions) do
 +              local remotehost, localhost, direction;
 +              if session.direction == "outgoing" then
 +                      direction = "->";
 +                      count_out = count_out + 1;
 +                      remotehost, localhost = session.to_host or "?", session.from_host or "?";
 +              else
 +                      direction = "<-";
 +                      count_in = count_in + 1;
 +                      remotehost, localhost = session.from_host or "?", session.to_host or "?";
 +              end
 +              local sess_lines = { l = localhost, r = remotehost,
 +                      annotate(session, { "", direction, remotehost or "?" })};
 +
 +              if (not match_jid) or remotehost:match(match_jid) or localhost:match(match_jid) then
 +                      table.insert(s2s_list, sess_lines);
 +                      local print = function (s) table.insert(sess_lines, "        "..s); end
 +                      if session.sendq then
 +                              print("There are "..#session.sendq.." queued outgoing stanzas for this connection");
 +                      end
 +                      if session.type == "s2sout_unauthed" then
 +                              if session.connecting then
 +                                      print("Connection not yet established");
 +                                      if not session.srv_hosts then
 +                                              if not session.conn then
 +                                                      print("We do not yet have a DNS answer for this host's SRV records");
 +                                              else
 +                                                      print("This host has no SRV records, using A record instead");
                                                end
 -                                      elseif session.notopen then
 -                                              print("        The <stream> has not yet been opened");
 -                                      elseif not session.dialback_key then
 -                                              print("        Dialback has not been initiated yet");
 -                                      elseif session.dialback_key then
 -                                              print("        Dialback has been requested, but no result received");
 +                                      elseif session.srv_choice then
 +                                              print("We are on SRV record "..session.srv_choice.." of "..#session.srv_hosts);
 +                                              local srv_choice = session.srv_hosts[session.srv_choice];
 +                                              print("Using "..(srv_choice.target or ".")..":"..(srv_choice.port or 5269));
                                        end
 +                              elseif session.notopen then
 +                                      print("The <stream> has not yet been opened");
 +                              elseif not session.dialback_key then
 +                                      print("Dialback has not been initiated yet");
 +                              elseif session.dialback_key then
 +                                      print("Dialback has been requested, but no result received");
                                end
                        end
 -              end     
 -              local subhost_filter = function (h)
 -                              return (match_jid and h:match(match_jid));
 -                      end
 -              for session in pairs(incoming_s2s) do
 -                      if session.to_host == host and ((not match_jid) or host:match(match_jid)
 -                              or (session.from_host and session.from_host:match(match_jid))
 -                              -- Pft! is what I say to list comprehensions
 -                              or (session.hosts and #array.collect(keys(session.hosts)):filter(subhost_filter)>0)) then
 -                              count_in = count_in + 1;
 -                              print(session_flags(session, {"   ", host, "<-", session.from_host or "(unknown)"}));
 -                              if session.type == "s2sin_unauthed" then
 -                                              print("        Connection not yet authenticated");
 -                              end
 +                      if session.type == "s2sin_unauthed" then
 +                              print("Connection not yet authenticated");
 +                      elseif session.type == "s2sin" then
                                for name in pairs(session.hosts) do
                                        if name ~= session.from_host then
 -                                              print("        also hosts "..tostring(name));
 +                                              print("also hosts "..tostring(name));
                                        end
                                end
                        end
                end
 -              
 -              print = _print;
        end
 -      
 -      for session in pairs(incoming_s2s) do
 -              if not session.to_host and ((not match_jid) or session.from_host and session.from_host:match(match_jid)) then
 -                      count_in = count_in + 1;
 -                      print("Other incoming s2s connections");
 -                      print("    (unknown) <- "..(session.from_host or "(unknown)"));                 
 -              end
 +
 +      -- Sort by local host, then remote host
 +      table.sort(s2s_list, function(a,b)
 +              if a.l == b.l then return a.r < b.r; end
 +              return a.l < b.l;
 +      end);
 +      local lasthost;
 +      for _, sess_lines in ipairs(s2s_list) do
 +              if sess_lines.l ~= lasthost then print(sess_lines.l); lasthost=sess_lines.l end
 +              for _, line in ipairs(sess_lines) do print(line); end
        end
 -      
        return true, "Total: "..count_out.." outgoing, "..count_in.." incoming connections";
  end
  
 +function def_env.s2s:show_tls(match_jid)
 +      return self:show(match_jid, tls_info);
 +end
 +
  local function print_subject(print, subject)
        for _, entry in ipairs(subject) do
                print(
@@@ -720,9 -689,14 +724,9 @@@ en
  function def_env.s2s:showcert(domain)
        local ser = require "util.serialization".serialize;
        local print = self.session.print;
 -      local domain_sessions = set.new(array.collect(keys(incoming_s2s)))
 -              /function(session) return session.from_host == domain and session or nil; end;
 -      for local_host in values(prosody.hosts) do
 -              local s2sout = local_host.s2sout;
 -              if s2sout and s2sout[domain] then
 -                      domain_sessions:add(s2sout[domain]);
 -              end
 -      end
 +      local s2s_sessions = module:shared"/*/s2s/sessions";
 +      local domain_sessions = set.new(array.collect(values(s2s_sessions)))
 +              /function(session) return (session.to_host == domain or session.from_host == domain) and session or nil; end;
        local cert_set = {};
        for session in domain_sessions do
                local conn = session.conn;
        local domain_certs = array.collect(values(cert_set));
        -- Phew. We now have a array of unique certificates presented by domain.
        local n_certs = #domain_certs;
 -      
 +
        if n_certs == 0 then
                return "No certificates found for "..domain;
        end
 -      
 +
        local function _capitalize_and_colon(byte)
                return string.upper(byte)..":";
        end
        local function pretty_fingerprint(hash)
                return hash:gsub("..", _capitalize_and_colon):sub(1, -2);
        end
 -      
 +
        for cert_info in values(domain_certs) do
                local certs = cert_info.certs;
                local cert = certs[1];
@@@ -813,38 -787,76 +817,38 @@@ en
  
  function def_env.s2s:close(from, to)
        local print, count = self.session.print, 0;
 -      
 -      if not (from and to) then
 +      local s2s_sessions = module:shared"/*/s2s/sessions";
 +
 +      local match_id;
 +      if from and not to then
 +              match_id, from = from;
 +      elseif not to then
                return false, "Syntax: s2s:close('from', 'to') - Closes all s2s sessions from 'from' to 'to'";
        elseif from == to then
                return false, "Both from and to are the same... you can't do that :)";
        end
 -      
 -      if hosts[from] and not hosts[to] then
 -              -- Is an outgoing connection
 -              local session = hosts[from].s2sout[to];
 -              if not session then
 -                      print("No outgoing connection from "..from.." to "..to)
 -              else
 +
 +      for _, session in pairs(s2s_sessions) do
 +              local id = session.type..tostring(session):match("[a-f0-9]+$");
 +              if (match_id and match_id == id)
 +              or (session.from_host == from and session.to_host == to) then
 +                      print(("Closing connection from %s to %s [%s]"):format(session.from_host, session.to_host, id));
                        (session.close or s2smanager.destroy_session)(session);
 -                      count = count + 1;
 -                      print("Closed outgoing session from "..from.." to "..to);
 +                      count = count + 1 ;
                end
 -      elseif hosts[to] and not hosts[from] then
 -              -- Is an incoming connection
 -              for session in pairs(incoming_s2s) do
 -                      if session.to_host == to and session.from_host == from then
 -                              (session.close or s2smanager.destroy_session)(session);
 -                              count = count + 1;
                        end
 -              end
 -              
 -              if count == 0 then
 -                      print("No incoming connections from "..from.." to "..to);
 -              else
 -                      print("Closed "..count.." incoming session"..((count == 1 and "") or "s").." from "..from.." to "..to);
 -              end
 -      elseif hosts[to] and hosts[from] then
 -              return false, "Both of the hostnames you specified are local, there are no s2s sessions to close";
 -      else
 -              return false, "Neither of the hostnames you specified are being used on this server";
 -      end
 -      
        return true, "Closed "..count.." s2s session"..((count == 1 and "") or "s");
  end
  
  function def_env.s2s:closeall(host)
          local count = 0;
 -
 -        if not host or type(host) ~= "string" then return false, "wrong syntax: please use s2s:closeall('hostname.tld')"; end
 -        if hosts[host] then
 -                for session in pairs(incoming_s2s) do
 -                        if session.to_host == host then
 -                                (session.close or s2smanager.destroy_session)(session);
 +      local s2s_sessions = module:shared"/*/s2s/sessions";
 +      for _,session in pairs(s2s_sessions) do
 +              if not host or session.from_host == host or session.to_host == host then
 +                      session:close();
                                  count = count + 1;
                          end
                  end
 -                for _, session in pairs(hosts[host].s2sout) do
 -                        (session.close or s2smanager.destroy_session)(session);
 -                        count = count + 1;
 -                end
 -        else
 -                for session in pairs(incoming_s2s) do
 -                      if session.from_host == host then
 -                              (session.close or s2smanager.destroy_session)(session);
 -                              count = count + 1;
 -                      end
 -              end
 -              for _, h in pairs(hosts) do
 -                      if h.s2sout[host] then
 -                              (h.s2sout[host].close or s2smanager.destroy_session)(h.s2sout[host]);
 -                              count = count + 1;
 -                      end
 -              end
 -        end
 -
        if count == 0 then return false, "No sessions to close.";
        else return true, "Closed "..count.." s2s session"..((count == 1 and "") or "s"); end
  end
@@@ -861,19 -873,9 +865,19 @@@ en
  function def_env.host:list()
        local print = self.session.print;
        local i = 0;
 +      local type;
        for host in values(array.collect(keys(prosody.hosts)):sort()) do
                i = i + 1;
 -              print(host);
 +              type = hosts[host].type;
 +              if type == "local" then
 +                      print(host);
 +              else
 +                      type = module:context(host):get_option_string("component_module", type);
 +                      if type ~= "component" then
 +                              type = type .. " component";
 +                      end
 +                      print(("%s (%s)"):format(host, type));
 +              end
        end
        return true, i.." hosts";
  end
@@@ -964,19 -966,6 +968,19 @@@ function def_env.muc:room(room_jid
        return setmetatable({ room = room_obj }, console_room_mt);
  end
  
 +function def_env.muc:list(host)
 +      local host_session = hosts[host];
 +      if not host_session or not host_session.modules.muc then
 +              return nil, "Please supply the address of a local MUC component";
 +      end
 +      local c = 0;
 +      for name in keys(host_session.modules.muc.rooms) do
 +              print(name);
 +              c = c + 1;
 +      end
 +      return true, c.." rooms";
 +end
 +
  local um = require"core.usermanager";
  
  def_env.user = {};
@@@ -1097,12 -1086,12 +1101,12 @@@ function printbanner(session
        local option = module:get_option("console_banner");
        if option == nil or option == "full" or option == "graphic" then
                session.print [[
 -                   ____                \   /     _       
 -                    |  _ \ _ __ ___  ___  _-_   __| |_   _ 
 +                   ____                \   /     _
 +                    |  _ \ _ __ ___  ___  _-_   __| |_   _
                      | |_) | '__/ _ \/ __|/ _ \ / _` | | | |
                      |  __/| | | (_) \__ \ |_| | (_| | |_| |
                      |_|   |_|  \___/|___/\___/ \__,_|\__, |
 -                    A study in simplicity            |___/ 
 +                    A study in simplicity            |___/
  
  ]]
        end
diff --combined plugins/mod_c2s.lua
index bb3858c0770604f527249c40a935daa0733c3a33,3d6487c91eadd11b942e2063a32437b58990ce9c..4238b2e7e1f3b729450dcccd845477f9065ff62e
@@@ -1,7 -1,7 +1,7 @@@
  -- Prosody IM
  -- Copyright (C) 2008-2010 Matthew Wild
  -- Copyright (C) 2008-2010 Waqas Hussain
 --- 
 +--
  -- This project is MIT/X11 licensed. Please see the
  -- COPYING file in the source package for more information.
  --
@@@ -15,10 -15,9 +15,10 @@@ local sessionmanager = require "core.se
  local st = require "util.stanza";
  local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session;
  local uuid_generate = require "util.uuid".generate;
 +local runner = require "util.async".runner;
  
  local xpcall, tostring, type = xpcall, tostring, type;
 -local traceback = debug.traceback;
 +local t_insert, t_remove = table.insert, table.remove;
  
  local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
  
@@@ -32,12 -31,12 +32,12 @@@ local sessions = module:shared("session
  local core_process_stanza = prosody.core_process_stanza;
  local hosts = prosody.hosts;
  
 -local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza };
 +local stream_callbacks = { default_ns = "jabber:client" };
  local listener = {};
 +local runner_callbacks = {};
  
  --- Stream events handlers
  local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
 -local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
  
  function stream_callbacks.streamopened(session, attr)
        local send = session.send;
        session.streamid = uuid_generate();
        (session.log or session)("debug", "Client sent opening <stream:stream> to %s", session.host);
  
 -      if not hosts[session.host] or not hosts[session.host].users then
 +      if not hosts[session.host] or not hosts[session.host].modules.c2s then
                -- We don't serve this host...
                session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)};
                return;
        end
  
 -      send("<?xml version='1.0'?>"..st.stanza("stream:stream", {
 -              xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams';
 -              id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }):top_tag());
 +      session:open_stream();
  
        (session.log or log)("debug", "Sent reply <stream:stream> to client");
        session.notopen = nil;
        -- since we now have a new stream header, session is secured
        if session.secure == false then
                session.secure = true;
 +              session.encrypted = true;
  
 -              -- Check if TLS compression is used
                local sock = session.conn:socket();
                if sock.info then
 -                      session.compressed = sock:info"compression";
 -              elseif sock.compression then
 -                      session.compressed = sock:compression(); --COMPAT mw/luasec-hg
 +                      local info = sock:info();
 +                      (session.log or log)("info", "Stream encrypted (%s with %s)", info.protocol, info.cipher);
 +                      session.compressed = info.compression;
 +              else
 +                      (session.log or log)("info", "Stream encrypted");
 +                      session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg
                end
        end
  
        local features = st.stanza("stream:features");
        hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
 -      module:fire_event("stream-features", session, features);
 -
        send(features);
  end
  
@@@ -116,9 -116,12 +116,9 @@@ function stream_callbacks.error(session
        end
  end
  
 -local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end
  function stream_callbacks.handlestanza(session, stanza)
        stanza = session.filter("stanzas/in", stanza);
 -      if stanza then
 -              return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
 -      end
 +      session.thread:run(stanza);
  end
  
  --- Session methods
@@@ -126,7 -129,8 +126,7 @@@ local function session_close(session, r
        local log = session.log or log;
        if session.conn then
                if session.notopen then
 -                      session.send("<?xml version='1.0'?>");
 -                      session.send(st.stanza("stream:stream", default_stream_attr):top_tag());
 +                      session:open_stream();
                end
                if reason then -- nil == no err, initiated by us, false == initiated by client
                        local stream_error = st.stanza("stream:error");
                        log("debug", "Disconnecting client, <stream:error> is: %s", stream_error);
                        session.send(stream_error);
                end
 -              
 +
                session.send("</stream:stream>");
                function session.send() return false; end
 -              
 +
                local reason = (reason and (reason.name or reason.text or reason.condition)) or reason;
 -              session.log("info", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed");
 +              session.log("debug", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed");
  
                -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
                local conn = session.conn;
@@@ -184,29 -188,16 +184,29 @@@ module:hook_global("user-deleted", func
        end
  end, 200);
  
 +function runner_callbacks:ready()
 +      self.data.conn:resume();
 +end
 +
 +function runner_callbacks:waiting()
 +      self.data.conn:pause();
 +end
 +
 +function runner_callbacks:error(err)
 +      (self.data.log or log)("error", "Traceback[c2s]: %s", err);
 +end
 +
  --- Port listener
  function listener.onconnect(conn)
        local session = sm_new_session(conn);
        sessions[conn] = session;
 -      
 +
        session.log("info", "Client connected");
 -      
 +
        -- Client is using legacy SSL (otherwise mod_tls sets this flag)
        if conn:ssl() then
                session.secure = true;
 +              session.encrypted = true;
  
                -- Check if TLS compression is used
                local sock = conn:socket();
                        session.compressed = sock:compression(); --COMPAT mw/luasec-hg
                end
        end
 -      
 +
        if opt_keepalives then
                conn:setoption("keepalive", opt_keepalives);
        end
 -      
 +
        session.close = session_close;
 -      
 +
        local stream = new_xmpp_stream(session, stream_callbacks);
        session.stream = stream;
        session.notopen = true;
 -      
 +
        function session.reset_stream()
                session.notopen = true;
                session.stream:reset();
        end
 -      
 +
 +      session.thread = runner(function (stanza)
 +              core_process_stanza(session, stanza);
 +      end, runner_callbacks, session);
 +
        local filter = session.filter;
        function session.data(data)
-                       data = filter("bytes/in", data);
-                       if data then
-                               local ok, err = stream:feed(data);
 +              -- Parse the data, which will store stanzas in session.pending_stanzas
 +              if data then
 -                      if ok then return; end
 -                      log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
 -                      session:close("not-well-formed");
+               data = filter("bytes/in", data);
+               if data then
+                       local ok, err = stream:feed(data);
 +                              if not ok then
 +                                      log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
 +                                      session:close("not-well-formed");
 +                              end
 +                      end
                end
        end
  
 -      
        if c2s_timeout then
                add_task(c2s_timeout, function ()
                        if session.type == "c2s_unauthed" then
@@@ -278,27 -262,14 +278,27 @@@ function listener.ondisconnect(conn, er
        end
  end
  
 +function listener.onreadtimeout(conn)
 +      local session = sessions[conn];
 +      if session then
 +              return (hosts[session.host] or prosody).events.fire_event("c2s-read-timeout", { session = session });
 +      end
 +end
 +
 +local function keepalive(event)
 +      return event.session.send(' ');
 +end
 +
  function listener.associate_session(conn, session)
        sessions[conn] = session;
  end
  
 -function listener.ondetach(conn)
 -      sessions[conn] = nil;
 +function module.add_host(module)
 +      module:hook("c2s-read-timeout", keepalive, -1);
  end
  
 +module:hook("c2s-read-timeout", keepalive, -1);
 +
  module:hook("server-stopping", function(event)
        local reason = event.reason;
        for _, session in pairs(sessions) do
index 297609d830d1a55ffab5c797477cef2c9e83fd19,7bc0f5b76dcc1814fdc65e6c65b8dbe776727d16..53ef4ed028b79670606f218fb55605fb5f917cf6
@@@ -1,7 -1,7 +1,7 @@@
  -- Prosody IM
  -- Copyright (C) 2008-2010 Matthew Wild
  -- Copyright (C) 2008-2010 Waqas Hussain
 --- 
 +--
  -- This project is MIT/X11 licensed. Please see the
  -- COPYING file in the source package for more information.
  --
@@@ -33,7 -33,7 +33,7 @@@ function module.add_host(module
        if module:get_host_type() ~= "component" then
                error("Don't load mod_component manually, it should be for a component, please see http://prosody.im/doc/components", 0);
        end
 -      
 +
        local env = module.environment;
        env.connected = false;
  
                send = nil;
                session.on_destroy = nil;
        end
 -      
 +
        -- Handle authentication attempts by component
        local function handle_component_auth(event)
                local session, stanza = event.origin, event.stanza;
 -              
 +
                if session.type ~= "component_unauthed" then return; end
 -      
 +
                if (not session.host) or #stanza.tags > 0 then
                        (session.log or log)("warn", "Invalid component handshake for host: %s", session.host);
                        session:close("not-authorized");
                        return true;
                end
 -              
 +
                local secret = module:get_option("component_secret");
                if not secret then
                        (session.log or log)("warn", "Component attempted to identify as %s, but component_secret is not set", session.host);
                        session:close("not-authorized");
                        return true;
                end
 -              
 +
                local supplied_token = t_concat(stanza);
                local calculated_token = sha1(session.streamid..secret, true);
                if supplied_token:lower() ~= calculated_token:lower() then
                        session:close{ condition = "not-authorized", text = "Given token does not match calculated token" };
                        return true;
                end
 -              
 +
                if env.connected then
                        module:log("error", "Second component attempted to connect, denying connection");
                        session:close{ condition = "conflict", text = "Component already connected" };
                        return true;
                end
 -              
 +
                env.connected = true;
                send = session.send;
                session.on_destroy = on_destroy;
@@@ -85,7 -85,7 +85,7 @@@
                session.type = "component";
                module:log("info", "External component successfully authenticated");
                session.send(st.stanza("handshake"));
 -      
 +
                return true;
        end
        module:hook("stanza/jabber:component:accept:handshake", handle_component_auth, -1);
                end
                return true;
        end
 -      
 +
        module:hook("iq/bare", handle_stanza, -1);
        module:hook("message/bare", handle_stanza, -1);
        module:hook("presence/bare", handle_stanza, -1);
@@@ -177,7 -177,9 +177,7 @@@ function stream_callbacks.streamopened(
        session.streamid = uuid_gen();
        session.notopen = nil;
        -- Return stream header
 -      session.send("<?xml version='1.0'?>");
 -      session.send(st.stanza("stream:stream", { xmlns=xmlns_component,
 -                      ["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=session.host }):top_tag());
 +      session:open_stream();
  end
  
  function stream_callbacks.streamclosed(session)
@@@ -273,14 -275,14 +273,14 @@@ function listener.onconnect(conn
        if opt_keepalives then
                conn:setoption("keepalive", opt_keepalives);
        end
 -      
 +
        session.log("info", "Incoming Jabber component connection");
 -      
 +
        local stream = new_xmpp_stream(session, stream_callbacks);
        session.stream = stream;
 -      
 +
        session.notopen = true;
 -      
 +
        function session.reset_stream()
                session.notopen = true;
                session.stream:reset();
                module:log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
                session:close("not-well-formed");
        end
 -      
 +
        session.dispatch_stanza = stream_callbacks.handlestanza;
  
        sessions[conn] = session;
@@@ -317,6 -319,10 +317,10 @@@ function listener.ondisconnect(conn, er
        end
  end
  
+ function listener.ondetach(conn)
+       sessions[conn] = nil;
+ end
  module:provides("net", {
        name = "component";
        private = true;
index 8614b8570933085a79dc5dfaeb251e9bf042862e,ee03987de0254c5ac2767df1319c9475852ffb32..0a2b5bb773bd08cad82ad808c58ce3f5d2af938a
@@@ -1,7 -1,7 +1,7 @@@
  -- Prosody IM
  -- Copyright (C) 2008-2010 Matthew Wild
  -- Copyright (C) 2008-2010 Waqas Hussain
 --- 
 +--
  -- This project is MIT/X11 licensed. Please see the
  -- COPYING file in the source package for more information.
  --
@@@ -15,6 -15,7 +15,6 @@@ local core_process_stanza = prosody.cor
  local tostring, type = tostring, type;
  local t_insert = table.insert;
  local xpcall, traceback = xpcall, debug.traceback;
 -local NULL = {};
  
  local add_task = require "util.timer".add_task;
  local st = require "util.stanza";
@@@ -25,6 -26,7 +25,6 @@@ local s2s_new_incoming = require "core.
  local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
  local s2s_destroy_session = require "core.s2smanager".destroy_session;
  local uuid_gen = require "util.uuid".generate;
 -local cert_verify_identity = require "util.x509".verify_identity;
  local fire_global_event = prosody.events.fire_event;
  
  local s2sout = module:require("s2sout");
@@@ -133,12 -135,6 +133,12 @@@ function route_to_new_session(event
        return true;
  end
  
 +local function keepalive(event)
 +      return event.session.sends2s(' ');
 +end
 +
 +module:hook("s2s-read-timeout", keepalive, -1);
 +
  function module.add_host(module)
        if module:get_option_boolean("disallow_s2s", false) then
                module:log("warn", "The 'disallow_s2s' config option is deprecated, please see http://prosody.im/doc/s2s#disabling");
        module:hook("route/remote", route_to_existing_session, -1);
        module:hook("route/remote", route_to_new_session, -10);
        module:hook("s2s-authenticated", make_authenticated, -1);
 +      module:hook("s2s-read-timeout", keepalive, -1);
 +      module:hook_stanza("http://etherx.jabber.org/streams", "features", function (session, stanza)
 +              if session.type == "s2sout" then
 +                      -- Stream is authenticated and we are seem to be done with feature negotiation,
 +                      -- so the stream is ready for stanzas.  RFC 6120 Section 4.3
 +                      mark_connected(session);
 +              end
 +      end, -1);
  end
  
  -- Stream is authorised, and ready for normal stanzas
  function mark_connected(session)
        local sendq, send = session.sendq, session.sends2s;
 -      
 +
        local from, to = session.from_host, session.to_host;
 -      
 -      session.log("info", "%s s2s connection %s->%s complete", session.direction, from, to);
 +
 +      session.log("info", "%s s2s connection %s->%s complete", session.direction:gsub("^.", string.upper), from, to);
  
        local event_data = { session = session };
        if session.type == "s2sout" then
                fire_global_event("s2sin-established", event_data);
                hosts[to].events.fire_event("s2sin-established", event_data);
        end
 -      
 +
        if session.direction == "outgoing" then
                if sendq then
                        session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host);
                        end
                        session.sendq = nil;
                end
 -              
 +
                session.ip_hosts = nil;
                session.srv_hosts = nil;
        end
@@@ -223,17 -211,14 +223,17 @@@ function make_authenticated(event
                return false;
        end
        session.log("debug", "connection %s->%s is now authenticated for %s", session.from_host, session.to_host, host);
 -      
 -      mark_connected(session);
 -      
 +
 +      if (session.type == "s2sout" and session.external_auth ~= "succeeded") or session.type == "s2sin" then
 +              -- Stream either used dialback for authentication or is an incoming stream.
 +              mark_connected(session);
 +      end
 +
        return true;
  end
  
  --- Helper to check that a session peer's certificate is valid
 -local function check_cert_status(session)
 +function check_cert_status(session)
        local host = session.direction == "outgoing" and session.to_host or session.from_host
        local conn = session.conn:socket()
        local cert
                cert = conn:getpeercertificate()
        end
  
 -      if cert then
 -              local chain_valid, errors;
 -              if conn.getpeerverification then
 -                      chain_valid, errors = conn:getpeerverification();
 -              elseif conn.getpeerchainvalid then -- COMPAT mw/luasec-hg
 -                      chain_valid, errors = conn:getpeerchainvalid();
 -                      errors = (not chain_valid) and { { errors } } or nil;
 -              else
 -                      chain_valid, errors = false, { { "Chain verification not supported by this version of LuaSec" } };
 -              end
 -              -- Is there any interest in printing out all/the number of errors here?
 -              if not chain_valid then
 -                      (session.log or log)("debug", "certificate chain validation result: invalid");
 -                      for depth, t in pairs(errors or NULL) do
 -                              (session.log or log)("debug", "certificate error(s) at depth %d: %s", depth-1, table.concat(t, ", "))
 -                      end
 -                      session.cert_chain_status = "invalid";
 -              else
 -                      (session.log or log)("debug", "certificate chain validation result: valid");
 -                      session.cert_chain_status = "valid";
 -
 -                      -- We'll go ahead and verify the asserted identity if the
 -                      -- connecting server specified one.
 -                      if host then
 -                              if cert_verify_identity(host, "xmpp-server", cert) then
 -                                      session.cert_identity_status = "valid"
 -                              else
 -                                      session.cert_identity_status = "invalid"
 -                              end
 -                              (session.log or log)("debug", "certificate identity validation result: %s", session.cert_identity_status);
 -                      end
 -              end
 -      end
        return module:fire_event("s2s-check-certificate", { host = host, session = session, cert = cert });
  end
  
@@@ -252,28 -270,25 +252,28 @@@ local xmlns_xmpp_streams = "urn:ietf:pa
  
  function stream_callbacks.streamopened(session, attr)
        local send = session.sends2s;
 -      
 +
        session.version = tonumber(attr.version) or 0;
 -      
 +
        -- TODO: Rename session.secure to session.encrypted
        if session.secure == false then
                session.secure = true;
 +              session.encrypted = true;
  
 -              -- Check if TLS compression is used
                local sock = session.conn:socket();
                if sock.info then
 -                      session.compressed = sock:info"compression";
 -              elseif sock.compression then
 -                      session.compressed = sock:compression(); --COMPAT mw/luasec-hg
 +                      local info = sock:info();
 +                      (session.log or log)("info", "Stream encrypted (%s with %s)", info.protocol, info.cipher);
 +                      session.compressed = info.compression;
 +              else
 +                      (session.log or log)("info", "Stream encrypted");
 +                      session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg
                end
        end
  
        if session.direction == "incoming" then
                -- Send a reply stream header
 -              
 +
                -- Validate to/from
                local to, from = nameprep(attr.to), nameprep(attr.from);
                if not to and attr.to then -- COMPAT: Some servers do not reliably set 'to' (especially on stream restarts)
                        session:close({ condition = "improper-addressing", text = "Invalid 'from' address" });
                        return;
                end
 -              
 +
                -- Set session.[from/to]_host if they have not been set already and if
                -- this session isn't already authenticated
                if session.type == "s2sin_unauthed" and from and not session.from_host then
                        session:close({ condition = "improper-addressing", text = "New stream 'to' attribute does not match original" });
                        return;
                end
 -              
 +
                -- For convenience we'll put the sanitised values into these variables
                to, from = session.to_host, session.from_host;
 -              
 +
                session.streamid = uuid_gen();
                (session.log or log)("debug", "Incoming s2s received %s", st.stanza("stream:stream", attr):top_tag());
                if to then
                session:open_stream(session.to_host, session.from_host)
                if session.version >= 1.0 then
                        local features = st.stanza("stream:features");
 -                      
 +
                        if to then
                                hosts[to].events.fire_event("s2s-stream-features", { origin = session, features = features });
                        else
                                (session.log or log)("warn", "No 'to' on stream header from %s means we can't offer any features", from or session.ip or "unknown host");
                        end
 -                      
 +
                        log("debug", "Sending stream features: %s", tostring(features));
                        send(features);
                end
                session.notopen = nil;
        elseif session.direction == "outgoing" then
                session.notopen = nil;
-               -- If we are just using the connection for verifying dialback keys, we won't try and auth it
-               if not attr.id then error("stream response did not give us a streamid!!!"); end
+               if not attr.id then
+                       log("error", "Stream response did not give us a stream id!");
+                       session:close({ condition = "undefined-condition", text = "Missing stream ID" });
+                       return;
+               end
                session.streamid = attr.id;
  
                if session.secure and not session.cert_chain_status then
                        end
                end
                session.send_buffer = nil;
 -      
 +
                -- If server is pre-1.0, don't wait for features, just do dialback
                if session.version < 1.0 then
                        if not session.dialback_verifying then
@@@ -466,10 -484,10 +469,10 @@@ local function session_close(session, r
  
                session.sends2s("</stream:stream>");
                function session.sends2s() return false; end
 -              
 +
                local reason = remote_reason or (reason and (reason.text or reason.condition)) or reason;
 -              session.log("info", "%s s2s stream %s->%s closed: %s", session.direction, session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed");
 -              
 +              session.log("info", "%s s2s stream %s->%s closed: %s", session.direction:gsub("^.", string.upper), session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed");
 +
                -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
                local conn = session.conn;
                if reason == nil and not session.notopen and session.type == "s2sin" then
        end
  end
  
 -function session_open_stream(session, from, to)
 -      local attr = {
 -              ["xmlns:stream"] = 'http://etherx.jabber.org/streams',
 -              xmlns = 'jabber:server',
 -              version = session.version and (session.version > 0 and "1.0" or nil),
 -              ["xml:lang"] = 'en',
 -              id = session.streamid,
 -              from = from, to = to,
 -      }
 +function session_stream_attrs(session, from, to, attr)
        if not from or (hosts[from] and hosts[from].modules.dialback) then
                attr["xmlns:db"] = 'jabber:server:dialback';
        end
 -
 -      session.sends2s("<?xml version='1.0'?>");
 -      session.sends2s(st.stanza("stream:stream", attr):top_tag());
 -      return true;
  end
  
  -- Session initialization logic shared by incoming and outgoing
  local function initialize_session(session)
        local stream = new_xmpp_stream(session, stream_callbacks);
 +      local log = session.log or log;
        session.stream = stream;
 -      
 +
        session.notopen = true;
 -              
 +
        function session.reset_stream()
                session.notopen = true;
                session.streamid = nil;
                session.stream:reset();
        end
  
 -      session.open_stream = session_open_stream;
 -      
 -      local filter = session.filter;
 +      session.stream_attrs = session_stream_attrs;
 +
 +      local filter = initialize_filters(session);
 +      local conn = session.conn;
 +      local w = conn.write;
 +
 +      function session.sends2s(t)
 +              log("debug", "sending: %s", t.top_tag and t:top_tag() or t:match("^[^>]*>?"));
 +              if t.name then
 +                      t = filter("stanzas/out", t);
 +              end
 +              if t then
 +                      t = filter("bytes/out", tostring(t));
 +                      if t then
 +                              return w(conn, t);
 +                      end
 +              end
 +      end
 +
        function session.data(data)
                data = filter("bytes/in", data);
                if data then
                        local ok, err = stream:feed(data);
                        if ok then return; end
 -                      (session.log or log)("warn", "Received invalid XML: %s", data);
 -                      (session.log or log)("warn", "Problem was: %s", err);
 +                      log("warn", "Received invalid XML: %s", data);
 +                      log("warn", "Problem was: %s", err);
                        session:close("not-well-formed");
                end
        end
                return handlestanza(session, stanza);
        end
  
 +      module:fire_event("s2s-created", { session = session });
 +
        add_task(connect_timeout, function ()
                if session.type == "s2sin" or session.type == "s2sout" then
                        return; -- Ok, we're connected
@@@ -566,11 -577,26 +569,11 @@@ function listener.onconnect(conn
                session = s2s_new_incoming(conn);
                sessions[conn] = session;
                session.log("debug", "Incoming s2s connection");
 -
 -              local filter = initialize_filters(session);
 -              local w = conn.write;
 -              session.sends2s = function (t)
 -                      log("debug", "sending: %s", t.top_tag and t:top_tag() or t:match("^([^>]*>?)"));
 -                      if t.name then
 -                              t = filter("stanzas/out", t);
 -                      end
 -                      if t then
 -                              t = filter("bytes/out", tostring(t));
 -                              if t then
 -                                      return w(conn, t);
 -                              end
 -                      end
 -              end
 -      
                initialize_session(session);
        else -- Outgoing session connected
                session:open_stream(session.from_host, session.to_host);
        end
 +      session.ip = conn:ip();
  end
  
  function listener.onincoming(conn, data)
                session.data(data);
        end
  end
 -      
 +
  function listener.onstatus(conn, status)
        if status == "ssl-handshake-complete" then
                local session = sessions[conn];
@@@ -597,6 -623,7 +600,6 @@@ function listener.ondisconnect(conn, er
                if err and session.direction == "outgoing" and session.notopen then
                        (session.log or log)("debug", "s2s connection attempt failed: %s", err);
                        if s2sout.attempt_connection(session, err) then
 -                              (session.log or log)("debug", "...so we're going to try another target");
                                return; -- Session lives for now
                        end
                end
        end
  end
  
 +function listener.onreadtimeout(conn)
 +      local session = sessions[conn];
 +      if session then
 +              return (hosts[session.host] or prosody).events.fire_event("s2s-read-timeout", { session = session });
 +      end
 +end
 +
  function listener.register_outgoing(conn, session)
 -      session.direction = "outgoing";
        sessions[conn] = session;
        initialize_session(session);
  end
  
+ function listener.ondetach(conn)
+       sessions[conn] = nil;
+ end
  function check_auth_policy(event)
        local host, session = event.host, event.session;
        local must_secure = secure_auth;
        elseif must_secure and insecure_domains[host] then
                must_secure = false;
        end
 -      
 +
        if must_secure and (session.cert_chain_status ~= "valid" or session.cert_identity_status ~= "valid") then
                module:log("warn", "Forbidding insecure connection to/from %s", host or session.ip or "(unknown host)");
                if session.direction == "incoming" then