rx.lua 41 KB


  1. -- RxLua v0.0.1
  2. -- https://github.com/bjornbytes/rxlua
  3. -- MIT License
  4. local util = {}
  5. util.pack = table.pack or function(...) return { n = select('#', ...), ... } end
  6. util.unpack = table.unpack or unpack
  7. util.eq = function(x, y) return x == y end
  8. util.noop = function() end
  9. util.identity = function(x) return x end
  10. util.constant = function(x) return function() return x end end
  11. --- @class Subscription
  12. -- @description A handle representing the link between an Observer and an Observable, as well as any
  13. -- work required to clean up after the Observable completes or the Observer unsubscribes.
  14. local Subscription = {}
  15. Subscription.__index = Subscription
  16. Subscription.__tostring = util.constant('Subscription')
  17. --- Creates a new Subscription.
  18. -- @arg {function=} action - The action to run when the subscription is unsubscribed. It will only
  19. -- be run once.
  20. -- @returns {Subscription}
  21. function Subscription.create(action)
  22. local self = {
  23. action = action or util.noop,
  24. unsubscribed = false
  25. }
  26. return setmetatable(self, Subscription)
  27. end
  28. --- Unsubscribes the subscription, performing any necessary cleanup work.
  29. function Subscription:unsubscribe()
  30. if self.unsubscribed then return end
  31. self.action(self)
  32. self.unsubscribed = true
  33. end
  34. --- @class Observer
  35. -- @description Observers are simple objects that receive values from Observables.
  36. local Observer = {}
  37. Observer.__index = Observer
  38. Observer.__tostring = util.constant('Observer')
  39. --- Creates a new Observer.
  40. -- @arg {function=} onNext - Called when the Observable produces a value.
  41. -- @arg {function=} onError - Called when the Observable terminates due to an error.
  42. -- @arg {function=} onCompleted - Called when the Observable completes normally.
  43. -- @returns {Observer}
  44. function Observer.create(onNext, onError, onCompleted)
  45. local self = {
  46. _onNext = onNext or util.noop,
  47. _onError = onError or error,
  48. _onCompleted = onCompleted or util.noop,
  49. stopped = false
  50. }
  51. return setmetatable(self, Observer)
  52. end
  53. --- Pushes zero or more values to the Observer.
  54. -- @arg {*...} values
  55. function Observer:onNext(...)
  56. if not self.stopped then
  57. self._onNext(...)
  58. end
  59. end
  60. --- Notify the Observer that an error has occurred.
  61. -- @arg {string=} message - A string describing what went wrong.
  62. function Observer:onError(message)
  63. if not self.stopped then
  64. self.stopped = true
  65. self._onError(message)
  66. end
  67. end
  68. --- Notify the Observer that the sequence has completed and will produce no more values.
  69. function Observer:onCompleted()
  70. if not self.stopped then
  71. self.stopped = true
  72. self._onCompleted()
  73. end
  74. end
  75. --- @class Observable
  76. -- @description Observables push values to Observers.
  77. local Observable = {}
  78. Observable.__index = Observable
  79. Observable.__tostring = util.constant('Observable')
  80. --- Creates a new Observable.
  81. -- @arg {function} subscribe - The subscription function that produces values.
  82. -- @returns {Observable}
  83. function Observable.create(subscribe)
  84. local self = {
  85. _subscribe = subscribe
  86. }
  87. return setmetatable(self, Observable)
  88. end
  89. --- Shorthand for creating an Observer and passing it to this Observable's subscription function.
  90. -- @arg {function} onNext - Called when the Observable produces a value.
  91. -- @arg {function} onError - Called when the Observable terminates due to an error.
  92. -- @arg {function} onCompleted - Called when the Observable completes normally.
  93. function Observable:subscribe(onNext, onError, onCompleted)
  94. if type(onNext) == 'table' then
  95. return self._subscribe(onNext)
  96. else
  97. return self._subscribe(Observer.create(onNext, onError, onCompleted))
  98. end
  99. end
  100. --- Returns an Observable that immediately completes without producing a value.
  101. function Observable.empty()
  102. return Observable.create(function(observer)
  103. observer:onCompleted()
  104. end)
  105. end
  106. --- Returns an Observable that never produces values and never completes.
  107. function Observable.never()
  108. return Observable.create(function(observer) end)
  109. end
  110. --- Returns an Observable that immediately produces an error.
  111. function Observable.throw(message)
  112. return Observable.create(function(observer)
  113. observer:onError(message)
  114. end)
  115. end
  116. --- Creates an Observable that produces a single value.
  117. -- @arg {*} value
  118. -- @returns {Observable}
  119. function Observable.fromValue(value)
  120. return Observable.create(function(observer)
  121. observer:onNext(value)
  122. observer:onCompleted()
  123. end)
  124. end
  125. --- Creates an Observable that produces a range of values in a manner similar to a Lua for loop.
  126. -- @arg {number} initial - The first value of the range, or the upper limit if no other arguments
  127. -- are specified.
  128. -- @arg {number=} limit - The second value of the range.
  129. -- @arg {number=1} step - An amount to increment the value by each iteration.
  130. -- @returns {Observable}
  131. function Observable.fromRange(initial, limit, step)
  132. if not limit and not step then
  133. initial, limit = 1, initial
  134. end
  135. step = step or 1
  136. return Observable.create(function(observer)
  137. for i = initial, limit, step do
  138. observer:onNext(i)
  139. end
  140. observer:onCompleted()
  141. end)
  142. end
  143. --- Creates an Observable that produces values from a table.
  144. -- @arg {table} table - The table used to create the Observable.
  145. -- @arg {function=pairs} iterator - An iterator used to iterate the table, e.g. pairs or ipairs.
  146. -- @arg {boolean} keys - Whether or not to also emit the keys of the table.
  147. -- @returns {Observable}
  148. function Observable.fromTable(t, iterator, keys)
  149. iterator = iterator or pairs
  150. return Observable.create(function(observer)
  151. for key, value in iterator(t) do
  152. observer:onNext(value, keys and key or nil)
  153. end
  154. observer:onCompleted()
  155. end)
  156. end
  157. --- Creates an Observable that produces values when the specified coroutine yields.
  158. -- @arg {thread} coroutine
  159. -- @returns {Observable}
  160. function Observable.fromCoroutine(thread, scheduler)
  161. thread = type(thread) == 'function' and coroutine.create(thread) or thread
  162. return Observable.create(function(observer)
  163. return scheduler:schedule(function()
  164. while not observer.stopped do
  165. local success, value = coroutine.resume(thread)
  166. if success then
  167. observer:onNext(value)
  168. else
  169. return observer:onError(value)
  170. end
  171. if coroutine.status(thread) == 'dead' then
  172. return observer:onCompleted()
  173. end
  174. coroutine.yield()
  175. end
  176. end)
  177. end)
  178. end
  179. --- Creates an Observable that creates a new Observable for each observer using a factory function.
  180. -- @arg {function} factory - A function that returns an Observable.
  181. -- @returns {Observable}
  182. function Observable.defer(fn)
  183. return setmetatable({
  184. subscribe = function(_, ...)
  185. local observable = fn()
  186. return observable:subscribe(...)
  187. end
  188. }, Observable)
  189. end
  190. --- Subscribes to this Observable and prints values it produces.
  191. -- @arg {string=} name - Prefixes the printed messages with a name.
  192. -- @arg {function=tostring} formatter - A function that formats one or more values to be printed.
  193. function Observable:dump(name, formatter)
  194. name = name and (name .. ' ') or ''
  195. formatter = formatter or tostring
  196. local onNext = function(...) print(name .. 'onNext: ' .. formatter(...)) end
  197. local onError = function(e) print(name .. 'onError: ' .. e) end
  198. local onCompleted = function() print(name .. 'onCompleted') end
  199. return self:subscribe(onNext, onError, onCompleted)
  200. end
  201. --- Determine whether all items emitted by an Observable meet some criteria.
  202. -- @arg {function=identity} predicate - The predicate used to evaluate objects.
  203. function Observable:all(predicate)
  204. predicate = predicate or util.identity
  205. return Observable.create(function(observer)
  206. local function onNext(...)
  207. if not predicate(...) then
  208. observer:onNext(false)
  209. observer:onCompleted()
  210. end
  211. end
  212. local function onError(e)
  213. return observer:onError(e)
  214. end
  215. local function onCompleted()
  216. observer:onNext(true)
  217. return observer:onCompleted()
  218. end
  219. return self:subscribe(onNext, onError, onCompleted)
  220. end)
  221. end
  222. --- Given a set of Observables, produces values from only the first one to produce a value.
  223. -- @arg {Observable...} observables
  224. -- @returns {Observable}
  225. function Observable.amb(a, b, ...)
  226. if not a or not b then return a end
  227. return Observable.create(function(observer)
  228. local subscriptionA, subscriptionB
  229. local function onNextA(...)
  230. if subscriptionB then subscriptionB:unsubscribe() end
  231. observer:onNext(...)
  232. end
  233. local function onErrorA(e)
  234. if subscriptionB then subscriptionB:unsubscribe() end
  235. observer:onError(e)
  236. end
  237. local function onCompletedA()
  238. if subscriptionB then subscriptionB:unsubscribe() end
  239. observer:onCompleted()
  240. end
  241. local function onNextB(...)
  242. if subscriptionA then subscriptionA:unsubscribe() end
  243. observer:onNext(...)
  244. end
  245. local function onErrorB(e)
  246. if subscriptionA then subscriptionA:unsubscribe() end
  247. observer:onError(e)
  248. end
  249. local function onCompletedB()
  250. if subscriptionA then subscriptionA:unsubscribe() end
  251. observer:onCompleted()
  252. end
  253. subscriptionA = a:subscribe(onNextA, onErrorA, onCompletedA)
  254. subscriptionB = b:subscribe(onNextB, onErrorB, onCompletedB)
  255. return Subscription.create(function()
  256. subscriptionA:unsubscribe()
  257. subscriptionB:unsubscribe()
  258. end)
  259. end):amb(...)
  260. end
  261. --- Returns an Observable that produces the average of all values produced by the original.
  262. -- @returns {Observable}
  263. function Observable:average()
  264. return Observable.create(function(observer)
  265. local sum, count = 0, 0
  266. local function onNext(value)
  267. sum = sum + value
  268. count = count + 1
  269. end
  270. local function onError(e)
  271. observer:onError(e)
  272. end
  273. local function onCompleted()
  274. if count > 0 then
  275. observer:onNext(sum / count)
  276. end
  277. observer:onCompleted()
  278. end
  279. return self:subscribe(onNext, onError, onCompleted)
  280. end)
  281. end
  282. --- Returns an Observable that buffers values from the original and produces them as multiple
  283. -- values.
  284. -- @arg {number} size - The size of the buffer.
  285. function Observable:buffer(size)
  286. return Observable.create(function(observer)
  287. local buffer = {}
  288. local function emit()
  289. if #buffer > 0 then
  290. observer:onNext(util.unpack(buffer))
  291. buffer = {}
  292. end
  293. end
  294. local function onNext(...)
  295. local values = {...}
  296. for i = 1, #values do
  297. table.insert(buffer, values[i])
  298. if #buffer >= size then
  299. emit()
  300. end
  301. end
  302. end
  303. local function onError(message)
  304. emit()
  305. return observer:onError(message)
  306. end
  307. local function onCompleted()
  308. emit()
  309. return observer:onCompleted()
  310. end
  311. return self:subscribe(onNext, onError, onCompleted)
  312. end)
  313. end
  314. --- Returns an Observable that intercepts any errors from the previous and replace them with values
  315. -- produced by a new Observable.
  316. -- @arg {function|Observable} handler - An Observable or a function that returns an Observable to
  317. -- replace the source Observable in the event of an error.
  318. -- @returns {Observable}
  319. function Observable:catch(handler)
  320. handler = handler and (type(handler) == 'function' and handler or util.constant(handler))
  321. return Observable.create(function(observer)
  322. local subscription
  323. local function onNext(...)
  324. return observer:onNext(...)
  325. end
  326. local function onError(e)
  327. if not handler then
  328. return observer:onCompleted()
  329. end
  330. local continue = handler(e)
  331. if continue then
  332. if subscription then subscription:unsubscribe() end
  333. continue:subscribe(observer)
  334. else
  335. observer:onError(e)
  336. end
  337. end
  338. local function onCompleted()
  339. observer:onCompleted()
  340. end
  341. subscription = self:subscribe(onNext, onError, onCompleted)
  342. return subscription
  343. end)
  344. end
  345. --- Returns a new Observable that runs a combinator function on the most recent values from a set
  346. -- of Observables whenever any of them produce a new value. The results of the combinator function
  347. -- are produced by the new Observable.
  348. -- @arg {Observable...} observables - One or more Observables to combine.
  349. -- @arg {function} combinator - A function that combines the latest result from each Observable and
  350. -- returns a single value.
  351. -- @returns {Observable}
  352. function Observable:combineLatest(...)
  353. local sources = {...}
  354. local combinator = table.remove(sources)
  355. if type(combinator) ~= 'function' then
  356. table.insert(sources, combinator)
  357. combinator = function(...) return ... end
  358. end
  359. table.insert(sources, 1, self)
  360. return Observable.create(function(observer)
  361. local latest = {}
  362. local pending = {util.unpack(sources)}
  363. local completed = {}
  364. local function onNext(i)
  365. return function(value)
  366. latest[i] = value
  367. pending[i] = nil
  368. if not next(pending) then
  369. observer:onNext(combinator(util.unpack(latest)))
  370. end
  371. end
  372. end
  373. local function onError(e)
  374. return observer:onError(e)
  375. end
  376. local function onCompleted(i)
  377. return function()
  378. table.insert(completed, i)
  379. if #completed == #sources then
  380. observer:onCompleted()
  381. end
  382. end
  383. end
  384. for i = 1, #sources do
  385. sources[i]:subscribe(onNext(i), onError, onCompleted(i))
  386. end
  387. end)
  388. end
  389. --- Returns a new Observable that produces the values of the first with falsy values removed.
  390. -- @returns {Observable}
  391. function Observable:compact()
  392. return self:filter(util.identity)
  393. end
  394. --- Returns a new Observable that produces the values produced by all the specified Observables in
  395. -- the order they are specified.
  396. -- @arg {Observable...} sources - The Observables to concatenate.
  397. -- @returns {Observable}
  398. function Observable:concat(other, ...)
  399. if not other then return self end
  400. local others = {...}
  401. return Observable.create(function(observer)
  402. local function onNext(...)
  403. return observer:onNext(...)
  404. end
  405. local function onError(message)
  406. return observer:onError(message)
  407. end
  408. local function onCompleted()
  409. return observer:onCompleted()
  410. end
  411. local function chain()
  412. return other:concat(util.unpack(others)):subscribe(onNext, onError, onCompleted)
  413. end
  414. return self:subscribe(onNext, onError, chain)
  415. end)
  416. end
  417. --- Returns a new Observable that produces a single boolean value representing whether or not the
  418. -- specified value was produced by the original.
  419. -- @arg {*} value - The value to search for. == is used for equality testing.
  420. -- @returns {Observable}
  421. function Observable:contains(value)
  422. return Observable.create(function(observer)
  423. local subscription
  424. local function onNext(...)
  425. local args = util.pack(...)
  426. if #args == 0 and value == nil then
  427. observer:onNext(true)
  428. if subscription then subscription:unsubscribe() end
  429. return observer:onCompleted()
  430. end
  431. for i = 1, #args do
  432. if args[i] == value then
  433. observer:onNext(true)
  434. if subscription then subscription:unsubscribe() end
  435. return observer:onCompleted()
  436. end
  437. end
  438. end
  439. local function onError(e)
  440. return observer:onError(e)
  441. end
  442. local function onCompleted()
  443. observer:onNext(false)
  444. return observer:onCompleted()
  445. end
  446. subscription = self:subscribe(onNext, onError, onCompleted)
  447. return subscription
  448. end)
  449. end
  450. --- Returns an Observable that produces a single value representing the number of values produced
  451. -- by the source value that satisfy an optional predicate.
  452. -- @arg {function=} predicate - The predicate used to match values.
  453. function Observable:count(predicate)
  454. predicate = predicate or util.constant(true)
  455. return Observable.create(function(observer)
  456. local count = 0
  457. local function onNext(...)
  458. if predicate(...) then
  459. count = count + 1
  460. end
  461. end
  462. local function onError(e)
  463. return observer:onError(e)
  464. end
  465. local function onCompleted()
  466. observer:onNext(count)
  467. observer:onCompleted()
  468. end
  469. return self:subscribe(onNext, onError, onCompleted)
  470. end)
  471. end
  472. --- Returns a new Observable that produces a default set of items if the source Observable produces
  473. -- no values.
  474. -- @arg {*...} values - Zero or more values to produce if the source completes without emitting
  475. -- anything.
  476. -- @returns {Observable}
  477. function Observable:defaultIfEmpty(...)
  478. local defaults = util.pack(...)
  479. return Observable.create(function(observer)
  480. local hasValue = false
  481. local function onNext(...)
  482. hasValue = true
  483. observer:onNext(...)
  484. end
  485. local function onError(e)
  486. observer:onError(e)
  487. end
  488. local function onCompleted()
  489. if not hasValue then
  490. observer:onNext(util.unpack(defaults))
  491. end
  492. observer:onCompleted()
  493. end
  494. return self:subscribe(onNext, onError, onCompleted)
  495. end)
  496. end
  497. --- Returns a new Observable that produces the values from the original with duplicates removed.
  498. -- @returns {Observable}
  499. function Observable:distinct()
  500. return Observable.create(function(observer)
  501. local values = {}
  502. local function onNext(x)
  503. if not values[x] then
  504. observer:onNext(x)
  505. end
  506. values[x] = true
  507. end
  508. local function onError(e)
  509. return observer:onError(e)
  510. end
  511. local function onCompleted()
  512. return observer:onCompleted()
  513. end
  514. return self:subscribe(onNext, onError, onCompleted)
  515. end)
  516. end
  517. --- Returns an Observable that only produces values from the original if they are different from
  518. -- the previous value.
  519. -- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
  520. -- @returns {Observable}
  521. function Observable:distinctUntilChanged(comparator)
  522. comparator = comparator or util.eq
  523. return Observable.create(function(observer)
  524. local first = true
  525. local currentValue = nil
  526. local function onNext(value, ...)
  527. if first or not comparator(value, currentValue) then
  528. observer:onNext(value, ...)
  529. currentValue = value
  530. first = false
  531. end
  532. end
  533. local function onError(message)
  534. return observer:onError(onError)
  535. end
  536. local function onCompleted()
  537. return observer:onCompleted()
  538. end
  539. return self:subscribe(onNext, onError, onCompleted)
  540. end)
  541. end
  542. --- Returns a new Observable that only produces values of the first that satisfy a predicate.
  543. -- @arg {function} predicate - The predicate used to filter values.
  544. -- @returns {Observable}
  545. function Observable:filter(predicate)
  546. predicate = predicate or util.identity
  547. return Observable.create(function(observer)
  548. local function onNext(...)
  549. if predicate(...) then
  550. return observer:onNext(...)
  551. end
  552. end
  553. local function onError(e)
  554. return observer:onError(e)
  555. end
  556. local function onCompleted()
  557. return observer:onCompleted(e)
  558. end
  559. return self:subscribe(onNext, onError, onCompleted)
  560. end)
  561. end
  562. --- Returns a new Observable that produces the first value of the original that satisfies a
  563. -- predicate.
  564. -- @arg {function} predicate - The predicate used to find a value.
  565. function Observable:find(predicate)
  566. predicate = predicate or util.identity
  567. return Observable.create(function(observer)
  568. local function onNext(...)
  569. if predicate(...) then
  570. observer:onNext(...)
  571. return observer:onCompleted()
  572. end
  573. end
  574. local function onError(message)
  575. return observer:onError(e)
  576. end
  577. local function onCompleted()
  578. return observer:onCompleted()
  579. end
  580. return self:subscribe(onNext, onError, onCompleted)
  581. end)
  582. end
  583. --- Returns a new Observable that only produces the first result of the original.
  584. -- @returns {Observable}
  585. function Observable:first()
  586. return self:take(1)
  587. end
  588. --- Returns a new Observable that subscribes to the Observables produced by the original and
  589. -- produces their values.
  590. -- @returns {Observable}
  591. function Observable:flatten()
  592. return Observable.create(function(observer)
  593. local function onError(message)
  594. return observer:onError(message)
  595. end
  596. local function onNext(observable)
  597. local function innerOnNext(...)
  598. observer:onNext(...)
  599. end
  600. observable:subscribe(innerOnNext, onError, util.noop)
  601. end
  602. local function onCompleted()
  603. return observer:onCompleted()
  604. end
  605. return self:subscribe(onNext, onError, onCompleted)
  606. end)
  607. end
  608. --- Returns a new Observable that only produces the last result of the original.
  609. -- @returns {Observable}
  610. function Observable:last()
  611. return Observable.create(function(observer)
  612. local value
  613. local empty = true
  614. local function onNext(...)
  615. value = {...}
  616. empty = false
  617. end
  618. local function onError(e)
  619. return observer:onError(e)
  620. end
  621. local function onCompleted()
  622. if not empty then
  623. observer:onNext(util.unpack(value or {}))
  624. end
  625. return observer:onCompleted()
  626. end
  627. return self:subscribe(onNext, onError, onCompleted)
  628. end)
  629. end
  630. --- Returns a new Observable that produces the values of the original transformed by a function.
  631. -- @arg {function} callback - The function to transform values from the original Observable.
  632. -- @returns {Observable}
  633. function Observable:map(callback)
  634. return Observable.create(function(observer)
  635. callback = callback or util.identity
  636. local function onNext(...)
  637. return observer:onNext(callback(...))
  638. end
  639. local function onError(e)
  640. return observer:onError(e)
  641. end
  642. local function onCompleted()
  643. return observer:onCompleted()
  644. end
  645. return self:subscribe(onNext, onError, onCompleted)
  646. end)
  647. end
  648. --- Returns a new Observable that produces the maximum value produced by the original.
  649. -- @returns {Observable}
  650. function Observable:max()
  651. return self:reduce(math.max)
  652. end
  653. --- Returns a new Observable that produces the values produced by all the specified Observables in
  654. -- the order they are produced.
  655. -- @arg {Observable...} sources - One or more Observables to merge.
  656. -- @returns {Observable}
  657. function Observable:merge(...)
  658. local sources = {...}
  659. table.insert(sources, 1, self)
  660. return Observable.create(function(observer)
  661. local function onNext(...)
  662. return observer:onNext(...)
  663. end
  664. local function onError(message)
  665. return observer:onError(message)
  666. end
  667. local function onCompleted(i)
  668. return function()
  669. sources[i] = nil
  670. if not next(sources) then
  671. observer:onCompleted()
  672. end
  673. end
  674. end
  675. for i = 1, #sources do
  676. sources[i]:subscribe(onNext, onError, onCompleted(i))
  677. end
  678. end)
  679. end
  680. --- Returns a new Observable that produces the minimum value produced by the original.
  681. -- @returns {Observable}
  682. function Observable:min()
  683. return self:reduce(math.min)
  684. end
  685. --- Returns an Observable that produces the values of the original inside tables.
  686. -- @returns {Observable}
  687. function Observable:pack()
  688. return self:map(util.pack)
  689. end
  690. --- Returns two Observables: one that produces values for which the predicate returns truthy for,
  691. -- and another that produces values for which the predicate returns falsy.
  692. -- @arg {function} predicate - The predicate used to partition the values.
  693. -- @returns {Observable}
  694. -- @returns {Observable}
  695. function Observable:partition(predicate)
  696. return self:filter(predicate), self:reject(predicate)
  697. end
  698. --- Returns a new Observable that produces values computed by extracting the given keys from the
  699. -- tables produced by the original.
  700. -- @arg {string...} keys - The key to extract from the table. Multiple keys can be specified to
  701. -- recursively pluck values from nested tables.
  702. -- @returns {Observable}
  703. function Observable:pluck(key, ...)
  704. if not key then return self end
  705. return Observable.create(function(observer)
  706. local function onNext(t)
  707. return observer:onNext(t[key])
  708. end
  709. local function onError(e)
  710. return observer:onError(e)
  711. end
  712. local function onCompleted()
  713. return observer:onCompleted()
  714. end
  715. return self:subscribe(onNext, onError, onCompleted)
  716. end):pluck(...)
  717. end
  718. --- Returns a new Observable that produces a single value computed by accumulating the results of
  719. -- running a function on each value produced by the original Observable.
  720. -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
  721. -- the return value of the last call as the first argument and the
  722. -- current values as the rest of the arguments.
  723. -- @arg {*} seed - A value to pass to the accumulator the first time it is run.
  724. -- @returns {Observable}
  725. function Observable:reduce(accumulator, seed)
  726. return Observable.create(function(observer)
  727. local result = seed
  728. local first = true
  729. local function onNext(...)
  730. if first and seed == nil then
  731. result = ...
  732. first = false
  733. else
  734. result = accumulator(result, ...)
  735. end
  736. end
  737. local function onError(e)
  738. return observer:onError(e)
  739. end
  740. local function onCompleted()
  741. observer:onNext(result)
  742. return observer:onCompleted()
  743. end
  744. return self:subscribe(onNext, onError, onCompleted)
  745. end)
  746. end
  747. --- Returns a new Observable that produces values from the original which do not satisfy a
  748. -- predicate.
  749. -- @arg {function} predicate - The predicate used to reject values.
  750. -- @returns {Observable}
  751. function Observable:reject(predicate)
  752. predicate = predicate or util.identity
  753. return Observable.create(function(observer)
  754. local function onNext(...)
  755. if not predicate(...) then
  756. return observer:onNext(...)
  757. end
  758. end
  759. local function onError(e)
  760. return observer:onError(e)
  761. end
  762. local function onCompleted()
  763. return observer:onCompleted(e)
  764. end
  765. return self:subscribe(onNext, onError, onCompleted)
  766. end)
  767. end
  768. --- Returns a new Observable that skips over a specified number of values produced by the original
  769. -- and produces the rest.
  770. -- @arg {number=1} n - The number of values to ignore.
  771. -- @returns {Observable}
  772. function Observable:skip(n)
  773. n = n or 1
  774. return Observable.create(function(observer)
  775. local i = 1
  776. local function onNext(...)
  777. if i > n then
  778. observer:onNext(...)
  779. else
  780. i = i + 1
  781. end
  782. end
  783. local function onError(e)
  784. return observer:onError(e)
  785. end
  786. local function onCompleted()
  787. return observer:onCompleted()
  788. end
  789. return self:subscribe(onNext, onError, onCompleted)
  790. end)
  791. end
  792. --- Returns a new Observable that skips over values produced by the original until the specified
  793. -- Observable produces a value.
  794. -- @arg {Observable} other - The Observable that triggers the production of values.
  795. -- @returns {Observable}
  796. function Observable:skipUntil(other)
  797. return Observable.create(function(observer)
  798. local triggered = false
  799. local function trigger()
  800. triggered = true
  801. end
  802. other:subscribe(trigger, trigger, trigger)
  803. local function onNext(...)
  804. if triggered then
  805. observer:onNext(...)
  806. end
  807. end
  808. local function onError()
  809. if triggered then
  810. observer:onError()
  811. end
  812. end
  813. local function onCompleted()
  814. if triggered then
  815. observer:onCompleted()
  816. end
  817. end
  818. return self:subscribe(onNext, onError, onCompleted)
  819. end)
  820. end
  821. --- Returns a new Observable that skips elements until the predicate returns falsy for one of them.
  822. -- @arg {function} predicate - The predicate used to continue skipping values.
  823. -- @returns {Observable}
  824. function Observable:skipWhile(predicate)
  825. predicate = predicate or util.identity
  826. return Observable.create(function(observer)
  827. local skipping = true
  828. local function onNext(...)
  829. if skipping then
  830. skipping = predicate(...)
  831. end
  832. if not skipping then
  833. return observer:onNext(...)
  834. end
  835. end
  836. local function onError(message)
  837. return observer:onError(message)
  838. end
  839. local function onCompleted()
  840. return observer:onCompleted()
  841. end
  842. return self:subscribe(onNext, onError, onCompleted)
  843. end)
  844. end
  845. --- Returns an Observable that produces a single value representing the sum of the values produced
  846. -- by the original.
  847. -- @returns {Observable}
  848. function Observable:sum()
  849. return self:reduce(function(x, y) return x + y end, 0)
  850. end
  851. --- Returns a new Observable that only produces the first n results of the original.
  852. -- @arg {number=1} n - The number of elements to produce before completing.
  853. -- @returns {Observable}
  854. function Observable:take(n)
  855. n = n or 1
  856. return Observable.create(function(observer)
  857. if n <= 0 then
  858. observer:onCompleted()
  859. return
  860. end
  861. local i = 1
  862. local function onNext(...)
  863. observer:onNext(...)
  864. i = i + 1
  865. if i > n then
  866. observer:onCompleted()
  867. end
  868. end
  869. local function onError(e)
  870. return observer:onError(e)
  871. end
  872. local function onCompleted()
  873. return observer:onCompleted()
  874. end
  875. return self:subscribe(onNext, onError, onCompleted)
  876. end)
  877. end
  878. --- Returns a new Observable that completes when the specified Observable fires.
  879. -- @arg {Observable} other - The Observable that triggers completion of the original.
  880. -- @returns {Observable}
  881. function Observable:takeUntil(other)
  882. return Observable.create(function(observer)
  883. local function onNext(...)
  884. return observer:onNext(...)
  885. end
  886. local function onError(e)
  887. return observer:onError(e)
  888. end
  889. local function onCompleted()
  890. return observer:onCompleted()
  891. end
  892. other:subscribe(onCompleted, onCompleted, onCompleted)
  893. return self:subscribe(onNext, onError, onCompleted)
  894. end)
  895. end
  896. --- Returns a new Observable that produces elements until the predicate returns falsy.
  897. -- @arg {function} predicate - The predicate used to continue production of values.
  898. -- @returns {Observable}
  899. function Observable:takeWhile(predicate)
  900. predicate = predicate or util.identity
  901. return Observable.create(function(observer)
  902. local taking = true
  903. local function onNext(...)
  904. if taking then
  905. taking = predicate(...)
  906. if taking then
  907. return observer:onNext(...)
  908. else
  909. return observer:onCompleted()
  910. end
  911. end
  912. end
  913. local function onError(message)
  914. return observer:onError(message)
  915. end
  916. local function onCompleted()
  917. return observer:onCompleted()
  918. end
  919. return self:subscribe(onNext, onError, onCompleted)
  920. end)
  921. end
  922. --- Runs a function each time this Observable has activity. Similar to subscribe but does not
  923. -- create a subscription.
  924. -- @arg {function=} onNext - Run when the Observable produces values.
  925. -- @arg {function=} onError - Run when the Observable encounters a problem.
  926. -- @arg {function=} onCompleted - Run when the Observable completes.
  927. -- @returns {Observable}
  928. function Observable:tap(_onNext, _onError, _onCompleted)
  929. _onNext = _onNext or util.noop
  930. _onError = _onError or util.noop
  931. _onCompleted = _onCompleted or util.noop
  932. return Observable.create(function(observer)
  933. local function onNext(...)
  934. _onNext(...)
  935. return observer:onNext(...)
  936. end
  937. local function onError(message)
  938. _onError(message)
  939. return observer:onError(message)
  940. end
  941. local function onCompleted()
  942. _onCompleted()
  943. return observer:onCompleted()
  944. end
  945. return self:subscribe(onNext, onError, onCompleted)
  946. end)
  947. end
  948. --- Returns an Observable that unpacks the tables produced by the original.
  949. -- @returns {Observable}
  950. function Observable:unpack()
  951. return self:map(util.unpack)
  952. end
  953. --- Returns an Observable that takes any values produced by the original that consist of multiple
  954. -- return values and produces each value individually.
  955. -- @returns {Observable}
  956. function Observable:unwrap()
  957. return Observable.create(function(observer)
  958. local function onNext(...)
  959. local values = {...}
  960. for i = 1, #values do
  961. observer:onNext(values[i])
  962. end
  963. end
  964. local function onError(message)
  965. return observer:onError(message)
  966. end
  967. local function onCompleted()
  968. return observer:onCompleted()
  969. end
  970. return self:subscribe(onNext, onError, onCompleted)
  971. end)
  972. end
  973. --- Returns an Observable that produces a sliding window of the values produced by the original.
  974. -- @arg {number} size - The size of the window. The returned observable will produce this number
  975. -- of the most recent values as multiple arguments to onNext.
  976. -- @returns {Observable}
  977. function Observable:window(size)
  978. return Observable.create(function(observer)
  979. local window = {}
  980. local function onNext(value)
  981. table.insert(window, value)
  982. if #window >= size then
  983. observer:onNext(util.unpack(window))
  984. table.remove(window, 1)
  985. end
  986. end
  987. local function onError(message)
  988. return observer:onError(message)
  989. end
  990. local function onCompleted()
  991. return observer:onCompleted()
  992. end
  993. return self:subscribe(onNext, onError, onCompleted)
  994. end)
  995. end
  996. --- Returns an Observable that produces values from the original along with the most recently
  997. -- produced value from all other specified Observables. Note that only the first argument from each
  998. -- source Observable is used.
  999. -- @arg {Observable...} sources - The Observables to include the most recent values from.
  1000. -- @returns {Observable}
  1001. function Observable:with(...)
  1002. local sources = {...}
  1003. return Observable.create(function(observer)
  1004. local latest = setmetatable({}, {__len = util.constant(#sources)})
  1005. local function setLatest(i)
  1006. return function(value)
  1007. latest[i] = value
  1008. end
  1009. end
  1010. local function onNext(value)
  1011. return observer:onNext(value, util.unpack(latest))
  1012. end
  1013. local function onError(e)
  1014. return observer:onError(e)
  1015. end
  1016. local function onCompleted()
  1017. return observer:onCompleted()
  1018. end
  1019. for i = 1, #sources do
  1020. sources[i]:subscribe(setLatest(i), util.noop, util.noop)
  1021. end
  1022. return self:subscribe(onNext, onError, onCompleted)
  1023. end)
  1024. end
  1025. --- @class ImmediateScheduler
  1026. -- @description Schedules Observables by running all operations immediately.
  1027. local ImmediateScheduler = {}
  1028. ImmediateScheduler.__index = ImmediateScheduler
  1029. ImmediateScheduler.__tostring = util.constant('ImmediateScheduler')
  1030. --- Creates a new ImmediateScheduler.
  1031. -- @returns {ImmediateScheduler}
  1032. function ImmediateScheduler.create()
  1033. return setmetatable({}, ImmediateScheduler)
  1034. end
  1035. --- Schedules a function to be run on the scheduler. It is executed immediately.
  1036. -- @arg {function} action - The function to execute.
  1037. function ImmediateScheduler:schedule(action)
  1038. action()
  1039. end
  1040. --- @class CooperativeScheduler
  1041. -- @description Manages Observables using coroutines and a virtual clock that must be updated
  1042. -- manually.
  1043. local CooperativeScheduler = {}
  1044. CooperativeScheduler.__index = CooperativeScheduler
  1045. CooperativeScheduler.__tostring = util.constant('CooperativeScheduler')
  1046. --- Creates a new CooperativeScheduler.
  1047. -- @arg {number=0} currentTime - A time to start the scheduler at.
  1048. -- @returns {Scheduler.CooperativeScheduler}
  1049. function CooperativeScheduler.create(currentTime)
  1050. local self = {
  1051. tasks = {},
  1052. currentTime = currentTime or 0
  1053. }
  1054. return setmetatable(self, CooperativeScheduler)
  1055. end
  1056. --- Schedules a function to be run after an optional delay.
  1057. -- @arg {function} action - The function to execute. Will be converted into a coroutine. The
  1058. -- coroutine may yield execution back to the scheduler with an optional
  1059. -- number, which will put it to sleep for a time period.
  1060. -- @arg {number=0} delay - Delay execution of the action by a time period.
  1061. function CooperativeScheduler:schedule(action, delay)
  1062. local task = {
  1063. thread = coroutine.create(action),
  1064. due = self.currentTime + (delay or 0)
  1065. }
  1066. table.insert(self.tasks, task)
  1067. return Subscription.create(function()
  1068. return self:unschedule(task)
  1069. end)
  1070. end
  1071. function CooperativeScheduler:unschedule(task)
  1072. for i = 1, #self.tasks do
  1073. if self.tasks[i] == task then
  1074. table.remove(self.tasks, i)
  1075. end
  1076. end
  1077. end
  1078. --- Triggers an update of the CooperativeScheduler. The clock will be advanced and the scheduler
  1079. -- will run any coroutines that are due to be run.
  1080. -- @arg {number=0} delta - An amount of time to advance the clock by. It is common to pass in the
  1081. -- time in seconds or milliseconds elapsed since this function was last
  1082. -- called.
  1083. function CooperativeScheduler:update(delta)
  1084. self.currentTime = self.currentTime + (delta or 0)
  1085. for i = #self.tasks, 1, -1 do
  1086. local task = self.tasks[i]
  1087. if self.currentTime >= task.due then
  1088. local success, delay = coroutine.resume(task.thread)
  1089. if success then
  1090. task.due = math.max(task.due + (delay or 0), self.currentTime)
  1091. else
  1092. error(delay)
  1093. end
  1094. if coroutine.status(task.thread) == 'dead' then
  1095. table.remove(self.tasks, i)
  1096. end
  1097. end
  1098. end
  1099. end
  1100. --- Returns whether or not the CooperativeScheduler's queue is empty.
  1101. function CooperativeScheduler:isEmpty()
  1102. return not next(self.tasks)
  1103. end
  1104. --- @class Subject
  1105. -- @description Subjects function both as an Observer and as an Observable. Subjects inherit all
  1106. -- Observable functions, including subscribe. Values can also be pushed to the Subject, which will
  1107. -- be broadcasted to any subscribed Observers.
  1108. local Subject = setmetatable({}, Observable)
  1109. Subject.__index = Subject
  1110. Subject.__tostring = util.constant('Subject')
  1111. --- Creates a new Subject.
  1112. -- @returns {Subject}
  1113. function Subject.create()
  1114. local self = {
  1115. observers = {},
  1116. stopped = false
  1117. }
  1118. return setmetatable(self, Subject)
  1119. end
  1120. --- Creates a new Observer and attaches it to the Subject.
  1121. -- @arg {function|table} onNext|observer - A function called when the Subject produces a value or
  1122. -- an existing Observer to attach to the Subject.
  1123. -- @arg {function} onError - Called when the Subject terminates due to an error.
  1124. -- @arg {function} onCompleted - Called when the Subject completes normally.
  1125. function Subject:subscribe(onNext, onError, onCompleted)
  1126. local observer
  1127. if type(onNext) == 'table' then
  1128. observer = onNext
  1129. else
  1130. observer = Observer.create(onNext, onError, onCompleted)
  1131. end
  1132. table.insert(self.observers, observer)
  1133. return Subscription.create(function()
  1134. for i = 1, #self.observers do
  1135. if self.observers[i] == observer then
  1136. table.remove(self.observers, i)
  1137. return
  1138. end
  1139. end
  1140. end)
  1141. end
  1142. --- Pushes zero or more values to the Subject. They will be broadcasted to all Observers.
  1143. -- @arg {*...} values
  1144. function Subject:onNext(...)
  1145. if not self.stopped then
  1146. for i = 1, #self.observers do
  1147. self.observers[i]:onNext(...)
  1148. end
  1149. end
  1150. end
  1151. --- Signal to all Observers that an error has occurred.
  1152. -- @arg {string=} message - A string describing what went wrong.
  1153. function Subject:onError(message)
  1154. if not self.stopped then
  1155. for i = 1, #self.observers do
  1156. self.observers[i]:onError(message)
  1157. end
  1158. self.stopped = true
  1159. end
  1160. end
  1161. --- Signal to all Observers that the Subject will not produce any more values.
  1162. function Subject:onCompleted()
  1163. if not self.stopped then
  1164. for i = 1, #self.observers do
  1165. self.observers[i]:onCompleted()
  1166. end
  1167. self.stopped = true
  1168. end
  1169. end
  1170. Subject.__call = Subject.onNext
  1171. --- @class BehaviorSubject
  1172. -- @description A Subject that tracks its current value. Provides an accessor to retrieve the most
  1173. -- recent pushed value, and all subscribers immediately receive the latest value.
  1174. local BehaviorSubject = setmetatable({}, Subject)
  1175. BehaviorSubject.__index = BehaviorSubject
  1176. BehaviorSubject.__tostring = util.constant('BehaviorSubject')
  1177. --- Creates a new BehaviorSubject.
  1178. -- @arg {*...} value - The initial values.
  1179. -- @returns {Subject}
  1180. function BehaviorSubject.create(...)
  1181. local self = {
  1182. observers = {},
  1183. stopped = false
  1184. }
  1185. if select('#', ...) > 0 then
  1186. self.value = util.pack(...)
  1187. end
  1188. return setmetatable(self, BehaviorSubject)
  1189. end
  1190. --- Creates a new Observer and attaches it to the Subject. Immediately broadcasts the most recent
  1191. -- value to the Observer.
  1192. -- @arg {function} onNext - Called when the Subject produces a value.
  1193. -- @arg {function} onError - Called when the Subject terminates due to an error.
  1194. -- @arg {function} onCompleted - Called when the Subject completes normally.
  1195. function BehaviorSubject:subscribe(onNext, onError, onCompleted)
  1196. local observer = Observer.create(onNext, onError, onCompleted)
  1197. Subject.subscribe(self, observer)
  1198. if self.value then
  1199. observer:onNext(unpack(self.value))
  1200. end
  1201. end
  1202. --- Pushes zero or more values to the BehaviorSubject. They will be broadcasted to all Observers.
  1203. -- @arg {*...} values
  1204. function BehaviorSubject:onNext(...)
  1205. self.value = util.pack(...)
  1206. return Subject.onNext(self, ...)
  1207. end
  1208. --- Returns the last value emitted by the Subject, or the initial value passed to the constructor
  1209. -- if nothing has been emitted yet.
  1210. -- @returns {*...}
  1211. function BehaviorSubject:getValue()
  1212. return self.value and util.unpack(self.value)
  1213. end
  1214. Observable.wrap = Observable.buffer
  1215. return {
  1216. util = util,
  1217. Subscription = Subscription,
  1218. Observer = Observer,
  1219. Observable = Observable,
  1220. ImmediateScheduler = ImmediateScheduler,
  1221. CooperativeScheduler = CooperativeScheduler,
  1222. Subject = Subject,
  1223. BehaviorSubject = BehaviorSubject
  1224. }