From: Kim Alvefur Date: Fri, 8 Jul 2016 13:12:44 +0000 (+0200) Subject: Merge 0.10->trunk X-Git-Url: https://git.enpas.org/?a=commitdiff_plain;h=0ca860005cbdd6841778076c1c22fd324f7f58b8;hp=-c;p=prosody.git Merge 0.10->trunk --- 0ca860005cbdd6841778076c1c22fd324f7f58b8 diff --combined plugins/mod_c2s.lua index a690e9be,e69bf461..703c4ac7 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@@ -15,10 -15,9 +15,10 @@@ local sessionmanager = require "core.se local st = require "util.stanza"; local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; local uuid_generate = require "util.uuid".generate; +local runner = require "util.async".runner; local xpcall, tostring, type = xpcall, tostring, type; -local traceback = debug.traceback; +local t_insert, t_remove = table.insert, table.remove; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@@ -36,8 -35,17 +36,18 @@@ local hosts = prosody.hosts local stream_callbacks = { default_ns = "jabber:client" }; local listener = {}; +local runner_callbacks = {}; + do + -- Connection counter resets to 0 on load and reload + -- Bump it up to current value + local count = 0; + for _ in pairs(sessions) do + count = count + 1; + end + measure_connections(count); + end + --- Stream events handlers local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; @@@ -123,9 -131,12 +133,9 @@@ function stream_callbacks.error(session end end -local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end function stream_callbacks.handlestanza(session, stanza) stanza = session.filter("stanzas/in", stanza); - if stanza then - return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); - end + session.thread:run(stanza); end --- Session methods @@@ -194,18 -205,6 +204,18 @@@ module:hook_global("user-deleted", func end end, 200); +function runner_callbacks:ready() + self.data.conn:resume(); +end + +function runner_callbacks:waiting() + self.data.conn:pause(); +end + +function runner_callbacks:error(err) + (self.data.log or log)("error", "Traceback[c2s]: %s", err); +end + --- Port listener function listener.onconnect(conn) measure_connections(1); @@@ -243,10 -242,6 +253,10 @@@ session.stream:reset(); end + session.thread = runner(function (stanza) + core_process_stanza(session, stanza); + end, runner_callbacks, session); + local filter = session.filter; function session.data(data) -- Parse the data, which will store stanzas in session.pending_stanzas diff --combined plugins/mod_s2s/mod_s2s.lua index c0eaea01,ea186cf0..fe674b55 --- a/plugins/mod_s2s/mod_s2s.lua +++ b/plugins/mod_s2s/mod_s2s.lua @@@ -26,7 -26,6 +26,7 @@@ local s2s_new_outgoing = require "core. local s2s_destroy_session = require "core.s2smanager".destroy_session; local uuid_gen = require "util.uuid".generate; local fire_global_event = prosody.events.fire_event; +local runner = require "util.async".runner; local s2sout = module:require("s2sout"); @@@ -42,10 -41,18 +42,20 @@@ local measure_connections = module:meas local sessions = module:shared("sessions"); +local runner_callbacks = {}; + local log = module._log; + do + -- Connection counter resets to 0 on load and reload + -- Bump it up to current value + local count = 0; + for _ in pairs(sessions) do + count = count + 1; + end + measure_connections(count); + end + --- Handle stanzas to remote domains local bouncy_stanzas = { message = true, presence = true, iq = true }; @@@ -59,9 -66,6 +69,9 @@@ local function bounce_sendq(session, re (session.log or log)("error", "Replying to to an s2s error reply, please report this! Traceback: %s", traceback()); end; dummy = true; + close = function () + (session.log or log)("error", "Attempting to close the dummy origin of s2s error replies, please report this! Traceback: %s", traceback()); + end; }; for i, data in ipairs(sendq) do local reply = data[2]; @@@ -149,7 -153,7 +159,7 @@@ module:hook("s2s-read-timeout", keepali function module.add_host(module) if module:get_option_boolean("disallow_s2s", false) then - module:log("warn", "The 'disallow_s2s' config option is deprecated, please see http://prosody.im/doc/s2s#disabling"); + module:log("warn", "The 'disallow_s2s' config option is deprecated, please see https://prosody.im/doc/s2s#disabling"); return nil, "This host has disallow_s2s set"; end module:hook("route/remote", route_to_existing_session, -1); @@@ -260,21 -264,11 +270,21 @@@ en --- XMPP stream event handlers -local stream_callbacks = { default_ns = "jabber:server", handlestanza = core_process_stanza }; +local stream_callbacks = { default_ns = "jabber:server" }; + +function stream_callbacks.handlestanza(session, stanza) + stanza = session.filter("stanzas/in", stanza); + session.thread:run(stanza); +end local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; function stream_callbacks.streamopened(session, attr) + -- run _streamopened in async context + session.thread:run({ attr = attr }); +end + +function stream_callbacks._streamopened(session, attr) session.version = tonumber(attr.version) or 0; -- TODO: Rename session.secure to session.encrypted @@@ -448,6 -442,14 +458,6 @@@ function stream_callbacks.error(session end end -local function handleerr(err) log("error", "Traceback[s2s]: %s", traceback(tostring(err), 2)); end -function stream_callbacks.handlestanza(session, stanza) - stanza = session.filter("stanzas/in", stanza); - if stanza then - return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); - end -end - local listener = {}; --- Session methods @@@ -522,15 -524,6 +532,15 @@@ en -- Session initialization logic shared by incoming and outgoing local function initialize_session(session) local stream = new_xmpp_stream(session, stream_callbacks); + + session.thread = runner(function (stanza) + if stanza.name == nil then + stream_callbacks._streamopened(session, stanza.attr); + else + core_process_stanza(session, stanza); + end + end, runner_callbacks, session); + local log = session.log or log; session.stream = stream; @@@ -594,20 -587,6 +604,20 @@@ end); end +function runner_callbacks:ready() + self.data.log("debug", "Runner %s ready (%s)", self.thread, coroutine.status(self.thread)); + self.data.conn:resume(); +end + +function runner_callbacks:waiting() + self.data.log("debug", "Runner %s waiting (%s)", self.thread, coroutine.status(self.thread)); + self.data.conn:pause(); +end + +function runner_callbacks:error(err) + (self.data.log or log)("error", "Traceback[s2s]: %s", err); +end + function listener.onconnect(conn) measure_connections(1); conn:setoption("keepalive", opt_keepalives);