mod_s2s: Add util.async support
authorKim Alvefur <zash@zash.se>
Mon, 30 May 2016 11:36:43 +0000 (13:36 +0200)
committerKim Alvefur <zash@zash.se>
Mon, 30 May 2016 11:36:43 +0000 (13:36 +0200)
plugins/mod_s2s/mod_s2s.lua

index f5be4f273341527ff59f0d81772a111dffc74c34..c0eaea01216b1e68beb9d83684371b785c08e25d 100644 (file)
@@ -26,6 +26,7 @@ local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
 local s2s_destroy_session = require "core.s2smanager".destroy_session;
 local uuid_gen = require "util.uuid".generate;
 local fire_global_event = prosody.events.fire_event;
+local runner = require "util.async".runner;
 
 local s2sout = module:require("s2sout");
 
@@ -41,6 +42,8 @@ local measure_connections = module:measure("connections", "counter");
 
 local sessions = module:shared("sessions");
 
+local runner_callbacks = {};
+
 local log = module._log;
 
 --- Handle stanzas to remote domains
@@ -257,11 +260,21 @@ end
 
 --- XMPP stream event handlers
 
-local stream_callbacks = { default_ns = "jabber:server", handlestanza =  core_process_stanza };
+local stream_callbacks = { default_ns = "jabber:server" };
+
+function stream_callbacks.handlestanza(session, stanza)
+       stanza = session.filter("stanzas/in", stanza);
+       session.thread:run(stanza);
+end
 
 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
 
 function stream_callbacks.streamopened(session, attr)
+       -- run _streamopened in async context
+       session.thread:run({ attr = attr });
+end
+
+function stream_callbacks._streamopened(session, attr)
        session.version = tonumber(attr.version) or 0;
 
        -- TODO: Rename session.secure to session.encrypted
@@ -435,14 +448,6 @@ function stream_callbacks.error(session, error, data)
        end
 end
 
-local function handleerr(err) log("error", "Traceback[s2s]: %s", traceback(tostring(err), 2)); end
-function stream_callbacks.handlestanza(session, stanza)
-       stanza = session.filter("stanzas/in", stanza);
-       if stanza then
-               return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
-       end
-end
-
 local listener = {};
 
 --- Session methods
@@ -517,6 +522,15 @@ end
 -- Session initialization logic shared by incoming and outgoing
 local function initialize_session(session)
        local stream = new_xmpp_stream(session, stream_callbacks);
+
+       session.thread = runner(function (stanza)
+               if stanza.name == nil then
+                       stream_callbacks._streamopened(session, stanza.attr);
+               else
+                       core_process_stanza(session, stanza);
+               end
+       end, runner_callbacks, session);
+
        local log = session.log or log;
        session.stream = stream;
 
@@ -580,6 +594,20 @@ local function initialize_session(session)
        end);
 end
 
+function runner_callbacks:ready()
+       self.data.log("debug", "Runner %s ready (%s)", self.thread, coroutine.status(self.thread));
+       self.data.conn:resume();
+end
+
+function runner_callbacks:waiting()
+       self.data.log("debug", "Runner %s waiting (%s)", self.thread, coroutine.status(self.thread));
+       self.data.conn:pause();
+end
+
+function runner_callbacks:error(err)
+       (self.data.log or log)("error", "Traceback[s2s]: %s", err);
+end
+
 function listener.onconnect(conn)
        measure_connections(1);
        conn:setoption("keepalive", opt_keepalives);