X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=plugins%2Fmod_c2s.lua;h=7a8af406acdd6b3fe2a5710bea28338365c08d36;hb=fea2a20a1c12fe7415426658e55095fee94fe656;hp=dabed021307f570902e9676c8d32c08ebc5ffb00;hpb=6fa453d785f470b652fd316df795d26e4a1b4694;p=prosody.git diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index dabed021..7a8af406 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -1,7 +1,7 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- @@ -15,9 +15,10 @@ local sessionmanager = require "core.sessionmanager"; local st = require "util.stanza"; local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; local uuid_generate = require "util.uuid".generate; +local runner = require "util.async".runner; local xpcall, tostring, type = xpcall, tostring, type; -local traceback = debug.traceback; +local t_insert, t_remove = table.insert, table.remove; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@ -25,17 +26,18 @@ local log = module._log; local c2s_timeout = module:get_option_number("c2s_timeout"); local stream_close_timeout = module:get_option_number("c2s_close_timeout", 5); -local opt_keepalives = module:get_option_boolean("tcp_keepalives", false); +local opt_keepalives = module:get_option_boolean("c2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); local sessions = module:shared("sessions"); local core_process_stanza = prosody.core_process_stanza; +local hosts = prosody.hosts; -local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza }; +local stream_callbacks = { default_ns = "jabber:client" }; local listener = {}; +local runner_callbacks = {}; --- Stream events handlers local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; -local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; function stream_callbacks.streamopened(session, attr) local send = session.send; @@ -49,15 +51,13 @@ function stream_callbacks.streamopened(session, attr) session.streamid = uuid_generate(); (session.log or session)("debug", "Client sent opening to %s", session.host); - if not hosts[session.host] then + if not hosts[session.host] or not hosts[session.host].modules.c2s then -- We don't serve this host... session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)}; return; end - send(""..st.stanza("stream:stream", { - xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams'; - id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }):top_tag()); + session:open_stream(); (session.log or log)("debug", "Sent reply to client"); session.notopen = nil; @@ -66,12 +66,21 @@ function stream_callbacks.streamopened(session, attr) -- since we now have a new stream header, session is secured if session.secure == false then session.secure = true; + session.encrypted = true; + + local sock = session.conn:socket(); + if sock.info then + local info = sock:info(); + (session.log or log)("info", "Stream encrypted (%s with %s)", info.protocol, info.cipher); + session.compressed = info.compression; + else + (session.log or log)("info", "Stream encrypted"); + session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg + end end local features = st.stanza("stream:features"); hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); - module:fire_event("stream-features", session, features); - send(features); end @@ -107,12 +116,9 @@ function stream_callbacks.error(session, error, data) end end -local function handleerr(err) log("error", "Traceback[c2s]: %s: %s", tostring(err), traceback()); end function stream_callbacks.handlestanza(session, stanza) stanza = session.filter("stanzas/in", stanza); - if stanza then - return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); - end + session.thread:run(stanza); end --- Session methods @@ -120,36 +126,35 @@ local function session_close(session, reason) local log = session.log or log; if session.conn then if session.notopen then - session.send(""); - session.send(st.stanza("stream:stream", default_stream_attr):top_tag()); + session:open_stream(); end if reason then -- nil == no err, initiated by us, false == initiated by client + local stream_error = st.stanza("stream:error"); if type(reason) == "string" then -- assume stream error - log("debug", "Disconnecting client, is: %s", reason); - session.send(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' })); + stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }); elseif type(reason) == "table" then if reason.condition then - local stanza = st.stanza("stream:error"):tag(reason.condition, stream_xmlns_attr):up(); + stream_error:tag(reason.condition, stream_xmlns_attr):up(); if reason.text then - stanza:tag("text", stream_xmlns_attr):text(reason.text):up(); + stream_error:tag("text", stream_xmlns_attr):text(reason.text):up(); end if reason.extra then - stanza:add_child(reason.extra); + stream_error:add_child(reason.extra); end - log("debug", "Disconnecting client, is: %s", tostring(stanza)); - session.send(stanza); elseif reason.name then -- a stanza - log("debug", "Disconnecting client, is: %s", tostring(reason)); - session.send(reason); + stream_error = reason; end end + stream_error = tostring(stream_error); + log("debug", "Disconnecting client, is: %s", stream_error); + session.send(stream_error); end - + session.send(""); function session.send() return false; end - - local reason = (reason and (reason.text or reason.condition)) or reason; - session.log("info", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed"); + + local reason = (reason and (reason.name or reason.text or reason.condition)) or reason; + session.log("debug", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed"); -- Authenticated incoming stream may still be sending us stanzas, so wait for from remote local conn = session.conn; @@ -169,6 +174,19 @@ local function session_close(session, reason) end end +local function session_open_stream(session) + local attr = { + ["xmlns:stream"] = 'http://etherx.jabber.org/streams', + xmlns = stream_callbacks.default_ns, + version = "1.0", + ["xml:lang"] = 'en', + id = session.streamid or "", + from = session.host + }; + session.send(""); + session.send(st.stanza("stream:stream", attr):top_tag()); +end + module:hook_global("user-deleted", function(event) local username, host = event.username, event.host; local user = hosts[host].sessions[username]; @@ -179,45 +197,74 @@ module:hook_global("user-deleted", function(event) end end, 200); +function runner_callbacks:ready() + self.data.conn:resume(); +end + +function runner_callbacks:waiting() + self.data.conn:pause(); +end + +function runner_callbacks:error(err) + (self.data.log or log)("error", "Traceback[c2s]: %s", err); +end + --- Port listener function listener.onconnect(conn) local session = sm_new_session(conn); sessions[conn] = session; - + session.log("info", "Client connected"); - + -- Client is using legacy SSL (otherwise mod_tls sets this flag) if conn:ssl() then session.secure = true; + session.encrypted = true; + + -- Check if TLS compression is used + local sock = conn:socket(); + if sock.info then + session.compressed = sock:info"compression"; + elseif sock.compression then + session.compressed = sock:compression(); --COMPAT mw/luasec-hg + end end - + if opt_keepalives then conn:setoption("keepalive", opt_keepalives); end - + + session.open_stream = session_open_stream; session.close = session_close; - + local stream = new_xmpp_stream(session, stream_callbacks); session.stream = stream; session.notopen = true; - + function session.reset_stream() session.notopen = true; session.stream:reset(); end - + + session.thread = runner(function (stanza) + core_process_stanza(session, stanza); + end, runner_callbacks, session); + local filter = session.filter; function session.data(data) - data = filter("bytes/in", data); + -- Parse the data, which will store stanzas in session.pending_stanzas if data then - local ok, err = stream:feed(data); - if ok then return; end - log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); - session:close("not-well-formed"); + data = filter("bytes/in", data); + if data then + local ok, err = stream:feed(data); + if not ok then + log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); + session:close("not-well-formed"); + end + end end end - if c2s_timeout then add_task(c2s_timeout, function () if session.type == "c2s_unauthed" then @@ -245,10 +292,36 @@ function listener.ondisconnect(conn, err) end end +function listener.onreadtimeout(conn) + local session = sessions[conn]; + if session then + return (hosts[session.host] or prosody).events.fire_event("c2s-read-timeout", { session = session }); + end +end + +local function keepalive(event) + return event.session.send(' '); +end + function listener.associate_session(conn, session) sessions[conn] = session; end +function module.add_host(module) + module:hook("c2s-read-timeout", keepalive, -1); +end + +module:hook("c2s-read-timeout", keepalive, -1); + +module:hook("server-stopping", function(event) + local reason = event.reason; + for _, session in pairs(sessions) do + session:close{ condition = "system-shutdown", text = reason }; + end +end, 1000); + + + module:provides("net", { name = "c2s"; listener = listener;