X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=plugins%2Fmod_bosh.lua;h=a6b84367a392703956f2886d6589f1f2a944682d;hb=dbaf733912c1504f695c3671a5d2e253e826871e;hp=e07ffbaa6179d2a9ae3c44031a6497727d8ce205;hpb=f75d4ed8dfb82b4073997dceb5455d67243b06be;p=prosody.git diff --git a/plugins/mod_bosh.lua b/plugins/mod_bosh.lua index e07ffbaa..a6b84367 100644 --- a/plugins/mod_bosh.lua +++ b/plugins/mod_bosh.lua @@ -10,8 +10,7 @@ module.host = "*" -- Global module local hosts = _G.hosts; local lxp = require "lxp"; -local init_xmlhandlers = require "core.xmlhandlers" -local server = require "net.server"; +local new_xmpp_stream = require "util.xmppstream".new; local httpserver = require "net.httpserver"; local sm = require "core.sessionmanager"; local sm_destroy_session = sm.destroy_session; @@ -21,6 +20,7 @@ local core_process_stanza = core_process_stanza; local st = require "util.stanza"; local logger = require "util.logger"; local log = logger.init("mod_bosh"); +local timer = require "util.timer"; local xmlns_streams = "http://etherx.jabber.org/streams"; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@ -33,7 +33,6 @@ local BOSH_DEFAULT_HOLD = tonumber(module:get_option("bosh_default_hold")) or 1; local BOSH_DEFAULT_INACTIVITY = tonumber(module:get_option("bosh_max_inactivity")) or 60; local BOSH_DEFAULT_POLLING = tonumber(module:get_option("bosh_max_polling")) or 5; local BOSH_DEFAULT_REQUESTS = tonumber(module:get_option("bosh_max_requests")) or 2; -local BOSH_DEFAULT_MAXPAUSE = tonumber(module:get_option("bosh_max_pause")) or 300; local consider_bosh_secure = module:get_option_boolean("consider_bosh_secure"); @@ -55,6 +54,22 @@ if cross_domain then end end +local trusted_proxies = module:get_option_set("trusted_proxies", {"127.0.0.1"})._items; + +local function get_ip_from_request(request) + local ip = request.handler:ip(); + local forwarded_for = request.headers["x-forwarded-for"]; + if forwarded_for then + forwarded_for = forwarded_for..", "..ip; + for forwarded_ip in forwarded_for:gmatch("[^%s,]+") do + if not trusted_proxies[forwarded_ip] then + ip = forwarded_ip; + end + end + end + return ip; +end + local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat; local os_time = os.time; @@ -86,7 +101,10 @@ end function handle_request(method, body, request) if (not body) or request.method ~= "POST" then if request.method == "OPTIONS" then - return { headers = default_headers, body = "" }; + local headers = {}; + for k,v in pairs(default_headers) do headers[k] = v; end + headers["Content-Type"] = nil; + return { headers = headers, body = "" }; else return "You really don't look like a BOSH client to me... what do you want?"; end @@ -100,12 +118,19 @@ function handle_request(method, body, request) request.log = log; request.on_destroy = on_destroy_request; - local parser = lxp.new(init_xmlhandlers(request, stream_callbacks), "\1"); - - parser:parse(body); + local stream = new_xmpp_stream(request, stream_callbacks); + -- stream:feed() calls the stream_callbacks, so all stanzas in + -- the body are processed in this next line before it returns. + stream:feed(body); local session = sessions[request.sid]; if session then + -- Session was marked as inactive, since we have + -- a request open now, unmark it + if inactive_sessions[session] and #session.requests > 0 then + inactive_sessions[session] = nil; + end + local r = session.requests; log("debug", "Session %s has %d out of %d requests open", request.sid, #r, session.bosh_hold); log("debug", "and there are %d things in the send_buffer", #session.send_buffer); @@ -135,14 +160,15 @@ function handle_request(method, body, request) request.reply_before = os_time() + session.bosh_wait; waiting_requests[request] = true; end - if inactive_sessions[session] then - -- Session was marked as inactive, since we have - -- a request open now, unmark it - inactive_sessions[session] = nil; - end end - return true; -- Inform httpserver we shall reply later + if session.bosh_terminate then + session.log("debug", "Closing session with %d requests open", #session.requests); + session:close(); + return nil; + else + return true; -- Inform httpserver we shall reply later + end end end @@ -182,7 +208,6 @@ local function bosh_close_stream(session, reason) local session_close_response = { headers = default_headers, body = tostring(close_reply) }; - --FIXME: Quite sure we shouldn't reply to all requests with the error for _, held_request in ipairs(session.requests) do held_request:send(session_close_response); held_request:destroy(); @@ -202,9 +227,9 @@ function stream_callbacks.streamopened(request, attr) if not hosts[attr.to] then -- Unknown host log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to)); - session_close_reply.body.attr.condition = "host-unknown"; - request:send(session_close_reply); - request.notopen = nil + local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", + ["xmlns:streams"] = xmlns_streams, condition = "host-unknown" }); + request:send(tostring(close_reply)); return; end @@ -216,10 +241,12 @@ function stream_callbacks.streamopened(request, attr) bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, close = bosh_close_stream, dispatch_stanza = core_process_stanza, - log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure + log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, + ip = get_ip_from_request(request); }; sessions[sid] = session; + session.log("debug", "BOSH session created for request from %s", session.ip); log("info", "New BOSH session, assigned it sid '%s'", sid); local r, send_buffer = session.requests, session.send_buffer; local response = { headers = default_headers } @@ -233,7 +260,13 @@ function stream_callbacks.streamopened(request, attr) local oldest_request = r[1]; if oldest_request then log("debug", "We have an open request, so sending on that"); - response.body = t_concat{"", tostring(s), "" }; + response.body = t_concat({ + "", + tostring(s), + "" + }); oldest_request:send(response); --log("debug", "Sent"); if oldest_request.stayopen then @@ -252,6 +285,7 @@ function stream_callbacks.streamopened(request, attr) t_insert(session.send_buffer, tostring(s)); log("debug", "There are now %d things in the send_buffer", #session.send_buffer); end + return true; end -- Send creation response @@ -261,9 +295,17 @@ function stream_callbacks.streamopened(request, attr) fire_event("stream-features", session, features); --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh' local response = st.stanza("body", { xmlns = xmlns_bosh, - inactivity = tostring(BOSH_DEFAULT_INACTIVITY), polling = tostring(BOSH_DEFAULT_POLLING), requests = tostring(BOSH_DEFAULT_REQUESTS), hold = tostring(session.bosh_hold), maxpause = "120", - sid = sid, authid = sid, ver = '1.6', from = session.host, secure = 'true', ["xmpp:version"] = "1.0", - ["xmlns:xmpp"] = "urn:xmpp:xbosh", ["xmlns:stream"] = "http://etherx.jabber.org/streams" }):add_child(features); + wait = attr.wait, + inactivity = tostring(BOSH_DEFAULT_INACTIVITY), + polling = tostring(BOSH_DEFAULT_POLLING), + requests = tostring(BOSH_DEFAULT_REQUESTS), + hold = tostring(session.bosh_hold), + sid = sid, authid = sid, + ver = '1.6', from = session.host, + secure = 'true', ["xmpp:version"] = "1.0", + ["xmlns:xmpp"] = "urn:xmpp:xbosh", + ["xmlns:stream"] = "http://etherx.jabber.org/streams" + }):add_child(features); request:send{ headers = default_headers, body = tostring(response) }; request.sid = sid; @@ -296,13 +338,6 @@ function stream_callbacks.streamopened(request, attr) session.rid = rid; end - if attr.type == "terminate" then - -- Client wants to end this session - session:close(); - request.notopen = nil; - return; - end - if session.notopen then local features = st.stanza("stream:features"); hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); @@ -311,6 +346,12 @@ function stream_callbacks.streamopened(request, attr) session.notopen = nil; end + if attr.type == "terminate" then + -- Client wants to end this session, which we'll do + -- after processing any stanzas in this request + session.bosh_terminate = true; + end + request.notopen = nil; -- Signals that we accept this opening tag t_insert(session.requests, request); request.sid = sid; @@ -324,7 +365,6 @@ function stream_callbacks.handlestanza(request, stanza) if stanza.attr.xmlns == xmlns_bosh then stanza.attr.xmlns = nil; end - session.ip = request.handler:ip(); core_process_stanza(session, stanza); end end @@ -381,13 +421,14 @@ function on_timer() dead_sessions[i] = nil; sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds"); end + return 1; end local function setup() local ports = module:get_option("bosh_ports") or { 5280 }; httpserver.new_from_config(ports, handle_request, { base = "http-bind" }); - server.addtimer(on_timer); + timer.add_task(1, on_timer); end if prosody.start_time then -- already started setup();