X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=plugins%2Fmod_bosh.lua;h=d9c8defd90ead6d891c5948ede45d90e04ce98a0;hb=6a530b77c5ddb807e7bb1f60ec54f9d36f74313e;hp=668511339c30a8893bd73521d79fecc57a90667b;hpb=9fe01c4331cadb424aa1cf6222b61810c97752ed;p=prosody.git diff --git a/plugins/mod_bosh.lua b/plugins/mod_bosh.lua index 66851133..d9c8defd 100644 --- a/plugins/mod_bosh.lua +++ b/plugins/mod_bosh.lua @@ -18,6 +18,10 @@ local core_process_stanza = prosody.core_process_stanza; local st = require "util.stanza"; local logger = require "util.logger"; local log = logger.init("mod_bosh"); +local initialize_filters = require "util.filters".initialize; +local math_min = math.min; +local xpcall, tostring, type = xpcall, tostring, type; +local traceback = debug.traceback; local xmlns_streams = "http://etherx.jabber.org/streams"; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@ -30,6 +34,7 @@ local BOSH_DEFAULT_HOLD = module:get_option_number("bosh_default_hold", 1); local BOSH_DEFAULT_INACTIVITY = module:get_option_number("bosh_max_inactivity", 60); local BOSH_DEFAULT_POLLING = module:get_option_number("bosh_max_polling", 5); local BOSH_DEFAULT_REQUESTS = module:get_option_number("bosh_max_requests", 2); +local bosh_max_wait = module:get_option_number("bosh_max_wait", 120); local consider_bosh_secure = module:get_option_boolean("consider_bosh_secure"); @@ -70,8 +75,8 @@ end local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat; local os_time = os.time; -local sessions = {}; -local inactive_sessions = {}; -- Sessions which have no open requests +-- All sessions, and sessions that have no requests open +local sessions, inactive_sessions = module:shared("sessions", "inactive_sessions"); -- Used to respond to idle sessions (those with waiting requests) local waiting_requests = {}; @@ -243,24 +248,22 @@ function stream_callbacks.streamopened(context, attr) sid = new_uuid(); local session = { type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to, - bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid, + bosh_version = attr.ver, bosh_wait = math_min(attr.wait, bosh_max_wait), streamid = sid, bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, - close = bosh_close_stream, dispatch_stanza = core_process_stanza, + close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true, log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, ip = get_ip_from_request(request); }; sessions[sid] = session; + local filter = initialize_filters(session); + session.log("debug", "BOSH session created for request from %s", session.ip); log("info", "New BOSH session, assigned it sid '%s'", sid); -- Send creation response local creating_session = true; - local features = st.stanza("stream:features"); - hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); - fire_event("stream-features", session, features); - table.insert(session.send_buffer, tostring(features)); local r = session.requests; function session.send(s) @@ -269,6 +272,7 @@ function stream_callbacks.streamopened(context, attr) s = st.clone(s); s.attr.xmlns = "jabber:client"; end + s = filter("stanzas/out", s); --log("debug", "Sending BOSH data: %s", tostring(s)); t_insert(session.send_buffer, tostring(s)); @@ -282,14 +286,16 @@ function stream_callbacks.streamopened(context, attr) sid = sid; }; if creating_session then - body_attr.wait = attr.wait; + creating_session = nil; body_attr.inactivity = tostring(BOSH_DEFAULT_INACTIVITY); body_attr.polling = tostring(BOSH_DEFAULT_POLLING); body_attr.requests = tostring(BOSH_DEFAULT_REQUESTS); + body_attr.wait = tostring(session.bosh_wait); body_attr.hold = tostring(session.bosh_hold); body_attr.authid = sid; body_attr.secure = "true"; - body_attr.ver = '1.6'; from = session.host; + body_attr.ver = '1.6'; + body_attr.from = session.host; body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh"; body_attr["xmpp:version"] = "1.0"; end @@ -299,7 +305,6 @@ function stream_callbacks.streamopened(context, attr) return true; end request.sid = sid; - return; end local session = sessions[sid]; @@ -319,7 +324,7 @@ function stream_callbacks.streamopened(context, attr) session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid); elseif diff <= 0 then -- Repeated, ignore - session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff); + session.log("debug", "rid repeated, ignoring: %s (diff %d)", session.rid, diff); context.notopen = nil; context.ignore = true; context.sid = sid; @@ -344,11 +349,12 @@ function stream_callbacks.streamopened(context, attr) local features = st.stanza("stream:features"); hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); fire_event("stream-features", session, features); - table.insert(session.send_buffer, tostring(features)); + session.send(tostring(features)); session.notopen = nil; end end +local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end function stream_callbacks.handlestanza(context, stanza) if context.ignore then return; end log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); @@ -357,7 +363,10 @@ function stream_callbacks.handlestanza(context, stanza) if stanza.attr.xmlns == xmlns_bosh then stanza.attr.xmlns = nil; end - core_process_stanza(session, stanza); + stanza = session.filter("stanzas/in", stanza); + if stanza then + return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); + end end end