X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=util%2Fasync.lua;h=afbaba5c62df52e434b40040118db5fe4de1af63;hb=b894a262724d2912d61f2499aba6b134d572ae47;hp=9036a87631dd8f77b24042dbe896aea152b66df3;hpb=70d5cbff4fc9dcc04f4f440709ed7d2339d358d7;p=prosody.git diff --git a/util/async.lua b/util/async.lua index 9036a876..968ec804 100644 --- a/util/async.lua +++ b/util/async.lua @@ -22,21 +22,60 @@ local function runner_continue(thread) return true; end -function waiter(num) +local function waiter(num) local thread = coroutine.running(); if not thread then error("Not running in an async context, see http://prosody.im/doc/developers/async"); end num = num or 1; + local waiting; return function () + if num == 0 then return; end -- already done + waiting = true; coroutine.yield("wait"); end, function () num = num - 1; - if num == 0 then - if not runner_continue(thread) then - error("done() called without wait()!"); + if num == 0 and waiting then + runner_continue(thread); + elseif num < 0 then + error("done() called too many times"); + end + end; +end + +local function guarder() + local guards = {}; + return function (id, func) + local thread = coroutine.running(); + if not thread then + error("Not running in an async context, see http://prosody.im/doc/developers/async"); + end + local guard = guards[id]; + if not guard then + guard = {}; + guards[id] = guard; + log("debug", "New guard!"); + else + table.insert(guard, thread); + log("debug", "Guarded. %d threads waiting.", #guard) + coroutine.yield("wait"); + end + local function exit() + local next_waiting = table.remove(guard, 1); + if next_waiting then + log("debug", "guard: Executing next waiting thread (%d left)", #guard) + runner_continue(next_waiting); + else + log("debug", "Guard off duty.") + guards[id] = nil; end end + if func then + func(); + exit(); + return; + end + return exit; end; end @@ -54,7 +93,7 @@ local function runner_create_thread(func, self) end local empty_watchers = {}; -function runner(func, watchers, data) +local function runner(func, watchers, data) return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready", queue = {}, watchers = watchers or empty_watchers, data = data } , runner_mt); @@ -85,7 +124,7 @@ function runner_mt:run(input) consumed, state, err = i, "ready", debug.traceback(thread, new_state); self.thread = nil; break; - elseif state == "wait" then + elseif new_state == "wait" then consumed, state = i, "waiting"; break; end @@ -100,8 +139,12 @@ function runner_mt:run(input) n = #q; end self.state = state; - if state ~= self.notified_state then - self.notified_state = state; + if err or state ~= self.notified_state then + if err then + state = "error" + else + self.notified_state = state; + end local handler = self.watchers[state]; if handler then handler(self, err); end end @@ -112,4 +155,4 @@ function runner_mt:enqueue(input) table.insert(self.queue, input); end -return { waiter = waiter, runner = runner }; +return { waiter = waiter, guarder = guarder, runner = runner };