X-Git-Url: https://git.enpas.org/?a=blobdiff_plain;f=util%2Fasync.lua;h=76da45fa962f3027dfd965011b6f55e9e876dd70;hb=fa497488efd21d69174779defe57c8a1279004c8;hp=968ec80495489b51b66a8e1e1ba6f8c201b7e14d;hpb=6ac3f1382a0b99e13f28bb68f58650bdca777d35;p=prosody.git diff --git a/util/async.lua b/util/async.lua index 968ec804..76da45fa 100644 --- a/util/async.lua +++ b/util/async.lua @@ -7,6 +7,8 @@ local function runner_continue(thread) end local ok, state, runner = coroutine.resume(thread); if not ok then + -- Running the coroutine failed, which means we have to find the runner manually, + -- in order to inform the error handler local level = 0; while debug.getinfo(thread, level, "") do level = level + 1; end ok, runner = debug.getlocal(thread, level-1, 1); @@ -25,7 +27,7 @@ end local function waiter(num) local thread = coroutine.running(); if not thread then - error("Not running in an async context, see http://prosody.im/doc/developers/async"); + error("Not running in an async context, see https://prosody.im/doc/developers/async"); end num = num or 1; local waiting; @@ -48,7 +50,7 @@ local function guarder() return function (id, func) local thread = coroutine.running(); if not thread then - error("Not running in an async context, see http://prosody.im/doc/developers/async"); + error("Not running in an async context, see https://prosody.im/doc/developers/async"); end local guard = guards[id]; if not guard then @@ -99,37 +101,48 @@ local function runner(func, watchers, data) , runner_mt); end +-- Add a task item for the runner to process function runner_mt:run(input) if input ~= nil then table.insert(self.queue, input); end if self.state ~= "ready" then + -- The runner is busy. Indicate that the task item has been + -- queued, and return information about the current runner state return true, self.state, #self.queue; end local q, thread = self.queue, self.thread; if not thread or coroutine.status(thread) == "dead" then + -- Create a new coroutine for this runner thread = runner_create_thread(self.func, self); self.thread = thread; end + -- Process task item(s) while the queue is not empty, and we're not blocked local n, state, err = #q, self.state, nil; self.state = "running"; while n > 0 and state == "ready" do local consumed; + -- Loop through queue items, and attempt to run them for i = 1,n do local input = q[i]; local ok, new_state = coroutine.resume(thread, input); if not ok then + -- There was an error running the coroutine, save the error, mark runner as ready to begin again consumed, state, err = i, "ready", debug.traceback(thread, new_state); self.thread = nil; break; elseif new_state == "wait" then + -- Runner is blocked on waiting for a task item to complete consumed, state = i, "waiting"; break; end end + -- Loop ended - either queue empty because all tasks passed without blocking (consumed == nil) + -- or runner is blocked/errored, and consumed will contain the number of tasks processed so far if not consumed then consumed = n; end + -- Remove consumed items from the queue array if q[n+1] ~= nil then n = #q; end @@ -138,6 +151,7 @@ function runner_mt:run(input) end n = #q; end + -- Runner processed all items it can, so save current runner state self.state = state; if err or state ~= self.notified_state then if err then @@ -151,6 +165,7 @@ function runner_mt:run(input) return true, state, n; end +-- Add a task item to the queue without invoking the runner, even if it is idle function runner_mt:enqueue(input) table.insert(self.queue, input); end