浏览代码

Merge pull request #32 from naxxster/fix-switch-subscription

Fix switch operator subscription to unsubscribe from inner subscription
Bjorn Swenson 6 年之前
父节点
当前提交
66af07f13b
共有 3 个文件被更改,包括 41 次插入10 次删除
  1. 14 5
      rx.lua
  2. 14 5
      src/operators/switch.lua
  3. 13 0
      tests/switch.lua

+ 14 - 5
rx.lua

@@ -1459,7 +1459,7 @@ end
 -- @returns {Observable}
 -- @returns {Observable}
 function Observable:switch()
 function Observable:switch()
   return Observable.create(function(observer)
   return Observable.create(function(observer)
-    local subscription
+    local innerSubscription
 
 
     local function onNext(...)
     local function onNext(...)
       return observer:onNext(...)
       return observer:onNext(...)
@@ -1474,14 +1474,23 @@ function Observable:switch()
     end
     end
 
 
     local function switch(source)
     local function switch(source)
-      if subscription then
-        subscription:unsubscribe()
+      if innerSubscription then
+        innerSubscription:unsubscribe()
       end
       end
 
 
-      subscription = source:subscribe(onNext, onError, nil)
+      innerSubscription = source:subscribe(onNext, onError, nil)
     end
     end
 
 
-    return self:subscribe(switch, onError, onCompleted)
+    local subscription = self:subscribe(switch, onError, onCompleted)
+    return Subscription.create(function()
+      if innerSubscription then
+        innerSubscription:unsubscribe()
+      end
+
+      if subscription then
+        subscription:unsubscribe()
+      end
+    end)
   end)
   end)
 end
 end
 
 

+ 14 - 5
src/operators/switch.lua

@@ -5,7 +5,7 @@ local Observable = require 'observable'
 -- @returns {Observable}
 -- @returns {Observable}
 function Observable:switch()
 function Observable:switch()
   return Observable.create(function(observer)
   return Observable.create(function(observer)
-    local subscription
+    local innerSubscription
 
 
     local function onNext(...)
     local function onNext(...)
       return observer:onNext(...)
       return observer:onNext(...)
@@ -20,13 +20,22 @@ function Observable:switch()
     end
     end
 
 
     local function switch(source)
     local function switch(source)
-      if subscription then
-        subscription:unsubscribe()
+      if innerSubscription then
+        innerSubscription:unsubscribe()
       end
       end
 
 
-      subscription = source:subscribe(onNext, onError, nil)
+      innerSubscription = source:subscribe(onNext, onError, nil)
     end
     end
 
 
-    return self:subscribe(switch, onError, onCompleted)
+    local subscription = self:subscribe(switch, onError, onCompleted)
+    return Subscription.create(function()
+      if innerSubscription then
+        innerSubscription:unsubscribe()
+      end
+
+      if subscription then
+        subscription:unsubscribe()
+      end
+    end)
   end)
   end)
 end
 end

+ 13 - 0
tests/switch.lua

@@ -36,4 +36,17 @@ describe('switch', function()
     expect(#onError).to.equal(0)
     expect(#onError).to.equal(0)
     expect(#onCompleted).to.equal(1)
     expect(#onCompleted).to.equal(1)
   end)
   end)
+
+  it('should unsubscribe from inner subscription too', function()
+    local unsubscribeA = spy()
+    local observableA = Rx.Observable.create(function(observer)
+      return Rx.Subscription.create(unsubscribeA)
+    end)
+
+    local subject = Rx.Subject.create()
+    local subscription = subject:switch():subscribe()
+    subject:onNext(observableA)
+    subscription:unsubscribe()
+    expect(#unsubscribeA).to.equal(1)
+  end)
 end)
 end)