Merge 0.10->trunk
[prosody.git] / plugins / mod_s2s / mod_s2s.lua
index ea186cf0f265f48d5e1d5139b21679a42bcc1281..fe674b55d961b7755d808fee9f68733be2dc2e5d 100644 (file)
@@ -26,6 +26,7 @@ local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
 local s2s_destroy_session = require "core.s2smanager".destroy_session;
 local uuid_gen = require "util.uuid".generate;
 local fire_global_event = prosody.events.fire_event;
+local runner = require "util.async".runner;
 
 local s2sout = module:require("s2sout");
 
@@ -41,6 +42,8 @@ local measure_connections = module:measure("connections", "counter");
 
 local sessions = module:shared("sessions");
 
+local runner_callbacks = {};
+
 local log = module._log;
 
 do
@@ -66,6 +69,9 @@ local function bounce_sendq(session, reason)
                        (session.log or log)("error", "Replying to to an s2s error reply, please report this! Traceback: %s", traceback());
                end;
                dummy = true;
+               close = function ()
+                       (session.log or log)("error", "Attempting to close the dummy origin of s2s error replies, please report this! Traceback: %s", traceback());
+               end;
        };
        for i, data in ipairs(sendq) do
                local reply = data[2];
@@ -153,7 +159,7 @@ module:hook("s2s-read-timeout", keepalive, -1);
 
 function module.add_host(module)
        if module:get_option_boolean("disallow_s2s", false) then
-               module:log("warn", "The 'disallow_s2s' config option is deprecated, please see http://prosody.im/doc/s2s#disabling");
+               module:log("warn", "The 'disallow_s2s' config option is deprecated, please see https://prosody.im/doc/s2s#disabling");
                return nil, "This host has disallow_s2s set";
        end
        module:hook("route/remote", route_to_existing_session, -1);
@@ -264,11 +270,21 @@ end
 
 --- XMPP stream event handlers
 
-local stream_callbacks = { default_ns = "jabber:server", handlestanza =  core_process_stanza };
+local stream_callbacks = { default_ns = "jabber:server" };
+
+function stream_callbacks.handlestanza(session, stanza)
+       stanza = session.filter("stanzas/in", stanza);
+       session.thread:run(stanza);
+end
 
 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
 
 function stream_callbacks.streamopened(session, attr)
+       -- run _streamopened in async context
+       session.thread:run({ attr = attr });
+end
+
+function stream_callbacks._streamopened(session, attr)
        session.version = tonumber(attr.version) or 0;
 
        -- TODO: Rename session.secure to session.encrypted
@@ -442,14 +458,6 @@ function stream_callbacks.error(session, error, data)
        end
 end
 
-local function handleerr(err) log("error", "Traceback[s2s]: %s", traceback(tostring(err), 2)); end
-function stream_callbacks.handlestanza(session, stanza)
-       stanza = session.filter("stanzas/in", stanza);
-       if stanza then
-               return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
-       end
-end
-
 local listener = {};
 
 --- Session methods
@@ -524,6 +532,15 @@ end
 -- Session initialization logic shared by incoming and outgoing
 local function initialize_session(session)
        local stream = new_xmpp_stream(session, stream_callbacks);
+
+       session.thread = runner(function (stanza)
+               if stanza.name == nil then
+                       stream_callbacks._streamopened(session, stanza.attr);
+               else
+                       core_process_stanza(session, stanza);
+               end
+       end, runner_callbacks, session);
+
        local log = session.log or log;
        session.stream = stream;
 
@@ -587,6 +604,20 @@ local function initialize_session(session)
        end);
 end
 
+function runner_callbacks:ready()
+       self.data.log("debug", "Runner %s ready (%s)", self.thread, coroutine.status(self.thread));
+       self.data.conn:resume();
+end
+
+function runner_callbacks:waiting()
+       self.data.log("debug", "Runner %s waiting (%s)", self.thread, coroutine.status(self.thread));
+       self.data.conn:pause();
+end
+
+function runner_callbacks:error(err)
+       (self.data.log or log)("error", "Traceback[s2s]: %s", err);
+end
+
 function listener.onconnect(conn)
        measure_connections(1);
        conn:setoption("keepalive", opt_keepalives);