X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=plugins%2Fmod_c2s.lua;h=79a16e2922362bc7190962cd739128916770dfc0;hb=83ca7ad10e62a7177d71fb563e9d5d3434b20ab1;hp=89d678caedc62ef446ab0163f353a7cd5345c42e;hpb=bd91c158fdd4dca695a612c97b207af38420ae7d;p=prosody.git diff --git a/plugins/mod_c2s.lua b/plugins/mod_c2s.lua index 89d678ca..79a16e29 100644 --- a/plugins/mod_c2s.lua +++ b/plugins/mod_c2s.lua @@ -18,6 +18,8 @@ local uuid_generate = require "util.uuid".generate; local xpcall, tostring, type = xpcall, tostring, type; local traceback = debug.traceback; +local t_insert, t_remove = table.insert, table.remove; +local co_running, co_resume = coroutine.running, coroutine.resume; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@ -25,12 +27,13 @@ local log = module._log; local c2s_timeout = module:get_option_number("c2s_timeout"); local stream_close_timeout = module:get_option_number("c2s_close_timeout", 5); -local opt_keepalives = module:get_option_boolean("tcp_keepalives", false); +local opt_keepalives = module:get_option_boolean("c2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); local sessions = module:shared("sessions"); local core_process_stanza = prosody.core_process_stanza; +local hosts = prosody.hosts; -local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza }; +local stream_callbacks = { default_ns = "jabber:client" }; local listener = {}; --- Stream events handlers @@ -49,7 +52,7 @@ function stream_callbacks.streamopened(session, attr) session.streamid = uuid_generate(); (session.log or session)("debug", "Client sent opening to %s", session.host); - if not hosts[session.host] then + if not hosts[session.host] or not hosts[session.host].modules.c2s then -- We don't serve this host... session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)}; return; @@ -67,19 +70,20 @@ function stream_callbacks.streamopened(session, attr) if session.secure == false then session.secure = true; - -- Check if TLS compression is used local sock = session.conn:socket(); if sock.info then - session.compressed = sock:info"compression"; - elseif sock.compression then - session.compressed = sock:compression(); --COMPAT mw/luasec-hg + local info = sock:info(); + (session.log or log)("info", "Stream encrypted (%s) with %s, authenticated with %s and exchanged keys with %s", + info.protocol, info.encryption, info.authentication, info.key); + session.compressed = info.compression; + else + (session.log or log)("info", "Stream encrypted"); + session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg end end local features = st.stanza("stream:features"); hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); - module:fire_event("stream-features", session, features); - send(features); end @@ -115,12 +119,10 @@ function stream_callbacks.error(session, error, data) end end -local function handleerr(err) log("error", "Traceback[c2s]: %s: %s", tostring(err), traceback()); end +local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end function stream_callbacks.handlestanza(session, stanza) stanza = session.filter("stanzas/in", stanza); - if stanza then - return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); - end + t_insert(session.pending_stanzas, stanza); end --- Session methods @@ -132,31 +134,31 @@ local function session_close(session, reason) session.send(st.stanza("stream:stream", default_stream_attr):top_tag()); end if reason then -- nil == no err, initiated by us, false == initiated by client + local stream_error = st.stanza("stream:error"); if type(reason) == "string" then -- assume stream error - log("debug", "Disconnecting client, is: %s", reason); - session.send(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' })); + stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }); elseif type(reason) == "table" then if reason.condition then - local stanza = st.stanza("stream:error"):tag(reason.condition, stream_xmlns_attr):up(); + stream_error:tag(reason.condition, stream_xmlns_attr):up(); if reason.text then - stanza:tag("text", stream_xmlns_attr):text(reason.text):up(); + stream_error:tag("text", stream_xmlns_attr):text(reason.text):up(); end if reason.extra then - stanza:add_child(reason.extra); + stream_error:add_child(reason.extra); end - log("debug", "Disconnecting client, is: %s", tostring(stanza)); - session.send(stanza); elseif reason.name then -- a stanza - log("debug", "Disconnecting client, is: %s", tostring(reason)); - session.send(reason); + stream_error = reason; end end + stream_error = tostring(stream_error); + log("debug", "Disconnecting client, is: %s", stream_error); + session.send(stream_error); end session.send(""); function session.send() return false; end - local reason = (reason and (reason.text or reason.condition)) or reason; + local reason = (reason and (reason.name or reason.text or reason.condition)) or reason; session.log("info", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed"); -- Authenticated incoming stream may still be sending us stanzas, so wait for from remote @@ -222,18 +224,65 @@ function listener.onconnect(conn) session.stream:reset(); end + session.thread = coroutine.create(function (stanza) + while true do + core_process_stanza(session, stanza); + stanza = coroutine.yield("ready"); + end + end); + + session.pending_stanzas = {}; + local filter = session.filter; function session.data(data) - data = filter("bytes/in", data); + -- Parse the data, which will store stanzas in session.pending_stanzas if data then - local ok, err = stream:feed(data); - if ok then return; end - log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); - session:close("not-well-formed"); + data = filter("bytes/in", data); + if data then + local ok, err = stream:feed(data); + if not ok then + log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); + session:close("not-well-formed"); + end + end + end + + if co_running() ~= session.thread and not session.paused then + if session.state == "wait" then + session.state = "ready"; + local ok, state = co_resume(session.thread); + if not ok then + log("error", "Traceback[c2s]: %s", state); + elseif state == "wait" then + return; + end + end + -- We're not currently running, so start the thread to process pending stanzas + local s, thread = session.pending_stanzas, session.thread; + local n = #s; + while n > 0 and session.state ~= "wait" do + session.log("debug", "processing %d stanzas", n); + local consumed; + for i = 1,n do + local stanza = s[i]; + local ok, state = co_resume(thread, stanza); + if not ok then + log("error", "Traceback[c2s]: %s", state); + elseif state == "wait" then + consumed = i; + session.state = "wait"; + break; + end + end + if not consumed then consumed = n; end + for i = 1, #s do + s[i] = s[consumed+i]; + end + n = #s; + end end end - if c2s_timeout then add_task(c2s_timeout, function () if session.type == "c2s_unauthed" then @@ -243,6 +292,22 @@ function listener.onconnect(conn) end session.dispatch_stanza = stream_callbacks.handlestanza; + + function session:sleep(by) + session.log("debug", "Sleeping for %s", by); + session.paused = by or "?"; + session.conn:pause(); + if co_running() == session.thread then + coroutine.yield("wait"); + end + end + function session:wake(by) + assert(session.paused == (by or "?")); + session.log("debug", "Waking for %s", by); + session.paused = nil; + session.conn:resume(); + session.data(); --FIXME: next tick? + end end function listener.onincoming(conn, data) @@ -261,10 +326,27 @@ function listener.ondisconnect(conn, err) end end +function listener.onreadtimeout(conn) + local session = sessions[conn]; + if session then + return (hosts[session.host] or prosody).events.fire_event("c2s-read-timeout", { session = session }); + end +end + +local function keepalive(event) + return event.session.send(' '); +end + function listener.associate_session(conn, session) sessions[conn] = session; end +function module.add_host(module) + module:hook("c2s-read-timeout", keepalive, -1); +end + +module:hook("c2s-read-timeout", keepalive, -1); + module:hook("server-stopping", function(event) local reason = event.reason; for _, session in pairs(sessions) do