rx.lua 24 KB


  -- Forward declaration of the module table. It is assigned at the bottom of the file so that
  -- functions defined before that point can still refer to it (e.g. rx.scheduler).
  local rx
  -- table.pack only exists in Lua 5.2+. The fallback also records the argument count in `n`,
  -- exactly as table.pack does, so packed tables have the same shape on every Lua version.
  local pack = table.pack or function(...) return {n = select('#', ...), ...} end
  -- unpack was moved to table.unpack in Lua 5.2.
  local unpack = table.unpack or unpack
  -- Default comparator: plain equality.
  local function eq(x, y) return x == y end
  -- Default callback that does nothing.
  local function noop() end
  -- Returns its first argument unchanged.
  local function identity(x) return x end
  --- @class Observer
  -- @description An Observer bundles three callbacks through which it receives values, an error,
  -- or a completion signal. Once it has been told about an error or completion it is marked as
  -- stopped and ignores everything that follows.
  local Observer = {}
  Observer.__index = Observer

  --- Builds an Observer from up to three callbacks.
  -- @arg {function=} onNext - Invoked for each set of values pushed. Defaults to a no-op.
  -- @arg {function=} onError - Invoked when an error is signalled. Defaults to Lua's `error`.
  -- @arg {function=} onComplete - Invoked when completion is signalled. Defaults to a no-op.
  -- @returns {Observer}
  function Observer.create(onNext, onError, onComplete)
    local observer = setmetatable({}, Observer)
    observer._onNext = onNext or noop
    observer._onError = onError or error
    observer._onComplete = onComplete or noop
    observer.stopped = false
    return observer
  end

  --- Delivers zero or more values to the Observer, unless it has already stopped.
  -- @arg {*...} values
  function Observer:onNext(...)
    if self.stopped then return end
    self._onNext(...)
  end

  --- Tells the Observer that an error occurred. Marks the Observer as stopped before the error
  -- callback runs; ignored if the Observer had already stopped.
  -- @arg {string=} message - A string describing what went wrong.
  function Observer:onError(message)
    if self.stopped then return end
    self.stopped = true
    self._onError(message)
  end

  --- Tells the Observer that the sequence finished and no more values will follow. Marks the
  -- Observer as stopped before the completion callback runs; ignored if already stopped.
  function Observer:onComplete()
    if self.stopped then return end
    self.stopped = true
    self._onComplete()
  end
  --- @class Observable
  -- @description An Observable pushes values to the Observers that subscribe to it.
  local Observable = {}
  Observable.__index = Observable

  --- Wraps a subscription function in a new Observable.
  -- @arg {function} subscribe - The subscription function that produces values. It is stored on
  --                             the Observable as `_subscribe`.
  -- @returns {Observable}
  function Observable.create(subscribe)
    local observable = setmetatable({}, Observable)
    observable._subscribe = subscribe
    return observable
  end
  --- Builds an Observable that pushes one value and then completes.
  -- @arg {*} value - The value to produce.
  -- @returns {Observable}
  function Observable.fromValue(value)
    local function emitOnce(observer)
      observer:onNext(value)
      observer:onComplete()
    end
    return Observable.create(emitOnce)
  end
  --- Builds an Observable that counts through a numeric range, using the same semantics as a
  -- numeric Lua for loop, and then completes.
  -- @arg {number} initial - The first value of the range. When it is the only argument given it
  --                         is used as the upper limit instead and counting starts at 1.
  -- @arg {number=} limit - The last value of the range.
  -- @arg {number=1} step - The increment applied on each iteration.
  -- @returns {Observable}
  function Observable.fromRange(initial, limit, step)
    -- Single-argument form: fromRange(n) counts from 1 to n.
    if not (limit or step) then
      initial, limit = 1, initial
    end
    local increment = step or 1
    local function produce(observer)
      for value = initial, limit, increment do
        observer:onNext(value)
      end
      observer:onComplete()
    end
    return Observable.create(produce)
  end
  --- Builds an Observable that walks a table and pushes each entry, then completes.
  -- Each entry is pushed as (value, key), i.e. the value comes first.
  -- @arg {table} t - The table to read from.
  -- @arg {function=pairs} iterator - The iterator factory used to walk the table, e.g. pairs or
  --                                  ipairs.
  -- @returns {Observable}
  function Observable.fromTable(t, iterator)
    local iterate = iterator or pairs
    local function produce(observer)
      for k, v in iterate(t) do
        observer:onNext(v, k)
      end
      observer:onComplete()
    end
    return Observable.create(produce)
  end
  --- Builds an Observable that pushes the values a coroutine yields. The work is scheduled on
  -- rx.scheduler: each pass resumes the coroutine once, pushes the first value it yielded or
  -- returned, and then yields back to the scheduler. A failed resume is reported through
  -- onError; when the coroutine is dead the Observable completes.
  -- @arg {thread|function} thread - A coroutine, or a function to be turned into one.
  -- @returns {Observable}
  function Observable.fromCoroutine(thread)
    if type(thread) == 'function' then
      thread = coroutine.create(thread)
    end
    return Observable.create(function(observer)
      local function pump()
        while not observer.stopped do
          local ok, result = coroutine.resume(thread)
          if not ok then
            -- On failure the second result of resume is the error value.
            return observer:onError(result)
          end
          observer:onNext(result)
          if coroutine.status(thread) == 'dead' then
            return observer:onComplete()
          end
          -- Hand control back to the scheduler until the next update.
          coroutine.yield()
        end
      end
      return rx.scheduler:schedule(pump)
    end)
  end
  --- Convenience wrapper: builds an Observer from the given callbacks and hands it to this
  -- Observable's subscription function, returning whatever that function returns.
  -- @arg {function} onNext - Called when the Observable produces a value.
  -- @arg {function} onError - Called when the Observable terminates due to an error.
  -- @arg {function} onComplete - Called when the Observable completes normally.
  function Observable:subscribe(onNext, onError, onComplete)
    local observer = Observer.create(onNext, onError, onComplete)
    return self._subscribe(observer)
  end
  --- Subscribes to this Observable and prints everything it produces: values, an error, or
  -- completion.
  -- @arg {string=} name - Prefixes the printed messages with a name.
  function Observable:dump(name)
    name = name and (name .. ' ') or ''
    local function onNext(...)
      -- table.concat only accepts strings and numbers, so every value is converted with tostring
      -- first. select('#', ...) is used for the count so that nil values are printed too.
      local parts = {}
      for i = 1, select('#', ...) do
        parts[i] = tostring((select(i, ...)))
      end
      print(name .. 'onNext: ' .. table.concat(parts, ', '))
    end
    -- tostring keeps a nil or non-string error value from raising inside the handler.
    local function onError(e) print(name .. 'onError: ' .. tostring(e)) end
    local function onComplete() print(name .. 'onComplete') end
    return self:subscribe(onNext, onError, onComplete)
  end
  138. -- The functions below transform the values produced by an Observable and return a new Observable
  139. -- that produces these values.
  --- Returns an Observable that only produces values from the original if they are different from
  -- the previously produced value. The first value is always produced.
  -- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
  -- @returns {Observable}
  function Observable:changes(comparator)
    comparator = comparator or eq
    return Observable.create(function(observer)
      local first = true
      local currentValue = nil
      local function onNext(value)
        -- `first` is tracked separately so that an initial nil value is still produced.
        if first or not comparator(value, currentValue) then
          observer:onNext(value)
          currentValue = value
          first = false
        end
      end
      local function onError(message)
        -- Forward the error message itself, not the handler function.
        return observer:onError(message)
      end
      local function onComplete()
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that runs a combinator function on the most recent values from a set
  -- of Observables whenever any of them produce a new value. Nothing is produced until every
  -- source has produced at least once. The new Observable completes once all sources have
  -- completed, and fails as soon as any source fails.
  -- @arg {Observable...} observables - One or more Observables to combine with this one.
  -- @arg {function} combinator - Passed as the last argument. Receives the latest value of each
  --                              Observable (this one first) and returns the value to produce.
  -- @returns {Observable}
  function Observable:combineLatest(...)
    local sources = {...}
    -- The combinator is the trailing argument; everything before it is a source.
    local combinator = table.remove(sources)
    table.insert(sources, 1, self)
    local count = #sources
    return Observable.create(function(observer)
      local latest = {}
      -- Copy of the sources list; an entry is cleared once that source has produced a value.
      local pending = {unpack(sources)}
      local completed = {}
      local function onNext(i)
        return function(value)
          latest[i] = value
          pending[i] = nil
          if not next(pending) then
            -- Explicit bounds: a source whose latest value is nil leaves a hole in `latest`,
            -- and unpack without bounds could then stop early.
            observer:onNext(combinator(unpack(latest, 1, count)))
          end
        end
      end
      local function onError(e)
        return observer:onError(e)
      end
      local function onComplete(i)
        return function()
          table.insert(completed, i)
          if #completed == count then
            observer:onComplete()
          end
        end
      end
      for i = 1, count do
        sources[i]:subscribe(onNext(i), onError, onComplete(i))
      end
    end)
  end
  --- Returns a new Observable that drops falsy values (nil and false) produced by the original
  -- and passes everything else through.
  -- @returns {Observable}
  function Observable:compact()
    local keepTruthy = identity
    return self:filter(keepTruthy)
  end
  --- Returns a new Observable that produces the values of this Observable followed by the values
  -- of each of the given Observables, in the order they are given. Each following Observable is
  -- only subscribed to once the one before it has completed. With no arguments the original
  -- Observable is returned as-is.
  -- @arg {Observable...} sources - The Observables to concatenate.
  -- @returns {Observable}
  function Observable:concat(other, ...)
    if not other then
      return self
    end
    local rest = {...}
    local source = self
    return Observable.create(function(observer)
      local function forwardNext(...) return observer:onNext(...) end
      local function forwardError(message) return observer:onError(message) end
      local function forwardComplete() return observer:onComplete() end
      -- Runs when this Observable completes: continue with the remaining Observables.
      local function subscribeToRest()
        local remainder = other:concat(unpack(rest))
        return remainder:subscribe(forwardNext, forwardError, forwardComplete)
      end
      return source:subscribe(forwardNext, forwardError, subscribeToRest)
    end)
  end
  --- Returns a new Observable that produces the values from the original with duplicates removed.
  -- Values are remembered as table keys, so two values are duplicates when they are the same key.
  -- @returns {Observable}
  function Observable:distinct()
    return Observable.create(function(observer)
      local values = {}
      -- nil cannot be used as a table key, so whether it has been seen is tracked separately.
      local seenNil = false
      local function onNext(x)
        if x == nil then
          if not seenNil then
            observer:onNext(x)
          end
          seenNil = true
          return
        end
        if not values[x] then
          observer:onNext(x)
        end
        values[x] = true
      end
      local function onError(e)
        return observer:onError(e)
      end
      local function onComplete()
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that only produces values of the first that satisfy a predicate.
  -- @arg {function} predicate - The predicate used to filter values. It receives all values of
  --                             an emission; defaults to the identity function, which keeps
  --                             emissions whose first value is truthy.
  -- @returns {Observable}
  function Observable:filter(predicate)
    predicate = predicate or identity
    return Observable.create(function(observer)
      local function onNext(...)
        if predicate(...) then
          return observer:onNext(...)
        end
      end
      local function onError(e)
        return observer:onError(e)
      end
      local function onComplete()
        -- Completion carries no argument (the stray read of an undefined global was removed).
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that produces the first value of the original that satisfies a
  -- predicate and then completes. If no value matches, it completes when the original does.
  -- @arg {function} predicate - The predicate used to find a value. Defaults to the identity
  --                             function, which matches the first truthy value.
  -- @returns {Observable}
  function Observable:find(predicate)
    predicate = predicate or identity
    return Observable.create(function(observer)
      local function onNext(...)
        if predicate(...) then
          observer:onNext(...)
          return observer:onComplete()
        end
      end
      local function onError(message)
        -- Forward the received message (an undefined global was forwarded before, losing it).
        return observer:onError(message)
      end
      local function onComplete()
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable limited to the first result of the original.
  -- Implemented as take(1).
  -- @returns {Observable}
  function Observable:first()
    local firstOnly = self:take(1)
    return firstOnly
  end
  --- Returns a new Observable that subscribes to every Observable produced by the original and
  -- forwards their values. Errors from the original or from any inner Observable are forwarded.
  -- Completion of an inner Observable is ignored; the result completes when the original does.
  -- @returns {Observable}
  function Observable:flatten()
    local outer = self
    return Observable.create(function(observer)
      local function forwardError(message)
        return observer:onError(message)
      end
      local function forwardComplete()
        return observer:onComplete()
      end
      -- Each value of the outer Observable is itself an Observable to subscribe to.
      local function subscribeInner(observable)
        observable:subscribe(function(...) observer:onNext(...) end, forwardError, noop)
      end
      return outer:subscribe(subscribeInner, forwardError, forwardComplete)
    end)
  end
  --- Returns a new Observable that only produces the last result of the original. The result is
  -- produced when the original completes; if the original produced nothing, the new Observable
  -- just completes.
  -- @returns {Observable}
  function Observable:last()
    return Observable.create(function(observer)
      local value
      -- Number of values in the most recent emission; stays nil until something is produced.
      local count
      local function onNext(...)
        value = {...}
        count = select('#', ...)
      end
      local function onError(e)
        return observer:onError(e)
      end
      local function onComplete()
        if count then
          -- Explicit bounds keep nil values inside the emission intact.
          observer:onNext(unpack(value, 1, count))
        end
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that passes every emission of the original through a function and
  -- produces whatever that function returns.
  -- @arg {function} callback - The function to transform values from the original Observable.
  --                            Defaults to the identity function.
  -- @returns {Observable}
  function Observable:map(callback)
    local source = self
    return Observable.create(function(observer)
      -- The default is applied on subscription, as a write to the shared `callback` upvalue.
      callback = callback or identity
      return source:subscribe(
        function(...) return observer:onNext(callback(...)) end,
        function(e) return observer:onError(e) end,
        function() return observer:onComplete() end
      )
    end)
  end
  --- Returns a new Observable that produces the largest value produced by the original.
  -- Implemented as a reduce with math.max.
  -- @returns {Observable}
  function Observable:max()
    local largest = math.max
    return self:reduce(largest)
  end
  --- Returns a new Observable that produces the smallest value produced by the original.
  -- Implemented as a reduce with math.min.
  -- @returns {Observable}
  function Observable:min()
    local smallest = math.min
    return self:reduce(smallest)
  end
  --- Returns a new Observable that produces the values produced by all the specified Observables in
  -- the order they are produced. It completes once every source has completed and fails as soon
  -- as any source fails.
  -- @arg {Observable...} sources - One or more Observables to merge with this one.
  -- @returns {Observable}
  function Observable:merge(...)
    local sources = {...}
    table.insert(sources, 1, self)
    return Observable.create(function(observer)
      -- Completion is counted per subscription. The shared `sources` list is never modified,
      -- so the merged Observable can be subscribed to more than once.
      local remaining = #sources
      local function onNext(...)
        return observer:onNext(...)
      end
      local function onError(message)
        return observer:onError(message)
      end
      local function onComplete()
        remaining = remaining - 1
        if remaining == 0 then
          observer:onComplete()
        end
      end
      for i = 1, #sources do
        sources[i]:subscribe(onNext, onError, onComplete)
      end
    end)
  end
  --- Returns an Observable that gathers the values of each emission of the original into a table
  -- and produces that table. Implemented as a map with the module's pack helper.
  -- @returns {Observable}
  function Observable:pack()
    local toTable = pack
    return self:map(toTable)
  end
  --- Returns a new Observable that, for each table produced by the original, produces the value
  -- stored in that table under the given key.
  -- @arg {*} key - The key to look up in each table.
  -- @returns {Observable}
  function Observable:pluck(key)
    local source = self
    return Observable.create(function(observer)
      return source:subscribe(
        function(t) return observer:onNext(t[key]) end,
        function(e) return observer:onError(e) end,
        function() return observer:onComplete() end
      )
    end)
  end
  --- Returns a new Observable that produces a single value computed by accumulating the results of
  -- running a function on each value produced by the original Observable. The result is produced
  -- when the original completes.
  -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
  --                               the return value of the last call as the first argument and the
  --                               current values as the rest of the arguments.
  -- @arg {*} seed - A value to pass to the accumulator the first time it is run. If omitted, the
  --                 first value produced is used as the initial result and the accumulator first
  --                 runs on the second value.
  -- @returns {Observable}
  function Observable:reduce(accumulator, seed)
    return Observable.create(function(observer)
      local result = seed
      local first = true
      local function onNext(...)
        if first and seed == nil then
          -- No seed: adopt the first value as-is rather than accumulating it with itself.
          result = (...)
        else
          result = accumulator(result, ...)
        end
        first = false
      end
      local function onError(e)
        return observer:onError(e)
      end
      local function onComplete()
        observer:onNext(result)
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that produces values from the original which do not satisfy a
  -- predicate.
  -- @arg {function} predicate - The predicate used to reject values. It receives all values of
  --                             an emission; defaults to the identity function, which rejects
  --                             emissions whose first value is truthy.
  -- @returns {Observable}
  function Observable:reject(predicate)
    predicate = predicate or identity
    return Observable.create(function(observer)
      local function onNext(...)
        if not predicate(...) then
          return observer:onNext(...)
        end
      end
      local function onError(e)
        return observer:onError(e)
      end
      local function onComplete()
        -- Completion carries no argument (the stray read of an undefined global was removed).
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that ignores the first n emissions of the original and produces
  -- everything after them.
  -- @arg {number=1} n - The number of values to ignore.
  -- @returns {Observable}
  function Observable:skip(n)
    n = n or 1
    local source = self
    return Observable.create(function(observer)
      -- 1-based position of the next emission; emissions at positions 1..n are dropped.
      local position = 1
      local function onNext(...)
        if not (position > n) then
          position = position + 1
          return
        end
        observer:onNext(...)
      end
      return source:subscribe(
        onNext,
        function(e) return observer:onError(e) end,
        function() return observer:onComplete() end
      )
    end)
  end
  --- Returns a new Observable that skips over values produced by the original until the specified
  -- Observable fires. The original is only subscribed to at that moment, which happens the first
  -- time `other` produces a value, fails, or completes.
  -- @arg {Observable} other - The Observable that triggers the production of values.
  -- @returns {Observable}
  function Observable:skipUntil(other)
    return Observable.create(function(observer)
      local triggered = false
      local function trigger()
        -- `other` can fire many times (each value, then completion). Subscribe to the original
        -- only on the first firing so that its values are not duplicated.
        if triggered then return end
        triggered = true
        local function onNext(...)
          return observer:onNext(...)
        end
        local function onError(message)
          -- Errors are forwarded as errors, not as values.
          return observer:onError(message)
        end
        local function onComplete()
          return observer:onComplete()
        end
        return self:subscribe(onNext, onError, onComplete)
      end
      other:subscribe(trigger, trigger, trigger)
    end)
  end
  --- Returns a new Observable that adds up the values of the original Observable and produces the
  -- total as a single result. Implemented as a reduce seeded with 0.
  -- @returns {Observable}
  function Observable:sum()
    local function add(total, value)
      return total + value
    end
    return self:reduce(add, 0)
  end
  --- Returns a new Observable that produces the first n results of the original and then
  -- completes. When n is zero or negative it completes immediately without subscribing.
  -- @arg {number=1} n - The number of elements to produce before completing.
  -- @returns {Observable}
  function Observable:take(n)
    n = n or 1
    local source = self
    return Observable.create(function(observer)
      if n <= 0 then
        observer:onComplete()
        return
      end
      -- 1-based position of the next emission.
      local position = 1
      local function forwardComplete()
        return observer:onComplete()
      end
      local function onNext(...)
        observer:onNext(...)
        position = position + 1
        if position > n then
          observer:onComplete()
        end
      end
      return source:subscribe(
        onNext,
        function(e) return observer:onError(e) end,
        forwardComplete
      )
    end)
  end
  --- Returns a new Observable that forwards the original until the specified Observable fires,
  -- and then completes. Any signal from `other` (a value, an error, or completion) counts.
  -- `other` is subscribed to before the original.
  -- @arg {Observable} other - The Observable that triggers completion of the original.
  -- @returns {Observable}
  function Observable:takeUntil(other)
    local source = self
    return Observable.create(function(observer)
      -- Shared by the original's completion and by every signal of `other`.
      local function finish()
        return observer:onComplete()
      end
      other:subscribe(finish, finish, finish)
      return source:subscribe(
        function(...) return observer:onNext(...) end,
        function(e) return observer:onError(e) end,
        finish
      )
    end)
  end
  --- Returns an Observable that expands each table produced by the original into multiple values.
  -- Implemented as a map with the module's unpack helper.
  -- @returns {Observable}
  function Observable:unpack()
    local expand = unpack
    return self:map(expand)
  end
  --- Returns an Observable that splits emissions made up of several values: each value of an
  -- emission from the original is produced on its own, in order.
  -- @returns {Observable}
  function Observable:unwrap()
    local source = self
    return Observable.create(function(observer)
      local function onNext(...)
        local values = {...}
        local count = #values
        for index = 1, count do
          observer:onNext(values[index])
        end
      end
      return source:subscribe(
        onNext,
        function(message) return observer:onError(message) end,
        function() return observer:onComplete() end
      )
    end)
  end
  --- Returns an Observable that buffers values from the original and produces them as multiple
  -- values. A buffer is produced each time it reaches `size` values; whatever is left in the
  -- buffer is produced before an error or completion is forwarded.
  -- @arg {number} size - The size of the buffer.
  -- @returns {Observable}
  function Observable:wrap(size)
    return Observable.create(function(observer)
      local buffer = {}
      -- Produces the buffered values as one emission and starts a new buffer.
      local function emit()
        if #buffer > 0 then
          observer:onNext(unpack(buffer))
          buffer = {}
        end
      end
      local function onNext(...)
        local values = {...}
        for i = 1, #values do
          table.insert(buffer, values[i])
          if #buffer >= size then
            -- Keep looping after a flush so the rest of this emission is buffered rather
            -- than dropped.
            emit()
          end
        end
      end
      local function onError(message)
        emit()
        return observer:onError(message)
      end
      local function onComplete()
        emit()
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- @class Scheduler
  -- @description Schedulers manage groups of Observables.
  local Scheduler = {}

  --- @class CooperativeScheduler
  -- @description Manages Observables using coroutines and a virtual clock that must be updated
  -- manually.
  local Cooperative = {}
  Cooperative.__index = Cooperative

  --- Creates a new Cooperative Scheduler.
  -- @arg {number=0} currentTime - A time to start the scheduler at.
  -- @returns {Scheduler.Cooperative}
  function Cooperative.create(currentTime)
    local self = {
      tasks = {},
      currentTime = currentTime or 0
    }
    return setmetatable(self, Cooperative)
  end

  --- Schedules a function to be run after an optional delay.
  -- @arg {function} action - The function to execute. Will be converted into a coroutine. The
  --                          coroutine may yield execution back to the scheduler with an optional
  --                          number, which will put it to sleep for a time period.
  -- @arg {number=0} delay - Delay execution of the action by a time period.
  function Cooperative:schedule(action, delay)
    table.insert(self.tasks, {
      thread = coroutine.create(action),
      due = self.currentTime + (delay or 0)
    })
  end

  --- Triggers an update of the Cooperative Scheduler. The clock will be advanced and the scheduler
  -- will run any coroutines that are due to be run. An error raised inside a coroutine is
  -- re-raised from this function.
  -- @arg {number=0} delta - An amount of time to advance the clock by. It is common to pass in the
  --                         time in seconds or milliseconds elapsed since this function was last
  --                         called.
  function Cooperative:update(delta)
    self.currentTime = self.currentTime + (delta or 0)
    -- Iterate backwards so finished tasks can be removed without skipping entries.
    for i = #self.tasks, 1, -1 do
      local task = self.tasks[i]
      if self.currentTime >= task.due then
        local success, delay = coroutine.resume(task.thread)
        if not success then
          -- On failure the second result of resume is the error value.
          error(delay)
        end
        if coroutine.status(task.thread) == 'dead' then
          -- A finished coroutine's return value is not a delay; just drop the task.
          table.remove(self.tasks, i)
        else
          -- Only a numeric yield value is treated as a sleep period; anything else means
          -- "run again on the next update".
          task.due = math.max(task.due + (tonumber(delay) or 0), self.currentTime)
        end
      end
    end
  end

  --- Returns whether or not the Cooperative Scheduler's queue is empty.
  -- @returns {boolean}
  function Cooperative:isEmpty()
    return not next(self.tasks)
  end

  Scheduler.Cooperative = Cooperative
  --- @class Subject
  -- @description A Subject acts as both an Observer and an Observable. Its metatable is
  -- Observable. Values pushed to the Subject are broadcast to every Observer that has
  -- subscribed to it.
  local Subject = setmetatable({}, Observable)
  Subject.__index = Subject

  --- Builds a new Subject with no subscribers.
  -- @arg {*...} value - The initial values, returned by getValue until something is pushed.
  -- @returns {Subject}
  function Subject.create(...)
    local subject = setmetatable({}, Subject)
    subject.value = {...}
    subject.observers = {}
    return subject
  end

  --- Builds an Observer from the given callbacks and adds it to the Subject's subscriber list.
  -- @arg {function} onNext - Called when the Subject produces a value.
  -- @arg {function} onError - Called when the Subject terminates due to an error.
  -- @arg {function} onComplete - Called when the Subject completes normally.
  function Subject:subscribe(onNext, onError, onComplete)
    local observer = Observer.create(onNext, onError, onComplete)
    table.insert(self.observers, observer)
  end

  --- Pushes zero or more values to the Subject. They are remembered as the current value and
  -- broadcast to all Observers in subscription order.
  -- @arg {*...} values
  function Subject:onNext(...)
    self.value = {...}
    local observers = self.observers
    for index = 1, #observers do
      observers[index]:onNext(...)
    end
  end

  --- Signals an error to all Observers.
  -- @arg {string=} message - A string describing what went wrong.
  function Subject:onError(message)
    local observers = self.observers
    for index = 1, #observers do
      observers[index]:onError(message)
    end
  end

  --- Signals to all Observers that the Subject will not produce any more values.
  function Subject:onComplete()
    local observers = self.observers
    for index = 1, #observers do
      observers[index]:onComplete()
    end
  end

  --- Returns the values most recently pushed to the Subject, or the initial values passed to the
  -- constructor if nothing has been pushed yet.
  -- @returns {*...}
  function Subject:getValue()
    local current = self.value or {}
    return unpack(current)
  end

  -- Calling a Subject like a function pushes values to it.
  Subject.__call = Subject.onNext
  -- Module table. `rx` is declared as a local earlier in the file, so this assignment fills in
  -- the value that previously defined functions refer to.
  rx = {
    Subject = Subject,
    Scheduler = Scheduler,
    -- Default scheduler instance, created once when the module loads.
    scheduler = Scheduler.Cooperative.create(),
    Observable = Observable,
    Observer = Observer
  }
  return rx