rx.lua 30 KB


  1. local rx
  2. local pack = function(...) return {...} end
  3. local unpack = table.unpack or unpack
  4. local function eq(x, y) return x == y end
  5. local function noop() end
  6. local function identity(x) return x end
  7. local function constant(x) return function() return x end end
  8. --- @class Observer
  9. -- @description Observers are simple objects that receive values from Observables.
  10. local Observer = {}
  11. Observer.__index = Observer
  12. Observer.__tostring = constant('Observer')
  13. --- Creates a new Observer.
  14. -- @arg {function=} onNext - Called when the Observable produces a value.
  15. -- @arg {function=} onError - Called when the Observable terminates due to an error.
  16. -- @arg {function=} onComplete - Called when the Observable completes normally.
  17. -- @returns {Observer}
  18. function Observer.create(onNext, onError, onComplete)
  19. local self = {
  20. _onNext = onNext or noop,
  21. _onError = onError or error,
  22. _onComplete = onComplete or noop,
  23. stopped = false
  24. }
  25. return setmetatable(self, Observer)
  26. end
  27. --- Pushes zero or more values to the Observer.
  28. -- @arg {*...} values
  29. function Observer:onNext(...)
  30. if not self.stopped then
  31. self._onNext(...)
  32. end
  33. end
  34. --- Notify the Observer that an error has occurred.
  35. -- @arg {string=} message - A string describing what went wrong.
  36. function Observer:onError(message)
  37. if not self.stopped then
  38. self.stopped = true
  39. self._onError(message)
  40. end
  41. end
  42. --- Notify the Observer that the sequence has completed and will produce no more values.
  43. function Observer:onComplete()
  44. if not self.stopped then
  45. self.stopped = true
  46. self._onComplete()
  47. end
  48. end
  49. --- @class Observable
  50. -- @description Observables push values to Observers.
  51. local Observable = {}
  52. Observable.__index = Observable
  53. Observable.__tostring = constant('Observable')
  54. --- Creates a new Observable.
  55. -- @arg {function} subscribe - The subscription function that produces values.
  56. -- @returns {Observable}
  57. function Observable.create(subscribe)
  58. local self = {
  59. _subscribe = subscribe
  60. }
  61. return setmetatable(self, Observable)
  62. end
  63. --- Shorthand for creating an Observer and passing it to this Observable's subscription function.
  64. -- @arg {function} onNext - Called when the Observable produces a value.
  65. -- @arg {function} onError - Called when the Observable terminates due to an error.
  66. -- @arg {function} onComplete - Called when the Observable completes normally.
  67. function Observable:subscribe(onNext, onError, onComplete)
  68. if type(onNext) == 'table' then
  69. return self._subscribe(onNext)
  70. else
  71. return self._subscribe(Observer.create(onNext, onError, onComplete))
  72. end
  73. end
  74. --- Creates an Observable that produces a single value.
  75. -- @arg {*} value
  76. -- @returns {Observable}
  77. function Observable.fromValue(value)
  78. return Observable.create(function(observer)
  79. observer:onNext(value)
  80. observer:onComplete()
  81. end)
  82. end
  83. --- Creates an Observable that produces a range of values in a manner similar to a Lua for loop.
  84. -- @arg {number} initial - The first value of the range, or the upper limit if no other arguments
  85. -- are specified.
  86. -- @arg {number=} limit - The second value of the range.
  87. -- @arg {number=1} step - An amount to increment the value by each iteration.
  88. -- @returns {Observable}
  89. function Observable.fromRange(initial, limit, step)
  90. if not limit and not step then
  91. initial, limit = 1, initial
  92. end
  93. step = step or 1
  94. return Observable.create(function(observer)
  95. for i = initial, limit, step do
  96. observer:onNext(i)
  97. end
  98. observer:onComplete()
  99. end)
  100. end
  101. --- Creates an Observable that produces values from a table.
  102. -- @arg {table} table - The table used to create the Observable.
  103. -- @arg {function=pairs} iterator - An iterator used to iterate the table, e.g. pairs or ipairs.
  104. -- @arg {boolean} keys - Whether or not to also emit the keys of the table.
  105. -- @returns {Observable}
  106. function Observable.fromTable(t, iterator, keys)
  107. iterator = iterator or pairs
  108. return Observable.create(function(observer)
  109. for key, value in iterator(t) do
  110. observer:onNext(value, keys and key or nil)
  111. end
  112. observer:onComplete()
  113. end)
  114. end
  115. --- Creates an Observable that produces values when the specified coroutine yields.
  116. -- @arg {thread} coroutine
  117. -- @returns {Observable}
  118. function Observable.fromCoroutine(thread)
  119. thread = type(thread) == 'function' and coroutine.create(thread) or thread
  120. return Observable.create(function(observer)
  121. return rx.scheduler:schedule(function()
  122. while not observer.stopped do
  123. local success, value = coroutine.resume(thread)
  124. if success then
  125. observer:onNext(value)
  126. else
  127. return observer:onError(value)
  128. end
  129. if coroutine.status(thread) == 'dead' then
  130. return observer:onComplete()
  131. end
  132. coroutine.yield()
  133. end
  134. end)
  135. end)
  136. end
  137. --- Subscribes to this Observable and prints values it produces.
  138. -- @arg {string=} name - Prefixes the printed messages with a name.
  139. function Observable:dump(name)
  140. name = name and (name .. ' ') or ''
  141. local onNext = function(...) print(name .. 'onNext: ' .. table.concat({...}, ', ')) end
  142. local onError = function(e) print(name .. 'onError: ' .. e) end
  143. local onComplete = function() print(name .. 'onComplete') end
  144. return self:subscribe(onNext, onError, onComplete)
  145. end
  146. -- The functions below transform the values produced by an Observable and return a new Observable
  147. -- that produces these values.
  148. --- Returns an Observable that only produces values from the original if they are different from
  149. -- the previous value.
  150. -- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
  151. -- @returns {Observable}
  152. function Observable:changes(comparator)
  153. comparator = comparator or eq
  154. return Observable.create(function(observer)
  155. local first = true
  156. local currentValue = nil
  157. local function onNext(value, ...)
  158. if first or not comparator(value, currentValue) then
  159. observer:onNext(value, ...)
  160. currentValue = value
  161. first = false
  162. end
  163. end
  164. local function onError(message)
  165. return observer:onError(onError)
  166. end
  167. local function onComplete()
  168. return observer:onComplete()
  169. end
  170. return self:subscribe(onNext, onError, onComplete)
  171. end)
  172. end
  173. --- Returns a new Observable that runs a combinator function on the most recent values from a set
  174. -- of Observables whenever any of them produce a new value. The results of the combinator function
  175. -- are produced by the new Observable.
  176. -- @arg {Observable...} observables - One or more Observables to combine.
  177. -- @arg {function} combinator - A function that combines the latest result from each Observable and
  178. -- returns a single value.
  179. -- @returns {Observable}
  180. function Observable:combine(...)
  181. local sources = {...}
  182. local combinator = table.remove(sources)
  183. table.insert(sources, 1, self)
  184. return Observable.create(function(observer)
  185. local latest = {}
  186. local pending = {unpack(sources)}
  187. local completed = {}
  188. local function onNext(i)
  189. return function(value)
  190. latest[i] = value
  191. pending[i] = nil
  192. if not next(pending) then
  193. observer:onNext(combinator(unpack(latest)))
  194. end
  195. end
  196. end
  197. local function onError(e)
  198. return observer:onError(e)
  199. end
  200. local function onComplete(i)
  201. return function()
  202. table.insert(completed, i)
  203. if #completed == #sources then
  204. observer:onComplete()
  205. end
  206. end
  207. end
  208. for i = 1, #sources do
  209. sources[i]:subscribe(onNext(i), onError, onComplete(i))
  210. end
  211. end)
  212. end
  213. --- Returns a new Observable that produces the values of the first with falsy values removed.
  214. -- @returns {Observable}
  215. function Observable:compact()
  216. return self:filter(identity)
  217. end
  218. --- Returns a new Observable that produces the values produced by all the specified Observables in
  219. -- the order they are specified.
  220. -- @arg {Observable...} sources - The Observables to concatenate.
  221. -- @returns {Observable}
  222. function Observable:concat(other, ...)
  223. if not other then return self end
  224. local others = {...}
  225. return Observable.create(function(observer)
  226. local function onNext(...)
  227. return observer:onNext(...)
  228. end
  229. local function onError(message)
  230. return observer:onError(message)
  231. end
  232. local function onComplete()
  233. return observer:onComplete()
  234. end
  235. local function chain()
  236. return other:concat(unpack(others)):subscribe(onNext, onError, onComplete)
  237. end
  238. return self:subscribe(onNext, onError, chain)
  239. end)
  240. end
  241. --- Returns a new Observable that produces the values from the original with duplicates removed.
  242. -- @returns {Observable}
  243. function Observable:distinct()
  244. return Observable.create(function(observer)
  245. local values = {}
  246. local function onNext(x)
  247. if not values[x] then
  248. observer:onNext(x)
  249. end
  250. values[x] = true
  251. end
  252. local function onError(e)
  253. return observer:onError(e)
  254. end
  255. local function onComplete()
  256. return observer:onComplete()
  257. end
  258. return self:subscribe(onNext, onError, onComplete)
  259. end)
  260. end
  261. --- Returns a new Observable that only produces values of the first that satisfy a predicate.
  262. -- @arg {function} predicate - The predicate used to filter values.
  263. -- @returns {Observable}
  264. function Observable:filter(predicate)
  265. predicate = predicate or identity
  266. return Observable.create(function(observer)
  267. local function onNext(...)
  268. if predicate(...) then
  269. return observer:onNext(...)
  270. end
  271. end
  272. local function onError(e)
  273. return observer:onError(e)
  274. end
  275. local function onComplete()
  276. return observer:onComplete(e)
  277. end
  278. return self:subscribe(onNext, onError, onComplete)
  279. end)
  280. end
  281. --- Returns a new Observable that produces the first value of the original that satisfies a
  282. -- predicate.
  283. -- @arg {function} predicate - The predicate used to find a value.
  284. function Observable:find(predicate)
  285. predicate = predicate or identity
  286. return Observable.create(function(observer)
  287. local function onNext(...)
  288. if predicate(...) then
  289. observer:onNext(...)
  290. return observer:onComplete()
  291. end
  292. end
  293. local function onError(message)
  294. return observer:onError(e)
  295. end
  296. local function onComplete()
  297. return observer:onComplete()
  298. end
  299. return self:subscribe(onNext, onError, onComplete)
  300. end)
  301. end
  302. --- Returns a new Observable that only produces the first result of the original.
  303. -- @returns {Observable}
  304. function Observable:first()
  305. return self:take(1)
  306. end
  307. --- Returns a new Observable that subscribes to the Observables produced by the original and
  308. -- produces their values.
  309. -- @returns {Observable}
  310. function Observable:flatten()
  311. return Observable.create(function(observer)
  312. local function onError(message)
  313. return observer:onError(message)
  314. end
  315. local function onNext(observable)
  316. local function innerOnNext(...)
  317. observer:onNext(...)
  318. end
  319. observable:subscribe(innerOnNext, onError, noop)
  320. end
  321. local function onComplete()
  322. return observer:onComplete()
  323. end
  324. return self:subscribe(onNext, onError, onComplete)
  325. end)
  326. end
  327. --- Returns a new Observable that only produces the last result of the original.
  328. -- @returns {Observable}
  329. function Observable:last()
  330. return Observable.create(function(observer)
  331. local value
  332. local empty = true
  333. local function onNext(...)
  334. value = {...}
  335. empty = false
  336. end
  337. local function onError(e)
  338. return observer:onError(e)
  339. end
  340. local function onComplete()
  341. if not empty then
  342. observer:onNext(unpack(value or {}))
  343. end
  344. return observer:onComplete()
  345. end
  346. return self:subscribe(onNext, onError, onComplete)
  347. end)
  348. end
  349. --- Returns a new Observable that produces the values of the original transformed by a function.
  350. -- @arg {function} callback - The function to transform values from the original Observable.
  351. -- @returns {Observable}
  352. function Observable:map(callback)
  353. return Observable.create(function(observer)
  354. callback = callback or identity
  355. local function onNext(...)
  356. return observer:onNext(callback(...))
  357. end
  358. local function onError(e)
  359. return observer:onError(e)
  360. end
  361. local function onComplete()
  362. return observer:onComplete()
  363. end
  364. return self:subscribe(onNext, onError, onComplete)
  365. end)
  366. end
  367. --- Returns a new Observable that produces the maximum value produced by the original.
  368. -- @returns {Observable}
  369. function Observable:max()
  370. return self:reduce(math.max)
  371. end
  372. --- Returns a new Observable that produces the minimum value produced by the original.
  373. -- @returns {Observable}
  374. function Observable:min()
  375. return self:reduce(math.min)
  376. end
  377. --- Returns a new Observable that produces the values produced by all the specified Observables in
  378. -- the order they are produced.
  379. -- @arg {Observable...} sources - One or more Observables to merge.
  380. -- @returns {Observable}
  381. function Observable:merge(...)
  382. local sources = {...}
  383. table.insert(sources, 1, self)
  384. return Observable.create(function(observer)
  385. local function onNext(...)
  386. return observer:onNext(...)
  387. end
  388. local function onError(message)
  389. return observer:onError(message)
  390. end
  391. local function onComplete(i)
  392. return function()
  393. sources[i] = nil
  394. if not next(sources) then
  395. observer:onComplete()
  396. end
  397. end
  398. end
  399. for i = 1, #sources do
  400. sources[i]:subscribe(onNext, onError, onComplete(i))
  401. end
  402. end)
  403. end
  404. --- Returns an Observable that produces the values of the original inside tables.
  405. -- @returns {Observable}
  406. function Observable:pack()
  407. return self:map(pack)
  408. end
  409. --- Returns two Observables: one that produces values for which the predicate returns truthy for,
  410. -- and another that produces values for which the predicate returns falsy.
  411. -- @arg {function} predicate - The predicate used to partition the values.
  412. -- @returns {Observable}
  413. -- @returns {Observable}
  414. function Observable:partition(predicate)
  415. return self:filter(predicate), self:reject(predicate)
  416. end
  417. --- Returns a new Observable that produces values computed by extracting the given keys from the
  418. -- tables produced by the original.
  419. -- @arg {string...} keys - The key to extract from the table. Multiple keys can be specified to
  420. -- recursively pluck values from nested tables.
  421. -- @returns {Observable}
  422. function Observable:pluck(key, ...)
  423. if not key then return self end
  424. return Observable.create(function(observer)
  425. local function onNext(t)
  426. return observer:onNext(t[key])
  427. end
  428. local function onError(e)
  429. return observer:onError(e)
  430. end
  431. local function onComplete()
  432. return observer:onComplete()
  433. end
  434. return self:subscribe(onNext, onError, onComplete)
  435. end):pluck(...)
  436. end
  437. --- Returns a new Observable that produces a single value computed by accumulating the results of
  438. -- running a function on each value produced by the original Observable.
  439. -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
  440. -- the return value of the last call as the first argument and the
  441. -- current values as the rest of the arguments.
  442. -- @arg {*} seed - A value to pass to the accumulator the first time it is run.
  443. -- @returns {Observable}
  444. function Observable:reduce(accumulator, seed)
  445. return Observable.create(function(observer)
  446. local result = seed
  447. local first = true
  448. local function onNext(...)
  449. if first and seed == nil then
  450. result = ...
  451. first = false
  452. else
  453. result = accumulator(result, ...)
  454. end
  455. end
  456. local function onError(e)
  457. return observer:onError(e)
  458. end
  459. local function onComplete()
  460. observer:onNext(result)
  461. return observer:onComplete()
  462. end
  463. return self:subscribe(onNext, onError, onComplete)
  464. end)
  465. end
  466. --- Returns a new Observable that produces values from the original which do not satisfy a
  467. -- predicate.
  468. -- @arg {function} predicate - The predicate used to reject values.
  469. -- @returns {Observable}
  470. function Observable:reject(predicate)
  471. predicate = predicate or identity
  472. return Observable.create(function(observer)
  473. local function onNext(...)
  474. if not predicate(...) then
  475. return observer:onNext(...)
  476. end
  477. end
  478. local function onError(e)
  479. return observer:onError(e)
  480. end
  481. local function onComplete()
  482. return observer:onComplete(e)
  483. end
  484. return self:subscribe(onNext, onError, onComplete)
  485. end)
  486. end
  487. --- Returns a new Observable that skips over a specified number of values produced by the original
  488. -- and produces the rest.
  489. -- @arg {number=1} n - The number of values to ignore.
  490. -- @returns {Observable}
  491. function Observable:skip(n)
  492. n = n or 1
  493. return Observable.create(function(observer)
  494. local i = 1
  495. local function onNext(...)
  496. if i > n then
  497. observer:onNext(...)
  498. else
  499. i = i + 1
  500. end
  501. end
  502. local function onError(e)
  503. return observer:onError(e)
  504. end
  505. local function onComplete()
  506. return observer:onComplete()
  507. end
  508. return self:subscribe(onNext, onError, onComplete)
  509. end)
  510. end
  511. --- Returns a new Observable that skips over values produced by the original until the specified
  512. -- Observable produces a value.
  513. -- @arg {Observable} other - The Observable that triggers the production of values.
  514. -- @returns {Observable}
  515. function Observable:skipUntil(other)
  516. return Observable.create(function(observer)
  517. local function trigger()
  518. local function onNext(...)
  519. return observer:onNext(...)
  520. end
  521. local function onError(message)
  522. return observer:onNext(message)
  523. end
  524. local function onComplete()
  525. return observer:onComplete()
  526. end
  527. return self:subscribe(onNext, onError, onComplete)
  528. end
  529. other:subscribe(trigger, trigger, trigger)
  530. end)
  531. end
  532. --- Returns a new Observable that skips elements until the predicate returns falsy for one of them.
  533. -- @arg {function} predicate - The predicate used to continue skipping values.
  534. -- @returns {Observable}
  535. function Observable:skipWhile(predicate)
  536. predicate = predicate or identity
  537. return Observable.create(function(observer)
  538. local skipping = true
  539. local function onNext(...)
  540. if skipping then
  541. skipping = predicate(...)
  542. end
  543. if not skipipng then
  544. return observer:onNext(...)
  545. end
  546. end
  547. local function onError(message)
  548. return observer:onError(message)
  549. end
  550. local function onComplete()
  551. return observer:onComplete()
  552. end
  553. return self:subscribe(onNext, onError, onComplete)
  554. end)
  555. end
  556. --- Returns a new Observable that only produces the first n results of the original.
  557. -- @arg {number=1} n - The number of elements to produce before completing.
  558. -- @returns {Observable}
  559. function Observable:take(n)
  560. n = n or 1
  561. return Observable.create(function(observer)
  562. if n <= 0 then
  563. observer:onComplete()
  564. return
  565. end
  566. local i = 1
  567. local function onNext(...)
  568. observer:onNext(...)
  569. i = i + 1
  570. if i > n then
  571. observer:onComplete()
  572. end
  573. end
  574. local function onError(e)
  575. return observer:onError(e)
  576. end
  577. local function onComplete()
  578. return observer:onComplete()
  579. end
  580. return self:subscribe(onNext, onError, onComplete)
  581. end)
  582. end
  583. --- Returns a new Observable that completes when the specified Observable fires.
  584. -- @arg {Observable} other - The Observable that triggers completion of the original.
  585. -- @returns {Observable}
  586. function Observable:takeUntil(other)
  587. return Observable.create(function(observer)
  588. local function onNext(...)
  589. return observer:onNext(...)
  590. end
  591. local function onError(e)
  592. return observer:onError(e)
  593. end
  594. local function onComplete()
  595. return observer:onComplete()
  596. end
  597. other:subscribe(onComplete, onComplete, onComplete)
  598. return self:subscribe(onNext, onError, onComplete)
  599. end)
  600. end
  601. --- Returns a new Observable that produces elements until the predicate returns falsy.
  602. -- @arg {function} predicate - The predicate used to continue production of values.
  603. -- @returns {Observable}
  604. function Observable:takeWhile(predicate)
  605. predicate = predicate or identity
  606. return Observable.create(function(observer)
  607. local taking = true
  608. local function onNext(...)
  609. if taking then
  610. taking = predicate(...)
  611. if taking then
  612. return observer:onNext(...)
  613. else
  614. return observer:onComplete()
  615. end
  616. end
  617. end
  618. local function onError(message)
  619. return observer:onError(message)
  620. end
  621. local function onComplete()
  622. return observer:onComplete()
  623. end
  624. return self:subscribe(onNext, onError, onComplete)
  625. end)
  626. end
  627. --- Runs a function each time this Observable has activity. Similar to subscribe but does not
  628. -- create a subscription.
  629. -- @arg {function=} onNext - Run when the Observable produces values.
  630. -- @arg {function=} onError - Run when the Observable encounters a problem.
  631. -- @arg {function=} onComplete - Run when the Observable completes.
  632. -- @returns {Observable}
  633. function Observable:tap(_onNext, _onError, _onComplete)
  634. _onNext, _onError, _onComplete = _onNext or noop, _onError or noop, _onComplete or noop
  635. return Observable.create(function(observer)
  636. local function onNext(...)
  637. _onNext(...)
  638. return observer:onNext(...)
  639. end
  640. local function onError(message)
  641. _onError(message)
  642. return observer:onError(message)
  643. end
  644. local function onComplete()
  645. _onComplete()
  646. return observer:onComplete()
  647. end
  648. return self:subscribe(onNext, onError, onComplete)
  649. end)
  650. end
  651. --- Returns an Observable that unpacks the tables produced by the original.
  652. -- @returns {Observable}
  653. function Observable:unpack()
  654. return self:map(unpack)
  655. end
  656. --- Returns an Observable that takes any values produced by the original that consist of multiple
  657. -- return values and produces each value individually.
  658. -- @returns {Observable}
  659. function Observable:unwrap()
  660. return Observable.create(function(observer)
  661. local function onNext(...)
  662. local values = {...}
  663. for i = 1, #values do
  664. observer:onNext(values[i])
  665. end
  666. end
  667. local function onError(message)
  668. return observer:onError(message)
  669. end
  670. local function onComplete()
  671. return observer:onComplete()
  672. end
  673. return self:subscribe(onNext, onError, onComplete)
  674. end)
  675. end
  676. --- Returns an Observable that produces a sliding window of the values produced by the original.
  677. -- @arg {number} size - The size of the window. The returned observable will produce this number
  678. -- of the most recent values as multiple arguments to onNext.
  679. -- @returns {Observable}
  680. function Observable:window(size)
  681. return Observable.create(function(observer)
  682. local window = {}
  683. local function onNext(value)
  684. table.insert(window, value)
  685. if #window > size then
  686. table.remove(window, 1)
  687. observer:onNext(unpack(window))
  688. end
  689. end
  690. local function onError(message)
  691. return observer:onError(message)
  692. end
  693. local function onComplete()
  694. return observer:onComplete()
  695. end
  696. return self:subscribe(onNext, onError, onComplete)
  697. end)
  698. end
  699. --- Returns an Observable that produces values from the original along with the most recently
  700. -- produced value from all other specified Observables. Note that only the first argument from each
  701. -- source Observable is used.
  702. -- @arg {Observable...} sources - The Observables to include the most recent values from.
  703. -- @returns {Observable}
  704. function Observable:with(...)
  705. local sources = {...}
  706. return Observable.create(function(observer)
  707. local latest = {}
  708. local function setLatest(i)
  709. return function(value)
  710. latest[i] = value
  711. end
  712. end
  713. local function onNext(value)
  714. return observer:onNext(value, unpack(latest))
  715. end
  716. local function onError(e)
  717. return observer:onError(e)
  718. end
  719. local function onComplete()
  720. return observer:onComplete()
  721. end
  722. for i = 1, #sources do
  723. sources[i]:subscribe(setLatest(i), noop, noop)
  724. end
  725. return self:subscribe(onNext, onError, onComplete)
  726. end)
  727. end
  728. --- Returns an Observable that buffers values from the original and produces them as multiple
  729. -- values.
  730. -- @arg {number} size - The size of the buffer.
  731. function Observable:wrap(size)
  732. return Observable.create(function(observer)
  733. local buffer = {}
  734. local function emit()
  735. if #buffer > 0 then
  736. observer:onNext(unpack(buffer))
  737. buffer = {}
  738. end
  739. end
  740. local function onNext(...)
  741. local values = {...}
  742. for i = 1, #values do
  743. table.insert(buffer, values[i])
  744. if #buffer >= size then
  745. return emit()
  746. end
  747. end
  748. end
  749. local function onError(message)
  750. emit()
  751. return observer:onError(message)
  752. end
  753. local function onComplete()
  754. emit()
  755. return observer:onComplete()
  756. end
  757. return self:subscribe(onNext, onError, onComplete)
  758. end)
  759. end
  760. --- @class Scheduler
  761. -- @description Schedulers manage groups of Observables.
  762. local Scheduler = {}
  763. --- @class ImmediateScheduler
  764. -- @description Schedules Observables by running all operations immediately.
  765. local Immediate = {}
  766. Immediate.__index = Immediate
  767. Immediate.__tostring = constant('ImmediateScheduler')
  768. --- Creates a new Immediate Scheduler.
  769. -- @returns {Scheduler.Immediate}
  770. function Immediate.create()
  771. return setmetatable({}, Immediate)
  772. end
  773. --- Schedules a function to be run on the scheduler. It is executed immediately.
  774. -- @arg {function} action - The function to execute.
  775. function Immediate:schedule(action)
  776. action()
  777. end
  778. Scheduler.Immediate = Immediate
  779. --- @class CooperativeScheduler
  780. -- @description Manages Observables using coroutines and a virtual clock that must be updated
  781. -- manually.
  782. local Cooperative = {}
  783. Cooperative.__index = Cooperative
  784. Cooperative.__tostring = constant('CooperativeScheduler')
  785. --- Creates a new Cooperative Scheduler.
  786. -- @arg {number=0} currentTime - A time to start the scheduler at.
  787. -- @returns {Scheduler.Cooperative}
  788. function Cooperative.create(currentTime)
  789. local self = {
  790. tasks = {},
  791. currentTime = currentTime or 0
  792. }
  793. return setmetatable(self, Cooperative)
  794. end
  795. --- Schedules a function to be run after an optional delay.
  796. -- @arg {function} action - The function to execute. Will be converted into a coroutine. The
  797. -- coroutine may yield execution back to the scheduler with an optional
  798. -- number, which will put it to sleep for a time period.
  799. -- @arg {number=0} delay - Delay execution of the action by a time period.
  800. function Cooperative:schedule(action, delay)
  801. table.insert(self.tasks, {
  802. thread = coroutine.create(action),
  803. due = self.currentTime + (delay or 0)
  804. })
  805. end
  806. --- Triggers an update of the Cooperative Scheduler. The clock will be advanced and the scheduler
  807. -- will run any coroutines that are due to be run.
  808. -- @arg {number=0} delta - An amount of time to advance the clock by. It is common to pass in the
  809. -- time in seconds or milliseconds elapsed since this function was last
  810. -- called.
  811. function Cooperative:update(delta)
  812. self.currentTime = self.currentTime + (delta or 0)
  813. for i = #self.tasks, 1, -1 do
  814. local task = self.tasks[i]
  815. if self.currentTime >= task.due then
  816. local success, delay = coroutine.resume(task.thread)
  817. if success then
  818. task.due = math.max(task.due + (delay or 0), self.currentTime)
  819. else
  820. error(delay)
  821. end
  822. if coroutine.status(task.thread) == 'dead' then
  823. table.remove(self.tasks, i)
  824. end
  825. end
  826. end
  827. end
  828. --- Returns whether or not the Cooperative Scheduler's queue is empty.
  829. function Cooperative:isEmpty()
  830. return not next(self.tasks)
  831. end
  832. Scheduler.Cooperative = Cooperative
  833. --- @class Subject
  834. -- @description Subjects function both as an Observer and as an Observable. Subjects inherit all
  835. -- Observable functions, including subscribe. Values can also be pushed to the Subject, which will
  836. -- be broadcasted to any subscribed Observers.
  837. local Subject = setmetatable({}, Observable)
  838. Subject.__index = Subject
  839. Subject.__tostring = constant('Subject')
  840. --- Creates a new Subject.
  841. -- @arg {*...} value - The initial values.
  842. -- @returns {Subject}
  843. function Subject.create(...)
  844. local self = {
  845. value = {...},
  846. observers = {}
  847. }
  848. return setmetatable(self, Subject)
  849. end
  850. --- Creates a new Observer and attaches it to the Subject.
  851. -- @arg {function} onNext - Called when the Subject produces a value.
  852. -- @arg {function} onError - Called when the Subject terminates due to an error.
  853. -- @arg {function} onComplete - Called when the Subject completes normally.
  854. function Subject:subscribe(onNext, onError, onComplete)
  855. table.insert(self.observers, Observer.create(onNext, onError, onComplete))
  856. end
  857. --- Pushes zero or more values to the Subject. It will be broadcasted to all Observers.
  858. -- @arg {*...} values
  859. function Subject:onNext(...)
  860. self.value = {...}
  861. for i = 1, #self.observers do
  862. self.observers[i]:onNext(...)
  863. end
  864. end
  865. --- Signal to all Observers that an error has occurred.
  866. -- @arg {string=} message - A string describing what went wrong.
  867. function Subject:onError(message)
  868. for i = 1, #self.observers do
  869. self.observers[i]:onError(message)
  870. end
  871. end
  872. --- Signal to all Observers that the Subject will not produce any more values.
  873. function Subject:onComplete()
  874. for i = 1, #self.observers do
  875. self.observers[i]:onComplete()
  876. end
  877. end
  878. --- Returns the last value emitted by the Subject, or the initial value passed to the constructor
  879. -- if nothing has been emitted yet.
  880. -- @returns {*...}
  881. function Subject:getValue()
  882. return unpack(self.value or {})
  883. end
  884. Subject.__call = Subject.onNext
  885. rx = {
  886. Observer = Observer,
  887. Observable = Observable,
  888. Scheduler = Scheduler,
  889. scheduler = Scheduler.Immediate.create(),
  890. Subject = Subject
  891. }
  892. return rx