Browse Source

Start Subscription class;

bjorn 9 years ago
parent
commit
ce756b980d
1 changed files with 44 additions and 2 deletions
  1. 44 2
      rx.lua

+ 44 - 2
rx.lua

@@ -7,6 +7,33 @@ local function noop() end
 local function identity(x) return x end
 local function constant(x) return function() return x end end
 
+--- @class Subscription
+-- @description A handle representing the link between an Observer and an Observable, as well as
+-- any work required to clean up after the Observable completes or the Observer unsubscribes.
+local Subscription = {}
+Subscription.__index = Subscription
+Subscription.__tostring = constant('Subscription')
+
+--- Creates a new Subscription.
+-- @arg {function=} action - The action to run when the subscription is unsubscribed. It will only
+--                           be run once.
+-- @returns {Subscription}
+function Subscription.create(action)
+  local self = {
+    action = action or noop,
+    unsubscribed = false
+  }
+
+  return setmetatable(self, Subscription)
+end
+
+--- Unsubscribes the subscription, performing any necessary cleanup work.
+function Subscription:unsubscribe()
+  if self.unsubscribed then return end
+  self.action(self)
+  self.unsubscribed = true
+end
+
 --- @class Observer
 -- @description Observers are simple objects that receive values from Observables.
 local Observer = {}
@@ -993,10 +1020,24 @@ end
 --                          number, which will put it to sleep for a time period.
 -- @arg {number=0} delay - Delay execution of the action by a time period.
 function Cooperative:schedule(action, delay)
-  table.insert(self.tasks, {
+  local task = {
     thread = coroutine.create(action),
     due = self.currentTime + (delay or 0)
-  })
+  }
+
+  table.insert(self.tasks, task)
+
+  return rx.Subscription.create(function()
+    return self:unschedule(task)
+  end)
+end
+
+function Cooperative:unschedule(task)
+  for i = 1, #self.tasks do
+    if self.tasks[i] == task then
+      table.remove(self.tasks, i)
+    end
+  end
 end
 
 --- Triggers an update of the Cooperative Scheduler. The clock will be advanced and the scheduler
@@ -1096,6 +1137,7 @@ end
 Subject.__call = Subject.onNext
 
 rx = {
+  Subscription = Subscription,
   Observer = Observer,
   Observable = Observable,
   Scheduler = Scheduler,