123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- local path = (...):gsub('%.[^%.]+$', '')
- local Observer = require(path .. '.observer')
- local Observable = {}
- Observable.__index = Observable
- function Observable.create(subscribe)
- local self = {
- _subscribe = subscribe
- }
- return setmetatable(self, Observable)
- end
- function Observable.fromValue(value)
- return Observable.create(function(observer)
- observer:onNext(value)
- observer:onCompleted()
- end)
- end
- function Observable.fromCoroutine(cr)
- return Observable.create(function(observer)
- while true do
- local success, value = coroutine.resume(cr)
- observer:onNext(value)
- if coroutine.status(cr) == 'dead' then break end
- end
- observer:onCompleted()
- end)
- end
- function Observable:subscribe(onNext, onError, onComplete)
- return self._subscribe(Observer.create(onNext, onError, onComplete))
- end
- function Observable:dump(name)
- name = name or ''
- local onNext = function(x) print(name .. ' onNext: ' .. x) end
- local onError = function(e) error(name .. ' onError: ' .. e) end
- local onCompleted = function() print(name .. ' onCompleted') end
- return self:subscribe(onNext, onError, onCompleted)
- end
- -- Combinators
- function Observable:first()
- return Observable.create(function(observer)
- return self:subscribe(function(x)
- observer:onNext(x)
- observer:onCompleted()
- end,
- function(e)
- observer:onError(e)
- end,
- function()
- observer:onCompleted()
- end)
- end)
- end
- function Observable:last()
- return Observable.create(function(observer)
- local value
- return self:subscribe(function(x)
- value = x
- end,
- function(e)
- observer:onError(e)
- end,
- function()
- observer:onNext(value)
- observer:onCompleted()
- end)
- end)
- end
- function Observable:map(fn)
- fn = fn or function(x) return x end
- return Observable.create(function(observer)
- return self:subscribe(function(x)
- observer:onNext(fn(x))
- end,
- function(e)
- observer:onError(e)
- end,
- function()
- observer:onCompleted()
- end)
- end)
- end
- function Observable:reduce(accumulator, seed)
- return Observable.create(function(observer)
- local currentValue = nil or seed
- return self:subscribe(function(x)
- currentValue = accumulator(currentValue, x)
- end,
- function(e)
- observer:onError(e)
- end,
- function()
- observer:onNext(currentValue)
- observer:onCompleted()
- end)
- end)
- end
- function Observable:sum()
- return self:reduce(function(x, y) return x + y end, 0)
- end
- function Observable:combineLatest(...)
- local values = {}
- local done = {}
- local targets = {...}
- local fn = table.remove(targets)
- table.insert(targets, 1, self)
- return Observable.create(function(observer)
- local function handleNext(k, v)
- values[k] = v
- local full = true
- for i = 1, #targets do
- if not values[i] then full = false break end
- end
- if full then
- observer:onNext(fn(unpack(values)))
- end
- end
- local function handleCompleted(k)
- done[k] = true
- local stop = true
- for i = 1, #targets do
- if not done[i] then stop = false break end
- end
- if stop then
- observer:onCompleted()
- end
- end
- for i = 1, #targets do
- targets[i]:subscribe(function(x)
- values[i] = x
- handleNext(i, x)
- end,
- function(e)
- observer:onError(e)
- end,
- function()
- handleCompleted(i)
- end)
- end
- end)
- end
- return Observable
|