|
@@ -275,6 +275,32 @@ function Observable:combineLatest(...)
|
|
|
end)
|
|
|
end
|
|
|
|
|
|
+--- Returns a new Observable that produces the values from the original with duplicates removed.
|
|
|
+-- @returns {Observable}
|
|
|
+function Observable:distinct()
|
|
|
+ return Observable.create(function(observer)
|
|
|
+ local values = {}
|
|
|
+
|
|
|
+ local function onNext(x)
|
|
|
+ if not values[x] then
|
|
|
+ observer:onNext(x)
|
|
|
+ end
|
|
|
+
|
|
|
+ values[x] = true
|
|
|
+ end
|
|
|
+
|
|
|
+ local function onError(e)
|
|
|
+ observer:onError(e)
|
|
|
+ end
|
|
|
+
|
|
|
+ local function onComplete()
|
|
|
+ observer:onComplete()
|
|
|
+ end
|
|
|
+
|
|
|
+ return self:subscribe(onNext, onError, onComplete)
|
|
|
+ end)
|
|
|
+end
|
|
|
+
|
|
|
--- @class Scheduler
|
|
|
-- @description Schedulers manage groups of Observables.
|
|
|
local Scheduler = {}
|