Browse Source

Merge pull request #36 from naxxster/fix_flatten_subscription

Fix `flatten` operator's subscription to unsubscribe from all sources
Bjorn Swenson 6 years ago
parent
commit
c6e6c543b2
3 changed files with 41 additions and 4 deletions
  1. 10 2
      rx.lua
  2. 10 2
      src/operators/flatten.lua
  3. 21 0
      tests/flatten.lua

+ 10 - 2
rx.lua

@@ -950,6 +950,8 @@ end
 -- @returns {Observable}
 function Observable:flatten()
   return Observable.create(function(observer)
+    local subscriptions = {}
+
     local function onError(message)
       return observer:onError(message)
     end
@@ -959,14 +961,20 @@ function Observable:flatten()
         observer:onNext(...)
       end
 
-      observable:subscribe(innerOnNext, onError, util.noop)
+      local subscription = observable:subscribe(innerOnNext, onError, util.noop)
+      subscriptions[#subscriptions + 1] = subscription
     end
 
     local function onCompleted()
       return observer:onCompleted()
     end
 
-    return self:subscribe(onNext, onError, onCompleted)
+    subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)
+    return Subscription.create(function ()
+      for i = 1, #subscriptions do
+        subscriptions[i]:unsubscribe()
+      end
+    end)
   end)
 end
 

+ 10 - 2
src/operators/flatten.lua

@@ -6,6 +6,8 @@ local util = require 'util'
 -- @returns {Observable}
 function Observable:flatten()
   return Observable.create(function(observer)
+    local subscriptions = {}
+
     local function onError(message)
       return observer:onError(message)
     end
@@ -15,13 +17,19 @@ function Observable:flatten()
         observer:onNext(...)
       end
 
-      observable:subscribe(innerOnNext, onError, util.noop)
+      local subscription = observable:subscribe(innerOnNext, onError, util.noop)
+      subscriptions[#subscriptions + 1] = subscription
     end
 
     local function onCompleted()
       return observer:onCompleted()
     end
 
-    return self:subscribe(onNext, onError, onCompleted)
+    subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)
+    return Subscription.create(function ()
+      for i = 1, #subscriptions do
+        subscriptions[i]:unsubscribe()
+      end
+    end)
   end)
 end

+ 21 - 0
tests/flatten.lua

@@ -12,4 +12,25 @@ describe('flatten', function()
 
     expect(observable).to.produce(1, 2, 3, 2, 3, 3)
   end)
+
+  it('should unsubscribe from all source observables', function()
+    local unsubscribeA = spy()
+    local observableA = Rx.Observable.create(function(observer)
+      return Rx.Subscription.create(unsubscribeA)
+    end)
+
+    local unsubscribeB = spy()
+    local observableB = Rx.Observable.create(function(observer)
+      return Rx.Subscription.create(unsubscribeB)
+    end)
+
+    local subject = Rx.Subject.create()
+    local subscription = subject:flatten():subscribe()
+
+    subject:onNext(observableA)
+    subject:onNext(observableB)
+    subscription:unsubscribe()
+    expect(#unsubscribeA).to.equal(1)
+    expect(#unsubscribeB).to.equal(1)
+  end)
 end)