ソースを参照

Observable.scan;

bjorn 10 年 前
コミット
fc63946173
6 ファイル変更122 行追加0 行削除
  1. 12 0
      doc/README.md
  2. 36 0
      rx.lua
  3. 36 0
      src/operators/scan.lua
  4. 1 0
      tests/observable.lua
  5. 36 0
      tests/scan.lua
  6. 1 0
      tools/concat.lua

+ 12 - 0
doc/README.md

@@ -53,6 +53,7 @@ RxLua
   - [pluck](#pluckkeys)
   - [reduce](#reduceaccumulator-seed)
   - [reject](#rejectpredicate)
+  - [scan](#scanaccumulator-seed)
   - [skip](#skipn)
   - [skipUntil](#skipuntilother)
   - [skipWhile](#skipwhilepredicate)
@@ -541,6 +542,17 @@ Returns a new Observable that produces values from the original which do not sat
 
 ---
 
+#### `:scan(accumulator, seed)`
+
+Returns a new Observable that produces values computed by accumulating the results of running a function on each value produced by the original Observable.
+
+| Name | Type | Default | Description |
+|------|------|---------|-------------|
+| `accumulator` | function |  | Accumulates the values of the original Observable. Will be passed the return value of the last call as the first argument and the current values as the rest of the arguments.  Each value returned from this function will be emitted by the Observable. |
+| `seed` | * |  | A value to pass to the accumulator the first time it is run. |
+
+---
+
 #### `:skip(n)`
 
 Returns a new Observable that skips over a specified number of values produced by the original and produces the rest.

+ 36 - 0
rx.lua

@@ -1052,6 +1052,41 @@ function Observable:reject(predicate)
   end)
 end
 
+--- Returns a new Observable that produces values computed by accumulating the results of running a
+-- function on each value produced by the original Observable.
+-- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
+--                               the return value of the last call as the first argument and the
+--                               current values as the rest of the arguments.  Each value returned
+--                               from this function will be emitted by the Observable.
+-- @arg {*} seed - A value to pass to the accumulator the first time it is run.
+-- @returns {Observable}
+function Observable:scan(accumulator, seed)
+  return Observable.create(function(observer)
+    local result = seed
+    local first = true
+
+    local function onNext(...)
+      if first and seed == nil then
+        result = ...
+        first = false
+      else
+        result = accumulator(result, ...)
+        observer:onNext(result)
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+
 --- Returns a new Observable that skips over a specified number of values produced by the original
 -- and produces the rest.
 -- @arg {number=1} n - The number of values to ignore.
@@ -1600,6 +1635,7 @@ function BehaviorSubject:getValue()
 end
 
 Observable.wrap = Observable.buffer
+Observable['repeat'] = Observable.replicate
 
 return {
   util = util,

+ 36 - 0
src/operators/scan.lua

@@ -0,0 +1,36 @@
+local Observable = require 'observable'
+
+--- Returns a new Observable that produces values computed by accumulating the results of running a
+-- function on each value produced by the original Observable.
+-- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
+--                               the return value of the last call as the first argument and the
+--                               current values as the rest of the arguments.  Each value returned
+--                               from this function will be emitted by the Observable.
+-- @arg {*} seed - A value to pass to the accumulator the first time it is run.
+-- @returns {Observable}
+function Observable:scan(accumulator, seed)
+  return Observable.create(function(observer)
+    local result = seed
+    local first = true
+
+    local function onNext(...)
+      if first and seed == nil then
+        result = ...
+        first = false
+      else
+        result = accumulator(result, ...)
+        observer:onNext(result)
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end

+ 1 - 0
tests/observable.lua

@@ -259,6 +259,7 @@ describe('Observable', function()
   dofile('tests/pluck.lua')
   dofile('tests/reduce.lua')
   dofile('tests/reject.lua')
+  dofile('tests/scan.lua')
   dofile('tests/skip.lua')
   dofile('tests/skipUntil.lua')
   dofile('tests/skipWhile.lua')

+ 36 - 0
tests/scan.lua

@@ -0,0 +1,36 @@
+describe('scan', function()
+  it('fails if the first argument is not a function', function()
+    local observable = Rx.Observable.fromValue(0)
+    expect(observable:scan()).to.fail()
+    expect(observable:scan(1)).to.fail()
+    expect(observable:scan('')).to.fail()
+    expect(observable:scan({})).to.fail()
+    expect(observable:scan(true)).to.fail()
+  end)
+
+  it('uses the seed as the initial value to the accumulator', function()
+    local accumulator = spy()
+    Rx.Observable.fromValue(3):scan(accumulator, 4):subscribe()
+    expect(accumulator[1]).to.equal({4, 3})
+  end)
+
+  it('waits for 2 values before accumulating if the seed is nil', function()
+    local accumulator = spy(function(x, y) return x * y end)
+    local observable = Rx.Observable.fromTable({2, 4, 6}, ipairs):scan(accumulator)
+    expect(observable).to.produce(8, 48)
+    expect(accumulator).to.equal({{2, 4}, {8, 6}})
+  end)
+
+  it('uses the return value of the accumulator as the next input to the accumulator', function()
+    local accumulator = spy(function(x, y) return x + y end)
+    local observable = Rx.Observable.fromTable({1, 2, 3}, ipairs):scan(accumulator, 0)
+    expect(observable).to.produce(1, 3, 6)
+    expect(accumulator).to.equal({{0, 1}, {1, 2}, {3, 3}})
+  end)
+
+  it('passes all produced values to the accumulator', function()
+    local accumulator = spy(function() return 0 end)
+    local observable = Rx.Observable.fromTable({2, 3, 4}, ipairs, true):scan(accumulator, 0):subscribe()
+    expect(accumulator).to.equal({{0, 2, 1}, {0, 3, 2}, {0, 4, 3}})
+  end)
+end)

+ 1 - 0
tools/concat.lua

@@ -37,6 +37,7 @@ local files = {
   'src/operators/pluck.lua',
   'src/operators/reduce.lua',
   'src/operators/reject.lua',
+  'src/operators/scan.lua',
   'src/operators/skip.lua',
   'src/operators/skipUntil.lua',
   'src/operators/skipWhile.lua',