util.async: Add some more comments for clarity
[prosody.git] / util / async.lua
1 local log = require "util.logger".init("util.async");
2
-- Resume a runner coroutine that previously yielded "wait".
--
-- thread: a coroutine created by runner_create_thread().
--
-- Returns false (and does nothing) if the thread is not suspended, true otherwise.
-- If the coroutine gets back to its idle point (yields "ready"), the owning runner
-- is marked 'ready' and asked to process the rest of its queue. If the coroutine
-- raises an error, the runner is recovered from the coroutine's stack, made usable
-- again and its 'error' watcher (if any) is called with a traceback.
local function runner_continue(thread)
        -- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure)
        if coroutine.status(thread) ~= "suspended" then -- This should suffice
                return false;
        end
        local ok, state, runner = coroutine.resume(thread);
        if not ok then
                -- On failure the second result of resume() is the error value, not a state
                local err = state;
                -- Running the coroutine failed, which means we have to find the runner manually,
                -- in order to inform the error handler. The outermost frame of the coroutine is
                -- the function created in runner_create_thread(), whose first local is the runner.
                local level = 0;
                while debug.getinfo(thread, level, "") do level = level + 1; end
                local _, found = debug.getlocal(thread, level-1, 1);
                runner = found;
                if type(runner) ~= "table" then
                        -- Without the runner there is nobody to notify; report instead of
                        -- failing with an unrelated "attempt to index" error
                        log("error", "Unable to find the runner of a failed thread: %s",
                                tostring(debug.traceback(thread, err)));
                        return true;
                end
                -- The coroutine is dead now. Leave the runner in the same condition that
                -- runner:run() leaves it in after an error, otherwise it would remain
                -- 'waiting' forever and every later :run() would only queue items.
                runner.state = "ready";
                if runner.thread == thread then
                        runner.thread = nil;
                end
                local error_handler = runner.watchers.error;
                if error_handler then error_handler(runner, debug.traceback(thread, err)); end
        elseif state == "ready" then
                -- If state is 'ready', it is our responsibility to update runner.state from 'waiting'.
                -- We also have to :run(), because the queue might have further items that will not be
                -- processed otherwise. FIXME: It's probably best to do this in a nexttick (0 timer).
                runner.state = "ready";
                runner:run();
        end
        return true;
end
26
-- Create a wait()/done() pair for the currently running coroutine.
--
-- num: how many times done() has to be called before wait() may return
--      (defaults to 1).
--
-- Returns two functions:
--   wait(): suspends the calling coroutine by yielding "wait", unless done() has
--           already been called num times, in which case it returns immediately.
--   done(): counts one completion. When the count reaches zero and the coroutine
--           is suspended in wait(), the coroutine is continued.
--
-- Raises an error when called outside of a coroutine, and done() raises an error
-- when it is called more than num times.
local function waiter(num)
        -- Lua 5.1 returns nil on the main thread, Lua 5.2+ returns the main
        -- coroutine together with `true`; both mean "not in an async context"
        local thread, is_main = coroutine.running();
        if not thread or is_main then
                error("Not running in an async context, see https://prosody.im/doc/developers/async");
        end
        num = num or 1;
        local waiting;
        return function ()
                if num == 0 then return; end -- already done
                waiting = true;
                coroutine.yield("wait");
        end, function ()
                num = num - 1;
                if num == 0 and waiting then
                        runner_continue(thread);
                elseif num < 0 then
                        error("done() called too many times");
                end
        end;
end
47
-- Create a guard function that serialises coroutines per id.
--
-- The returned function takes (id, func). The first coroutine to ask for an id
-- passes straight through; any further coroutine asking for the same id is queued
-- and suspended (yields "wait") until the holders before it have released it.
--
-- With func: func() is called while the guard is held and the guard is released
-- afterwards; nothing is returned.
-- Without func: a function is returned which the caller must call to release.
--
-- Raises an error when called outside of a coroutine.
-- NOTE: if func() raises an error the guard is not released.
local function guarder()
        local guards = {};
        return function (id, func)
                -- Lua 5.1 returns nil on the main thread, Lua 5.2+ returns the main
                -- coroutine together with `true`; both mean "not in an async context"
                local thread, is_main = coroutine.running();
                if not thread or is_main then
                        error("Not running in an async context, see https://prosody.im/doc/developers/async");
                end
                local guard = guards[id];
                if not guard then
                        -- Nobody holds this id: take it without waiting
                        guard = {};
                        guards[id] = guard;
                        log("debug", "New guard!");
                else
                        -- Somebody holds this id: queue up and sleep until woken by exit()
                        table.insert(guard, thread);
                        log("debug", "Guarded. %d threads waiting.", #guard)
                        coroutine.yield("wait");
                end
                -- Hands the guard over to the longest waiting thread, or drops the
                -- guard entirely when nobody is waiting
                local function exit()
                        local next_waiting = table.remove(guard, 1);
                        if next_waiting then
                                log("debug", "guard: Executing next waiting thread (%d left)", #guard)
                                runner_continue(next_waiting);
                        else
                                log("debug", "Guard off duty.")
                                guards[id] = nil;
                        end
                end
                if func then
                        func();
                        exit();
                        return;
                end
                return exit;
        end;
end
83
-- Metatable of runner objects. It doubles as its own __index, so everything
-- defined on it is reachable as a method of every runner.
local runner_mt = {};
runner_mt["__index"] = runner_mt;
86
-- Build and prime the coroutine that executes a runner's work items.
--
-- func: called with each item the coroutine is resumed with.
-- self: the runner that owns the coroutine.
--
-- The coroutine idles by yielding "ready" together with its runner. It is
-- resumed once here so that it is already parked at that point on return.
local function runner_create_thread(func, self)
        -- The runner must stay the first local of this function
        local function main_loop(owner)
                repeat
                        func(coroutine.yield("ready", owner));
                until false
        end
        local thread = coroutine.create(main_loop);
        -- First resume only delivers the runner; the thread yields straight away
        assert(coroutine.resume(thread, self));
        return thread;
end
96
-- Watcher table shared by all runners that were created without their own
local empty_watchers = {};

-- Create a new runner.
--
-- func:     called for every item passed to the runner
-- watchers: optional table of callbacks, looked up by state name
-- data:     stored on the runner as .data
--
-- The runner starts in the 'ready' state with an empty queue and no coroutine.
local function runner(func, watchers, data)
        local new_runner = {
                func = func;
                thread = false;
                state = "ready";
                notified_state = "ready";
                queue = {};
                watchers = watchers or empty_watchers;
                data = data;
        };
        return setmetatable(new_runner, runner_mt);
end
103
-- Add a task item for the runner to process
--
-- input: the item to queue. With nil nothing is queued and the call only
--        (re)starts processing of what is already in the queue.
--
-- Returns true, the runner state and the number of items left in the queue.
-- The reported state is "error" when an item raised an error during this call;
-- in that case the items behind the failing one stay queued for the next call.
--
-- Watchers: when the state the runner ends up in differs from the last one that
-- was announced, self.watchers[state] is called with the runner. On error,
-- self.watchers.error is called with the runner and a traceback.
function runner_mt:run(input)
        if input ~= nil then
                table.insert(self.queue, input);
        end
        if self.state ~= "ready" then
                -- The runner is busy. Indicate that the task item has been
                -- queued, and return information about the current runner state
                return true, self.state, #self.queue;
        end

        local q, thread = self.queue, self.thread;
        if not thread or coroutine.status(thread) == "dead" then
                -- Create a new coroutine for this runner
                thread = runner_create_thread(self.func, self);
                self.thread = thread;
        end

        -- Process task item(s) while the queue is not empty, and we're not blocked.
        -- An error ends processing as well: the coroutine is dead at that point, and
        -- resuming it again would only fail every remaining item with "cannot resume
        -- dead coroutine", dropping those items and hiding the original error.
        local n, state, err = #q, self.state, nil;
        self.state = "running";
        while n > 0 and state == "ready" and not err do
                local consumed;
                -- Loop through queue items, and attempt to run them
                for i = 1,n do
                        local queued_input = q[i];
                        local ok, new_state = coroutine.resume(thread, queued_input);
                        if not ok then
                                -- There was an error running the coroutine, save the error, mark runner as ready to begin again
                                consumed, state, err = i, "ready", debug.traceback(thread, new_state);
                                self.thread = nil;
                                break;
                        elseif new_state == "wait" then
                                -- Runner is blocked on waiting for a task item to complete
                                consumed, state = i, "waiting";
                                break;
                        end
                end
                -- Loop ended - either queue empty because all tasks passed without blocking (consumed == nil)
                -- or runner is blocked/errored, and consumed will contain the number of tasks processed so far
                if not consumed then consumed = n; end
                -- Remove consumed items from the queue array
                if q[n+1] ~= nil then
                        -- Items were added while we were processing, move those down too
                        n = #q;
                end
                for i = 1, n do
                        q[i] = q[consumed+i];
                end
                n = #q;
        end
        -- Runner processed all items it can, so save current runner state
        self.state = state;
        if err or state ~= self.notified_state then
                if err then
                        -- Errors are always announced and do not count as a state change
                        state = "error"
                else
                        self.notified_state = state;
                end
                local handler = self.watchers[state];
                if handler then handler(self, err); end
        end
        return true, state, n;
end
167
-- Append a task item to the runner's queue and nothing else: the runner is not
-- started, not even when it is idle
function runner_mt:enqueue(input)
        local pending = self.queue;
        pending[#pending + 1] = input;
end
172
-- Public interface of the module
return {
        waiter = waiter;
        guarder = guarder;
        runner = runner;
};