Pārlūkot izejas kodu

Observable.retry;

bjorn 9 gadi atpakaļ
vecāks
revīzija
3f75b68580
6 mainītis faili ar 133 papildinājumiem un 0 dzēšanām
  1. 11 0
      doc/README.md
  2. 34 0
      rx.lua
  3. 35 0
      src/operators/retry.lua
  4. 1 0
      tests/observable.lua
  5. 51 0
      tests/retry.lua
  6. 1 0
      tools/concat.lua

+ 11 - 0
doc/README.md

@@ -53,6 +53,7 @@ RxLua
   - [pluck](#pluckkeys)
   - [reduce](#reduceaccumulator-seed)
   - [reject](#rejectpredicate)
+  - [retry](#retrycount)
   - [scan](#scanaccumulator-seed)
   - [skip](#skipn)
   - [skipUntil](#skipuntilother)
@@ -542,6 +543,16 @@ Returns a new Observable that produces values from the original which do not sat
 
 ---
 
+#### `:retry(count)`
+
+Returns an Observable that restarts in the event of an error.
+
+| Name | Type | Default | Description |
+|------|------|---------|-------------|
+| `count` | number (optional) |  | The maximum number of times to retry.  If left unspecified, an infinite number of retries will be attempted. |
+
+---
+
 #### `:scan(accumulator, seed)`
 
 Returns a new Observable that produces values computed by accumulating the results of running a function on each value produced by the original Observable.

+ 34 - 0
rx.lua

@@ -1052,6 +1052,40 @@ function Observable:reject(predicate)
   end)
 end
 
+--- Returns an Observable that restarts in the event of an error.
+-- @arg {number=} count - The maximum number of times to retry.  If left unspecified, an infinite
+--                        number of retries will be attempted.
+-- @returns {Observable}
+function Observable:retry(count)
+  return Observable.create(function(observer)
+    local subscription
+    local retries = 0
+
+    local function onNext(...)
+      return observer:onNext(...)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    local function onError(message)
+      if subscription then
+        subscription:unsubscribe()
+      end
+
+      retries = retries + 1
+      if count and retries > count then
+        return observer:onError(message)
+      end
+
+      subscription = self:subscribe(onNext, onError, onCompleted)
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+
 --- Returns a new Observable that produces values computed by accumulating the results of running a
 -- function on each value produced by the original Observable.
 -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed

+ 35 - 0
src/operators/retry.lua

@@ -0,0 +1,35 @@
+local Observable = require 'observable'
+
+--- Returns an Observable that restarts in the event of an error.
+-- @arg {number=} count - The maximum number of times to retry.  If left unspecified, an infinite
+--                        number of retries will be attempted.
+-- @returns {Observable}
+function Observable:retry(count)
+  return Observable.create(function(observer)
+    local subscription
+    local retries = 0
+
+    local function onNext(...)
+      return observer:onNext(...)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    local function onError(message)
+      if subscription then
+        subscription:unsubscribe()
+      end
+
+      retries = retries + 1
+      if count and retries > count then
+        return observer:onError(message)
+      end
+
+      subscription = self:subscribe(onNext, onError, onCompleted)
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end

+ 1 - 0
tests/observable.lua

@@ -259,6 +259,7 @@ describe('Observable', function()
   dofile('tests/pluck.lua')
   dofile('tests/reduce.lua')
   dofile('tests/reject.lua')
+  dofile('tests/retry.lua')
   dofile('tests/scan.lua')
   dofile('tests/skip.lua')
   dofile('tests/skipUntil.lua')

+ 51 - 0
tests/retry.lua

@@ -0,0 +1,51 @@
+describe('retry', function()
+  it('produces values normally if no errors occur', function()
+    expect(Rx.Observable.fromRange(3):retry()).to.produce(1, 2, 3)
+  end)
+
+  local function createBadObservable(errorCount)
+    local i = 0
+    return Rx.Observable.create(function(observer)
+      i = i + 1
+      observer:onNext()
+      if i <= errorCount then
+        observer:onError('error')
+      else
+        observer:onCompleted()
+      end
+    end)
+  end
+
+  it('does not retry if the count is less than or equal to zero', function()
+    local onNext, onError, onCompleted
+    onNext, onError, onCompleted = observableSpy(createBadObservable(1):retry(0))
+    expect(#onNext).to.equal(1)
+    expect(#onError).to.equal(1)
+    expect(#onCompleted).to.equal(0)
+
+    onNext, onError, onCompleted = observableSpy(createBadObservable(1):retry(-1))
+    expect(#onNext).to.equal(1)
+    expect(#onError).to.equal(1)
+    expect(#onCompleted).to.equal(0)
+  end)
+
+  it('completes successfully if the number of errors is less than or equal to the number of retries', function()
+    local onNext, onError, onCompleted
+    onNext, onError, onCompleted = observableSpy(createBadObservable(1):retry(1))
+    expect(#onNext).to.equal(2)
+    expect(#onError).to.equal(0)
+    expect(#onCompleted).to.equal(1)
+
+    onNext, onError, onCompleted = observableSpy(createBadObservable(1):retry(2))
+    expect(#onNext).to.equal(2)
+    expect(#onError).to.equal(0)
+    expect(#onCompleted).to.equal(1)
+  end)
+
+  it('produces an error if the number of errors is greater than the number of retries', function()
+    local onNext, onError, onCompleted = observableSpy(createBadObservable(3):retry(2))
+    expect(#onNext).to.equal(3)
+    expect(#onError).to.equal(1)
+    expect(#onCompleted).to.equal(0)
+  end)
+end)

+ 1 - 0
tools/concat.lua

@@ -37,6 +37,7 @@ local files = {
   'src/operators/pluck.lua',
   'src/operators/reduce.lua',
   'src/operators/reject.lua',
+  'src/operators/retry.lua',
   'src/operators/scan.lua',
   'src/operators/skip.lua',
   'src/operators/skipUntil.lua',