Browse Source

Add Observable.filter;

bjorn 10 years ago
parent
commit
606b7c90c6
1 changed files with 25 additions and 1 deletions
  1. 25 1
      rx.lua

+ 25 - 1
rx.lua

@@ -306,7 +306,6 @@ end
 -- @returns {Observable}
 -- @returns {Observable}
 function Observable:takeUntil(other)
 function Observable:takeUntil(other)
   return Observable.create(function(observer)
   return Observable.create(function(observer)
-
     local function onNext(x)
     local function onNext(x)
       observer:onNext(x)
       observer:onNext(x)
     end
     end
@@ -329,6 +328,31 @@ function Observable:takeUntil(other)
   end)
   end)
 end
 end
 
 
+--- Returns a new Observable that only produces values of the first that satisfy a predicate.
+-- @arg {function} predicate - The predicate to filter values with.
+-- @returns {Observable}
+function Observable:filter(predicate)
+  predicate = predicate or identity
+
+  return Observable.create(function(observer)
+    local function onNext(x)
+      if predicate(x) then
+        observer:onNext(x)
+      end
+    end
+
+    local function onError(e)
+      observer:onError(e)
+    end
+
+    local function onComplete()
+      observer:onComplete(e)
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
 --- @class Scheduler
 --- @class Scheduler
 -- @description Schedulers manage groups of Observables.
 -- @description Schedulers manage groups of Observables.
 local Scheduler = {}
 local Scheduler = {}