return driver, driver_name;
end
- function open(host, store, typ)
+local map_shim_mt = {
+ __index = {
+ get = function(self, username, key)
+ local ret, err = self.keyval_store:get(username);
+ if ret == nil then return nil, err end
+ return ret[key];
+ end;
+ set = function(self, username, key, data)
+ local current, err = self.keyval_store:get(username);
+ if current == nil then
+ if err then
+ return nil, err;
+ else
+ current = {};
+ end
+ end
+ current[key] = data;
+ return self.keyval_store:set(username, current);
+ end;
+ };
+}
+local function create_map_shim(host, store)
+ local keyval_store, err = open(host, store, "keyval");
+ if keyval_store == nil then return nil, err end
+ return setmetatable({
+ keyval_store = keyval_store;
+ }, map_shim_mt);
+end
+
+ local function open(host, store, typ)
local driver, driver_name = get_driver(host, store);
local ret, err = driver:open(store, typ);
if not ret then
--function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface
end
- function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl )
- local client, err = socket.tcp() -- creating new socket
- if not client then
- debug( "cannot create socket:", err )
- return nil, err
+ function addclient( addr, serverport, listener, pattern, sslctx, typ )
+ if sslctx and not has_luasec then
+ debug "need luasec, but not available"
+ return nil, "luasec not found"
end
- client:settimeout( 0 ) -- set nonblocking
- if localaddr then
- local res, err = client:bind( localaddr, localport, -1 )
- if not res then
- debug( "cannot bind client:", err )
- return nil, err
+ if getaddrinfo and not typ then
+ local addrinfo, err = getaddrinfo(addr)
+ if not addrinfo then return nil, err end
+ if addrinfo[1] and addrinfo[1].family == "inet6" then
+ typ = "tcp6"
end
end
- local sslctx
- if sslcfg then -- handle ssl/new context
- if not has_luasec then
- debug "need luasec, but not available"
- return nil, "luasec not found"
+ local create = socket[typ or "tcp"]
+ if type( create ) ~= "function" then
+ return nil, "invalid socket type"
- end
+ end
- sslctx, err = sslcfg
- if err then
- debug( "cannot create new ssl context:", err )
+ local client, err = create() -- creating new socket
+ if not client then
+ debug( "cannot create socket:", err )
- return nil, err
- end
+ return nil, err
+ end
- end
+ client:settimeout( 0 ) -- set nonblocking
local res, err = client:connect( addr, serverport ) -- connect
- if res or ( err == "timeout" ) then
- local ip, port = client:getsockname( )
- local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, startssl )
- interface:_start_connection( startssl )
+ if res or ( err == "timeout" or err == "Operation already in progress" ) then
+ if client.getsockname then
+ addr = client:getsockname( )
- end
++ end
+ local interface = wrapclient( client, addr, serverport, listener, pattern, sslctx )
debug( "new connection id:", interface.id )
return interface, err
else
-- COPYING file in the source package for more information.
--
+local indexedbheap = require "util.indexedbheap";
+local log = require "util.logger".init("timer");
local server = require "net.server";
-local math_min = math.min
-local math_huge = math.huge
local get_time = require "socket".gettime;
-local t_insert = table.insert;
-local pairs = pairs;
local type = type;
-
-local data = {};
-local new_data = {};
+local debug_traceback = debug.traceback;
+local tostring = tostring;
+local xpcall = xpcall;
- module "timer"
+ local _ENV = nil;
-local _add_task;
-if not server.event then
- function _add_task(delay, callback)
- local current_time = get_time();
- delay = delay + current_time;
- if delay >= current_time then
- t_insert(new_data, {delay, callback});
- else
- local r = callback(current_time);
- if r and type(r) == "number" then
- return _add_task(r, callback);
+local _add_task = server.add_task;
- --add_task = _add_task;
+
+local h = indexedbheap.create();
+local params = {};
+local next_time = nil;
+local _id, _callback, _now, _param;
+local function _call() return _callback(_now, _id, _param); end
+local function _traceback_handler(err) log("error", "Traceback[timer]: %s", debug_traceback(tostring(err), 2)); end
+local function _on_timer(now)
+ local peek;
+ while true do
+ peek = h:peek();
+ if peek == nil or peek > now then break; end
+ local _;
+ _, _callback, _id = h:pop();
+ _now = now;
+ _param = params[_id];
+ params[_id] = nil;
+ --item(now, id, _param); -- FIXME pcall
+ local success, err = xpcall(_call, _traceback_handler);
+ if success and type(err) == "number" then
+ h:insert(_callback, err + now, _id); -- re-add
+ params[_id] = _param;
+ end
end
- end
+ next_time = peek;
+ if peek ~= nil then
+ return peek - now;
end
-
- server._addtimer(function()
+end
- function add_task(delay, callback, param)
- local current_time = get_time();
++local function add_task(delay, callback, param)
+ local current_time = get_time();
- if #new_data > 0 then
- for _, d in pairs(new_data) do
- t_insert(data, d);
- end
- new_data = {};
- end
+ local event_time = current_time + delay;
- local next_time = math_huge;
- for i, d in pairs(data) do
- local t, callback = d[1], d[2];
- if t <= current_time then
- data[i] = nil;
- local r = callback(current_time);
- if type(r) == "number" then
- _add_task(r, callback);
- next_time = math_min(next_time, r);
+ local id = h:insert(callback, event_time);
+ params[id] = param;
+ if next_time == nil or event_time < next_time then
+ next_time = event_time;
+ _add_task(next_time - current_time, _on_timer);
- end
+ end
- else
- next_time = math_min(next_time, t - current_time);
+ return id;
- end
- function stop(id)
+ end
++local function stop(id)
+ params[id] = nil;
+ return h:remove(id);
- end
- function reschedule(id, delay)
+ end
- return next_time;
- end);
-else
- local event = server.event;
- local event_base = server.event_base;
- local EVENT_LEAVE = (event.core and event.core.LEAVE) or -1;
-
- function _add_task(delay, callback)
- local event_handle;
- event_handle = event_base:addevent(nil, 0, function ()
- local ret = callback(get_time());
- if ret then
- return 0, ret;
- elseif event_handle then
- return EVENT_LEAVE;
++local function reschedule(id, delay)
+ local current_time = get_time();
+ local event_time = current_time + delay;
+ h:reprioritize(id, delay);
+ if next_time == nil or event_time < next_time then
+ next_time = event_time;
+ _add_task(next_time - current_time, _on_timer);
- end
+ end
- end
- , delay);
- end
+ return id;
end
- return _M;
+ return {
- add_task = _add_task;
++ add_task = add_task;
++ stop = stop;
++ reschedule = reschedule;
+ };
++