mod_admin_adhoc: Implement global module reloading
[prosody.git] / plugins / mod_s2s / mod_s2s.lua
index abe1992c3b2ba4e8a2ab5c9edf9193a8f07f6ee2..15c89ceddface713754e7a0761e74301e2831b03 100644 (file)
@@ -10,7 +10,7 @@ module:set_global();
 
 local prosody = prosody;
 local hosts = prosody.hosts;
-local core_process_stanza = core_process_stanza;
+local core_process_stanza = prosody.core_process_stanza;
 
 local tostring, type = tostring, type;
 local t_insert = table.insert;
@@ -30,7 +30,8 @@ local cert_verify_identity = require "util.x509".verify_identity;
 
 local s2sout = module:require("s2sout");
 
-local connect_timeout = module:get_option_number("s2s_timeout", 60);
+local connect_timeout = module:get_option_number("s2s_timeout", 90);
+local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5);
 
 local sessions = module:shared("sessions");
 
@@ -97,9 +98,10 @@ function route_to_existing_session(event)
                                log("error", "WARNING! This might, possibly, be a bug, but it might not...");
                                log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host));
                        end
-                       host.sends2s(stanza);
-                       host.log("debug", "stanza sent over "..host.type);
-                       return true;
+                       if host.sends2s(stanza) then
+                               host.log("debug", "stanza sent over %s", host.type);
+                               return true;
+                       end
                end
        end
 end
@@ -125,6 +127,7 @@ end
 
 function module.add_host(module)
        if module:get_option_boolean("disallow_s2s", false) then
+               module:log("warn", "The 'disallow_s2s' config option is deprecated, please see http://prosody.im/doc/s2s#disabling");
                return nil, "This host has disallow_s2s set";
        end
        module:hook("route/remote", route_to_existing_session, 200);
@@ -176,12 +179,19 @@ local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
 function stream_callbacks.streamopened(session, attr)
        local send = session.sends2s;
        
-       -- TODO: #29: SASL/TLS on s2s streams
        session.version = tonumber(attr.version) or 0;
        
        -- TODO: Rename session.secure to session.encrypted
        if session.secure == false then
                session.secure = true;
+
+               -- Check if TLS compression is used
+               local sock = session.conn:socket();
+               if sock.info then
+                       session.compressed = sock:info"compression";
+               elseif sock.compression then
+                       session.compressed = sock:compression(); --COMPAT mw/luasec-hg
+               end
        end
 
        if session.direction == "incoming" then
@@ -213,21 +223,24 @@ function stream_callbacks.streamopened(session, attr)
                        return;
                end
                
+               -- For convenience we'll put the sanitised values into these variables
+               to, from = session.to_host, session.from_host;
+               
                session.streamid = uuid_gen();
                (session.log or log)("debug", "Incoming s2s received %s", st.stanza("stream:stream", attr):top_tag());
-               if session.to_host then
-                       if not hosts[session.to_host] then
+               if to then
+                       if not hosts[to] then
                                -- Attempting to connect to a host we don't serve
                                session:close({
                                        condition = "host-unknown";
-                                       text = "This host does not serve "..session.to_host
+                                       text = "This host does not serve "..to
                                });
                                return;
-                       elseif hosts[session.to_host].disallow_s2s then
+                       elseif not hosts[to].modules.s2s then
                                -- Attempting to connect to a host that disallows s2s
                                session:close({
                                        condition = "policy-violation";
-                                       text = "Server-to-server communication is not allowed to this host";
+                                       text = "Server-to-server communication is disabled for this host";
                                });
                                return;
                        end
@@ -237,24 +250,19 @@ function stream_callbacks.streamopened(session, attr)
 
                send("<?xml version='1.0'?>");
                send(st.stanza("stream:stream", { xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback',
-                               ["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=session.to_host, to=session.from_host, version=(session.version > 0 and "1.0" or nil) }):top_tag());
+                               ["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=to, to=from, version=(session.version > 0 and "1.0" or nil) }):top_tag());
                if session.version >= 1.0 then
                        local features = st.stanza("stream:features");
                        
-                       if session.to_host then
-                               hosts[session.to_host].events.fire_event("s2s-stream-features", { origin = session, features = features });
+                       if to then
+                               hosts[to].events.fire_event("s2s-stream-features", { origin = session, features = features });
                        else
-                               (session.log or log)("warn", "No 'to' on stream header from %s means we can't offer any features", session.from_host or "unknown host");
+                               (session.log or log)("warn", "No 'to' on stream header from %s means we can't offer any features", from or "unknown host");
                        end
                        
                        log("debug", "Sending stream features: %s", tostring(features));
                        send(features);
                end
-               
-               local host_session = hosts[session.to_host];
-               session.send = function(stanza)
-                       host_session.events.fire_event("route/remote", { from_host = session.to_host, to_host = session.from_host, stanza = stanza})
-               end;
        elseif session.direction == "outgoing" then
                -- If we are just using the connection for verifying dialback keys, we won't try and auth it
                if not attr.id then error("stream response did not give us a streamid!!!"); end
@@ -290,19 +298,7 @@ end
 
 function stream_callbacks.streamclosed(session)
        (session.log or log)("debug", "Received </stream:stream>");
-       session:close();
-end
-
-function stream_callbacks.streamdisconnected(session, err)
-       if err and err ~= "closed" and session.direction == "outgoing" then
-               (session.log or log)("debug", "s2s connection attempt failed: %s", err);
-               if s2sout.attempt_connection(session, err) then
-                       (session.log or log)("debug", "...so we're going to try another target");
-                       return true; -- Session lives for now
-               end
-       end
-       (session.log or log)("info", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "closed"));
-       s2s_destroy_session(session, err);
+       session:close(false);
 end
 
 function stream_callbacks.error(session, error, data)
@@ -354,9 +350,9 @@ local function session_close(session, reason, remote_reason)
                        session.sends2s("<?xml version='1.0'?>");
                        session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag());
                end
-               if reason then
+               if reason then -- nil == no err, initiated by us, false == initiated by remote
                        if type(reason) == "string" then -- assume stream error
-                               log("info", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, reason);
+                               log("debug", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, reason);
                                session.sends2s(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }));
                        elseif type(reason) == "table" then
                                if reason.condition then
@@ -367,20 +363,35 @@ local function session_close(session, reason, remote_reason)
                                        if reason.extra then
                                                stanza:add_child(reason.extra);
                                        end
-                                       log("info", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, tostring(stanza));
+                                       log("debug", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, tostring(stanza));
                                        session.sends2s(stanza);
                                elseif reason.name then -- a stanza
-                                       log("info", "Disconnecting %s->%s[%s], <stream:error> is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason));
+                                       log("debug", "Disconnecting %s->%s[%s], <stream:error> is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason));
                                        session.sends2s(reason);
                                end
                        end
                end
+
                session.sends2s("</stream:stream>");
-               if session.notopen or not session.conn:close() then
-                       session.conn:close(true); -- Force FIXME: timer?
+               function session.sends2s() return false; end
+               
+               local reason = remote_reason or (reason and (reason.text or reason.condition)) or reason;
+               session.log("info", "%s s2s stream %s->%s closed: %s", session.direction, session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed");
+               
+               -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
+               local conn = session.conn;
+               if reason == nil and not session.notopen and session.type == "s2sin" then
+                       add_task(stream_close_timeout, function ()
+                               if not session.destroyed then
+                                       session.log("warn", "Failed to receive a stream close response, closing connection anyway...");
+                                       s2s_destroy_session(session, reason);
+                                       conn:close();
+                               end
+                       end);
+               else
+                       s2s_destroy_session(session, reason);
+                       conn:close(); -- Close immediately, as this is an outgoing connection or is not authed
                end
-               session.conn:close();
-               listener.ondisconnect(session.conn, remote_reason or (reason and (reason.text or reason.condition)) or reason or "stream closed");
        end
 end
 
@@ -415,11 +426,11 @@ local function initialize_session(session)
                return handlestanza(session, stanza);
        end
 
-       local conn = session.conn;
        add_task(connect_timeout, function ()
-               if session.conn ~= conn or session.connecting
-               or session.type == "s2sin" or session.type == "s2sout" then
-                       return; -- Ok, we're connect[ed|ing]
+               if session.type == "s2sin" or session.type == "s2sout" then
+                       return; -- Ok, we're connected
+               elseif session.type == "s2s_destroyed" then
+                       return; -- Session already destroyed
                end
                -- Not connected, need to close session and clean up
                (session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity",
@@ -429,8 +440,9 @@ local function initialize_session(session)
 end
 
 function listener.onconnect(conn)
-       if not sessions[conn] then -- May be an existing outgoing session
-               local session = s2s_new_incoming(conn);
+       local session = sessions[conn];
+       if not session then -- New incoming connection
+               session = s2s_new_incoming(conn);
                sessions[conn] = session;
                session.log("debug", "Incoming s2s connection");
 
@@ -450,6 +462,8 @@ function listener.onconnect(conn)
                end
        
                initialize_session(session);
+       else -- Outgoing session connected
+               session:open_stream(session.from_host, session.to_host);
        end
 end
 
@@ -473,11 +487,17 @@ end
 function listener.ondisconnect(conn, err)
        local session = sessions[conn];
        if session then
-               if stream_callbacks.streamdisconnected(session, err) then
-                       return; -- Connection lives, for now
+               sessions[conn] = nil;
+               if err and session.direction == "outgoing" and session.notopen then
+                       (session.log or log)("debug", "s2s connection attempt failed: %s", err);
+                       if s2sout.attempt_connection(session, err) then
+                               (session.log or log)("debug", "...so we're going to try another target");
+                               return; -- Session lives for now
+                       end
                end
+               (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "connection closed"));
+               s2s_destroy_session(session, err);
        end
-       sessions[conn] = nil;
 end
 
 function listener.register_outgoing(conn, session)
@@ -488,7 +508,16 @@ end
 
 s2sout.set_listener(listener);
 
-module:add_item("net-provider", {
+module:hook("server-stopping", function(event)
+       local reason = event.reason;
+       for _, session in pairs(sessions) do
+               session:close{ condition = "system-shutdown", text = reason };
+       end
+end,500);
+
+
+
+module:provides("net", {
        name = "s2s";
        listener = listener;
        default_port = 5269;