Browse Source

Add Observable:changes;

bjorn 10 years ago
parent
commit
fcdbe09a92
2 changed files with 47 additions and 0 deletions
  1. 15 0
      doc/README.md
  2. 32 0
      rx.lua

+ 15 - 0
doc/README.md

@@ -11,6 +11,7 @@
   - [fromCoroutine](#fromcoroutinecoroutine)
   - [subscribe](#subscribeonnext-onerror-oncomplete)
   - [dump](#dumpname)
+  - [changes](#changescomparator)
   - [combineLatest](#combinelatestobservables-combinator)
   - [compact](#compact)
   - [concat](#concatsources)
@@ -196,6 +197,20 @@ Arguments:
 
 ---
 
+#### `:changes(comparator)`
+
+Returns an Observable that only produces values from the original if they are different from the previous value.
+
+Arguments:
+
+- `comparator` (`function`) - A function used to compare 2 values. If unspecified, == is used.
+
+Returns:
+
+- `Observable`
+
+---
+
 #### `:combineLatest(observables, combinator)`
 
 Returns a new Observable that runs a combinator function on the most recent values from a set of Observables whenever any of them produce a new value. The results of the combinator function are produced by the new Observable.

+ 32 - 0
rx.lua

@@ -2,6 +2,7 @@ local rx
 
 local pack = table.pack or function(...) return {...} end
 local unpack = table.unpack or unpack
+local function eq(x, y) return x == y end
 local function noop() end
 local function identity(x) return x end
 
@@ -163,6 +164,37 @@ end
 -- The functions below transform the values produced by an Observable and return a new Observable
 -- that produces these values.
 
+--- Returns an Observable that only produces values from the original if they are different from
+-- the previous value.
+-- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
+-- @returns {Observable}
+function Observable:changes(comparator)
+  comparator = comparator or eq
+
+  return Observable.create(function(observer)
+    local first = true
+    local currentValue = nil
+
+    local function onNext(value)
+      if first or not comparator(value, currentValue) then
+        observer:onNext(value)
+        currentValue = value
+        first = false
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(onError)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
 --- Returns a new Observable that runs a combinator function on the most recent values from a set
 -- of Observables whenever any of them produce a new value. The results of the combinator function
 -- are produced by the new Observable.