local prosody = prosody;
local hosts = prosody.hosts;
-local core_process_stanza = core_process_stanza;
+local core_process_stanza = prosody.core_process_stanza;
local tostring, type = tostring, type;
local t_insert = table.insert;
local s2sout = module:require("s2sout");
-local connect_timeout = module:get_option_number("s2s_timeout", 60);
+local connect_timeout = module:get_option_number("s2s_timeout", 90);
+local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5);
local sessions = module:shared("sessions");
log("error", "WARNING! This might, possibly, be a bug, but it might not...");
log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host));
end
- host.sends2s(stanza);
- host.log("debug", "stanza sent over "..host.type);
- return true;
+ if host.sends2s(stanza) then
+ host.log("debug", "stanza sent over %s", host.type);
+ return true;
+ end
end
end
end
--- Helper to check that a session peer's certificate is valid
local function check_cert_status(session)
+ local host = session.direction == "incoming" and session.from_host or session.to_host
local conn = session.conn:socket()
local cert
if conn.getpeercertificate then
(session.log or log)("debug", "certificate chain validation result: valid");
session.cert_chain_status = "valid";
- local host = session.direction == "incoming" and session.from_host or session.to_host
-
-- We'll go ahead and verify the asserted identity if the
-- connecting server specified one.
if host then
end
end
end
+ module:fire_event("s2s-check-certificate", { host = host, session = session, cert = cert });
end
--- XMPP stream event handlers
function stream_callbacks.streamopened(session, attr)
local send = session.sends2s;
- -- TODO: #29: SASL/TLS on s2s streams
session.version = tonumber(attr.version) or 0;
-- TODO: Rename session.secure to session.encrypted
if session.secure == false then
session.secure = true;
+
+ -- Check if TLS compression is used
+ local sock = session.conn:socket();
+ if sock.info then
+ session.compressed = sock:info"compression";
+ elseif sock.compression then
+ session.compressed = sock:compression(); --COMPAT mw/luasec-hg
+ end
end
if session.direction == "incoming" then
-- If server is pre-1.0, don't wait for features, just do dialback
if session.version < 1.0 then
if not session.dialback_verifying then
- hosts[session.from_host].events.fire_event("s2s-authenticate-legacy", { origin = session });
+ hosts[session.from_host].events.fire_event("s2sout-authenticate-legacy", { origin = session });
else
s2s_mark_connected(session);
end
function stream_callbacks.streamclosed(session)
(session.log or log)("debug", "Received </stream:stream>");
- session:close();
-end
-
-function stream_callbacks.streamdisconnected(session, err)
- if err and err ~= "closed" and session.direction == "outgoing" then
- (session.log or log)("debug", "s2s connection attempt failed: %s", err);
- if s2sout.attempt_connection(session, err) then
- (session.log or log)("debug", "...so we're going to try another target");
- return true; -- Session lives for now
- end
- end
- (session.log or log)("info", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "closed"));
- s2s_destroy_session(session, err);
+ session:close(false);
end
function stream_callbacks.error(session, error, data)
session.sends2s("<?xml version='1.0'?>");
session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag());
end
- if reason then
+ if reason then -- nil == no err, initiated by us, false == initiated by remote
if type(reason) == "string" then -- assume stream error
- log("info", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, reason);
+ log("debug", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, reason);
session.sends2s(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }));
elseif type(reason) == "table" then
if reason.condition then
if reason.extra then
stanza:add_child(reason.extra);
end
- log("info", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, tostring(stanza));
+ log("debug", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, tostring(stanza));
session.sends2s(stanza);
elseif reason.name then -- a stanza
- log("info", "Disconnecting %s->%s[%s], <stream:error> is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason));
+ log("debug", "Disconnecting %s->%s[%s], <stream:error> is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason));
session.sends2s(reason);
end
end
end
+
session.sends2s("</stream:stream>");
- if session.notopen or not session.conn:close() then
- session.conn:close(true); -- Force FIXME: timer?
+ function session.sends2s() return false; end
+
+ local reason = remote_reason or (reason and (reason.text or reason.condition)) or reason;
+ session.log("info", "%s s2s stream %s->%s closed: %s", session.direction, session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed");
+
+ -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
+ local conn = session.conn;
+ if reason == nil and not session.notopen and session.type == "s2sin" then
+ add_task(stream_close_timeout, function ()
+ if not session.destroyed then
+ session.log("warn", "Failed to receive a stream close response, closing connection anyway...");
+ s2s_destroy_session(session, reason);
+ conn:close();
+ end
+ end);
+ else
+ s2s_destroy_session(session, reason);
+ conn:close(); -- Close immediately, as this is an outgoing connection or is not authed
end
- session.conn:close();
- listener.ondisconnect(session.conn, remote_reason or (reason and (reason.text or reason.condition)) or reason or "stream closed");
end
end
return handlestanza(session, stanza);
end
- local conn = session.conn;
add_task(connect_timeout, function ()
- if session.conn ~= conn or session.connecting
- or session.type == "s2sin" or session.type == "s2sout" then
- return; -- Ok, we're connect[ed|ing]
+ if session.type == "s2sin" or session.type == "s2sout" then
+ return; -- Ok, we're connected
+ elseif session.type == "s2s_destroyed" then
+ return; -- Session already destroyed
end
-- Not connected, need to close session and clean up
(session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity",
end
function listener.onconnect(conn)
- if not sessions[conn] then -- May be an existing outgoing session
- local session = s2s_new_incoming(conn);
+ local session = sessions[conn];
+ if not session then -- New incoming connection
+ session = s2s_new_incoming(conn);
sessions[conn] = session;
session.log("debug", "Incoming s2s connection");
end
initialize_session(session);
+ else -- Outgoing session connected
+ session:open_stream(session.from_host, session.to_host);
end
end
function listener.ondisconnect(conn, err)
local session = sessions[conn];
if session then
- if stream_callbacks.streamdisconnected(session, err) then
- return; -- Connection lives, for now
+ sessions[conn] = nil;
+ if err and session.direction == "outgoing" and session.notopen then
+ (session.log or log)("debug", "s2s connection attempt failed: %s", err);
+ if s2sout.attempt_connection(session, err) then
+ (session.log or log)("debug", "...so we're going to try another target");
+ return; -- Session lives for now
+ end
end
+ (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", tostring(session.from_host), tostring(session.to_host), tostring(err or "connection closed"));
+ s2s_destroy_session(session, err);
end
- sessions[conn] = nil;
end
function listener.register_outgoing(conn, session)
s2sout.set_listener(listener);
-module:add_item("net-provider", {
+module:hook("server-stopping", function(event)
+ local reason = event.reason;
+ for _, session in pairs(sessions) do
+ session:close{ condition = "system-shutdown", text = reason };
+ end
+end,500);
+
+
+
+module:provides("net", {
name = "s2s";
listener = listener;
default_port = 5269;