rx.lua 33 KB


  1. -- RxLua v0.0.1
  2. -- https://github.com/bjornbytes/rxlua
  3. -- MIT License
  4. local util = {}
  5. util.pack = table.pack or function(...) return { n = select('#', ...), ... } end
  6. util.unpack = table.unpack or unpack
  7. util.eq = function(x, y) return x == y end
  8. util.noop = function() end
  9. util.identity = function(x) return x end
  10. util.constant = function(x) return function() return x end end
  11. --- @class Subscription
  12. -- @description A handle representing the link between an Observer and an Observable, as well as any
  13. -- work required to clean up after the Observable completes or the Observer unsubscribes.
  14. local Subscription = {}
  15. Subscription.__index = Subscription
  16. Subscription.__tostring = util.constant('Subscription')
  17. --- Creates a new Subscription.
  18. -- @arg {function=} action - The action to run when the subscription is unsubscribed. It will only
  19. -- be run once.
  20. -- @returns {Subscription}
  21. function Subscription.create(action)
  22. local self = {
  23. action = action or util.noop,
  24. unsubscribed = false
  25. }
  26. return setmetatable(self, Subscription)
  27. end
  28. --- Unsubscribes the subscription, performing any necessary cleanup work.
  29. function Subscription:unsubscribe()
  30. if self.unsubscribed then return end
  31. self.action(self)
  32. self.unsubscribed = true
  33. end
  34. --- @class Observer
  35. -- @description Observers are simple objects that receive values from Observables.
  36. local Observer = {}
  37. Observer.__index = Observer
  38. Observer.__tostring = util.constant('Observer')
  39. --- Creates a new Observer.
  40. -- @arg {function=} onNext - Called when the Observable produces a value.
  41. -- @arg {function=} onError - Called when the Observable terminates due to an error.
  42. -- @arg {function=} onComplete - Called when the Observable completes normally.
  43. -- @returns {Observer}
  44. function Observer.create(onNext, onError, onComplete)
  45. local self = {
  46. _onNext = onNext or util.noop,
  47. _onError = onError or error,
  48. _onComplete = onComplete or util.noop,
  49. stopped = false
  50. }
  51. return setmetatable(self, Observer)
  52. end
  53. --- Pushes zero or more values to the Observer.
  54. -- @arg {*...} values
  55. function Observer:onNext(...)
  56. if not self.stopped then
  57. self._onNext(...)
  58. end
  59. end
  60. --- Notify the Observer that an error has occurred.
  61. -- @arg {string=} message - A string describing what went wrong.
  62. function Observer:onError(message)
  63. if not self.stopped then
  64. self.stopped = true
  65. self._onError(message)
  66. end
  67. end
  68. --- Notify the Observer that the sequence has completed and will produce no more values.
  69. function Observer:onComplete()
  70. if not self.stopped then
  71. self.stopped = true
  72. self._onComplete()
  73. end
  74. end
  75. --- @class Observable
  76. -- @description Observables push values to Observers.
  77. local Observable = {}
  78. Observable.__index = Observable
  79. Observable.__tostring = util.constant('Observable')
  80. --- Creates a new Observable.
  81. -- @arg {function} subscribe - The subscription function that produces values.
  82. -- @returns {Observable}
  83. function Observable.create(subscribe)
  84. local self = {
  85. _subscribe = subscribe
  86. }
  87. return setmetatable(self, Observable)
  88. end
  89. --- Shorthand for creating an Observer and passing it to this Observable's subscription function.
  90. -- @arg {function} onNext - Called when the Observable produces a value.
  91. -- @arg {function} onError - Called when the Observable terminates due to an error.
  92. -- @arg {function} onComplete - Called when the Observable completes normally.
  93. function Observable:subscribe(onNext, onError, onComplete)
  94. if type(onNext) == 'table' then
  95. return self._subscribe(onNext)
  96. else
  97. return self._subscribe(Observer.create(onNext, onError, onComplete))
  98. end
  99. end
  100. --- Creates an Observable that produces a single value.
  101. -- @arg {*} value
  102. -- @returns {Observable}
  103. function Observable.fromValue(value)
  104. return Observable.create(function(observer)
  105. observer:onNext(value)
  106. observer:onComplete()
  107. end)
  108. end
  109. --- Creates an Observable that produces a range of values in a manner similar to a Lua for loop.
  110. -- @arg {number} initial - The first value of the range, or the upper limit if no other arguments
  111. -- are specified.
  112. -- @arg {number=} limit - The second value of the range.
  113. -- @arg {number=1} step - An amount to increment the value by each iteration.
  114. -- @returns {Observable}
  115. function Observable.fromRange(initial, limit, step)
  116. if not limit and not step then
  117. initial, limit = 1, initial
  118. end
  119. step = step or 1
  120. return Observable.create(function(observer)
  121. for i = initial, limit, step do
  122. observer:onNext(i)
  123. end
  124. observer:onComplete()
  125. end)
  126. end
  127. --- Creates an Observable that produces values from a table.
  128. -- @arg {table} table - The table used to create the Observable.
  129. -- @arg {function=pairs} iterator - An iterator used to iterate the table, e.g. pairs or ipairs.
  130. -- @arg {boolean} keys - Whether or not to also emit the keys of the table.
  131. -- @returns {Observable}
  132. function Observable.fromTable(t, iterator, keys)
  133. iterator = iterator or pairs
  134. return Observable.create(function(observer)
  135. for key, value in iterator(t) do
  136. observer:onNext(value, keys and key or nil)
  137. end
  138. observer:onComplete()
  139. end)
  140. end
  141. --- Creates an Observable that produces values when the specified coroutine yields.
  142. -- @arg {thread} coroutine
  143. -- @returns {Observable}
  144. function Observable.fromCoroutine(thread, scheduler)
  145. thread = type(thread) == 'function' and coroutine.create(thread) or thread
  146. return Observable.create(function(observer)
  147. return scheduler:schedule(function()
  148. while not observer.stopped do
  149. local success, value = coroutine.resume(thread)
  150. if success then
  151. observer:onNext(value)
  152. else
  153. return observer:onError(value)
  154. end
  155. if coroutine.status(thread) == 'dead' then
  156. return observer:onComplete()
  157. end
  158. coroutine.yield()
  159. end
  160. end)
  161. end)
  162. end
  163. --- Subscribes to this Observable and prints values it produces.
  164. -- @arg {string=} name - Prefixes the printed messages with a name.
  165. -- @arg {function=tostring} formatter - A function that formats one or more values to be printed.
  166. function Observable:dump(name, formatter)
  167. name = name and (name .. ' ') or ''
  168. formatter = formatter or tostring
  169. local onNext = function(...) print(name .. 'onNext: ' .. formatter(...)) end
  170. local onError = function(e) print(name .. 'onError: ' .. e) end
  171. local onComplete = function() print(name .. 'onComplete') end
  172. return self:subscribe(onNext, onError, onComplete)
  173. end
  174. --- Returns an Observable that only produces values from the original if they are different from
  175. -- the previous value.
  176. -- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
  177. -- @returns {Observable}
  178. function Observable:distinctUntilChanged(comparator)
  179. comparator = comparator or util.eq
  180. return Observable.create(function(observer)
  181. local first = true
  182. local currentValue = nil
  183. local function onNext(value, ...)
  184. if first or not comparator(value, currentValue) then
  185. observer:onNext(value, ...)
  186. currentValue = value
  187. first = false
  188. end
  189. end
  190. local function onError(message)
  191. return observer:onError(onError)
  192. end
  193. local function onComplete()
  194. return observer:onComplete()
  195. end
  196. return self:subscribe(onNext, onError, onComplete)
  197. end)
  198. end
  199. --- Returns a new Observable that runs a combinator function on the most recent values from a set
  200. -- of Observables whenever any of them produce a new value. The results of the combinator function
  201. -- are produced by the new Observable.
  202. -- @arg {Observable...} observables - One or more Observables to combine.
  203. -- @arg {function} combinator - A function that combines the latest result from each Observable and
  204. -- returns a single value.
  205. -- @returns {Observable}
  206. function Observable:combine(...)
  207. local sources = {...}
  208. local combinator = table.remove(sources)
  209. if type(combinator) ~= 'function' then
  210. table.insert(sources, combinator)
  211. combinator = function(...) return ... end
  212. end
  213. table.insert(sources, 1, self)
  214. return Observable.create(function(observer)
  215. local latest = {}
  216. local pending = {util.unpack(sources)}
  217. local completed = {}
  218. local function onNext(i)
  219. return function(value)
  220. latest[i] = value
  221. pending[i] = nil
  222. if not next(pending) then
  223. observer:onNext(combinator(util.unpack(latest)))
  224. end
  225. end
  226. end
  227. local function onError(e)
  228. return observer:onError(e)
  229. end
  230. local function onComplete(i)
  231. return function()
  232. table.insert(completed, i)
  233. if #completed == #sources then
  234. observer:onComplete()
  235. end
  236. end
  237. end
  238. for i = 1, #sources do
  239. sources[i]:subscribe(onNext(i), onError, onComplete(i))
  240. end
  241. end)
  242. end
  243. --- Returns a new Observable that produces the values of the first with falsy values removed.
  244. -- @returns {Observable}
  245. function Observable:compact()
  246. return self:filter(util.identity)
  247. end
  248. --- Returns a new Observable that produces the values produced by all the specified Observables in
  249. -- the order they are specified.
  250. -- @arg {Observable...} sources - The Observables to concatenate.
  251. -- @returns {Observable}
  252. function Observable:concat(other, ...)
  253. if not other then return self end
  254. local others = {...}
  255. return Observable.create(function(observer)
  256. local function onNext(...)
  257. return observer:onNext(...)
  258. end
  259. local function onError(message)
  260. return observer:onError(message)
  261. end
  262. local function onComplete()
  263. return observer:onComplete()
  264. end
  265. local function chain()
  266. return other:concat(util.unpack(others)):subscribe(onNext, onError, onComplete)
  267. end
  268. return self:subscribe(onNext, onError, chain)
  269. end)
  270. end
  271. --- Returns a new Observable that produces the values from the original with duplicates removed.
  272. -- @returns {Observable}
  273. function Observable:distinct()
  274. return Observable.create(function(observer)
  275. local values = {}
  276. local function onNext(x)
  277. if not values[x] then
  278. observer:onNext(x)
  279. end
  280. values[x] = true
  281. end
  282. local function onError(e)
  283. return observer:onError(e)
  284. end
  285. local function onComplete()
  286. return observer:onComplete()
  287. end
  288. return self:subscribe(onNext, onError, onComplete)
  289. end)
  290. end
  291. --- Returns a new Observable that only produces values of the first that satisfy a predicate.
  292. -- @arg {function} predicate - The predicate used to filter values.
  293. -- @returns {Observable}
  294. function Observable:filter(predicate)
  295. predicate = predicate or util.identity
  296. return Observable.create(function(observer)
  297. local function onNext(...)
  298. if predicate(...) then
  299. return observer:onNext(...)
  300. end
  301. end
  302. local function onError(e)
  303. return observer:onError(e)
  304. end
  305. local function onComplete()
  306. return observer:onComplete(e)
  307. end
  308. return self:subscribe(onNext, onError, onComplete)
  309. end)
  310. end
  311. --- Returns a new Observable that produces the first value of the original that satisfies a
  312. -- predicate.
  313. -- @arg {function} predicate - The predicate used to find a value.
  314. function Observable:find(predicate)
  315. predicate = predicate or util.identity
  316. return Observable.create(function(observer)
  317. local function onNext(...)
  318. if predicate(...) then
  319. observer:onNext(...)
  320. return observer:onComplete()
  321. end
  322. end
  323. local function onError(message)
  324. return observer:onError(e)
  325. end
  326. local function onComplete()
  327. return observer:onComplete()
  328. end
  329. return self:subscribe(onNext, onError, onComplete)
  330. end)
  331. end
  332. --- Returns a new Observable that only produces the first result of the original.
  333. -- @returns {Observable}
  334. function Observable:first()
  335. return self:take(1)
  336. end
  337. --- Returns a new Observable that subscribes to the Observables produced by the original and
  338. -- produces their values.
  339. -- @returns {Observable}
  340. function Observable:flatten()
  341. return Observable.create(function(observer)
  342. local function onError(message)
  343. return observer:onError(message)
  344. end
  345. local function onNext(observable)
  346. local function innerOnNext(...)
  347. observer:onNext(...)
  348. end
  349. observable:subscribe(innerOnNext, onError, util.noop)
  350. end
  351. local function onComplete()
  352. return observer:onComplete()
  353. end
  354. return self:subscribe(onNext, onError, onComplete)
  355. end)
  356. end
  357. --- Returns a new Observable that only produces the last result of the original.
  358. -- @returns {Observable}
  359. function Observable:last()
  360. return Observable.create(function(observer)
  361. local value
  362. local empty = true
  363. local function onNext(...)
  364. value = {...}
  365. empty = false
  366. end
  367. local function onError(e)
  368. return observer:onError(e)
  369. end
  370. local function onComplete()
  371. if not empty then
  372. observer:onNext(util.unpack(value or {}))
  373. end
  374. return observer:onComplete()
  375. end
  376. return self:subscribe(onNext, onError, onComplete)
  377. end)
  378. end
  379. --- Returns a new Observable that produces the values of the original transformed by a function.
  380. -- @arg {function} callback - The function to transform values from the original Observable.
  381. -- @returns {Observable}
  382. function Observable:map(callback)
  383. return Observable.create(function(observer)
  384. callback = callback or util.identity
  385. local function onNext(...)
  386. return observer:onNext(callback(...))
  387. end
  388. local function onError(e)
  389. return observer:onError(e)
  390. end
  391. local function onComplete()
  392. return observer:onComplete()
  393. end
  394. return self:subscribe(onNext, onError, onComplete)
  395. end)
  396. end
  397. --- Returns a new Observable that produces the maximum value produced by the original.
  398. -- @returns {Observable}
  399. function Observable:max()
  400. return self:reduce(math.max)
  401. end
  402. --- Returns a new Observable that produces the values produced by all the specified Observables in
  403. -- the order they are produced.
  404. -- @arg {Observable...} sources - One or more Observables to merge.
  405. -- @returns {Observable}
  406. function Observable:merge(...)
  407. local sources = {...}
  408. table.insert(sources, 1, self)
  409. return Observable.create(function(observer)
  410. local function onNext(...)
  411. return observer:onNext(...)
  412. end
  413. local function onError(message)
  414. return observer:onError(message)
  415. end
  416. local function onComplete(i)
  417. return function()
  418. sources[i] = nil
  419. if not next(sources) then
  420. observer:onComplete()
  421. end
  422. end
  423. end
  424. for i = 1, #sources do
  425. sources[i]:subscribe(onNext, onError, onComplete(i))
  426. end
  427. end)
  428. end
  429. --- Returns a new Observable that produces the minimum value produced by the original.
  430. -- @returns {Observable}
  431. function Observable:min()
  432. return self:reduce(math.min)
  433. end
  434. --- Returns an Observable that produces the values of the original inside tables.
  435. -- @returns {Observable}
  436. function Observable:pack()
  437. return self:map(util.pack)
  438. end
  439. --- Returns two Observables: one that produces values for which the predicate returns truthy for,
  440. -- and another that produces values for which the predicate returns falsy.
  441. -- @arg {function} predicate - The predicate used to partition the values.
  442. -- @returns {Observable}
  443. -- @returns {Observable}
  444. function Observable:partition(predicate)
  445. return self:filter(predicate), self:reject(predicate)
  446. end
  447. --- Returns a new Observable that produces values computed by extracting the given keys from the
  448. -- tables produced by the original.
  449. -- @arg {string...} keys - The key to extract from the table. Multiple keys can be specified to
  450. -- recursively pluck values from nested tables.
  451. -- @returns {Observable}
  452. function Observable:pluck(key, ...)
  453. if not key then return self end
  454. return Observable.create(function(observer)
  455. local function onNext(t)
  456. return observer:onNext(t[key])
  457. end
  458. local function onError(e)
  459. return observer:onError(e)
  460. end
  461. local function onComplete()
  462. return observer:onComplete()
  463. end
  464. return self:subscribe(onNext, onError, onComplete)
  465. end):pluck(...)
  466. end
  467. --- Returns a new Observable that produces a single value computed by accumulating the results of
  468. -- running a function on each value produced by the original Observable.
  469. -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
  470. -- the return value of the last call as the first argument and the
  471. -- current values as the rest of the arguments.
  472. -- @arg {*} seed - A value to pass to the accumulator the first time it is run.
  473. -- @returns {Observable}
  474. function Observable:reduce(accumulator, seed)
  475. return Observable.create(function(observer)
  476. local result = seed
  477. local first = true
  478. local function onNext(...)
  479. if first and seed == nil then
  480. result = ...
  481. first = false
  482. else
  483. result = accumulator(result, ...)
  484. end
  485. end
  486. local function onError(e)
  487. return observer:onError(e)
  488. end
  489. local function onComplete()
  490. observer:onNext(result)
  491. return observer:onComplete()
  492. end
  493. return self:subscribe(onNext, onError, onComplete)
  494. end)
  495. end
  496. --- Returns a new Observable that produces values from the original which do not satisfy a
  497. -- predicate.
  498. -- @arg {function} predicate - The predicate used to reject values.
  499. -- @returns {Observable}
  500. function Observable:reject(predicate)
  501. predicate = predicate or util.identity
  502. return Observable.create(function(observer)
  503. local function onNext(...)
  504. if not predicate(...) then
  505. return observer:onNext(...)
  506. end
  507. end
  508. local function onError(e)
  509. return observer:onError(e)
  510. end
  511. local function onComplete()
  512. return observer:onComplete(e)
  513. end
  514. return self:subscribe(onNext, onError, onComplete)
  515. end)
  516. end
  517. --- Returns a new Observable that skips over a specified number of values produced by the original
  518. -- and produces the rest.
  519. -- @arg {number=1} n - The number of values to ignore.
  520. -- @returns {Observable}
  521. function Observable:skip(n)
  522. n = n or 1
  523. return Observable.create(function(observer)
  524. local i = 1
  525. local function onNext(...)
  526. if i > n then
  527. observer:onNext(...)
  528. else
  529. i = i + 1
  530. end
  531. end
  532. local function onError(e)
  533. return observer:onError(e)
  534. end
  535. local function onComplete()
  536. return observer:onComplete()
  537. end
  538. return self:subscribe(onNext, onError, onComplete)
  539. end)
  540. end
  541. --- Returns a new Observable that skips over values produced by the original until the specified
  542. -- Observable produces a value.
  543. -- @arg {Observable} other - The Observable that triggers the production of values.
  544. -- @returns {Observable}
  545. function Observable:skipUntil(other)
  546. return Observable.create(function(observer)
  547. local triggered = false
  548. local function trigger()
  549. triggered = true
  550. end
  551. other:subscribe(trigger, trigger, trigger)
  552. local function onNext(...)
  553. if triggered then
  554. observer:onNext(...)
  555. end
  556. end
  557. local function onError()
  558. if triggered then
  559. observer:onError()
  560. end
  561. end
  562. local function onComplete()
  563. if triggered then
  564. observer:onComplete()
  565. end
  566. end
  567. return self:subscribe(onNext, onError, onComplete)
  568. end)
  569. end
  570. --- Returns a new Observable that skips elements until the predicate returns falsy for one of them.
  571. -- @arg {function} predicate - The predicate used to continue skipping values.
  572. -- @returns {Observable}
  573. function Observable:skipWhile(predicate)
  574. predicate = predicate or util.identity
  575. return Observable.create(function(observer)
  576. local skipping = true
  577. local function onNext(...)
  578. if skipping then
  579. skipping = predicate(...)
  580. end
  581. if not skipping then
  582. return observer:onNext(...)
  583. end
  584. end
  585. local function onError(message)
  586. return observer:onError(message)
  587. end
  588. local function onComplete()
  589. return observer:onComplete()
  590. end
  591. return self:subscribe(onNext, onError, onComplete)
  592. end)
  593. end
  594. --- Returns a new Observable that only produces the first n results of the original.
  595. -- @arg {number=1} n - The number of elements to produce before completing.
  596. -- @returns {Observable}
  597. function Observable:take(n)
  598. n = n or 1
  599. return Observable.create(function(observer)
  600. if n <= 0 then
  601. observer:onComplete()
  602. return
  603. end
  604. local i = 1
  605. local function onNext(...)
  606. observer:onNext(...)
  607. i = i + 1
  608. if i > n then
  609. observer:onComplete()
  610. end
  611. end
  612. local function onError(e)
  613. return observer:onError(e)
  614. end
  615. local function onComplete()
  616. return observer:onComplete()
  617. end
  618. return self:subscribe(onNext, onError, onComplete)
  619. end)
  620. end
  621. --- Returns a new Observable that completes when the specified Observable fires.
  622. -- @arg {Observable} other - The Observable that triggers completion of the original.
  623. -- @returns {Observable}
  624. function Observable:takeUntil(other)
  625. return Observable.create(function(observer)
  626. local function onNext(...)
  627. return observer:onNext(...)
  628. end
  629. local function onError(e)
  630. return observer:onError(e)
  631. end
  632. local function onComplete()
  633. return observer:onComplete()
  634. end
  635. other:subscribe(onComplete, onComplete, onComplete)
  636. return self:subscribe(onNext, onError, onComplete)
  637. end)
  638. end
  639. --- Returns a new Observable that produces elements until the predicate returns falsy.
  640. -- @arg {function} predicate - The predicate used to continue production of values.
  641. -- @returns {Observable}
  642. function Observable:takeWhile(predicate)
  643. predicate = predicate or util.identity
  644. return Observable.create(function(observer)
  645. local taking = true
  646. local function onNext(...)
  647. if taking then
  648. taking = predicate(...)
  649. if taking then
  650. return observer:onNext(...)
  651. else
  652. return observer:onComplete()
  653. end
  654. end
  655. end
  656. local function onError(message)
  657. return observer:onError(message)
  658. end
  659. local function onComplete()
  660. return observer:onComplete()
  661. end
  662. return self:subscribe(onNext, onError, onComplete)
  663. end)
  664. end
  665. --- Runs a function each time this Observable has activity. Similar to subscribe but does not
  666. -- create a subscription.
  667. -- @arg {function=} onNext - Run when the Observable produces values.
  668. -- @arg {function=} onError - Run when the Observable encounters a problem.
  669. -- @arg {function=} onComplete - Run when the Observable completes.
  670. -- @returns {Observable}
  671. function Observable:tap(_onNext, _onError, _onComplete)
  672. _onNext = _onNext or util.noop
  673. _onError = _onError or util.noop
  674. _onComplete = _onComplete or util.noop
  675. return Observable.create(function(observer)
  676. local function onNext(...)
  677. _onNext(...)
  678. return observer:onNext(...)
  679. end
  680. local function onError(message)
  681. _onError(message)
  682. return observer:onError(message)
  683. end
  684. local function onComplete()
  685. _onComplete()
  686. return observer:onComplete()
  687. end
  688. return self:subscribe(onNext, onError, onComplete)
  689. end)
  690. end
  691. --- Returns an Observable that unpacks the tables produced by the original.
  692. -- @returns {Observable}
  693. function Observable:unpack()
  694. return self:map(util.unpack)
  695. end
  696. --- Returns an Observable that takes any values produced by the original that consist of multiple
  697. -- return values and produces each value individually.
  698. -- @returns {Observable}
  699. function Observable:unwrap()
  700. return Observable.create(function(observer)
  701. local function onNext(...)
  702. local values = {...}
  703. for i = 1, #values do
  704. observer:onNext(values[i])
  705. end
  706. end
  707. local function onError(message)
  708. return observer:onError(message)
  709. end
  710. local function onComplete()
  711. return observer:onComplete()
  712. end
  713. return self:subscribe(onNext, onError, onComplete)
  714. end)
  715. end
  716. --- Returns an Observable that produces a sliding window of the values produced by the original.
  717. -- @arg {number} size - The size of the window. The returned observable will produce this number
  718. -- of the most recent values as multiple arguments to onNext.
  719. -- @returns {Observable}
  720. function Observable:window(size)
  721. return Observable.create(function(observer)
  722. local window = {}
  723. local function onNext(value)
  724. table.insert(window, value)
  725. if #window >= size then
  726. observer:onNext(util.unpack(window))
  727. table.remove(window, 1)
  728. end
  729. end
  730. local function onError(message)
  731. return observer:onError(message)
  732. end
  733. local function onComplete()
  734. return observer:onComplete()
  735. end
  736. return self:subscribe(onNext, onError, onComplete)
  737. end)
  738. end
  739. --- Returns an Observable that produces values from the original along with the most recently
  740. -- produced value from all other specified Observables. Note that only the first argument from each
  741. -- source Observable is used.
  742. -- @arg {Observable...} sources - The Observables to include the most recent values from.
  743. -- @returns {Observable}
  744. function Observable:with(...)
  745. local sources = {...}
  746. return Observable.create(function(observer)
  747. local latest = setmetatable({}, {__len = util.constant(#sources)})
  748. local function setLatest(i)
  749. return function(value)
  750. latest[i] = value
  751. end
  752. end
  753. local function onNext(value)
  754. return observer:onNext(value, util.unpack(latest))
  755. end
  756. local function onError(e)
  757. return observer:onError(e)
  758. end
  759. local function onComplete()
  760. return observer:onComplete()
  761. end
  762. for i = 1, #sources do
  763. sources[i]:subscribe(setLatest(i), util.noop, util.noop)
  764. end
  765. return self:subscribe(onNext, onError, onComplete)
  766. end)
  767. end
  768. --- Returns an Observable that buffers values from the original and produces them as multiple
  769. -- values.
  770. -- @arg {number} size - The size of the buffer.
  771. function Observable:wrap(size)
  772. return Observable.create(function(observer)
  773. local buffer = {}
  774. local function emit()
  775. if #buffer > 0 then
  776. observer:onNext(util.unpack(buffer))
  777. buffer = {}
  778. end
  779. end
  780. local function onNext(...)
  781. local values = {...}
  782. for i = 1, #values do
  783. table.insert(buffer, values[i])
  784. if #buffer >= size then
  785. emit()
  786. end
  787. end
  788. end
  789. local function onError(message)
  790. emit()
  791. return observer:onError(message)
  792. end
  793. local function onComplete()
  794. emit()
  795. return observer:onComplete()
  796. end
  797. return self:subscribe(onNext, onError, onComplete)
  798. end)
  799. end
  800. --- @class ImmediateScheduler
  801. -- @description Schedules Observables by running all operations immediately.
  802. local ImmediateScheduler = {}
  803. ImmediateScheduler.__index = ImmediateScheduler
  804. ImmediateScheduler.__tostring = util.constant('ImmediateScheduler')
  805. --- Creates a new ImmediateScheduler.
  806. -- @returns {ImmediateScheduler}
  807. function ImmediateScheduler.create()
  808. return setmetatable({}, ImmediateScheduler)
  809. end
  810. --- Schedules a function to be run on the scheduler. It is executed immediately.
  811. -- @arg {function} action - The function to execute.
  812. function ImmediateScheduler:schedule(action)
  813. action()
  814. end
  815. --- @class CooperativeScheduler
  816. -- @description Manages Observables using coroutines and a virtual clock that must be updated
  817. -- manually.
  818. local CooperativeScheduler = {}
  819. CooperativeScheduler.__index = CooperativeScheduler
  820. CooperativeScheduler.__tostring = util.constant('CooperativeScheduler')
  821. --- Creates a new CooperativeScheduler.
  822. -- @arg {number=0} currentTime - A time to start the scheduler at.
  823. -- @returns {Scheduler.CooperativeScheduler}
  824. function CooperativeScheduler.create(currentTime)
  825. local self = {
  826. tasks = {},
  827. currentTime = currentTime or 0
  828. }
  829. return setmetatable(self, CooperativeScheduler)
  830. end
  831. --- Schedules a function to be run after an optional delay.
  832. -- @arg {function} action - The function to execute. Will be converted into a coroutine. The
  833. -- coroutine may yield execution back to the scheduler with an optional
  834. -- number, which will put it to sleep for a time period.
  835. -- @arg {number=0} delay - Delay execution of the action by a time period.
  836. function CooperativeScheduler:schedule(action, delay)
  837. local task = {
  838. thread = coroutine.create(action),
  839. due = self.currentTime + (delay or 0)
  840. }
  841. table.insert(self.tasks, task)
  842. return Subscription.create(function()
  843. return self:unschedule(task)
  844. end)
  845. end
  846. function CooperativeScheduler:unschedule(task)
  847. for i = 1, #self.tasks do
  848. if self.tasks[i] == task then
  849. table.remove(self.tasks, i)
  850. end
  851. end
  852. end
  853. --- Triggers an update of the CooperativeScheduler. The clock will be advanced and the scheduler
  854. -- will run any coroutines that are due to be run.
  855. -- @arg {number=0} delta - An amount of time to advance the clock by. It is common to pass in the
  856. -- time in seconds or milliseconds elapsed since this function was last
  857. -- called.
  858. function CooperativeScheduler:update(delta)
  859. self.currentTime = self.currentTime + (delta or 0)
  860. for i = #self.tasks, 1, -1 do
  861. local task = self.tasks[i]
  862. if self.currentTime >= task.due then
  863. local success, delay = coroutine.resume(task.thread)
  864. if success then
  865. task.due = math.max(task.due + (delay or 0), self.currentTime)
  866. else
  867. error(delay)
  868. end
  869. if coroutine.status(task.thread) == 'dead' then
  870. table.remove(self.tasks, i)
  871. end
  872. end
  873. end
  874. end
  875. --- Returns whether or not the CooperativeScheduler's queue is empty.
  876. function CooperativeScheduler:isEmpty()
  877. return not next(self.tasks)
  878. end
  879. --- @class Subject
  880. -- @description Subjects function both as an Observer and as an Observable. Subjects inherit all
  881. -- Observable functions, including subscribe. Values can also be pushed to the Subject, which will
  882. -- be broadcasted to any subscribed Observers.
  883. local Subject = setmetatable({}, Observable)
  884. Subject.__index = Subject
  885. Subject.__tostring = util.constant('Subject')
  886. --- Creates a new Subject.
  887. -- @returns {Subject}
  888. function Subject.create()
  889. local self = {
  890. observers = {},
  891. stopped = false
  892. }
  893. return setmetatable(self, Subject)
  894. end
  895. --- Creates a new Observer and attaches it to the Subject.
  896. -- @arg {function|table} onNext|observer - A function called when the Subject produces a value or
  897. -- an existing Observer to attach to the Subject.
  898. -- @arg {function} onError - Called when the Subject terminates due to an error.
  899. -- @arg {function} onComplete - Called when the Subject completes normally.
  900. function Subject:subscribe(onNext, onError, onComplete)
  901. local observer
  902. if type(onNext) == 'table' then
  903. observer = onNext
  904. else
  905. observer = Observer.create(onNext, onError, onComplete)
  906. end
  907. table.insert(self.observers, observer)
  908. end
  909. --- Pushes zero or more values to the Subject. They will be broadcasted to all Observers.
  910. -- @arg {*...} values
  911. function Subject:onNext(...)
  912. if not self.stopped then
  913. for i = 1, #self.observers do
  914. self.observers[i]:onNext(...)
  915. end
  916. end
  917. end
  918. --- Signal to all Observers that an error has occurred.
  919. -- @arg {string=} message - A string describing what went wrong.
  920. function Subject:onError(message)
  921. if not self.stopped then
  922. for i = 1, #self.observers do
  923. self.observers[i]:onError(message)
  924. end
  925. self.stopped = true
  926. end
  927. end
  928. --- Signal to all Observers that the Subject will not produce any more values.
  929. function Subject:onComplete()
  930. if not self.stopped then
  931. for i = 1, #self.observers do
  932. self.observers[i]:onComplete()
  933. end
  934. self.stopped = true
  935. end
  936. end
  937. Subject.__call = Subject.onNext
  938. --- @class BehaviorSubject
  939. -- @description A Subject that tracks its current value. Provides an accessor to retrieve the most
  940. -- recent pushed value, and all subscribers immediately receive the latest value.
  941. local BehaviorSubject = setmetatable({}, Subject)
  942. BehaviorSubject.__index = BehaviorSubject
  943. BehaviorSubject.__tostring = util.constant('BehaviorSubject')
  944. --- Creates a new BehaviorSubject.
  945. -- @arg {*...} value - The initial values.
  946. -- @returns {Subject}
  947. function BehaviorSubject.create(...)
  948. local self = {
  949. observers = {},
  950. stopped = false
  951. }
  952. if select('#', ...) > 0 then
  953. self.value = util.pack(...)
  954. end
  955. return setmetatable(self, BehaviorSubject)
  956. end
  957. --- Creates a new Observer and attaches it to the Subject. Immediately broadcasts the most recent
  958. -- value to the Observer.
  959. -- @arg {function} onNext - Called when the Subject produces a value.
  960. -- @arg {function} onError - Called when the Subject terminates due to an error.
  961. -- @arg {function} onComplete - Called when the Subject completes normally.
  962. function BehaviorSubject:subscribe(onNext, onError, onComplete)
  963. local observer = Observer.create(onNext, onError, onComplete)
  964. Subject.subscribe(self, observer)
  965. if self.value then
  966. observer:onNext(unpack(self.value))
  967. end
  968. end
  969. --- Pushes zero or more values to the BehaviorSubject. They will be broadcasted to all Observers.
  970. -- @arg {*...} values
  971. function BehaviorSubject:onNext(...)
  972. self.value = util.pack(...)
  973. return Subject.onNext(self, ...)
  974. end
  975. --- Returns the last value emitted by the Subject, or the initial value passed to the constructor
  976. -- if nothing has been emitted yet.
  977. -- @returns {*...}
  978. function BehaviorSubject:getValue()
  979. return self.value and util.unpack(self.value)
  980. end
  981. return {
  982. util = util,
  983. Subscription = Subscription,
  984. Observer = Observer,
  985. Observable = Observable,
  986. ImmediateScheduler = ImmediateScheduler,
  987. CooperativeScheduler = CooperativeScheduler,
  988. Subject = Subject,
  989. BehaviorSubject = BehaviorSubject
  990. }