|
@@ -250,7 +250,55 @@ function Observable:combineLatest(...)
|
|
end)
|
|
end)
|
|
end
|
|
end
|
|
|
|
|
|
|
|
+----
|
|
|
|
+-- Scheduler
|
|
|
|
+-- Schedulers manage groups of Observables.
|
|
|
|
+local Scheduler = {}
|
|
|
|
+
|
|
|
|
+----
|
|
|
|
+-- Cooperative Scheduler
|
|
|
|
+-- Manages Observables using coroutines and a virtual clock that must be updated manually.
|
|
|
|
+local Cooperative = {}
|
|
|
|
+Cooperative.__index = Cooperative
|
|
|
|
+
|
|
|
|
+--
|
|
|
|
+Cooperative.create = function(currentTime)
|
|
|
|
+ local self = {
|
|
|
|
+ tasks = {},
|
|
|
|
+ currentTime = currentTime or 0
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return setmetatable(self, Cooperative)
|
|
|
|
+end
|
|
|
|
+
|
|
|
|
+--
|
|
|
|
+function Cooperative:schedule(action, delay)
|
|
|
|
+ table.insert(self.tasks, {
|
|
|
|
+ thread = coroutine.create(action),
|
|
|
|
+ due = self.currentTime + (delay or 0)
|
|
|
|
+ })
|
|
|
|
+end
|
|
|
|
+
|
|
|
|
+--
|
|
|
|
+function Cooperative:update(dt)
|
|
|
|
+ self.currentTime = self.currentTime + dt
|
|
|
|
+ for i = #self.tasks, 1, -1 do
|
|
|
|
+ local task = self.tasks[i]
|
|
|
|
+ if self.currentTime >= task.due then
|
|
|
|
+ local _, delay = coroutine.resume(task.thread)
|
|
|
|
+ task.due = math.max(task.due + (delay or 0), self.currentTime)
|
|
|
|
+ if coroutine.status(task.thread) == 'dead' then
|
|
|
|
+ table.remove(self.tasks, i)
|
|
|
|
+ end
|
|
|
|
+ end
|
|
|
|
+ end
|
|
|
|
+end
|
|
|
|
+
|
|
|
|
+Scheduler.Cooperative = Cooperative
|
|
|
|
+
|
|
return {
|
|
return {
|
|
Observer = Observer,
|
|
Observer = Observer,
|
|
- Observable = Observable
|
|
|
|
|
|
+ Observable = Observable,
|
|
|
|
+ Scheduler = Scheduler,
|
|
|
|
+ scheduler = Scheduler.Cooperative.create()
|
|
}
|
|
}
|