X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=util%2Fasync.lua;h=968ec80495489b51b66a8e1e1ba6f8c201b7e14d;hb=a8958cbe5ebad3abc12efa7a76c8aedac8f21389;hp=c81f8639b490b30905af66ac3428e70fe6eedbe0;hpb=cb6f85d7c20eb0883135c6257c6ad87aaabab05b;p=prosody.git diff --git a/util/async.lua b/util/async.lua index c81f8639..968ec804 100644 --- a/util/async.lua +++ b/util/async.lua @@ -37,10 +37,48 @@ local function waiter(num) num = num - 1; if num == 0 and waiting then runner_continue(thread); + elseif num < 0 then + error("done() called too many times"); end end; end +local function guarder() + local guards = {}; + return function (id, func) + local thread = coroutine.running(); + if not thread then + error("Not running in an async context, see http://prosody.im/doc/developers/async"); + end + local guard = guards[id]; + if not guard then + guard = {}; + guards[id] = guard; + log("debug", "New guard!"); + else + table.insert(guard, thread); + log("debug", "Guarded. %d threads waiting.", #guard) + coroutine.yield("wait"); + end + local function exit() + local next_waiting = table.remove(guard, 1); + if next_waiting then + log("debug", "guard: Executing next waiting thread (%d left)", #guard) + runner_continue(next_waiting); + else + log("debug", "Guard off duty.") + guards[id] = nil; + end + end + if func then + func(); + exit(); + return; + end + return exit; + end; +end + local runner_mt = {}; runner_mt.__index = runner_mt; @@ -101,8 +139,12 @@ function runner_mt:run(input) n = #q; end self.state = state; - if state ~= self.notified_state then - self.notified_state = state; + if err or state ~= self.notified_state then + if err then + state = "error" + else + self.notified_state = state; + end local handler = self.watchers[state]; if handler then handler(self, err); end end @@ -113,4 +155,4 @@ function runner_mt:enqueue(input) table.insert(self.queue, input); end -return { waiter = waiter, runner = runner }; +return { waiter = waiter, guarder = guarder, runner = runner };