bjorn 9 years ago
parent
commit
47225756b9
1 changed files with 43 additions and 17 deletions
  1. 43 17
      rx.lua

+ 43 - 17
rx.lua

@@ -202,30 +202,25 @@ function Observable:dump(name, formatter)
   return self:subscribe(onNext, onError, onComplete)
 end
 
---- Returns an Observable that only produces values from the original if they are different from
--- the previous value.
--- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
--- @returns {Observable}
-function Observable:distinctUntilChanged(comparator)
-  comparator = comparator or util.eq
+--- Determine whether all items emitted by an Observable meet some criteria.
+-- @arg {function=identity} predicate - The predicate used to evaluate objects.
+function Observable:all(predicate)
+  predicate = predicate or util.identity
 
   return Observable.create(function(observer)
-    local first = true
-    local currentValue = nil
-
-    local function onNext(value, ...)
-      if first or not comparator(value, currentValue) then
-        observer:onNext(value, ...)
-        currentValue = value
-        first = false
+    local function onNext(...)
+      if not predicate(...) then
+        observer:onNext(false)
+        observer:onComplete()
       end
     end
 
-    local function onError(message)
-      return observer:onError(onError)
+    local function onError(e)
+      return observer:onError(e)
     end
 
     local function onComplete()
+      observer:onNext(true)
       return observer:onComplete()
     end
 
@@ -240,7 +235,7 @@ end
 -- @arg {function} combinator - A function that combines the latest result from each Observable and
 --                              returns a single value.
 -- @returns {Observable}
-function Observable:combine(...)
+function Observable:combineLatest(...)
   local sources = {...}
   local combinator = table.remove(sources)
   if type(combinator) ~= 'function' then
@@ -347,6 +342,37 @@ function Observable:distinct()
   end)
 end
 
+--- Returns an Observable that only produces values from the original if they are different from
+-- the previous value.
+-- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
+-- @returns {Observable}
+function Observable:distinctUntilChanged(comparator)
+  comparator = comparator or util.eq
+
+  return Observable.create(function(observer)
+    local first = true
+    local currentValue = nil
+
+    local function onNext(value, ...)
+      if first or not comparator(value, currentValue) then
+        observer:onNext(value, ...)
+        currentValue = value
+        first = false
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(onError)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
 --- Returns a new Observable that only produces values of the first that satisfy a predicate.
 -- @arg {function} predicate - The predicate used to filter values.
 -- @returns {Observable}