Forráskód Böngészése

Finish Observable.debounce;

bjorn 9 éve
szülő
commit
92556b4925
2 módosított fájl, 18 hozzáadás és 10 törlés
  1. 9 5
      rx.lua
  2. 9 5
      src/operators/debounce.lua

+ 9 - 5
rx.lua

@@ -608,18 +608,19 @@ function Observable:debounce(time, scheduler)
   time = time or 0
 
   return Observable.create(function(observer)
+    local debounced = {}
+
     local function wrap(key)
-      local debounced
       return function(...)
         local value = util.pack(...)
 
-        if debounced then
-          debounced:unsubscribe()
+        if debounced[key] then
+          debounced[key]:unsubscribe()
         end
 
         local values = util.pack(...)
 
-        debounced = scheduler:schedule(function()
+        debounced[key] = scheduler:schedule(function()
           return observer[key](observer, util.unpack(values))
         end, time)
       end
@@ -628,7 +629,10 @@ function Observable:debounce(time, scheduler)
     local subscription = self:subscribe(wrap('onNext'), wrap('onError'), wrap('onCompleted'))
 
     return Subscription.create(function()
-
+      if subscription then subscription:unsubscribe() end
+      for _, timeout in pairs(debounced) do
+        timeout:unsubscribe()
+      end
     end)
   end)
 end

+ 9 - 5
src/operators/debounce.lua

@@ -6,18 +6,19 @@ function Observable:debounce(time, scheduler)
   time = time or 0
 
   return Observable.create(function(observer)
+    local debounced = {}
+
     local function wrap(key)
-      local debounced
       return function(...)
         local value = util.pack(...)
 
-        if debounced then
-          debounced:unsubscribe()
+        if debounced[key] then
+          debounced[key]:unsubscribe()
         end
 
         local values = util.pack(...)
 
-        debounced = scheduler:schedule(function()
+        debounced[key] = scheduler:schedule(function()
           return observer[key](observer, util.unpack(values))
         end, time)
       end
@@ -26,7 +27,10 @@ function Observable:debounce(time, scheduler)
     local subscription = self:subscribe(wrap('onNext'), wrap('onError'), wrap('onCompleted'))
 
     return Subscription.create(function()
-
+      if subscription then subscription:unsubscribe() end
+      for _, timeout in pairs(debounced) do
+        timeout:unsubscribe()
+      end
     end)
   end)
 end