浏览代码

Merge pull request #51 from bonzini/master

flatten: avoid early completion
Bjorn 5 年之前
父节点
当前提交
16edbf9fee
共有 4 个文件被更改,包括 46 次插入10 次删除
  1. 10 5
      rx.lua
  2. 10 5
      src/operators/flatten.lua
  3. 13 0
      tests/flatMap.lua
  4. 13 0
      tests/flatten.lua

+ 10 - 5
rx.lua

@@ -963,24 +963,29 @@ end
 function Observable:flatten()
   return Observable.create(function(observer)
     local subscriptions = {}
+    local remaining = 1
 
     local function onError(message)
       return observer:onError(message)
     end
 
+    local function onCompleted()
+      remaining = remaining - 1
+      if remaining == 0 then
+        return observer:onCompleted()
+      end
+    end
+
     local function onNext(observable)
       local function innerOnNext(...)
         observer:onNext(...)
       end
 
-      local subscription = observable:subscribe(innerOnNext, onError, util.noop)
+      remaining = remaining + 1
+      local subscription = observable:subscribe(innerOnNext, onError, onCompleted)
       subscriptions[#subscriptions + 1] = subscription
     end
 
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
     subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)
     return Subscription.create(function ()
       for i = 1, #subscriptions do

+ 10 - 5
src/operators/flatten.lua

@@ -7,24 +7,29 @@ local util = require 'util'
 function Observable:flatten()
   return Observable.create(function(observer)
     local subscriptions = {}
+    local remaining = 1
 
     local function onError(message)
       return observer:onError(message)
     end
 
+    local function onCompleted()
+      remaining = remaining - 1
+      if remaining == 0 then
+        return observer:onCompleted()
+      end
+    end
+
     local function onNext(observable)
       local function innerOnNext(...)
         observer:onNext(...)
       end
 
-      local subscription = observable:subscribe(innerOnNext, onError, util.noop)
+      remaining = remaining + 1
+      local subscription = observable:subscribe(innerOnNext, onError, onCompleted)
       subscriptions[#subscriptions + 1] = subscription
     end
 
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
     subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)
     return Subscription.create(function ()
       for i = 1, #subscriptions do

+ 13 - 0
tests/flatMap.lua

@@ -19,4 +19,17 @@ describe('flatMap', function()
 
     expect(observable).to.produce(1, 2, 3, 2, 3, 3)
   end)
+
+  it('completes after all observables produced by its parent', function()
+    s = Rx.CooperativeScheduler.create()
+    local observable = Rx.Observable.fromRange(3):flatMap(function(i)
+      return Rx.Observable.fromRange(i, 3):delay(i, s)
+    end)
+
+    local onNext, onError, onCompleted, order = observableSpy(observable)
+    repeat s:update(1)
+    until s:isEmpty()
+    expect(#onNext).to.equal(6)
+    expect(#onCompleted).to.equal(1)
+  end)
 end)

+ 13 - 0
tests/flatten.lua

@@ -13,6 +13,19 @@ describe('flatten', function()
     expect(observable).to.produce(1, 2, 3, 2, 3, 3)
   end)
 
+  it('completes after all observables produced by its parent', function()
+    s = Rx.CooperativeScheduler.create()
+    local observable = Rx.Observable.fromRange(3):map(function(i)
+      return Rx.Observable.fromRange(i, 3):delay(i, s)
+    end):flatten()
+
+    local onNext, onError, onCompleted, order = observableSpy(observable)
+    repeat s:update(1)
+    until s:isEmpty()
+    expect(#onNext).to.equal(6)
+    expect(#onCompleted).to.equal(1)
+  end)
+
   it('should unsubscribe from all source observables', function()
     local unsubscribeA = spy()
     local observableA = Rx.Observable.create(function(observer)