|
@@ -222,6 +222,26 @@ function Observable.fromCoroutine(thread, scheduler)
|
|
|
end)
|
|
|
end
|
|
|
|
|
|
+--- Creates an Observable that produces values from a file, line by line.
|
|
|
+-- @arg {string} filename - The name of the file used to create the Observable
|
|
|
+-- @returns {Observable}
|
|
|
+function Observable.fromFileByLine(filename)
|
|
|
+ return Observable.create(function(observer)
|
|
|
+ local f = io.open(filename, 'r')
|
|
|
+ if f
|
|
|
+ then
|
|
|
+ f:close()
|
|
|
+ for line in io.lines(filename) do
|
|
|
+ observer:onNext(line)
|
|
|
+ end
|
|
|
+
|
|
|
+ return observer:onCompleted()
|
|
|
+ else
|
|
|
+ return observer:onError(filename)
|
|
|
+ end
|
|
|
+ end)
|
|
|
+end
|
|
|
+
|
|
|
--- Creates an Observable that creates a new Observable for each observer using a factory function.
|
|
|
-- @arg {function} factory - A function that returns an Observable.
|
|
|
-- @returns {Observable}
|
|
@@ -265,26 +285,6 @@ function Observable:dump(name, formatter)
|
|
|
return self:subscribe(onNext, onError, onCompleted)
|
|
|
end
|
|
|
|
|
|
---- Creates an Observable that produces values from a file, line by line.
|
|
|
--- @arg {string} filename - The name of the file used to create the Observable
|
|
|
--- @returns {Observable}
|
|
|
-function Observable.fromFileByLine(filename)
|
|
|
- return Observable.create(function(observer)
|
|
|
- local f = io.open(filename, 'r')
|
|
|
- if f
|
|
|
- then
|
|
|
- f:close()
|
|
|
- for line in io.lines(filename) do
|
|
|
- observer:onNext(line)
|
|
|
- end
|
|
|
-
|
|
|
- return observer:onCompleted()
|
|
|
- else
|
|
|
- return observer:onError(filename)
|
|
|
- end
|
|
|
- end)
|
|
|
-end
|
|
|
-
|
|
|
--- Determine whether all items emitted by an Observable meet some criteria.
|
|
|
-- @arg {function=identity} predicate - The predicate used to evaluate objects.
|
|
|
function Observable:all(predicate)
|