rx.lua 30 KB


  1. local rx
  2. local pack = function(...) return {...} end
  3. local unpack = table.unpack or unpack
  4. local function eq(x, y) return x == y end
  5. local function noop() end
  6. local function identity(x) return x end
  7. local function constant(x) return function() return x end end
  8. --- @class Observer
  9. -- @description Observers are simple objects that receive values from Observables.
  10. local Observer = {}
  11. Observer.__index = Observer
  12. Observer.__tostring = constant('Observer')
  13. --- Creates a new Observer.
  14. -- @arg {function=} onNext - Called when the Observable produces a value.
  15. -- @arg {function=} onError - Called when the Observable terminates due to an error.
  16. -- @arg {function=} onComplete - Called when the Observable completes normally.
  17. -- @returns {Observer}
  18. function Observer.create(onNext, onError, onComplete)
  19. local self = {
  20. _onNext = onNext or noop,
  21. _onError = onError or error,
  22. _onComplete = onComplete or noop,
  23. stopped = false
  24. }
  25. return setmetatable(self, Observer)
  26. end
  27. --- Pushes zero or more values to the Observer.
  28. -- @arg {*...} values
  29. function Observer:onNext(...)
  30. if not self.stopped then
  31. self._onNext(...)
  32. end
  33. end
  34. --- Notify the Observer that an error has occurred.
  35. -- @arg {string=} message - A string describing what went wrong.
  36. function Observer:onError(message)
  37. if not self.stopped then
  38. self.stopped = true
  39. self._onError(message)
  40. end
  41. end
  42. --- Notify the Observer that the sequence has completed and will produce no more values.
  43. function Observer:onComplete()
  44. if not self.stopped then
  45. self.stopped = true
  46. self._onComplete()
  47. end
  48. end
  49. --- @class Observable
  50. -- @description Observables push values to Observers.
  51. local Observable = {}
  52. Observable.__index = Observable
  53. Observable.__tostring = constant('Observable')
  54. --- Creates a new Observable.
  55. -- @arg {function} subscribe - The subscription function that produces values.
  56. -- @returns {Observable}
  57. function Observable.create(subscribe)
  58. local self = {
  59. _subscribe = subscribe
  60. }
  61. return setmetatable(self, Observable)
  62. end
  63. --- Shorthand for creating an Observer and passing it to this Observable's subscription function.
  64. -- @arg {function} onNext - Called when the Observable produces a value.
  65. -- @arg {function} onError - Called when the Observable terminates due to an error.
  66. -- @arg {function} onComplete - Called when the Observable completes normally.
  67. function Observable:subscribe(onNext, onError, onComplete)
  68. if type(onNext) == 'table' then
  69. return self._subscribe(onNext)
  70. else
  71. return self._subscribe(Observer.create(onNext, onError, onComplete))
  72. end
  73. end
  74. --- Creates an Observable that produces a single value.
  75. -- @arg {*} value
  76. -- @returns {Observable}
  77. function Observable.fromValue(value)
  78. return Observable.create(function(observer)
  79. observer:onNext(value)
  80. observer:onComplete()
  81. end)
  82. end
  83. --- Creates an Observable that produces a range of values in a manner similar to a Lua for loop.
  84. -- @arg {number} initial - The first value of the range, or the upper limit if no other arguments
  85. -- are specified.
  86. -- @arg {number=} limit - The second value of the range.
  87. -- @arg {number=1} step - An amount to increment the value by each iteration.
  88. -- @returns {Observable}
  89. function Observable.fromRange(initial, limit, step)
  90. if not limit and not step then
  91. initial, limit = 1, initial
  92. end
  93. step = step or 1
  94. return Observable.create(function(observer)
  95. for i = initial, limit, step do
  96. observer:onNext(i)
  97. end
  98. observer:onComplete()
  99. end)
  100. end
  101. --- Creates an Observable that produces values from a table.
  102. -- @arg {table} table - The table used to create the Observable.
  103. -- @arg {function=pairs} iterator - An iterator used to iterate the table, e.g. pairs or ipairs.
  104. -- @arg {boolean} keys - Whether or not to also emit the keys of the table.
  105. -- @returns {Observable}
  106. function Observable.fromTable(t, iterator, keys)
  107. iterator = iterator or pairs
  108. return Observable.create(function(observer)
  109. for key, value in iterator(t) do
  110. observer:onNext(value, keys and key or nil)
  111. end
  112. observer:onComplete()
  113. end)
  114. end
  115. --- Creates an Observable that produces values when the specified coroutine yields.
  116. -- @arg {thread} coroutine
  117. -- @returns {Observable}
  118. function Observable.fromCoroutine(thread)
  119. thread = type(thread) == 'function' and coroutine.create(thread) or thread
  120. return Observable.create(function(observer)
  121. return rx.scheduler:schedule(function()
  122. while not observer.stopped do
  123. local success, value = coroutine.resume(thread)
  124. if success then
  125. observer:onNext(value)
  126. else
  127. return observer:onError(value)
  128. end
  129. if coroutine.status(thread) == 'dead' then
  130. return observer:onComplete()
  131. end
  132. coroutine.yield()
  133. end
  134. end)
  135. end)
  136. end
  137. --- Subscribes to this Observable and prints values it produces.
  138. -- @arg {string=} name - Prefixes the printed messages with a name.
  139. function Observable:dump(name)
  140. name = name and (name .. ' ') or ''
  141. local onNext = function(...) print(name .. 'onNext: ' .. table.concat({...}, ', ')) end
  142. local onError = function(e) print(name .. 'onError: ' .. e) end
  143. local onComplete = function() print(name .. 'onComplete') end
  144. return self:subscribe(onNext, onError, onComplete)
  145. end
  146. -- The functions below transform the values produced by an Observable and return a new Observable
  147. -- that produces these values.
  148. --- Returns an Observable that only produces values from the original if they are different from
  149. -- the previous value.
  150. -- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
  151. -- @returns {Observable}
  152. function Observable:changes(comparator)
  153. comparator = comparator or eq
  154. return Observable.create(function(observer)
  155. local first = true
  156. local currentValue = nil
  157. local function onNext(value, ...)
  158. if first or not comparator(value, currentValue) then
  159. observer:onNext(value, ...)
  160. currentValue = value
  161. first = false
  162. end
  163. end
  164. local function onError(message)
  165. return observer:onError(onError)
  166. end
  167. local function onComplete()
  168. return observer:onComplete()
  169. end
  170. return self:subscribe(onNext, onError, onComplete)
  171. end)
  172. end
  173. --- Returns a new Observable that runs a combinator function on the most recent values from a set
  174. -- of Observables whenever any of them produce a new value. The results of the combinator function
  175. -- are produced by the new Observable.
  176. -- @arg {Observable...} observables - One or more Observables to combine.
  177. -- @arg {function} combinator - A function that combines the latest result from each Observable and
  178. -- returns a single value.
  179. -- @returns {Observable}
  180. function Observable:combine(...)
  181. local sources = {...}
  182. local combinator = table.remove(sources)
  183. table.insert(sources, 1, self)
  184. return Observable.create(function(observer)
  185. local latest = {}
  186. local pending = {unpack(sources)}
  187. local completed = {}
  188. local function onNext(i)
  189. return function(value)
  190. latest[i] = value
  191. pending[i] = nil
  192. if not next(pending) then
  193. observer:onNext(combinator(unpack(latest)))
  194. end
  195. end
  196. end
  197. local function onError(e)
  198. return observer:onError(e)
  199. end
  200. local function onComplete(i)
  201. return function()
  202. table.insert(completed, i)
  203. if #completed == #sources then
  204. observer:onComplete()
  205. end
  206. end
  207. end
  208. for i = 1, #sources do
  209. sources[i]:subscribe(onNext(i), onError, onComplete(i))
  210. end
  211. end)
  212. end
  213. --- Returns a new Observable that produces the values of the first with falsy values removed.
  214. -- @returns {Observable}
  215. function Observable:compact()
  216. return self:filter(identity)
  217. end
  218. --- Returns a new Observable that produces the values produced by all the specified Observables in
  219. -- the order they are specified.
  220. -- @arg {Observable...} sources - The Observables to concatenate.
  221. -- @returns {Observable}
  222. function Observable:concat(other, ...)
  223. if not other then return self end
  224. local others = {...}
  225. return Observable.create(function(observer)
  226. local function onNext(...)
  227. return observer:onNext(...)
  228. end
  229. local function onError(message)
  230. return observer:onError(message)
  231. end
  232. local function onComplete()
  233. return observer:onComplete()
  234. end
  235. local function chain()
  236. return other:concat(unpack(others)):subscribe(onNext, onError, onComplete)
  237. end
  238. return self:subscribe(onNext, onError, chain)
  239. end)
  240. end
  241. --- Returns a new Observable that produces the values from the original with duplicates removed.
  242. -- @returns {Observable}
  243. function Observable:distinct()
  244. return Observable.create(function(observer)
  245. local values = {}
  246. local function onNext(x)
  247. if not values[x] then
  248. observer:onNext(x)
  249. end
  250. values[x] = true
  251. end
  252. local function onError(e)
  253. return observer:onError(e)
  254. end
  255. local function onComplete()
  256. return observer:onComplete()
  257. end
  258. return self:subscribe(onNext, onError, onComplete)
  259. end)
  260. end
  261. --- Returns a new Observable that only produces values of the first that satisfy a predicate.
  262. -- @arg {function} predicate - The predicate used to filter values.
  263. -- @returns {Observable}
  264. function Observable:filter(predicate)
  265. predicate = predicate or identity
  266. return Observable.create(function(observer)
  267. local function onNext(...)
  268. if predicate(...) then
  269. return observer:onNext(...)
  270. end
  271. end
  272. local function onError(e)
  273. return observer:onError(e)
  274. end
  275. local function onComplete()
  276. return observer:onComplete(e)
  277. end
  278. return self:subscribe(onNext, onError, onComplete)
  279. end)
  280. end
  281. --- Returns a new Observable that produces the first value of the original that satisfies a
  282. -- predicate.
  283. -- @arg {function} predicate - The predicate used to find a value.
  284. function Observable:find(predicate)
  285. predicate = predicate or identity
  286. return Observable.create(function(observer)
  287. local function onNext(...)
  288. if predicate(...) then
  289. observer:onNext(...)
  290. return observer:onComplete()
  291. end
  292. end
  293. local function onError(message)
  294. return observer:onError(e)
  295. end
  296. local function onComplete()
  297. return observer:onComplete()
  298. end
  299. return self:subscribe(onNext, onError, onComplete)
  300. end)
  301. end
  302. --- Returns a new Observable that only produces the first result of the original.
  303. -- @returns {Observable}
  304. function Observable:first()
  305. return self:take(1)
  306. end
  307. --- Returns a new Observable that subscribes to the Observables produced by the original and
  308. -- produces their values.
  309. -- @returns {Observable}
  310. function Observable:flatten()
  311. return Observable.create(function(observer)
  312. local function onError(message)
  313. return observer:onError(message)
  314. end
  315. local function onNext(observable)
  316. local function innerOnNext(...)
  317. observer:onNext(...)
  318. end
  319. observable:subscribe(innerOnNext, onError, noop)
  320. end
  321. local function onComplete()
  322. return observer:onComplete()
  323. end
  324. return self:subscribe(onNext, onError, onComplete)
  325. end)
  326. end
  327. --- Returns a new Observable that only produces the last result of the original.
  328. -- @returns {Observable}
  329. function Observable:last()
  330. return Observable.create(function(observer)
  331. local value
  332. local empty = true
  333. local function onNext(...)
  334. value = {...}
  335. empty = false
  336. end
  337. local function onError(e)
  338. return observer:onError(e)
  339. end
  340. local function onComplete()
  341. if not empty then
  342. observer:onNext(unpack(value or {}))
  343. end
  344. return observer:onComplete()
  345. end
  346. return self:subscribe(onNext, onError, onComplete)
  347. end)
  348. end
  349. --- Returns a new Observable that produces the values of the original transformed by a function.
  350. -- @arg {function} callback - The function to transform values from the original Observable.
  351. -- @returns {Observable}
  352. function Observable:map(callback)
  353. return Observable.create(function(observer)
  354. callback = callback or identity
  355. local function onNext(...)
  356. return observer:onNext(callback(...))
  357. end
  358. local function onError(e)
  359. return observer:onError(e)
  360. end
  361. local function onComplete()
  362. return observer:onComplete()
  363. end
  364. return self:subscribe(onNext, onError, onComplete)
  365. end)
  366. end
  367. --- Returns a new Observable that produces the maximum value produced by the original.
  368. -- @returns {Observable}
  369. function Observable:max()
  370. return self:reduce(math.max)
  371. end
  372. --- Returns a new Observable that produces the minimum value produced by the original.
  373. -- @returns {Observable}
  374. function Observable:min()
  375. return self:reduce(math.min)
  376. end
  377. --- Returns a new Observable that produces the values produced by all the specified Observables in
  378. -- the order they are produced.
  379. -- @arg {Observable...} sources - One or more Observables to merge.
  380. -- @returns {Observable}
  381. function Observable:merge(...)
  382. local sources = {...}
  383. table.insert(sources, 1, self)
  384. return Observable.create(function(observer)
  385. local function onNext(...)
  386. return observer:onNext(...)
  387. end
  388. local function onError(message)
  389. return observer:onError(message)
  390. end
  391. local function onComplete(i)
  392. return function()
  393. sources[i] = nil
  394. if not next(sources) then
  395. observer:onComplete()
  396. end
  397. end
  398. end
  399. for i = 1, #sources do
  400. sources[i]:subscribe(onNext, onError, onComplete(i))
  401. end
  402. end)
  403. end
  404. --- Returns an Observable that produces the values of the original inside tables.
  405. -- @returns {Observable}
  406. function Observable:pack()
  407. return self:map(pack)
  408. end
  409. --- Returns two Observables: one that produces values for which the predicate returns truthy for,
  410. -- and another that produces values for which the predicate returns falsy.
  411. -- @arg {function} predicate - The predicate used to partition the values.
  412. -- @returns {Observable}
  413. -- @returns {Observable}
  414. function Observable:partition(predicate)
  415. return self:filter(predicate), self:reject(predicate)
  416. end
  417. --- Returns a new Observable that produces values computed by extracting the given keys from the
  418. -- tables produced by the original.
  419. -- @arg {string...} keys - The key to extract from the table. Multiple keys can be specified to
  420. -- recursively pluck values from nested tables.
  421. -- @returns {Observable}
  422. function Observable:pluck(key, ...)
  423. if not key then return self end
  424. return Observable.create(function(observer)
  425. local function onNext(t)
  426. return observer:onNext(t[key])
  427. end
  428. local function onError(e)
  429. return observer:onError(e)
  430. end
  431. local function onComplete()
  432. return observer:onComplete()
  433. end
  434. return self:subscribe(onNext, onError, onComplete)
  435. end):pluck(...)
  436. end
  437. --- Returns a new Observable that produces a single value computed by accumulating the results of
  438. -- running a function on each value produced by the original Observable.
  439. -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
  440. -- the return value of the last call as the first argument and the
  441. -- current values as the rest of the arguments.
  442. -- @arg {*} seed - A value to pass to the accumulator the first time it is run.
  443. -- @returns {Observable}
  444. function Observable:reduce(accumulator, seed)
  445. return Observable.create(function(observer)
  446. local result = seed
  447. local first = true
  448. local function onNext(...)
  449. if first and seed == nil then
  450. result = ...
  451. first = false
  452. else
  453. result = accumulator(result, ...)
  454. end
  455. end
  456. local function onError(e)
  457. return observer:onError(e)
  458. end
  459. local function onComplete()
  460. observer:onNext(result)
  461. return observer:onComplete()
  462. end
  463. return self:subscribe(onNext, onError, onComplete)
  464. end)
  465. end
  466. --- Returns a new Observable that produces values from the original which do not satisfy a
  467. -- predicate.
  468. -- @arg {function} predicate - The predicate used to reject values.
  469. -- @returns {Observable}
  470. function Observable:reject(predicate)
  471. predicate = predicate or identity
  472. return Observable.create(function(observer)
  473. local function onNext(...)
  474. if not predicate(...) then
  475. return observer:onNext(...)
  476. end
  477. end
  478. local function onError(e)
  479. return observer:onError(e)
  480. end
  481. local function onComplete()
  482. return observer:onComplete(e)
  483. end
  484. return self:subscribe(onNext, onError, onComplete)
  485. end)
  486. end
  487. --- Returns a new Observable that skips over a specified number of values produced by the original
  488. -- and produces the rest.
  489. -- @arg {number=1} n - The number of values to ignore.
  490. -- @returns {Observable}
  491. function Observable:skip(n)
  492. n = n or 1
  493. return Observable.create(function(observer)
  494. local i = 1
  495. local function onNext(...)
  496. if i > n then
  497. observer:onNext(...)
  498. else
  499. i = i + 1
  500. end
  501. end
  502. local function onError(e)
  503. return observer:onError(e)
  504. end
  505. local function onComplete()
  506. return observer:onComplete()
  507. end
  508. return self:subscribe(onNext, onError, onComplete)
  509. end)
  510. end
  511. --- Returns a new Observable that skips over values produced by the original until the specified
  512. -- Observable produces a value.
  513. -- @arg {Observable} other - The Observable that triggers the production of values.
  514. -- @returns {Observable}
  515. function Observable:skipUntil(other)
  516. return Observable.create(function(observer)
  517. local triggered = false
  518. local function trigger()
  519. triggered = true
  520. end
  521. other:subscribe(trigger, trigger, trigger)
  522. local function onNext(...)
  523. if triggered then
  524. observer:onNext(...)
  525. end
  526. end
  527. local function onError()
  528. if triggered then
  529. observer:onError()
  530. end
  531. end
  532. local function onComplete()
  533. if triggered then
  534. observer:onComplete()
  535. end
  536. end
  537. return self:subscribe(onNext, onError, onComplete)
  538. end)
  539. end
  540. --- Returns a new Observable that skips elements until the predicate returns falsy for one of them.
  541. -- @arg {function} predicate - The predicate used to continue skipping values.
  542. -- @returns {Observable}
  543. function Observable:skipWhile(predicate)
  544. predicate = predicate or identity
  545. return Observable.create(function(observer)
  546. local skipping = true
  547. local function onNext(...)
  548. if skipping then
  549. skipping = predicate(...)
  550. end
  551. if not skipipng then
  552. return observer:onNext(...)
  553. end
  554. end
  555. local function onError(message)
  556. return observer:onError(message)
  557. end
  558. local function onComplete()
  559. return observer:onComplete()
  560. end
  561. return self:subscribe(onNext, onError, onComplete)
  562. end)
  563. end
  564. --- Returns a new Observable that only produces the first n results of the original.
  565. -- @arg {number=1} n - The number of elements to produce before completing.
  566. -- @returns {Observable}
  567. function Observable:take(n)
  568. n = n or 1
  569. return Observable.create(function(observer)
  570. if n <= 0 then
  571. observer:onComplete()
  572. return
  573. end
  574. local i = 1
  575. local function onNext(...)
  576. observer:onNext(...)
  577. i = i + 1
  578. if i > n then
  579. observer:onComplete()
  580. end
  581. end
  582. local function onError(e)
  583. return observer:onError(e)
  584. end
  585. local function onComplete()
  586. return observer:onComplete()
  587. end
  588. return self:subscribe(onNext, onError, onComplete)
  589. end)
  590. end
  591. --- Returns a new Observable that completes when the specified Observable fires.
  592. -- @arg {Observable} other - The Observable that triggers completion of the original.
  593. -- @returns {Observable}
  594. function Observable:takeUntil(other)
  595. return Observable.create(function(observer)
  596. local function onNext(...)
  597. return observer:onNext(...)
  598. end
  599. local function onError(e)
  600. return observer:onError(e)
  601. end
  602. local function onComplete()
  603. return observer:onComplete()
  604. end
  605. other:subscribe(onComplete, onComplete, onComplete)
  606. return self:subscribe(onNext, onError, onComplete)
  607. end)
  608. end
  609. --- Returns a new Observable that produces elements until the predicate returns falsy.
  610. -- @arg {function} predicate - The predicate used to continue production of values.
  611. -- @returns {Observable}
  612. function Observable:takeWhile(predicate)
  613. predicate = predicate or identity
  614. return Observable.create(function(observer)
  615. local taking = true
  616. local function onNext(...)
  617. if taking then
  618. taking = predicate(...)
  619. if taking then
  620. return observer:onNext(...)
  621. else
  622. return observer:onComplete()
  623. end
  624. end
  625. end
  626. local function onError(message)
  627. return observer:onError(message)
  628. end
  629. local function onComplete()
  630. return observer:onComplete()
  631. end
  632. return self:subscribe(onNext, onError, onComplete)
  633. end)
  634. end
  635. --- Runs a function each time this Observable has activity. Similar to subscribe but does not
  636. -- create a subscription.
  637. -- @arg {function=} onNext - Run when the Observable produces values.
  638. -- @arg {function=} onError - Run when the Observable encounters a problem.
  639. -- @arg {function=} onComplete - Run when the Observable completes.
  640. -- @returns {Observable}
  641. function Observable:tap(_onNext, _onError, _onComplete)
  642. _onNext, _onError, _onComplete = _onNext or noop, _onError or noop, _onComplete or noop
  643. return Observable.create(function(observer)
  644. local function onNext(...)
  645. _onNext(...)
  646. return observer:onNext(...)
  647. end
  648. local function onError(message)
  649. _onError(message)
  650. return observer:onError(message)
  651. end
  652. local function onComplete()
  653. _onComplete()
  654. return observer:onComplete()
  655. end
  656. return self:subscribe(onNext, onError, onComplete)
  657. end)
  658. end
  659. --- Returns an Observable that unpacks the tables produced by the original.
  660. -- @returns {Observable}
  661. function Observable:unpack()
  662. return self:map(unpack)
  663. end
  664. --- Returns an Observable that takes any values produced by the original that consist of multiple
  665. -- return values and produces each value individually.
  666. -- @returns {Observable}
  667. function Observable:unwrap()
  668. return Observable.create(function(observer)
  669. local function onNext(...)
  670. local values = {...}
  671. for i = 1, #values do
  672. observer:onNext(values[i])
  673. end
  674. end
  675. local function onError(message)
  676. return observer:onError(message)
  677. end
  678. local function onComplete()
  679. return observer:onComplete()
  680. end
  681. return self:subscribe(onNext, onError, onComplete)
  682. end)
  683. end
  684. --- Returns an Observable that produces a sliding window of the values produced by the original.
  685. -- @arg {number} size - The size of the window. The returned observable will produce this number
  686. -- of the most recent values as multiple arguments to onNext.
  687. -- @returns {Observable}
  688. function Observable:window(size)
  689. return Observable.create(function(observer)
  690. local window = {}
  691. local function onNext(value)
  692. table.insert(window, value)
  693. if #window > size then
  694. table.remove(window, 1)
  695. observer:onNext(unpack(window))
  696. end
  697. end
  698. local function onError(message)
  699. return observer:onError(message)
  700. end
  701. local function onComplete()
  702. return observer:onComplete()
  703. end
  704. return self:subscribe(onNext, onError, onComplete)
  705. end)
  706. end
  707. --- Returns an Observable that produces values from the original along with the most recently
  708. -- produced value from all other specified Observables. Note that only the first argument from each
  709. -- source Observable is used.
  710. -- @arg {Observable...} sources - The Observables to include the most recent values from.
  711. -- @returns {Observable}
  712. function Observable:with(...)
  713. local sources = {...}
  714. return Observable.create(function(observer)
  715. local latest = {}
  716. local function setLatest(i)
  717. return function(value)
  718. latest[i] = value
  719. end
  720. end
  721. local function onNext(value)
  722. return observer:onNext(value, unpack(latest))
  723. end
  724. local function onError(e)
  725. return observer:onError(e)
  726. end
  727. local function onComplete()
  728. return observer:onComplete()
  729. end
  730. for i = 1, #sources do
  731. sources[i]:subscribe(setLatest(i), noop, noop)
  732. end
  733. return self:subscribe(onNext, onError, onComplete)
  734. end)
  735. end
  736. --- Returns an Observable that buffers values from the original and produces them as multiple
  737. -- values.
  738. -- @arg {number} size - The size of the buffer.
  739. function Observable:wrap(size)
  740. return Observable.create(function(observer)
  741. local buffer = {}
  742. local function emit()
  743. if #buffer > 0 then
  744. observer:onNext(unpack(buffer))
  745. buffer = {}
  746. end
  747. end
  748. local function onNext(...)
  749. local values = {...}
  750. for i = 1, #values do
  751. table.insert(buffer, values[i])
  752. if #buffer >= size then
  753. return emit()
  754. end
  755. end
  756. end
  757. local function onError(message)
  758. emit()
  759. return observer:onError(message)
  760. end
  761. local function onComplete()
  762. emit()
  763. return observer:onComplete()
  764. end
  765. return self:subscribe(onNext, onError, onComplete)
  766. end)
  767. end
  768. --- @class Scheduler
  769. -- @description Schedulers manage groups of Observables.
  770. local Scheduler = {}
  771. --- @class ImmediateScheduler
  772. -- @description Schedules Observables by running all operations immediately.
  773. local Immediate = {}
  774. Immediate.__index = Immediate
  775. Immediate.__tostring = constant('ImmediateScheduler')
  776. --- Creates a new Immediate Scheduler.
  777. -- @returns {Scheduler.Immediate}
  778. function Immediate.create()
  779. return setmetatable({}, Immediate)
  780. end
  781. --- Schedules a function to be run on the scheduler. It is executed immediately.
  782. -- @arg {function} action - The function to execute.
  783. function Immediate:schedule(action)
  784. action()
  785. end
  786. Scheduler.Immediate = Immediate
  787. --- @class CooperativeScheduler
  788. -- @description Manages Observables using coroutines and a virtual clock that must be updated
  789. -- manually.
  790. local Cooperative = {}
  791. Cooperative.__index = Cooperative
  792. Cooperative.__tostring = constant('CooperativeScheduler')
  793. --- Creates a new Cooperative Scheduler.
  794. -- @arg {number=0} currentTime - A time to start the scheduler at.
  795. -- @returns {Scheduler.Cooperative}
  796. function Cooperative.create(currentTime)
  797. local self = {
  798. tasks = {},
  799. currentTime = currentTime or 0
  800. }
  801. return setmetatable(self, Cooperative)
  802. end
  803. --- Schedules a function to be run after an optional delay.
  804. -- @arg {function} action - The function to execute. Will be converted into a coroutine. The
  805. -- coroutine may yield execution back to the scheduler with an optional
  806. -- number, which will put it to sleep for a time period.
  807. -- @arg {number=0} delay - Delay execution of the action by a time period.
  808. function Cooperative:schedule(action, delay)
  809. table.insert(self.tasks, {
  810. thread = coroutine.create(action),
  811. due = self.currentTime + (delay or 0)
  812. })
  813. end
  814. --- Triggers an update of the Cooperative Scheduler. The clock will be advanced and the scheduler
  815. -- will run any coroutines that are due to be run.
  816. -- @arg {number=0} delta - An amount of time to advance the clock by. It is common to pass in the
  817. -- time in seconds or milliseconds elapsed since this function was last
  818. -- called.
  819. function Cooperative:update(delta)
  820. self.currentTime = self.currentTime + (delta or 0)
  821. for i = #self.tasks, 1, -1 do
  822. local task = self.tasks[i]
  823. if self.currentTime >= task.due then
  824. local success, delay = coroutine.resume(task.thread)
  825. if success then
  826. task.due = math.max(task.due + (delay or 0), self.currentTime)
  827. else
  828. error(delay)
  829. end
  830. if coroutine.status(task.thread) == 'dead' then
  831. table.remove(self.tasks, i)
  832. end
  833. end
  834. end
  835. end
  836. --- Returns whether or not the Cooperative Scheduler's queue is empty.
  837. function Cooperative:isEmpty()
  838. return not next(self.tasks)
  839. end
  840. Scheduler.Cooperative = Cooperative
  841. --- @class Subject
  842. -- @description Subjects function both as an Observer and as an Observable. Subjects inherit all
  843. -- Observable functions, including subscribe. Values can also be pushed to the Subject, which will
  844. -- be broadcasted to any subscribed Observers.
  845. local Subject = setmetatable({}, Observable)
  846. Subject.__index = Subject
  847. Subject.__tostring = constant('Subject')
  848. --- Creates a new Subject.
  849. -- @arg {*...} value - The initial values.
  850. -- @returns {Subject}
  851. function Subject.create(...)
  852. local self = {
  853. value = {...},
  854. observers = {}
  855. }
  856. return setmetatable(self, Subject)
  857. end
  858. --- Creates a new Observer and attaches it to the Subject.
  859. -- @arg {function} onNext - Called when the Subject produces a value.
  860. -- @arg {function} onError - Called when the Subject terminates due to an error.
  861. -- @arg {function} onComplete - Called when the Subject completes normally.
  862. function Subject:subscribe(onNext, onError, onComplete)
  863. table.insert(self.observers, Observer.create(onNext, onError, onComplete))
  864. end
  865. --- Pushes zero or more values to the Subject. It will be broadcasted to all Observers.
  866. -- @arg {*...} values
  867. function Subject:onNext(...)
  868. self.value = {...}
  869. for i = 1, #self.observers do
  870. self.observers[i]:onNext(...)
  871. end
  872. end
  873. --- Signal to all Observers that an error has occurred.
  874. -- @arg {string=} message - A string describing what went wrong.
  875. function Subject:onError(message)
  876. for i = 1, #self.observers do
  877. self.observers[i]:onError(message)
  878. end
  879. end
  880. --- Signal to all Observers that the Subject will not produce any more values.
  881. function Subject:onComplete()
  882. for i = 1, #self.observers do
  883. self.observers[i]:onComplete()
  884. end
  885. end
  886. --- Returns the last value emitted by the Subject, or the initial value passed to the constructor
  887. -- if nothing has been emitted yet.
  888. -- @returns {*...}
  889. function Subject:getValue()
  890. return unpack(self.value or {})
  891. end
  892. Subject.__call = Subject.onNext
  893. rx = {
  894. Observer = Observer,
  895. Observable = Observable,
  896. Scheduler = Scheduler,
  897. scheduler = Scheduler.Immediate.create(),
  898. Subject = Subject
  899. }
  900. return rx