Ver Fonte

Use scheduler in Observable.fromCoroutine; Cooperative.isEmpty;

bjorn há 10 anos atrás
pai
commit
322171c216
4 ficheiros alterados com 79 adições e 47 exclusões
  1. 12 5
      examples/coroutine.lua
  2. 0 31
      examples/delay.lua
  3. 31 0
      examples/scheduler.lua
  4. 36 11
      rx.lua

+ 12 - 5
examples/coroutine.lua

@@ -1,9 +1,16 @@
 local Rx = require 'rx'
 
--- Observable created from a coroutine that produces two values.
-local cr = coroutine.create(function()
-  coroutine.yield('hello')
-  return 'world'
+-- Observable created from a coroutine.
+local cheer = coroutine.create(function()
+  for i = 2, 8, 2 do
+    coroutine.yield(i)
+  end
+
+  return 'who do we appreciate'
 end)
 
-Rx.Observable.fromCoroutine(cr):dump('coroutine')
+Rx.Observable.fromCoroutine(cheer):dump('cheer')
+
+repeat
+  Rx.scheduler:update()
+until Rx.scheduler:isEmpty()

+ 0 - 31
examples/delay.lua

@@ -1,31 +0,0 @@
-local rx = require 'rx'
-
-local timerResolution = .25
-local function log(message)
-  print('[' .. string.format('%.2f', rx.scheduler.currentTime) .. '] ' .. message)
-end
-
-rx.scheduler:schedule(function()
-  log('this is like a setTimeout')
-end, 2)
-
-rx.scheduler:schedule(function()
-  local i = 1
-  while true do
-    log('this prints i twice per second: ' .. i)
-    i = i + 1
-    coroutine.yield(.5)
-  end
-end)
-
-rx.scheduler:schedule(function()
-  for i = 1, 3 do
-    log('this will print for 3 updates after 1 second')
-    coroutine.yield()
-  end
-end, 1)
-
-repeat
-  rx.scheduler:update(timerResolution)
-  os.execute('sleep ' .. timerResolution)
-until rx.scheduler.currentTime >= 3

+ 31 - 0
examples/scheduler.lua

@@ -0,0 +1,31 @@
+local Rx = require 'rx'
+local timerResolution = .25
+local function log(message)
+  print('[' .. string.format('%.2f', Rx.scheduler.currentTime) .. '] ' .. message)
+end
+
+-- Demonstrate Rx.Scheduler.Cooperative by running some simultaneous cooperative threads.
+Rx.scheduler:schedule(function()
+  log('this is like a setTimeout')
+end, 2)
+
+Rx.scheduler:schedule(function()
+  local i = 1
+  while true do
+    log('this prints i twice per second: ' .. i)
+    i = i + 1
+    coroutine.yield(.5)
+  end
+end)
+
+Rx.scheduler:schedule(function()
+  for i = 1, 3 do
+    log('this will print for 3 updates after 1 second')
+    coroutine.yield()
+  end
+end, 1)
+
+-- Simulate 3 virtual seconds.
+repeat
+  Rx.scheduler:update(timerResolution)
+until Rx.scheduler.currentTime >= 3

+ 36 - 11
rx.lua

@@ -1,3 +1,5 @@
+local rx
+
 local function noop() end
 
 ----
@@ -79,13 +81,23 @@ end
 -- @returns {Observable}
 function Observable.fromCoroutine(cr)
   return Observable.create(function(observer)
-    while true do
-      local success, value = coroutine.resume(cr)
-      observer:onNext(value)
-      if coroutine.status(cr) == 'dead' then break end
-    end
+    return rx.scheduler:schedule(function()
+      while not observer.stopped do
+        local success, value = coroutine.resume(cr)
 
-    observer:onComplete()
+        if success then
+          observer:onNext(value)
+        else
+          return observer:onError(value)
+        end
+
+        if coroutine.status(cr) == 'dead' then
+          return observer:onComplete()
+        end
+
+        coroutine.yield()
+      end
+    end)
   end)
 end
 
@@ -102,7 +114,7 @@ end
 function Observable:dump(name)
   name = name and (name .. ' ') or ''
 
-  local onNext = function(x) print(name .. 'onNext: ' .. x) end
+  local onNext = function(x) print(name .. 'onNext: ' .. (x or '')) end
   local onError = function(e) print(name .. 'onError: ' .. e) end
   local onComplete = function() print(name .. 'onComplete') end
 
@@ -281,12 +293,18 @@ end
 
 --
 function Cooperative:update(dt)
-  self.currentTime = self.currentTime + dt
+  self.currentTime = self.currentTime + (dt or 0)
   for i = #self.tasks, 1, -1 do
     local task = self.tasks[i]
     if self.currentTime >= task.due then
-      local _, delay = coroutine.resume(task.thread)
-      task.due = math.max(task.due + (delay or 0), self.currentTime)
+      local success, delay = coroutine.resume(task.thread)
+
+      if success then
+        task.due = math.max(task.due + (delay or 0), self.currentTime)
+      else
+        error(delay)
+      end
+
       if coroutine.status(task.thread) == 'dead' then
         table.remove(self.tasks, i)
       end
@@ -294,11 +312,18 @@ function Cooperative:update(dt)
   end
 end
 
+--
+function Cooperative:isEmpty()
+  return not next(self.tasks)
+end
+
 Scheduler.Cooperative = Cooperative
 
-return {
+rx = {
   Observer = Observer,
   Observable = Observable,
   Scheduler = Scheduler,
   scheduler = Scheduler.Cooperative.create()
 }
+
+return rx