|
@@ -631,6 +631,37 @@ function Observable:skipUntil(other)
|
|
|
end)
|
|
|
end
|
|
|
|
|
|
+--- Returns a new Observable that skips elements until the predicate returns falsy for one of them.
|
|
|
+-- @arg {function} predicate - The predicate used to continue skipping values.
|
|
|
+-- @returns {Observable}
|
|
|
+function Observable:skipWhile(predicate)
|
|
|
+ predicate = predicate or identity
|
|
|
+
|
|
|
+ return Observable.create(function(observer)
|
|
|
+ local skipping = true
|
|
|
+
|
|
|
+ local function onNext(...)
|
|
|
+ if skipping then
|
|
|
+ skipping = predicate(...)
|
|
|
+ end
|
|
|
+
|
|
|
+ if not skipipng then
|
|
|
+ return observer:onNext(...)
|
|
|
+ end
|
|
|
+ end
|
|
|
+
|
|
|
+ local function onError(message)
|
|
|
+ return observer:onError(message)
|
|
|
+ end
|
|
|
+
|
|
|
+ local function onComplete()
|
|
|
+ return observer:onComplete()
|
|
|
+ end
|
|
|
+
|
|
|
+ return self:subscribe(onNext, onError, onComplete)
|
|
|
+ end)
|
|
|
+end
|
|
|
+
|
|
|
--- Returns a new Observable that only produces the first n results of the original.
|
|
|
-- @arg {number=1} n - The number of elements to produce before completing.
|
|
|
-- @returns {Observable}
|
|
@@ -690,6 +721,39 @@ function Observable:takeUntil(other)
|
|
|
end)
|
|
|
end
|
|
|
|
|
|
+--- Returns a new Observable that produces elements until the predicate returns falsy.
|
|
|
+-- @arg {function} predicate - The predicate used to continue production of values.
|
|
|
+-- @returns {Observable}
|
|
|
+function Observable:takeWhile(predicate)
|
|
|
+ predicate = predicate or identity
|
|
|
+
|
|
|
+ return Observable.create(function(observer)
|
|
|
+ local taking = true
|
|
|
+
|
|
|
+ local function onNext(...)
|
|
|
+ if taking then
|
|
|
+ taking = predicate(...)
|
|
|
+
|
|
|
+ if taking then
|
|
|
+ return observer:onNext(...)
|
|
|
+ else
|
|
|
+ return observer:onComplete()
|
|
|
+ end
|
|
|
+ end
|
|
|
+ end
|
|
|
+
|
|
|
+ local function onError(message)
|
|
|
+ return observer:onError(message)
|
|
|
+ end
|
|
|
+
|
|
|
+ local function onComplete()
|
|
|
+ return observer:onComplete()
|
|
|
+ end
|
|
|
+
|
|
|
+ return self:subscribe(onNext, onError, onComplete)
|
|
|
+ end)
|
|
|
+end
|
|
|
+
|
|
|
--- Runs a function each time this Observable has activity. Similar to subscribe but does not
|
|
|
-- create a subscription.
|
|
|
-- @arg {function=} onNext - Run when the Observable produces values.
|