|
@@ -3,13 +3,12 @@ local rx
|
|
|
local function noop() end
|
|
|
local function identity(x) return x end
|
|
|
|
|
|
-----
|
|
|
--- Observer
|
|
|
--- A simple object that receives values from an Observable.
|
|
|
+--- Observer
|
|
|
+-- Observers are simple objects that receive values from Observables.
|
|
|
local Observer = {}
|
|
|
Observer.__index = Observer
|
|
|
|
|
|
--- Creates a new Observer.
|
|
|
+--- Creates a new Observer.
|
|
|
-- @arg {function=} onNext - Called when the Observable produces a value.
|
|
|
-- @arg {function=} onError - Called when the Observable terminates due to an error.
|
|
|
-- @arg {function=} onComplete - Called when the Observable completes normally.
|
|
@@ -25,7 +24,7 @@ function Observer.create(onNext, onError, onComplete)
|
|
|
return setmetatable(self, Observer)
|
|
|
end
|
|
|
|
|
|
--- Pushes a new value to the Observer.
|
|
|
+--- Pushes a new value to the Observer.
|
|
|
-- @arg {*} value
|
|
|
function Observer:onNext(value)
|
|
|
if not self.stopped then
|
|
@@ -33,7 +32,7 @@ function Observer:onNext(value)
|
|
|
end
|
|
|
end
|
|
|
|
|
|
--- Notify the Observer that an error has occurred.
|
|
|
+--- Notify the Observer that an error has occurred.
|
|
|
-- @arg {string=} message - A string describing what went wrong.
|
|
|
function Observer:onError(message)
|
|
|
if not self.stopped then
|
|
@@ -42,7 +41,7 @@ function Observer:onError(message)
|
|
|
end
|
|
|
end
|
|
|
|
|
|
--- Notify the Observer that the sequence has completed and will produce no more values.
|
|
|
+--- Notify the Observer that the sequence has completed and will produce no more values.
|
|
|
function Observer:onComplete()
|
|
|
if not self.stopped then
|
|
|
self.stopped = true
|
|
@@ -50,13 +49,12 @@ function Observer:onComplete()
|
|
|
end
|
|
|
end
|
|
|
|
|
|
-----
|
|
|
--- Observable
|
|
|
--- An object that pushes values to an Observer.
|
|
|
+--- Observable
|
|
|
+-- Observables push values to Observers.
|
|
|
local Observable = {}
|
|
|
Observable.__index = Observable
|
|
|
|
|
|
--- Creates a new Observable.
|
|
|
+--- Creates a new Observable.
|
|
|
-- @arg {function} subscribe - The subscription function that produces values.
|
|
|
-- @returns {Observable}
|
|
|
function Observable.create(subscribe)
|
|
@@ -67,7 +65,7 @@ function Observable.create(subscribe)
|
|
|
return setmetatable(self, Observable)
|
|
|
end
|
|
|
|
|
|
--- Creates an Observable that produces a single value.
|
|
|
+--- Creates an Observable that produces a single value.
|
|
|
-- @arg {*} value
|
|
|
-- @returns {Observable}
|
|
|
function Observable.fromValue(value)
|
|
@@ -77,14 +75,15 @@ function Observable.fromValue(value)
|
|
|
end)
|
|
|
end
|
|
|
|
|
|
--- Creates an Observable that produces values when the specified coroutine yields.
|
|
|
+--- Creates an Observable that produces values when the specified coroutine yields.
|
|
|
-- @arg {thread} coroutine
|
|
|
-- @returns {Observable}
|
|
|
-function Observable.fromCoroutine(cr)
|
|
|
+function Observable.fromCoroutine(thread)
|
|
|
+ thread = type(thread) == 'function' and coroutine.create(thread) or thread
|
|
|
return Observable.create(function(observer)
|
|
|
return rx.scheduler:schedule(function()
|
|
|
while not observer.stopped do
|
|
|
- local success, value = coroutine.resume(cr)
|
|
|
+ local success, value = coroutine.resume(thread)
|
|
|
|
|
|
if success then
|
|
|
observer:onNext(value)
|
|
@@ -92,7 +91,7 @@ function Observable.fromCoroutine(cr)
|
|
|
return observer:onError(value)
|
|
|
end
|
|
|
|
|
|
- if coroutine.status(cr) == 'dead' then
|
|
|
+ if coroutine.status(thread) == 'dead' then
|
|
|
return observer:onComplete()
|
|
|
end
|
|
|
|
|
@@ -102,7 +101,7 @@ function Observable.fromCoroutine(cr)
|
|
|
end)
|
|
|
end
|
|
|
|
|
|
--- Shorthand for creating an Observer and passing it to this Observable's subscription function.
|
|
|
+--- Shorthand for creating an Observer and passing it to this Observable's subscription function.
|
|
|
-- @arg {function} onNext - Called when the Observable produces a value.
|
|
|
-- @arg {function} onError - Called when the Observable terminates due to an error.
|
|
|
-- @arg {function} onComplete - Called when the Observable completes normally.
|
|
@@ -110,7 +109,7 @@ function Observable:subscribe(onNext, onError, onComplete)
|
|
|
return self._subscribe(Observer.create(onNext, onError, onComplete))
|
|
|
end
|
|
|
|
|
|
--- Subscribes to this Observable and prints values it produces.
|
|
|
+--- Subscribes to this Observable and prints values it produces.
|
|
|
-- @arg {string=} name - Prefixes the printed messages with a name.
|
|
|
function Observable:dump(name)
|
|
|
name = name and (name .. ' ') or ''
|
|
@@ -122,12 +121,10 @@ function Observable:dump(name)
|
|
|
return self:subscribe(onNext, onError, onComplete)
|
|
|
end
|
|
|
|
|
|
-----
|
|
|
--- Transformers
|
|
|
--- These functions transform the values produced by an Observable and return a new Observable that
|
|
|
--- produces these values.
|
|
|
+-- The functions below transform the values produced by an Observable and return a new Observable
|
|
|
+-- that produces these values.
|
|
|
|
|
|
--- Returns a new Observable that only produces the first result of the original.
|
|
|
+--- Returns a new Observable that only produces the first result of the original.
|
|
|
-- @returns {Observable}
|
|
|
function Observable:first()
|
|
|
return Observable.create(function(observer)
|
|
@@ -148,7 +145,7 @@ function Observable:first()
|
|
|
end)
|
|
|
end
|
|
|
|
|
|
--- Returns a new Observable that only produces the last result of the original.
|
|
|
+--- Returns a new Observable that only produces the last result of the original.
|
|
|
-- @returns {Observable}
|
|
|
function Observable:last()
|
|
|
return Observable.create(function(observer)
|
|
@@ -171,7 +168,7 @@ function Observable:last()
|
|
|
end)
|
|
|
end
|
|
|
|
|
|
--- Returns a new Observable that produces the values of the original transformed by a function.
|
|
|
+--- Returns a new Observable that produces the values of the original transformed by a function.
|
|
|
-- @arg {function} callback - The function to transform values from the original Observable.
|
|
|
-- @returns {Observable}
|
|
|
function Observable:map(callback)
|
|
@@ -194,7 +191,7 @@ function Observable:map(callback)
|
|
|
end)
|
|
|
end
|
|
|
|
|
|
--- Returns a new Observable that produces a single value computed by accumulating the results of
|
|
|
+--- Returns a new Observable that produces a single value computed by accumulating the results of
|
|
|
-- running a function on each value produced by the original Observable.
|
|
|
-- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
|
|
|
-- the return value of the last call as the first argument and the
|
|
@@ -223,15 +220,16 @@ function Observable:reduce(accumulator, seed)
|
|
|
end)
|
|
|
end
|
|
|
|
|
|
--- Returns a new Observable that produces the sum of the values produced by the original Observable.
|
|
|
+--- Returns a new Observable that produces the sum of the values of the original Observable as a
|
|
|
+-- single result.
|
|
|
-- @returns {Observable}
|
|
|
function Observable:sum()
|
|
|
return self:reduce(function(x, y) return x + y end, 0)
|
|
|
end
|
|
|
|
|
|
--- Returns a new Observable that runs a combinator function on the most recent values from a set of
|
|
|
--- Observables whenever any of them produce a new value. The results of the combinator function are
|
|
|
--- produced by the new Observable.
|
|
|
+--- Returns a new Observable that runs a combinator function on the most recent values from a set
|
|
|
+-- of Observables whenever any of them produce a new value. The results of the combinator function
|
|
|
+-- are produced by the new Observable.
|
|
|
-- @arg {Observable...} observables - One or more Observables to combine.
|
|
|
-- @arg {function} combinator - A function that combines the latest result from each Observable and
|
|
|
-- returns a single value.
|
|
@@ -277,12 +275,10 @@ function Observable:combineLatest(...)
|
|
|
end)
|
|
|
end
|
|
|
|
|
|
-----
|
|
|
-- Scheduler
|
|
|
-- Schedulers manage groups of Observables.
|
|
|
local Scheduler = {}
|
|
|
|
|
|
-----
|
|
|
-- Cooperative Scheduler
|
|
|
-- Manages Observables using coroutines and a virtual clock that must be updated manually.
|
|
|
local Cooperative = {}
|