local st = require "util.stanza";
local logger = require "util.logger";
local log = logger.init("mod_bosh");
+local initialize_filters = require "util.filters".initialize;
+local math_min = math.min;
+local xpcall, tostring, type = xpcall, tostring, type;
+local traceback = debug.traceback;
local xmlns_streams = "http://etherx.jabber.org/streams";
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
local BOSH_DEFAULT_INACTIVITY = module:get_option_number("bosh_max_inactivity", 60);
local BOSH_DEFAULT_POLLING = module:get_option_number("bosh_max_polling", 5);
local BOSH_DEFAULT_REQUESTS = module:get_option_number("bosh_max_requests", 2);
+local bosh_max_wait = module:get_option_number("bosh_max_wait", 120);
local consider_bosh_secure = module:get_option_boolean("consider_bosh_secure");
sid = new_uuid();
local session = {
type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to,
- bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid,
+ bosh_version = attr.ver, bosh_wait = math_min(attr.wait, bosh_max_wait), streamid = sid,
bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY,
requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
};
sessions[sid] = session;
+ local filter = initialize_filters(session);
+
session.log("debug", "BOSH session created for request from %s", session.ip);
log("info", "New BOSH session, assigned it sid '%s'", sid);
s = st.clone(s);
s.attr.xmlns = "jabber:client";
end
+ s = filter("stanzas/out", s);
--log("debug", "Sending BOSH data: %s", tostring(s));
t_insert(session.send_buffer, tostring(s));
sid = sid;
};
if creating_session then
- body_attr.wait = attr.wait;
+ creating_session = nil;
body_attr.inactivity = tostring(BOSH_DEFAULT_INACTIVITY);
body_attr.polling = tostring(BOSH_DEFAULT_POLLING);
body_attr.requests = tostring(BOSH_DEFAULT_REQUESTS);
+ body_attr.wait = tostring(session.bosh_wait);
body_attr.hold = tostring(session.bosh_hold);
body_attr.authid = sid;
body_attr.secure = "true";
- body_attr.ver = '1.6'; from = session.host;
+ body_attr.ver = '1.6';
+ body_attr.from = session.host;
body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh";
body_attr["xmpp:version"] = "1.0";
end
session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid);
elseif diff <= 0 then
-- Repeated, ignore
- session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff);
+ session.log("debug", "rid repeated, ignoring: %s (diff %d)", session.rid, diff);
context.notopen = nil;
context.ignore = true;
context.sid = sid;
local features = st.stanza("stream:features");
hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
fire_event("stream-features", session, features);
- table.insert(session.send_buffer, tostring(features));
+ session.send(tostring(features));
session.notopen = nil;
end
end
+local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end
function stream_callbacks.handlestanza(context, stanza)
if context.ignore then return; end
log("debug", "BOSH stanza received: %s\n", stanza:top_tag());
if stanza.attr.xmlns == xmlns_bosh then
stanza.attr.xmlns = nil;
end
- core_process_stanza(session, stanza);
+ stanza = session.filter("stanzas/in", stanza);
+ if stanza then
+ return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
+ end
end
end