Browse Source

Style updates;

bjorn 10 years ago
parent
commit
f8b472d0de
1 changed files with 80 additions and 63 deletions
  1. 80 63
      rx.lua

+ 80 - 63
rx.lua

@@ -1,6 +1,7 @@
 local rx
 
 local function noop() end
+local function identity(x) return x end
 
 ----
 -- Observer
@@ -130,16 +131,20 @@ end
 -- @returns {Observable}
 function Observable:first()
   return Observable.create(function(observer)
-    return self:subscribe(function(x)
+    local function onNext(x)
       observer:onNext(x)
       observer:onComplete()
-    end,
-    function(e)
+    end
+
+    local function onError(e)
       observer:onError(e)
-    end,
-    function()
+    end
+
+    local function onComplete()
       observer:onComplete()
-    end)
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
   end)
 end
 
@@ -148,34 +153,44 @@ end
 function Observable:last()
   return Observable.create(function(observer)
     local value
-    return self:subscribe(function(x)
+
+    local function onNext(x)
       value = x
-    end,
-    function(e)
+    end
+
+    local function onError(e)
       observer:onError(e)
-    end,
-    function()
+    end
+
+    local function onComplete()
       observer:onNext(value)
       observer:onComplete()
-    end)
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
   end)
 end
 
 -- Returns a new Observable that produces the values of the original transformed by a function.
 -- @arg {function} callback - The function to transform values from the original Observable.
 -- @returns {Observable}
-function Observable:map(fn)
-  fn = fn or function(x) return x end
+function Observable:map(callback)
   return Observable.create(function(observer)
-    return self:subscribe(function(x)
-      observer:onNext(fn(x))
-    end,
-    function(e)
+    callback = callback or identity
+
+    local function onNext(x)
+      return observer:onNext(callback(x))
+    end
+
+    local function onError(e)
       observer:onError(e)
-    end,
-    function()
+    end
+
+    local function onComplete()
       observer:onComplete()
-    end)
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
   end)
 end
 
@@ -188,17 +203,23 @@ end
 -- @returns {Observable}
 function Observable:reduce(accumulator, seed)
   return Observable.create(function(observer)
-    local currentValue = seed
-    return self:subscribe(function(x)
-      currentValue = accumulator(currentValue, x)
-    end,
-    function(e)
+    local result
+
+    local function onNext(x)
+      result = result or seed or x
+      result = accumulator(result, x)
+    end
+
+    local function onError(e)
       observer:onError(e)
-    end,
-    function()
-      observer:onNext(currentValue)
+    end
+
+    local function onComplete()
+      observer:onNext(result)
       observer:onComplete()
-    end)
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
   end)
 end
 
@@ -216,48 +237,42 @@ end
 --                              returns a single value.
 -- @returns {Observable}
 function Observable:combineLatest(...)
-  local values = {}
-  local done = {}
-  local targets = {...}
-  local fn = table.remove(targets)
-  table.insert(targets, 1, self)
+  local sources = {...}
+  local combinator = table.remove(sources)
+  table.insert(sources, 1, self)
 
   return Observable.create(function(observer)
-    local function handleNext(k, v)
-      values[k] = v
-      local full = true
-      for i = 1, #targets do
-        if not values[i] then full = false break end
-      end
+    local latest = {}
+    local pending = {unpack(sources)}
+    local completed = {}
+
+    local function onNext(i)
+      return function(value)
+        latest[i] = value
+        pending[i] = nil
 
-      if full then
-        observer:onNext(fn(unpack(values)))
+        if not next(pending) then
+          observer:onNext(combinator(unpack(latest)))
+        end
       end
     end
 
-    local function handleCompleted(k)
-      done[k] = true
-      local stop = true
-      for i = 1, #targets do
-        if not done[i] then stop = false break end
-      end
+    local function onError(e)
+      observer:onError(e)
+    end
+
+    local function onComplete(i)
+      return function()
+        table.insert(completed, i)
 
-      if stop then
-        observer:onComplete()
+        if #completed == #sources then
+          observer:onComplete()
+        end
       end
     end
 
-    for i = 1, #targets do
-      targets[i]:subscribe(function(x)
-        values[i] = x
-        handleNext(i, x)
-      end,
-      function(e)
-        observer:onError(e)
-      end,
-      function()
-        handleCompleted(i)
-      end)
+    for i = 1, #sources do
+      sources[i]:subscribe(onNext(i), onError, onComplete(i))
     end
   end)
 end
@@ -274,7 +289,7 @@ local Cooperative = {}
 Cooperative.__index = Cooperative
 
 --
-Cooperative.create = function(currentTime)
+function Cooperative.create(currentTime)
   local self = {
     tasks = {},
     currentTime = currentTime or 0
@@ -294,8 +309,10 @@ end
 --
 function Cooperative:update(dt)
   self.currentTime = self.currentTime + (dt or 0)
+
   for i = #self.tasks, 1, -1 do
     local task = self.tasks[i]
+
     if self.currentTime >= task.due then
       local success, delay = coroutine.resume(task.thread)