util.session: What is the identity of a session?
[prosody.git] / util / async.lua
index c81f8639b490b30905af66ac3428e70fe6eedbe0..968ec80495489b51b66a8e1e1ba6f8c201b7e14d 100644 (file)
@@ -37,10 +37,48 @@ local function waiter(num)
                num = num - 1;
                if num == 0 and waiting then
                        runner_continue(thread);
+               elseif num < 0 then
+                       error("done() called too many times");
                end
        end;
 end
 
+local function guarder()
+       local guards = {};
+       return function (id, func)
+               local thread = coroutine.running();
+               if not thread then
+                       error("Not running in an async context, see http://prosody.im/doc/developers/async");
+               end
+               local guard = guards[id];
+               if not guard then
+                       guard = {};
+                       guards[id] = guard;
+                       log("debug", "New guard!");
+               else
+                       table.insert(guard, thread);
+                       log("debug", "Guarded. %d threads waiting.", #guard)
+                       coroutine.yield("wait");
+               end
+               local function exit()
+                       local next_waiting = table.remove(guard, 1);
+                       if next_waiting then
+                               log("debug", "guard: Executing next waiting thread (%d left)", #guard)
+                               runner_continue(next_waiting);
+                       else
+                               log("debug", "Guard off duty.")
+                               guards[id] = nil;
+                       end
+               end
+               if func then
+                       func();
+                       exit();
+                       return;
+               end
+               return exit;
+       end;
+end
+
 local runner_mt = {};
 runner_mt.__index = runner_mt;
 
@@ -101,8 +139,12 @@ function runner_mt:run(input)
                n = #q;
        end
        self.state = state;
-       if state ~= self.notified_state then
-               self.notified_state = state;
+       if err or state ~= self.notified_state then
+               if err then
+                       state = "error"
+               else
+                       self.notified_state = state;
+               end
                local handler = self.watchers[state];
                if handler then handler(self, err); end
        end
@@ -113,4 +155,4 @@ function runner_mt:enqueue(input)
        table.insert(self.queue, input);
 end
 
-return { waiter = waiter, runner = runner };
+return { waiter = waiter, guarder = guarder, runner = runner };