observable.lua 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. local path = (...):gsub('%.[^%.]+$', '')
  2. local Observer = require(path .. '.observer')
  --- Prototype table for observables; instances look their methods up here.
  local Observable = {}
  Observable.__index = Observable

  --- Builds a new Observable around a subscription function.
  -- @param subscribe Function stored on the instance as `_subscribe`; it is
  --   called with an observer each time the Observable is subscribed to.
  -- @return A table whose metatable is `Observable`.
  function Observable.create(subscribe)
    local instance = setmetatable({}, Observable)
    instance._subscribe = subscribe
    return instance
  end
  --- Creates an Observable that emits one value and then completes.
  -- @param value The value handed to every subscriber's onNext.
  -- @return A new Observable.
  function Observable.fromValue(value)
    local function emitOnce(observer)
      observer:onNext(value)
      observer:onCompleted()
    end

    return Observable.create(emitOnce)
  end
  --- Creates an Observable that drains a coroutine on subscription.
  -- The coroutine is resumed repeatedly; the first value produced by each
  -- resume (a yielded value, or the final return value) is sent to onNext.
  -- Once the coroutine is dead, onCompleted is sent.
  -- If a resume fails (the coroutine raised an error, or was already dead),
  -- the error message is sent to onError and nothing further is emitted.
  -- @param cr A coroutine (thread) to resume.
  -- @return A new Observable.
  function Observable.fromCoroutine(cr)
    return Observable.create(function(observer)
      repeat
        local success, value = coroutine.resume(cr)
        if not success then
          -- `value` holds the error message here; it must not be
          -- reported as a regular element followed by completion.
          observer:onError(value)
          return
        end
        observer:onNext(value)
      until coroutine.status(cr) == 'dead'

      observer:onCompleted()
    end)
  end
  --- Subscribes to the Observable.
  -- Wraps the three callbacks with Observer.create and passes the result to
  -- the subscription function given to Observable.create.
  -- @param onNext Callback for emitted values.
  -- @param onError Callback for errors.
  -- @param onComplete Callback for completion.
  -- @return Whatever the subscription function returns.
  function Observable:subscribe(onNext, onError, onComplete)
    local subscribeFn = self._subscribe
    return subscribeFn(Observer.create(onNext, onError, onComplete))
  end
  --- Subscribes with callbacks that log every event, for debugging.
  -- Values and completion are printed; an error is re-raised with `error`.
  -- Values and errors are passed through `tostring` so that tables,
  -- booleans and nil can be dumped without a concatenation error.
  -- @param name Optional label prefixed to each message (defaults to '').
  -- @return Whatever `subscribe` returns.
  function Observable:dump(name)
    name = name or ''

    local function onNext(x)
      print(name .. ' onNext: ' .. tostring(x))
    end

    local function onError(e)
      error(name .. ' onError: ' .. tostring(e))
    end

    local function onCompleted()
      print(name .. ' onCompleted')
    end

    return self:subscribe(onNext, onError, onCompleted)
  end
  37. -- Combinators
  --- Returns an Observable that emits only the first value of the source
  -- and then completes. An empty source produces just a completion.
  -- A local flag ensures that later values, errors, or completions from the
  -- source are not forwarded once the result has terminated, so correctness
  -- does not depend on the downstream observer ignoring such calls.
  -- @return A new Observable.
  function Observable:first()
    return Observable.create(function(observer)
      local finished = false

      return self:subscribe(function(x)
        if finished then return end
        -- Set before emitting so a re-entrant emission cannot slip through.
        finished = true
        observer:onNext(x)
        observer:onCompleted()
      end,
      function(e)
        if finished then return end
        finished = true
        observer:onError(e)
      end,
      function()
        if finished then return end
        finished = true
        observer:onCompleted()
      end)
    end)
  end
  --- Returns an Observable that emits only the final value of the source,
  -- delivered when the source completes, followed by a completion.
  -- If the source completes without emitting anything, only the completion
  -- is forwarded (no onNext(nil)), matching how `first` treats an empty
  -- source.
  -- @return A new Observable.
  function Observable:last()
    return Observable.create(function(observer)
      local value
      -- Tracked separately from `value` so that a legitimately emitted
      -- nil or false is still reported as the last element.
      local empty = true

      return self:subscribe(function(x)
        value = x
        empty = false
      end,
      function(e)
        observer:onError(e)
      end,
      function()
        if not empty then
          observer:onNext(value)
        end
        observer:onCompleted()
      end)
    end)
  end
  --- Returns an Observable whose values are the source's values transformed
  -- by `fn`. Errors and completion are forwarded unchanged.
  -- @param fn Transformation applied to each value; when omitted (or
  --   falsy) values pass through untouched.
  -- @return A new Observable.
  function Observable:map(fn)
    if not fn then
      fn = function(x) return x end
    end

    return Observable.create(function(observer)
      local function forwardMapped(x)
        observer:onNext(fn(x))
      end

      local function forwardError(e)
        observer:onError(e)
      end

      local function forwardCompleted()
        observer:onCompleted()
      end

      return self:subscribe(forwardMapped, forwardError, forwardCompleted)
    end)
  end
  --- Returns an Observable that folds the source into a single value.
  -- The running result starts at `seed` and is replaced by
  -- `accumulator(result, x)` for every source value; when the source
  -- completes, the result is emitted and the Observable completes.
  -- @param accumulator Function (result, value) -> new result.
  -- @param seed Initial result (nil when omitted).
  -- @return A new Observable.
  function Observable:reduce(accumulator, seed)
    return Observable.create(function(observer)
      local acc = seed

      local function step(x)
        acc = accumulator(acc, x)
      end

      local function fail(e)
        observer:onError(e)
      end

      local function finish()
        observer:onNext(acc)
        observer:onCompleted()
      end

      return self:subscribe(step, fail, finish)
    end)
  end
  --- Returns an Observable that emits the sum of all source values,
  -- computed with `reduce` starting from 0.
  -- @return A new Observable.
  function Observable:sum()
    local function add(total, x)
      return total + x
    end

    return self:reduce(add, 0)
  end
  --- Combines this Observable with others.
  -- Once every source has emitted at least one value, each new value from
  -- any source emits `fn(latest1, latest2, ...)`, where the arguments are
  -- the latest values in source order (self first). Completes when every
  -- source has completed; an error from any source is forwarded.
  -- @param ... Zero or more Observables followed by the combinator function
  --   (the last argument is always taken as the combinator).
  -- @return A new Observable.
  function Observable:combineLatest(...)
    local targets = {...}
    local fn = table.remove(targets)
    table.insert(targets, 1, self)

    local count = #targets
    -- `unpack` moved to `table.unpack` in Lua 5.2; support both.
    local unpackValues = table.unpack or unpack

    return Observable.create(function(observer)
      -- State lives per subscription so that separate subscribers do not
      -- see each other's latest values or completion flags.
      local values = {}
      local seen = {}
      local done = {}
      local pending = count    -- sources that have not emitted yet
      local remaining = count  -- sources that have not completed yet

      local function handleNext(k, v)
        values[k] = v
        -- Presence is tracked apart from the value itself so that an
        -- emitted `false` (or nil) still counts as "has a value".
        if not seen[k] then
          seen[k] = true
          pending = pending - 1
        end
        if pending == 0 then
          -- Explicit bounds keep every slot even if some hold nil.
          observer:onNext(fn(unpackValues(values, 1, count)))
        end
      end

      local function handleCompleted(k)
        if not done[k] then
          done[k] = true
          remaining = remaining - 1
        end
        if remaining == 0 then
          observer:onCompleted()
        end
      end

      for i = 1, count do
        targets[i]:subscribe(function(x)
          handleNext(i, x)
        end,
        function(e)
          observer:onError(e)
        end,
        function()
          handleCompleted(i)
        end)
      end
    end)
  end
  140. return Observable