Parcourir la source

Merge pull request #33 from naxxster/fix-merge-source-completed

Fix merge operator to handle onCompleted of input observables not a d…
Bjorn Swenson il y a 6 ans
Parent
commit
687c7e53b4
3 fichiers modifiés avec 20 ajouts et 4 suppressions
  1. 3 2
      rx.lua
  2. 3 2
      src/operators/merge.lua
  3. 14 0
      tests/merge.lua

+ 3 - 2
rx.lua

@@ -1049,6 +1049,7 @@ function Observable:merge(...)
   table.insert(sources, 1, self)
   table.insert(sources, 1, self)
 
 
   return Observable.create(function(observer)
   return Observable.create(function(observer)
+    local completed = {}
     local subscriptions = {}
     local subscriptions = {}
 
 
     local function onNext(...)
     local function onNext(...)
@@ -1061,9 +1062,9 @@ function Observable:merge(...)
 
 
     local function onCompleted(i)
     local function onCompleted(i)
       return function()
       return function()
-        sources[i] = nil
+        table.insert(completed, i)
 
 
-        if not next(sources) then
+        if #completed == #sources then
           observer:onCompleted()
           observer:onCompleted()
         end
         end
       end
       end

+ 3 - 2
src/operators/merge.lua

@@ -9,6 +9,7 @@ function Observable:merge(...)
   table.insert(sources, 1, self)
   table.insert(sources, 1, self)
 
 
   return Observable.create(function(observer)
   return Observable.create(function(observer)
+    local completed = {}
     local subscriptions = {}
     local subscriptions = {}
 
 
     local function onNext(...)
     local function onNext(...)
@@ -21,9 +22,9 @@ function Observable:merge(...)
 
 
     local function onCompleted(i)
     local function onCompleted(i)
       return function()
       return function()
-        sources[i] = nil
+        table.insert(completed, i)
 
 
-        if not next(sources) then
+        if #completed == #sources then
           observer:onCompleted()
           observer:onCompleted()
         end
         end
       end
       end

+ 14 - 0
tests/merge.lua

@@ -20,6 +20,20 @@ describe('merge', function()
     expect(#unsubscribeB).to.equal(1)
     expect(#unsubscribeB).to.equal(1)
   end)
   end)
 
 
+  it('unsubscribes from all input observables included completed', function()
+    local observableA = Rx.Observable.empty()
+
+    local unsubscribeB = spy()
+    local subscriptionB = Rx.Subscription.create(unsubscribeB)
+    local observableB = Rx.Observable.create(function(observer)
+      return subscriptionB
+    end)
+
+    local subscription = observableA:merge(Rx.Observable.empty(), observableB):subscribe()
+    subscription:unsubscribe()
+    expect(#unsubscribeB).to.equal(1)
+  end)
+
   it('produces values from all input observables, in order', function()
   it('produces values from all input observables, in order', function()
     local observableA = Rx.Subject.create()
     local observableA = Rx.Subject.create()
     local observableB = Rx.Subject.create()
     local observableB = Rx.Subject.create()