-- flatten.lua
-- Specs for the Observable `flatten` operator.
  -- Specs for Observable:flatten(): the parent observable produces observables,
  -- and the flattened observable is expected to relay the values of all of them.
  describe('flatten', function()
    it('produces an error if its parent errors', function()
      -- The mapper calls the produced value (a string) as a function, which
      -- raises, so the parent observable itself errors.
      local observable = Rx.Observable.of(''):map(function(x) return x() end)
      expect(observable).to.produce.error()
      expect(observable:flatten()).to.produce.error()
    end)

    it('produces all values produced by the observables produced by its parent', function()
      -- Parent produces 1, 2, 3; each i is mapped to the range i..3.
      local observable = Rx.Observable.fromRange(3):map(function(i)
        return Rx.Observable.fromRange(i, 3)
      end):flatten()

      expect(observable).to.produce(1, 2, 3, 2, 3, 3)
    end)

    it('completes after all observables produced by its parent', function()
      -- Keep the scheduler local so it does not leak into the global
      -- environment shared with other specs.
      local scheduler = Rx.CooperativeScheduler.create()

      -- Each inner observable is delayed through the scheduler, so nothing is
      -- expected to finish until the scheduler has been driven below.
      local observable = Rx.Observable.fromRange(3):map(function(i)
        return Rx.Observable.fromRange(i, 3):delay(i, scheduler)
      end):flatten()

      -- Only the onNext and onCompleted spies are inspected here; the second
      -- return value (the onError spy) is intentionally discarded.
      local onNext, _, onCompleted = observableSpy(observable)

      -- Drive the scheduler until it reports no pending work.
      repeat
        scheduler:update(1)
      until scheduler:isEmpty()

      expect(#onNext).to.equal(6)
      expect(#onCompleted).to.equal(1)
    end)

    it('should unsubscribe from all source observables', function()
      -- Two inner observables that never emit; each returns a subscription
      -- whose teardown is a spy so the unsubscribe calls can be counted.
      local unsubscribeA = spy()
      local observableA = Rx.Observable.create(function(observer)
        return Rx.Subscription.create(unsubscribeA)
      end)

      local unsubscribeB = spy()
      local observableB = Rx.Observable.create(function(observer)
        return Rx.Subscription.create(unsubscribeB)
      end)

      local subject = Rx.Subject.create()
      local subscription = subject:flatten():subscribe()

      subject:onNext(observableA)
      subject:onNext(observableB)
      subscription:unsubscribe()

      -- Unsubscribing the flattened stream must tear down each inner
      -- subscription exactly once.
      expect(#unsubscribeA).to.equal(1)
      expect(#unsubscribeB).to.equal(1)
    end)
  end)