Эх сурвалжийг харах

Observable.flatMapLatest;

bjorn 9 жил өмнө
parent
commit
271e9ffda1

+ 11 - 0
doc/README.md

@@ -39,6 +39,7 @@ RxLua
   - [find](#findpredicate)
   - [first](#first)
   - [flatMap](#flatmapcallback)
+  - [flatMapLatest](#flatmaplatestcallback)
   - [flatten](#flatten)
   - [last](#last)
   - [map](#mapcallback)
@@ -420,6 +421,16 @@ Returns a new Observable that transform the items emitted by an Observable into
 
 ---
 
+#### `:flatMapLatest(callback)`
+
+Returns a new Observable that uses a callback to create Observables from the values produced by the source, then produces values from the most recent of these Observables.
+
+| Name | Type | Default | Description |
+|------|------|---------|-------------|
+| `callback` | function (optional) | identity | The function used to convert values to Observables. |
+
+---
+
 #### `:flatten()`
 
 Returns a new Observable that subscribes to the Observables produced by the original and produces their values.

+ 42 - 0
rx.lua

@@ -755,6 +755,48 @@ function Observable:flatMap(callback)
   return self:map(callback):flatten()
 end
 
+--- Returns a new Observable that uses a callback to create Observables from the values produced by
+-- the source, then produces values from the most recent of these Observables.
+-- @arg {function=identity} callback - The function used to convert values to Observables.
+-- @returns {Observable}
+function Observable:flatMapLatest(callback)
+  callback = callback or util.identity
+  return Observable.create(function(observer)
+    local innerSubscription
+
+    local function onNext(...)
+      observer:onNext(...)
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    local function subscribeInner(...)
+      if innerSubscription then
+        innerSubscription:unsubscribe()
+      end
+
+      innerSubscription = callback(...):subscribe(onNext, onError)
+    end
+
+    local subscription = self:subscribe(subscribeInner, onError, onCompleted)
+    return Subscription.create(function()
+      if innerSubscription then
+        innerSubscription:unsubscribe()
+      end
+
+      if subscription then
+        subscription:unsubscribe()
+      end
+    end)
+  end)
+end
+
 --- Returns a new Observable that subscribes to the Observables produced by the original and
 -- produces their values.
 -- @returns {Observable}

+ 45 - 0
src/operators/flatMapLatest.lua

@@ -0,0 +1,45 @@
+local Observable = require 'observable'
+local Subscription = require 'subscription'
+local util = require 'util'
+
+--- Returns a new Observable that uses a callback to create Observables from the values produced by
+-- the source, then produces values from the most recent of these Observables.
+-- @arg {function=identity} callback - The function used to convert values to Observables.
+-- @returns {Observable}
+function Observable:flatMapLatest(callback)
+  callback = callback or util.identity
+  return Observable.create(function(observer)
+    local innerSubscription
+
+    local function onNext(...)
+      observer:onNext(...)
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    local function subscribeInner(...)
+      if innerSubscription then
+        innerSubscription:unsubscribe()
+      end
+
+      innerSubscription = callback(...):subscribe(onNext, onError)
+    end
+
+    local subscription = self:subscribe(subscribeInner, onError, onCompleted)
+    return Subscription.create(function()
+      if innerSubscription then
+        innerSubscription:unsubscribe()
+      end
+
+      if subscription then
+        subscription:unsubscribe()
+      end
+    end)
+  end)
+end

+ 67 - 0
tests/flatMapLatest.lua

@@ -0,0 +1,67 @@
+describe('flatMapLatest', function()
+  it('produces an error if its parent errors', function()
+    expect(Rx.Observable.throw():flatMapLatest()).to.fail()
+  end)
+
+  it('unsubscribes from the source and the projected observable', function()
+    local outerUnsubscribe = spy()
+    local outerSubscription = Rx.Subscription.create(outerUnsubscribe)
+    local outer = Rx.Observable.create(function(observer)
+      observer:onNext()
+      observer:onCompleted()
+      return outerSubscription
+    end)
+
+    local innerUnsubscribe = spy()
+    local innerSubscription = Rx.Subscription.create(innerUnsubscribe)
+    local inner = Rx.Observable.create(function()
+      return innerSubscription
+    end)
+
+    local subscription = outer:flatMapLatest(function() return inner end):subscribe()
+    subscription:unsubscribe()
+    expect(#innerUnsubscribe).to.equal(1)
+    expect(#outerUnsubscribe).to.equal(1)
+  end)
+
+  it('uses the identity function as the callback if none is specified', function()
+    local observable = Rx.Observable.fromTable({
+      Rx.Observable.fromRange(3),
+      Rx.Observable.fromRange(5)
+    }):flatMapLatest()
+    expect(observable).to.produce(1, 2, 3, 1, 2, 3, 4, 5)
+  end)
+
+  it('produces values from the most recent projected Observable of the source', function()
+    local children = {Rx.Subject.create(), Rx.Subject.create()}
+    local subject = Rx.Subject.create()
+    local onNext = observableSpy(subject:flatMapLatest(function(i)
+      return children[i]
+    end))
+    subject:onNext(1)
+    children[1]:onNext(1)
+    children[1]:onNext(2)
+    children[1]:onNext(3)
+    children[2]:onNext(10)
+    subject:onNext(2)
+    children[1]:onNext(4)
+    children[2]:onNext(20)
+    children[1]:onNext(5)
+    children[2]:onNext(30)
+    children[2]:onNext(40)
+    children[2]:onNext(50)
+    expect(onNext).to.equal({{1}, {2}, {3}, {20}, {30}, {40}, {50}})
+  end)
+
+  it('does not complete if one of the children completes', function()
+    local subject = Rx.Subject.create()
+    local flatMapped = subject:flatMapLatest(function() return Rx.Observable.empty() end)
+    local _, _, onCompleted = observableSpy(flatMapped)
+    subject:onNext()
+    expect(#onCompleted).to.equal(0)
+    subject:onNext()
+    expect(#onCompleted).to.equal(0)
+    subject:onCompleted()
+    expect(#onCompleted).to.equal(1)
+  end)
+end)

+ 1 - 0
tests/observable.lua

@@ -231,6 +231,7 @@ describe('Observable', function()
   dofile('tests/find.lua')
   dofile('tests/first.lua')
   dofile('tests/flatMap.lua')
+  dofile('tests/flatMapLatest.lua')
   dofile('tests/flatten.lua')
   dofile('tests/last.lua')
   dofile('tests/map.lua')

+ 1 - 0
tools/concat.lua

@@ -24,6 +24,7 @@ local files = {
   'src/operators/find.lua',
   'src/operators/first.lua',
   'src/operators/flatMap.lua',
+  'src/operators/flatMapLatest.lua',
   'src/operators/flatten.lua',
   'src/operators/last.lua',
   'src/operators/map.lua',