X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=plugins%2Fmod_bosh.lua;h=ca67db738c6c7f644862363e45e9741375f3d3c4;hb=e42e430aecc7911b56ea99f8bab4ecd4cbff40a9;hp=24dc3755cc3a301c58640d43639e7d0a05fc5e2b;hpb=2dd2707cf32ebe48ac5af9fcabd1dfd9e4d216c8;p=prosody.git diff --git a/plugins/mod_bosh.lua b/plugins/mod_bosh.lua index 24dc3755..ca67db73 100644 --- a/plugins/mod_bosh.lua +++ b/plugins/mod_bosh.lua @@ -1,7 +1,7 @@ -- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain --- +-- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- @@ -14,10 +14,14 @@ local sm = require "core.sessionmanager"; local sm_destroy_session = sm.destroy_session; local new_uuid = require "util.uuid".generate; local fire_event = prosody.events.fire_event; -local core_process_stanza = core_process_stanza; +local core_process_stanza = prosody.core_process_stanza; local st = require "util.stanza"; local logger = require "util.logger"; local log = logger.init("mod_bosh"); +local initialize_filters = require "util.filters".initialize; +local math_min = math.min; +local xpcall, tostring, type = xpcall, tostring, type; +local traceback = debug.traceback; local xmlns_streams = "http://etherx.jabber.org/streams"; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@ -30,33 +34,19 @@ local BOSH_DEFAULT_HOLD = module:get_option_number("bosh_default_hold", 1); local BOSH_DEFAULT_INACTIVITY = module:get_option_number("bosh_max_inactivity", 60); local BOSH_DEFAULT_POLLING = module:get_option_number("bosh_max_polling", 5); local BOSH_DEFAULT_REQUESTS = module:get_option_number("bosh_max_requests", 2); +local bosh_max_wait = module:get_option_number("bosh_max_wait", 120); local consider_bosh_secure = module:get_option_boolean("consider_bosh_secure"); -local auto_cork = module:get_option_boolean("bosh_auto_cork", false); - -local default_headers = { ["Content-Type"] = "text/xml; charset=utf-8" }; - local cross_domain = module:get_option("cross_domain_bosh", false); -if cross_domain then - default_headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"; - default_headers["Access-Control-Allow-Headers"] = "Content-Type"; - default_headers["Access-Control-Max-Age"] = "7200"; - - if cross_domain == true then - default_headers["Access-Control-Allow-Origin"] = "*"; - elseif type(cross_domain) == "table" then - cross_domain = table.concat(cross_domain, ", "); - end - if type(cross_domain) == "string" then - default_headers["Access-Control-Allow-Origin"] = cross_domain; - end -end + +if cross_domain == true then cross_domain = "*"; end +if type(cross_domain) == "table" then cross_domain = table.concat(cross_domain, ", "); end local trusted_proxies = module:get_option_set("trusted_proxies", {"127.0.0.1"})._items; local function get_ip_from_request(request) local ip = request.conn:ip(); - local forwarded_for = request.headers["x-forwarded-for"]; + local forwarded_for = request.headers.x_forwarded_for; if forwarded_for then forwarded_for = forwarded_for..", "..ip; for forwarded_ip in forwarded_for:gmatch("[^%s,]+") do @@ -71,11 +61,11 @@ end local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat; local os_time = os.time; -local sessions = {}; -local inactive_sessions = {}; -- Sessions which have no open requests +-- All sessions, and sessions that have no requests open +local sessions, inactive_sessions = module:shared("sessions", "inactive_sessions"); -- Used to respond to idle sessions (those with waiting requests) -local waiting_requests = {}; +local waiting_requests = module:shared("waiting_requests"); function on_destroy_request(request) log("debug", "Request destroyed: %s", tostring(request)); waiting_requests[request] = nil; @@ -88,7 +78,7 @@ function on_destroy_request(request) break; end end - + -- If this session now has no requests open, mark it as inactive local max_inactive = session.bosh_max_inactive; if max_inactive and #requests == 0 then @@ -98,18 +88,20 @@ function on_destroy_request(request) end end -local function handle_GET(request) - return [[ -

It works! Now point your BOSH client to this URL to connect to Prosody.

-

For more information see Prosody: Setting up BOSH.

-]]; +local function set_cross_domain_headers(response) + local headers = response.headers; + headers.access_control_allow_methods = "GET, POST, OPTIONS"; + headers.access_control_allow_headers = "Content-Type"; + headers.access_control_max_age = "7200"; + headers.access_control_allow_origin = cross_domain; + return response; end -function handle_OPTIONS(request) - local headers = {}; - for k,v in pairs(default_headers) do headers[k] = v; end - headers["Content-Type"] = nil; - return { headers = headers, body = "" }; +function handle_OPTIONS(event) + if cross_domain and event.request.headers.origin then + set_cross_domain_headers(event.response); + end + return ""; end function handle_POST(event) @@ -122,14 +114,24 @@ function handle_POST(event) local context = { request = request, response = response, notopen = true }; local stream = new_xmpp_stream(context, stream_callbacks); response.context = context; - + + local headers = response.headers; + headers.content_type = "text/xml; charset=utf-8"; + + if cross_domain and event.request.headers.origin then + set_cross_domain_headers(response); + end + -- stream:feed() calls the stream_callbacks, so all stanzas in -- the body are processed in this next line before it returns. -- In particular, the streamopened() stream callback is where -- much of the session logic happens, because it's where we first -- get to see the 'sid' of this request. - stream:feed(body); - + if not stream:feed(body) then + module:log("warn", "Error parsing BOSH payload") + return 400; + end + -- Stanzas (if any) in the request have now been processed, and -- we take care of the high-level BOSH logic here, including -- giving a response or putting the request "on hold". @@ -144,9 +146,6 @@ function handle_POST(event) local r = session.requests; log("debug", "Session %s has %d out of %d requests open", context.sid, #r, session.bosh_hold); log("debug", "and there are %d things in the send_buffer:", #session.send_buffer); - for i, thing in ipairs(session.send_buffer) do - log("debug", " %s", tostring(thing)); - end if #r > session.bosh_hold then -- We are holding too many requests, send what's in the buffer, log("debug", "We are holding too many requests, so..."); @@ -165,7 +164,7 @@ function handle_POST(event) session.send_buffer = {}; session.send(resp); end - + if not response.finished then -- We're keeping this request open, to respond later log("debug", "Have nothing to say, so leaving request unanswered for now"); @@ -173,7 +172,7 @@ function handle_POST(event) waiting_requests[response] = os_time() + session.bosh_wait; end end - + if session.bosh_terminate then session.log("debug", "Closing session with %d requests open", #session.requests); session:close(); @@ -182,6 +181,8 @@ function handle_POST(event) return true; -- Inform http server we shall reply later end end + module:log("warn", "Unable to associate request with a session (incomplete request?)"); + return 400; end @@ -191,10 +192,10 @@ local stream_xmlns_attr = { xmlns = "urn:ietf:params:xml:ns:xmpp-streams" }; local function bosh_close_stream(session, reason) (session.log or log)("info", "BOSH client disconnected"); - + local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", ["xmlns:stream"] = xmlns_streams }); - + if reason then close_reply.attr.condition = "remote-stream-error"; @@ -220,10 +221,9 @@ local function bosh_close_stream(session, reason) local response_body = tostring(close_reply); for _, held_request in ipairs(session.requests) do - held_request.headers = default_headers; held_request:send(response_body); end - sessions[session.sid] = nil; + sessions[session.sid] = nil; inactive_sessions[session] = nil; sm_destroy_session(session); end @@ -236,7 +236,7 @@ function stream_callbacks.streamopened(context, attr) if not sid then -- New session request context.notopen = nil; -- Signals that we accept this opening tag - + -- TODO: Sanity checks here (rid, to, known host, etc.) if not hosts[attr.to] then -- Unknown host @@ -246,22 +246,28 @@ function stream_callbacks.streamopened(context, attr) response:send(tostring(close_reply)); return; end - + -- New session sid = new_uuid(); local session = { - type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid), host = attr.to, - bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid, + type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to, + bosh_version = attr.ver, bosh_wait = math_min(attr.wait, bosh_max_wait), streamid = sid, bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, - close = bosh_close_stream, dispatch_stanza = core_process_stanza, + close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true, log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, ip = get_ip_from_request(request); }; sessions[sid] = session; - + + local filter = initialize_filters(session); + session.log("debug", "BOSH session created for request from %s", session.ip); log("info", "New BOSH session, assigned it sid '%s'", sid); + + -- Send creation response + local creating_session = true; + local r = session.requests; function session.send(s) -- We need to ensure that outgoing stanzas have the jabber:client xmlns @@ -269,62 +275,49 @@ function stream_callbacks.streamopened(context, attr) s = st.clone(s); s.attr.xmlns = "jabber:client"; end + s = filter("stanzas/out", s); --log("debug", "Sending BOSH data: %s", tostring(s)); + t_insert(session.send_buffer, tostring(s)); + local oldest_request = r[1]; - if oldest_request and (not(auto_cork) or waiting_requests[oldest_request]) then + if oldest_request and not session.bosh_processing then log("debug", "We have an open request, so sending on that"); - oldest_request.headers = default_headers; - oldest_request:send(t_concat({ - "", - tostring(s), - "" - })); - elseif s ~= "" then - log("debug", "Saved to send buffer because there are %d open requests", #r); - -- Hmm, no requests are open :( - t_insert(session.send_buffer, tostring(s)); - log("debug", "There are now %d things in the send_buffer", #session.send_buffer); + local body_attr = { xmlns = "http://jabber.org/protocol/httpbind", + ["xmlns:stream"] = "http://etherx.jabber.org/streams"; + type = session.bosh_terminate and "terminate" or nil; + sid = sid; + }; + if creating_session then + creating_session = nil; + body_attr.inactivity = tostring(BOSH_DEFAULT_INACTIVITY); + body_attr.polling = tostring(BOSH_DEFAULT_POLLING); + body_attr.requests = tostring(BOSH_DEFAULT_REQUESTS); + body_attr.wait = tostring(session.bosh_wait); + body_attr.hold = tostring(session.bosh_hold); + body_attr.authid = sid; + body_attr.secure = "true"; + body_attr.ver = '1.6'; + body_attr.from = session.host; + body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh"; + body_attr["xmpp:version"] = "1.0"; + end + oldest_request:send(st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer)..""); + session.send_buffer = {}; end return true; end - - -- Send creation response - - local features = st.stanza("stream:features"); - hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); - fire_event("stream-features", session, features); - --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh' - local body = st.stanza("body", { xmlns = xmlns_bosh, - wait = attr.wait, - inactivity = tostring(BOSH_DEFAULT_INACTIVITY), - polling = tostring(BOSH_DEFAULT_POLLING), - requests = tostring(BOSH_DEFAULT_REQUESTS), - hold = tostring(session.bosh_hold), - sid = sid, authid = sid, - ver = '1.6', from = session.host, - secure = 'true', ["xmpp:version"] = "1.0", - ["xmlns:xmpp"] = "urn:xmpp:xbosh", - ["xmlns:stream"] = "http://etherx.jabber.org/streams" - }):add_child(features); - response.headers = default_headers; - response:send(tostring(body)); - request.sid = sid; - return; end - + local session = sessions[sid]; if not session then -- Unknown sid log("info", "Client tried to use sid '%s' which we don't know about", sid); - response.headers = default_headers; response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" }))); context.notopen = nil; return; end - + if session.rid then local rid = tonumber(attr.rid); local diff = rid - session.rid; @@ -332,7 +325,7 @@ function stream_callbacks.streamopened(context, attr) session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid); elseif diff <= 0 then -- Repeated, ignore - session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff); + session.log("debug", "rid repeated, ignoring: %s (diff %d)", session.rid, diff); context.notopen = nil; context.ignore = true; context.sid = sid; @@ -341,7 +334,7 @@ function stream_callbacks.streamopened(context, attr) end session.rid = rid; end - + if attr.type == "terminate" then -- Client wants to end this session, which we'll do -- after processing any stanzas in this request @@ -351,16 +344,17 @@ function stream_callbacks.streamopened(context, attr) context.notopen = nil; -- Signals that we accept this opening tag t_insert(session.requests, response); context.sid = sid; + session.bosh_processing = true; -- Used to suppress replies until processing of this request is done if session.notopen then local features = st.stanza("stream:features"); hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); - fire_event("stream-features", session, features); session.send(features); session.notopen = nil; end end +local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end function stream_callbacks.handlestanza(context, stanza) if context.ignore then return; end log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); @@ -369,7 +363,20 @@ function stream_callbacks.handlestanza(context, stanza) if stanza.attr.xmlns == xmlns_bosh then stanza.attr.xmlns = nil; end - core_process_stanza(session, stanza); + stanza = session.filter("stanzas/in", stanza); + if stanza then + return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); + end + end +end + +function stream_callbacks.streamclosed(context) + local session = sessions[context.sid]; + if session then + session.bosh_processing = false; + if #session.send_buffer > 0 then + session.send(""); + end end end @@ -377,12 +384,11 @@ function stream_callbacks.error(context, error) log("debug", "Error parsing BOSH request payload; %s", error); if not context.sid then local response = context.response; - response.headers = default_headers; response.status_code = 400; response:send(); return; end - + local session = sessions[context.sid]; if error == "stream-error" then -- Remote stream error, we close normally session:close(); @@ -391,7 +397,7 @@ function stream_callbacks.error(context, error) end end -local dead_sessions = {}; +local dead_sessions = module:shared("dead_sessions"); function on_timer() -- log("debug", "Checking for requests soon to timeout..."); -- Identify requests timing out within the next few seconds @@ -406,7 +412,7 @@ function on_timer() end end end - + now = now - 3; local n_dead_sessions = 0; for session, close_after in pairs(inactive_sessions) do @@ -428,13 +434,24 @@ function on_timer() end module:add_timer(1, on_timer); + +local GET_response = { + headers = { + content_type = "text/html"; + }; + body = [[ +

It works! Now point your BOSH client to this URL to connect to Prosody.

+

For more information see Prosody: Setting up BOSH.

+ ]]; +}; + function module.add_host(module) module:depends("http"); module:provides("http", { default_path = "/http-bind"; route = { - ["GET"] = handle_GET; - ["GET /"] = handle_GET; + ["GET"] = GET_response; + ["GET /"] = GET_response; ["OPTIONS"] = handle_OPTIONS; ["OPTIONS /"] = handle_OPTIONS; ["POST"] = handle_POST;