-- Prosody IM
-- Copyright (C) 2008-2010 Matthew Wild
-- Copyright (C) 2008-2010 Waqas Hussain
---
+--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
if req.query then
t_insert(request_line, 4, "?"..req.query);
end
-
+
conn:write(t_concat(request_line));
local t = { [2] = ": ", [4] = "\r\n" };
for k, v in pairs(req.headers) do
conn:write(t_concat(t));
end
conn:write("\r\n");
-
+
if req.body then
conn:write(req.body);
end
requests[conn] = nil;
end
+ function listener.ondetach(conn)
+ requests[conn] = nil;
+ end
+
local function request_reader(request, data, err)
if not request.parser then
local function error_cb(reason)
end
destroy_request(request);
end
-
+
if not data then
error_cb(err);
return;
end
-
+
local function success_cb(r)
if request.callback then
request.callback(r.body, r.code, r, request);
local function handleerr(err) log("error", "Traceback[http]: %s", traceback(tostring(err), 2)); end
function request(u, ex, callback)
local req = url.parse(u);
-
+
if not (req and req.host) then
callback(nil, 0, req);
return nil, "invalid-url";
end
-
+
if not req.path then
req.path = "/";
end
-
+
local method, headers, body;
-
+
local host, port = req.host, req.port;
local host_header = host;
if (port == "80" and req.scheme == "http")
["Host"] = host_header;
["User-Agent"] = "Prosody XMPP Server";
};
-
+
if req.userinfo then
headers["Authorization"] = "Basic "..b64(req.userinfo);
end
end
end
end
-
+
-- Attach to request object
req.method, req.headers, req.body = method, headers, body;
-
+
local using_https = req.scheme == "https";
if using_https and not ssl_available then
error("SSL not available, unable to contact https URL");
end
local port_number = port and tonumber(port) or (using_https and 443 or 80);
-
+
-- Connect the socket, and wrap it with net.server
local conn = socket.tcp();
conn:settimeout(10);
callback(nil, 0, req);
return nil, err;
end
-
+
local sslctx = false;
if using_https then
sslctx = ex and ex.sslctx or { mode = "client", protocol = "sslv23", options = { "no_sslv2" } };
req.handler, req.conn = assert(server.wrapclient(conn, host, port_number, listener, "*a", sslctx));
req.write = function (...) return req.handler:write(...); end
-
+
req.callback = function (content, code, request, response) log("debug", "Calling callback, status %s", code or "---"); return select(2, xpcall(function () return callback(content, code, request, response) end, handleerr)); end
req.reader = request_reader;
req.state = "status";
sessions[conn] = nil;
end
+ function listener.ondetach(conn)
+ sessions[conn] = nil;
+ end
+
function listener.onincoming(conn, data)
sessions[conn]:feed(data);
end
persistent = persistent;
conn = conn;
send = _M.send_response;
+ done = _M.finish_response;
finish_cb = finish_cb;
};
conn._http_open_response = response;
err_code, err = 400, "Missing or invalid 'Host' header";
end
end
-
+
if err then
response.status_code = err_code;
response:send(events.fire_event("http-error", { code = err_code, message = err }));
response.status_code = 404;
response:send(events.fire_event("http-error", { code = 404 }));
end
-function _M.send_response(response, body)
- if response.finished then return; end
- response.finished = true;
- response.conn._http_open_response = nil;
-
+local function prepare_header(response)
local status_line = "HTTP/"..response.request.httpversion.." "..(response.status or codes[response.status_code]);
local headers = response.headers;
- body = body or response.body or "";
- headers.content_length = #body;
-
local output = { status_line };
for k,v in pairs(headers) do
t_insert(output, headerfix[k]..v);
end
t_insert(output, "\r\n\r\n");
+ return output;
+end
+_M.prepare_header = prepare_header;
+function _M.send_response(response, body)
+ if response.finished then return; end
+ body = body or response.body or "";
+ response.headers.content_length = #body;
+ local output = prepare_header(response);
t_insert(output, body);
-
response.conn:write(t_concat(output));
+ response:done();
+end
+function _M.finish_response(response)
+ if response.finished then return; end
+ response.finished = true;
+ response.conn._http_open_response = nil;
if response.on_destroy then
response:on_destroy();
response.on_destroy = nil;
local interface_mt
do
interface_mt = {}; interface_mt.__index = interface_mt;
-
+
local addevent = base.addevent
local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield
-
+
-- Private methods
function interface_mt:_position(new_position)
self.position = new_position or self.position
function interface_mt:_close()
return self:_destroy();
end
-
+
function interface_mt:_start_connection(plainssl) -- should be called from addclient
local callback = function( event )
if EV_TIMEOUT == event then -- timeout during connection
interfacelist( "delete", self )
return true
end
-
+
function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events
self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
return nointerface, noreading, nowriting
end
-
+
--TODO: Deprecate
function interface_mt:lock_read(switch)
if switch then
end
return self._connections
end
-
+
-- Public methods
function interface_mt:write(data)
if self.nowriting then return nil, "locked" end
return true
end
end
-
+
function interface_mt:socket()
return self.conn
end
-
+
function interface_mt:server()
return self._server or self;
end
-
+
function interface_mt:port()
return self._port
end
-
+
function interface_mt:serverport()
return self._serverport
end
-
+
function interface_mt:ip()
return self._ip
end
-
+
function interface_mt:ssl()
return self._usingssl
end
function interface_mt:type()
return self._type or "client"
end
-
+
function interface_mt:connections()
return self._connections
end
-
+
function interface_mt:address()
return self.addr
end
-
+
function interface_mt:set_sslctx(sslctx)
self._sslctx = sslctx;
if sslctx then
end
return self._pattern;
end
-
+
function interface_mt:set_send(new_send)
-- No-op, we always use the underlying connection's send
end
-
+
function interface_mt:starttls(sslctx, call_onconnect)
debug( "try to start ssl at client id:", self.id )
local err
self.starttls = false;
return true
end
-
+
function interface_mt:setoption(option, value)
if self.conn.setoption then
return self.conn:setoption(option, value);
end
return false, "setoption not implemented";
end
-
+
function interface_mt:setlistener(listener)
- self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, self.onreadtimeout, self.onstatus
- = listener.onconnect, listener.ondisconnect, listener.onincoming,
- listener.ontimeout, listener.onreadtimeout, listener.onstatus;
+ self:ondetach(); -- Notify listener that it is no longer responsible for this connection
- self.onconnect, self.ondisconnect, self.onincoming,
- self.ontimeout, self.onstatus, self.ondetach
- = listener.onconnect, listener.ondisconnect, listener.onincoming,
- listener.ontimeout, listener.onstatus, listener.ondetach;
++ self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout,
++ self.onreadtimeout, self.onstatus, self.ondetach
++ = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout,
++ listener.onreadtimeout, listener.onstatus, listener.ondetach;
end
-
+
-- Stub handlers
function interface_mt:onconnect()
end
end
function interface_mt:ontimeout()
end
+ function interface_mt:onreadtimeout()
+ self.fatalerror = "timeout during receiving"
+ debug( "connection failed:", self.fatalerror )
+ self:_close()
+ self.eventread = nil
+ end
function interface_mt:ondrain()
end
+ function interface_mt:ondetach()
+ end
function interface_mt:onstatus()
end
end
ondisconnect = listener.ondisconnect; -- will be called when client disconnects
onincoming = listener.onincoming; -- will be called when client sends data
ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs
+ onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs
ondrain = listener.ondrain; -- called when writebuffer is empty
+ ondetach = listener.ondetach; -- called when disassociating this listener from this connection
onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS)
eventread = false, eventwrite = false, eventclose = false,
eventhandshake = false, eventstarthandshake = false; -- event handler
noreading = false, nowriting = false; -- locks of the read/writecallback
startsslcallback = false; -- starting handshake callback
position = false; -- position of client in interfacelist
-
+
-- Properties
_ip = ip, _port = port, _server = server, _pattern = pattern,
_serverport = (server and server:port() or nil),
end
end
end
-
+
interface.readcallback = function( event ) -- called on read events
--vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) )
if interface.noreading or interface.fatalerror then -- leave this event
interface.eventread = nil
return -1
end
- if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect
- interface.fatalerror = "timeout during receiving"
- debug( "connection failed:", interface.fatalerror )
+ if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then
+ return -1 -- took too long to get some data from client -> disconnect
+ end
+ if interface._usingssl then -- handle luasec
+ if interface.eventwritetimeout then -- ok, in the past writecallback was regged
+ local ret = interface.writecallback( ) -- call it
+ --vdebug( "tried to write in readcallback, result:", tostring(ret) )
+ end
+ if interface.eventreadtimeout then
+ interface.eventreadtimeout:close( )
+ interface.eventreadtimeout = nil
+ end
+ end
+ local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern"
+ --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
+ buffer = buffer or part
+ if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length
+ interface.fatalerror = "receive buffer exceeded"
+ debug( "fatal error:", interface.fatalerror )
interface:_close()
interface.eventread = nil
return -1
- else -- can read
- if interface._usingssl then -- handle luasec
- if interface.eventwritetimeout then -- ok, in the past writecallback was regged
- local ret = interface.writecallback( ) -- call it
- --vdebug( "tried to write in readcallback, result:", tostring(ret) )
- end
- if interface.eventreadtimeout then
- interface.eventreadtimeout:close( )
- interface.eventreadtimeout = nil
+ end
+ if err and ( err ~= "timeout" and err ~= "wantread" ) then
+ if "wantwrite" == err then -- need to read on write event
+ if not interface.eventwrite then -- register new write event if needed
+ interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
end
- end
- local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern"
- --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
- buffer = buffer or part
- if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length
- interface.fatalerror = "receive buffer exceeded"
- debug( "fatal error:", interface.fatalerror )
+ interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
+ function( )
+ interface:_close()
+ end, cfg.READ_TIMEOUT
+ )
+ debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
+ -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
+ else -- connection was closed or fatal error
+ interface.fatalerror = err
+ debug( "connection failed in read event:", interface.fatalerror )
interface:_close()
interface.eventread = nil
return -1
end
- if err and ( err ~= "timeout" and err ~= "wantread" ) then
- if "wantwrite" == err then -- need to read on write event
- if not interface.eventwrite then -- register new write event if needed
- interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
- end
- interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
- function( )
- interface:_close()
- end, cfg.READ_TIMEOUT
- )
- debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
- -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
- else -- connection was closed or fatal error
- interface.fatalerror = err
- debug( "connection failed in read event:", interface.fatalerror )
- interface:_close()
- interface.eventread = nil
- return -1
- end
- else
- interface.onincoming( interface, buffer, err ) -- send new data to listener
- end
- if interface.noreading then
- interface.eventread = nil;
- return -1;
- end
- return EV_READ, cfg.READ_TIMEOUT
+ else
+ interface.onincoming( interface, buffer, err ) -- send new data to listener
end
+ if interface.noreading then
+ interface.eventread = nil;
+ return -1;
+ end
+ return EV_READ, cfg.READ_TIMEOUT
end
client:settimeout( 0 ) -- set non blocking
debug "creating server interface..."
local interface = {
_connections = 0;
-
+
conn = server;
onconnect = listener.onconnect; -- will be called when new client connected
eventread = false; -- read event handler
readcallback = false; -- read event callback
fatalerror = false; -- error message
nointerface = true; -- lock/unlock parameter
-
+
_ip = addr, _port = port, _pattern = pattern,
_sslctx = sslctx;
}
clientinterface:_start_session( true )
end
debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>");
-
+
client, err = server:accept() -- try to accept again
end
return EV_READ
end
-
+
server:settimeout( 0 )
setmetatable(interface, interface_mt)
interfacelist( "add", interface )
return interface, client
--function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface
end
-
+
function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl )
local client, err = socket.tcp() -- creating new socket
if not client then
local function link(sender, receiver, buffersize)
local sender_locked;
-
+
function receiver:ondrain()
if sender_locked then
sender:resume();
sender_locked = nil;
end
end
-
+
function sender:onincoming(data)
receiver:write(data);
if receiver.writebufferlen >= buffersize then
---
+--
-- server.lua by blastbeat of the luadch project
-- Re-used here under the MIT/X Consortium License
---
+--
-- Modifications (C) 2008-2010 Matthew Wild, Waqas Hussain
--
_maxsendlen = 51000 * 1024 -- max len of send buffer
_maxreadlen = 25000 * 1024 -- max len of read buffer
-_checkinterval = 1200000 -- interval in secs to check idle clients
+_checkinterval = 30 -- interval in secs to check idle clients
_sendtimeout = 60000 -- allowed send idle time in secs
_readtimeout = 6 * 60 * 60 -- allowed read idle time in secs
local status = listeners.onstatus
local disconnect = listeners.ondisconnect
local drain = listeners.ondrain
+ local onreadtimeout = listeners.onreadtimeout;
+ local detach = listeners.ondetach
local bufferqueue = { } -- buffer array
local bufferqueuelen = 0 -- end of buffer array
handler.disconnect = function( )
return disconnect
end
+ handler.onreadtimeout = onreadtimeout;
+
handler.setlistener = function( self, listeners )
+ if detach then
+ detach(self) -- Notify listener that it is no longer responsible for this connection
+ end
dispatch = listeners.onincoming
disconnect = listeners.ondisconnect
status = listeners.onstatus
drain = listeners.ondrain
+ handler.onreadtimeout = listeners.onreadtimeout
+ detach = listeners.ondetach
end
handler.getstats = function( )
return readtraffic, sendtraffic
shutdown = id
_socketlist[ socket ] = handler
_readlistlen = addsocket(_readlist, socket, _readlistlen)
-
+
-- remove traces of the old socket
_readlistlen = removesocket( _readlist, oldsocket, _readlistlen )
_sendlistlen = removesocket( _sendlist, oldsocket, _sendlistlen )
sender_locked = nil;
end
end
-
+
local _readbuffer = sender.readbuffer;
function sender.readbuffer()
_readbuffer();
_starttime = _currenttime
for handler, timestamp in pairs( _writetimes ) do
if os_difftime( _currenttime - timestamp ) > _sendtimeout then
- --_writetimes[ handler ] = nil
handler.disconnect( )( handler, "send timeout" )
handler:force_close() -- forced disconnect
end
end
for handler, timestamp in pairs( _readtimes ) do
if os_difftime( _currenttime - timestamp ) > _readtimeout then
- --_readtimes[ handler ] = nil
- handler.disconnect( )( handler, "read timeout" )
- handler:close( ) -- forced disconnect?
+ if not(handler.onreadtimeout) or handler:onreadtimeout() ~= true then
+ handler.disconnect( )( handler, "read timeout" )
+ handler:close( ) -- forced disconnect?
+ end
end
end
end
client:settimeout( 0 )
_, err = client:connect( address, port )
if err then -- try again
- local handler = wrapclient( client, address, port, listeners )
+ return wrapclient( client, address, port, listeners, pattern, sslctx )
else
- wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx )
+ return wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx )
end
end
addclient = addclient,
wrapclient = wrapclient,
-
+
loop = loop,
link = link,
step = step,
-- Prosody IM
-- Copyright (C) 2008-2010 Matthew Wild
-- Copyright (C) 2008-2010 Waqas Hussain
---
+--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
local prosody = _G.prosody;
local hosts = prosody.hosts;
-local incoming_s2s = prosody.incoming_s2s;
local console_listener = { default_port = 5582; default_mode = "*a"; interface = "127.0.0.1" };
disconnect = function () conn:close(); end;
};
session.env = setmetatable({}, default_env_mt);
-
+
-- Load up environment with helper objects
for name, t in pairs(def_env) do
if type(t) == "table" then
session.env[name] = setmetatable({ session = session }, { __index = t });
end
end
-
+
return session;
end
function console:process_line(session, line)
local useglobalenv;
-
+
if line:match("^>") then
line = line:gsub("^>", "");
useglobalenv = true;
return;
end
end
-
+
session.env._ = line;
-
+
local chunkname = "=console";
local env = (useglobalenv and redirect_output(_G, session)) or session.env or nil
local chunk, err = envload("return "..line, chunkname, env);
return;
end
end
-
+
local ranok, taskok, message = pcall(chunk);
-
+
if not (ranok or message or useglobalenv) and commands[line:lower()] then
commands[line:lower()](session, line);
return;
end
-
+
if not ranok then
session.print("Fatal error while running command, it did not complete");
session.print("Error: "..taskok);
return;
end
-
+
if not message then
session.print("Result: "..tostring(taskok));
return;
session.print("Message: "..tostring(message));
return;
end
-
+
session.print("OK: "..tostring(message));
end
session.partial_data = data:match("[^\n]+$");
end
+function console_listener.onreadtimeout(conn)
+ local session = sessions[conn];
+ if session then
+ session.send("\0");
+ return true;
+ end
+end
+
function console_listener.ondisconnect(conn, err)
local session = sessions[conn];
if session then
end
end
+ function console_listener.ondetach(conn)
+ sessions[conn] = nil;
+ end
+
-- Console commands --
-- These are simple commands, not valid standalone in Lua
print [[c2s:show(jid) - Show all client sessions with the specified JID (or all if no JID given)]]
print [[c2s:show_insecure() - Show all unencrypted client connections]]
print [[c2s:show_secure() - Show all encrypted client connections]]
+ print [[c2s:show_tls() - Show TLS cipher info for encrypted sessions]]
print [[c2s:close(jid) - Close all sessions for the specified JID]]
elseif section == "s2s" then
print [[s2s:show(domain) - Show all s2s connections for the given domain (or all if no domain given)]]
+ print [[s2s:show_tls(domain) - Show TLS cipher info for encrypted sessions]]
print [[s2s:close(from, to) - Close a connection from one domain to another]]
print [[s2s:closeall(host) - Close all the incoming/outgoing s2s sessions to specified host]]
elseif section == "module" then
function def_env.module:load(name, hosts, config)
local mm = require "modulemanager";
-
+
hosts = get_hosts_set(hosts);
-
+
-- Load the module for each host
local ok, err, count, mod = true, nil, 0, nil;
for host in hosts do
end
end
end
-
- return ok, (ok and "Module loaded onto "..count.." host"..(count ~= 1 and "s" or "")) or ("Last error: "..tostring(err));
+
+ return ok, (ok and "Module loaded onto "..count.." host"..(count ~= 1 and "s" or "")) or ("Last error: "..tostring(err));
end
function def_env.module:unload(name, hosts)
local mm = require "modulemanager";
hosts = get_hosts_set(hosts, name);
-
+
-- Unload the module for each host
local ok, err, count = true, nil, 0;
for host in hosts do
if type(hosts) ~= "table" then
return false, "Please supply a host or a list of hosts you would like to see";
end
-
+
local print = self.session.print;
for _, host in ipairs(hosts) do
print((host == "*" and "Global" or host)..":");
return ok, (ok and "Config reloaded (you may need to reload modules to take effect)") or tostring(err);
end
-def_env.hosts = {};
-function def_env.hosts:list()
- for host, host_session in pairs(hosts) do
- self.session.print(host);
+local function common_info(session, line)
+ if session.id then
+ line[#line+1] = "["..session.id.."]"
+ else
+ line[#line+1] = "["..session.type..(tostring(session):match("%x*$")).."]"
end
- return true, "Done";
end
-function def_env.hosts:add(name)
+local function session_flags(session, line)
+ line = line or {};
+ common_info(session, line);
+ if session.type == "c2s" then
+ local status, priority = "unavailable", tostring(session.priority or "-");
+ if session.presence then
+ status = session.presence:get_child_text("show") or "available";
+ end
+ line[#line+1] = status.."("..priority..")";
+ end
+ if session.cert_identity_status == "valid" then
+ line[#line+1] = "(authenticated)";
+ end
+ if session.secure then
+ line[#line+1] = "(encrypted)";
+ end
+ if session.compressed then
+ line[#line+1] = "(compressed)";
+ end
+ if session.smacks then
+ line[#line+1] = "(sm)";
+ end
+ if session.ip and session.ip:match(":") then
+ line[#line+1] = "(IPv6)";
+ end
+ return table.concat(line, " ");
+end
+
+local function tls_info(session, line)
+ line = line or {};
+ common_info(session, line);
+ if session.secure then
+ local sock = session.conn and session.conn.socket and session.conn:socket();
+ if sock and sock.info then
+ local info = sock:info();
+ line[#line+1] = ("(%s with %s)"):format(info.protocol, info.cipher);
+ else
+ line[#line+1] = "(cipher info unavailable)";
+ end
+ else
+ line[#line+1] = "(insecure)";
+ end
+ return table.concat(line, " ");
end
def_env.c2s = {};
show_c2s(function (jid, session)
if (not match_jid) or jid:match(match_jid) then
count = count + 1;
- end
+ end
end);
return true, "Total: "..count.." clients";
end
-function def_env.c2s:show(match_jid)
+function def_env.c2s:show(match_jid, annotate)
local print, count = self.session.print, 0;
+ annotate = annotate or session_flags;
local curr_host;
show_c2s(function (jid, session)
if curr_host ~= session.host then
end
if (not match_jid) or jid:match(match_jid) then
count = count + 1;
- local status, priority = "unavailable", tostring(session.priority or "-");
- if session.presence then
- status = session.presence:child_with_name("show");
- if status then
- status = status:get_text() or "[invalid!]";
- else
- status = "available";
- end
- end
- print(" "..jid.." - "..status.."("..priority..")");
- end
+ print(annotate(session, { " ", jid }));
+ end
end);
return true, "Total: "..count.." clients";
end
if ((not match_jid) or jid:match(match_jid)) and not session.secure then
count = count + 1;
print(jid);
- end
+ end
end);
return true, "Total: "..count.." insecure client connections";
end
if ((not match_jid) or jid:match(match_jid)) and session.secure then
count = count + 1;
print(jid);
- end
+ end
end);
return true, "Total: "..count.." secure client connections";
end
+function def_env.c2s:show_tls(match_jid)
+ return self:show(match_jid, tls_info);
+end
+
function def_env.c2s:close(match_jid)
local count = 0;
show_c2s(function (jid, session)
return true, "Total: "..count.." sessions closed";
end
-local function session_flags(session, line)
- if session.cert_identity_status == "valid" then
- line[#line+1] = "(secure)";
- elseif session.secure then
- line[#line+1] = "(encrypted)";
- end
- if session.compressed then
- line[#line+1] = "(compressed)";
- end
- if session.smacks then
- line[#line+1] = "(sm)";
- end
- if session.conn and session.conn:ip():match(":") then
- line[#line+1] = "(IPv6)";
- end
- return table.concat(line, " ");
-end
def_env.s2s = {};
-function def_env.s2s:show(match_jid)
- local _print = self.session.print;
+function def_env.s2s:show(match_jid, annotate)
local print = self.session.print;
-
+ annotate = annotate or session_flags;
+
local count_in, count_out = 0,0;
-
- for host, host_session in pairs(hosts) do
- print = function (...) _print(host); _print(...); print = _print; end
- for remotehost, session in pairs(host_session.s2sout) do
- if (not match_jid) or remotehost:match(match_jid) or host:match(match_jid) then
- count_out = count_out + 1;
- print(session_flags(session, {" ", host, "->", remotehost}));
- if session.sendq then
- print(" There are "..#session.sendq.." queued outgoing stanzas for this connection");
- end
- if session.type == "s2sout_unauthed" then
- if session.connecting then
- print(" Connection not yet established");
- if not session.srv_hosts then
- if not session.conn then
- print(" We do not yet have a DNS answer for this host's SRV records");
- else
- print(" This host has no SRV records, using A record instead");
- end
- elseif session.srv_choice then
- print(" We are on SRV record "..session.srv_choice.." of "..#session.srv_hosts);
- local srv_choice = session.srv_hosts[session.srv_choice];
- print(" Using "..(srv_choice.target or ".")..":"..(srv_choice.port or 5269));
+ local s2s_list = { };
+
+ local s2s_sessions = module:shared"/*/s2s/sessions";
+ for _, session in pairs(s2s_sessions) do
+ local remotehost, localhost, direction;
+ if session.direction == "outgoing" then
+ direction = "->";
+ count_out = count_out + 1;
+ remotehost, localhost = session.to_host or "?", session.from_host or "?";
+ else
+ direction = "<-";
+ count_in = count_in + 1;
+ remotehost, localhost = session.from_host or "?", session.to_host or "?";
+ end
+ local sess_lines = { l = localhost, r = remotehost,
+ annotate(session, { "", direction, remotehost or "?" })};
+
+ if (not match_jid) or remotehost:match(match_jid) or localhost:match(match_jid) then
+ table.insert(s2s_list, sess_lines);
+ local print = function (s) table.insert(sess_lines, " "..s); end
+ if session.sendq then
+ print("There are "..#session.sendq.." queued outgoing stanzas for this connection");
+ end
+ if session.type == "s2sout_unauthed" then
+ if session.connecting then
+ print("Connection not yet established");
+ if not session.srv_hosts then
+ if not session.conn then
+ print("We do not yet have a DNS answer for this host's SRV records");
+ else
+ print("This host has no SRV records, using A record instead");
end
- elseif session.notopen then
- print(" The <stream> has not yet been opened");
- elseif not session.dialback_key then
- print(" Dialback has not been initiated yet");
- elseif session.dialback_key then
- print(" Dialback has been requested, but no result received");
+ elseif session.srv_choice then
+ print("We are on SRV record "..session.srv_choice.." of "..#session.srv_hosts);
+ local srv_choice = session.srv_hosts[session.srv_choice];
+ print("Using "..(srv_choice.target or ".")..":"..(srv_choice.port or 5269));
end
+ elseif session.notopen then
+ print("The <stream> has not yet been opened");
+ elseif not session.dialback_key then
+ print("Dialback has not been initiated yet");
+ elseif session.dialback_key then
+ print("Dialback has been requested, but no result received");
end
end
- end
- local subhost_filter = function (h)
- return (match_jid and h:match(match_jid));
- end
- for session in pairs(incoming_s2s) do
- if session.to_host == host and ((not match_jid) or host:match(match_jid)
- or (session.from_host and session.from_host:match(match_jid))
- -- Pft! is what I say to list comprehensions
- or (session.hosts and #array.collect(keys(session.hosts)):filter(subhost_filter)>0)) then
- count_in = count_in + 1;
- print(session_flags(session, {" ", host, "<-", session.from_host or "(unknown)"}));
- if session.type == "s2sin_unauthed" then
- print(" Connection not yet authenticated");
- end
+ if session.type == "s2sin_unauthed" then
+ print("Connection not yet authenticated");
+ elseif session.type == "s2sin" then
for name in pairs(session.hosts) do
if name ~= session.from_host then
- print(" also hosts "..tostring(name));
+ print("also hosts "..tostring(name));
end
end
end
end
-
- print = _print;
end
-
- for session in pairs(incoming_s2s) do
- if not session.to_host and ((not match_jid) or session.from_host and session.from_host:match(match_jid)) then
- count_in = count_in + 1;
- print("Other incoming s2s connections");
- print(" (unknown) <- "..(session.from_host or "(unknown)"));
- end
+
+ -- Sort by local host, then remote host
+ table.sort(s2s_list, function(a,b)
+ if a.l == b.l then return a.r < b.r; end
+ return a.l < b.l;
+ end);
+ local lasthost;
+ for _, sess_lines in ipairs(s2s_list) do
+ if sess_lines.l ~= lasthost then print(sess_lines.l); lasthost=sess_lines.l end
+ for _, line in ipairs(sess_lines) do print(line); end
end
-
return true, "Total: "..count_out.." outgoing, "..count_in.." incoming connections";
end
+function def_env.s2s:show_tls(match_jid)
+ return self:show(match_jid, tls_info);
+end
+
local function print_subject(print, subject)
for _, entry in ipairs(subject) do
print(
function def_env.s2s:showcert(domain)
local ser = require "util.serialization".serialize;
local print = self.session.print;
- local domain_sessions = set.new(array.collect(keys(incoming_s2s)))
- /function(session) return session.from_host == domain and session or nil; end;
- for local_host in values(prosody.hosts) do
- local s2sout = local_host.s2sout;
- if s2sout and s2sout[domain] then
- domain_sessions:add(s2sout[domain]);
- end
- end
+ local s2s_sessions = module:shared"/*/s2s/sessions";
+ local domain_sessions = set.new(array.collect(values(s2s_sessions)))
+ /function(session) return (session.to_host == domain or session.from_host == domain) and session or nil; end;
local cert_set = {};
for session in domain_sessions do
local conn = session.conn;
local domain_certs = array.collect(values(cert_set));
-- Phew. We now have a array of unique certificates presented by domain.
local n_certs = #domain_certs;
-
+
if n_certs == 0 then
return "No certificates found for "..domain;
end
-
+
local function _capitalize_and_colon(byte)
return string.upper(byte)..":";
end
local function pretty_fingerprint(hash)
return hash:gsub("..", _capitalize_and_colon):sub(1, -2);
end
-
+
for cert_info in values(domain_certs) do
local certs = cert_info.certs;
local cert = certs[1];
function def_env.s2s:close(from, to)
local print, count = self.session.print, 0;
-
- if not (from and to) then
+ local s2s_sessions = module:shared"/*/s2s/sessions";
+
+ local match_id;
+ if from and not to then
+ match_id, from = from;
+ elseif not to then
return false, "Syntax: s2s:close('from', 'to') - Closes all s2s sessions from 'from' to 'to'";
elseif from == to then
return false, "Both from and to are the same... you can't do that :)";
end
-
- if hosts[from] and not hosts[to] then
- -- Is an outgoing connection
- local session = hosts[from].s2sout[to];
- if not session then
- print("No outgoing connection from "..from.." to "..to)
- else
+
+ for _, session in pairs(s2s_sessions) do
+ local id = session.type..tostring(session):match("[a-f0-9]+$");
+ if (match_id and match_id == id)
+ or (session.from_host == from and session.to_host == to) then
+ print(("Closing connection from %s to %s [%s]"):format(session.from_host, session.to_host, id));
(session.close or s2smanager.destroy_session)(session);
- count = count + 1;
- print("Closed outgoing session from "..from.." to "..to);
+ count = count + 1 ;
end
- elseif hosts[to] and not hosts[from] then
- -- Is an incoming connection
- for session in pairs(incoming_s2s) do
- if session.to_host == to and session.from_host == from then
- (session.close or s2smanager.destroy_session)(session);
- count = count + 1;
end
- end
-
- if count == 0 then
- print("No incoming connections from "..from.." to "..to);
- else
- print("Closed "..count.." incoming session"..((count == 1 and "") or "s").." from "..from.." to "..to);
- end
- elseif hosts[to] and hosts[from] then
- return false, "Both of the hostnames you specified are local, there are no s2s sessions to close";
- else
- return false, "Neither of the hostnames you specified are being used on this server";
- end
-
return true, "Closed "..count.." s2s session"..((count == 1 and "") or "s");
end
function def_env.s2s:closeall(host)
local count = 0;
-
- if not host or type(host) ~= "string" then return false, "wrong syntax: please use s2s:closeall('hostname.tld')"; end
- if hosts[host] then
- for session in pairs(incoming_s2s) do
- if session.to_host == host then
- (session.close or s2smanager.destroy_session)(session);
+ local s2s_sessions = module:shared"/*/s2s/sessions";
+ for _,session in pairs(s2s_sessions) do
+ if not host or session.from_host == host or session.to_host == host then
+ session:close();
count = count + 1;
end
end
- for _, session in pairs(hosts[host].s2sout) do
- (session.close or s2smanager.destroy_session)(session);
- count = count + 1;
- end
- else
- for session in pairs(incoming_s2s) do
- if session.from_host == host then
- (session.close or s2smanager.destroy_session)(session);
- count = count + 1;
- end
- end
- for _, h in pairs(hosts) do
- if h.s2sout[host] then
- (h.s2sout[host].close or s2smanager.destroy_session)(h.s2sout[host]);
- count = count + 1;
- end
- end
- end
-
if count == 0 then return false, "No sessions to close.";
else return true, "Closed "..count.." s2s session"..((count == 1 and "") or "s"); end
end
function def_env.host:list()
local print = self.session.print;
local i = 0;
+ local type;
for host in values(array.collect(keys(prosody.hosts)):sort()) do
i = i + 1;
- print(host);
+ type = hosts[host].type;
+ if type == "local" then
+ print(host);
+ else
+ type = module:context(host):get_option_string("component_module", type);
+ if type ~= "component" then
+ type = type .. " component";
+ end
+ print(("%s (%s)"):format(host, type));
+ end
end
return true, i.." hosts";
end
return setmetatable({ room = room_obj }, console_room_mt);
end
+function def_env.muc:list(host)
+ local host_session = hosts[host];
+ if not host_session or not host_session.modules.muc then
+ return nil, "Please supply the address of a local MUC component";
+ end
+ local c = 0;
+ for name in keys(host_session.modules.muc.rooms) do
+ print(name);
+ c = c + 1;
+ end
+ return true, c.." rooms";
+end
+
local um = require"core.usermanager";
def_env.user = {};
local option = module:get_option("console_banner");
if option == nil or option == "full" or option == "graphic" then
session.print [[
- ____ \ / _
- | _ \ _ __ ___ ___ _-_ __| |_ _
+ ____ \ / _
+ | _ \ _ __ ___ ___ _-_ __| |_ _
| |_) | '__/ _ \/ __|/ _ \ / _` | | | |
| __/| | | (_) \__ \ |_| | (_| | |_| |
|_| |_| \___/|___/\___/ \__,_|\__, |
- A study in simplicity |___/
+ A study in simplicity |___/
]]
end
-- Prosody IM
-- Copyright (C) 2008-2010 Matthew Wild
-- Copyright (C) 2008-2010 Waqas Hussain
---
+--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
local st = require "util.stanza";
local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session;
local uuid_generate = require "util.uuid".generate;
+local runner = require "util.async".runner;
local xpcall, tostring, type = xpcall, tostring, type;
-local traceback = debug.traceback;
+local t_insert, t_remove = table.insert, table.remove;
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
local core_process_stanza = prosody.core_process_stanza;
local hosts = prosody.hosts;
-local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza };
+local stream_callbacks = { default_ns = "jabber:client" };
local listener = {};
+local runner_callbacks = {};
--- Stream events handlers
local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
-local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
function stream_callbacks.streamopened(session, attr)
local send = session.send;
session.streamid = uuid_generate();
(session.log or session)("debug", "Client sent opening <stream:stream> to %s", session.host);
- if not hosts[session.host] or not hosts[session.host].users then
+ if not hosts[session.host] or not hosts[session.host].modules.c2s then
-- We don't serve this host...
session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)};
return;
end
- send("<?xml version='1.0'?>"..st.stanza("stream:stream", {
- xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams';
- id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }):top_tag());
+ session:open_stream();
(session.log or log)("debug", "Sent reply <stream:stream> to client");
session.notopen = nil;
-- since we now have a new stream header, session is secured
if session.secure == false then
session.secure = true;
+ session.encrypted = true;
- -- Check if TLS compression is used
local sock = session.conn:socket();
if sock.info then
- session.compressed = sock:info"compression";
- elseif sock.compression then
- session.compressed = sock:compression(); --COMPAT mw/luasec-hg
+ local info = sock:info();
+ (session.log or log)("info", "Stream encrypted (%s with %s)", info.protocol, info.cipher);
+ session.compressed = info.compression;
+ else
+ (session.log or log)("info", "Stream encrypted");
+ session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg
end
end
local features = st.stanza("stream:features");
hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
- module:fire_event("stream-features", session, features);
-
send(features);
end
end
end
-local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end
function stream_callbacks.handlestanza(session, stanza)
stanza = session.filter("stanzas/in", stanza);
- if stanza then
- return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
- end
+ session.thread:run(stanza);
end
--- Session methods
local log = session.log or log;
if session.conn then
if session.notopen then
- session.send("<?xml version='1.0'?>");
- session.send(st.stanza("stream:stream", default_stream_attr):top_tag());
+ session:open_stream();
end
if reason then -- nil == no err, initiated by us, false == initiated by client
local stream_error = st.stanza("stream:error");
log("debug", "Disconnecting client, <stream:error> is: %s", stream_error);
session.send(stream_error);
end
-
+
session.send("</stream:stream>");
function session.send() return false; end
-
+
local reason = (reason and (reason.name or reason.text or reason.condition)) or reason;
- session.log("info", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed");
+ session.log("debug", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed");
-- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
local conn = session.conn;
end
end, 200);
+function runner_callbacks:ready()
+ self.data.conn:resume();
+end
+
+function runner_callbacks:waiting()
+ self.data.conn:pause();
+end
+
+function runner_callbacks:error(err)
+ (self.data.log or log)("error", "Traceback[c2s]: %s", err);
+end
+
--- Port listener
function listener.onconnect(conn)
local session = sm_new_session(conn);
sessions[conn] = session;
-
+
session.log("info", "Client connected");
-
+
-- Client is using legacy SSL (otherwise mod_tls sets this flag)
if conn:ssl() then
session.secure = true;
+ session.encrypted = true;
-- Check if TLS compression is used
local sock = conn:socket();
session.compressed = sock:compression(); --COMPAT mw/luasec-hg
end
end
-
+
if opt_keepalives then
conn:setoption("keepalive", opt_keepalives);
end
-
+
session.close = session_close;
-
+
local stream = new_xmpp_stream(session, stream_callbacks);
session.stream = stream;
session.notopen = true;
-
+
function session.reset_stream()
session.notopen = true;
session.stream:reset();
end
-
+
+ session.thread = runner(function (stanza)
+ core_process_stanza(session, stanza);
+ end, runner_callbacks, session);
+
local filter = session.filter;
function session.data(data)
- data = filter("bytes/in", data);
- if data then
- local ok, err = stream:feed(data);
+ -- Parse the data, which will store stanzas in session.pending_stanzas
+ if data then
- if ok then return; end
- log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
- session:close("not-well-formed");
+ data = filter("bytes/in", data);
+ if data then
+ local ok, err = stream:feed(data);
+ if not ok then
+ log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
+ session:close("not-well-formed");
+ end
+ end
end
end
-
if c2s_timeout then
add_task(c2s_timeout, function ()
if session.type == "c2s_unauthed" then
end
end
+function listener.onreadtimeout(conn)
+ local session = sessions[conn];
+ if session then
+ return (hosts[session.host] or prosody).events.fire_event("c2s-read-timeout", { session = session });
+ end
+end
+
+local function keepalive(event)
+ return event.session.send(' ');
+end
+
function listener.associate_session(conn, session)
sessions[conn] = session;
end
-function listener.ondetach(conn)
- sessions[conn] = nil;
+function module.add_host(module)
+ module:hook("c2s-read-timeout", keepalive, -1);
end
+module:hook("c2s-read-timeout", keepalive, -1);
+
module:hook("server-stopping", function(event)
local reason = event.reason;
for _, session in pairs(sessions) do
-- Prosody IM
-- Copyright (C) 2008-2010 Matthew Wild
-- Copyright (C) 2008-2010 Waqas Hussain
---
+--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
if module:get_host_type() ~= "component" then
error("Don't load mod_component manually, it should be for a component, please see http://prosody.im/doc/components", 0);
end
-
+
local env = module.environment;
env.connected = false;
send = nil;
session.on_destroy = nil;
end
-
+
-- Handle authentication attempts by component
local function handle_component_auth(event)
local session, stanza = event.origin, event.stanza;
-
+
if session.type ~= "component_unauthed" then return; end
-
+
if (not session.host) or #stanza.tags > 0 then
(session.log or log)("warn", "Invalid component handshake for host: %s", session.host);
session:close("not-authorized");
return true;
end
-
+
local secret = module:get_option("component_secret");
if not secret then
(session.log or log)("warn", "Component attempted to identify as %s, but component_secret is not set", session.host);
session:close("not-authorized");
return true;
end
-
+
local supplied_token = t_concat(stanza);
local calculated_token = sha1(session.streamid..secret, true);
if supplied_token:lower() ~= calculated_token:lower() then
session:close{ condition = "not-authorized", text = "Given token does not match calculated token" };
return true;
end
-
+
if env.connected then
module:log("error", "Second component attempted to connect, denying connection");
session:close{ condition = "conflict", text = "Component already connected" };
return true;
end
-
+
env.connected = true;
send = session.send;
session.on_destroy = on_destroy;
session.type = "component";
module:log("info", "External component successfully authenticated");
session.send(st.stanza("handshake"));
-
+
return true;
end
module:hook("stanza/jabber:component:accept:handshake", handle_component_auth, -1);
end
return true;
end
-
+
module:hook("iq/bare", handle_stanza, -1);
module:hook("message/bare", handle_stanza, -1);
module:hook("presence/bare", handle_stanza, -1);
session.streamid = uuid_gen();
session.notopen = nil;
-- Return stream header
- session.send("<?xml version='1.0'?>");
- session.send(st.stanza("stream:stream", { xmlns=xmlns_component,
- ["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=session.host }):top_tag());
+ session:open_stream();
end
function stream_callbacks.streamclosed(session)
if opt_keepalives then
conn:setoption("keepalive", opt_keepalives);
end
-
+
session.log("info", "Incoming Jabber component connection");
-
+
local stream = new_xmpp_stream(session, stream_callbacks);
session.stream = stream;
-
+
session.notopen = true;
-
+
function session.reset_stream()
session.notopen = true;
session.stream:reset();
module:log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
session:close("not-well-formed");
end
-
+
session.dispatch_stanza = stream_callbacks.handlestanza;
sessions[conn] = session;
end
end
+ function listener.ondetach(conn)
+ sessions[conn] = nil;
+ end
+
module:provides("net", {
name = "component";
private = true;
-- Prosody IM
-- Copyright (C) 2008-2010 Matthew Wild
-- Copyright (C) 2008-2010 Waqas Hussain
---
+--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
local tostring, type = tostring, type;
local t_insert = table.insert;
local xpcall, traceback = xpcall, debug.traceback;
-local NULL = {};
local add_task = require "util.timer".add_task;
local st = require "util.stanza";
local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
local s2s_destroy_session = require "core.s2smanager".destroy_session;
local uuid_gen = require "util.uuid".generate;
-local cert_verify_identity = require "util.x509".verify_identity;
local fire_global_event = prosody.events.fire_event;
local s2sout = module:require("s2sout");
return true;
end
+local function keepalive(event)
+ return event.session.sends2s(' ');
+end
+
+module:hook("s2s-read-timeout", keepalive, -1);
+
function module.add_host(module)
if module:get_option_boolean("disallow_s2s", false) then
module:log("warn", "The 'disallow_s2s' config option is deprecated, please see http://prosody.im/doc/s2s#disabling");
module:hook("route/remote", route_to_existing_session, -1);
module:hook("route/remote", route_to_new_session, -10);
module:hook("s2s-authenticated", make_authenticated, -1);
+ module:hook("s2s-read-timeout", keepalive, -1);
+ module:hook_stanza("http://etherx.jabber.org/streams", "features", function (session, stanza)
+ if session.type == "s2sout" then
+ -- Stream is authenticated and we are seem to be done with feature negotiation,
+ -- so the stream is ready for stanzas. RFC 6120 Section 4.3
+ mark_connected(session);
+ end
+ end, -1);
end
-- Stream is authorised, and ready for normal stanzas
function mark_connected(session)
local sendq, send = session.sendq, session.sends2s;
-
+
local from, to = session.from_host, session.to_host;
-
- session.log("info", "%s s2s connection %s->%s complete", session.direction, from, to);
+
+ session.log("info", "%s s2s connection %s->%s complete", session.direction:gsub("^.", string.upper), from, to);
local event_data = { session = session };
if session.type == "s2sout" then
fire_global_event("s2sin-established", event_data);
hosts[to].events.fire_event("s2sin-established", event_data);
end
-
+
if session.direction == "outgoing" then
if sendq then
session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host);
end
session.sendq = nil;
end
-
+
session.ip_hosts = nil;
session.srv_hosts = nil;
end
return false;
end
session.log("debug", "connection %s->%s is now authenticated for %s", session.from_host, session.to_host, host);
-
- mark_connected(session);
-
+
+ if (session.type == "s2sout" and session.external_auth ~= "succeeded") or session.type == "s2sin" then
+ -- Stream either used dialback for authentication or is an incoming stream.
+ mark_connected(session);
+ end
+
return true;
end
--- Helper to check that a session peer's certificate is valid
-local function check_cert_status(session)
+function check_cert_status(session)
local host = session.direction == "outgoing" and session.to_host or session.from_host
local conn = session.conn:socket()
local cert
cert = conn:getpeercertificate()
end
- if cert then
- local chain_valid, errors;
- if conn.getpeerverification then
- chain_valid, errors = conn:getpeerverification();
- elseif conn.getpeerchainvalid then -- COMPAT mw/luasec-hg
- chain_valid, errors = conn:getpeerchainvalid();
- errors = (not chain_valid) and { { errors } } or nil;
- else
- chain_valid, errors = false, { { "Chain verification not supported by this version of LuaSec" } };
- end
- -- Is there any interest in printing out all/the number of errors here?
- if not chain_valid then
- (session.log or log)("debug", "certificate chain validation result: invalid");
- for depth, t in pairs(errors or NULL) do
- (session.log or log)("debug", "certificate error(s) at depth %d: %s", depth-1, table.concat(t, ", "))
- end
- session.cert_chain_status = "invalid";
- else
- (session.log or log)("debug", "certificate chain validation result: valid");
- session.cert_chain_status = "valid";
-
- -- We'll go ahead and verify the asserted identity if the
- -- connecting server specified one.
- if host then
- if cert_verify_identity(host, "xmpp-server", cert) then
- session.cert_identity_status = "valid"
- else
- session.cert_identity_status = "invalid"
- end
- (session.log or log)("debug", "certificate identity validation result: %s", session.cert_identity_status);
- end
- end
- end
return module:fire_event("s2s-check-certificate", { host = host, session = session, cert = cert });
end
function stream_callbacks.streamopened(session, attr)
local send = session.sends2s;
-
+
session.version = tonumber(attr.version) or 0;
-
+
-- TODO: Rename session.secure to session.encrypted
if session.secure == false then
session.secure = true;
+ session.encrypted = true;
- -- Check if TLS compression is used
local sock = session.conn:socket();
if sock.info then
- session.compressed = sock:info"compression";
- elseif sock.compression then
- session.compressed = sock:compression(); --COMPAT mw/luasec-hg
+ local info = sock:info();
+ (session.log or log)("info", "Stream encrypted (%s with %s)", info.protocol, info.cipher);
+ session.compressed = info.compression;
+ else
+ (session.log or log)("info", "Stream encrypted");
+ session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg
end
end
if session.direction == "incoming" then
-- Send a reply stream header
-
+
-- Validate to/from
local to, from = nameprep(attr.to), nameprep(attr.from);
if not to and attr.to then -- COMPAT: Some servers do not reliably set 'to' (especially on stream restarts)
session:close({ condition = "improper-addressing", text = "Invalid 'from' address" });
return;
end
-
+
-- Set session.[from/to]_host if they have not been set already and if
-- this session isn't already authenticated
if session.type == "s2sin_unauthed" and from and not session.from_host then
session:close({ condition = "improper-addressing", text = "New stream 'to' attribute does not match original" });
return;
end
-
+
-- For convenience we'll put the sanitised values into these variables
to, from = session.to_host, session.from_host;
-
+
session.streamid = uuid_gen();
(session.log or log)("debug", "Incoming s2s received %s", st.stanza("stream:stream", attr):top_tag());
if to then
session:open_stream(session.to_host, session.from_host)
if session.version >= 1.0 then
local features = st.stanza("stream:features");
-
+
if to then
hosts[to].events.fire_event("s2s-stream-features", { origin = session, features = features });
else
(session.log or log)("warn", "No 'to' on stream header from %s means we can't offer any features", from or session.ip or "unknown host");
end
-
+
log("debug", "Sending stream features: %s", tostring(features));
send(features);
end
session.notopen = nil;
elseif session.direction == "outgoing" then
session.notopen = nil;
- -- If we are just using the connection for verifying dialback keys, we won't try and auth it
- if not attr.id then error("stream response did not give us a streamid!!!"); end
+ if not attr.id then
+ log("error", "Stream response did not give us a stream id!");
+ session:close({ condition = "undefined-condition", text = "Missing stream ID" });
+ return;
+ end
session.streamid = attr.id;
if session.secure and not session.cert_chain_status then
end
end
session.send_buffer = nil;
-
+
-- If server is pre-1.0, don't wait for features, just do dialback
if session.version < 1.0 then
if not session.dialback_verifying then
session.sends2s("</stream:stream>");
function session.sends2s() return false; end
-
+
local reason = remote_reason or (reason and (reason.text or reason.condition)) or reason;
- session.log("info", "%s s2s stream %s->%s closed: %s", session.direction, session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed");
-
+ session.log("info", "%s s2s stream %s->%s closed: %s", session.direction:gsub("^.", string.upper), session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed");
+
-- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
local conn = session.conn;
if reason == nil and not session.notopen and session.type == "s2sin" then
end
end
-function session_open_stream(session, from, to)
- local attr = {
- ["xmlns:stream"] = 'http://etherx.jabber.org/streams',
- xmlns = 'jabber:server',
- version = session.version and (session.version > 0 and "1.0" or nil),
- ["xml:lang"] = 'en',
- id = session.streamid,
- from = from, to = to,
- }
+function session_stream_attrs(session, from, to, attr)
if not from or (hosts[from] and hosts[from].modules.dialback) then
attr["xmlns:db"] = 'jabber:server:dialback';
end
-
- session.sends2s("<?xml version='1.0'?>");
- session.sends2s(st.stanza("stream:stream", attr):top_tag());
- return true;
end
-- Session initialization logic shared by incoming and outgoing
local function initialize_session(session)
local stream = new_xmpp_stream(session, stream_callbacks);
+ local log = session.log or log;
session.stream = stream;
-
+
session.notopen = true;
-
+
function session.reset_stream()
session.notopen = true;
session.streamid = nil;
session.stream:reset();
end
- session.open_stream = session_open_stream;
-
- local filter = session.filter;
+ session.stream_attrs = session_stream_attrs;
+
+ local filter = initialize_filters(session);
+ local conn = session.conn;
+ local w = conn.write;
+
+ function session.sends2s(t)
+ log("debug", "sending: %s", t.top_tag and t:top_tag() or t:match("^[^>]*>?"));
+ if t.name then
+ t = filter("stanzas/out", t);
+ end
+ if t then
+ t = filter("bytes/out", tostring(t));
+ if t then
+ return w(conn, t);
+ end
+ end
+ end
+
function session.data(data)
data = filter("bytes/in", data);
if data then
local ok, err = stream:feed(data);
if ok then return; end
- (session.log or log)("warn", "Received invalid XML: %s", data);
- (session.log or log)("warn", "Problem was: %s", err);
+ log("warn", "Received invalid XML: %s", data);
+ log("warn", "Problem was: %s", err);
session:close("not-well-formed");
end
end
return handlestanza(session, stanza);
end
+ module:fire_event("s2s-created", { session = session });
+
add_task(connect_timeout, function ()
if session.type == "s2sin" or session.type == "s2sout" then
return; -- Ok, we're connected
session = s2s_new_incoming(conn);
sessions[conn] = session;
session.log("debug", "Incoming s2s connection");
-
- local filter = initialize_filters(session);
- local w = conn.write;
- session.sends2s = function (t)
- log("debug", "sending: %s", t.top_tag and t:top_tag() or t:match("^([^>]*>?)"));
- if t.name then
- t = filter("stanzas/out", t);
- end
- if t then
- t = filter("bytes/out", tostring(t));
- if t then
- return w(conn, t);
- end
- end
- end
-
initialize_session(session);
else -- Outgoing session connected
session:open_stream(session.from_host, session.to_host);
end
+ session.ip = conn:ip();
end
function listener.onincoming(conn, data)
session.data(data);
end
end
-
+
function listener.onstatus(conn, status)
if status == "ssl-handshake-complete" then
local session = sessions[conn];
if err and session.direction == "outgoing" and session.notopen then
(session.log or log)("debug", "s2s connection attempt failed: %s", err);
if s2sout.attempt_connection(session, err) then
- (session.log or log)("debug", "...so we're going to try another target");
return; -- Session lives for now
end
end
end
end
+function listener.onreadtimeout(conn)
+ local session = sessions[conn];
+ if session then
+ return (hosts[session.host] or prosody).events.fire_event("s2s-read-timeout", { session = session });
+ end
+end
+
function listener.register_outgoing(conn, session)
- session.direction = "outgoing";
sessions[conn] = session;
initialize_session(session);
end
+ function listener.ondetach(conn)
+ sessions[conn] = nil;
+ end
+
function check_auth_policy(event)
local host, session = event.host, event.session;
local must_secure = secure_auth;
elseif must_secure and insecure_domains[host] then
must_secure = false;
end
-
+
if must_secure and (session.cert_chain_status ~= "valid" or session.cert_identity_status ~= "valid") then
module:log("warn", "Forbidding insecure connection to/from %s", host or session.ip or "(unknown host)");
if session.direction == "incoming" then