-- rx.lua (21 KB)


  -- Forward declaration of the module table; it is assigned at the end of the file so that
  -- functions defined earlier can refer to it as an upvalue.
  local rx

  -- table.pack / table.unpack are not available on every Lua version, so fall back to
  -- equivalents when they are missing.
  local pack = table.pack or function(...)
    return {...}
  end
  local unpack = table.unpack or unpack

  -- Default callback that ignores its arguments and does nothing.
  local noop = function() end

  -- Default transform/predicate that hands back its first argument untouched.
  local identity = function(x)
    return x
  end
  --- @class Observer
  -- @description Observers are simple objects that receive values from Observables.
  local Observer = {}
  Observer.__index = Observer

  --- Creates a new Observer.
  -- @arg {function=} onNext - Called when the Observable produces a value. Defaults to a no-op.
  -- @arg {function=} onError - Called when the Observable terminates due to an error. Defaults to
  --                            the global error function, so unhandled errors are raised.
  -- @arg {function=} onComplete - Called when the Observable completes normally. Defaults to a
  --                               no-op.
  -- @returns {Observer}
  function Observer.create(onNext, onError, onComplete)
    local observer = setmetatable({}, Observer)
    observer._onNext = onNext or noop
    observer._onError = onError or error
    observer._onComplete = onComplete or noop
    -- Set to true once onError or onComplete has been delivered.
    observer.stopped = false
    return observer
  end
  --- Pushes zero or more values to the Observer. Ignored once the Observer has stopped.
  -- @arg {*...} values
  function Observer:onNext(...)
    if self.stopped then
      return
    end
    self._onNext(...)
  end
  --- Notify the Observer that an error has occurred. The Observer is marked as stopped before
  -- the error callback runs; later notifications are ignored.
  -- @arg {string=} message - A string describing what went wrong.
  function Observer:onError(message)
    if self.stopped then
      return
    end
    self.stopped = true
    self._onError(message)
  end
  --- Notify the Observer that the sequence has completed and will produce no more values. The
  -- Observer is marked as stopped before the completion callback runs.
  function Observer:onComplete()
    if self.stopped then
      return
    end
    self.stopped = true
    self._onComplete()
  end
  --- @class Observable
  -- @description Observables push values to Observers.
  local Observable = {}
  Observable.__index = Observable

  --- Creates a new Observable.
  -- @arg {function} subscribe - The subscription function that produces values. It is called with
  --                             an Observer each time the Observable is subscribed to.
  -- @returns {Observable}
  function Observable.create(subscribe)
    return setmetatable({ _subscribe = subscribe }, Observable)
  end
  --- Creates an Observable that produces a single value and then completes.
  -- @arg {*} value
  -- @returns {Observable}
  function Observable.fromValue(value)
    local function emit(observer)
      observer:onNext(value)
      observer:onComplete()
    end

    return Observable.create(emit)
  end
  --- Creates an Observable that produces a range of values in a manner similar to a Lua for loop,
  -- then completes.
  -- @arg {number} initial - The first value of the range, or the upper limit if no other arguments
  --                         are specified (the range then starts at 1).
  -- @arg {number=} limit - The second value of the range.
  -- @arg {number=1} step - An amount to increment the value by each iteration.
  -- @returns {Observable}
  function Observable.fromRange(initial, limit, step)
    -- Single-argument form: fromRange(n) counts from 1 up to n.
    if not (limit or step) then
      initial, limit = 1, initial
    end
    step = step or 1

    return Observable.create(function(observer)
      for value = initial, limit, step do
        observer:onNext(value)
      end
      observer:onComplete()
    end)
  end
  --- Creates an Observable that produces values from a table. Each entry is pushed as
  -- (value, key), and the Observable completes after the last entry.
  -- @arg {table} t - The table used to create the Observable.
  -- @arg {function=pairs} iterator - An iterator used to iterate the table, e.g. pairs or ipairs.
  -- @returns {Observable}
  function Observable.fromTable(t, iterator)
    local iterate = iterator or pairs

    return Observable.create(function(observer)
      for k, v in iterate(t) do
        observer:onNext(v, k)
      end
      observer:onComplete()
    end)
  end
  --- Creates an Observable that produces values when the specified coroutine yields. The work is
  -- scheduled on rx.scheduler; between resumes the scheduled task yields back to the scheduler.
  -- @arg {thread|function} thread - A coroutine, or a function that is wrapped in a new coroutine.
  -- @returns {Observable}
  function Observable.fromCoroutine(thread)
    if type(thread) == 'function' then
      thread = coroutine.create(thread)
    end

    return Observable.create(function(observer)
      local function pump()
        while not observer.stopped do
          local ok, result = coroutine.resume(thread)
          if not ok then
            -- On failure the second result of resume is the error value.
            return observer:onError(result)
          end

          observer:onNext(result)

          if coroutine.status(thread) == 'dead' then
            return observer:onComplete()
          end

          -- Hand control back to the scheduler until its next update.
          coroutine.yield()
        end
      end

      return rx.scheduler:schedule(pump)
    end)
  end
  --- Shorthand for creating an Observer and passing it to this Observable's subscription function.
  -- @arg {function} onNext - Called when the Observable produces a value.
  -- @arg {function} onError - Called when the Observable terminates due to an error.
  -- @arg {function} onComplete - Called when the Observable completes normally.
  -- @returns {*...} Whatever the subscription function returns.
  function Observable:subscribe(onNext, onError, onComplete)
    local observer = Observer.create(onNext, onError, onComplete)
    return self._subscribe(observer)
  end
  --- Subscribes to this Observable and prints values it produces.
  -- Every value is converted with tostring before printing, so booleans, tables, nils and
  -- non-string error values are printed instead of raising an error.
  -- @arg {string=} name - Prefixes the printed messages with a name.
  -- @returns {*...} Whatever subscribing to this Observable returns.
  function Observable:dump(name)
    name = name and (name .. ' ') or ''

    local function onNext(...)
      -- select('#', ...) counts nil arguments too, which #{...} does not do reliably.
      local count = select('#', ...)
      local parts = {}
      for i = 1, count do
        parts[i] = tostring((select(i, ...)))
      end
      print(name .. 'onNext: ' .. table.concat(parts, ', '))
    end

    local function onError(e)
      print(name .. 'onError: ' .. tostring(e))
    end

    local function onComplete()
      print(name .. 'onComplete')
    end

    return self:subscribe(onNext, onError, onComplete)
  end
  137. -- The functions below transform the values produced by an Observable and return a new Observable
  138. -- that produces these values.
  --- Returns a new Observable that runs a combinator function on the most recent values from a set
  -- of Observables whenever any of them produce a new value. The results of the combinator function
  -- are produced by the new Observable. Nothing is produced until every source has produced at
  -- least once; the result completes once every source has completed.
  -- @arg {Observable...} observables - One or more Observables to combine.
  -- @arg {function} combinator - A function that combines the latest result from each Observable and
  --                              returns a single value.
  -- @returns {Observable}
  function Observable:combineLatest(...)
    local sources = {...}
    -- The last argument is the combinator; everything before it is a source.
    local combinator = table.remove(sources)
    table.insert(sources, 1, self)

    return Observable.create(function(observer)
      local latest = {}
      -- Sources that have not produced yet; entries are cleared as values arrive.
      local pending = {unpack(sources)}
      local completed = {}

      local function onNext(i)
        return function(value)
          latest[i] = value
          pending[i] = nil

          if not next(pending) then
            -- Explicit bounds: a source may have produced nil, which leaves a hole in `latest`
            -- and makes the implicit #latest bound unreliable.
            observer:onNext(combinator(unpack(latest, 1, #sources)))
          end
        end
      end

      local function onError(e)
        return observer:onError(e)
      end

      local function onComplete(i)
        return function()
          table.insert(completed, i)

          if #completed == #sources then
            observer:onComplete()
          end
        end
      end

      for i = 1, #sources do
        sources[i]:subscribe(onNext(i), onError, onComplete(i))
      end
    end)
  end
  --- Returns a new Observable that produces the values produced by all the specified Observables in
  -- the order they are specified. Each Observable is subscribed to only after the previous one
  -- completes.
  -- @arg {Observable...} sources - The Observables to concatenate.
  -- @returns {Observable}
  function Observable:concat(other, ...)
    if not other then
      return self
    end

    local rest = {...}

    return Observable.create(function(observer)
      local function forwardNext(...)
        return observer:onNext(...)
      end

      local function forwardError(message)
        return observer:onError(message)
      end

      local function finish()
        return observer:onComplete()
      end

      -- Runs when this Observable completes: continue with the remaining Observables.
      local function continueWithRest()
        local remainder = other:concat(unpack(rest))
        return remainder:subscribe(forwardNext, forwardError, finish)
      end

      return self:subscribe(forwardNext, forwardError, continueWithRest)
    end)
  end
  --- Returns a new Observable that produces the values from the original with duplicates removed.
  -- Only the first value of each emission is considered. nil is produced at most once. NaN is
  -- never equal to itself, so every NaN is passed through.
  -- @returns {Observable}
  function Observable:distinct()
    return Observable.create(function(observer)
      local seen = {}
      -- nil cannot be used as a table key, so it is tracked separately.
      local seenNil = false

      local function onNext(x)
        if x == nil then
          if not seenNil then
            observer:onNext(x)
          end
          seenNil = true
          return
        end

        if x ~= x then
          -- NaN cannot be a table key either; treat each occurrence as distinct.
          return observer:onNext(x)
        end

        if not seen[x] then
          observer:onNext(x)
        end
        seen[x] = true
      end

      local function onError(e)
        return observer:onError(e)
      end

      local function onComplete()
        return observer:onComplete()
      end

      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that only produces values of the first that satisfy a predicate.
  -- @arg {function=identity} predicate - The predicate used to filter values. When omitted, values
  --                                      whose first element is truthy are kept.
  -- @returns {Observable}
  function Observable:filter(predicate)
    predicate = predicate or identity

    return Observable.create(function(observer)
      local function onNext(...)
        if predicate(...) then
          return observer:onNext(...)
        end
      end

      local function onError(e)
        return observer:onError(e)
      end

      local function onComplete()
        -- Completion carries no payload (the original read an undefined global here).
        return observer:onComplete()
      end

      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that produces the first value of the original that satisfies a
  -- predicate, then completes.
  -- @arg {function=identity} predicate - The predicate used to find a value.
  -- @returns {Observable}
  function Observable:find(predicate)
    predicate = predicate or identity

    return Observable.create(function(observer)
      local function onNext(...)
        if predicate(...) then
          observer:onNext(...)
          return observer:onComplete()
        end
      end

      local function onError(message)
        -- Forward the received message (the original forwarded an undefined global, so the
        -- error text was lost).
        return observer:onError(message)
      end

      local function onComplete()
        return observer:onComplete()
      end

      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that only produces the first result of the original.
  -- Equivalent to take(1).
  -- @returns {Observable}
  function Observable:first()
    local count = 1
    return self:take(count)
  end
  --- Returns a new Observable that subscribes to the Observables produced by the original and
  -- produces their values. Completion of an inner Observable is ignored; the result completes
  -- when the original completes.
  -- @returns {Observable}
  function Observable:flatten()
    return Observable.create(function(observer)
      local function fail(message)
        return observer:onError(message)
      end

      -- Each value of the original is itself an Observable: subscribe and relay its values.
      local function subscribeInner(observable)
        local function relay(...)
          observer:onNext(...)
        end

        observable:subscribe(relay, fail, noop)
      end

      local function finish()
        return observer:onComplete()
      end

      return self:subscribe(subscribeInner, fail, finish)
    end)
  end
  --- Returns a new Observable that only produces the last result of the original. The value is
  -- produced when the original completes.
  -- @returns {Observable}
  function Observable:last()
    return Observable.create(function(observer)
      -- Most recent emission, stored as a table of its values.
      local latest

      local function remember(...)
        latest = {...}
      end

      local function fail(e)
        return observer:onError(e)
      end

      local function finish()
        observer:onNext(unpack(latest or {}))
        return observer:onComplete()
      end

      return self:subscribe(remember, fail, finish)
    end)
  end
  --- Returns a new Observable that produces the values of the original transformed by a function.
  -- @arg {function=identity} callback - The function to transform values from the original
  --                                     Observable.
  -- @returns {Observable}
  function Observable:map(callback)
    return Observable.create(function(observer)
      callback = callback or identity

      local function transform(...)
        return observer:onNext(callback(...))
      end

      local function fail(e)
        return observer:onError(e)
      end

      local function finish()
        return observer:onComplete()
      end

      return self:subscribe(transform, fail, finish)
    end)
  end
  --- Returns a new Observable that produces the maximum value produced by the original.
  -- Implemented as a reduce over math.max.
  -- @returns {Observable}
  function Observable:max()
    local largest = math.max
    return self:reduce(largest)
  end
  --- Returns a new Observable that produces the minimum value produced by the original.
  -- Implemented as a reduce over math.min.
  -- @returns {Observable}
  function Observable:min()
    local smallest = math.min
    return self:reduce(smallest)
  end
  --- Returns a new Observable that produces the values produced by all the specified Observables in
  -- the order they are produced. The result completes once every source has completed.
  -- @arg {Observable...} sources - One or more Observables to merge.
  -- @returns {Observable}
  function Observable:merge(...)
    local sources = {...}
    table.insert(sources, 1, self)

    return Observable.create(function(observer)
      -- Completion is tracked per subscription. The source list is shared by every subscription
      -- to the merged Observable and must not be modified, otherwise later subscriptions would
      -- find it empty.
      local remaining = #sources
      local finished = {}

      local function onNext(...)
        return observer:onNext(...)
      end

      local function onError(message)
        return observer:onError(message)
      end

      local function onComplete(i)
        return function()
          if finished[i] then
            return
          end
          finished[i] = true
          remaining = remaining - 1

          if remaining == 0 then
            observer:onComplete()
          end
        end
      end

      for i = 1, #sources do
        sources[i]:subscribe(onNext, onError, onComplete(i))
      end
    end)
  end
  --- Returns an Observable that produces the values of the original inside tables.
  -- Each emission's values are collected with the file-level pack helper.
  -- @returns {Observable}
  function Observable:pack()
    local collect = pack
    return self:map(collect)
  end
  --- Returns a new Observable that produces values computed by extracting the given key from the
  -- tables produced by the original.
  -- @arg {*} key - The key to extract from the table.
  -- @returns {Observable}
  function Observable:pluck(key)
    return Observable.create(function(observer)
      local function extract(t)
        return observer:onNext(t[key])
      end

      local function fail(e)
        return observer:onError(e)
      end

      local function finish()
        return observer:onComplete()
      end

      return self:subscribe(extract, fail, finish)
    end)
  end
  --- Returns a new Observable that produces a single value computed by accumulating the results of
  -- running a function on each value produced by the original Observable.
  -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
  --                               the return value of the last call as the first argument and the
  --                               current values as the rest of the arguments.
  -- @arg {*=} seed - A value to pass to the accumulator the first time it is run. When omitted,
  --                  the first value produced becomes the initial result and the accumulator is
  --                  first run on the second value. If the original produces nothing, the seed
  --                  itself is produced.
  -- @returns {Observable}
  function Observable:reduce(accumulator, seed)
    return Observable.create(function(observer)
      local result = seed
      -- Explicit flag rather than truthiness of `result`, so an accumulator that legitimately
      -- returns false or nil is not reset to the seed.
      local started = seed ~= nil

      local function onNext(...)
        if started then
          result = accumulator(result, ...)
        else
          -- No seed: adopt the first value as-is instead of accumulating it with itself.
          result = (...)
          started = true
        end
      end

      local function onError(e)
        return observer:onError(e)
      end

      local function onComplete()
        observer:onNext(result)
        return observer:onComplete()
      end

      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that skips over a specified number of values produced by the original
  -- and produces the rest.
  -- @arg {number=1} n - The number of values to ignore.
  -- @returns {Observable}
  function Observable:skip(n)
    n = n or 1

    return Observable.create(function(observer)
      -- 1-based position of the next value that is still a candidate for skipping.
      local position = 1

      local function forwardAfterSkip(...)
        if position > n then
          observer:onNext(...)
          return
        end
        position = position + 1
      end

      local function fail(e)
        return observer:onError(e)
      end

      local function finish()
        return observer:onComplete()
      end

      return self:subscribe(forwardAfterSkip, fail, finish)
    end)
  end
  --- Returns a new Observable that skips over values produced by the original until the specified
  -- Observable produces a value. The original is subscribed to on the first signal (value, error
  -- or completion) from the other Observable; later signals are ignored.
  -- @arg {Observable} other - The Observable that triggers the production of values.
  -- @returns {Observable}
  function Observable:skipUntil(other)
    return Observable.create(function(observer)
      local triggered = false

      local function trigger()
        -- `other` may signal many times; subscribing again on each signal would duplicate
        -- values, so only the first signal counts.
        if triggered then
          return
        end
        triggered = true

        local function onNext(...)
          return observer:onNext(...)
        end

        local function onError(message)
          -- Errors are forwarded as errors (the original pushed them as ordinary values).
          return observer:onError(message)
        end

        local function onComplete()
          return observer:onComplete()
        end

        return self:subscribe(onNext, onError, onComplete)
      end

      other:subscribe(trigger, trigger, trigger)
    end)
  end
  --- Returns a new Observable that produces the sum of the values of the original Observable as a
  -- single result. Implemented as a reduce with addition and a seed of 0.
  -- @returns {Observable}
  function Observable:sum()
    local function add(x, y)
      return x + y
    end

    return self:reduce(add, 0)
  end
  --- Returns a new Observable that only produces the first n results of the original.
  -- @arg {number=1} n - The number of elements to produce before completing. When n is zero or
  --                     negative the result completes immediately without subscribing.
  -- @returns {Observable}
  function Observable:take(n)
    n = n or 1

    return Observable.create(function(observer)
      if n <= 0 then
        observer:onComplete()
        return
      end

      -- 1-based position of the next value to be produced.
      local position = 1

      local function forwardAndCount(...)
        observer:onNext(...)
        position = position + 1

        if position > n then
          observer:onComplete()
        end
      end

      local function fail(e)
        return observer:onError(e)
      end

      local function finish()
        return observer:onComplete()
      end

      return self:subscribe(forwardAndCount, fail, finish)
    end)
  end
  --- Returns a new Observable that completes when the specified Observable fires. Any signal from
  -- the other Observable (value, error or completion) completes the result.
  -- @arg {Observable} other - The Observable that triggers completion of the original.
  -- @returns {Observable}
  function Observable:takeUntil(other)
    return Observable.create(function(observer)
      local function forward(...)
        return observer:onNext(...)
      end

      local function fail(e)
        return observer:onError(e)
      end

      local function finish()
        return observer:onComplete()
      end

      -- The stop signal is subscribed to before the source.
      other:subscribe(finish, finish, finish)

      return self:subscribe(forward, fail, finish)
    end)
  end
  --- Returns an Observable that unpacks the tables produced by the original.
  -- Each table is expanded with the file-level unpack helper.
  -- @returns {Observable}
  function Observable:unpack()
    local expand = unpack
    return self:map(expand)
  end
  --- Returns an Observable that takes any values produced by the original that consist of multiple
  -- return values and produces each value individually.
  -- @returns {Observable}
  function Observable:unwrap()
    return Observable.create(function(observer)
      local function spread(...)
        local items = {...}
        local count = #items
        for index = 1, count do
          observer:onNext(items[index])
        end
      end

      local function fail(message)
        return observer:onError(message)
      end

      local function finish()
        return observer:onComplete()
      end

      return self:subscribe(spread, fail, finish)
    end)
  end
  --- @class Scheduler
  -- @description Schedulers manage groups of Observables.
  local Scheduler = {}

  --- @class CooperativeScheduler
  -- @description Manages Observables using coroutines and a virtual clock that must be updated
  -- manually.
  local Cooperative = {}
  Cooperative.__index = Cooperative

  --- Creates a new Cooperative Scheduler with an empty task list.
  -- @arg {number=0} currentTime - A time to start the scheduler at.
  -- @returns {Scheduler.Cooperative}
  function Cooperative.create(currentTime)
    local scheduler = setmetatable({}, Cooperative)
    scheduler.tasks = {}
    scheduler.currentTime = currentTime or 0
    return scheduler
  end
  --- Schedules a function to be run after an optional delay.
  -- @arg {function} action - The function to execute. Will be converted into a coroutine. The
  --                          coroutine may yield execution back to the scheduler with an optional
  --                          number, which will put it to sleep for a time period.
  -- @arg {number=0} delay - Delay execution of the action by a time period.
  function Cooperative:schedule(action, delay)
    local task = {
      thread = coroutine.create(action),
      due = self.currentTime + (delay or 0)
    }

    self.tasks[#self.tasks + 1] = task
  end
  --- Triggers an update of the Cooperative Scheduler. The clock will be advanced and the scheduler
  -- will run any coroutines that are due to be run. If a coroutine raises an error, its task is
  -- removed from the queue and the error is re-raised to the caller.
  -- @arg {number=0} delta - An amount of time to advance the clock by. It is common to pass in the
  --                         time in seconds or milliseconds elapsed since this function was last
  --                         called.
  function Cooperative:update(delta)
    self.currentTime = self.currentTime + (delta or 0)

    -- Iterate backwards so tasks can be removed while looping.
    for i = #self.tasks, 1, -1 do
      local task = self.tasks[i]

      if self.currentTime >= task.due then
        local success, delay = coroutine.resume(task.thread)

        if not success then
          -- Drop the failed task first; a dead coroutine left in the queue would fail to resume
          -- on every later update.
          table.remove(self.tasks, i)
          error(delay)
        end

        if coroutine.status(task.thread) == 'dead' then
          -- Finished: its return value is not a sleep request, so no due-time arithmetic.
          table.remove(self.tasks, i)
        else
          -- The yielded number (if any) is how long the task wants to sleep.
          task.due = math.max(task.due + (delay or 0), self.currentTime)
        end
      end
    end
  end
  --- Returns whether or not the Cooperative Scheduler's queue is empty.
  -- @returns {boolean}
  function Cooperative:isEmpty()
    return next(self.tasks) == nil
  end

  -- Expose the cooperative implementation on the Scheduler namespace.
  Scheduler.Cooperative = Cooperative
  --- @class Subject
  -- @description Subjects function both as an Observer and as an Observable. Subjects inherit all
  -- Observable functions, including subscribe. Values can also be pushed to the Subject, which will
  -- be broadcasted to any subscribed Observers.
  local Subject = setmetatable({}, Observable)
  Subject.__index = Subject

  --- Creates a new Subject with no subscribed Observers.
  -- @arg {*...} value - The initial values.
  -- @returns {Subject}
  function Subject.create(...)
    local subject = setmetatable({}, Subject)
    subject.value = {...}
    subject.observers = {}
    return subject
  end
  --- Creates a new Observer and attaches it to the Subject.
  -- @arg {function} onNext - Called when the Subject produces a value.
  -- @arg {function} onError - Called when the Subject terminates due to an error.
  -- @arg {function} onComplete - Called when the Subject completes normally.
  function Subject:subscribe(onNext, onError, onComplete)
    local observer = Observer.create(onNext, onError, onComplete)
    table.insert(self.observers, observer)
  end
  --- Pushes zero or more values to the Subject. It will be broadcasted to all Observers, and is
  -- remembered as the Subject's current value.
  -- @arg {*...} values
  function Subject:onNext(...)
    self.value = {...}

    local count = #self.observers
    for index = 1, count do
      self.observers[index]:onNext(...)
    end
  end
  --- Signal to all Observers that an error has occurred.
  -- @arg {string=} message - A string describing what went wrong.
  function Subject:onError(message)
    local count = #self.observers
    for index = 1, count do
      self.observers[index]:onError(message)
    end
  end
  --- Signal to all Observers that the Subject will not produce any more values.
  function Subject:onComplete()
    local count = #self.observers
    for index = 1, count do
      self.observers[index]:onComplete()
    end
  end
  --- Returns the last value emitted by the Subject, or the initial value passed to the constructor
  -- if nothing has been emitted yet.
  -- @returns {*...}
  function Subject:getValue()
    local current = self.value or {}
    return unpack(current)
  end
  -- Calling a Subject like a function pushes values to it.
  Subject.__call = Subject.onNext

  -- Fill in the module table declared at the top of the file.
  rx = {}
  rx.Observer = Observer
  rx.Observable = Observable
  rx.Scheduler = Scheduler
  -- Default scheduler instance, used by Observable.fromCoroutine.
  rx.scheduler = Scheduler.Cooperative.create()
  rx.Subject = Subject

  return rx