|
@@ -1,42 +1,61 @@
|
|
local function noop() end
|
|
local function noop() end
|
|
|
|
|
|
|
|
+----
|
|
|
|
+-- Observer
|
|
|
|
+-- A simple object that receives values from an Observable.
|
|
local Observer = {}
|
|
local Observer = {}
|
|
Observer.__index = Observer
|
|
Observer.__index = Observer
|
|
|
|
|
|
-function Observer.create(onNext, onError, onCompleted)
|
|
|
|
|
|
+-- Creates a new Observer.
|
|
|
|
+-- @arg {function=} onNext - Called when the Observable produces a value.
|
|
|
|
+-- @arg {function=} onError - Called when the Observable terminates due to an error.
|
|
|
|
+-- @arg {function=} onComplete - Called when the Observable completes normally.
|
|
|
|
+-- @returns {Observer}
|
|
|
|
+function Observer.create(onNext, onError, onComplete)
|
|
local self = {
|
|
local self = {
|
|
_onNext = onNext or noop,
|
|
_onNext = onNext or noop,
|
|
_onError = onError or error,
|
|
_onError = onError or error,
|
|
- _onCompleted = onCompleted or noop,
|
|
|
|
|
|
+ _onComplete = onComplete or noop,
|
|
stopped = false
|
|
stopped = false
|
|
}
|
|
}
|
|
|
|
|
|
return setmetatable(self, Observer)
|
|
return setmetatable(self, Observer)
|
|
end
|
|
end
|
|
|
|
|
|
|
|
+-- Pushes a new value to the Observer.
|
|
|
|
+-- @arg {*} value
|
|
function Observer:onNext(value)
|
|
function Observer:onNext(value)
|
|
if not self.stopped then
|
|
if not self.stopped then
|
|
self._onNext(value)
|
|
self._onNext(value)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
|
|
-function Observer:onError(e)
|
|
|
|
|
|
+-- Notify the Observer that an error has occurred.
|
|
|
|
+-- @arg {string=} message - A string describing what went wrong.
|
|
|
|
+function Observer:onError(message)
|
|
if not self.stopped then
|
|
if not self.stopped then
|
|
self.stopped = true
|
|
self.stopped = true
|
|
- self._onError(e)
|
|
|
|
|
|
+ self._onError(message)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
|
|
-function Observer:onCompleted()
|
|
|
|
|
|
+-- Notify the Observer that the sequence has completed and will produce no more values.
|
|
|
|
+function Observer:onComplete()
|
|
if not self.stopped then
|
|
if not self.stopped then
|
|
self.stopped = true
|
|
self.stopped = true
|
|
- self._onCompleted()
|
|
|
|
|
|
+ self._onComplete()
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
|
|
|
|
+----
|
|
|
|
+-- Observable
|
|
|
|
+-- An object that pushes values to an Observer.
|
|
local Observable = {}
|
|
local Observable = {}
|
|
Observable.__index = Observable
|
|
Observable.__index = Observable
|
|
|
|
|
|
|
|
+-- Creates a new Observable.
|
|
|
|
+-- @arg {function} subscribe - The subscription function that produces values.
|
|
|
|
+-- @returns {Observable}
|
|
function Observable.create(subscribe)
|
|
function Observable.create(subscribe)
|
|
local self = {
|
|
local self = {
|
|
_subscribe = subscribe
|
|
_subscribe = subscribe
|
|
@@ -45,13 +64,19 @@ function Observable.create(subscribe)
|
|
return setmetatable(self, Observable)
|
|
return setmetatable(self, Observable)
|
|
end
|
|
end
|
|
|
|
|
|
|
|
+-- Creates an Observable that produces a single value.
|
|
|
|
+-- @arg {*} value
|
|
|
|
+-- @returns {Observable}
|
|
function Observable.fromValue(value)
|
|
function Observable.fromValue(value)
|
|
return Observable.create(function(observer)
|
|
return Observable.create(function(observer)
|
|
observer:onNext(value)
|
|
observer:onNext(value)
|
|
- observer:onCompleted()
|
|
|
|
|
|
+ observer:onComplete()
|
|
end)
|
|
end)
|
|
end
|
|
end
|
|
|
|
|
|
|
|
+-- Creates an Observable that produces values when the specified coroutine yields.
|
|
|
|
+-- @arg {thread} coroutine
|
|
|
|
+-- @returns {Observable}
|
|
function Observable.fromCoroutine(cr)
|
|
function Observable.fromCoroutine(cr)
|
|
return Observable.create(function(observer)
|
|
return Observable.create(function(observer)
|
|
while true do
|
|
while true do
|
|
@@ -60,41 +85,54 @@ function Observable.fromCoroutine(cr)
|
|
if coroutine.status(cr) == 'dead' then break end
|
|
if coroutine.status(cr) == 'dead' then break end
|
|
end
|
|
end
|
|
|
|
|
|
- observer:onCompleted()
|
|
|
|
|
|
+ observer:onComplete()
|
|
end)
|
|
end)
|
|
end
|
|
end
|
|
|
|
|
|
|
|
+-- Shorthand for creating an Observer and passing it to this Observable's subscription function.
|
|
|
|
+-- @arg {function} onNext - Called when the Observable produces a value.
|
|
|
|
+-- @arg {function} onError - Called when the Observable terminates due to an error.
|
|
|
|
+-- @arg {function} onComplete - Called when the Observable completes normally.
|
|
function Observable:subscribe(onNext, onError, onComplete)
|
|
function Observable:subscribe(onNext, onError, onComplete)
|
|
return self._subscribe(Observer.create(onNext, onError, onComplete))
|
|
return self._subscribe(Observer.create(onNext, onError, onComplete))
|
|
end
|
|
end
|
|
|
|
|
|
|
|
+-- Subscribes to this Observable and prints values it produces.
|
|
|
|
+-- @arg {string=} name - Prefixes the printed messages with a name.
|
|
function Observable:dump(name)
|
|
function Observable:dump(name)
|
|
- name = name or ''
|
|
|
|
|
|
+ name = name and (name .. ' ') or ''
|
|
|
|
|
|
- local onNext = function(x) print(name .. ' onNext: ' .. x) end
|
|
|
|
- local onError = function(e) error(name .. ' onError: ' .. e) end
|
|
|
|
- local onCompleted = function() print(name .. ' onCompleted') end
|
|
|
|
|
|
+ local onNext = function(x) print(name .. 'onNext: ' .. x) end
|
|
|
|
+ local onError = function(e) print(name .. 'onError: ' .. e) end
|
|
|
|
+ local onComplete = function() print(name .. 'onComplete') end
|
|
|
|
|
|
- return self:subscribe(onNext, onError, onCompleted)
|
|
|
|
|
|
+ return self:subscribe(onNext, onError, onComplete)
|
|
end
|
|
end
|
|
|
|
|
|
--- Combinators
|
|
|
|
|
|
+----
|
|
|
|
+-- Transformers
|
|
|
|
+-- These functions transform the values produced by an Observable and return a new Observable that
|
|
|
|
+-- produces these values.
|
|
|
|
+
|
|
|
|
+-- Returns a new Observable that only produces the first result of the original.
|
|
|
|
+-- @returns {Observable}
|
|
function Observable:first()
|
|
function Observable:first()
|
|
return Observable.create(function(observer)
|
|
return Observable.create(function(observer)
|
|
return self:subscribe(function(x)
|
|
return self:subscribe(function(x)
|
|
observer:onNext(x)
|
|
observer:onNext(x)
|
|
- observer:onCompleted()
|
|
|
|
|
|
+ observer:onComplete()
|
|
end,
|
|
end,
|
|
function(e)
|
|
function(e)
|
|
observer:onError(e)
|
|
observer:onError(e)
|
|
end,
|
|
end,
|
|
function()
|
|
function()
|
|
- observer:onCompleted()
|
|
|
|
|
|
+ observer:onComplete()
|
|
end)
|
|
end)
|
|
end)
|
|
end)
|
|
end
|
|
end
|
|
|
|
|
|
-
|
|
|
|
|
|
+-- Returns a new Observable that only produces the last result of the original.
|
|
|
|
+-- @returns {Observable}
|
|
function Observable:last()
|
|
function Observable:last()
|
|
return Observable.create(function(observer)
|
|
return Observable.create(function(observer)
|
|
local value
|
|
local value
|
|
@@ -106,11 +144,14 @@ function Observable:last()
|
|
end,
|
|
end,
|
|
function()
|
|
function()
|
|
observer:onNext(value)
|
|
observer:onNext(value)
|
|
- observer:onCompleted()
|
|
|
|
|
|
+ observer:onComplete()
|
|
end)
|
|
end)
|
|
end)
|
|
end)
|
|
end
|
|
end
|
|
|
|
|
|
|
|
+-- Returns a new Observable that produces the values of the original transformed by a function.
|
|
|
|
+-- @arg {function} callback - The function to transform values from the original Observable.
|
|
|
|
+-- @returns {Observable}
|
|
function Observable:map(fn)
|
|
function Observable:map(fn)
|
|
fn = fn or function(x) return x end
|
|
fn = fn or function(x) return x end
|
|
return Observable.create(function(observer)
|
|
return Observable.create(function(observer)
|
|
@@ -121,14 +162,21 @@ function Observable:map(fn)
|
|
observer:onError(e)
|
|
observer:onError(e)
|
|
end,
|
|
end,
|
|
function()
|
|
function()
|
|
- observer:onCompleted()
|
|
|
|
|
|
+ observer:onComplete()
|
|
end)
|
|
end)
|
|
end)
|
|
end)
|
|
end
|
|
end
|
|
|
|
|
|
|
|
+-- Returns a new Observable that produces a single value computed by accumulating the results of
|
|
|
|
+-- running a function on each value produced by the original Observable.
|
|
|
|
+-- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
|
|
|
|
+-- the return value of the last call as the first argument and the
|
|
|
|
+-- current value as the second.
|
|
|
|
+-- @arg {*} seed - A value to pass to the accumulator the first time it is run.
|
|
|
|
+-- @returns {Observable}
|
|
function Observable:reduce(accumulator, seed)
|
|
function Observable:reduce(accumulator, seed)
|
|
return Observable.create(function(observer)
|
|
return Observable.create(function(observer)
|
|
- local currentValue = nil or seed
|
|
|
|
|
|
+ local currentValue = seed
|
|
return self:subscribe(function(x)
|
|
return self:subscribe(function(x)
|
|
currentValue = accumulator(currentValue, x)
|
|
currentValue = accumulator(currentValue, x)
|
|
end,
|
|
end,
|
|
@@ -137,15 +185,24 @@ function Observable:reduce(accumulator, seed)
|
|
end,
|
|
end,
|
|
function()
|
|
function()
|
|
observer:onNext(currentValue)
|
|
observer:onNext(currentValue)
|
|
- observer:onCompleted()
|
|
|
|
|
|
+ observer:onComplete()
|
|
end)
|
|
end)
|
|
end)
|
|
end)
|
|
end
|
|
end
|
|
|
|
|
|
|
|
+-- Returns a new Observable that produces the sum of the values produced by the original Observable.
|
|
|
|
+-- @returns {Observable}
|
|
function Observable:sum()
|
|
function Observable:sum()
|
|
return self:reduce(function(x, y) return x + y end, 0)
|
|
return self:reduce(function(x, y) return x + y end, 0)
|
|
end
|
|
end
|
|
|
|
|
|
|
|
+-- Returns a new Observable that runs a combinator function on the most recent values from a set of
|
|
|
|
+-- Observables whenever any of them produce a new value. The results of the combinator function are
|
|
|
|
+-- produced by the new Observable.
|
|
|
|
+-- @arg {Observable...} observables - One or more Observables to combine.
|
|
|
|
+-- @arg {function} combinator - A function that combines the latest result from each Observable and
|
|
|
|
+-- returns a single value.
|
|
|
|
+-- @returns {Observable}
|
|
function Observable:combineLatest(...)
|
|
function Observable:combineLatest(...)
|
|
local values = {}
|
|
local values = {}
|
|
local done = {}
|
|
local done = {}
|
|
@@ -174,7 +231,7 @@ function Observable:combineLatest(...)
|
|
end
|
|
end
|
|
|
|
|
|
if stop then
|
|
if stop then
|
|
- observer:onCompleted()
|
|
|
|
|
|
+ observer:onComplete()
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
|