X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=plugins%2Fmod_s2s%2Fmod_s2s.lua;h=e704c25a00c172262ecac5ede188541c6df88258;hb=41ec48760b564924ad75de119a018252d2a8cbcc;hp=f6c206065a47e6a01cfe3fd0bf48d4de42e54212;hpb=d5b64eb8c48229d7327851c415915dd3707af7b9;p=prosody.git diff --git a/plugins/mod_s2s/mod_s2s.lua b/plugins/mod_s2s/mod_s2s.lua index f6c20606..e704c25a 100644 --- a/plugins/mod_s2s/mod_s2s.lua +++ b/plugins/mod_s2s/mod_s2s.lua @@ -1,7 +1,7 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- @@ -10,11 +10,12 @@ module:set_global(); local prosody = prosody; local hosts = prosody.hosts; -local core_process_stanza = core_process_stanza; +local core_process_stanza = prosody.core_process_stanza; local tostring, type = tostring, type; local t_insert = table.insert; local xpcall, traceback = xpcall, debug.traceback; +local NULL = {}; local add_task = require "util.timer".add_task; local st = require "util.stanza"; @@ -24,13 +25,19 @@ local new_xmpp_stream = require "util.xmppstream".new; local s2s_new_incoming = require "core.s2smanager".new_incoming; local s2s_new_outgoing = require "core.s2smanager".new_outgoing; local s2s_destroy_session = require "core.s2smanager".destroy_session; -local s2s_mark_connected = require "core.s2smanager".mark_connected; local uuid_gen = require "util.uuid".generate; local cert_verify_identity = require "util.x509".verify_identity; +local fire_global_event = prosody.events.fire_event; local s2sout = module:require("s2sout"); -local connect_timeout = module:get_option_number("s2s_timeout", 60); +local connect_timeout = module:get_option_number("s2s_timeout", 90); +local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5); +local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); +local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day... +local secure_domains, insecure_domains = + module:get_option_set("s2s_secure_domains", {})._items, module:get_option_set("s2s_insecure_domains", {})._items; +local require_encryption = module:get_option_boolean("s2s_require_encryption", false); local sessions = module:shared("sessions"); @@ -74,6 +81,10 @@ function route_to_existing_session(event) log("warn", "Attempt to send stanza from %s - a host we don't serve", from_host); return false; end + if hosts[to_host] then + log("warn", "Attempt to route stanza to a remote %s - a host we do serve?!", from_host); + return false; + end local host = hosts[from_host].s2sout[to_host]; if host then -- We have a connection to this host already @@ -97,9 +108,10 @@ function route_to_existing_session(event) log("error", "WARNING! This might, possibly, be a bug, but it might not..."); log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host)); end - host.sends2s(stanza); - host.log("debug", "stanza sent over "..host.type); - return true; + if host.sends2s(stanza) then + host.log("debug", "stanza sent over %s", host.type); + return true; + end end end end @@ -123,17 +135,108 @@ function route_to_new_session(event) return true; end +local function keepalive(event) + return event.session.sends2s(' '); +end + +module:hook("s2s-read-timeout", keepalive, -1); + function module.add_host(module) if module:get_option_boolean("disallow_s2s", false) then module:log("warn", "The 'disallow_s2s' config option is deprecated, please see http://prosody.im/doc/s2s#disabling"); return nil, "This host has disallow_s2s set"; end - module:hook("route/remote", route_to_existing_session, 200); - module:hook("route/remote", route_to_new_session, 100); + module:hook("route/remote", route_to_existing_session, -1); + module:hook("route/remote", route_to_new_session, -10); + module:hook("s2s-authenticated", make_authenticated, -1); + module:hook("s2s-read-timeout", keepalive, -1); + module:hook_stanza("http://etherx.jabber.org/streams", "features", function (session, stanza) + if session.type == "s2sout" then + -- Stream is authenticated and we are seem to be done with feature negotiation, + -- so the stream is ready for stanzas. RFC 6120 Section 4.3 + mark_connected(session); + end + end, -1); +end + +-- Stream is authorised, and ready for normal stanzas +function mark_connected(session) + local sendq, send = session.sendq, session.sends2s; + + local from, to = session.from_host, session.to_host; + + session.log("info", "%s s2s connection %s->%s complete", session.direction:gsub("^.", string.upper), from, to); + + local event_data = { session = session }; + if session.type == "s2sout" then + fire_global_event("s2sout-established", event_data); + hosts[from].events.fire_event("s2sout-established", event_data); + else + local host_session = hosts[to]; + session.send = function(stanza) + return host_session.events.fire_event("route/remote", { from_host = to, to_host = from, stanza = stanza }); + end; + + fire_global_event("s2sin-established", event_data); + hosts[to].events.fire_event("s2sin-established", event_data); + end + + if session.direction == "outgoing" then + if sendq then + session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host); + for i, data in ipairs(sendq) do + send(data[1]); + sendq[i] = nil; + end + session.sendq = nil; + end + + session.ip_hosts = nil; + session.srv_hosts = nil; + end +end + +function make_authenticated(event) + local session, host = event.session, event.host; + if not session.secure then + if require_encryption or (secure_auth and not(insecure_domains[host])) or secure_domains[host] then + session:close({ + condition = "policy-violation", + text = "Encrypted server-to-server communication is required but was not " + ..((session.direction == "outgoing" and "offered") or "used") + }); + end + end + if hosts[host] then + session:close({ condition = "undefined-condition", text = "Attempt to authenticate as a host we serve" }); + end + if session.type == "s2sout_unauthed" then + session.type = "s2sout"; + elseif session.type == "s2sin_unauthed" then + session.type = "s2sin"; + if host then + if not session.hosts[host] then session.hosts[host] = {}; end + session.hosts[host].authed = true; + end + elseif session.type == "s2sin" and host then + if not session.hosts[host] then session.hosts[host] = {}; end + session.hosts[host].authed = true; + else + return false; + end + session.log("debug", "connection %s->%s is now authenticated for %s", session.from_host, session.to_host, host); + + if (session.type == "s2sout" and session.external_auth ~= "succeeded") or session.type == "s2sin" then + -- Stream either used dialback for authentication or is an incoming stream. + mark_connected(session); + end + + return true; end --- Helper to check that a session peer's certificate is valid -local function check_cert_status(session) +function check_cert_status(session) + local host = session.direction == "outgoing" and session.to_host or session.from_host local conn = session.conn:socket() local cert if conn.getpeercertificate then @@ -141,11 +244,19 @@ local function check_cert_status(session) end if cert then - local chain_valid, errors = conn:getpeerverification() + local chain_valid, errors; + if conn.getpeerverification then + chain_valid, errors = conn:getpeerverification(); + elseif conn.getpeerchainvalid then -- COMPAT mw/luasec-hg + chain_valid, errors = conn:getpeerchainvalid(); + errors = (not chain_valid) and { { errors } } or nil; + else + chain_valid, errors = false, { { "Chain verification not supported by this version of LuaSec" } }; + end -- Is there any interest in printing out all/the number of errors here? if not chain_valid then (session.log or log)("debug", "certificate chain validation result: invalid"); - for depth, t in ipairs(errors) do + for depth, t in pairs(errors or NULL) do (session.log or log)("debug", "certificate error(s) at depth %d: %s", depth-1, table.concat(t, ", ")) end session.cert_chain_status = "invalid"; @@ -153,8 +264,6 @@ local function check_cert_status(session) (session.log or log)("debug", "certificate chain validation result: valid"); session.cert_chain_status = "valid"; - local host = session.direction == "incoming" and session.from_host or session.to_host - -- We'll go ahead and verify the asserted identity if the -- connecting server specified one. if host then @@ -163,9 +272,11 @@ local function check_cert_status(session) else session.cert_identity_status = "invalid" end + (session.log or log)("debug", "certificate identity validation result: %s", session.cert_identity_status); end end end + return module:fire_event("s2s-check-certificate", { host = host, session = session, cert = cert }); end --- XMPP stream event handlers @@ -176,17 +287,28 @@ local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; function stream_callbacks.streamopened(session, attr) local send = session.sends2s; - + session.version = tonumber(attr.version) or 0; - + -- TODO: Rename session.secure to session.encrypted if session.secure == false then session.secure = true; + session.encrypted = true; + + local sock = session.conn:socket(); + if sock.info then + local info = sock:info(); + (session.log or log)("info", "Stream encrypted (%s with %s)", info.protocol, info.cipher); + session.compressed = info.compression; + else + (session.log or log)("info", "Stream encrypted"); + session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg + end end if session.direction == "incoming" then -- Send a reply stream header - + -- Validate to/from local to, from = nameprep(attr.to), nameprep(attr.from); if not to and attr.to then -- COMPAT: Some servers do not reliably set 'to' (especially on stream restarts) @@ -197,7 +319,7 @@ function stream_callbacks.streamopened(session, attr) session:close({ condition = "improper-addressing", text = "Invalid 'from' address" }); return; end - + -- Set session.[from/to]_host if they have not been set already and if -- this session isn't already authenticated if session.type == "s2sin_unauthed" and from and not session.from_host then @@ -212,10 +334,10 @@ function stream_callbacks.streamopened(session, attr) session:close({ condition = "improper-addressing", text = "New stream 'to' attribute does not match original" }); return; end - + -- For convenience we'll put the sanitised values into these variables to, from = session.to_host, session.from_host; - + session.streamid = uuid_gen(); (session.log or log)("debug", "Incoming s2s received %s", st.stanza("stream:stream", attr):top_tag()); if to then @@ -236,20 +358,27 @@ function stream_callbacks.streamopened(session, attr) end end - if session.secure and not session.cert_chain_status then check_cert_status(session); end + if hosts[from] then + session:close({ condition = "undefined-condition", text = "Attempt to connect from a host we serve" }); + return; + end + + if session.secure and not session.cert_chain_status then + if check_cert_status(session) == false then + return; + end + end - send(""); - send(st.stanza("stream:stream", { xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback', - ["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=to, to=from, version=(session.version > 0 and "1.0" or nil) }):top_tag()); + session:open_stream(session.to_host, session.from_host) if session.version >= 1.0 then local features = st.stanza("stream:features"); - + if to then hosts[to].events.fire_event("s2s-stream-features", { origin = session, features = features }); else - (session.log or log)("warn", "No 'to' on stream header from %s means we can't offer any features", from or "unknown host"); + (session.log or log)("warn", "No 'to' on stream header from %s means we can't offer any features", from or session.ip or "unknown host"); end - + log("debug", "Sending stream features: %s", tostring(features)); send(features); end @@ -258,7 +387,11 @@ function stream_callbacks.streamopened(session, attr) if not attr.id then error("stream response did not give us a streamid!!!"); end session.streamid = attr.id; - if session.secure and not session.cert_chain_status then check_cert_status(session); end + if session.secure and not session.cert_chain_status then + if check_cert_status(session) == false then + return; + end + end -- Send unauthed buffer -- (stanzas which are fine to send before dialback) @@ -273,13 +406,13 @@ function stream_callbacks.streamopened(session, attr) end end session.send_buffer = nil; - + -- If server is pre-1.0, don't wait for features, just do dialback if session.version < 1.0 then if not session.dialback_verifying then - hosts[session.from_host].events.fire_event("s2s-authenticate-legacy", { origin = session }); + hosts[session.from_host].events.fire_event("s2sout-authenticate-legacy", { origin = session }); else - s2s_mark_connected(session); + mark_connected(session); end end end @@ -288,19 +421,7 @@ end function stream_callbacks.streamclosed(session) (session.log or log)("debug", "Received "); - session:close(); -end - -function stream_callbacks.streamdisconnected(session, err) - if err and err ~= "closed" and session.direction == "outgoing" and session.notopen then - (session.log or log)("debug", "s2s connection attempt failed: %s", err); - if s2sout.attempt_connection(session, err) then - (session.log or log)("debug", "...so we're going to try another target"); - return true; -- Session lives for now - end - end - (session.log or log)("info", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "closed")); - s2s_destroy_session(session, err); + session:close(false); end function stream_callbacks.error(session, error, data) @@ -329,7 +450,7 @@ function stream_callbacks.error(session, error, data) end end -local function handleerr(err) log("error", "Traceback[s2s]: %s: %s", tostring(err), traceback()); end +local function handleerr(err) log("error", "Traceback[s2s]: %s", traceback(tostring(err), 2)); end function stream_callbacks.handlestanza(session, stanza) if stanza.attr.xmlns == "jabber:client" then --COMPAT: Prosody pre-0.6.2 may send jabber:client stanza.attr.xmlns = nil; @@ -344,17 +465,19 @@ local listener = {}; --- Session methods local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; -local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; local function session_close(session, reason, remote_reason) local log = session.log or log; if session.conn then if session.notopen then - session.sends2s(""); - session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag()); + if session.direction == "incoming" then + session:open_stream(session.to_host, session.from_host); + else + session:open_stream(session.from_host, session.to_host); + end end - if reason then + if reason then -- nil == no err, initiated by us, false == initiated by remote if type(reason) == "string" then -- assume stream error - log("info", "Disconnecting %s[%s], is: %s", session.host or "(unknown host)", session.type, reason); + log("debug", "Disconnecting %s[%s], is: %s", session.host or session.ip or "(unknown host)", session.type, reason); session.sends2s(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' })); elseif type(reason) == "table" then if reason.condition then @@ -365,43 +488,83 @@ local function session_close(session, reason, remote_reason) if reason.extra then stanza:add_child(reason.extra); end - log("info", "Disconnecting %s[%s], is: %s", session.host or "(unknown host)", session.type, tostring(stanza)); + log("debug", "Disconnecting %s[%s], is: %s", session.host or session.ip or "(unknown host)", session.type, tostring(stanza)); session.sends2s(stanza); elseif reason.name then -- a stanza - log("info", "Disconnecting %s->%s[%s], is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason)); + log("debug", "Disconnecting %s->%s[%s], is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason)); session.sends2s(reason); end end end + session.sends2s(""); - if session.notopen or not session.conn:close() then - session.conn:close(true); -- Force FIXME: timer? + function session.sends2s() return false; end + + local reason = remote_reason or (reason and (reason.text or reason.condition)) or reason; + session.log("info", "%s s2s stream %s->%s closed: %s", session.direction:gsub("^.", string.upper), session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed"); + + -- Authenticated incoming stream may still be sending us stanzas, so wait for from remote + local conn = session.conn; + if reason == nil and not session.notopen and session.type == "s2sin" then + add_task(stream_close_timeout, function () + if not session.destroyed then + session.log("warn", "Failed to receive a stream close response, closing connection anyway..."); + s2s_destroy_session(session, reason); + conn:close(); + end + end); + else + s2s_destroy_session(session, reason); + conn:close(); -- Close immediately, as this is an outgoing connection or is not authed end - session.conn:close(); - listener.ondisconnect(session.conn, remote_reason or (reason and (reason.text or reason.condition)) or reason or "stream closed"); + end +end + +function session_stream_attrs(session, from, to, attr) + if not from or (hosts[from] and hosts[from].modules.dialback) then + attr["xmlns:db"] = 'jabber:server:dialback'; end end -- Session initialization logic shared by incoming and outgoing local function initialize_session(session) local stream = new_xmpp_stream(session, stream_callbacks); + local log = session.log or log; session.stream = stream; - + session.notopen = true; - + function session.reset_stream() session.notopen = true; session.stream:reset(); end - - local filter = session.filter; + + session.stream_attrs = session_stream_attrs; + + local filter = initialize_filters(session); + local conn = session.conn; + local w = conn.write; + + function session.sends2s(t) + log("debug", "sending: %s", t.top_tag and t:top_tag() or t:match("^[^>]*>?")); + if t.name then + t = filter("stanzas/out", t); + end + if t then + t = filter("bytes/out", tostring(t)); + if t then + return w(conn, t); + end + end + end + function session.data(data) data = filter("bytes/in", data); if data then local ok, err = stream:feed(data); if ok then return; end - (session.log or log)("warn", "Received invalid XML: %s", data); - (session.log or log)("warn", "Problem was: %s", err); + log("warn", "Received invalid XML: %s", data); + log("warn", "Problem was: %s", err); session:close("not-well-formed"); end end @@ -413,11 +576,13 @@ local function initialize_session(session) return handlestanza(session, stanza); end - local conn = session.conn; + module:fire_event("s2s-created", { session = session }); + add_task(connect_timeout, function () - if session.conn ~= conn or session.connecting - or session.type == "s2sin" or session.type == "s2sout" then - return; -- Ok, we're connect[ed|ing] + if session.type == "s2sin" or session.type == "s2sout" then + return; -- Ok, we're connected + elseif session.type == "s2s_destroyed" then + return; -- Session already destroyed end -- Not connected, need to close session and clean up (session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity", @@ -427,31 +592,17 @@ local function initialize_session(session) end function listener.onconnect(conn) + conn:setoption("keepalive", opt_keepalives); local session = sessions[conn]; if not session then -- New incoming connection session = s2s_new_incoming(conn); sessions[conn] = session; session.log("debug", "Incoming s2s connection"); - - local filter = initialize_filters(session); - local w = conn.write; - session.sends2s = function (t) - log("debug", "sending: %s", t.top_tag and t:top_tag() or t:match("^([^>]*>?)")); - if t.name then - t = filter("stanzas/out", t); - end - if t then - t = filter("bytes/out", tostring(t)); - if t then - return w(conn, t); - end - end - end - initialize_session(session); else -- Outgoing session connected session:open_stream(session.from_host, session.to_host); end + session.ip = conn:ip(); end function listener.onincoming(conn, data) @@ -460,7 +611,7 @@ function listener.onincoming(conn, data) session.data(data); end end - + function listener.onstatus(conn, status) if status == "ssl-handshake-complete" then local session = sessions[conn]; @@ -474,22 +625,65 @@ end function listener.ondisconnect(conn, err) local session = sessions[conn]; if session then - if stream_callbacks.streamdisconnected(session, err) then - return; -- Connection lives, for now + sessions[conn] = nil; + if err and session.direction == "outgoing" and session.notopen then + (session.log or log)("debug", "s2s connection attempt failed: %s", err); + if s2sout.attempt_connection(session, err) then + return; -- Session lives for now + end end + (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "connection closed")); + s2s_destroy_session(session, err); + end +end + +function listener.onreadtimeout(conn) + local session = sessions[conn]; + if session then + return (hosts[session.host] or prosody).events.fire_event("s2s-read-timeout", { session = session }); end - sessions[conn] = nil; end function listener.register_outgoing(conn, session) - session.direction = "outgoing"; sessions[conn] = session; initialize_session(session); end +function check_auth_policy(event) + local host, session = event.host, event.session; + local must_secure = secure_auth; + + if not must_secure and secure_domains[host] then + must_secure = true; + elseif must_secure and insecure_domains[host] then + must_secure = false; + end + + if must_secure and (session.cert_chain_status ~= "valid" or session.cert_identity_status ~= "valid") then + module:log("warn", "Forbidding insecure connection to/from %s", host or session.ip or "(unknown host)"); + if session.direction == "incoming" then + session:close({ condition = "not-authorized", text = "Your server's certificate is invalid, expired, or not trusted by "..session.to_host }); + else -- Close outgoing connections without warning + session:close(false); + end + return false; + end +end + +module:hook("s2s-check-certificate", check_auth_policy, -1); + s2sout.set_listener(listener); -module:add_item("net-provider", { +module:hook("server-stopping", function(event) + local reason = event.reason; + for _, session in pairs(sessions) do + session:close{ condition = "system-shutdown", text = reason }; + end +end,500); + + + +module:provides("net", { name = "s2s"; listener = listener; default_port = 5269;