Browse Source

Fix merge operator to handle onCompleted of input observables not a destructive way

The merge operator removed the source observable if it complete.
After that the subscriptions after that source observable index can not unsubscribe.
Fix the issue by recording the completed source index, as combineLatest operator does.
Junseong Jang 6 years ago
parent
commit
5e4e857c96
3 changed files with 20 additions and 4 deletions
  1. 3 2
      rx.lua
  2. 3 2
      src/operators/merge.lua
  3. 14 0
      tests/merge.lua

+ 3 - 2
rx.lua

@@ -1049,6 +1049,7 @@ function Observable:merge(...)
   table.insert(sources, 1, self)
   table.insert(sources, 1, self)
 
 
   return Observable.create(function(observer)
   return Observable.create(function(observer)
+    local completed = {}
     local subscriptions = {}
     local subscriptions = {}
 
 
     local function onNext(...)
     local function onNext(...)
@@ -1061,9 +1062,9 @@ function Observable:merge(...)
 
 
     local function onCompleted(i)
     local function onCompleted(i)
       return function()
       return function()
-        sources[i] = nil
+        table.insert(completed, i)
 
 
-        if not next(sources) then
+        if #completed == #sources then
           observer:onCompleted()
           observer:onCompleted()
         end
         end
       end
       end

+ 3 - 2
src/operators/merge.lua

@@ -9,6 +9,7 @@ function Observable:merge(...)
   table.insert(sources, 1, self)
   table.insert(sources, 1, self)
 
 
   return Observable.create(function(observer)
   return Observable.create(function(observer)
+    local completed = {}
     local subscriptions = {}
     local subscriptions = {}
 
 
     local function onNext(...)
     local function onNext(...)
@@ -21,9 +22,9 @@ function Observable:merge(...)
 
 
     local function onCompleted(i)
     local function onCompleted(i)
       return function()
       return function()
-        sources[i] = nil
+        table.insert(completed, i)
 
 
-        if not next(sources) then
+        if #completed == #sources then
           observer:onCompleted()
           observer:onCompleted()
         end
         end
       end
       end

+ 14 - 0
tests/merge.lua

@@ -20,6 +20,20 @@ describe('merge', function()
     expect(#unsubscribeB).to.equal(1)
     expect(#unsubscribeB).to.equal(1)
   end)
   end)
 
 
+  it('unsubscribes from all input observables included completed', function()
+    local observableA = Rx.Observable.empty()
+
+    local unsubscribeB = spy()
+    local subscriptionB = Rx.Subscription.create(unsubscribeB)
+    local observableB = Rx.Observable.create(function(observer)
+      return subscriptionB
+    end)
+
+    local subscription = observableA:merge(Rx.Observable.empty(), observableB):subscribe()
+    subscription:unsubscribe()
+    expect(#unsubscribeB).to.equal(1)
+  end)
+
   it('produces values from all input observables, in order', function()
   it('produces values from all input observables, in order', function()
     local observableA = Rx.Subject.create()
     local observableA = Rx.Subject.create()
     local observableB = Rx.Subject.create()
     local observableB = Rx.Subject.create()