Browse Source

BehaviorSubject;

bjorn 9 years ago
parent
commit
2101b813ff
1 changed files with 75 additions and 22 deletions
  1. 75 22
      rx.lua

+ 75 - 22
rx.lua

@@ -1083,58 +1083,110 @@ Subject.__index = Subject
 Subject.__tostring = constant('Subject')
 Subject.__tostring = constant('Subject')
 
 
 --- Creates a new Subject.
 --- Creates a new Subject.
--- @arg {*...} value - The initial values.
 -- @returns {Subject}
 -- @returns {Subject}
-function Subject.create(...)
+function Subject.create()
   local self = {
   local self = {
-    value = {...},
-    observers = {}
+    observers = {},
+    stopped = false
   }
   }
 
 
   return setmetatable(self, Subject)
   return setmetatable(self, Subject)
 end
 end
 
 
 --- Creates a new Observer and attaches it to the Subject.
 --- Creates a new Observer and attaches it to the Subject.
--- @arg {function} onNext - Called when the Subject produces a value.
+-- @arg {function|table} onNext|observer - A function called when the Subject produces a value or
+--                                         an existing Observer to attach to the Subject.
 -- @arg {function} onError - Called when the Subject terminates due to an error.
 -- @arg {function} onError - Called when the Subject terminates due to an error.
 -- @arg {function} onComplete - Called when the Subject completes normally.
 -- @arg {function} onComplete - Called when the Subject completes normally.
 function Subject:subscribe(onNext, onError, onComplete)
 function Subject:subscribe(onNext, onError, onComplete)
-  table.insert(self.observers, Observer.create(onNext, onError, onComplete))
+  local observer
+
+  if type(onNext) == 'table' then
+    observer = onNext
+  else
+    observer = Observer.create(onNext, onError, onComplete)
+  end
+
+  table.insert(self.observers, observer)
 end
 end
 
 
---- Pushes zero or more values to the Subject. It will be broadcasted to all Observers.
+--- Pushes zero or more values to the Subject. They will be broadcasted to all Observers.
 -- @arg {*...} values
 -- @arg {*...} values
 function Subject:onNext(...)
 function Subject:onNext(...)
-  self.value = {...}
-
-  for i = 1, #self.observers do
-    self.observers[i]:onNext(...)
+  if not self.stopped then
+    for i = 1, #self.observers do
+      self.observers[i]:onNext(...)
+    end
   end
   end
 end
 end
 
 
 --- Signal to all Observers that an error has occurred.
 --- Signal to all Observers that an error has occurred.
 -- @arg {string=} message - A string describing what went wrong.
 -- @arg {string=} message - A string describing what went wrong.
 function Subject:onError(message)
 function Subject:onError(message)
-  for i = 1, #self.observers do
-    self.observers[i]:onError(message)
+  if not self.stopped then
+    for i = 1, #self.observers do
+      self.observers[i]:onError(message)
+    end
+
+    self.stopped = true
   end
   end
 end
 end
 
 
 --- Signal to all Observers that the Subject will not produce any more values.
 --- Signal to all Observers that the Subject will not produce any more values.
 function Subject:onComplete()
 function Subject:onComplete()
-  for i = 1, #self.observers do
-    self.observers[i]:onComplete()
+  if not self.stopped then
+    for i = 1, #self.observers do
+      self.observers[i]:onComplete()
+    end
+
+    self.stopped = true
   end
   end
 end
 end
 
 
---- Returns the last value emitted by the Subject, or the initial value passed to the constructor
--- if nothing has been emitted yet.
--- @returns {*...}
-function Subject:getValue()
-  return unpack(self.value or {})
+Subject.__call = Subject.onNext
+
+--- @class BehaviorSubject
+-- @description A Subject that tracks its current value. Provides an accessor to retrieve the most
+-- recent pushed value, and all subscribers immediately receive the latest value.
+local BehaviorSubject = setmetatable({}, Subject)
+BehaviorSubject.__index = BehaviorSubject
+BehaviorSubject.__tostring = constant('BehaviorSubject')
+
+--- Creates a new BehaviorSubject.
+-- @arg {*...} value - The initial values.
+-- @returns {Subject}
+function BehaviorSubject.create(...)
+  local self = {
+    observers = {},
+    stopped = false
+  }
+
+  if select('#', ...) > 0 then
+    self.value = pack(...)
+  end
+
+  return setmetatable(self, BehaviorSubject)
 end
 end
 
 
-Subject.__call = Subject.onNext
+--- Creates a new Observer and attaches it to the Subject. Immediately broadcasts the most recent
+-- value to the Observer.
+-- @arg {function} onNext - Called when the Subject produces a value.
+-- @arg {function} onError - Called when the Subject terminates due to an error.
+-- @arg {function} onComplete - Called when the Subject completes normally.
+function BehaviorSubject:subscribe(onNext, onError, onComplete)
+  local observer = Observer.create(onNext, onError, onComplete)
+  Subject.subscribe(self, observer)
+  if self.value then
+    observer:onNext(unpack(self.value))
+  end
+end
+
+--- Pushes zero or more values to the BehaviorSubject. They will be broadcasted to all Observers.
+-- @arg {*...} values
+function BehaviorSubject:onNext(...)
+  self.value = pack(...)
+  return Subject.onNext(self, ...)
+end
 
 
 rx = {
 rx = {
   Subscription = Subscription,
   Subscription = Subscription,
@@ -1142,7 +1194,8 @@ rx = {
   Observable = Observable,
   Observable = Observable,
   Scheduler = Scheduler,
   Scheduler = Scheduler,
   scheduler = Scheduler.Immediate.create(),
   scheduler = Scheduler.Immediate.create(),
-  Subject = Subject
+  Subject = Subject,
+  BehaviorSubject = BehaviorSubject
 }
 }
 
 
 return rx
 return rx