X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=plugins%2Fmod_s2s%2Fmod_s2s.lua;h=7401445758361ddd1160746786816dfda5b2eb0f;hb=14b46085cf72301a35aaa02b9e25aef440994928;hp=e2a2273807cdc4e5059a35cba944c725dace21aa;hpb=b9ee11b547b1859fa041b09f3d38da2c4258d39b;p=prosody.git diff --git a/plugins/mod_s2s/mod_s2s.lua b/plugins/mod_s2s/mod_s2s.lua index e2a22738..74014457 100644 --- a/plugins/mod_s2s/mod_s2s.lua +++ b/plugins/mod_s2s/mod_s2s.lua @@ -10,7 +10,7 @@ module:set_global(); local prosody = prosody; local hosts = prosody.hosts; -local core_process_stanza = core_process_stanza; +local core_process_stanza = prosody.core_process_stanza; local tostring, type = tostring, type; local t_insert = table.insert; @@ -30,7 +30,8 @@ local cert_verify_identity = require "util.x509".verify_identity; local s2sout = module:require("s2sout"); -local connect_timeout = module:get_option_number("s2s_timeout", 60); +local connect_timeout = module:get_option_number("s2s_timeout", 90); +local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5); local sessions = module:shared("sessions"); @@ -97,9 +98,10 @@ function route_to_existing_session(event) log("error", "WARNING! This might, possibly, be a bug, but it might not..."); log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host)); end - host.sends2s(stanza); - host.log("debug", "stanza sent over "..host.type); - return true; + if host.sends2s(stanza) then + host.log("debug", "stanza sent over %s", host.type); + return true; + end end end end @@ -134,6 +136,7 @@ end --- Helper to check that a session peer's certificate is valid local function check_cert_status(session) + local host = session.direction == "incoming" and session.from_host or session.to_host local conn = session.conn:socket() local cert if conn.getpeercertificate then @@ -153,8 +156,6 @@ local function check_cert_status(session) (session.log or log)("debug", "certificate chain validation result: valid"); session.cert_chain_status = "valid"; - local host = session.direction == "incoming" and session.from_host or session.to_host - -- We'll go ahead and verify the asserted identity if the -- connecting server specified one. if host then @@ -166,6 +167,7 @@ local function check_cert_status(session) end end end + module:fire_event("s2s-check-certificate", { host = host, session = session, cert = cert }); end --- XMPP stream event handlers @@ -177,12 +179,19 @@ local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; function stream_callbacks.streamopened(session, attr) local send = session.sends2s; - -- TODO: #29: SASL/TLS on s2s streams session.version = tonumber(attr.version) or 0; -- TODO: Rename session.secure to session.encrypted if session.secure == false then session.secure = true; + + -- Check if TLS compression is used + local sock = session.conn:socket(); + if sock.info then + session.compressed = sock:info"compression"; + elseif sock.compression then + session.compressed = sock:compression(); --COMPAT mw/luasec-hg + end end if session.direction == "incoming" then @@ -278,7 +287,7 @@ function stream_callbacks.streamopened(session, attr) -- If server is pre-1.0, don't wait for features, just do dialback if session.version < 1.0 then if not session.dialback_verifying then - hosts[session.from_host].events.fire_event("s2s-authenticate-legacy", { origin = session }); + hosts[session.from_host].events.fire_event("s2sout-authenticate-legacy", { origin = session }); else s2s_mark_connected(session); end @@ -289,19 +298,7 @@ end function stream_callbacks.streamclosed(session) (session.log or log)("debug", "Received "); - session:close(); -end - -function stream_callbacks.streamdisconnected(session, err) - if err and err ~= "closed" and session.direction == "outgoing" then - (session.log or log)("debug", "s2s connection attempt failed: %s", err); - if s2sout.attempt_connection(session, err) then - (session.log or log)("debug", "...so we're going to try another target"); - return true; -- Session lives for now - end - end - (session.log or log)("info", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "closed")); - s2s_destroy_session(session, err); + session:close(false); end function stream_callbacks.error(session, error, data) @@ -353,9 +350,9 @@ local function session_close(session, reason, remote_reason) session.sends2s(""); session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag()); end - if reason then + if reason then -- nil == no err, initiated by us, false == initiated by remote if type(reason) == "string" then -- assume stream error - log("info", "Disconnecting %s[%s], is: %s", session.host or "(unknown host)", session.type, reason); + log("debug", "Disconnecting %s[%s], is: %s", session.host or "(unknown host)", session.type, reason); session.sends2s(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' })); elseif type(reason) == "table" then if reason.condition then @@ -366,20 +363,35 @@ local function session_close(session, reason, remote_reason) if reason.extra then stanza:add_child(reason.extra); end - log("info", "Disconnecting %s[%s], is: %s", session.host or "(unknown host)", session.type, tostring(stanza)); + log("debug", "Disconnecting %s[%s], is: %s", session.host or "(unknown host)", session.type, tostring(stanza)); session.sends2s(stanza); elseif reason.name then -- a stanza - log("info", "Disconnecting %s->%s[%s], is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason)); + log("debug", "Disconnecting %s->%s[%s], is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason)); session.sends2s(reason); end end end + session.sends2s(""); - if session.notopen or not session.conn:close() then - session.conn:close(true); -- Force FIXME: timer? + function session.sends2s() return false; end + + local reason = remote_reason or (reason and (reason.text or reason.condition)) or reason; + session.log("info", "%s s2s stream %s->%s closed: %s", session.direction, session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed"); + + -- Authenticated incoming stream may still be sending us stanzas, so wait for from remote + local conn = session.conn; + if reason == nil and not session.notopen and session.type == "s2sin" then + add_task(stream_close_timeout, function () + if not session.destroyed then + session.log("warn", "Failed to receive a stream close response, closing connection anyway..."); + s2s_destroy_session(session, reason); + conn:close(); + end + end); + else + s2s_destroy_session(session, reason); + conn:close(); -- Close immediately, as this is an outgoing connection or is not authed end - session.conn:close(); - listener.ondisconnect(session.conn, remote_reason or (reason and (reason.text or reason.condition)) or reason or "stream closed"); end end @@ -414,11 +426,11 @@ local function initialize_session(session) return handlestanza(session, stanza); end - local conn = session.conn; add_task(connect_timeout, function () - if session.conn ~= conn or session.connecting - or session.type == "s2sin" or session.type == "s2sout" then - return; -- Ok, we're connect[ed|ing] + if session.type == "s2sin" or session.type == "s2sout" then + return; -- Ok, we're connected + elseif session.type == "s2s_destroyed" then + return; -- Session already destroyed end -- Not connected, need to close session and clean up (session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity", @@ -428,8 +440,9 @@ local function initialize_session(session) end function listener.onconnect(conn) - if not sessions[conn] then -- May be an existing outgoing session - local session = s2s_new_incoming(conn); + local session = sessions[conn]; + if not session then -- New incoming connection + session = s2s_new_incoming(conn); sessions[conn] = session; session.log("debug", "Incoming s2s connection"); @@ -449,6 +462,8 @@ function listener.onconnect(conn) end initialize_session(session); + else -- Outgoing session connected + session:open_stream(session.from_host, session.to_host); end end @@ -472,11 +487,17 @@ end function listener.ondisconnect(conn, err) local session = sessions[conn]; if session then - if stream_callbacks.streamdisconnected(session, err) then - return; -- Connection lives, for now + sessions[conn] = nil; + if err and session.direction == "outgoing" and session.notopen then + (session.log or log)("debug", "s2s connection attempt failed: %s", err); + if s2sout.attempt_connection(session, err) then + (session.log or log)("debug", "...so we're going to try another target"); + return; -- Session lives for now + end end + (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "connection closed")); + s2s_destroy_session(session, err); end - sessions[conn] = nil; end function listener.register_outgoing(conn, session) @@ -487,7 +508,16 @@ end s2sout.set_listener(listener); -module:add_item("net-provider", { +module:hook("server-stopping", function(event) + local reason = event.reason; + for _, session in pairs(sessions) do + session:close{ condition = "system-shutdown", text = reason }; + end +end,500); + + + +module:provides("net", { name = "s2s"; listener = listener; default_port = 5269;