瀏覽代碼

Add Observable.pluck; TCO;

bjorn 10 年之前
父節點
當前提交
b6aa6deed8
共有 1 個文件被更改,包括 39 次插入23 次删除
  1. 39 23
      rx.lua

+ 39 - 23
rx.lua

@@ -130,15 +130,15 @@ function Observable:first()
   return Observable.create(function(observer)
   return Observable.create(function(observer)
     local function onNext(x)
     local function onNext(x)
       observer:onNext(x)
       observer:onNext(x)
-      observer:onComplete()
+      return observer:onComplete()
     end
     end
 
 
     local function onError(e)
     local function onError(e)
-      observer:onError(e)
+      return observer:onError(e)
     end
     end
 
 
     local function onComplete()
     local function onComplete()
-      observer:onComplete()
+      return observer:onComplete()
     end
     end
 
 
     return self:subscribe(onNext, onError, onComplete)
     return self:subscribe(onNext, onError, onComplete)
@@ -156,12 +156,12 @@ function Observable:last()
     end
     end
 
 
     local function onError(e)
     local function onError(e)
-      observer:onError(e)
+      return observer:onError(e)
     end
     end
 
 
     local function onComplete()
     local function onComplete()
       observer:onNext(value)
       observer:onNext(value)
-      observer:onComplete()
+      return observer:onComplete()
     end
     end
 
 
     return self:subscribe(onNext, onError, onComplete)
     return self:subscribe(onNext, onError, onComplete)
@@ -180,11 +180,11 @@ function Observable:map(callback)
     end
     end
 
 
     local function onError(e)
     local function onError(e)
-      observer:onError(e)
+      return observer:onError(e)
     end
     end
 
 
     local function onComplete()
     local function onComplete()
-      observer:onComplete()
+      return observer:onComplete()
     end
     end
 
 
     return self:subscribe(onNext, onError, onComplete)
     return self:subscribe(onNext, onError, onComplete)
@@ -208,12 +208,12 @@ function Observable:reduce(accumulator, seed)
     end
     end
 
 
     local function onError(e)
     local function onError(e)
-      observer:onError(e)
+      return observer:onError(e)
     end
     end
 
 
     local function onComplete()
     local function onComplete()
       observer:onNext(result)
       observer:onNext(result)
-      observer:onComplete()
+      return observer:onComplete()
     end
     end
 
 
     return self:subscribe(onNext, onError, onComplete)
     return self:subscribe(onNext, onError, onComplete)
@@ -256,7 +256,7 @@ function Observable:combineLatest(...)
     end
     end
 
 
     local function onError(e)
     local function onError(e)
-      observer:onError(e)
+      return observer:onError(e)
     end
     end
 
 
     local function onComplete(i)
     local function onComplete(i)
@@ -290,11 +290,11 @@ function Observable:distinct()
     end
     end
 
 
     local function onError(e)
     local function onError(e)
-      observer:onError(e)
+      return observer:onError(e)
     end
     end
 
 
     local function onComplete()
     local function onComplete()
-      observer:onComplete()
+      return observer:onComplete()
     end
     end
 
 
     return self:subscribe(onNext, onError, onComplete)
     return self:subscribe(onNext, onError, onComplete)
@@ -307,22 +307,18 @@ end
 function Observable:takeUntil(other)
 function Observable:takeUntil(other)
   return Observable.create(function(observer)
   return Observable.create(function(observer)
     local function onNext(x)
     local function onNext(x)
-      observer:onNext(x)
+      return observer:onNext(x)
     end
     end
 
 
     local function onError(e)
     local function onError(e)
-      observer:onError(e)
+      return observer:onError(e)
     end
     end
 
 
     local function onComplete()
     local function onComplete()
-      observer:onComplete()
-    end
-
-    local terminate = function()
-      observer:onComplete()
+      return observer:onComplete()
     end
     end
 
 
-    other:subscribe(terminate, terminate, terminate)
+    other:subscribe(onComplete, onComplete, onComplete)
 
 
     return self:subscribe(onNext, onError, onComplete)
     return self:subscribe(onNext, onError, onComplete)
   end)
   end)
@@ -337,22 +333,42 @@ function Observable:filter(predicate)
   return Observable.create(function(observer)
   return Observable.create(function(observer)
     local function onNext(x)
     local function onNext(x)
       if predicate(x) then
       if predicate(x) then
-        observer:onNext(x)
+        return observer:onNext(x)
       end
       end
     end
     end
 
 
     local function onError(e)
     local function onError(e)
-      observer:onError(e)
+      return observer:onError(e)
     end
     end
 
 
     local function onComplete()
     local function onComplete()
-      observer:onComplete(e)
+      return observer:onComplete(e)
     end
     end
 
 
     return self:subscribe(onNext, onError, onComplete)
     return self:subscribe(onNext, onError, onComplete)
   end)
   end)
 end
 end
 
 
+--- Returns a new Observable that produces values computed by extracting the given key from the
+-- tables produced by the original.
+-- @arg {function} key - The key to extract from the table.
+-- @returns {Observable}
+function Observable:pluck(key)
+  return Observable.create(function(observer)
+    local function onNext(t)
+      return observer:onNext(t[key])
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+  end)
+end
+
 --- @class Scheduler
 --- @class Scheduler
 -- @description Schedulers manage groups of Observables.
 -- @description Schedulers manage groups of Observables.
 local Scheduler = {}
 local Scheduler = {}