1 local log = require "util.logger".init("util.async");
-- Resumes the coroutine `thread` and reacts to what the resume reports.
-- NOTE(review): this listing skips several original lines (see the gaps in the
-- embedded line numbers), so some branch headers and `end`s are not visible here.
3 local function runner_continue(thread)
4 -- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure)
-- Guard on the coroutine status; only a "suspended" coroutine can be resumed.
5 if coroutine.status(thread) ~= "suspended" then -- This should suffice
-- coroutine.resume returns a success flag followed by the yielded values (or the
-- error value on failure); they are captured here as `state` and `runner`.
8 local ok, state, runner = coroutine.resume(thread);
-- Count the thread's stack levels until debug.getinfo returns nil, so that
-- level-1 refers to the outermost frame of the coroutine.
11 while debug.getinfo(thread, level, "") do level = level + 1; end
-- debug.getlocal returns (name, value): `ok` is reused to receive the local's name
-- and `runner` receives its value. Presumably local #1 of the outermost frame is
-- the runner object handed to the coroutine body -- TODO confirm.
12 ok, runner = debug.getlocal(thread, level-1, 1);
-- Look up the optional "error" watcher and, if present, call it with the runner
-- and a traceback of the thread that uses `state` as the message.
13 local error_handler = runner.watchers.error;
14 if error_handler then error_handler(runner, debug.traceback(thread, state)); end
-- Branch taken when the resumed coroutine reported the "ready" state.
15 elseif state == "ready" then
16 -- If state is 'ready', it is our responsibility to update runner.state from 'waiting'.
17 -- We also have to :run(), because the queue might have further items that will not be
18 -- processed otherwise. FIXME: It's probably best to do this in a nexttick (0 timer).
19 runner.state = "ready";
-- waiter(num): helper for code running inside a runner coroutine that needs to
-- pause until outstanding work has finished; `num` is compared against 0 below.
-- NOTE(review): the closures this function builds, and the declaration of
-- `waiting`, are on lines not shown in this listing.
25 local function waiter(num)
-- Remember the coroutine we are called from so it can be continued later.
26 local thread = coroutine.running();
-- Raised when the call is not made from a usable coroutine; the guarding
-- condition is on a line not shown here -- TODO confirm.
28 error("Not running in an async context, see http://prosody.im/doc/developers/async");
33 if num == 0 then return; end -- already done
-- Yield the value "wait" to whoever resumed this coroutine (runner_mt:run checks
-- for new_state == "wait" and records the state "waiting").
35 coroutine.yield("wait");
-- When the count is 0 and `waiting` is truthy, continue the remembered coroutine.
38 if num == 0 and waiting then
39 runner_continue(thread);
-- Raised on a surplus done() call; the exact condition is on a line not shown here.
41 error("done() called too many times");
-- Make runner_mt its own __index, so any table using runner_mt as its metatable
-- resolves missing fields (such as the :run and :enqueue methods) on runner_mt.
47 runner_mt.__index = runner_mt;
-- Creates and primes the coroutine that executes `func` for the runner `self`.
-- NOTE(review): lines 51 and 53-54 of the original (and the return statement) are
-- not shown; runner_mt:run assigns this function's result to `thread`, so it
-- presumably returns the created coroutine -- TODO confirm.
49 local function runner_create_thread(func, self)
50 local thread = coroutine.create(function (self)
-- Yield "ready" together with the runner, then call func with whatever values
-- the next coroutine.resume passes in.
52 func(coroutine.yield("ready", self));
55 assert(coroutine.resume(thread, self)); -- Start it up, it will return instantly to wait for the first input
-- Shared empty table used as the `watchers` field when a caller supplies none.
59 local empty_watchers = {};
-- Constructs a new runner object.
--   func     : stored as .func; runner_mt:run passes it to runner_create_thread.
--   watchers : optional table of callbacks (run looks them up by state name, and
--              runner_continue reads .error); defaults to empty_watchers.
--   data     : stored unchanged as .data.
-- The object starts with state and notified_state set to "ready", an empty queue
-- and no thread (false).
-- NOTE(review): the metatable argument of setmetatable is on a line not shown
-- here; presumably runner_mt -- TODO confirm.
60 local function runner(func, watchers, data)
61 return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready",
62 queue = {}, watchers = watchers or empty_watchers, data = data }
-- Queues `input` and, if the runner is "ready", feeds queued items to the
-- runner's coroutine. Returns true, the resulting state, and a queue count.
-- NOTE(review): many original lines are not shown in this listing (see the gaps
-- in the embedded line numbers), including loop/branch headers and `end`s.
66 function runner_mt:run(input)
-- Append the input to the queue (original line 67, not shown, may guard this).
68 table.insert(self.queue, input);
-- Not "ready": do not process now; just report the current state and queue length.
70 if self.state ~= "ready" then
71 return true, self.state, #self.queue;
74 local q, thread = self.queue, self.thread;
-- No coroutine yet, or the previous one has finished/died: create a fresh one.
75 if not thread or coroutine.status(thread) == "dead" then
76 thread = runner_create_thread(self.func, self);
-- n: number of queued items; state: working copy of the runner state; err: traceback, if any.
80 local n, state, err = #q, self.state, nil;
81 self.state = "running";
-- Keep going while items remain and the working state is still "ready".
82 while n > 0 and state == "ready" do
-- Hand one input to the coroutine; new_state is its first yielded value, or
-- the error value if the resume failed.
86 local ok, new_state = coroutine.resume(thread, input);
-- Record how many items were consumed and capture a traceback of the thread with
-- new_state as the message (presumably the failed-resume branch -- TODO confirm).
88 consumed, state, err = i, "ready", debug.traceback(thread, new_state);
-- The coroutine yielded "wait" (as waiter does): stop and mark the state "waiting".
91 elseif new_state == "wait" then
92 consumed, state = i, "waiting";
-- Nothing set `consumed` earlier: treat all n items as consumed.
96 if not consumed then consumed = n; end
-- Shift a later queue entry down by `consumed` positions.
101 q[i] = q[consumed+i];
-- Enter the notification path on an error, or when the state differs from the
-- last state that was notified.
106 if err or state ~= self.notified_state then
110 self.notified_state = state;
-- Call the watcher registered under the state name, if one exists.
112 local handler = self.watchers[state];
113 if handler then handler(self, err); end
115 return true, state, n;
-- Appends `input` to the runner's queue.
-- NOTE(review): the remainder of this method is not shown in this listing.
118 function runner_mt:enqueue(input)
119 table.insert(self.queue, input);
-- Module exports: the waiter helper and the runner constructor.
122 return { waiter = waiter, runner = runner };