Browse Source

Merge pull request #7 from bjornbytes/better-errors

Better errors
Bjorn Swenson 9 years ago
parent
commit
124252bb12

+ 81 - 36
rx.lua

@@ -13,6 +13,11 @@ util.constant = function(x) return function() return x end end
 util.isa = function(object, class)
   return type(object) == 'table' and getmetatable(object).__index == class
 end
+util.tryWithObserver = function(observer, fn, ...)
+  return xpcall(fn, function(...)
+    return observer:onError(...)
+  end, ...)
+end
 
 --- @class Subscription
 -- @description A handle representing the link between an Observer and an Observable, as well as any
@@ -260,10 +265,12 @@ function Observable:all(predicate)
 
   return Observable.create(function(observer)
     local function onNext(...)
-      if not predicate(...) then
-        observer:onNext(false)
-        observer:onCompleted()
-      end
+      util.tryWithObserver(observer, function(...)
+        if not predicate(...) then
+          observer:onNext(false)
+          observer:onCompleted()
+        end
+      end, ...)
     end
 
     local function onError(e)
@@ -413,12 +420,12 @@ function Observable:catch(handler)
         return observer:onCompleted()
       end
 
-      local continue = handler(e)
-      if continue then
+      local success, continue = pcall(handler, e)
+      if success and continue then
         if subscription then subscription:unsubscribe() end
         continue:subscribe(observer)
       else
-        observer:onError(e)
+        observer:onError(success and e or continue)
       end
     end
 
@@ -458,7 +465,9 @@ function Observable:combineLatest(...)
         pending[i] = nil
 
         if not next(pending) then
-          observer:onNext(combinator(util.unpack(latest)))
+          util.tryWithObserver(observer, function()
+            observer:onNext(combinator(util.unpack(latest)))
+          end)
         end
       end
     end
@@ -569,9 +578,11 @@ function Observable:count(predicate)
     local count = 0
 
     local function onNext(...)
-      if predicate(...) then
-        count = count + 1
-      end
+      util.tryWithObserver(observer, function(...)
+        if predicate(...) then
+          count = count + 1
+        end
+      end, ...)
     end
 
     local function onError(e)
@@ -657,11 +668,14 @@ function Observable:distinctUntilChanged(comparator)
     local currentValue = nil
 
     local function onNext(value, ...)
-      if first or not comparator(value, currentValue) then
-        observer:onNext(value, ...)
-        currentValue = value
-        first = false
-      end
+      local values = util.pack(...)
+      util.tryWithObserver(observer, function()
+        if first or not comparator(value, currentValue) then
+          observer:onNext(value, util.unpack(values))
+          currentValue = value
+          first = false
+        end
+      end)
     end
 
     local function onError(message)
@@ -717,9 +731,11 @@ function Observable:filter(predicate)
 
   return Observable.create(function(observer)
     local function onNext(...)
-      if predicate(...) then
-        return observer:onNext(...)
-      end
+      util.tryWithObserver(observer, function(...)
+        if predicate(...) then
+          return observer:onNext(...)
+        end
+      end, ...)
     end
 
     local function onError(e)
@@ -742,10 +758,12 @@ function Observable:find(predicate)
 
   return Observable.create(function(observer)
     local function onNext(...)
-      if predicate(...) then
-        observer:onNext(...)
-        return observer:onCompleted()
-      end
+      util.tryWithObserver(observer, function(...)
+        if predicate(...) then
+          observer:onNext(...)
+          return observer:onCompleted()
+        end
+      end, ...)
     end
 
     local function onError(message)
@@ -801,7 +819,9 @@ function Observable:flatMapLatest(callback)
         innerSubscription:unsubscribe()
       end
 
-      innerSubscription = callback(...):subscribe(onNext, onError)
+      return util.tryWithObserver(observer, function(...)
+        innerSubscription = callback(...):subscribe(onNext, onError)
+      end, ...)
     end
 
     local subscription = self:subscribe(subscribeInner, onError, onCompleted)
@@ -895,7 +915,9 @@ function Observable:map(callback)
     callback = callback or util.identity
 
     local function onNext(...)
-      return observer:onNext(callback(...))
+      return util.tryWithObserver(observer, function(...)
+        return observer:onNext(callback(...))
+      end, ...)
     end
 
     local function onError(e)
@@ -978,6 +1000,10 @@ end
 function Observable:pluck(key, ...)
   if not key then return self end
 
+  if type(key) ~= 'string' and type(key) ~= 'number' then
+    return Observable.throw('pluck key must be a string')
+  end
+
   return Observable.create(function(observer)
     local function onNext(t)
       return observer:onNext(t[key])
@@ -1012,7 +1038,9 @@ function Observable:reduce(accumulator, seed)
         result = ...
         first = false
       else
-        result = accumulator(result, ...)
+        return util.tryWithObserver(observer, function(...)
+          result = accumulator(result, ...)
+        end, ...)
       end
     end
 
@@ -1038,9 +1066,11 @@ function Observable:reject(predicate)
 
   return Observable.create(function(observer)
     local function onNext(...)
-      if not predicate(...) then
-        return observer:onNext(...)
-      end
+      util.tryWithObserver(observer, function(...)
+        if not predicate(...) then
+          return observer:onNext(...)
+        end
+      end, ...)
     end
 
     local function onError(e)
@@ -1107,8 +1137,10 @@ function Observable:scan(accumulator, seed)
         result = ...
         first = false
       else
-        result = accumulator(result, ...)
-        observer:onNext(result)
+        return util.tryWithObserver(observer, function(...)
+          result = accumulator(result, ...)
+          observer:onNext(result)
+        end, ...)
       end
     end
 
@@ -1232,7 +1264,9 @@ function Observable:skipWhile(predicate)
 
     local function onNext(...)
       if skipping then
-        skipping = predicate(...)
+        util.tryWithObserver(observer, function(...)
+          skipping = predicate(...)
+        end, ...)
       end
 
       if not skipping then
@@ -1403,7 +1437,9 @@ function Observable:takeWhile(predicate)
 
     local function onNext(...)
       if taking then
-        taking = predicate(...)
+        util.tryWithObserver(observer, function(...)
+          taking = predicate(...)
+        end, ...)
 
         if taking then
           return observer:onNext(...)
@@ -1438,17 +1474,26 @@ function Observable:tap(_onNext, _onError, _onCompleted)
 
   return Observable.create(function(observer)
     local function onNext(...)
-      _onNext(...)
+      util.tryWithObserver(observer, function(...)
+        _onNext(...)
+      end, ...)
+
       return observer:onNext(...)
     end
 
     local function onError(message)
-      _onError(message)
+      util.tryWithObserver(observer, function()
+        _onError(message)
+      end)
+
       return observer:onError(message)
     end
 
     local function onCompleted()
-      _onCompleted()
+      util.tryWithObserver(observer, function()
+        _onCompleted()
+      end)
+
       return observer:onCompleted()
     end
 

+ 6 - 4
src/operators/all.lua

@@ -8,10 +8,12 @@ function Observable:all(predicate)
 
   return Observable.create(function(observer)
     local function onNext(...)
-      if not predicate(...) then
-        observer:onNext(false)
-        observer:onCompleted()
-      end
+      util.tryWithObserver(observer, function(...)
+        if not predicate(...) then
+          observer:onNext(false)
+          observer:onCompleted()
+        end
+      end, ...)
     end
 
     local function onError(e)

+ 3 - 3
src/operators/catch.lua

@@ -21,12 +21,12 @@ function Observable:catch(handler)
         return observer:onCompleted()
       end
 
-      local continue = handler(e)
-      if continue then
+      local success, continue = pcall(handler, e)
+      if success and continue then
         if subscription then subscription:unsubscribe() end
         continue:subscribe(observer)
       else
-        observer:onError(e)
+        observer:onError(success and e or continue)
       end
     end
 

+ 3 - 1
src/operators/combineLatest.lua

@@ -28,7 +28,9 @@ function Observable:combineLatest(...)
         pending[i] = nil
 
         if not next(pending) then
-          observer:onNext(combinator(util.unpack(latest)))
+          util.tryWithObserver(observer, function()
+            observer:onNext(combinator(util.unpack(latest)))
+          end)
         end
       end
     end

+ 5 - 3
src/operators/count.lua

@@ -11,9 +11,11 @@ function Observable:count(predicate)
     local count = 0
 
     local function onNext(...)
-      if predicate(...) then
-        count = count + 1
-      end
+      util.tryWithObserver(observer, function(...)
+        if predicate(...) then
+          count = count + 1
+        end
+      end, ...)
     end
 
     local function onError(e)

+ 8 - 5
src/operators/distinctUntilChanged.lua

@@ -13,11 +13,14 @@ function Observable:distinctUntilChanged(comparator)
     local currentValue = nil
 
     local function onNext(value, ...)
-      if first or not comparator(value, currentValue) then
-        observer:onNext(value, ...)
-        currentValue = value
-        first = false
-      end
+      local values = util.pack(...)
+      util.tryWithObserver(observer, function()
+        if first or not comparator(value, currentValue) then
+          observer:onNext(value, util.unpack(values))
+          currentValue = value
+          first = false
+        end
+      end)
     end
 
     local function onError(message)

+ 5 - 3
src/operators/filter.lua

@@ -9,9 +9,11 @@ function Observable:filter(predicate)
 
   return Observable.create(function(observer)
     local function onNext(...)
-      if predicate(...) then
-        return observer:onNext(...)
-      end
+      util.tryWithObserver(observer, function(...)
+        if predicate(...) then
+          return observer:onNext(...)
+        end
+      end, ...)
     end
 
     local function onError(e)

+ 6 - 4
src/operators/find.lua

@@ -9,10 +9,12 @@ function Observable:find(predicate)
 
   return Observable.create(function(observer)
     local function onNext(...)
-      if predicate(...) then
-        observer:onNext(...)
-        return observer:onCompleted()
-      end
+      util.tryWithObserver(observer, function(...)
+        if predicate(...) then
+          observer:onNext(...)
+          return observer:onCompleted()
+        end
+      end, ...)
     end
 
     local function onError(message)

+ 3 - 1
src/operators/flatMapLatest.lua

@@ -28,7 +28,9 @@ function Observable:flatMapLatest(callback)
         innerSubscription:unsubscribe()
       end
 
-      innerSubscription = callback(...):subscribe(onNext, onError)
+      return util.tryWithObserver(observer, function(...)
+        innerSubscription = callback(...):subscribe(onNext, onError)
+      end, ...)
     end
 
     local subscription = self:subscribe(subscribeInner, onError, onCompleted)

+ 3 - 1
src/operators/map.lua

@@ -9,7 +9,9 @@ function Observable:map(callback)
     callback = callback or util.identity
 
     local function onNext(...)
-      return observer:onNext(callback(...))
+      return util.tryWithObserver(observer, function(...)
+        return observer:onNext(callback(...))
+      end, ...)
     end
 
     local function onError(e)

+ 4 - 0
src/operators/pluck.lua

@@ -8,6 +8,10 @@ local Observable = require 'observable'
 function Observable:pluck(key, ...)
   if not key then return self end
 
+  if type(key) ~= 'string' and type(key) ~= 'number' then
+    return Observable.throw('pluck key must be a string')
+  end
+
   return Observable.create(function(observer)
     local function onNext(t)
       return observer:onNext(t[key])

+ 4 - 1
src/operators/reduce.lua

@@ -1,4 +1,5 @@
 local Observable = require 'observable'
+local util = require 'util'
 
 --- Returns a new Observable that produces a single value computed by accumulating the results of
 -- running a function on each value produced by the original Observable.
@@ -17,7 +18,9 @@ function Observable:reduce(accumulator, seed)
         result = ...
         first = false
       else
-        result = accumulator(result, ...)
+        return util.tryWithObserver(observer, function(...)
+          result = accumulator(result, ...)
+        end, ...)
       end
     end
 

+ 5 - 3
src/operators/reject.lua

@@ -10,9 +10,11 @@ function Observable:reject(predicate)
 
   return Observable.create(function(observer)
     local function onNext(...)
-      if not predicate(...) then
-        return observer:onNext(...)
-      end
+      util.tryWithObserver(observer, function(...)
+        if not predicate(...) then
+          return observer:onNext(...)
+        end
+      end, ...)
     end
 
     local function onError(e)

+ 5 - 2
src/operators/scan.lua

@@ -1,4 +1,5 @@
 local Observable = require 'observable'
+local util = require 'util'
 
 --- Returns a new Observable that produces values computed by accumulating the results of running a
 -- function on each value produced by the original Observable.
@@ -18,8 +19,10 @@ function Observable:scan(accumulator, seed)
         result = ...
         first = false
       else
-        result = accumulator(result, ...)
-        observer:onNext(result)
+        return util.tryWithObserver(observer, function(...)
+          result = accumulator(result, ...)
+          observer:onNext(result)
+        end, ...)
       end
     end
 

+ 3 - 1
src/operators/skipWhile.lua

@@ -12,7 +12,9 @@ function Observable:skipWhile(predicate)
 
     local function onNext(...)
       if skipping then
-        skipping = predicate(...)
+        util.tryWithObserver(observer, function(...)
+          skipping = predicate(...)
+        end, ...)
       end
 
       if not skipping then

+ 3 - 1
src/operators/takeWhile.lua

@@ -12,7 +12,9 @@ function Observable:takeWhile(predicate)
 
     local function onNext(...)
       if taking then
-        taking = predicate(...)
+        util.tryWithObserver(observer, function(...)
+          taking = predicate(...)
+        end, ...)
 
         if taking then
           return observer:onNext(...)

+ 12 - 3
src/operators/tap.lua

@@ -14,17 +14,26 @@ function Observable:tap(_onNext, _onError, _onCompleted)
 
   return Observable.create(function(observer)
     local function onNext(...)
-      _onNext(...)
+      util.tryWithObserver(observer, function(...)
+        _onNext(...)
+      end, ...)
+
       return observer:onNext(...)
     end
 
     local function onError(message)
-      _onError(message)
+      util.tryWithObserver(observer, function()
+        _onError(message)
+      end)
+
       return observer:onError(message)
     end
 
     local function onCompleted()
-      _onCompleted()
+      util.tryWithObserver(observer, function()
+        _onCompleted()
+      end)
+
       return observer:onCompleted()
     end
 

+ 5 - 0
src/util.lua

@@ -9,5 +9,10 @@ util.constant = function(x) return function() return x end end
 util.isa = function(object, class)
   return type(object) == 'table' and getmetatable(object).__index == class
 end
+util.tryWithObserver = function(observer, fn, ...)
+  return xpcall(fn, function(...)
+    return observer:onError(...)
+  end, ...)
+end
 
 return util

+ 8 - 3
tests/all.lua

@@ -1,8 +1,13 @@
 describe('all', function()
   it('passes through errors', function()
-    local observable = Rx.Observable.create(function(observer) observer:onError() end)
-    expect(observable.subscribe).to.fail()
-    expect(observable:all().subscribe).to.fail()
+    expect(Rx.Observable.throw():all().subscribe).to.fail()
+  end)
+
+  it('calls onError if the predicate errors', function()
+    local observable = Rx.Observable.fromRange(3):all(error)
+    local onError = spy()
+    observable:subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
   end)
 
   it('produces true if all elements satisfy the predicate', function()

+ 7 - 0
tests/catch.lua

@@ -28,4 +28,11 @@ describe('catch', function()
     local handler = function() return Rx.Observable.empty() end
     expect(Rx.Observable.throw():catch(handler)).to.produce.nothing()
   end)
+
+  it('calls onError if the supplied function errors', function()
+    local handler = error
+    local onError = spy()
+    Rx.Observable.throw():catch(handler):subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
+  end)
 end)

+ 12 - 6
tests/combineLatest.lua

@@ -4,7 +4,7 @@ describe('combineLatest', function()
     expect(observable).to.produce(1, 2, 3, 4, 5)
   end)
 
-  it('should call the combinator function with all values produced from all input observables once they have all produced a value', function()
+  it('calls the combinator function with all values produced from all input observables once they have all produced a value', function()
     local observableA = Rx.Observable.fromValue('a')
     local observableB = Rx.Observable.fromValue('b')
     local observableC = Rx.Observable.fromValue('c')
@@ -13,7 +13,7 @@ describe('combineLatest', function()
     expect(combinator).to.equal({{'a', 'b', 'c'}})
   end)
 
-  it('should emit the return value of the combinator as values', function()
+  it('emits the return value of the combinator as values', function()
     local observableA = Rx.Subject.create()
     local observableB = Rx.Subject.create()
     local onNext = spy()
@@ -26,7 +26,7 @@ describe('combineLatest', function()
     expect(onNext).to.equal({{3}, {4}, {7}})
   end)
 
-  it('should call onCompleted once all sources complete', function()
+  it('calls onCompleted once all sources complete', function()
     local observableA = Rx.Subject.create()
     local observableB = Rx.Subject.create()
     local complete = spy()
@@ -43,7 +43,7 @@ describe('combineLatest', function()
     expect(#complete).to.equal(1)
   end)
 
-  it('should call onError if one source errors', function()
+  it('calls onError if one source errors', function()
     local observableA = Rx.Subject.create()
     local observableB = Rx.Subject.create()
     local errored = spy()
@@ -53,7 +53,13 @@ describe('combineLatest', function()
     expect(#errored).to.equal(1)
   end)
 
-  it('should error if the combinator is absent', function()
-    expect(Rx.Observable.combineLatest(Rx.Observable.fromRange(1, 3)).subscribe).to.fail()
+  it('calls onError if the combinator is absent', function()
+    expect(Rx.Observable.combineLatest(Rx.Observable.fromRange(3)).subscribe).to.fail()
+  end)
+
+  it('calls onError if the combinator errors', function()
+    local onError = spy()
+    Rx.Observable.combineLatest(Rx.Observable.fromRange(3), error):subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
   end)
 end)

+ 6 - 0
tests/count.lua

@@ -14,4 +14,10 @@ describe('count', function()
     local observable = Rx.Observable.fromRange(5):count(function(x) return x > 3 end)
     expect(observable).to.produce(2)
   end)
+
+  it('calls onError if the predicate errors', function()
+    local onError = spy()
+    Rx.Observable.fromRange(3):count(error):subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
+  end)
 end)

+ 6 - 0
tests/distinctUntilChanged.lua

@@ -32,5 +32,11 @@ describe('distinctUntilChanged', function()
       local observable = Rx.Observable.fromValue(nil):distinctUntilChanged(function(x, y) return true end)
       expect(observable).to.produce({{}})
     end)
+
+    it('calls onError if the comparator errors', function()
+      local onError = spy()
+      Rx.Observable.fromRange(2):distinctUntilChanged(error):subscribe(nil, onError, nil)
+      expect(#onError).to.equal(1)
+    end)
   end)
 end)

+ 6 - 0
tests/filter.lua

@@ -21,4 +21,10 @@ describe('filter', function()
     expect(observable.subscribe).to.fail()
     expect(observable:filter().subscribe).to.fail()
   end)
+
+  it('calls onError if the predicate errors', function()
+    local onError = spy()
+    Rx.Observable.fromValue(5):filter(error):subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
+  end)
 end)

+ 6 - 0
tests/find.lua

@@ -5,6 +5,12 @@ describe('find', function()
     expect(observable:find().subscribe).to.fail()
   end)
 
+  it('calls onError if the predicate errors', function()
+    local onError = spy()
+    Rx.Observable.fromValue(3):find(error):subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
+  end)
+
   it('uses the identity function as a predicate if none is specified', function()
     local observable = Rx.Observable.fromTable({false, false, true, true, false}):find()
     expect(observable).to.produce(true)

+ 6 - 0
tests/flatMapLatest.lua

@@ -3,6 +3,12 @@ describe('flatMapLatest', function()
     expect(Rx.Observable.throw():flatMapLatest()).to.fail()
   end)
 
+  it('produces an error if the callback errors', function()
+    local onError = spy()
+    Rx.Observable.fromRange(3):flatMapLatest(error):subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
+  end)
+
   it('unsubscribes from the source and the projected observable', function()
     local outerUnsubscribe = spy()
     local outerSubscription = Rx.Subscription.create(outerUnsubscribe)

+ 6 - 0
tests/map.lua

@@ -16,4 +16,10 @@ describe('map', function()
     expect(observable).to.produce({{1, 2, 3}, {1, 2, 3}, {1, 2, 3}})
     expect(callback).to.equal({{3, 1}, {2, 2}, {1, 3}})
   end)
+
+  it('calls onError if the callback errors', function()
+    local onError = spy()
+    Rx.Observable.fromRange(3):map(error):subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
+  end)
 end)

+ 6 - 0
tests/reduce.lua

@@ -33,4 +33,10 @@ describe('reduce', function()
     local observable = Rx.Observable.fromTable({2, 3, 4}, ipairs, true):reduce(accumulator, 0):subscribe()
     expect(accumulator).to.equal({{0, 2, 1}, {0, 3, 2}, {0, 4, 3}})
   end)
+
+  it('calls onError if the accumulator errors', function()
+    local onError = spy()
+    Rx.Observable.fromRange(3):reduce(error):subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
+  end)
 end)

+ 6 - 0
tests/reject.lua

@@ -21,4 +21,10 @@ describe('reject', function()
     expect(observable.subscribe).to.fail()
     expect(observable:reject().subscribe).to.fail()
   end)
+
+  it('calls onError when the predicate errors', function()
+    local onError = spy()
+    Rx.Observable.fromRange(3):reject(error):subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
+  end)
 end)

+ 6 - 0
tests/scan.lua

@@ -33,4 +33,10 @@ describe('scan', function()
     local observable = Rx.Observable.fromTable({2, 3, 4}, ipairs, true):scan(accumulator, 0):subscribe()
     expect(accumulator).to.equal({{0, 2, 1}, {0, 3, 2}, {0, 4, 3}})
   end)
+
+  it('calls onError if the accumulator errors', function()
+    local onError = spy()
+    Rx.Observable.fromRange(3):scan(error):subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
+  end)
 end)

+ 6 - 0
tests/skipWhile.lua

@@ -21,4 +21,10 @@ describe('skipWhile', function()
     local observable = Rx.Observable.fromTable({2, 4, 6}):skipWhile(isEven)
     expect(observable).to.produce.nothing()
   end)
+
+  it('calls onError if the predicate errors', function()
+    local onError = spy()
+    Rx.Observable.fromRange(3):skipWhile(error):subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
+  end)
 end)

+ 6 - 0
tests/takeWhile.lua

@@ -21,4 +21,10 @@ describe('takeWhile', function()
     local observable = Rx.Observable.fromTable({1, 3, 5}):takeWhile(isEven)
     expect(observable).to.produce.nothing()
   end)
+
+  it('calls onError if the predicate errors', function()
+    local onError = spy()
+    Rx.Observable.fromRange(3):takeWhile(error):subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
+  end)
 end)

+ 23 - 0
tests/tap.lua

@@ -13,6 +13,15 @@ describe('tap', function()
     expect(onNext).to.equal({{1}})
   end)
 
+  it('calls onError if the onNext callback errors', function()
+    local onNext = spy()
+    local onError = spy()
+    local observer = Rx.Observer.create(onNext, onError)
+    Rx.Observable.fromValue(1):tap(error):subscribe(observer)
+    expect(#onNext).to.equal(0)
+    expect(#onError).to.equal(1)
+  end)
+
   it('runs the specified onError function', function()
     local onError = spy()
     local observable = Rx.Observable.create(function(observer)
@@ -21,6 +30,12 @@ describe('tap', function()
     expect(onError).to.equal({{'message'}})
   end)
 
+  it('calls onError if the onError callback errors', function()
+    local onError = spy()
+    Rx.Observable.throw():tap(nil, error):subscribe(nil, onError, nil)
+    expect(#onError).to.equal(1)
+  end)
+
   it('runs the specified onCompleted function', function()
     local onCompleted = spy()
     local observable = Rx.Observable.create(function(observer)
@@ -28,4 +43,12 @@ describe('tap', function()
     end):tap(_, _, onCompleted):subscribe()
     expect(#onCompleted).to.equal(1)
   end)
+
+  it('calls onError if the onCompleted callback errors', function()
+    local onError = spy()
+    local onCompleted = spy()
+    Rx.Observable.fromValue(1):tap(nil, nil, error):subscribe(nil, onError, onCompleted)
+    expect(#onCompleted).to.equal(0)
+    expect(#onError).to.equal(1)
+  end)
 end)