X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=plugins%2Fmod_s2s%2Fmod_s2s.lua;h=1d03f3e4e57eb208e59ddf4074d8a3b23b7cabe8;hb=bd602f6c077112e2f0c9125ad9cc6dd646322fec;hp=d22463bd25cadafcf434c7d0a611ea9dc03ede4c;hpb=ef8cc7de12cef35445931c07e7bd72b7b17f2b94;p=prosody.git diff --git a/plugins/mod_s2s/mod_s2s.lua b/plugins/mod_s2s/mod_s2s.lua index d22463bd..1d03f3e4 100644 --- a/plugins/mod_s2s/mod_s2s.lua +++ b/plugins/mod_s2s/mod_s2s.lua @@ -1,7 +1,7 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- @@ -10,11 +10,12 @@ module:set_global(); local prosody = prosody; local hosts = prosody.hosts; -local core_process_stanza = core_process_stanza; +local core_process_stanza = prosody.core_process_stanza; local tostring, type = tostring, type; local t_insert = table.insert; local xpcall, traceback = xpcall, debug.traceback; +local NULL = {}; local add_task = require "util.timer".add_task; local st = require "util.stanza"; @@ -24,14 +25,19 @@ local new_xmpp_stream = require "util.xmppstream".new; local s2s_new_incoming = require "core.s2smanager".new_incoming; local s2s_new_outgoing = require "core.s2smanager".new_outgoing; local s2s_destroy_session = require "core.s2smanager".destroy_session; -local s2s_mark_connected = require "core.s2smanager".mark_connected; local uuid_gen = require "util.uuid".generate; local cert_verify_identity = require "util.x509".verify_identity; +local fire_global_event = prosody.events.fire_event; local s2sout = module:require("s2sout"); -local connect_timeout = module:get_option_number("s2s_timeout", 60); +local connect_timeout = module:get_option_number("s2s_timeout", 90); local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5); +local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); +local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day... +local secure_domains, insecure_domains = + module:get_option_set("s2s_secure_domains", {})._items, module:get_option_set("s2s_insecure_domains", {})._items; +local require_encryption = module:get_option_boolean("s2s_require_encryption", false); local sessions = module:shared("sessions"); @@ -75,6 +81,10 @@ function route_to_existing_session(event) log("warn", "Attempt to send stanza from %s - a host we don't serve", from_host); return false; end + if hosts[to_host] then + log("warn", "Attempt to route stanza to a remote %s - a host we do serve?!", from_host); + return false; + end local host = hosts[from_host].s2sout[to_host]; if host then -- We have a connection to this host already @@ -125,17 +135,98 @@ function route_to_new_session(event) return true; end +local function keepalive(event) + return event.session.sends2s(' '); +end + +module:hook("s2s-read-timeout", keepalive, -1); + function module.add_host(module) if module:get_option_boolean("disallow_s2s", false) then module:log("warn", "The 'disallow_s2s' config option is deprecated, please see http://prosody.im/doc/s2s#disabling"); return nil, "This host has disallow_s2s set"; end - module:hook("route/remote", route_to_existing_session, 200); - module:hook("route/remote", route_to_new_session, 100); + module:hook("route/remote", route_to_existing_session, -1); + module:hook("route/remote", route_to_new_session, -10); + module:hook("s2s-authenticated", make_authenticated, -1); + module:hook("s2s-read-timeout", keepalive, -1); +end + +-- Stream is authorised, and ready for normal stanzas +function mark_connected(session) + local sendq, send = session.sendq, session.sends2s; + + local from, to = session.from_host, session.to_host; + + session.log("info", "%s s2s connection %s->%s complete", session.direction:gsub("^.", string.upper), from, to); + + local event_data = { session = session }; + if session.type == "s2sout" then + fire_global_event("s2sout-established", event_data); + hosts[from].events.fire_event("s2sout-established", event_data); + else + local host_session = hosts[to]; + session.send = function(stanza) + return host_session.events.fire_event("route/remote", { from_host = to, to_host = from, stanza = stanza }); + end; + + fire_global_event("s2sin-established", event_data); + hosts[to].events.fire_event("s2sin-established", event_data); + end + + if session.direction == "outgoing" then + if sendq then + session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host); + for i, data in ipairs(sendq) do + send(data[1]); + sendq[i] = nil; + end + session.sendq = nil; + end + + session.ip_hosts = nil; + session.srv_hosts = nil; + end +end + +function make_authenticated(event) + local session, host = event.session, event.host; + if not session.secure then + if require_encryption or (secure_auth and not(insecure_domains[host])) or secure_domains[host] then + session:close({ + condition = "policy-violation", + text = "Encrypted server-to-server communication is required but was not " + ..((session.direction == "outgoing" and "offered") or "used") + }); + end + end + if hosts[host] then + session:close({ condition = "undefined-condition", text = "Attempt to authenticate as a host we serve" }); + end + if session.type == "s2sout_unauthed" then + session.type = "s2sout"; + elseif session.type == "s2sin_unauthed" then + session.type = "s2sin"; + if host then + if not session.hosts[host] then session.hosts[host] = {}; end + session.hosts[host].authed = true; + end + elseif session.type == "s2sin" and host then + if not session.hosts[host] then session.hosts[host] = {}; end + session.hosts[host].authed = true; + else + return false; + end + session.log("debug", "connection %s->%s is now authenticated for %s", session.from_host, session.to_host, host); + + mark_connected(session); + + return true; end --- Helper to check that a session peer's certificate is valid local function check_cert_status(session) + local host = session.direction == "outgoing" and session.to_host or session.from_host local conn = session.conn:socket() local cert if conn.getpeercertificate then @@ -143,11 +234,19 @@ local function check_cert_status(session) end if cert then - local chain_valid, errors = conn:getpeerverification() + local chain_valid, errors; + if conn.getpeerverification then + chain_valid, errors = conn:getpeerverification(); + elseif conn.getpeerchainvalid then -- COMPAT mw/luasec-hg + chain_valid, errors = conn:getpeerchainvalid(); + errors = (not chain_valid) and { { errors } } or nil; + else + chain_valid, errors = false, { { "Chain verification not supported by this version of LuaSec" } }; + end -- Is there any interest in printing out all/the number of errors here? if not chain_valid then (session.log or log)("debug", "certificate chain validation result: invalid"); - for depth, t in ipairs(errors) do + for depth, t in pairs(errors or NULL) do (session.log or log)("debug", "certificate error(s) at depth %d: %s", depth-1, table.concat(t, ", ")) end session.cert_chain_status = "invalid"; @@ -155,8 +254,6 @@ local function check_cert_status(session) (session.log or log)("debug", "certificate chain validation result: valid"); session.cert_chain_status = "valid"; - local host = session.direction == "incoming" and session.from_host or session.to_host - -- We'll go ahead and verify the asserted identity if the -- connecting server specified one. if host then @@ -165,9 +262,11 @@ local function check_cert_status(session) else session.cert_identity_status = "invalid" end + (session.log or log)("debug", "certificate identity validation result: %s", session.cert_identity_status); end end end + return module:fire_event("s2s-check-certificate", { host = host, session = session, cert = cert }); end --- XMPP stream event handlers @@ -178,17 +277,27 @@ local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; function stream_callbacks.streamopened(session, attr) local send = session.sends2s; - + session.version = tonumber(attr.version) or 0; - + -- TODO: Rename session.secure to session.encrypted if session.secure == false then session.secure = true; + + local sock = session.conn:socket(); + if sock.info then + local info = sock:info(); + (session.log or log)("info", "Stream encrypted (%s with %s)", info.protocol, info.cipher); + session.compressed = info.compression; + else + (session.log or log)("info", "Stream encrypted"); + session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg + end end if session.direction == "incoming" then -- Send a reply stream header - + -- Validate to/from local to, from = nameprep(attr.to), nameprep(attr.from); if not to and attr.to then -- COMPAT: Some servers do not reliably set 'to' (especially on stream restarts) @@ -199,7 +308,7 @@ function stream_callbacks.streamopened(session, attr) session:close({ condition = "improper-addressing", text = "Invalid 'from' address" }); return; end - + -- Set session.[from/to]_host if they have not been set already and if -- this session isn't already authenticated if session.type == "s2sin_unauthed" and from and not session.from_host then @@ -214,10 +323,10 @@ function stream_callbacks.streamopened(session, attr) session:close({ condition = "improper-addressing", text = "New stream 'to' attribute does not match original" }); return; end - + -- For convenience we'll put the sanitised values into these variables to, from = session.to_host, session.from_host; - + session.streamid = uuid_gen(); (session.log or log)("debug", "Incoming s2s received %s", st.stanza("stream:stream", attr):top_tag()); if to then @@ -238,20 +347,27 @@ function stream_callbacks.streamopened(session, attr) end end - if session.secure and not session.cert_chain_status then check_cert_status(session); end + if hosts[from] then + session:close({ condition = "undefined-condition", text = "Attempt to connect from a host we serve" }); + return; + end - send(""); - send(st.stanza("stream:stream", { xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback', - ["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=to, to=from, version=(session.version > 0 and "1.0" or nil) }):top_tag()); + if session.secure and not session.cert_chain_status then + if check_cert_status(session) == false then + return; + end + end + + session:open_stream(session.to_host, session.from_host) if session.version >= 1.0 then local features = st.stanza("stream:features"); - + if to then hosts[to].events.fire_event("s2s-stream-features", { origin = session, features = features }); else (session.log or log)("warn", "No 'to' on stream header from %s means we can't offer any features", from or "unknown host"); end - + log("debug", "Sending stream features: %s", tostring(features)); send(features); end @@ -260,7 +376,11 @@ function stream_callbacks.streamopened(session, attr) if not attr.id then error("stream response did not give us a streamid!!!"); end session.streamid = attr.id; - if session.secure and not session.cert_chain_status then check_cert_status(session); end + if session.secure and not session.cert_chain_status then + if check_cert_status(session) == false then + return; + end + end -- Send unauthed buffer -- (stanzas which are fine to send before dialback) @@ -275,13 +395,13 @@ function stream_callbacks.streamopened(session, attr) end end session.send_buffer = nil; - + -- If server is pre-1.0, don't wait for features, just do dialback if session.version < 1.0 then if not session.dialback_verifying then - hosts[session.from_host].events.fire_event("s2s-authenticate-legacy", { origin = session }); + hosts[session.from_host].events.fire_event("s2sout-authenticate-legacy", { origin = session }); else - s2s_mark_connected(session); + mark_connected(session); end end end @@ -319,7 +439,7 @@ function stream_callbacks.error(session, error, data) end end -local function handleerr(err) log("error", "Traceback[s2s]: %s: %s", tostring(err), traceback()); end +local function handleerr(err) log("error", "Traceback[s2s]: %s", traceback(tostring(err), 2)); end function stream_callbacks.handlestanza(session, stanza) if stanza.attr.xmlns == "jabber:client" then --COMPAT: Prosody pre-0.6.2 may send jabber:client stanza.attr.xmlns = nil; @@ -334,13 +454,15 @@ local listener = {}; --- Session methods local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; -local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; local function session_close(session, reason, remote_reason) local log = session.log or log; if session.conn then if session.notopen then - session.sends2s(""); - session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag()); + if session.direction == "incoming" then + session:open_stream(session.to_host, session.from_host); + else + session:open_stream(session.from_host, session.to_host); + end end if reason then -- nil == no err, initiated by us, false == initiated by remote if type(reason) == "string" then -- assume stream error @@ -366,10 +488,10 @@ local function session_close(session, reason, remote_reason) session.sends2s(""); function session.sends2s() return false; end - + local reason = remote_reason or (reason and (reason.text or reason.condition)) or reason; - session.log("info", "%s s2s stream %s->%s closed: %s", session.direction, session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed"); - + session.log("info", "%s s2s stream %s->%s closed: %s", session.direction:gsub("^.", string.upper), session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed"); + -- Authenticated incoming stream may still be sending us stanzas, so wait for from remote local conn = session.conn; if reason == nil and not session.notopen and session.type == "s2sin" then @@ -387,18 +509,38 @@ local function session_close(session, reason, remote_reason) end end +function session_open_stream(session, from, to) + local attr = { + ["xmlns:stream"] = 'http://etherx.jabber.org/streams', + xmlns = 'jabber:server', + version = session.version and (session.version > 0 and "1.0" or nil), + ["xml:lang"] = 'en', + id = session.streamid, + from = from, to = to, + } + if not from or (hosts[from] and hosts[from].modules.dialback) then + attr["xmlns:db"] = 'jabber:server:dialback'; + end + + session.sends2s(""); + session.sends2s(st.stanza("stream:stream", attr):top_tag()); + return true; +end + -- Session initialization logic shared by incoming and outgoing local function initialize_session(session) local stream = new_xmpp_stream(session, stream_callbacks); session.stream = stream; - + session.notopen = true; - + function session.reset_stream() session.notopen = true; session.stream:reset(); end - + + session.open_stream = session_open_stream; + local filter = session.filter; function session.data(data) data = filter("bytes/in", data); @@ -421,6 +563,8 @@ local function initialize_session(session) add_task(connect_timeout, function () if session.type == "s2sin" or session.type == "s2sout" then return; -- Ok, we're connected + elseif session.type == "s2s_destroyed" then + return; -- Session already destroyed end -- Not connected, need to close session and clean up (session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity", @@ -430,6 +574,7 @@ local function initialize_session(session) end function listener.onconnect(conn) + conn:setoption("keepalive", opt_keepalives); local session = sessions[conn]; if not session then -- New incoming connection session = s2s_new_incoming(conn); @@ -450,11 +595,12 @@ function listener.onconnect(conn) end end end - + initialize_session(session); else -- Outgoing session connected session:open_stream(session.from_host, session.to_host); end + session.ip = conn:ip(); end function listener.onincoming(conn, data) @@ -463,7 +609,7 @@ function listener.onincoming(conn, data) session.data(data); end end - + function listener.onstatus(conn, status) if status == "ssl-handshake-complete" then local session = sessions[conn]; @@ -477,16 +623,22 @@ end function listener.ondisconnect(conn, err) local session = sessions[conn]; if session then + sessions[conn] = nil; if err and session.direction == "outgoing" and session.notopen then (session.log or log)("debug", "s2s connection attempt failed: %s", err); if s2sout.attempt_connection(session, err) then - (session.log or log)("debug", "...so we're going to try another target"); return; -- Session lives for now end end (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "connection closed")); s2s_destroy_session(session, err); - sessions[conn] = nil; + end +end + +function listener.onreadtimeout(conn) + local session = sessions[conn]; + if session then + return (hosts[session.host] or prosody).events.fire_event("s2s-read-timeout", { session = session }); end end @@ -496,9 +648,41 @@ function listener.register_outgoing(conn, session) initialize_session(session); end +function check_auth_policy(event) + local host, session = event.host, event.session; + local must_secure = secure_auth; + + if not must_secure and secure_domains[host] then + must_secure = true; + elseif must_secure and insecure_domains[host] then + must_secure = false; + end + + if must_secure and (session.cert_chain_status ~= "valid" or session.cert_identity_status ~= "valid") then + module:log("warn", "Forbidding insecure connection to/from %s", host); + if session.direction == "incoming" then + session:close({ condition = "not-authorized", text = "Your server's certificate is invalid, expired, or not trusted by "..session.to_host }); + else -- Close outgoing connections without warning + session:close(false); + end + return false; + end +end + +module:hook("s2s-check-certificate", check_auth_policy, -1); + s2sout.set_listener(listener); -module:add_item("net-provider", { +module:hook("server-stopping", function(event) + local reason = event.reason; + for _, session in pairs(sessions) do + session:close{ condition = "system-shutdown", text = reason }; + end +end,500); + + + +module:provides("net", { name = "s2s"; listener = listener; default_port = 5269;