-- rx.lua (24 KB)
  -- Forward declaration of the module table. It is assigned at the end of the file, which lets
  -- functions defined before that point (Observable.fromCoroutine reads rx.scheduler) capture it
  -- as an upvalue.
  local rx
  -- table.pack does not exist on Lua 5.1/LuaJIT. The fallback records the argument count in `n`,
  -- just like table.pack does, so packed tables look the same on every Lua version and trailing
  -- nils can still be counted.
  local pack = table.pack or function(...) return { n = select('#', ...), ... } end
  -- table.unpack on Lua 5.2+, the global unpack on Lua 5.1/LuaJIT.
  local unpack = table.unpack or unpack
  -- Default comparator: plain equality.
  local function eq(x, y) return x == y end
  -- Default callback that does nothing.
  local function noop() end
  -- Returns its first argument unchanged.
  local function identity(x) return x end
  --- @class Observer
  -- @description An Observer bundles the three callbacks through which an Observable delivers
  -- values, an error, or completion.
  local Observer = {}
  Observer.__index = Observer

  --- Builds an Observer from up to three callbacks.
  -- @arg {function=} onNext - Invoked with every value the Observable produces. Defaults to a
  --                           function that does nothing.
  -- @arg {function=} onError - Invoked when the Observable terminates with an error. Defaults to
  --                            the global `error`, i.e. the message is raised.
  -- @arg {function=} onComplete - Invoked when the Observable completes normally. Defaults to a
  --                               function that does nothing.
  -- @returns {Observer}
  function Observer.create(onNext, onError, onComplete)
    local observer = setmetatable({}, Observer)
    observer._onNext = onNext or noop
    observer._onError = onError or error
    observer._onComplete = onComplete or noop
    observer.stopped = false
    return observer
  end

  --- Delivers zero or more values to the Observer. Ignored once the Observer has stopped.
  -- @arg {*...} values
  function Observer:onNext(...)
    if self.stopped then return end
    self._onNext(...)
  end

  --- Tells the Observer that an error occurred. The Observer is marked as stopped before the
  -- error callback runs, so later notifications are ignored.
  -- @arg {string=} message - A string describing what went wrong.
  function Observer:onError(message)
    if self.stopped then return end
    self.stopped = true
    self._onError(message)
  end

  --- Tells the Observer that the sequence has finished and no more values will follow. The
  -- Observer is marked as stopped before the completion callback runs.
  function Observer:onComplete()
    if self.stopped then return end
    self.stopped = true
    self._onComplete()
  end
  --- @class Observable
  -- @description An Observable pushes values to the Observers that subscribe to it.
  local Observable = {}
  Observable.__index = Observable

  --- Wraps a subscription function in a new Observable.
  -- @arg {function} subscribe - Stored as `_subscribe` and called with an Observer by
  --                             Observable:subscribe; it is the code that produces values.
  -- @returns {Observable}
  function Observable.create(subscribe)
    local observable = setmetatable({}, Observable)
    observable._subscribe = subscribe
    return observable
  end
  --- Subscribes to this Observable. When the first argument is a table it is used directly as
  -- the Observer; otherwise an Observer is created from the three callbacks.
  -- @arg {function|table} onNext - Called when the Observable produces a value, or an Observer.
  -- @arg {function} onError - Called when the Observable terminates due to an error.
  -- @arg {function} onComplete - Called when the Observable completes normally.
  -- @returns {*} Whatever the subscription function returns.
  function Observable:subscribe(onNext, onError, onComplete)
    local observer = onNext
    if type(observer) ~= 'table' then
      observer = Observer.create(onNext, onError, onComplete)
    end
    return self._subscribe(observer)
  end
  --- Creates an Observable that emits one value to each subscriber and then completes.
  -- @arg {*} value
  -- @returns {Observable}
  function Observable.fromValue(value)
    local function subscribe(observer)
      observer:onNext(value)
      observer:onComplete()
    end
    return Observable.create(subscribe)
  end
  --- Creates an Observable that emits a numeric range, following the rules of a Lua numeric for
  -- loop, and then completes.
  -- @arg {number} initial - The first value of the range, or the upper limit (counting from 1) if
  --                         no other arguments are specified.
  -- @arg {number=} limit - The last value of the range.
  -- @arg {number=1} step - The increment applied on each iteration.
  -- @returns {Observable}
  function Observable.fromRange(initial, limit, step)
    -- Single-argument form: the argument is the upper bound and counting starts at 1.
    if not (limit or step) then
      limit = initial
      initial = 1
    end
    step = step or 1
    local function subscribe(observer)
      for i = initial, limit, step do
        observer:onNext(i)
      end
      observer:onComplete()
    end
    return Observable.create(subscribe)
  end
  --- Creates an Observable that walks a table and emits each entry as (value, key), then
  -- completes.
  -- @arg {table} t - The table used to create the Observable.
  -- @arg {function=pairs} iterator - An iterator factory used to walk the table, e.g. pairs or
  --                                  ipairs.
  -- @returns {Observable}
  function Observable.fromTable(t, iterator)
    local iterate = iterator or pairs
    local function subscribe(observer)
      for key, value in iterate(t) do
        observer:onNext(value, key)
      end
      observer:onComplete()
    end
    return Observable.create(subscribe)
  end
  --- Creates an Observable driven by a coroutine. On subscription a task is scheduled on
  -- rx.scheduler that repeatedly resumes the coroutine and emits the first value each resume
  -- hands back, yielding to the scheduler between resumes.
  -- @arg {thread|function} thread - A coroutine, or a function that is turned into one.
  -- @returns {Observable}
  function Observable.fromCoroutine(thread)
    if type(thread) == 'function' then
      thread = coroutine.create(thread)
    end
    return Observable.create(function(observer)
      local function pump()
        while not observer.stopped do
          local ok, result = coroutine.resume(thread)
          if not ok then
            -- On failure `result` is the error raised inside the coroutine.
            return observer:onError(result)
          end
          observer:onNext(result)
          if coroutine.status(thread) == 'dead' then
            return observer:onComplete()
          end
          -- Hand control back to the scheduler until its next update.
          coroutine.yield()
        end
      end
      return rx.scheduler:schedule(pump)
    end)
  end
  --- Subscribes to this Observable and prints every notification it produces.
  -- Values are converted with tostring, so booleans, tables and nil can be dumped too (passing
  -- them straight to table.concat or `..` raises an error).
  -- @arg {string=} name - Prefixes the printed messages with a name.
  -- @returns {*} Whatever subscribe returns.
  function Observable:dump(name)
    name = name and (name .. ' ') or ''
    -- Joins all arguments, including nils, into a comma separated string.
    local function format(...)
      local parts = {}
      for i = 1, select('#', ...) do
        parts[i] = tostring((select(i, ...)))
      end
      return table.concat(parts, ', ')
    end
    local onNext = function(...) print(name .. 'onNext: ' .. format(...)) end
    local onError = function(e) print(name .. 'onError: ' .. tostring(e)) end
    local onComplete = function() print(name .. 'onComplete') end
    return self:subscribe(onNext, onError, onComplete)
  end
  -- The functions below transform the values produced by an Observable and return a new
  -- Observable that produces the transformed values.
  --- Returns an Observable that only produces values from the original if they are different from
  -- the previous value. The first value is always produced.
  -- @arg {function=} comparator - A function used to compare 2 values; it should return a truthy
  --                               value when they are considered equal. If unspecified, == is
  --                               used.
  -- @returns {Observable}
  function Observable:changes(comparator)
    comparator = comparator or eq
    return Observable.create(function(observer)
      local first = true
      local currentValue = nil
      local function onNext(value)
        if first or not comparator(value, currentValue) then
          observer:onNext(value)
          currentValue = value
          first = false
        end
      end
      local function onError(message)
        -- Forward the error message itself, not this handler function.
        return observer:onError(message)
      end
      local function onComplete()
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that runs a combinator function on the most recent values from a set
  -- of Observables whenever any of them produce a new value. Nothing is produced until every
  -- Observable has produced at least one value. The results of the combinator function are
  -- produced by the new Observable, which completes once all sources have completed.
  -- @arg {Observable...} observables - One or more Observables to combine.
  -- @arg {function} combinator - The last argument. A function that receives the latest value of
  --                              each Observable (this one first) and returns a single value.
  -- @returns {Observable}
  function Observable:combineLatest(...)
    local sources = {...}
    local combinator = table.remove(sources)
    table.insert(sources, 1, self)
    local count = #sources
    return Observable.create(function(observer)
      local latest = {}
      -- One entry per source that has not produced a value yet.
      local pending = {unpack(sources)}
      local completed = {}
      local function onNext(i)
        return function(value)
          latest[i] = value
          pending[i] = nil
          if not next(pending) then
            -- An explicit range keeps every position even when a latest value is nil; a bare
            -- unpack(latest) relies on the length of a table with holes.
            observer:onNext(combinator(unpack(latest, 1, count)))
          end
        end
      end
      local function onError(e)
        return observer:onError(e)
      end
      local function onComplete(i)
        return function()
          table.insert(completed, i)
          if #completed == count then
            observer:onComplete()
          end
        end
      end
      for i = 1, count do
        sources[i]:subscribe(onNext(i), onError, onComplete(i))
      end
    end)
  end
  --- Returns a new Observable that drops falsy values (nil and false) and forwards the rest.
  -- @returns {Observable}
  function Observable:compact()
    local truthyOnly = self:filter(identity)
    return truthyOnly
  end
  --- Returns a new Observable that produces the values of this Observable followed by the values
  -- of the specified Observables, in the order they are given. Each one is subscribed to only
  -- after the previous one completes. With no arguments, this Observable itself is returned.
  -- @arg {Observable...} sources - The Observables to concatenate.
  -- @returns {Observable}
  function Observable:concat(other, ...)
    if not other then return self end
    local rest = {...}
    return Observable.create(function(observer)
      local function forward(...) return observer:onNext(...) end
      local function fail(message) return observer:onError(message) end
      local function finish() return observer:onComplete() end
      -- Runs when this Observable completes: continue with the remaining Observables.
      local function subscribeRemainder()
        local remainder = other:concat(unpack(rest))
        return remainder:subscribe(forward, fail, finish)
      end
      return self:subscribe(forward, fail, subscribeRemainder)
    end)
  end
  --- Returns a new Observable that produces the values from the original with duplicates removed.
  -- Values are tracked as table keys, so duplicates are detected by raw key equality. A nil
  -- value is supported and is produced at most once.
  -- @returns {Observable}
  function Observable:distinct()
    return Observable.create(function(observer)
      local values = {}
      local seenNil = false
      local function onNext(x)
        if x == nil then
          -- nil cannot be used as a table key, so it is tracked with a separate flag.
          if not seenNil then
            observer:onNext(x)
            seenNil = true
          end
        elseif not values[x] then
          observer:onNext(x)
          values[x] = true
        end
      end
      local function onError(e)
        return observer:onError(e)
      end
      local function onComplete()
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that only produces values of the first that satisfy a predicate.
  -- @arg {function=} predicate - Called with each set of values; they are forwarded when it
  --                              returns a truthy result. Defaults to the identity function.
  -- @returns {Observable}
  function Observable:filter(predicate)
    predicate = predicate or identity
    return Observable.create(function(observer)
      local function onNext(...)
        if predicate(...) then
          return observer:onNext(...)
        end
      end
      local function onError(e)
        return observer:onError(e)
      end
      local function onComplete()
        -- Completion carries no argument (the stray reference to an undefined global was removed).
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that produces the first value of the original that satisfies a
  -- predicate and then completes. If no value matches, it completes when the original does.
  -- @arg {function=} predicate - The predicate used to find a value. Defaults to the identity
  --                              function.
  -- @returns {Observable}
  function Observable:find(predicate)
    predicate = predicate or identity
    return Observable.create(function(observer)
      local function onNext(...)
        if predicate(...) then
          observer:onNext(...)
          return observer:onComplete()
        end
      end
      local function onError(message)
        -- Forward the received message; the undefined global used before always passed nil.
        return observer:onError(message)
      end
      local function onComplete()
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that produces only the first result of the original, implemented
  -- as take(1).
  -- @returns {Observable}
  function Observable:first()
    local head = self:take(1)
    return head
  end
  --- Returns a new Observable that subscribes to each Observable produced by the original and
  -- forwards their values. Errors from the original or from any inner Observable are forwarded;
  -- completion of inner Observables is ignored, and the result completes when the original does.
  -- @returns {Observable}
  function Observable:flatten()
    return Observable.create(function(observer)
      local function fail(message)
        return observer:onError(message)
      end
      -- Called with every Observable the original produces.
      local function subscribeInner(observable)
        local function forward(...)
          observer:onNext(...)
        end
        observable:subscribe(forward, fail, noop)
      end
      local function finish()
        return observer:onComplete()
      end
      return self:subscribe(subscribeInner, fail, finish)
    end)
  end
  --- Returns a new Observable that produces only the last result of the original, emitted when
  -- the original completes. If the original produced nothing, a notification with no values is
  -- emitted before completing.
  -- @returns {Observable}
  function Observable:last()
    return Observable.create(function(observer)
      local latest
      local function remember(...)
        latest = {...}
      end
      local function fail(e)
        return observer:onError(e)
      end
      local function finish()
        observer:onNext(unpack(latest or {}))
        return observer:onComplete()
      end
      return self:subscribe(remember, fail, finish)
    end)
  end
  --- Returns a new Observable that produces the values of the original after passing them
  -- through a function.
  -- @arg {function=} callback - The function used to transform values from the original
  --                             Observable. Defaults to the identity function.
  -- @returns {Observable}
  function Observable:map(callback)
    return Observable.create(function(observer)
      callback = callback or identity
      local function transform(...) return observer:onNext(callback(...)) end
      local function fail(e) return observer:onError(e) end
      local function finish() return observer:onComplete() end
      return self:subscribe(transform, fail, finish)
    end)
  end
  --- Returns a new Observable that produces the maximum value produced by the original, computed
  -- by reducing with math.max.
  -- @returns {Observable}
  function Observable:max()
    local largest = self:reduce(math.max)
    return largest
  end
  --- Returns a new Observable that produces the minimum value produced by the original, computed
  -- by reducing with math.min.
  -- @returns {Observable}
  function Observable:min()
    local smallest = self:reduce(math.min)
    return smallest
  end
  --- Returns a new Observable that produces the values produced by this Observable and all the
  -- specified Observables in the order they are produced. It completes once every source has
  -- completed, and forwards the first error.
  -- @arg {Observable...} sources - One or more Observables to merge.
  -- @returns {Observable}
  function Observable:merge(...)
    local sources = {...}
    table.insert(sources, 1, self)
    return Observable.create(function(observer)
      -- Completion is tracked per subscription. Clearing entries of the shared `sources` list
      -- would leave later subscriptions with missing sources.
      local done = {}
      local remaining = #sources
      local function onNext(...)
        return observer:onNext(...)
      end
      local function onError(message)
        return observer:onError(message)
      end
      local function onComplete(i)
        return function()
          if done[i] then return end
          done[i] = true
          remaining = remaining - 1
          if remaining == 0 then
            observer:onComplete()
          end
        end
      end
      for i = 1, #sources do
        sources[i]:subscribe(onNext, onError, onComplete(i))
      end
    end)
  end
  --- Returns an Observable that collects the values of each notification from the original into
  -- a table and produces that table.
  -- @returns {Observable}
  function Observable:pack()
    local packed = self:map(pack)
    return packed
  end
  --- Returns a new Observable that produces the value stored under the given key in each table
  -- produced by the original.
  -- @arg {*} key - The key to extract from the table.
  -- @returns {Observable}
  function Observable:pluck(key)
    return Observable.create(function(observer)
      local function extract(t) return observer:onNext(t[key]) end
      local function fail(e) return observer:onError(e) end
      local function finish() return observer:onComplete() end
      return self:subscribe(extract, fail, finish)
    end)
  end
  --- Returns a new Observable that produces a single value computed by accumulating the results of
  -- running a function on each value produced by the original Observable. The result is produced
  -- when the original completes.
  -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
  --                               the return value of the last call as the first argument and the
  --                               current values as the rest of the arguments.
  -- @arg {*=} seed - A value to pass to the accumulator the first time it is run. When omitted
  --                  (nil), the first value produced becomes the initial result and the
  --                  accumulator is first run on the second value.
  -- @returns {Observable}
  function Observable:reduce(accumulator, seed)
    return Observable.create(function(observer)
      -- Starting from the seed means an empty source produces the seed itself.
      local result = seed
      local first = true
      local function onNext(...)
        if first and seed == nil then
          -- No seed: adopt the first value as-is instead of accumulating it with itself.
          result = (...)
        else
          result = accumulator(result, ...)
        end
        first = false
      end
      local function onError(e)
        return observer:onError(e)
      end
      local function onComplete()
        observer:onNext(result)
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that produces values from the original which do not satisfy a
  -- predicate.
  -- @arg {function=} predicate - Called with each set of values; they are dropped when it
  --                              returns a truthy result. Defaults to the identity function.
  -- @returns {Observable}
  function Observable:reject(predicate)
    predicate = predicate or identity
    return Observable.create(function(observer)
      local function onNext(...)
        if not predicate(...) then
          return observer:onNext(...)
        end
      end
      local function onError(e)
        return observer:onError(e)
      end
      local function onComplete()
        -- Completion carries no argument (the stray reference to an undefined global was removed).
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- Returns a new Observable that ignores the first n notifications of the original and
  -- produces the rest.
  -- @arg {number=1} n - The number of values to ignore.
  -- @returns {Observable}
  function Observable:skip(n)
    n = n or 1
    return Observable.create(function(observer)
      -- 1-based position of the next notification while still skipping.
      local position = 1
      local function forwardAfterSkip(...)
        if position > n then
          observer:onNext(...)
          return
        end
        position = position + 1
      end
      local function fail(e) return observer:onError(e) end
      local function finish() return observer:onComplete() end
      return self:subscribe(forwardAfterSkip, fail, finish)
    end)
  end
  --- Returns a new Observable that does not subscribe to the original until the specified
  -- Observable fires; from then on the original's values, error and completion are forwarded.
  -- Any notification from `other` (value, error or completion) counts as the trigger, and only
  -- the first one has an effect.
  -- @arg {Observable} other - The Observable that triggers the production of values.
  -- @returns {Observable}
  function Observable:skipUntil(other)
    return Observable.create(function(observer)
      local triggered = false
      local function trigger()
        -- `other` may fire many times; subscribe to the original only once so values are not
        -- delivered in duplicate.
        if triggered then return end
        triggered = true
        local function onNext(...)
          return observer:onNext(...)
        end
        local function onError(message)
          -- Errors must be reported through onError, not delivered as ordinary values.
          return observer:onError(message)
        end
        local function onComplete()
          return observer:onComplete()
        end
        return self:subscribe(onNext, onError, onComplete)
      end
      other:subscribe(trigger, trigger, trigger)
    end)
  end
  --- Returns a new Observable that produces the sum of the values of the original Observable as
  -- a single result, computed by reducing with addition from a seed of 0.
  -- @returns {Observable}
  function Observable:sum()
    local function add(x, y)
      return x + y
    end
    return self:reduce(add, 0)
  end
  --- Returns a new Observable that produces only the first n results of the original and then
  -- completes. With n <= 0 it completes immediately without subscribing to the original.
  -- @arg {number=1} n - The number of elements to produce before completing.
  -- @returns {Observable}
  function Observable:take(n)
    n = n or 1
    return Observable.create(function(observer)
      if n <= 0 then
        observer:onComplete()
        return
      end
      -- 1-based index of the next value to be forwarded.
      local nextIndex = 1
      local function forward(...)
        observer:onNext(...)
        nextIndex = nextIndex + 1
        if nextIndex > n then
          observer:onComplete()
        end
      end
      local function fail(e) return observer:onError(e) end
      local function finish() return observer:onComplete() end
      return self:subscribe(forward, fail, finish)
    end)
  end
  --- Returns a new Observable that forwards the original until the specified Observable fires,
  -- at which point it completes. Any notification from `other` (value, error or completion)
  -- triggers completion.
  -- @arg {Observable} other - The Observable that triggers completion of the original.
  -- @returns {Observable}
  function Observable:takeUntil(other)
    return Observable.create(function(observer)
      local function forward(...) return observer:onNext(...) end
      local function fail(e) return observer:onError(e) end
      local function finish() return observer:onComplete() end
      -- `other` is subscribed first, so it can complete the result before the original runs.
      other:subscribe(finish, finish, finish)
      return self:subscribe(forward, fail, finish)
    end)
  end
  --- Returns an Observable that unpacks each table produced by the original and produces its
  -- elements as multiple values.
  -- @returns {Observable}
  function Observable:unpack()
    local unpacked = self:map(unpack)
    return unpacked
  end
  --- Returns an Observable that splits notifications carrying multiple values: each value of a
  -- notification from the original is produced individually, in order.
  -- @returns {Observable}
  function Observable:unwrap()
    return Observable.create(function(observer)
      local function spread(...)
        local items = {...}
        for index = 1, #items do
          observer:onNext(items[index])
        end
      end
      local function fail(message) return observer:onError(message) end
      local function finish() return observer:onComplete() end
      return self:subscribe(spread, fail, finish)
    end)
  end
  --- Returns an Observable that buffers values from the original and produces them as multiple
  -- values, `size` at a time. A partially filled buffer is flushed before an error or completion
  -- is forwarded.
  -- @arg {number} size - The size of the buffer.
  -- @returns {Observable}
  function Observable:wrap(size)
    return Observable.create(function(observer)
      local buffer = {}
      -- Produces the buffered values (if any) as one notification and starts a new buffer.
      local function emit()
        if #buffer > 0 then
          observer:onNext(unpack(buffer))
          buffer = {}
        end
      end
      local function onNext(...)
        local values = {...}
        for i = 1, #values do
          table.insert(buffer, values[i])
          if #buffer >= size then
            -- Keep looping after a flush: returning here would drop the remaining values of
            -- this notification.
            emit()
          end
        end
      end
      local function onError(message)
        emit()
        return observer:onError(message)
      end
      local function onComplete()
        emit()
        return observer:onComplete()
      end
      return self:subscribe(onNext, onError, onComplete)
    end)
  end
  --- @class Scheduler
  -- @description Schedulers manage groups of Observables.
  local Scheduler = {}

  --- @class CooperativeScheduler
  -- @description Manages Observables using coroutines and a virtual clock that must be updated
  -- manually.
  local Cooperative = {}
  Cooperative.__index = Cooperative

  --- Creates a new Cooperative Scheduler.
  -- @arg {number=0} currentTime - A time to start the scheduler at.
  -- @returns {Scheduler.Cooperative}
  function Cooperative.create(currentTime)
    local self = {
      tasks = {},
      currentTime = currentTime or 0
    }
    return setmetatable(self, Cooperative)
  end

  --- Schedules a function to be run after an optional delay.
  -- @arg {function} action - The function to execute. Will be converted into a coroutine. The
  --                          coroutine may yield execution back to the scheduler with an optional
  --                          number, which will put it to sleep for a time period.
  -- @arg {number=0} delay - Delay execution of the action by a time period.
  function Cooperative:schedule(action, delay)
    table.insert(self.tasks, {
      thread = coroutine.create(action),
      due = self.currentTime + (delay or 0)
    })
  end

  --- Triggers an update of the Cooperative Scheduler. The clock will be advanced and the scheduler
  -- will run any coroutines that are due to be run. If a coroutine raises an error, its task is
  -- removed from the queue and the error is re-raised to the caller.
  -- @arg {number=0} delta - An amount of time to advance the clock by. It is common to pass in the
  --                         time in seconds or milliseconds elapsed since this function was last
  --                         called.
  function Cooperative:update(delta)
    self.currentTime = self.currentTime + (delta or 0)
    -- Iterate backwards so finished tasks can be removed without skipping entries.
    for i = #self.tasks, 1, -1 do
      local task = self.tasks[i]
      if self.currentTime >= task.due then
        local success, delay = coroutine.resume(task.thread)
        if not success then
          -- Drop the failed task first; a dead coroutine left in the queue would make every
          -- later update fail when it is resumed again.
          table.remove(self.tasks, i)
          error(delay)
        end
        if coroutine.status(task.thread) == 'dead' then
          -- A finished coroutine's return value is not a sleep time, so it is not used as one.
          table.remove(self.tasks, i)
        else
          -- The yielded number (if any) is added to the due time, but never into the past.
          task.due = math.max(task.due + (delay or 0), self.currentTime)
        end
      end
    end
  end

  --- Returns whether or not the Cooperative Scheduler's queue is empty.
  -- @returns {boolean}
  function Cooperative:isEmpty()
    return not next(self.tasks)
  end

  Scheduler.Cooperative = Cooperative
  --- @class Subject
  -- @description Subjects function both as an Observer and as an Observable. Subjects inherit all
  -- Observable functions, including subscribe. Values can also be pushed to the Subject, which will
  -- be broadcasted to any subscribed Observers.
  local Subject = setmetatable({}, Observable)
  Subject.__index = Subject

  --- Creates a new Subject.
  -- @arg {*...} value - The initial values.
  -- @returns {Subject}
  function Subject.create(...)
    local self = {
      value = {...},
      observers = {}
    }
    return setmetatable(self, Subject)
  end

  --- Attaches an Observer to the Subject. Either pass an Observer object (a table with an onNext
  -- method, as Observable:subscribe also accepts) or up to three callbacks from which an Observer
  -- is created.
  -- @arg {function|table} onNext - Called when the Subject produces a value, or an Observer.
  -- @arg {function} onError - Called when the Subject terminates due to an error.
  -- @arg {function} onComplete - Called when the Subject completes normally.
  function Subject:subscribe(onNext, onError, onComplete)
    local observer
    if type(onNext) == 'table' and type(onNext.onNext) == 'function' then
      -- Already an Observer: attach it directly so its onError/onComplete are used as well.
      observer = onNext
    else
      observer = Observer.create(onNext, onError, onComplete)
    end
    table.insert(self.observers, observer)
  end

  --- Pushes zero or more values to the Subject. It will be broadcasted to all Observers. The
  -- values are also remembered for getValue.
  -- @arg {*...} values
  function Subject:onNext(...)
    self.value = {...}
    for i = 1, #self.observers do
      self.observers[i]:onNext(...)
    end
  end

  --- Signal to all Observers that an error has occurred.
  -- @arg {string=} message - A string describing what went wrong.
  function Subject:onError(message)
    for i = 1, #self.observers do
      self.observers[i]:onError(message)
    end
  end

  --- Signal to all Observers that the Subject will not produce any more values.
  function Subject:onComplete()
    for i = 1, #self.observers do
      self.observers[i]:onComplete()
    end
  end

  --- Returns the last value emitted by the Subject, or the initial value passed to the constructor
  -- if nothing has been emitted yet.
  -- @returns {*...}
  function Subject:getValue()
    return unpack(self.value or {})
  end

  -- Calling a Subject like a function pushes values to it.
  Subject.__call = Subject.onNext
  -- Public module table. `rx` is assigned rather than declared here; it is expected to be the
  -- local declared at the top of the file, so earlier functions that reference rx.scheduler see
  -- this table.
  rx = {}
  rx.Observer = Observer
  rx.Observable = Observable
  rx.Scheduler = Scheduler
  -- Default scheduler instance, created with its clock at the default start time.
  rx.scheduler = Scheduler.Cooperative.create()
  rx.Subject = Subject
  return rx