X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=core%2Fs2smanager.lua;h=5243085c98220ba0fb3410cbc3ef4049069ea605;hb=61fbebe4017d142510fbcfce61714ecb48b99503;hp=7827381d4b1eee375a674448fc7e760f2048bf82;hpb=32ae5049dfc5d137a711713007a9012cd5d79502;p=prosody.git diff --git a/core/s2smanager.lua b/core/s2smanager.lua index 7827381d..5243085c 100644 --- a/core/s2smanager.lua +++ b/core/s2smanager.lua @@ -1,4 +1,4 @@ --- Prosody IM v0.1 +-- Prosody IM v0.2 -- Copyright (C) 2008 Matthew Wild -- Copyright (C) 2008 Waqas Hussain -- @@ -21,6 +21,7 @@ local hosts = hosts; local sessions = sessions; +local core_process_stanza = function(a, b) core_process_stanza(a, b); end local socket = require "socket"; local format = string.format; local t_insert, t_sort = table.insert, table.sort; @@ -30,7 +31,7 @@ local tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber local idna_to_ascii = require "util.encodings".idna.to_ascii; local connlisteners_get = require "net.connlisteners".get; -local wraptlsclient = require "net.server".wraptlsclient; +local wrapclient = require "net.server".wrapclient; local modulemanager = require "core.modulemanager"; local st = require "stanza"; local stanza = st.stanza; @@ -47,10 +48,39 @@ local dialback_secret = sha256_hash(tostring{} .. math.random() .. socket.gettim local dns = require "net.dns"; +incoming_s2s = {}; +local incoming_s2s = incoming_s2s; + module "s2smanager" local function compare_srv_priorities(a,b) return a.priority < b.priority or a.weight < b.weight; end +local function bounce_sendq(session) + local sendq = session.sendq; + if sendq then + session.log("debug", "sending error replies for "..#sendq.." queued stanzas because of failed outgoing connection to "..tostring(session.to_host)); + local dummy = { + type = "s2sin"; + send = function(s) + (session.log or log)("error", "Replying to to an s2s error reply, please report this! Traceback: %s", get_traceback()); + end; + dummy = true; + }; + for i, data in ipairs(sendq) do + local reply = data[2]; + local xmlns = reply.attr.xmlns; + if not xmlns or xmlns == "jabber:client" or xmlns == "jabber:server" then + reply.attr.type = "error"; + reply:tag("error", {type = "cancel"}) + :tag("remote-server-not-found", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up(); + core_process_stanza(dummy, reply); + end + sendq[i] = nil; + end + session.sendq = nil; + end +end + function send_to_host(from_host, to_host, data) local host = hosts[from_host].s2sout[to_host]; if host then @@ -63,8 +93,8 @@ function send_to_host(from_host, to_host, data) end -- Queue stanza until we are able to send it - if host.sendq then t_insert(host.sendq, tostring(data)); - else host.sendq = { tostring(data) }; end + if host.sendq then t_insert(host.sendq, {tostring(data), st.reply(data)}); + else host.sendq = { {tostring(data), st.reply(data)} }; end host.log("debug", "stanza [%s] queued ", data.name); elseif host.type == "local" or host.type == "component" then log("error", "Trying to send a stanza to ourselves??") @@ -84,14 +114,15 @@ function send_to_host(from_host, to_host, data) log("debug", "opening a new outgoing connection for this stanza"); local host_session = new_outgoing(from_host, to_host); -- Store in buffer - host_session.sendq = { tostring(data) }; + host_session.sendq = { {tostring(data), st.reply(data)} }; + if not host_session.conn then destroy_session(host_session); end end end local open_sessions = 0; function new_incoming(conn) - local session = { conn = conn, type = "s2sin_unauthed", direction = "incoming" }; + local session = { conn = conn, type = "s2sin_unauthed", direction = "incoming", hosts = {} }; if true then session.trace = newproxy(true); getmetatable(session.trace).__gc = function () open_sessions = open_sessions - 1; end; @@ -99,6 +130,7 @@ function new_incoming(conn) open_sessions = open_sessions + 1; local w, log = conn.write, logger_init("s2sin"..tostring(conn):match("[a-f0-9]+$")); session.sends2s = function (t) log("debug", "sending: %s", tostring(t)); w(tostring(t)); end + incoming_s2s[session] = true; return session; end @@ -160,10 +192,11 @@ function attempt_connection(host_session, err) local success, err = conn:connect(connect_host, connect_port); if not success and err ~= "timeout" then log("warn", "s2s connect() failed: %s", err); + return false; end local cl = connlisteners_get("xmppserver"); - conn = wraptlsclient(cl, conn, connect_host, connect_port, 0, cl.default_mode or 1, hosts[from_host].ssl_ctx ); + conn = wrapclient(conn, connect_host, connect_port, cl, cl.default_mode or 1, hosts[from_host].ssl_ctx, false ); host_session.conn = conn; -- Register this outgoing connection so that xmppserver_listener knows about it @@ -239,11 +272,16 @@ function verify_dialback(id, to, from, key) return key == generate_dialback(id, to, from); end -function make_authenticated(session) +function make_authenticated(session, host) if session.type == "s2sout_unauthed" then session.type = "s2sout"; elseif session.type == "s2sin_unauthed" then session.type = "s2sin"; + if host then + session.hosts[host].authed = true; + end + elseif session.type == "s2sin" and host then + session.hosts[host].authed = true; else return false; end @@ -269,7 +307,7 @@ function mark_connected(session) if sendq then session.log("debug", "sending "..#sendq.." queued stanzas across new outgoing connection to "..session.to_host); for i, data in ipairs(sendq) do - send(data); + send(data[1]); sendq[i] = nil; end session.sendq = nil; @@ -280,10 +318,12 @@ end function destroy_session(session) (session.log or log)("info", "Destroying "..tostring(session.direction).." session "..tostring(session.from_host).."->"..tostring(session.to_host)); - -- FIXME: Flush sendq here/report errors to originators if session.direction == "outgoing" then hosts[session.from_host].s2sout[session.to_host] = nil; + bounce_sendq(session); + elseif session.direction == "incoming" then + incoming_s2s[session] = nil; end for k in pairs(session) do