rx.lua 43 KB


  1. -- RxLua v0.0.1
  2. -- https://github.com/bjornbytes/rxlua
  3. -- MIT License
  4. local util = {}
  5. util.pack = table.pack or function(...) return { n = select('#', ...), ... } end
  6. util.unpack = table.unpack or unpack
  7. util.eq = function(x, y) return x == y end
  8. util.noop = function() end
  9. util.identity = function(x) return x end
  10. util.constant = function(x) return function() return x end end
  11. --- @class Subscription
  12. -- @description A handle representing the link between an Observer and an Observable, as well as any
  13. -- work required to clean up after the Observable completes or the Observer unsubscribes.
  14. local Subscription = {}
  15. Subscription.__index = Subscription
  16. Subscription.__tostring = util.constant('Subscription')
  17. --- Creates a new Subscription.
  18. -- @arg {function=} action - The action to run when the subscription is unsubscribed. It will only
  19. -- be run once.
  20. -- @returns {Subscription}
  21. function Subscription.create(action)
  22. local self = {
  23. action = action or util.noop,
  24. unsubscribed = false
  25. }
  26. return setmetatable(self, Subscription)
  27. end
  28. --- Unsubscribes the subscription, performing any necessary cleanup work.
  29. function Subscription:unsubscribe()
  30. if self.unsubscribed then return end
  31. self.action(self)
  32. self.unsubscribed = true
  33. end
  34. --- @class Observer
  35. -- @description Observers are simple objects that receive values from Observables.
  36. local Observer = {}
  37. Observer.__index = Observer
  38. Observer.__tostring = util.constant('Observer')
  39. --- Creates a new Observer.
  40. -- @arg {function=} onNext - Called when the Observable produces a value.
  41. -- @arg {function=} onError - Called when the Observable terminates due to an error.
  42. -- @arg {function=} onCompleted - Called when the Observable completes normally.
  43. -- @returns {Observer}
  44. function Observer.create(onNext, onError, onCompleted)
  45. local self = {
  46. _onNext = onNext or util.noop,
  47. _onError = onError or error,
  48. _onCompleted = onCompleted or util.noop,
  49. stopped = false
  50. }
  51. return setmetatable(self, Observer)
  52. end
  53. --- Pushes zero or more values to the Observer.
  54. -- @arg {*...} values
  55. function Observer:onNext(...)
  56. if not self.stopped then
  57. self._onNext(...)
  58. end
  59. end
  60. --- Notify the Observer that an error has occurred.
  61. -- @arg {string=} message - A string describing what went wrong.
  62. function Observer:onError(message)
  63. if not self.stopped then
  64. self.stopped = true
  65. self._onError(message)
  66. end
  67. end
  68. --- Notify the Observer that the sequence has completed and will produce no more values.
  69. function Observer:onCompleted()
  70. if not self.stopped then
  71. self.stopped = true
  72. self._onCompleted()
  73. end
  74. end
  75. --- @class Observable
  76. -- @description Observables push values to Observers.
  77. local Observable = {}
  78. Observable.__index = Observable
  79. Observable.__tostring = util.constant('Observable')
  80. --- Creates a new Observable.
  81. -- @arg {function} subscribe - The subscription function that produces values.
  82. -- @returns {Observable}
  83. function Observable.create(subscribe)
  84. local self = {
  85. _subscribe = subscribe
  86. }
  87. return setmetatable(self, Observable)
  88. end
  89. --- Shorthand for creating an Observer and passing it to this Observable's subscription function.
  90. -- @arg {function} onNext - Called when the Observable produces a value.
  91. -- @arg {function} onError - Called when the Observable terminates due to an error.
  92. -- @arg {function} onCompleted - Called when the Observable completes normally.
  93. function Observable:subscribe(onNext, onError, onCompleted)
  94. if type(onNext) == 'table' then
  95. return self._subscribe(onNext)
  96. else
  97. return self._subscribe(Observer.create(onNext, onError, onCompleted))
  98. end
  99. end
  100. --- Returns an Observable that immediately completes without producing a value.
  101. function Observable.empty()
  102. return Observable.create(function(observer)
  103. observer:onCompleted()
  104. end)
  105. end
  106. --- Returns an Observable that never produces values and never completes.
  107. function Observable.never()
  108. return Observable.create(function(observer) end)
  109. end
  110. --- Returns an Observable that immediately produces an error.
  111. function Observable.throw(message)
  112. return Observable.create(function(observer)
  113. observer:onError(message)
  114. end)
  115. end
  116. --- Creates an Observable that produces a single value.
  117. -- @arg {*} value
  118. -- @returns {Observable}
  119. function Observable.fromValue(value)
  120. return Observable.create(function(observer)
  121. observer:onNext(value)
  122. observer:onCompleted()
  123. end)
  124. end
  125. --- Creates an Observable that produces a range of values in a manner similar to a Lua for loop.
  126. -- @arg {number} initial - The first value of the range, or the upper limit if no other arguments
  127. -- are specified.
  128. -- @arg {number=} limit - The second value of the range.
  129. -- @arg {number=1} step - An amount to increment the value by each iteration.
  130. -- @returns {Observable}
  131. function Observable.fromRange(initial, limit, step)
  132. if not limit and not step then
  133. initial, limit = 1, initial
  134. end
  135. step = step or 1
  136. return Observable.create(function(observer)
  137. for i = initial, limit, step do
  138. observer:onNext(i)
  139. end
  140. observer:onCompleted()
  141. end)
  142. end
  143. --- Creates an Observable that produces values from a table.
  144. -- @arg {table} table - The table used to create the Observable.
  145. -- @arg {function=pairs} iterator - An iterator used to iterate the table, e.g. pairs or ipairs.
  146. -- @arg {boolean} keys - Whether or not to also emit the keys of the table.
  147. -- @returns {Observable}
  148. function Observable.fromTable(t, iterator, keys)
  149. iterator = iterator or pairs
  150. return Observable.create(function(observer)
  151. for key, value in iterator(t) do
  152. observer:onNext(value, keys and key or nil)
  153. end
  154. observer:onCompleted()
  155. end)
  156. end
  157. --- Creates an Observable that produces values when the specified coroutine yields.
  158. -- @arg {thread} coroutine
  159. -- @returns {Observable}
  160. function Observable.fromCoroutine(thread, scheduler)
  161. thread = type(thread) == 'function' and coroutine.create(thread) or thread
  162. return Observable.create(function(observer)
  163. return scheduler:schedule(function()
  164. while not observer.stopped do
  165. local success, value = coroutine.resume(thread)
  166. if success then
  167. observer:onNext(value)
  168. else
  169. return observer:onError(value)
  170. end
  171. if coroutine.status(thread) == 'dead' then
  172. return observer:onCompleted()
  173. end
  174. coroutine.yield()
  175. end
  176. end)
  177. end)
  178. end
  179. --- Creates an Observable that creates a new Observable for each observer using a factory function.
  180. -- @arg {function} factory - A function that returns an Observable.
  181. -- @returns {Observable}
  182. function Observable.defer(fn)
  183. return setmetatable({
  184. subscribe = function(_, ...)
  185. local observable = fn()
  186. return observable:subscribe(...)
  187. end
  188. }, Observable)
  189. end
  190. --- Subscribes to this Observable and prints values it produces.
  191. -- @arg {string=} name - Prefixes the printed messages with a name.
  192. -- @arg {function=tostring} formatter - A function that formats one or more values to be printed.
  193. function Observable:dump(name, formatter)
  194. name = name and (name .. ' ') or ''
  195. formatter = formatter or tostring
  196. local onNext = function(...) print(name .. 'onNext: ' .. formatter(...)) end
  197. local onError = function(e) print(name .. 'onError: ' .. e) end
  198. local onCompleted = function() print(name .. 'onCompleted') end
  199. return self:subscribe(onNext, onError, onCompleted)
  200. end
  201. --- Determine whether all items emitted by an Observable meet some criteria.
  202. -- @arg {function=identity} predicate - The predicate used to evaluate objects.
  203. function Observable:all(predicate)
  204. predicate = predicate or util.identity
  205. return Observable.create(function(observer)
  206. local function onNext(...)
  207. if not predicate(...) then
  208. observer:onNext(false)
  209. observer:onCompleted()
  210. end
  211. end
  212. local function onError(e)
  213. return observer:onError(e)
  214. end
  215. local function onCompleted()
  216. observer:onNext(true)
  217. return observer:onCompleted()
  218. end
  219. return self:subscribe(onNext, onError, onCompleted)
  220. end)
  221. end
  222. --- Given a set of Observables, produces values from only the first one to produce a value.
  223. -- @arg {Observable...} observables
  224. -- @returns {Observable}
  225. function Observable.amb(a, b, ...)
  226. if not a or not b then return a end
  227. return Observable.create(function(observer)
  228. local subscriptionA, subscriptionB
  229. local function onNextA(...)
  230. if subscriptionB then subscriptionB:unsubscribe() end
  231. observer:onNext(...)
  232. end
  233. local function onErrorA(e)
  234. if subscriptionB then subscriptionB:unsubscribe() end
  235. observer:onError(e)
  236. end
  237. local function onCompletedA()
  238. if subscriptionB then subscriptionB:unsubscribe() end
  239. observer:onCompleted()
  240. end
  241. local function onNextB(...)
  242. if subscriptionA then subscriptionA:unsubscribe() end
  243. observer:onNext(...)
  244. end
  245. local function onErrorB(e)
  246. if subscriptionA then subscriptionA:unsubscribe() end
  247. observer:onError(e)
  248. end
  249. local function onCompletedB()
  250. if subscriptionA then subscriptionA:unsubscribe() end
  251. observer:onCompleted()
  252. end
  253. subscriptionA = a:subscribe(onNextA, onErrorA, onCompletedA)
  254. subscriptionB = b:subscribe(onNextB, onErrorB, onCompletedB)
  255. return Subscription.create(function()
  256. subscriptionA:unsubscribe()
  257. subscriptionB:unsubscribe()
  258. end)
  259. end):amb(...)
  260. end
  261. --- Returns an Observable that produces the average of all values produced by the original.
  262. -- @returns {Observable}
  263. function Observable:average()
  264. return Observable.create(function(observer)
  265. local sum, count = 0, 0
  266. local function onNext(value)
  267. sum = sum + value
  268. count = count + 1
  269. end
  270. local function onError(e)
  271. observer:onError(e)
  272. end
  273. local function onCompleted()
  274. if count > 0 then
  275. observer:onNext(sum / count)
  276. end
  277. observer:onCompleted()
  278. end
  279. return self:subscribe(onNext, onError, onCompleted)
  280. end)
  281. end
  282. --- Returns an Observable that buffers values from the original and produces them as multiple
  283. -- values.
  284. -- @arg {number} size - The size of the buffer.
  285. function Observable:buffer(size)
  286. return Observable.create(function(observer)
  287. local buffer = {}
  288. local function emit()
  289. if #buffer > 0 then
  290. observer:onNext(util.unpack(buffer))
  291. buffer = {}
  292. end
  293. end
  294. local function onNext(...)
  295. local values = {...}
  296. for i = 1, #values do
  297. table.insert(buffer, values[i])
  298. if #buffer >= size then
  299. emit()
  300. end
  301. end
  302. end
  303. local function onError(message)
  304. emit()
  305. return observer:onError(message)
  306. end
  307. local function onCompleted()
  308. emit()
  309. return observer:onCompleted()
  310. end
  311. return self:subscribe(onNext, onError, onCompleted)
  312. end)
  313. end
  314. --- Returns an Observable that intercepts any errors from the previous and replace them with values
  315. -- produced by a new Observable.
  316. -- @arg {function|Observable} handler - An Observable or a function that returns an Observable to
  317. -- replace the source Observable in the event of an error.
  318. -- @returns {Observable}
  319. function Observable:catch(handler)
  320. handler = handler and (type(handler) == 'function' and handler or util.constant(handler))
  321. return Observable.create(function(observer)
  322. local subscription
  323. local function onNext(...)
  324. return observer:onNext(...)
  325. end
  326. local function onError(e)
  327. if not handler then
  328. return observer:onCompleted()
  329. end
  330. local continue = handler(e)
  331. if continue then
  332. if subscription then subscription:unsubscribe() end
  333. continue:subscribe(observer)
  334. else
  335. observer:onError(e)
  336. end
  337. end
  338. local function onCompleted()
  339. observer:onCompleted()
  340. end
  341. subscription = self:subscribe(onNext, onError, onCompleted)
  342. return subscription
  343. end)
  344. end
  345. --- Returns a new Observable that runs a combinator function on the most recent values from a set
  346. -- of Observables whenever any of them produce a new value. The results of the combinator function
  347. -- are produced by the new Observable.
  348. -- @arg {Observable...} observables - One or more Observables to combine.
  349. -- @arg {function} combinator - A function that combines the latest result from each Observable and
  350. -- returns a single value.
  351. -- @returns {Observable}
  352. function Observable:combineLatest(...)
  353. local sources = {...}
  354. local combinator = table.remove(sources)
  355. if type(combinator) ~= 'function' then
  356. table.insert(sources, combinator)
  357. combinator = function(...) return ... end
  358. end
  359. table.insert(sources, 1, self)
  360. return Observable.create(function(observer)
  361. local latest = {}
  362. local pending = {util.unpack(sources)}
  363. local completed = {}
  364. local function onNext(i)
  365. return function(value)
  366. latest[i] = value
  367. pending[i] = nil
  368. if not next(pending) then
  369. observer:onNext(combinator(util.unpack(latest)))
  370. end
  371. end
  372. end
  373. local function onError(e)
  374. return observer:onError(e)
  375. end
  376. local function onCompleted(i)
  377. return function()
  378. table.insert(completed, i)
  379. if #completed == #sources then
  380. observer:onCompleted()
  381. end
  382. end
  383. end
  384. for i = 1, #sources do
  385. sources[i]:subscribe(onNext(i), onError, onCompleted(i))
  386. end
  387. end)
  388. end
  389. --- Returns a new Observable that produces the values of the first with falsy values removed.
  390. -- @returns {Observable}
  391. function Observable:compact()
  392. return self:filter(util.identity)
  393. end
  394. --- Returns a new Observable that produces the values produced by all the specified Observables in
  395. -- the order they are specified.
  396. -- @arg {Observable...} sources - The Observables to concatenate.
  397. -- @returns {Observable}
  398. function Observable:concat(other, ...)
  399. if not other then return self end
  400. local others = {...}
  401. return Observable.create(function(observer)
  402. local function onNext(...)
  403. return observer:onNext(...)
  404. end
  405. local function onError(message)
  406. return observer:onError(message)
  407. end
  408. local function onCompleted()
  409. return observer:onCompleted()
  410. end
  411. local function chain()
  412. return other:concat(util.unpack(others)):subscribe(onNext, onError, onCompleted)
  413. end
  414. return self:subscribe(onNext, onError, chain)
  415. end)
  416. end
  417. --- Returns a new Observable that produces a single boolean value representing whether or not the
  418. -- specified value was produced by the original.
  419. -- @arg {*} value - The value to search for. == is used for equality testing.
  420. -- @returns {Observable}
  421. function Observable:contains(value)
  422. return Observable.create(function(observer)
  423. local subscription
  424. local function onNext(...)
  425. local args = util.pack(...)
  426. if #args == 0 and value == nil then
  427. observer:onNext(true)
  428. if subscription then subscription:unsubscribe() end
  429. return observer:onCompleted()
  430. end
  431. for i = 1, #args do
  432. if args[i] == value then
  433. observer:onNext(true)
  434. if subscription then subscription:unsubscribe() end
  435. return observer:onCompleted()
  436. end
  437. end
  438. end
  439. local function onError(e)
  440. return observer:onError(e)
  441. end
  442. local function onCompleted()
  443. observer:onNext(false)
  444. return observer:onCompleted()
  445. end
  446. subscription = self:subscribe(onNext, onError, onCompleted)
  447. return subscription
  448. end)
  449. end
  450. --- Returns an Observable that produces a single value representing the number of values produced
  451. -- by the source value that satisfy an optional predicate.
  452. -- @arg {function=} predicate - The predicate used to match values.
  453. function Observable:count(predicate)
  454. predicate = predicate or util.constant(true)
  455. return Observable.create(function(observer)
  456. local count = 0
  457. local function onNext(...)
  458. if predicate(...) then
  459. count = count + 1
  460. end
  461. end
  462. local function onError(e)
  463. return observer:onError(e)
  464. end
  465. local function onCompleted()
  466. observer:onNext(count)
  467. observer:onCompleted()
  468. end
  469. return self:subscribe(onNext, onError, onCompleted)
  470. end)
  471. end
  472. --- Returns a new Observable that produces a default set of items if the source Observable produces
  473. -- no values.
  474. -- @arg {*...} values - Zero or more values to produce if the source completes without emitting
  475. -- anything.
  476. -- @returns {Observable}
  477. function Observable:defaultIfEmpty(...)
  478. local defaults = util.pack(...)
  479. return Observable.create(function(observer)
  480. local hasValue = false
  481. local function onNext(...)
  482. hasValue = true
  483. observer:onNext(...)
  484. end
  485. local function onError(e)
  486. observer:onError(e)
  487. end
  488. local function onCompleted()
  489. if not hasValue then
  490. observer:onNext(util.unpack(defaults))
  491. end
  492. observer:onCompleted()
  493. end
  494. return self:subscribe(onNext, onError, onCompleted)
  495. end)
  496. end
  497. --- Returns a new Observable that produces the values from the original with duplicates removed.
  498. -- @returns {Observable}
  499. function Observable:distinct()
  500. return Observable.create(function(observer)
  501. local values = {}
  502. local function onNext(x)
  503. if not values[x] then
  504. observer:onNext(x)
  505. end
  506. values[x] = true
  507. end
  508. local function onError(e)
  509. return observer:onError(e)
  510. end
  511. local function onCompleted()
  512. return observer:onCompleted()
  513. end
  514. return self:subscribe(onNext, onError, onCompleted)
  515. end)
  516. end
  517. --- Returns an Observable that only produces values from the original if they are different from
  518. -- the previous value.
  519. -- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
  520. -- @returns {Observable}
  521. function Observable:distinctUntilChanged(comparator)
  522. comparator = comparator or util.eq
  523. return Observable.create(function(observer)
  524. local first = true
  525. local currentValue = nil
  526. local function onNext(value, ...)
  527. if first or not comparator(value, currentValue) then
  528. observer:onNext(value, ...)
  529. currentValue = value
  530. first = false
  531. end
  532. end
  533. local function onError(message)
  534. return observer:onError(onError)
  535. end
  536. local function onCompleted()
  537. return observer:onCompleted()
  538. end
  539. return self:subscribe(onNext, onError, onCompleted)
  540. end)
  541. end
  542. --- Returns an Observable that produces the nth element produced by the source Observable.
  543. -- @arg {number} index - The index of the item, with an index of 1 representing the first.
  544. -- @returns {Observable}
  545. function Observable:elementAt(index)
  546. return Observable.create(function(observer)
  547. local subscription
  548. local i = 1
  549. local function onNext(...)
  550. if i == index then
  551. observer:onNext(...)
  552. observer:onCompleted()
  553. if subscription then
  554. subscription:unsubscribe()
  555. end
  556. else
  557. i = i + 1
  558. end
  559. end
  560. local function onError(e)
  561. return observer:onError(e)
  562. end
  563. local function onCompleted()
  564. return observer:onCompleted()
  565. end
  566. subscription = self:subscribe(onNext, onError, onCompleted)
  567. return subscription
  568. end)
  569. end
  570. --- Returns a new Observable that only produces values of the first that satisfy a predicate.
  571. -- @arg {function} predicate - The predicate used to filter values.
  572. -- @returns {Observable}
  573. function Observable:filter(predicate)
  574. predicate = predicate or util.identity
  575. return Observable.create(function(observer)
  576. local function onNext(...)
  577. if predicate(...) then
  578. return observer:onNext(...)
  579. end
  580. end
  581. local function onError(e)
  582. return observer:onError(e)
  583. end
  584. local function onCompleted()
  585. return observer:onCompleted(e)
  586. end
  587. return self:subscribe(onNext, onError, onCompleted)
  588. end)
  589. end
  590. --- Returns a new Observable that produces the first value of the original that satisfies a
  591. -- predicate.
  592. -- @arg {function} predicate - The predicate used to find a value.
  593. function Observable:find(predicate)
  594. predicate = predicate or util.identity
  595. return Observable.create(function(observer)
  596. local function onNext(...)
  597. if predicate(...) then
  598. observer:onNext(...)
  599. return observer:onCompleted()
  600. end
  601. end
  602. local function onError(message)
  603. return observer:onError(e)
  604. end
  605. local function onCompleted()
  606. return observer:onCompleted()
  607. end
  608. return self:subscribe(onNext, onError, onCompleted)
  609. end)
  610. end
  611. --- Returns a new Observable that only produces the first result of the original.
  612. -- @returns {Observable}
  613. function Observable:first()
  614. return self:take(1)
  615. end
  616. --- Returns a new Observable that transform the items emitted by an Observable into Observables,
  617. -- then flatten the emissions from those into a single Observable
  618. -- @arg {function} callback - The function to transform values from the original Observable.
  619. -- @returns {Observable}
  620. function Observable:flatMap(callback)
  621. callback = callback or util.identity
  622. return self:map(callback):flatten()
  623. end
  624. --- Returns a new Observable that uses a callback to create Observables from the values produced by
  625. -- the source, then produces values from the most recent of these Observables.
  626. -- @arg {function=identity} callback - The function used to convert values to Observables.
  627. -- @returns {Observable}
  628. function Observable:flatMapLatest(callback)
  629. callback = callback or util.identity
  630. return Observable.create(function(observer)
  631. local innerSubscription
  632. local function onNext(...)
  633. observer:onNext(...)
  634. end
  635. local function onError(e)
  636. return observer:onError(e)
  637. end
  638. local function onCompleted()
  639. return observer:onCompleted()
  640. end
  641. local function subscribeInner(...)
  642. if innerSubscription then
  643. innerSubscription:unsubscribe()
  644. end
  645. innerSubscription = callback(...):subscribe(onNext, onError)
  646. end
  647. local subscription = self:subscribe(subscribeInner, onError, onCompleted)
  648. return Subscription.create(function()
  649. if innerSubscription then
  650. innerSubscription:unsubscribe()
  651. end
  652. if subscription then
  653. subscription:unsubscribe()
  654. end
  655. end)
  656. end)
  657. end
  658. --- Returns a new Observable that subscribes to the Observables produced by the original and
  659. -- produces their values.
  660. -- @returns {Observable}
  661. function Observable:flatten()
  662. return Observable.create(function(observer)
  663. local function onError(message)
  664. return observer:onError(message)
  665. end
  666. local function onNext(observable)
  667. local function innerOnNext(...)
  668. observer:onNext(...)
  669. end
  670. observable:subscribe(innerOnNext, onError, util.noop)
  671. end
  672. local function onCompleted()
  673. return observer:onCompleted()
  674. end
  675. return self:subscribe(onNext, onError, onCompleted)
  676. end)
  677. end
  678. --- Returns a new Observable that only produces the last result of the original.
  679. -- @returns {Observable}
  680. function Observable:last()
  681. return Observable.create(function(observer)
  682. local value
  683. local empty = true
  684. local function onNext(...)
  685. value = {...}
  686. empty = false
  687. end
  688. local function onError(e)
  689. return observer:onError(e)
  690. end
  691. local function onCompleted()
  692. if not empty then
  693. observer:onNext(util.unpack(value or {}))
  694. end
  695. return observer:onCompleted()
  696. end
  697. return self:subscribe(onNext, onError, onCompleted)
  698. end)
  699. end
  700. --- Returns a new Observable that produces the values of the original transformed by a function.
  701. -- @arg {function} callback - The function to transform values from the original Observable.
  702. -- @returns {Observable}
  703. function Observable:map(callback)
  704. return Observable.create(function(observer)
  705. callback = callback or util.identity
  706. local function onNext(...)
  707. return observer:onNext(callback(...))
  708. end
  709. local function onError(e)
  710. return observer:onError(e)
  711. end
  712. local function onCompleted()
  713. return observer:onCompleted()
  714. end
  715. return self:subscribe(onNext, onError, onCompleted)
  716. end)
  717. end
  718. --- Returns a new Observable that produces the maximum value produced by the original.
  719. -- @returns {Observable}
  720. function Observable:max()
  721. return self:reduce(math.max)
  722. end
  723. --- Returns a new Observable that produces the values produced by all the specified Observables in
  724. -- the order they are produced.
  725. -- @arg {Observable...} sources - One or more Observables to merge.
  726. -- @returns {Observable}
  727. function Observable:merge(...)
  728. local sources = {...}
  729. table.insert(sources, 1, self)
  730. return Observable.create(function(observer)
  731. local function onNext(...)
  732. return observer:onNext(...)
  733. end
  734. local function onError(message)
  735. return observer:onError(message)
  736. end
  737. local function onCompleted(i)
  738. return function()
  739. sources[i] = nil
  740. if not next(sources) then
  741. observer:onCompleted()
  742. end
  743. end
  744. end
  745. for i = 1, #sources do
  746. sources[i]:subscribe(onNext, onError, onCompleted(i))
  747. end
  748. end)
  749. end
  750. --- Returns a new Observable that produces the minimum value produced by the original.
  751. -- @returns {Observable}
  752. function Observable:min()
  753. return self:reduce(math.min)
  754. end
  755. --- Returns an Observable that produces the values of the original inside tables.
  756. -- @returns {Observable}
  757. function Observable:pack()
  758. return self:map(util.pack)
  759. end
  760. --- Returns two Observables: one that produces values for which the predicate returns truthy for,
  761. -- and another that produces values for which the predicate returns falsy.
  762. -- @arg {function} predicate - The predicate used to partition the values.
  763. -- @returns {Observable}
  764. -- @returns {Observable}
  765. function Observable:partition(predicate)
  766. return self:filter(predicate), self:reject(predicate)
  767. end
  768. --- Returns a new Observable that produces values computed by extracting the given keys from the
  769. -- tables produced by the original.
  770. -- @arg {string...} keys - The key to extract from the table. Multiple keys can be specified to
  771. -- recursively pluck values from nested tables.
  772. -- @returns {Observable}
  773. function Observable:pluck(key, ...)
  774. if not key then return self end
  775. return Observable.create(function(observer)
  776. local function onNext(t)
  777. return observer:onNext(t[key])
  778. end
  779. local function onError(e)
  780. return observer:onError(e)
  781. end
  782. local function onCompleted()
  783. return observer:onCompleted()
  784. end
  785. return self:subscribe(onNext, onError, onCompleted)
  786. end):pluck(...)
  787. end
  788. --- Returns a new Observable that produces a single value computed by accumulating the results of
  789. -- running a function on each value produced by the original Observable.
  790. -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
  791. -- the return value of the last call as the first argument and the
  792. -- current values as the rest of the arguments.
  793. -- @arg {*} seed - A value to pass to the accumulator the first time it is run.
  794. -- @returns {Observable}
  795. function Observable:reduce(accumulator, seed)
  796. return Observable.create(function(observer)
  797. local result = seed
  798. local first = true
  799. local function onNext(...)
  800. if first and seed == nil then
  801. result = ...
  802. first = false
  803. else
  804. result = accumulator(result, ...)
  805. end
  806. end
  807. local function onError(e)
  808. return observer:onError(e)
  809. end
  810. local function onCompleted()
  811. observer:onNext(result)
  812. return observer:onCompleted()
  813. end
  814. return self:subscribe(onNext, onError, onCompleted)
  815. end)
  816. end
  817. --- Returns a new Observable that produces values from the original which do not satisfy a
  818. -- predicate.
  819. -- @arg {function} predicate - The predicate used to reject values.
  820. -- @returns {Observable}
  821. function Observable:reject(predicate)
  822. predicate = predicate or util.identity
  823. return Observable.create(function(observer)
  824. local function onNext(...)
  825. if not predicate(...) then
  826. return observer:onNext(...)
  827. end
  828. end
  829. local function onError(e)
  830. return observer:onError(e)
  831. end
  832. local function onCompleted()
  833. return observer:onCompleted(e)
  834. end
  835. return self:subscribe(onNext, onError, onCompleted)
  836. end)
  837. end
  838. --- Returns a new Observable that skips over a specified number of values produced by the original
  839. -- and produces the rest.
  840. -- @arg {number=1} n - The number of values to ignore.
  841. -- @returns {Observable}
  842. function Observable:skip(n)
  843. n = n or 1
  844. return Observable.create(function(observer)
  845. local i = 1
  846. local function onNext(...)
  847. if i > n then
  848. observer:onNext(...)
  849. else
  850. i = i + 1
  851. end
  852. end
  853. local function onError(e)
  854. return observer:onError(e)
  855. end
  856. local function onCompleted()
  857. return observer:onCompleted()
  858. end
  859. return self:subscribe(onNext, onError, onCompleted)
  860. end)
  861. end
  862. --- Returns a new Observable that skips over values produced by the original until the specified
  863. -- Observable produces a value.
  864. -- @arg {Observable} other - The Observable that triggers the production of values.
  865. -- @returns {Observable}
  866. function Observable:skipUntil(other)
  867. return Observable.create(function(observer)
  868. local triggered = false
  869. local function trigger()
  870. triggered = true
  871. end
  872. other:subscribe(trigger, trigger, trigger)
  873. local function onNext(...)
  874. if triggered then
  875. observer:onNext(...)
  876. end
  877. end
  878. local function onError()
  879. if triggered then
  880. observer:onError()
  881. end
  882. end
  883. local function onCompleted()
  884. if triggered then
  885. observer:onCompleted()
  886. end
  887. end
  888. return self:subscribe(onNext, onError, onCompleted)
  889. end)
  890. end
  891. --- Returns a new Observable that skips elements until the predicate returns falsy for one of them.
  892. -- @arg {function} predicate - The predicate used to continue skipping values.
  893. -- @returns {Observable}
  894. function Observable:skipWhile(predicate)
  895. predicate = predicate or util.identity
  896. return Observable.create(function(observer)
  897. local skipping = true
  898. local function onNext(...)
  899. if skipping then
  900. skipping = predicate(...)
  901. end
  902. if not skipping then
  903. return observer:onNext(...)
  904. end
  905. end
  906. local function onError(message)
  907. return observer:onError(message)
  908. end
  909. local function onCompleted()
  910. return observer:onCompleted()
  911. end
  912. return self:subscribe(onNext, onError, onCompleted)
  913. end)
  914. end
  915. --- Returns an Observable that produces a single value representing the sum of the values produced
  916. -- by the original.
  917. -- @returns {Observable}
  918. function Observable:sum()
  919. return self:reduce(function(x, y) return x + y end, 0)
  920. end
  921. --- Returns a new Observable that only produces the first n results of the original.
  922. -- @arg {number=1} n - The number of elements to produce before completing.
  923. -- @returns {Observable}
  924. function Observable:take(n)
  925. n = n or 1
  926. return Observable.create(function(observer)
  927. if n <= 0 then
  928. observer:onCompleted()
  929. return
  930. end
  931. local i = 1
  932. local function onNext(...)
  933. observer:onNext(...)
  934. i = i + 1
  935. if i > n then
  936. observer:onCompleted()
  937. end
  938. end
  939. local function onError(e)
  940. return observer:onError(e)
  941. end
  942. local function onCompleted()
  943. return observer:onCompleted()
  944. end
  945. return self:subscribe(onNext, onError, onCompleted)
  946. end)
  947. end
  948. --- Returns a new Observable that completes when the specified Observable fires.
  949. -- @arg {Observable} other - The Observable that triggers completion of the original.
  950. -- @returns {Observable}
  951. function Observable:takeUntil(other)
  952. return Observable.create(function(observer)
  953. local function onNext(...)
  954. return observer:onNext(...)
  955. end
  956. local function onError(e)
  957. return observer:onError(e)
  958. end
  959. local function onCompleted()
  960. return observer:onCompleted()
  961. end
  962. other:subscribe(onCompleted, onCompleted, onCompleted)
  963. return self:subscribe(onNext, onError, onCompleted)
  964. end)
  965. end
  966. --- Returns a new Observable that produces elements until the predicate returns falsy.
  967. -- @arg {function} predicate - The predicate used to continue production of values.
  968. -- @returns {Observable}
  969. function Observable:takeWhile(predicate)
  970. predicate = predicate or util.identity
  971. return Observable.create(function(observer)
  972. local taking = true
  973. local function onNext(...)
  974. if taking then
  975. taking = predicate(...)
  976. if taking then
  977. return observer:onNext(...)
  978. else
  979. return observer:onCompleted()
  980. end
  981. end
  982. end
  983. local function onError(message)
  984. return observer:onError(message)
  985. end
  986. local function onCompleted()
  987. return observer:onCompleted()
  988. end
  989. return self:subscribe(onNext, onError, onCompleted)
  990. end)
  991. end
  992. --- Runs a function each time this Observable has activity. Similar to subscribe but does not
  993. -- create a subscription.
  994. -- @arg {function=} onNext - Run when the Observable produces values.
  995. -- @arg {function=} onError - Run when the Observable encounters a problem.
  996. -- @arg {function=} onCompleted - Run when the Observable completes.
  997. -- @returns {Observable}
  998. function Observable:tap(_onNext, _onError, _onCompleted)
  999. _onNext = _onNext or util.noop
  1000. _onError = _onError or util.noop
  1001. _onCompleted = _onCompleted or util.noop
  1002. return Observable.create(function(observer)
  1003. local function onNext(...)
  1004. _onNext(...)
  1005. return observer:onNext(...)
  1006. end
  1007. local function onError(message)
  1008. _onError(message)
  1009. return observer:onError(message)
  1010. end
  1011. local function onCompleted()
  1012. _onCompleted()
  1013. return observer:onCompleted()
  1014. end
  1015. return self:subscribe(onNext, onError, onCompleted)
  1016. end)
  1017. end
  1018. --- Returns an Observable that unpacks the tables produced by the original.
  1019. -- @returns {Observable}
  1020. function Observable:unpack()
  1021. return self:map(util.unpack)
  1022. end
  1023. --- Returns an Observable that takes any values produced by the original that consist of multiple
  1024. -- return values and produces each value individually.
  1025. -- @returns {Observable}
  1026. function Observable:unwrap()
  1027. return Observable.create(function(observer)
  1028. local function onNext(...)
  1029. local values = {...}
  1030. for i = 1, #values do
  1031. observer:onNext(values[i])
  1032. end
  1033. end
  1034. local function onError(message)
  1035. return observer:onError(message)
  1036. end
  1037. local function onCompleted()
  1038. return observer:onCompleted()
  1039. end
  1040. return self:subscribe(onNext, onError, onCompleted)
  1041. end)
  1042. end
  1043. --- Returns an Observable that produces a sliding window of the values produced by the original.
  1044. -- @arg {number} size - The size of the window. The returned observable will produce this number
  1045. -- of the most recent values as multiple arguments to onNext.
  1046. -- @returns {Observable}
  1047. function Observable:window(size)
  1048. return Observable.create(function(observer)
  1049. local window = {}
  1050. local function onNext(value)
  1051. table.insert(window, value)
  1052. if #window >= size then
  1053. observer:onNext(util.unpack(window))
  1054. table.remove(window, 1)
  1055. end
  1056. end
  1057. local function onError(message)
  1058. return observer:onError(message)
  1059. end
  1060. local function onCompleted()
  1061. return observer:onCompleted()
  1062. end
  1063. return self:subscribe(onNext, onError, onCompleted)
  1064. end)
  1065. end
  1066. --- Returns an Observable that produces values from the original along with the most recently
  1067. -- produced value from all other specified Observables. Note that only the first argument from each
  1068. -- source Observable is used.
  1069. -- @arg {Observable...} sources - The Observables to include the most recent values from.
  1070. -- @returns {Observable}
  1071. function Observable:with(...)
  1072. local sources = {...}
  1073. return Observable.create(function(observer)
  1074. local latest = setmetatable({}, {__len = util.constant(#sources)})
  1075. local function setLatest(i)
  1076. return function(value)
  1077. latest[i] = value
  1078. end
  1079. end
  1080. local function onNext(value)
  1081. return observer:onNext(value, util.unpack(latest))
  1082. end
  1083. local function onError(e)
  1084. return observer:onError(e)
  1085. end
  1086. local function onCompleted()
  1087. return observer:onCompleted()
  1088. end
  1089. for i = 1, #sources do
  1090. sources[i]:subscribe(setLatest(i), util.noop, util.noop)
  1091. end
  1092. return self:subscribe(onNext, onError, onCompleted)
  1093. end)
  1094. end
  1095. --- @class ImmediateScheduler
  1096. -- @description Schedules Observables by running all operations immediately.
  1097. local ImmediateScheduler = {}
  1098. ImmediateScheduler.__index = ImmediateScheduler
  1099. ImmediateScheduler.__tostring = util.constant('ImmediateScheduler')
  1100. --- Creates a new ImmediateScheduler.
  1101. -- @returns {ImmediateScheduler}
  1102. function ImmediateScheduler.create()
  1103. return setmetatable({}, ImmediateScheduler)
  1104. end
  1105. --- Schedules a function to be run on the scheduler. It is executed immediately.
  1106. -- @arg {function} action - The function to execute.
  1107. function ImmediateScheduler:schedule(action)
  1108. action()
  1109. end
  1110. --- @class CooperativeScheduler
  1111. -- @description Manages Observables using coroutines and a virtual clock that must be updated
  1112. -- manually.
  1113. local CooperativeScheduler = {}
  1114. CooperativeScheduler.__index = CooperativeScheduler
  1115. CooperativeScheduler.__tostring = util.constant('CooperativeScheduler')
  1116. --- Creates a new CooperativeScheduler.
  1117. -- @arg {number=0} currentTime - A time to start the scheduler at.
  1118. -- @returns {Scheduler.CooperativeScheduler}
  1119. function CooperativeScheduler.create(currentTime)
  1120. local self = {
  1121. tasks = {},
  1122. currentTime = currentTime or 0
  1123. }
  1124. return setmetatable(self, CooperativeScheduler)
  1125. end
  1126. --- Schedules a function to be run after an optional delay.
  1127. -- @arg {function} action - The function to execute. Will be converted into a coroutine. The
  1128. -- coroutine may yield execution back to the scheduler with an optional
  1129. -- number, which will put it to sleep for a time period.
  1130. -- @arg {number=0} delay - Delay execution of the action by a time period.
  1131. function CooperativeScheduler:schedule(action, delay)
  1132. local task = {
  1133. thread = coroutine.create(action),
  1134. due = self.currentTime + (delay or 0)
  1135. }
  1136. table.insert(self.tasks, task)
  1137. return Subscription.create(function()
  1138. return self:unschedule(task)
  1139. end)
  1140. end
  1141. function CooperativeScheduler:unschedule(task)
  1142. for i = 1, #self.tasks do
  1143. if self.tasks[i] == task then
  1144. table.remove(self.tasks, i)
  1145. end
  1146. end
  1147. end
  1148. --- Triggers an update of the CooperativeScheduler. The clock will be advanced and the scheduler
  1149. -- will run any coroutines that are due to be run.
  1150. -- @arg {number=0} delta - An amount of time to advance the clock by. It is common to pass in the
  1151. -- time in seconds or milliseconds elapsed since this function was last
  1152. -- called.
  1153. function CooperativeScheduler:update(delta)
  1154. self.currentTime = self.currentTime + (delta or 0)
  1155. for i = #self.tasks, 1, -1 do
  1156. local task = self.tasks[i]
  1157. if self.currentTime >= task.due then
  1158. local success, delay = coroutine.resume(task.thread)
  1159. if success then
  1160. task.due = math.max(task.due + (delay or 0), self.currentTime)
  1161. else
  1162. error(delay)
  1163. end
  1164. if coroutine.status(task.thread) == 'dead' then
  1165. table.remove(self.tasks, i)
  1166. end
  1167. end
  1168. end
  1169. end
  1170. --- Returns whether or not the CooperativeScheduler's queue is empty.
  1171. function CooperativeScheduler:isEmpty()
  1172. return not next(self.tasks)
  1173. end
  1174. --- @class Subject
  1175. -- @description Subjects function both as an Observer and as an Observable. Subjects inherit all
  1176. -- Observable functions, including subscribe. Values can also be pushed to the Subject, which will
  1177. -- be broadcasted to any subscribed Observers.
  1178. local Subject = setmetatable({}, Observable)
  1179. Subject.__index = Subject
  1180. Subject.__tostring = util.constant('Subject')
  1181. --- Creates a new Subject.
  1182. -- @returns {Subject}
  1183. function Subject.create()
  1184. local self = {
  1185. observers = {},
  1186. stopped = false
  1187. }
  1188. return setmetatable(self, Subject)
  1189. end
  1190. --- Creates a new Observer and attaches it to the Subject.
  1191. -- @arg {function|table} onNext|observer - A function called when the Subject produces a value or
  1192. -- an existing Observer to attach to the Subject.
  1193. -- @arg {function} onError - Called when the Subject terminates due to an error.
  1194. -- @arg {function} onCompleted - Called when the Subject completes normally.
  1195. function Subject:subscribe(onNext, onError, onCompleted)
  1196. local observer
  1197. if type(onNext) == 'table' then
  1198. observer = onNext
  1199. else
  1200. observer = Observer.create(onNext, onError, onCompleted)
  1201. end
  1202. table.insert(self.observers, observer)
  1203. return Subscription.create(function()
  1204. for i = 1, #self.observers do
  1205. if self.observers[i] == observer then
  1206. table.remove(self.observers, i)
  1207. return
  1208. end
  1209. end
  1210. end)
  1211. end
  1212. --- Pushes zero or more values to the Subject. They will be broadcasted to all Observers.
  1213. -- @arg {*...} values
  1214. function Subject:onNext(...)
  1215. if not self.stopped then
  1216. for i = 1, #self.observers do
  1217. self.observers[i]:onNext(...)
  1218. end
  1219. end
  1220. end
  1221. --- Signal to all Observers that an error has occurred.
  1222. -- @arg {string=} message - A string describing what went wrong.
  1223. function Subject:onError(message)
  1224. if not self.stopped then
  1225. for i = 1, #self.observers do
  1226. self.observers[i]:onError(message)
  1227. end
  1228. self.stopped = true
  1229. end
  1230. end
  1231. --- Signal to all Observers that the Subject will not produce any more values.
  1232. function Subject:onCompleted()
  1233. if not self.stopped then
  1234. for i = 1, #self.observers do
  1235. self.observers[i]:onCompleted()
  1236. end
  1237. self.stopped = true
  1238. end
  1239. end
  1240. Subject.__call = Subject.onNext
  1241. --- @class BehaviorSubject
  1242. -- @description A Subject that tracks its current value. Provides an accessor to retrieve the most
  1243. -- recent pushed value, and all subscribers immediately receive the latest value.
  1244. local BehaviorSubject = setmetatable({}, Subject)
  1245. BehaviorSubject.__index = BehaviorSubject
  1246. BehaviorSubject.__tostring = util.constant('BehaviorSubject')
  1247. --- Creates a new BehaviorSubject.
  1248. -- @arg {*...} value - The initial values.
  1249. -- @returns {Subject}
  1250. function BehaviorSubject.create(...)
  1251. local self = {
  1252. observers = {},
  1253. stopped = false
  1254. }
  1255. if select('#', ...) > 0 then
  1256. self.value = util.pack(...)
  1257. end
  1258. return setmetatable(self, BehaviorSubject)
  1259. end
  1260. --- Creates a new Observer and attaches it to the Subject. Immediately broadcasts the most recent
  1261. -- value to the Observer.
  1262. -- @arg {function} onNext - Called when the Subject produces a value.
  1263. -- @arg {function} onError - Called when the Subject terminates due to an error.
  1264. -- @arg {function} onCompleted - Called when the Subject completes normally.
  1265. function BehaviorSubject:subscribe(onNext, onError, onCompleted)
  1266. local observer = Observer.create(onNext, onError, onCompleted)
  1267. Subject.subscribe(self, observer)
  1268. if self.value then
  1269. observer:onNext(unpack(self.value))
  1270. end
  1271. end
  1272. --- Pushes zero or more values to the BehaviorSubject. They will be broadcasted to all Observers.
  1273. -- @arg {*...} values
  1274. function BehaviorSubject:onNext(...)
  1275. self.value = util.pack(...)
  1276. return Subject.onNext(self, ...)
  1277. end
  1278. --- Returns the last value emitted by the Subject, or the initial value passed to the constructor
  1279. -- if nothing has been emitted yet.
  1280. -- @returns {*...}
  1281. function BehaviorSubject:getValue()
  1282. return self.value and util.unpack(self.value)
  1283. end
  1284. Observable.wrap = Observable.buffer
  1285. return {
  1286. util = util,
  1287. Subscription = Subscription,
  1288. Observer = Observer,
  1289. Observable = Observable,
  1290. ImmediateScheduler = ImmediateScheduler,
  1291. CooperativeScheduler = CooperativeScheduler,
  1292. Subject = Subject,
  1293. BehaviorSubject = BehaviorSubject
  1294. }