|
@@ -24,11 +24,11 @@ function Observer.create(onNext, onError, onComplete)
|
|
|
return setmetatable(self, Observer)
|
|
|
end
|
|
|
|
|
|
---- Pushes a new value to the Observer.
|
|
|
--- @arg {*} value
|
|
|
-function Observer:onNext(value)
|
|
|
+--- Pushes zero or more values to the Observer.
|
|
|
+-- @arg {*...} values
|
|
|
+function Observer:onNext(...)
|
|
|
if not self.stopped then
|
|
|
- self._onNext(value)
|
|
|
+ self._onNext(...)
|
|
|
end
|
|
|
end
|
|
|
|
|
@@ -151,7 +151,7 @@ end
|
|
|
function Observable:dump(name)
|
|
|
name = name and (name .. ' ') or ''
|
|
|
|
|
|
- local onNext = function(x) print(name .. 'onNext: ' .. (x or '')) end
|
|
|
+ local onNext = function(...) print(name .. 'onNext: ' .. table.concat({...}, ', ')) end
|
|
|
local onError = function(e) print(name .. 'onError: ' .. e) end
|
|
|
local onComplete = function() print(name .. 'onComplete') end
|
|
|
|
|
@@ -219,8 +219,8 @@ function Observable:concat(other, ...)
|
|
|
local others = {...}
|
|
|
|
|
|
return Observable.create(function(observer)
|
|
|
- local function onNext(value)
|
|
|
- return observer:onNext(value)
|
|
|
+ local function onNext(...)
|
|
|
+ return observer:onNext(...)
|
|
|
end
|
|
|
|
|
|
local function onError(message)
|
|
@@ -272,9 +272,9 @@ function Observable:filter(predicate)
|
|
|
predicate = predicate or identity
|
|
|
|
|
|
return Observable.create(function(observer)
|
|
|
- local function onNext(x)
|
|
|
- if predicate(x) then
|
|
|
- return observer:onNext(x)
|
|
|
+ local function onNext(...)
|
|
|
+ if predicate(...) then
|
|
|
+ return observer:onNext(...)
|
|
|
end
|
|
|
end
|
|
|
|
|
@@ -297,9 +297,9 @@ function Observable:find(predicate)
|
|
|
predicate = predicate or identity
|
|
|
|
|
|
return Observable.create(function(observer)
|
|
|
- local function onNext(x)
|
|
|
- if predicate(x) then
|
|
|
- observer:onNext(x)
|
|
|
+ local function onNext(...)
|
|
|
+ if predicate(...) then
|
|
|
+ observer:onNext(...)
|
|
|
return observer:onComplete()
|
|
|
end
|
|
|
end
|
|
@@ -328,8 +328,8 @@ function Observable:last()
|
|
|
return Observable.create(function(observer)
|
|
|
local value
|
|
|
|
|
|
- local function onNext(x)
|
|
|
- value = x
|
|
|
+ local function onNext(...)
|
|
|
+ value = {...}
|
|
|
end
|
|
|
|
|
|
local function onError(e)
|
|
@@ -337,7 +337,7 @@ function Observable:last()
|
|
|
end
|
|
|
|
|
|
local function onComplete()
|
|
|
- observer:onNext(value)
|
|
|
+ observer:onNext(unpack(value or {}))
|
|
|
return observer:onComplete()
|
|
|
end
|
|
|
|
|
@@ -352,8 +352,8 @@ function Observable:map(callback)
|
|
|
return Observable.create(function(observer)
|
|
|
callback = callback or identity
|
|
|
|
|
|
- local function onNext(x)
|
|
|
- return observer:onNext(callback(x))
|
|
|
+ local function onNext(...)
|
|
|
+ return observer:onNext(callback(...))
|
|
|
end
|
|
|
|
|
|
local function onError(e)
|
|
@@ -389,8 +389,8 @@ function Observable:merge(...)
|
|
|
table.insert(sources, 1, self)
|
|
|
|
|
|
return Observable.create(function(observer)
|
|
|
- local function onNext(value)
|
|
|
- return observer:onNext(value)
|
|
|
+ local function onNext(...)
|
|
|
+ return observer:onNext(...)
|
|
|
end
|
|
|
|
|
|
local function onError(message)
|
|
@@ -439,16 +439,16 @@ end
|
|
|
-- running a function on each value produced by the original Observable.
|
|
|
-- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
|
|
|
-- the return value of the last call as the first argument and the
|
|
|
--- current value as the second.
|
|
|
+-- current values as the rest of the arguments.
|
|
|
-- @arg {*} seed - A value to pass to the accumulator the first time it is run.
|
|
|
-- @returns {Observable}
|
|
|
function Observable:reduce(accumulator, seed)
|
|
|
return Observable.create(function(observer)
|
|
|
local result
|
|
|
|
|
|
- local function onNext(x)
|
|
|
- result = result or seed or x
|
|
|
- result = accumulator(result, x)
|
|
|
+ local function onNext(...)
|
|
|
+ result = result or seed or (...)
|
|
|
+ result = accumulator(result, ...)
|
|
|
end
|
|
|
|
|
|
local function onError(e)
|
|
@@ -474,9 +474,9 @@ function Observable:skip(n)
|
|
|
return Observable.create(function(observer)
|
|
|
local i = 1
|
|
|
|
|
|
- local function onNext(x)
|
|
|
+ local function onNext(...)
|
|
|
if i > n then
|
|
|
- observer:onNext(x)
|
|
|
+ observer:onNext(...)
|
|
|
else
|
|
|
i = i + 1
|
|
|
end
|
|
@@ -501,8 +501,8 @@ end
|
|
|
function Observable:skipUntil(other)
|
|
|
return Observable.create(function(observer)
|
|
|
local function trigger()
|
|
|
- local function onNext(value)
|
|
|
- return observer:onNext(value)
|
|
|
+ local function onNext(...)
|
|
|
+ return observer:onNext(...)
|
|
|
end
|
|
|
|
|
|
local function onError(message)
|
|
@@ -541,8 +541,8 @@ function Observable:take(n)
|
|
|
|
|
|
local i = 1
|
|
|
|
|
|
- local function onNext(x)
|
|
|
- observer:onNext(x)
|
|
|
+ local function onNext(...)
|
|
|
+ observer:onNext(...)
|
|
|
|
|
|
i = i + 1
|
|
|
|
|
@@ -568,8 +568,8 @@ end
|
|
|
-- @returns {Observable}
|
|
|
function Observable:takeUntil(other)
|
|
|
return Observable.create(function(observer)
|
|
|
- local function onNext(x)
|
|
|
- return observer:onNext(x)
|
|
|
+ local function onNext(...)
|
|
|
+ return observer:onNext(...)
|
|
|
end
|
|
|
|
|
|
local function onError(e)
|