1 local log = require "util.logger".init("util.async");
3 local function runner_continue(thread)
4 -- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure)
5 if coroutine.status(thread) ~= "suspended" then -- This should suffice
8 local ok, state, runner = coroutine.resume(thread);
10 -- Running the coroutine failed, which means we have to find the runner manually,
11 -- in order to inform the error handler
13 while debug.getinfo(thread, level, "") do level = level + 1; end
14 ok, runner = debug.getlocal(thread, level-1, 1);
15 local error_handler = runner.watchers.error;
16 if error_handler then error_handler(runner, debug.traceback(thread, state)); end
17 elseif state == "ready" then
18 -- If state is 'ready', it is our responsibility to update runner.state from 'waiting'.
19 -- We also have to :run(), because the queue might have further items that will not be
20 -- processed otherwise. FIXME: It's probably best to do this in a nexttick (0 timer).
21 runner.state = "ready";
27 local function waiter(num)
28 local thread = coroutine.running();
30 error("Not running in an async context, see https://prosody.im/doc/developers/async");
35 if num == 0 then return; end -- already done
37 coroutine.yield("wait");
40 if num == 0 and waiting then
41 runner_continue(thread);
43 error("done() called too many times");
48 local function guarder()
50 return function (id, func)
51 local thread = coroutine.running();
53 error("Not running in an async context, see https://prosody.im/doc/developers/async");
55 local guard = guards[id];
59 log("debug", "New guard!");
61 table.insert(guard, thread);
62 log("debug", "Guarded. %d threads waiting.", #guard)
63 coroutine.yield("wait");
66 local next_waiting = table.remove(guard, 1);
68 log("debug", "guard: Executing next waiting thread (%d left)", #guard)
69 runner_continue(next_waiting);
71 log("debug", "Guard off duty.")
85 runner_mt.__index = runner_mt;
87 local function runner_create_thread(func, self)
88 local thread = coroutine.create(function (self)
90 func(coroutine.yield("ready", self));
93 assert(coroutine.resume(thread, self)); -- Start it up, it will return instantly to wait for the first input
97 local empty_watchers = {};
98 local function runner(func, watchers, data)
99 return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready",
100 queue = {}, watchers = watchers or empty_watchers, data = data }
104 -- Add a task item for the runner to process
105 function runner_mt:run(input)
107 table.insert(self.queue, input);
109 if self.state ~= "ready" then
110 -- The runner is busy. Indicate that the task item has been
111 -- queued, and return information about the current runner state
112 return true, self.state, #self.queue;
115 local q, thread = self.queue, self.thread;
116 if not thread or coroutine.status(thread) == "dead" then
117 -- Create a new coroutine for this runner
118 thread = runner_create_thread(self.func, self);
119 self.thread = thread;
122 -- Process task item(s) while the queue is not empty, and we're not blocked
123 local n, state, err = #q, self.state, nil;
124 self.state = "running";
125 while n > 0 and state == "ready" do
127 -- Loop through queue items, and attempt to run them
130 local ok, new_state = coroutine.resume(thread, input);
132 -- There was an error running the coroutine, save the error, mark runner as ready to begin again
133 consumed, state, err = i, "ready", debug.traceback(thread, new_state);
136 elseif new_state == "wait" then
137 -- Runner is blocked on waiting for a task item to complete
138 consumed, state = i, "waiting";
142 -- Loop ended - either queue empty because all tasks passed without blocking (consumed == nil)
143 -- or runner is blocked/errored, and consumed will contain the number of tasks processed so far
144 if not consumed then consumed = n; end
145 -- Remove consumed items from the queue array
146 if q[n+1] ~= nil then
150 q[i] = q[consumed+i];
154 -- Runner processed all items it can, so save current runner state
156 if err or state ~= self.notified_state then
160 self.notified_state = state;
162 local handler = self.watchers[state];
163 if handler then handler(self, err); end
165 return true, state, n;
168 -- Add a task item to the queue without invoking the runner, even if it is idle
169 function runner_mt:enqueue(input)
170 table.insert(self.queue, input);
173 return { waiter = waiter, guarder = guarder, runner = runner };