projects
/
prosody.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
mod_blocklist: Use util.cache to manage how many users blocklists are kept in memory
[prosody.git]
/
plugins
/
mod_bosh.lua
diff --git
a/plugins/mod_bosh.lua
b/plugins/mod_bosh.lua
index d8717d18a9075982bd21ebd21d56af3f40f49571..9e8354fec893ae7711806f1b6e1044006c34f75c 100644
(file)
--- a/
plugins/mod_bosh.lua
+++ b/
plugins/mod_bosh.lua
@@
-22,6
+22,7
@@
local initialize_filters = require "util.filters".initialize;
local math_min = math.min;
local xpcall, tostring, type = xpcall, tostring, type;
local traceback = debug.traceback;
local math_min = math.min;
local xpcall, tostring, type = xpcall, tostring, type;
local traceback = debug.traceback;
+local runner = require"util.async".runner;
local xmlns_streams = "http://etherx.jabber.org/streams";
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
local xmlns_streams = "http://etherx.jabber.org/streams";
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
@@
-78,7
+79,7
@@
function on_destroy_request(request)
break;
end
end
break;
end
end
-
+
-- If this session now has no requests open, mark it as inactive
local max_inactive = session.bosh_max_inactive;
if max_inactive and #requests == 0 then
-- If this session now has no requests open, mark it as inactive
local max_inactive = session.bosh_max_inactive;
if max_inactive and #requests == 0 then
@@
-121,7
+122,7
@@
function handle_POST(event)
if cross_domain and event.request.headers.origin then
set_cross_domain_headers(response);
end
if cross_domain and event.request.headers.origin then
set_cross_domain_headers(response);
end
-
+
-- stream:feed() calls the stream_callbacks, so all stanzas in
-- the body are processed in this next line before it returns.
-- In particular, the streamopened() stream callback is where
-- stream:feed() calls the stream_callbacks, so all stanzas in
-- the body are processed in this next line before it returns.
-- In particular, the streamopened() stream callback is where
@@
-131,7
+132,7
@@
function handle_POST(event)
module:log("warn", "Error parsing BOSH payload")
return 400;
end
module:log("warn", "Error parsing BOSH payload")
return 400;
end
-
+
-- Stanzas (if any) in the request have now been processed, and
-- we take care of the high-level BOSH logic here, including
-- giving a response or putting the request "on hold".
-- Stanzas (if any) in the request have now been processed, and
-- we take care of the high-level BOSH logic here, including
-- giving a response or putting the request "on hold".
@@
-164,7
+165,7
@@
function handle_POST(event)
session.send_buffer = {};
session.send(resp);
end
session.send_buffer = {};
session.send(resp);
end
-
+
if not response.finished then
-- We're keeping this request open, to respond later
log("debug", "Have nothing to say, so leaving request unanswered for now");
if not response.finished then
-- We're keeping this request open, to respond later
log("debug", "Have nothing to say, so leaving request unanswered for now");
@@
-172,7
+173,7
@@
function handle_POST(event)
waiting_requests[response] = os_time() + session.bosh_wait;
end
end
waiting_requests[response] = os_time() + session.bosh_wait;
end
end
-
+
if session.bosh_terminate then
session.log("debug", "Closing session with %d requests open", #session.requests);
session:close();
if session.bosh_terminate then
session.log("debug", "Closing session with %d requests open", #session.requests);
session:close();
@@
-192,10
+193,10
@@
local stream_xmlns_attr = { xmlns = "urn:ietf:params:xml:ns:xmpp-streams" };
local function bosh_close_stream(session, reason)
(session.log or log)("info", "BOSH client disconnected");
local function bosh_close_stream(session, reason)
(session.log or log)("info", "BOSH client disconnected");
-
+
local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
["xmlns:stream"] = xmlns_streams });
local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
["xmlns:stream"] = xmlns_streams });
-
+
if reason then
close_reply.attr.condition = "remote-stream-error";
if reason then
close_reply.attr.condition = "remote-stream-error";
@@
-228,6
+229,8
@@
local function bosh_close_stream(session, reason)
sm_destroy_session(session);
end
sm_destroy_session(session);
end
+local runner_callbacks = { };
+
-- Handle the <body> tag in the request payload.
function stream_callbacks.streamopened(context, attr)
local request, response = context.request, context.response;
-- Handle the <body> tag in the request payload.
function stream_callbacks.streamopened(context, attr)
local request, response = context.request, context.response;
@@
-236,7
+239,7
@@
function stream_callbacks.streamopened(context, attr)
if not sid then
-- New session request
context.notopen = nil; -- Signals that we accept this opening tag
if not sid then
-- New session request
context.notopen = nil; -- Signals that we accept this opening tag
-
+
-- TODO: Sanity checks here (rid, to, known host, etc.)
if not hosts[attr.to] then
-- Unknown host
-- TODO: Sanity checks here (rid, to, known host, etc.)
if not hosts[attr.to] then
-- Unknown host
@@
-246,7
+249,7
@@
function stream_callbacks.streamopened(context, attr)
response:send(tostring(close_reply));
return;
end
response:send(tostring(close_reply));
return;
end
-
+
-- New session
sid = new_uuid();
local session = {
-- New session
sid = new_uuid();
local session = {
@@
-259,9
+262,13
@@
function stream_callbacks.streamopened(context, attr)
ip = get_ip_from_request(request);
};
sessions[sid] = session;
ip = get_ip_from_request(request);
};
sessions[sid] = session;
-
+
+ session.thread = runner(function (stanza)
+ session:dispatch_stanza(stanza);
+ end, runner_callbacks, session);
+
local filter = initialize_filters(session);
local filter = initialize_filters(session);
-
+
session.log("debug", "BOSH session created for request from %s", session.ip);
log("info", "New BOSH session, assigned it sid '%s'", sid);
session.log("debug", "BOSH session created for request from %s", session.ip);
log("info", "New BOSH session, assigned it sid '%s'", sid);
@@
-308,7
+315,7
@@
function stream_callbacks.streamopened(context, attr)
end
request.sid = sid;
end
end
request.sid = sid;
end
-
+
local session = sessions[sid];
if not session then
-- Unknown sid
local session = sessions[sid];
if not session then
-- Unknown sid
@@
-317,7
+324,7
@@
function stream_callbacks.streamopened(context, attr)
context.notopen = nil;
return;
end
context.notopen = nil;
return;
end
-
+
if session.rid then
local rid = tonumber(attr.rid);
local diff = rid - session.rid;
if session.rid then
local rid = tonumber(attr.rid);
local diff = rid - session.rid;
@@
-334,7
+341,7
@@
function stream_callbacks.streamopened(context, attr)
end
session.rid = rid;
end
end
session.rid = rid;
end
-
+
if attr.type == "terminate" then
-- Client wants to end this session, which we'll do
-- after processing any stanzas in this request
if attr.type == "terminate" then
-- Client wants to end this session, which we'll do
-- after processing any stanzas in this request
@@
-349,13
+356,17
@@
function stream_callbacks.streamopened(context, attr)
if session.notopen then
local features = st.stanza("stream:features");
hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
if session.notopen then
local features = st.stanza("stream:features");
hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
- fire_event("stream-features", session, features);
session.send(features);
session.notopen = nil;
end
end
local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end
session.send(features);
session.notopen = nil;
end
end
local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end
+
+function runner_callbacks:error(err)
+ return handleerr(err);
+end
+
function stream_callbacks.handlestanza(context, stanza)
if context.ignore then return; end
log("debug", "BOSH stanza received: %s\n", stanza:top_tag());
function stream_callbacks.handlestanza(context, stanza)
if context.ignore then return; end
log("debug", "BOSH stanza received: %s\n", stanza:top_tag());
@@
-365,9
+376,7
@@
function stream_callbacks.handlestanza(context, stanza)
stanza.attr.xmlns = nil;
end
stanza = session.filter("stanzas/in", stanza);
stanza.attr.xmlns = nil;
end
stanza = session.filter("stanzas/in", stanza);
- if stanza then
- return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
- end
+ session.thread:run(stanza);
end
end
end
end
@@
-389,7
+398,7
@@
function stream_callbacks.error(context, error)
response:send();
return;
end
response:send();
return;
end
-
+
local session = sessions[context.sid];
if error == "stream-error" then -- Remote stream error, we close normally
session:close();
local session = sessions[context.sid];
if error == "stream-error" then -- Remote stream error, we close normally
session:close();
@@
-413,7
+422,7
@@
function on_timer()
end
end
end
end
end
end
-
+
now = now - 3;
local n_dead_sessions = 0;
for session, close_after in pairs(inactive_sessions) do
now = now - 3;
local n_dead_sessions = 0;
for session, close_after in pairs(inactive_sessions) do