Merge 0.10->trunk
authorMatthew Wild <mwild1@gmail.com>
Wed, 21 Jan 2015 01:29:00 +0000 (01:29 +0000)
committerMatthew Wild <mwild1@gmail.com>
Wed, 21 Jan 2015 01:29:00 +0000 (01:29 +0000)
13 files changed:
core/certmanager.lua
core/hostmanager.lua
core/moduleapi.lua
core/portmanager.lua
core/rostermanager.lua
core/sessionmanager.lua
core/statsmanager.lua [new file with mode: 0644]
core/storagemanager.lua
plugins/mod_carbons.lua [new file with mode: 0644]
plugins/mod_pep.lua
prosody
util/hex.lua
util/statistics.lua [new file with mode: 0644]

index 7ad7b034a61df14b7370c7886656fe2ca51b4810..b2c358fe489e7caba6b7e6fd468e638cde141250 100644 (file)
@@ -8,7 +8,7 @@
 
 local configmanager = require "core.configmanager";
 local log = require "util.logger".init("certmanager");
-local ssl = ssl;
+local ssl = _G.ssl;
 local ssl_newcontext = ssl and ssl.newcontext;
 local new_config = require"util.sslconfig".new;
 
index ca532625a4275513f240d4e66a216a360824aeee..b13b1944497c48569bc89643b26c29e8ac82babf 100644 (file)
@@ -13,7 +13,6 @@ local disco_items = require "util.multitable".new();
 local NULL = {};
 
 local jid_split = require "util.jid".split;
-local uuid_gen = require "util.uuid".generate;
 
 local log = require "util.logger".init("hostmanager");
 
index 754d7497e54cd7c8bacc4234b5c1b60bbedab2ce..d6aa0ef0f3d769ef566bf133b7ca7d0427bc7318 100644 (file)
@@ -14,6 +14,7 @@ local logger = require "util.logger";
 local pluginloader = require "util.pluginloader";
 local timer = require "util.timer";
 local resolve_relative_path = require"util.paths".resolve_relative_path;
+local measure = require "core.statsmanager".measure;
 
 local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat;
 local error, setmetatable, type = error, setmetatable, type;
@@ -390,6 +391,10 @@ function api:open_store(name, type)
        return require"core.storagemanager".open(self.host, name or self.name, type);
 end
 
+function api:measure(name, type)
+       return measure(type, "/"..self.host.."/mod_"..self.name.."/"..name);
+end
+
 function api.init(mm)
        modulemanager = mm;
        return api;
index bc2d42648615eed7041b8a230e5844ad0bbddd8c..eab2412a8fe7ade02547d463bd5a3fb8aed08bb7 100644 (file)
@@ -9,7 +9,7 @@ local set = require "util.set";
 
 local table = table;
 local setmetatable, rawset, rawget = setmetatable, rawset, rawget;
-local type, tonumber, tostring, ipairs, pairs = type, tonumber, tostring, ipairs, pairs;
+local type, tonumber, tostring, ipairs = type, tonumber, tostring, ipairs;
 
 local prosody = prosody;
 local fire_event = prosody.events.fire_event;
index 5266afb52679b5bbe7677df3180350de8bc847ce..8c7612b450fe697fe21b81f9400c004b39743c51 100644 (file)
@@ -15,7 +15,7 @@ local pairs = pairs;
 local tostring = tostring;
 
 local hosts = hosts;
-local bare_sessions = bare_sessions;
+local bare_sessions = prosody.bare_sessions;
 
 local datamanager = require "util.datamanager"
 local um_user_exists = require "core.usermanager".user_exists;
index 65e5156cbdca694aecd2eb04a372e52d5ba5bfc7..09920b7dc1a78eaa115af8e2bdfd05f5d39f7369 100644 (file)
@@ -10,8 +10,8 @@ local tostring, setmetatable = tostring, setmetatable;
 local pairs, next= pairs, next;
 
 local hosts = hosts;
-local full_sessions = full_sessions;
-local bare_sessions = bare_sessions;
+local full_sessions = prosody.full_sessions;
+local bare_sessions = prosody.bare_sessions;
 
 local logger = require "util.logger";
 local log = logger.init("sessionmanager");
diff --git a/core/statsmanager.lua b/core/statsmanager.lua
new file mode 100644 (file)
index 0000000..62d217e
--- /dev/null
@@ -0,0 +1,67 @@
+
+local stats = require "util.statistics".new();
+local config = require "core.configmanager";
+local log = require "util.logger".init("stats");
+local timer = require "util.timer";
+local fire_event = prosody.events.fire_event;
+
+local stats_config = config.get("*", "statistics_interval");
+local stats_interval = tonumber(stats_config);
+if stats_config and not stats_interval then
+       log("error", "Invalid 'statistics_interval' setting, statistics will be disabled");
+end
+
+local measure, collect;
+local latest_stats = {};
+local changed_stats = {};
+local stats_extra = {};
+
+if stats_interval then
+       log("debug", "Statistics collection is enabled every %d seconds", stats_interval);
+       function measure(type, name)
+               local f = assert(stats[type], "unknown stat type: "..type);
+               return f(name);
+       end
+
+       local mark_collection_start = measure("times", "stats.collection");
+       local mark_processing_start = measure("times", "stats.processing");
+
+       function collect()
+               local mark_collection_done = mark_collection_start();
+               changed_stats, stats_extra = {}, {};
+               for stat_name, getter in pairs(stats.get_stats()) do
+                       local type, value, extra = getter();
+                       local old_value = latest_stats[stat_name];
+                       latest_stats[stat_name] = value;
+                       if value ~= old_value then
+                               changed_stats[stat_name] = value;
+                       end
+                       if extra then
+                               stats_extra[stat_name] = extra;
+                       end
+               end
+               mark_collection_done();
+               local mark_processing_done = mark_processing_start();
+               fire_event("stats-updated", { stats = latest_stats, changed_stats = changed_stats, stats_extra = stats_extra });
+               mark_processing_done();
+               return stats_interval;
+       end
+
+       timer.add_task(stats_interval, collect);
+else
+       log("debug", "Statistics collection is disabled");
+       -- nop
+       function measure()
+               return measure;
+       end
+       function collect()
+       end
+end
+
+return {
+       measure = measure;
+       collect = collect;
+       get_stats = function ()
+               return latest_stats, changed_stats, stats_extra;
+       end;
+};
index b2ad29d02e9a51f7bbd0e34698fc676089ff47ce..d16bdce5deaa2d8839b00ffebe0c96f26b7eba06 100644 (file)
@@ -1,5 +1,5 @@
 
-local error, type, pairs = error, type, pairs;
+local type, pairs = type, pairs;
 local setmetatable = setmetatable;
 
 local config = require "core.configmanager";
diff --git a/plugins/mod_carbons.lua b/plugins/mod_carbons.lua
new file mode 100644 (file)
index 0000000..5124280
--- /dev/null
@@ -0,0 +1,111 @@
+-- XEP-0280: Message Carbons implementation for Prosody
+-- Copyright (C) 2011 Kim Alvefur
+--
+-- This file is MIT/X11 licensed.
+
+local st = require "util.stanza";
+local jid_bare = require "util.jid".bare;
+local xmlns_carbons = "urn:xmpp:carbons:2";
+local xmlns_forward = "urn:xmpp:forward:0";
+local full_sessions, bare_sessions = full_sessions, bare_sessions;
+
+local function toggle_carbons(event)
+       local origin, stanza = event.origin, event.stanza;
+       local state = stanza.tags[1].name;
+       module:log("debug", "%s %sd carbons", origin.full_jid, state);
+       origin.want_carbons = state == "enable" and stanza.tags[1].attr.xmlns;
+       return origin.send(st.reply(stanza));
+end
+module:hook("iq-set/self/"..xmlns_carbons..":disable", toggle_carbons);
+module:hook("iq-set/self/"..xmlns_carbons..":enable", toggle_carbons);
+
+local function message_handler(event, c2s)
+       local origin, stanza = event.origin, event.stanza;
+       local orig_type = stanza.attr.type;
+       local orig_from = stanza.attr.from;
+       local orig_to = stanza.attr.to;
+       
+       if not (orig_type == nil
+                       or orig_type == "normal"
+                       or orig_type == "chat") then
+               return -- No carbons for messages of type error or headline
+       end
+
+       -- Stanza sent by a local client
+       local bare_jid = jid_bare(orig_from);
+       local target_session = origin;
+       local top_priority = false;
+       local user_sessions = bare_sessions[bare_jid];
+
+       -- Stanza about to be delivered to a local client
+       if not c2s then
+               bare_jid = jid_bare(orig_to);
+               target_session = full_sessions[orig_to];
+               user_sessions = bare_sessions[bare_jid];
+               if not target_session and user_sessions then
+                       -- The top resources will already receive this message per normal routing rules,
+                       -- so we are going to skip them in order to avoid sending duplicated messages.
+                       local top_resources = user_sessions.top_resources;
+                       top_priority = top_resources and top_resources[1].priority
+               end
+       end
+
+       if not user_sessions then
+               module:log("debug", "Skip carbons for offline user");
+               return -- No use in sending carbons to an offline user
+       end
+
+       if stanza:get_child("private", xmlns_carbons) then
+               if not c2s then
+                       stanza:maptags(function(tag)
+                               if not ( tag.attr.xmlns == xmlns_carbons and tag.name == "private" ) then
+                                       return tag;
+                               end
+                       end);
+               end
+               module:log("debug", "Message tagged private, ignoring");
+               return
+       elseif stanza:get_child("no-copy", "urn:xmpp:hints") then
+               module:log("debug", "Message has no-copy hint, ignoring");
+               return
+       elseif stanza:get_child("x", "http://jabber.org/protocol/muc#user") then
+               module:log("debug", "MUC PM, ignoring");
+               return
+       end
+
+       -- Create the carbon copy and wrap it as per the Stanza Forwarding XEP
+       local copy = st.clone(stanza);
+       copy.attr.xmlns = "jabber:client";
+       local carbon = st.message{ from = bare_jid, type = orig_type, }
+               :tag(c2s and "sent" or "received", { xmlns = xmlns_carbons })
+                       :tag("forwarded", { xmlns = xmlns_forward })
+                               :add_child(copy):reset();
+
+       user_sessions = user_sessions and user_sessions.sessions;
+       for _, session in pairs(user_sessions) do
+               -- Carbons are sent to resources that have enabled it
+               if session.want_carbons
+               -- but not the resource that sent the message, or the one that it's directed to
+               and session ~= target_session
+               -- and isn't among the top resources that would receive the message per standard routing rules
+               and (c2s or session.priority ~= top_priority) then
+                       carbon.attr.to = session.full_jid;
+                       module:log("debug", "Sending carbon to %s", session.full_jid);
+                       session.send(carbon);
+               end
+       end
+end
+
+local function c2s_message_handler(event)
+       return message_handler(event, true)
+end
+
+-- Stanzas sent by local clients
+module:hook("pre-message/host", c2s_message_handler, 1);
+module:hook("pre-message/bare", c2s_message_handler, 1);
+module:hook("pre-message/full", c2s_message_handler, 1);
+-- Stanzas to local clients
+module:hook("message/bare", message_handler, 1);
+module:hook("message/full", message_handler, 1);
+
+module:add_feature(xmlns_carbons);
index 752cd28c414f019d2fb09701da6711fc61789dc7..a6916d1f8963b5345bace705d454be985df0590e 100644 (file)
@@ -41,7 +41,8 @@ local function subscription_presence(user_bare, recipient)
        return is_contact_subscribed(username, host, recipient_bare);
 end
 
-local function publish(session, node, id, item)
+module:hook("pep-publish-item", function (event)
+       local session, node, id, item = event.session, event.node, event.id, event.item;
        item.attr.xmlns = nil;
        local disable = #item.tags ~= 1 or #item.tags[1] == 0;
        if #item.tags == 0 then item.name = "retract"; end
@@ -72,7 +73,8 @@ local function publish(session, node, id, item)
                        core_post_stanza(session, stanza);
                end
        end
-end
+end);
+
 local function publish_all(user, recipient, session)
        local d = data[user];
        local notify = recipients[user] and recipients[user][recipient];
@@ -172,7 +174,9 @@ module:hook("iq/bare/http://jabber.org/protocol/pubsub:pubsub", function(event)
                                local id = payload.attr.id or "1";
                                payload.attr.id = id;
                                session.send(st.reply(stanza));
-                               publish(session, node, id, st.clone(payload));
+                               module:fire_event("pep-publish-item", {
+                                       node = node, actor = session.jid, id = id, session = session, item = st.clone(payload);
+                               });
                                return true;
                        end
                end
diff --git a/prosody b/prosody
index e8f81d5d6ec6a8144237cd37635a7f142c303ba7..e6a23d8e703f6fb172f188d45a6e87a089fde333 100755 (executable)
--- a/prosody
+++ b/prosody
@@ -292,6 +292,7 @@ function load_secondary_libraries()
        require "util.import"
        require "util.xmppstream"
        require "core.stanza_router"
+       require "core.statsmanager"
        require "core.hostmanager"
        require "core.portmanager"
        require "core.modulemanager"
index b21ee17e5fe9cb96e4986f9fab89a0570cd4f97e..e41f48633de6faafbc4c94311c641a5fb1e14108 100644 (file)
@@ -1,19 +1,25 @@
 local s_char = string.char;
+local s_format = string.format;
+local s_gsub = string.gsub;
 
-local function char_to_hex(c)
-       return ("%02x"):format(c:byte())
-end
+local char_to_hex = {};
+local hex_to_char = {};
 
-local function hex_to_char(h)
-       return s_char(tonumber(h, 16));
+do
+       local char, hex;
+       for i = 0,255 do
+               char, hex = s_char(i), s_format("%02x", i);
+               char_to_hex[char] = hex;
+               hex_to_char[hex] = char;
+       end
 end
 
 local function to(s)
-       return s:gsub(".", char_to_hex);
+       return (s_gsub(s, ".", char_to_hex));
 end
 
 local function from(s)
-       return s:gsub("..", hex_to_char);
+       return (s_gsub(s, "..", hex_to_char));
 end
 
 return { to = to, from = from }
diff --git a/util/statistics.lua b/util/statistics.lua
new file mode 100644 (file)
index 0000000..08c765a
--- /dev/null
@@ -0,0 +1,160 @@
+local t_sort = table.sort
+local m_floor = math.floor;
+local time = require "socket".gettime;
+
+local function nop_function() end
+
+local function percentile(arr, length, pc)
+       local n = pc/100 * (length + 1);
+       local k, d = m_floor(n), n%1;
+       if k == 0 then
+               return arr[1];
+       elseif k >= length then
+               return arr[length];
+       end
+       return arr[k] + d*(arr[k+1] - arr[k]);
+end
+
+local function new_registry(config)
+       config = config or {};
+       local duration_sample_interval = config.duration_sample_interval or 5;
+       local duration_max_samples = config.duration_max_stored_samples or 5000;
+
+       local function get_distribution_stats(events, n_actual_events, since, new_time, units)
+               local n_stored_events = #events;
+               t_sort(events);
+               local sum = 0;
+               for i = 1, n_stored_events do
+                       sum = sum + events[i];
+               end
+
+               return {
+                       samples = events;
+                       sample_count = n_stored_events;
+                       count = n_actual_events,
+                       rate = n_actual_events/(new_time-since);
+                       average = n_stored_events > 0 and sum/n_stored_events or 0,
+                       min = events[1] or 0,
+                       max = events[n_stored_events] or 0,
+                       units = units,
+               };
+       end
+
+
+       local registry = {};
+       local methods;
+       methods = {
+               amount = function (name, initial)
+                       local v = initial or 0;
+                       registry[name..":amount"] = function () return "amount", v; end
+                       return function (new_v) v = new_v; end
+               end;
+               counter = function (name, initial)
+                       local v = initial or 0;
+                       registry[name..":amount"] = function () return "amount", v; end
+                       return function (delta)
+                               v = v + delta;
+                       end;
+               end;
+               rate = function (name)
+                       local since, n = time(), 0;
+                       registry[name..":rate"] = function ()
+                               local t = time();
+                               local stats = {
+                                       rate = n/(t-since);
+                                       count = n;
+                               };
+                               since, n = t, 0;
+                               return "rate", stats.rate, stats;
+                       end;
+                       return function ()
+                               n = n + 1;
+                       end;
+               end;
+               distribution = function (name, unit, type)
+                       type = type or "distribution";
+                       local events, last_event = {}, 0;
+                       local n_actual_events = 0;
+                       local since = time();
+
+                       registry[name..":"..type] = function ()
+                               local new_time = time();
+                               local stats = get_distribution_stats(events, n_actual_events, since, new_time, unit);
+                               events, last_event = {}, 0;
+                               n_actual_events = 0;
+                               since = new_time;
+                               return type, stats.average, stats;
+                       end;
+
+                       return function (value)
+                               n_actual_events = n_actual_events + 1;
+                               if n_actual_events%duration_sample_interval > 0 then
+                                       last_event = (last_event%duration_max_samples) + 1;
+                                       events[last_event] = value;
+                               end
+                       end;
+               end;
+               sizes = function (name)
+                       return methods.distribution(name, "bytes", "size");
+               end;
+               times = function (name)
+                       local events, last_event = {}, 0;
+                       local n_actual_events = 0;
+                       local since = time();
+
+                       registry[name..":duration"] = function ()
+                               local new_time = time();
+                               local stats = get_distribution_stats(events, n_actual_events, since, new_time, "seconds");
+                               events, last_event = {}, 0;
+                               n_actual_events = 0;
+                               since = new_time;
+                               return "duration", stats.average, stats;
+                       end;
+
+                       return function ()
+                               n_actual_events = n_actual_events + 1;
+                               if n_actual_events%duration_sample_interval > 0 then
+                                       return nop_function;
+                               end
+
+                               local start_time = time();
+                               return function ()
+                                       local end_time = time();
+                                       local duration = end_time - start_time;
+                                       last_event = (last_event%duration_max_samples) + 1;
+                                       events[last_event] = duration;
+                               end
+                       end;
+               end;
+
+               get_stats = function ()
+                       return registry;
+               end;
+       };
+       return methods;
+end
+
+return {
+       new = new_registry;
+       get_histogram = function (duration, n_buckets)
+               n_buckets = n_buckets or 100;
+               local events, n_events = duration.samples, duration.sample_count;
+               if not (events and n_events) then
+                       return nil, "not a valid distribution stat";
+               end
+               local histogram = {};
+
+               for i = 1, 100, 100/n_buckets do
+                       histogram[i] = percentile(events, n_events, i);
+               end
+               return histogram;
+       end;
+
+       get_percentile = function (duration, pc)
+               local events, n_events = duration.samples, duration.sample_count;
+               if not (events and n_events) then
+                       return nil, "not a valid distribution stat";
+               end
+               return percentile(events, n_events, pc);
+       end;
+}