rx.lua 61 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203
  1. -- RxLua v0.0.2
  2. -- https://github.com/bjornbytes/rxlua
  3. -- MIT License
  4. local util = {}
  5. util.pack = table.pack or function(...) return { n = select('#', ...), ... } end
  6. util.unpack = table.unpack or unpack
  7. util.eq = function(x, y) return x == y end
  8. util.noop = function() end
  9. util.identity = function(x) return x end
  10. util.constant = function(x) return function() return x end end
  11. util.isa = function(object, class)
  12. return type(object) == 'table' and getmetatable(object).__index == class
  13. end
  14. util.tryWithObserver = function(observer, fn, ...)
  15. local success, result = pcall(fn, ...)
  16. if not success then
  17. observer:onError(result)
  18. end
  19. return success, result
  20. end
  21. --- @class Subscription
  22. -- @description A handle representing the link between an Observer and an Observable, as well as any
  23. -- work required to clean up after the Observable completes or the Observer unsubscribes.
  24. local Subscription = {}
  25. Subscription.__index = Subscription
  26. Subscription.__tostring = util.constant('Subscription')
  27. --- Creates a new Subscription.
  28. -- @arg {function=} action - The action to run when the subscription is unsubscribed. It will only
  29. -- be run once.
  30. -- @returns {Subscription}
  31. function Subscription.create(action)
  32. local self = {
  33. action = action or util.noop,
  34. unsubscribed = false
  35. }
  36. return setmetatable(self, Subscription)
  37. end
  38. --- Unsubscribes the subscription, performing any necessary cleanup work.
  39. function Subscription:unsubscribe()
  40. if self.unsubscribed then return end
  41. self.action(self)
  42. self.unsubscribed = true
  43. end
  44. --- @class Observer
  45. -- @description Observers are simple objects that receive values from Observables.
  46. local Observer = {}
  47. Observer.__index = Observer
  48. Observer.__tostring = util.constant('Observer')
  49. --- Creates a new Observer.
  50. -- @arg {function=} onNext - Called when the Observable produces a value.
  51. -- @arg {function=} onError - Called when the Observable terminates due to an error.
  52. -- @arg {function=} onCompleted - Called when the Observable completes normally.
  53. -- @returns {Observer}
  54. function Observer.create(onNext, onError, onCompleted)
  55. local self = {
  56. _onNext = onNext or util.noop,
  57. _onError = onError or error,
  58. _onCompleted = onCompleted or util.noop,
  59. stopped = false
  60. }
  61. return setmetatable(self, Observer)
  62. end
  63. --- Pushes zero or more values to the Observer.
  64. -- @arg {*...} values
  65. function Observer:onNext(...)
  66. if not self.stopped then
  67. return self._onNext(...)
  68. end
  69. end
  70. --- Notify the Observer that an error has occurred.
  71. -- @arg {string=} message - A string describing what went wrong.
  72. function Observer:onError(message)
  73. --if not self.stopped then
  74. self.stopped = true
  75. self._onError(message)
  76. --end
  77. end
  78. --- Notify the Observer that the sequence has completed and will produce no more values.
  79. function Observer:onCompleted()
  80. if not self.stopped then
  81. self.stopped = true
  82. self._onCompleted()
  83. end
  84. end
  85. --- @class Observable
  86. -- @description Observables push values to Observers.
  87. local Observable = {}
  88. Observable.__index = Observable
  89. Observable.__tostring = util.constant('Observable')
  90. --- Creates a new Observable.
  91. -- @arg {function} subscribe - The subscription function that produces values.
  92. -- @returns {Observable}
  93. function Observable.create(subscribe)
  94. local self = {
  95. _subscribe = subscribe
  96. }
  97. return setmetatable(self, Observable)
  98. end
  99. --- Shorthand for creating an Observer and passing it to this Observable's subscription function.
  100. -- @arg {function} onNext - Called when the Observable produces a value.
  101. -- @arg {function} onError - Called when the Observable terminates due to an error.
  102. -- @arg {function} onCompleted - Called when the Observable completes normally.
  103. function Observable:subscribe(onNext, onError, onCompleted)
  104. if type(onNext) == 'table' then
  105. return self._subscribe(onNext)
  106. else
  107. return self._subscribe(Observer.create(onNext, onError, onCompleted))
  108. end
  109. end
  110. --- Returns an Observable that immediately completes without producing a value.
  111. function Observable.empty()
  112. return Observable.create(function(observer)
  113. observer:onCompleted()
  114. end)
  115. end
  116. --- Returns an Observable that never produces values and never completes.
  117. function Observable.never()
  118. return Observable.create(function(observer) end)
  119. end
  120. --- Returns an Observable that immediately produces an error.
  121. function Observable.throw(message)
  122. return Observable.create(function(observer)
  123. observer:onError(message)
  124. end)
  125. end
  126. --- Creates an Observable that produces a single value.
  127. -- @arg {*} value
  128. -- @returns {Observable}
  129. function Observable.fromValue(value)
  130. return Observable.create(function(observer)
  131. observer:onNext(value)
  132. observer:onCompleted()
  133. end)
  134. end
  135. --- Creates an Observable that produces a range of values in a manner similar to a Lua for loop.
  136. -- @arg {number} initial - The first value of the range, or the upper limit if no other arguments
  137. -- are specified.
  138. -- @arg {number=} limit - The second value of the range.
  139. -- @arg {number=1} step - An amount to increment the value by each iteration.
  140. -- @returns {Observable}
  141. function Observable.fromRange(initial, limit, step)
  142. if not limit and not step then
  143. initial, limit = 1, initial
  144. end
  145. step = step or 1
  146. return Observable.create(function(observer)
  147. for i = initial, limit, step do
  148. observer:onNext(i)
  149. end
  150. observer:onCompleted()
  151. end)
  152. end
  153. --- Creates an Observable that produces values from a table.
  154. -- @arg {table} table - The table used to create the Observable.
  155. -- @arg {function=pairs} iterator - An iterator used to iterate the table, e.g. pairs or ipairs.
  156. -- @arg {boolean} keys - Whether or not to also emit the keys of the table.
  157. -- @returns {Observable}
  158. function Observable.fromTable(t, iterator, keys)
  159. iterator = iterator or pairs
  160. return Observable.create(function(observer)
  161. for key, value in iterator(t) do
  162. observer:onNext(value, keys and key or nil)
  163. end
  164. observer:onCompleted()
  165. end)
  166. end
  167. --- Creates an Observable that produces values when the specified coroutine yields.
  168. -- @arg {thread} coroutine
  169. -- @returns {Observable}
  170. function Observable.fromCoroutine(thread, scheduler)
  171. thread = type(thread) == 'function' and coroutine.create(thread) or thread
  172. return Observable.create(function(observer)
  173. return scheduler:schedule(function()
  174. while not observer.stopped do
  175. local success, value = coroutine.resume(thread)
  176. if success then
  177. observer:onNext(value)
  178. else
  179. return observer:onError(value)
  180. end
  181. if coroutine.status(thread) == 'dead' then
  182. return observer:onCompleted()
  183. end
  184. coroutine.yield()
  185. end
  186. end)
  187. end)
  188. end
  189. --- Creates an Observable that creates a new Observable for each observer using a factory function.
  190. -- @arg {function} factory - A function that returns an Observable.
  191. -- @returns {Observable}
  192. function Observable.defer(fn)
  193. return setmetatable({
  194. subscribe = function(_, ...)
  195. local observable = fn()
  196. return observable:subscribe(...)
  197. end
  198. }, Observable)
  199. end
  200. --- Returns an Observable that repeats a value a specified number of times.
  201. -- @arg {*} value - The value to repeat.
  202. -- @arg {number=} count - The number of times to repeat the value. If left unspecified, the value
  203. -- is repeated an infinite number of times.
  204. -- @returns {Observable}
  205. function Observable.replicate(value, count)
  206. return Observable.create(function(observer)
  207. while count == nil or count > 0 do
  208. observer:onNext(value)
  209. if count then
  210. count = count - 1
  211. end
  212. end
  213. observer:onCompleted()
  214. end)
  215. end
  216. --- Subscribes to this Observable and prints values it produces.
  217. -- @arg {string=} name - Prefixes the printed messages with a name.
  218. -- @arg {function=tostring} formatter - A function that formats one or more values to be printed.
  219. function Observable:dump(name, formatter)
  220. name = name and (name .. ' ') or ''
  221. formatter = formatter or tostring
  222. local onNext = function(...) print(name .. 'onNext: ' .. formatter(...)) end
  223. local onError = function(e) print(name .. 'onError: ' .. e) end
  224. local onCompleted = function() print(name .. 'onCompleted') end
  225. return self:subscribe(onNext, onError, onCompleted)
  226. end
  227. --- Determine whether all items emitted by an Observable meet some criteria.
  228. -- @arg {function=identity} predicate - The predicate used to evaluate objects.
  229. function Observable:all(predicate)
  230. predicate = predicate or util.identity
  231. return Observable.create(function(observer)
  232. local function onNext(...)
  233. util.tryWithObserver(observer, function(...)
  234. if not predicate(...) then
  235. observer:onNext(false)
  236. observer:onCompleted()
  237. end
  238. end, ...)
  239. end
  240. local function onError(e)
  241. return observer:onError(e)
  242. end
  243. local function onCompleted()
  244. observer:onNext(true)
  245. return observer:onCompleted()
  246. end
  247. return self:subscribe(onNext, onError, onCompleted)
  248. end)
  249. end
  250. --- Given a set of Observables, produces values from only the first one to produce a value.
  251. -- @arg {Observable...} observables
  252. -- @returns {Observable}
  253. function Observable.amb(a, b, ...)
  254. if not a or not b then return a end
  255. return Observable.create(function(observer)
  256. local subscriptionA, subscriptionB
  257. local function onNextA(...)
  258. if subscriptionB then subscriptionB:unsubscribe() end
  259. observer:onNext(...)
  260. end
  261. local function onErrorA(e)
  262. if subscriptionB then subscriptionB:unsubscribe() end
  263. observer:onError(e)
  264. end
  265. local function onCompletedA()
  266. if subscriptionB then subscriptionB:unsubscribe() end
  267. observer:onCompleted()
  268. end
  269. local function onNextB(...)
  270. if subscriptionA then subscriptionA:unsubscribe() end
  271. observer:onNext(...)
  272. end
  273. local function onErrorB(e)
  274. if subscriptionA then subscriptionA:unsubscribe() end
  275. observer:onError(e)
  276. end
  277. local function onCompletedB()
  278. if subscriptionA then subscriptionA:unsubscribe() end
  279. observer:onCompleted()
  280. end
  281. subscriptionA = a:subscribe(onNextA, onErrorA, onCompletedA)
  282. subscriptionB = b:subscribe(onNextB, onErrorB, onCompletedB)
  283. return Subscription.create(function()
  284. subscriptionA:unsubscribe()
  285. subscriptionB:unsubscribe()
  286. end)
  287. end):amb(...)
  288. end
  289. --- Returns an Observable that produces the average of all values produced by the original.
  290. -- @returns {Observable}
  291. function Observable:average()
  292. return Observable.create(function(observer)
  293. local sum, count = 0, 0
  294. local function onNext(value)
  295. sum = sum + value
  296. count = count + 1
  297. end
  298. local function onError(e)
  299. observer:onError(e)
  300. end
  301. local function onCompleted()
  302. if count > 0 then
  303. observer:onNext(sum / count)
  304. end
  305. observer:onCompleted()
  306. end
  307. return self:subscribe(onNext, onError, onCompleted)
  308. end)
  309. end
  310. --- Returns an Observable that buffers values from the original and produces them as multiple
  311. -- values.
  312. -- @arg {number} size - The size of the buffer.
  313. function Observable:buffer(size)
  314. return Observable.create(function(observer)
  315. local buffer = {}
  316. local function emit()
  317. if #buffer > 0 then
  318. observer:onNext(util.unpack(buffer))
  319. buffer = {}
  320. end
  321. end
  322. local function onNext(...)
  323. local values = {...}
  324. for i = 1, #values do
  325. table.insert(buffer, values[i])
  326. if #buffer >= size then
  327. emit()
  328. end
  329. end
  330. end
  331. local function onError(message)
  332. emit()
  333. return observer:onError(message)
  334. end
  335. local function onCompleted()
  336. emit()
  337. return observer:onCompleted()
  338. end
  339. return self:subscribe(onNext, onError, onCompleted)
  340. end)
  341. end
  342. --- Returns an Observable that intercepts any errors from the previous and replace them with values
  343. -- produced by a new Observable.
  344. -- @arg {function|Observable} handler - An Observable or a function that returns an Observable to
  345. -- replace the source Observable in the event of an error.
  346. -- @returns {Observable}
  347. function Observable:catch(handler)
  348. handler = handler and (type(handler) == 'function' and handler or util.constant(handler))
  349. return Observable.create(function(observer)
  350. local subscription
  351. local function onNext(...)
  352. return observer:onNext(...)
  353. end
  354. local function onError(e)
  355. if not handler then
  356. return observer:onCompleted()
  357. end
  358. local success, continue = pcall(handler, e)
  359. if success and continue then
  360. if subscription then subscription:unsubscribe() end
  361. continue:subscribe(observer)
  362. else
  363. observer:onError(success and e or continue)
  364. end
  365. end
  366. local function onCompleted()
  367. observer:onCompleted()
  368. end
  369. subscription = self:subscribe(onNext, onError, onCompleted)
  370. return subscription
  371. end)
  372. end
  373. --- Returns a new Observable that runs a combinator function on the most recent values from a set
  374. -- of Observables whenever any of them produce a new value. The results of the combinator function
  375. -- are produced by the new Observable.
  376. -- @arg {Observable...} observables - One or more Observables to combine.
  377. -- @arg {function} combinator - A function that combines the latest result from each Observable and
  378. -- returns a single value.
  379. -- @returns {Observable}
  380. function Observable:combineLatest(...)
  381. local sources = {...}
  382. local combinator = table.remove(sources)
  383. if type(combinator) ~= 'function' then
  384. table.insert(sources, combinator)
  385. combinator = function(...) return ... end
  386. end
  387. table.insert(sources, 1, self)
  388. return Observable.create(function(observer)
  389. local latest = {}
  390. local pending = {util.unpack(sources)}
  391. local completed = {}
  392. local function onNext(i)
  393. return function(value)
  394. latest[i] = value
  395. pending[i] = nil
  396. if not next(pending) then
  397. util.tryWithObserver(observer, function()
  398. observer:onNext(combinator(util.unpack(latest)))
  399. end)
  400. end
  401. end
  402. end
  403. local function onError(e)
  404. return observer:onError(e)
  405. end
  406. local function onCompleted(i)
  407. return function()
  408. table.insert(completed, i)
  409. if #completed == #sources then
  410. observer:onCompleted()
  411. end
  412. end
  413. end
  414. for i = 1, #sources do
  415. sources[i]:subscribe(onNext(i), onError, onCompleted(i))
  416. end
  417. end)
  418. end
  419. --- Returns a new Observable that produces the values of the first with falsy values removed.
  420. -- @returns {Observable}
  421. function Observable:compact()
  422. return self:filter(util.identity)
  423. end
  424. --- Returns a new Observable that produces the values produced by all the specified Observables in
  425. -- the order they are specified.
  426. -- @arg {Observable...} sources - The Observables to concatenate.
  427. -- @returns {Observable}
  428. function Observable:concat(other, ...)
  429. if not other then return self end
  430. local others = {...}
  431. return Observable.create(function(observer)
  432. local function onNext(...)
  433. return observer:onNext(...)
  434. end
  435. local function onError(message)
  436. return observer:onError(message)
  437. end
  438. local function onCompleted()
  439. return observer:onCompleted()
  440. end
  441. local function chain()
  442. return other:concat(util.unpack(others)):subscribe(onNext, onError, onCompleted)
  443. end
  444. return self:subscribe(onNext, onError, chain)
  445. end)
  446. end
  447. --- Returns a new Observable that produces a single boolean value representing whether or not the
  448. -- specified value was produced by the original.
  449. -- @arg {*} value - The value to search for. == is used for equality testing.
  450. -- @returns {Observable}
  451. function Observable:contains(value)
  452. return Observable.create(function(observer)
  453. local subscription
  454. local function onNext(...)
  455. local args = util.pack(...)
  456. if #args == 0 and value == nil then
  457. observer:onNext(true)
  458. if subscription then subscription:unsubscribe() end
  459. return observer:onCompleted()
  460. end
  461. for i = 1, #args do
  462. if args[i] == value then
  463. observer:onNext(true)
  464. if subscription then subscription:unsubscribe() end
  465. return observer:onCompleted()
  466. end
  467. end
  468. end
  469. local function onError(e)
  470. return observer:onError(e)
  471. end
  472. local function onCompleted()
  473. observer:onNext(false)
  474. return observer:onCompleted()
  475. end
  476. subscription = self:subscribe(onNext, onError, onCompleted)
  477. return subscription
  478. end)
  479. end
  480. --- Returns an Observable that produces a single value representing the number of values produced
  481. -- by the source value that satisfy an optional predicate.
  482. -- @arg {function=} predicate - The predicate used to match values.
  483. function Observable:count(predicate)
  484. predicate = predicate or util.constant(true)
  485. return Observable.create(function(observer)
  486. local count = 0
  487. local function onNext(...)
  488. util.tryWithObserver(observer, function(...)
  489. if predicate(...) then
  490. count = count + 1
  491. end
  492. end, ...)
  493. end
  494. local function onError(e)
  495. return observer:onError(e)
  496. end
  497. local function onCompleted()
  498. observer:onNext(count)
  499. observer:onCompleted()
  500. end
  501. return self:subscribe(onNext, onError, onCompleted)
  502. end)
  503. end
  504. function Observable:debounce(time, scheduler)
  505. time = time or 0
  506. return Observable.create(function(observer)
  507. local debounced = {}
  508. local function wrap(key)
  509. return function(...)
  510. local value = util.pack(...)
  511. if debounced[key] then
  512. debounced[key]:unsubscribe()
  513. end
  514. local values = util.pack(...)
  515. debounced[key] = scheduler:schedule(function()
  516. return observer[key](observer, util.unpack(values))
  517. end, time)
  518. end
  519. end
  520. local subscription = self:subscribe(wrap('onNext'), wrap('onError'), wrap('onCompleted'))
  521. return Subscription.create(function()
  522. if subscription then subscription:unsubscribe() end
  523. for _, timeout in pairs(debounced) do
  524. timeout:unsubscribe()
  525. end
  526. end)
  527. end)
  528. end
  529. --- Returns a new Observable that produces a default set of items if the source Observable produces
  530. -- no values.
  531. -- @arg {*...} values - Zero or more values to produce if the source completes without emitting
  532. -- anything.
  533. -- @returns {Observable}
  534. function Observable:defaultIfEmpty(...)
  535. local defaults = util.pack(...)
  536. return Observable.create(function(observer)
  537. local hasValue = false
  538. local function onNext(...)
  539. hasValue = true
  540. observer:onNext(...)
  541. end
  542. local function onError(e)
  543. observer:onError(e)
  544. end
  545. local function onCompleted()
  546. if not hasValue then
  547. observer:onNext(util.unpack(defaults))
  548. end
  549. observer:onCompleted()
  550. end
  551. return self:subscribe(onNext, onError, onCompleted)
  552. end)
  553. end
  554. --- Returns a new Observable that produces the values of the original delayed by a time period.
  555. -- @arg {number|function} time - An amount in milliseconds to delay by, or a function which returns
  556. -- this value.
  557. -- @arg {Scheduler} scheduler - The scheduler to run the Observable on.
  558. -- @returns {Observable}
  559. function Observable:delay(time, scheduler)
  560. time = type(time) ~= 'function' and util.constant(time) or time
  561. return Observable.create(function(observer)
  562. local actions = {}
  563. local function delay(key)
  564. return function(...)
  565. local arg = util.pack(...)
  566. local handle = scheduler:schedule(function()
  567. observer[key](observer, util.unpack(arg))
  568. end, time())
  569. table.insert(actions, handle)
  570. end
  571. end
  572. local subscription = self:subscribe(delay('onNext'), delay('onError'), delay('onCompleted'))
  573. return Subscription.create(function()
  574. if subscription then subscription:unsubscribe() end
  575. for i = 1, #actions do
  576. actions[i]:unsubscribe()
  577. end
  578. end)
  579. end)
  580. end
  581. --- Returns a new Observable that produces the values from the original with duplicates removed.
  582. -- @returns {Observable}
  583. function Observable:distinct()
  584. return Observable.create(function(observer)
  585. local values = {}
  586. local function onNext(x)
  587. if not values[x] then
  588. observer:onNext(x)
  589. end
  590. values[x] = true
  591. end
  592. local function onError(e)
  593. return observer:onError(e)
  594. end
  595. local function onCompleted()
  596. return observer:onCompleted()
  597. end
  598. return self:subscribe(onNext, onError, onCompleted)
  599. end)
  600. end
  601. --- Returns an Observable that only produces values from the original if they are different from
  602. -- the previous value.
  603. -- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
  604. -- @returns {Observable}
  605. function Observable:distinctUntilChanged(comparator)
  606. comparator = comparator or util.eq
  607. return Observable.create(function(observer)
  608. local first = true
  609. local currentValue = nil
  610. local function onNext(value, ...)
  611. local values = util.pack(...)
  612. util.tryWithObserver(observer, function()
  613. if first or not comparator(value, currentValue) then
  614. observer:onNext(value, util.unpack(values))
  615. currentValue = value
  616. first = false
  617. end
  618. end)
  619. end
  620. local function onError(message)
  621. return observer:onError(message)
  622. end
  623. local function onCompleted()
  624. return observer:onCompleted()
  625. end
  626. return self:subscribe(onNext, onError, onCompleted)
  627. end)
  628. end
  629. --- Returns an Observable that produces the nth element produced by the source Observable.
  630. -- @arg {number} index - The index of the item, with an index of 1 representing the first.
  631. -- @returns {Observable}
  632. function Observable:elementAt(index)
  633. return Observable.create(function(observer)
  634. local subscription
  635. local i = 1
  636. local function onNext(...)
  637. if i == index then
  638. observer:onNext(...)
  639. observer:onCompleted()
  640. if subscription then
  641. subscription:unsubscribe()
  642. end
  643. else
  644. i = i + 1
  645. end
  646. end
  647. local function onError(e)
  648. return observer:onError(e)
  649. end
  650. local function onCompleted()
  651. return observer:onCompleted()
  652. end
  653. subscription = self:subscribe(onNext, onError, onCompleted)
  654. return subscription
  655. end)
  656. end
  657. --- Returns a new Observable that only produces values of the first that satisfy a predicate.
  658. -- @arg {function} predicate - The predicate used to filter values.
  659. -- @returns {Observable}
  660. function Observable:filter(predicate)
  661. predicate = predicate or util.identity
  662. return Observable.create(function(observer)
  663. local function onNext(...)
  664. util.tryWithObserver(observer, function(...)
  665. if predicate(...) then
  666. return observer:onNext(...)
  667. end
  668. end, ...)
  669. end
  670. local function onError(e)
  671. return observer:onError(e)
  672. end
  673. local function onCompleted()
  674. return observer:onCompleted()
  675. end
  676. return self:subscribe(onNext, onError, onCompleted)
  677. end)
  678. end
  679. --- Returns a new Observable that produces the first value of the original that satisfies a
  680. -- predicate.
  681. -- @arg {function} predicate - The predicate used to find a value.
  682. function Observable:find(predicate)
  683. predicate = predicate or util.identity
  684. return Observable.create(function(observer)
  685. local function onNext(...)
  686. util.tryWithObserver(observer, function(...)
  687. if predicate(...) then
  688. observer:onNext(...)
  689. return observer:onCompleted()
  690. end
  691. end, ...)
  692. end
  693. local function onError(message)
  694. return observer:onError(message)
  695. end
  696. local function onCompleted()
  697. return observer:onCompleted()
  698. end
  699. return self:subscribe(onNext, onError, onCompleted)
  700. end)
  701. end
  702. --- Returns a new Observable that only produces the first result of the original.
  703. -- @returns {Observable}
  704. function Observable:first()
  705. return self:take(1)
  706. end
  707. --- Returns a new Observable that transform the items emitted by an Observable into Observables,
  708. -- then flatten the emissions from those into a single Observable
  709. -- @arg {function} callback - The function to transform values from the original Observable.
  710. -- @returns {Observable}
  711. function Observable:flatMap(callback)
  712. callback = callback or util.identity
  713. return self:map(callback):flatten()
  714. end
  715. --- Returns a new Observable that uses a callback to create Observables from the values produced by
  716. -- the source, then produces values from the most recent of these Observables.
  717. -- @arg {function=identity} callback - The function used to convert values to Observables.
  718. -- @returns {Observable}
  719. function Observable:flatMapLatest(callback)
  720. callback = callback or util.identity
  721. return Observable.create(function(observer)
  722. local innerSubscription
  723. local function onNext(...)
  724. observer:onNext(...)
  725. end
  726. local function onError(e)
  727. return observer:onError(e)
  728. end
  729. local function onCompleted()
  730. return observer:onCompleted()
  731. end
  732. local function subscribeInner(...)
  733. if innerSubscription then
  734. innerSubscription:unsubscribe()
  735. end
  736. return util.tryWithObserver(observer, function(...)
  737. innerSubscription = callback(...):subscribe(onNext, onError)
  738. end, ...)
  739. end
  740. local subscription = self:subscribe(subscribeInner, onError, onCompleted)
  741. return Subscription.create(function()
  742. if innerSubscription then
  743. innerSubscription:unsubscribe()
  744. end
  745. if subscription then
  746. subscription:unsubscribe()
  747. end
  748. end)
  749. end)
  750. end
  751. --- Returns a new Observable that subscribes to the Observables produced by the original and
  752. -- produces their values.
  753. -- @returns {Observable}
  754. function Observable:flatten()
  755. return Observable.create(function(observer)
  756. local function onError(message)
  757. return observer:onError(message)
  758. end
  759. local function onNext(observable)
  760. local function innerOnNext(...)
  761. observer:onNext(...)
  762. end
  763. observable:subscribe(innerOnNext, onError, util.noop)
  764. end
  765. local function onCompleted()
  766. return observer:onCompleted()
  767. end
  768. return self:subscribe(onNext, onError, onCompleted)
  769. end)
  770. end
  771. --- Returns an Observable that terminates when the source terminates but does not produce any
  772. -- elements.
  773. -- @returns {Observable}
  774. function Observable:ignoreElements()
  775. return Observable.create(function(observer)
  776. local function onError(message)
  777. return observer:onError(message)
  778. end
  779. local function onCompleted()
  780. return observer:onCompleted()
  781. end
  782. return self:subscribe(nil, onError, onCompleted)
  783. end)
  784. end
  785. --- Returns a new Observable that only produces the last result of the original.
  786. -- @returns {Observable}
  787. function Observable:last()
  788. return Observable.create(function(observer)
  789. local value
  790. local empty = true
  791. local function onNext(...)
  792. value = {...}
  793. empty = false
  794. end
  795. local function onError(e)
  796. return observer:onError(e)
  797. end
  798. local function onCompleted()
  799. if not empty then
  800. observer:onNext(util.unpack(value or {}))
  801. end
  802. return observer:onCompleted()
  803. end
  804. return self:subscribe(onNext, onError, onCompleted)
  805. end)
  806. end
  807. --- Returns a new Observable that produces the values of the original transformed by a function.
  808. -- @arg {function} callback - The function to transform values from the original Observable.
  809. -- @returns {Observable}
  810. function Observable:map(callback)
  811. return Observable.create(function(observer)
  812. callback = callback or util.identity
  813. local function onNext(...)
  814. return util.tryWithObserver(observer, function(...)
  815. return observer:onNext(callback(...))
  816. end, ...)
  817. end
  818. local function onError(e)
  819. return observer:onError(e)
  820. end
  821. local function onCompleted()
  822. return observer:onCompleted()
  823. end
  824. return self:subscribe(onNext, onError, onCompleted)
  825. end)
  826. end
  827. --- Returns a new Observable that produces the maximum value produced by the original.
  828. -- @returns {Observable}
  829. function Observable:max()
  830. return self:reduce(math.max)
  831. end
  832. --- Returns a new Observable that produces the values produced by all the specified Observables in
  833. -- the order they are produced.
  834. -- @arg {Observable...} sources - One or more Observables to merge.
  835. -- @returns {Observable}
  836. function Observable:merge(...)
  837. local sources = {...}
  838. table.insert(sources, 1, self)
  839. return Observable.create(function(observer)
  840. local function onNext(...)
  841. return observer:onNext(...)
  842. end
  843. local function onError(message)
  844. return observer:onError(message)
  845. end
  846. local function onCompleted(i)
  847. return function()
  848. sources[i] = nil
  849. if not next(sources) then
  850. observer:onCompleted()
  851. end
  852. end
  853. end
  854. for i = 1, #sources do
  855. sources[i]:subscribe(onNext, onError, onCompleted(i))
  856. end
  857. end)
  858. end
  859. --- Returns a new Observable that produces the minimum value produced by the original.
  860. -- @returns {Observable}
  861. function Observable:min()
  862. return self:reduce(math.min)
  863. end
  864. --- Returns an Observable that produces the values of the original inside tables.
  865. -- @returns {Observable}
  866. function Observable:pack()
  867. return self:map(util.pack)
  868. end
  869. --- Returns two Observables: one that produces values for which the predicate returns truthy for,
  870. -- and another that produces values for which the predicate returns falsy.
  871. -- @arg {function} predicate - The predicate used to partition the values.
  872. -- @returns {Observable}
  873. -- @returns {Observable}
  874. function Observable:partition(predicate)
  875. return self:filter(predicate), self:reject(predicate)
  876. end
  877. --- Returns a new Observable that produces values computed by extracting the given keys from the
  878. -- tables produced by the original.
  879. -- @arg {string...} keys - The key to extract from the table. Multiple keys can be specified to
  880. -- recursively pluck values from nested tables.
  881. -- @returns {Observable}
  882. function Observable:pluck(key, ...)
  883. if not key then return self end
  884. if type(key) ~= 'string' and type(key) ~= 'number' then
  885. return Observable.throw('pluck key must be a string')
  886. end
  887. return Observable.create(function(observer)
  888. local function onNext(t)
  889. return observer:onNext(t[key])
  890. end
  891. local function onError(e)
  892. return observer:onError(e)
  893. end
  894. local function onCompleted()
  895. return observer:onCompleted()
  896. end
  897. return self:subscribe(onNext, onError, onCompleted)
  898. end):pluck(...)
  899. end
  900. --- Returns a new Observable that produces a single value computed by accumulating the results of
  901. -- running a function on each value produced by the original Observable.
  902. -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
  903. -- the return value of the last call as the first argument and the
  904. -- current values as the rest of the arguments.
  905. -- @arg {*} seed - A value to pass to the accumulator the first time it is run.
  906. -- @returns {Observable}
  907. function Observable:reduce(accumulator, seed)
  908. return Observable.create(function(observer)
  909. local result = seed
  910. local first = true
  911. local function onNext(...)
  912. if first and seed == nil then
  913. result = ...
  914. first = false
  915. else
  916. return util.tryWithObserver(observer, function(...)
  917. result = accumulator(result, ...)
  918. end, ...)
  919. end
  920. end
  921. local function onError(e)
  922. return observer:onError(e)
  923. end
  924. local function onCompleted()
  925. observer:onNext(result)
  926. return observer:onCompleted()
  927. end
  928. return self:subscribe(onNext, onError, onCompleted)
  929. end)
  930. end
  931. --- Returns a new Observable that produces values from the original which do not satisfy a
  932. -- predicate.
  933. -- @arg {function} predicate - The predicate used to reject values.
  934. -- @returns {Observable}
  935. function Observable:reject(predicate)
  936. predicate = predicate or util.identity
  937. return Observable.create(function(observer)
  938. local function onNext(...)
  939. util.tryWithObserver(observer, function(...)
  940. if not predicate(...) then
  941. return observer:onNext(...)
  942. end
  943. end, ...)
  944. end
  945. local function onError(e)
  946. return observer:onError(e)
  947. end
  948. local function onCompleted()
  949. return observer:onCompleted()
  950. end
  951. return self:subscribe(onNext, onError, onCompleted)
  952. end)
  953. end
  954. --- Returns an Observable that restarts in the event of an error.
  955. -- @arg {number=} count - The maximum number of times to retry. If left unspecified, an infinite
  956. -- number of retries will be attempted.
  957. -- @returns {Observable}
  958. function Observable:retry(count)
  959. return Observable.create(function(observer)
  960. local subscription
  961. local retries = 0
  962. local function onNext(...)
  963. return observer:onNext(...)
  964. end
  965. local function onCompleted()
  966. return observer:onCompleted()
  967. end
  968. local function onError(message)
  969. if subscription then
  970. subscription:unsubscribe()
  971. end
  972. retries = retries + 1
  973. if count and retries > count then
  974. return observer:onError(message)
  975. end
  976. subscription = self:subscribe(onNext, onError, onCompleted)
  977. end
  978. return self:subscribe(onNext, onError, onCompleted)
  979. end)
  980. end
  981. --- Returns a new Observable that produces its most recent value every time the specified observable
  982. -- produces a value.
  983. -- @arg {Observable} sampler - The Observable that is used to sample values from this Observable.
  984. -- @returns {Observable}
  985. function Observable:sample(sampler)
  986. if not sampler then error('Expected an Observable') end
  987. return Observable.create(function(observer)
  988. local latest = {}
  989. local function setLatest(...)
  990. latest = util.pack(...)
  991. end
  992. local function onNext()
  993. return observer:onNext(util.unpack(latest))
  994. end
  995. local function onError(message)
  996. return observer:onError(message)
  997. end
  998. local function onCompleted()
  999. return observer:onCompleted()
  1000. end
  1001. local sourceSubscription = self:subscribe(setLatest, onError)
  1002. local sampleSubscription = sampler:subscribe(onNext, onError, onCompleted)
  1003. return Subscription.create(function()
  1004. if sourceSubscription then sourceSubscription:unsubscribe() end
  1005. if sampleSubscription then sampleSubscription:unsubscribe() end
  1006. end)
  1007. end)
  1008. end
  1009. --- Returns a new Observable that produces values computed by accumulating the results of running a
  1010. -- function on each value produced by the original Observable.
  1011. -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
  1012. -- the return value of the last call as the first argument and the
  1013. -- current values as the rest of the arguments. Each value returned
  1014. -- from this function will be emitted by the Observable.
  1015. -- @arg {*} seed - A value to pass to the accumulator the first time it is run.
  1016. -- @returns {Observable}
  1017. function Observable:scan(accumulator, seed)
  1018. return Observable.create(function(observer)
  1019. local result = seed
  1020. local first = true
  1021. local function onNext(...)
  1022. if first and seed == nil then
  1023. result = ...
  1024. first = false
  1025. else
  1026. return util.tryWithObserver(observer, function(...)
  1027. result = accumulator(result, ...)
  1028. observer:onNext(result)
  1029. end, ...)
  1030. end
  1031. end
  1032. local function onError(e)
  1033. return observer:onError(e)
  1034. end
  1035. local function onCompleted()
  1036. return observer:onCompleted()
  1037. end
  1038. return self:subscribe(onNext, onError, onCompleted)
  1039. end)
  1040. end
  1041. --- Returns a new Observable that skips over a specified number of values produced by the original
  1042. -- and produces the rest.
  1043. -- @arg {number=1} n - The number of values to ignore.
  1044. -- @returns {Observable}
  1045. function Observable:skip(n)
  1046. n = n or 1
  1047. return Observable.create(function(observer)
  1048. local i = 1
  1049. local function onNext(...)
  1050. if i > n then
  1051. observer:onNext(...)
  1052. else
  1053. i = i + 1
  1054. end
  1055. end
  1056. local function onError(e)
  1057. return observer:onError(e)
  1058. end
  1059. local function onCompleted()
  1060. return observer:onCompleted()
  1061. end
  1062. return self:subscribe(onNext, onError, onCompleted)
  1063. end)
  1064. end
  1065. --- Returns an Observable that omits a specified number of values from the end of the original
  1066. -- Observable.
  1067. -- @arg {number} count - The number of items to omit from the end.
  1068. -- @returns {Observable}
  1069. function Observable:skipLast(count)
  1070. local buffer = {}
  1071. return Observable.create(function(observer)
  1072. local function emit()
  1073. if #buffer > count and buffer[1] then
  1074. local values = table.remove(buffer, 1)
  1075. observer:onNext(util.unpack(values))
  1076. end
  1077. end
  1078. local function onNext(...)
  1079. emit()
  1080. table.insert(buffer, util.pack(...))
  1081. end
  1082. local function onError(message)
  1083. return observer:onError(message)
  1084. end
  1085. local function onCompleted()
  1086. emit()
  1087. return observer:onCompleted()
  1088. end
  1089. return self:subscribe(onNext, onError, onCompleted)
  1090. end)
  1091. end
  1092. --- Returns a new Observable that skips over values produced by the original until the specified
  1093. -- Observable produces a value.
  1094. -- @arg {Observable} other - The Observable that triggers the production of values.
  1095. -- @returns {Observable}
  1096. function Observable:skipUntil(other)
  1097. return Observable.create(function(observer)
  1098. local triggered = false
  1099. local function trigger()
  1100. triggered = true
  1101. end
  1102. other:subscribe(trigger, trigger, trigger)
  1103. local function onNext(...)
  1104. if triggered then
  1105. observer:onNext(...)
  1106. end
  1107. end
  1108. local function onError()
  1109. if triggered then
  1110. observer:onError()
  1111. end
  1112. end
  1113. local function onCompleted()
  1114. if triggered then
  1115. observer:onCompleted()
  1116. end
  1117. end
  1118. return self:subscribe(onNext, onError, onCompleted)
  1119. end)
  1120. end
  1121. --- Returns a new Observable that skips elements until the predicate returns falsy for one of them.
  1122. -- @arg {function} predicate - The predicate used to continue skipping values.
  1123. -- @returns {Observable}
  1124. function Observable:skipWhile(predicate)
  1125. predicate = predicate or util.identity
  1126. return Observable.create(function(observer)
  1127. local skipping = true
  1128. local function onNext(...)
  1129. if skipping then
  1130. util.tryWithObserver(observer, function(...)
  1131. skipping = predicate(...)
  1132. end, ...)
  1133. end
  1134. if not skipping then
  1135. return observer:onNext(...)
  1136. end
  1137. end
  1138. local function onError(message)
  1139. return observer:onError(message)
  1140. end
  1141. local function onCompleted()
  1142. return observer:onCompleted()
  1143. end
  1144. return self:subscribe(onNext, onError, onCompleted)
  1145. end)
  1146. end
  1147. --- Returns a new Observable that produces the specified values followed by all elements produced by
  1148. -- the source Observable.
  1149. -- @arg {*...} values - The values to produce before the Observable begins producing values
  1150. -- normally.
  1151. -- @returns {Observable}
  1152. function Observable:startWith(...)
  1153. local values = util.pack(...)
  1154. return Observable.create(function(observer)
  1155. observer:onNext(util.unpack(values))
  1156. return self:subscribe(observer)
  1157. end)
  1158. end
  1159. --- Returns an Observable that produces a single value representing the sum of the values produced
  1160. -- by the original.
  1161. -- @returns {Observable}
  1162. function Observable:sum()
  1163. return self:reduce(function(x, y) return x + y end, 0)
  1164. end
  1165. --- Given an Observable that produces Observables, returns an Observable that produces the values
  1166. -- produced by the most recently produced Observable.
  1167. -- @returns {Observable}
  1168. function Observable:switch()
  1169. return Observable.create(function(observer)
  1170. local subscription
  1171. local function onNext(...)
  1172. return observer:onNext(...)
  1173. end
  1174. local function onError(message)
  1175. return observer:onError(message)
  1176. end
  1177. local function onCompleted()
  1178. return observer:onCompleted()
  1179. end
  1180. local function switch(source)
  1181. if subscription then
  1182. subscription:unsubscribe()
  1183. end
  1184. subscription = source:subscribe(onNext, onError, nil)
  1185. end
  1186. return self:subscribe(switch, onError, onCompleted)
  1187. end)
  1188. end
  1189. --- Returns a new Observable that only produces the first n results of the original.
  1190. -- @arg {number=1} n - The number of elements to produce before completing.
  1191. -- @returns {Observable}
  1192. function Observable:take(n)
  1193. n = n or 1
  1194. return Observable.create(function(observer)
  1195. if n <= 0 then
  1196. observer:onCompleted()
  1197. return
  1198. end
  1199. local i = 1
  1200. local function onNext(...)
  1201. observer:onNext(...)
  1202. i = i + 1
  1203. if i > n then
  1204. observer:onCompleted()
  1205. end
  1206. end
  1207. local function onError(e)
  1208. return observer:onError(e)
  1209. end
  1210. local function onCompleted()
  1211. return observer:onCompleted()
  1212. end
  1213. return self:subscribe(onNext, onError, onCompleted)
  1214. end)
  1215. end
  1216. --- Returns an Observable that produces a specified number of elements from the end of a source
  1217. -- Observable.
  1218. -- @arg {number} count - The number of elements to produce.
  1219. -- @returns {Observable}
  1220. function Observable:takeLast(count)
  1221. return Observable.create(function(observer)
  1222. local buffer = {}
  1223. local function onNext(...)
  1224. table.insert(buffer, util.pack(...))
  1225. if #buffer > count then
  1226. table.remove(buffer, 1)
  1227. end
  1228. end
  1229. local function onError(message)
  1230. return observer:onError(message)
  1231. end
  1232. local function onCompleted()
  1233. for i = 1, #buffer do
  1234. observer:onNext(util.unpack(buffer[i]))
  1235. end
  1236. return observer:onCompleted()
  1237. end
  1238. return self:subscribe(onNext, onError, onCompleted)
  1239. end)
  1240. end
  1241. --- Returns a new Observable that completes when the specified Observable fires.
  1242. -- @arg {Observable} other - The Observable that triggers completion of the original.
  1243. -- @returns {Observable}
  1244. function Observable:takeUntil(other)
  1245. return Observable.create(function(observer)
  1246. local function onNext(...)
  1247. return observer:onNext(...)
  1248. end
  1249. local function onError(e)
  1250. return observer:onError(e)
  1251. end
  1252. local function onCompleted()
  1253. return observer:onCompleted()
  1254. end
  1255. other:subscribe(onCompleted, onCompleted, onCompleted)
  1256. return self:subscribe(onNext, onError, onCompleted)
  1257. end)
  1258. end
  1259. --- Returns a new Observable that produces elements until the predicate returns falsy.
  1260. -- @arg {function} predicate - The predicate used to continue production of values.
  1261. -- @returns {Observable}
  1262. function Observable:takeWhile(predicate)
  1263. predicate = predicate or util.identity
  1264. return Observable.create(function(observer)
  1265. local taking = true
  1266. local function onNext(...)
  1267. if taking then
  1268. util.tryWithObserver(observer, function(...)
  1269. taking = predicate(...)
  1270. end, ...)
  1271. if taking then
  1272. return observer:onNext(...)
  1273. else
  1274. return observer:onCompleted()
  1275. end
  1276. end
  1277. end
  1278. local function onError(message)
  1279. return observer:onError(message)
  1280. end
  1281. local function onCompleted()
  1282. return observer:onCompleted()
  1283. end
  1284. return self:subscribe(onNext, onError, onCompleted)
  1285. end)
  1286. end
  1287. --- Runs a function each time this Observable has activity. Similar to subscribe but does not
  1288. -- create a subscription.
  1289. -- @arg {function=} onNext - Run when the Observable produces values.
  1290. -- @arg {function=} onError - Run when the Observable encounters a problem.
  1291. -- @arg {function=} onCompleted - Run when the Observable completes.
  1292. -- @returns {Observable}
  1293. function Observable:tap(_onNext, _onError, _onCompleted)
  1294. _onNext = _onNext or util.noop
  1295. _onError = _onError or util.noop
  1296. _onCompleted = _onCompleted or util.noop
  1297. return Observable.create(function(observer)
  1298. local function onNext(...)
  1299. util.tryWithObserver(observer, function(...)
  1300. _onNext(...)
  1301. end, ...)
  1302. return observer:onNext(...)
  1303. end
  1304. local function onError(message)
  1305. util.tryWithObserver(observer, function()
  1306. _onError(message)
  1307. end)
  1308. return observer:onError(message)
  1309. end
  1310. local function onCompleted()
  1311. util.tryWithObserver(observer, function()
  1312. _onCompleted()
  1313. end)
  1314. return observer:onCompleted()
  1315. end
  1316. return self:subscribe(onNext, onError, onCompleted)
  1317. end)
  1318. end
  1319. --- Returns an Observable that unpacks the tables produced by the original.
  1320. -- @returns {Observable}
  1321. function Observable:unpack()
  1322. return self:map(util.unpack)
  1323. end
  1324. --- Returns an Observable that takes any values produced by the original that consist of multiple
  1325. -- return values and produces each value individually.
  1326. -- @returns {Observable}
  1327. function Observable:unwrap()
  1328. return Observable.create(function(observer)
  1329. local function onNext(...)
  1330. local values = {...}
  1331. for i = 1, #values do
  1332. observer:onNext(values[i])
  1333. end
  1334. end
  1335. local function onError(message)
  1336. return observer:onError(message)
  1337. end
  1338. local function onCompleted()
  1339. return observer:onCompleted()
  1340. end
  1341. return self:subscribe(onNext, onError, onCompleted)
  1342. end)
  1343. end
  1344. --- Returns an Observable that produces a sliding window of the values produced by the original.
  1345. -- @arg {number} size - The size of the window. The returned observable will produce this number
  1346. -- of the most recent values as multiple arguments to onNext.
  1347. -- @returns {Observable}
  1348. function Observable:window(size)
  1349. return Observable.create(function(observer)
  1350. local window = {}
  1351. local function onNext(value)
  1352. table.insert(window, value)
  1353. if #window >= size then
  1354. observer:onNext(util.unpack(window))
  1355. table.remove(window, 1)
  1356. end
  1357. end
  1358. local function onError(message)
  1359. return observer:onError(message)
  1360. end
  1361. local function onCompleted()
  1362. return observer:onCompleted()
  1363. end
  1364. return self:subscribe(onNext, onError, onCompleted)
  1365. end)
  1366. end
  1367. --- Returns an Observable that produces values from the original along with the most recently
  1368. -- produced value from all other specified Observables. Note that only the first argument from each
  1369. -- source Observable is used.
  1370. -- @arg {Observable...} sources - The Observables to include the most recent values from.
  1371. -- @returns {Observable}
  1372. function Observable:with(...)
  1373. local sources = {...}
  1374. return Observable.create(function(observer)
  1375. local latest = setmetatable({}, {__len = util.constant(#sources)})
  1376. local function setLatest(i)
  1377. return function(value)
  1378. latest[i] = value
  1379. end
  1380. end
  1381. local function onNext(value)
  1382. return observer:onNext(value, util.unpack(latest))
  1383. end
  1384. local function onError(e)
  1385. return observer:onError(e)
  1386. end
  1387. local function onCompleted()
  1388. return observer:onCompleted()
  1389. end
  1390. for i = 1, #sources do
  1391. sources[i]:subscribe(setLatest(i), util.noop, util.noop)
  1392. end
  1393. return self:subscribe(onNext, onError, onCompleted)
  1394. end)
  1395. end
  1396. --- Returns an Observable that merges the values produced by the source Observables by grouping them
  1397. -- by their index. The first onNext event contains the first value of all of the sources, the
  1398. -- second onNext event contains the second value of all of the sources, and so on. onNext is called
  1399. -- a number of times equal to the number of values produced by the Observable that produces the
  1400. -- fewest number of values.
  1401. -- @arg {Observable...} sources - The Observables to zip.
  1402. -- @returns {Observable}
  1403. function Observable.zip(...)
  1404. local sources = util.pack(...)
  1405. local count = #sources
  1406. return Observable.create(function(observer)
  1407. local values = {}
  1408. local active = {}
  1409. for i = 1, count do
  1410. values[i] = {n = 0}
  1411. active[i] = true
  1412. end
  1413. local function onNext(i)
  1414. return function(value)
  1415. table.insert(values[i], value)
  1416. values[i].n = values[i].n + 1
  1417. local ready = true
  1418. for i = 1, count do
  1419. if values[i].n == 0 then
  1420. ready = false
  1421. break
  1422. end
  1423. end
  1424. if ready then
  1425. local payload = {}
  1426. for i = 1, count do
  1427. payload[i] = table.remove(values[i], 1)
  1428. values[i].n = values[i].n - 1
  1429. end
  1430. observer:onNext(util.unpack(payload))
  1431. end
  1432. end
  1433. end
  1434. local function onError(message)
  1435. return observer:onError(message)
  1436. end
  1437. local function onCompleted(i)
  1438. return function()
  1439. active[i] = nil
  1440. if not next(active) or values[i].n == 0 then
  1441. return observer:onCompleted()
  1442. end
  1443. end
  1444. end
  1445. for i = 1, count do
  1446. sources[i]:subscribe(onNext(i), onError, onCompleted(i))
  1447. end
  1448. end)
  1449. end
  1450. --- @class ImmediateScheduler
  1451. -- @description Schedules Observables by running all operations immediately.
  1452. local ImmediateScheduler = {}
  1453. ImmediateScheduler.__index = ImmediateScheduler
  1454. ImmediateScheduler.__tostring = util.constant('ImmediateScheduler')
  1455. --- Creates a new ImmediateScheduler.
  1456. -- @returns {ImmediateScheduler}
  1457. function ImmediateScheduler.create()
  1458. return setmetatable({}, ImmediateScheduler)
  1459. end
  1460. --- Schedules a function to be run on the scheduler. It is executed immediately.
  1461. -- @arg {function} action - The function to execute.
  1462. function ImmediateScheduler:schedule(action)
  1463. action()
  1464. end
  1465. --- @class CooperativeScheduler
  1466. -- @description Manages Observables using coroutines and a virtual clock that must be updated
  1467. -- manually.
  1468. local CooperativeScheduler = {}
  1469. CooperativeScheduler.__index = CooperativeScheduler
  1470. CooperativeScheduler.__tostring = util.constant('CooperativeScheduler')
  1471. --- Creates a new CooperativeScheduler.
  1472. -- @arg {number=0} currentTime - A time to start the scheduler at.
  1473. -- @returns {CooperativeScheduler}
  1474. function CooperativeScheduler.create(currentTime)
  1475. local self = {
  1476. tasks = {},
  1477. currentTime = currentTime or 0
  1478. }
  1479. return setmetatable(self, CooperativeScheduler)
  1480. end
  1481. --- Schedules a function to be run after an optional delay. Returns a subscription that will stop
  1482. -- the action from running.
  1483. -- @arg {function} action - The function to execute. Will be converted into a coroutine. The
  1484. -- coroutine may yield execution back to the scheduler with an optional
  1485. -- number, which will put it to sleep for a time period.
  1486. -- @arg {number=0} delay - Delay execution of the action by a virtual time period.
  1487. -- @returns {Subscription}
  1488. function CooperativeScheduler:schedule(action, delay)
  1489. local task = {
  1490. thread = coroutine.create(action),
  1491. due = self.currentTime + (delay or 0)
  1492. }
  1493. table.insert(self.tasks, task)
  1494. return Subscription.create(function()
  1495. return self:unschedule(task)
  1496. end)
  1497. end
  1498. function CooperativeScheduler:unschedule(task)
  1499. for i = 1, #self.tasks do
  1500. if self.tasks[i] == task then
  1501. table.remove(self.tasks, i)
  1502. end
  1503. end
  1504. end
  1505. --- Triggers an update of the CooperativeScheduler. The clock will be advanced and the scheduler
  1506. -- will run any coroutines that are due to be run.
  1507. -- @arg {number=0} delta - An amount of time to advance the clock by. It is common to pass in the
  1508. -- time in seconds or milliseconds elapsed since this function was last
  1509. -- called.
  1510. function CooperativeScheduler:update(delta)
  1511. self.currentTime = self.currentTime + (delta or 0)
  1512. for i = #self.tasks, 1, -1 do
  1513. local task = self.tasks[i]
  1514. if self.currentTime >= task.due then
  1515. local success, delay = coroutine.resume(task.thread)
  1516. if success then
  1517. task.due = math.max(task.due + (delay or 0), self.currentTime)
  1518. else
  1519. error(delay)
  1520. end
  1521. if coroutine.status(task.thread) == 'dead' then
  1522. table.remove(self.tasks, i)
  1523. end
  1524. end
  1525. end
  1526. end
  1527. --- Returns whether or not the CooperativeScheduler's queue is empty.
  1528. function CooperativeScheduler:isEmpty()
  1529. return not next(self.tasks)
  1530. end
  1531. --- @class TimeoutScheduler
  1532. -- @description A scheduler that uses luvit's timer library to schedule events on an event loop.
  1533. local TimeoutScheduler = {}
  1534. TimeoutScheduler.__index = TimeoutScheduler
  1535. TimeoutScheduler.__tostring = util.constant('TimeoutScheduler')
  1536. --- Creates a new TimeoutScheduler.
  1537. -- @returns {TimeoutScheduler}
  1538. function TimeoutScheduler.create()
  1539. return setmetatable({}, TimeoutScheduler)
  1540. end
  1541. --- Schedules an action to run at a future point in time.
  1542. -- @arg {function} action - The action to run.
  1543. -- @arg {number=0} delay - The delay, in milliseconds.
  1544. -- @returns {Subscription}
  1545. function TimeoutScheduler:schedule(action, delay, ...)
  1546. local timer = require 'timer'
  1547. local subscription
  1548. local handle = timer.setTimeout(delay, action, ...)
  1549. return Subscription.create(function()
  1550. timer.clearTimeout(handle)
  1551. end)
  1552. end
  1553. --- @class Subject
  1554. -- @description Subjects function both as an Observer and as an Observable. Subjects inherit all
  1555. -- Observable functions, including subscribe. Values can also be pushed to the Subject, which will
  1556. -- be broadcasted to any subscribed Observers.
  1557. local Subject = setmetatable({}, Observable)
  1558. Subject.__index = Subject
  1559. Subject.__tostring = util.constant('Subject')
  1560. --- Creates a new Subject.
  1561. -- @returns {Subject}
  1562. function Subject.create()
  1563. local self = {
  1564. observers = {},
  1565. stopped = false
  1566. }
  1567. return setmetatable(self, Subject)
  1568. end
  1569. --- Creates a new Observer and attaches it to the Subject.
  1570. -- @arg {function|table} onNext|observer - A function called when the Subject produces a value or
  1571. -- an existing Observer to attach to the Subject.
  1572. -- @arg {function} onError - Called when the Subject terminates due to an error.
  1573. -- @arg {function} onCompleted - Called when the Subject completes normally.
  1574. function Subject:subscribe(onNext, onError, onCompleted)
  1575. local observer
  1576. if util.isa(onNext, Observer) then
  1577. observer = onNext
  1578. else
  1579. observer = Observer.create(onNext, onError, onCompleted)
  1580. end
  1581. table.insert(self.observers, observer)
  1582. return Subscription.create(function()
  1583. for i = 1, #self.observers do
  1584. if self.observers[i] == observer then
  1585. table.remove(self.observers, i)
  1586. return
  1587. end
  1588. end
  1589. end)
  1590. end
  1591. --- Pushes zero or more values to the Subject. They will be broadcasted to all Observers.
  1592. -- @arg {*...} values
  1593. function Subject:onNext(...)
  1594. if not self.stopped then
  1595. for i = 1, #self.observers do
  1596. if self.observers[i] and self.observers[i].onNext then self.observers[i]:onNext(...) end
  1597. end
  1598. end
  1599. end
  1600. --- Signal to all Observers that an error has occurred.
  1601. -- @arg {string=} message - A string describing what went wrong.
  1602. function Subject:onError(message)
  1603. if not self.stopped then
  1604. for i = 1, #self.observers do
  1605. self.observers[i]:onError(message)
  1606. end
  1607. self.stopped = true
  1608. end
  1609. end
  1610. --- Signal to all Observers that the Subject will not produce any more values.
  1611. function Subject:onCompleted()
  1612. if not self.stopped then
  1613. for i = 1, #self.observers do
  1614. self.observers[i]:onCompleted()
  1615. end
  1616. self.stopped = true
  1617. end
  1618. end
  1619. Subject.__call = Subject.onNext
  1620. --- @class AsyncSubject
  1621. -- @description AsyncSubjects are subjects that produce either no values or a single value. If
  1622. -- multiple values are produced via onNext, only the last one is used. If onError is called, then
  1623. -- no value is produced and onError is called on any subscribed Observers. If an Observer
  1624. -- subscribes and the AsyncSubject has already terminated, the Observer will immediately receive the
  1625. -- value or the error.
  1626. local AsyncSubject = setmetatable({}, Observable)
  1627. AsyncSubject.__index = AsyncSubject
  1628. AsyncSubject.__tostring = util.constant('AsyncSubject')
  1629. --- Creates a new AsyncSubject.
  1630. -- @returns {AsyncSubject}
  1631. function AsyncSubject.create()
  1632. local self = {
  1633. observers = {},
  1634. stopped = false,
  1635. value = nil,
  1636. errorMessage = nil
  1637. }
  1638. return setmetatable(self, AsyncSubject)
  1639. end
  1640. --- Creates a new Observer and attaches it to the AsyncSubject.
  1641. -- @arg {function|table} onNext|observer - A function called when the AsyncSubject produces a value
  1642. -- or an existing Observer to attach to the AsyncSubject.
  1643. -- @arg {function} onError - Called when the AsyncSubject terminates due to an error.
  1644. -- @arg {function} onCompleted - Called when the AsyncSubject completes normally.
  1645. function AsyncSubject:subscribe(onNext, onError, onCompleted)
  1646. local observer
  1647. if util.isa(onNext, Observer) then
  1648. observer = onNext
  1649. else
  1650. observer = Observer.create(onNext, onError, onCompleted)
  1651. end
  1652. if self.value then
  1653. observer:onNext(util.unpack(self.value))
  1654. observer:onCompleted()
  1655. return
  1656. elseif self.errorMessage then
  1657. observer:onError(self.errorMessage)
  1658. return
  1659. end
  1660. table.insert(self.observers, observer)
  1661. return Subscription.create(function()
  1662. for i = 1, #self.observers do
  1663. if self.observers[i] == observer then
  1664. table.remove(self.observers, i)
  1665. return
  1666. end
  1667. end
  1668. end)
  1669. end
  1670. --- Pushes zero or more values to the AsyncSubject.
  1671. -- @arg {*...} values
  1672. function AsyncSubject:onNext(...)
  1673. if not self.stopped then
  1674. self.value = util.pack(...)
  1675. end
  1676. end
  1677. --- Signal to all Observers that an error has occurred.
  1678. -- @arg {string=} message - A string describing what went wrong.
  1679. function AsyncSubject:onError(message)
  1680. if not self.stopped then
  1681. self.errorMessage = message
  1682. for i = 1, #self.observers do
  1683. self.observers[i]:onError(self.errorMessage)
  1684. end
  1685. self.stopped = true
  1686. end
  1687. end
  1688. --- Signal to all Observers that the AsyncSubject will not produce any more values.
  1689. function AsyncSubject:onCompleted()
  1690. if not self.stopped then
  1691. for i = 1, #self.observers do
  1692. if self.value then
  1693. self.observers[i]:onNext(util.unpack(self.value))
  1694. end
  1695. self.observers[i]:onCompleted()
  1696. end
  1697. self.stopped = true
  1698. end
  1699. end
  1700. AsyncSubject.__call = AsyncSubject.onNext
  1701. --- @class BehaviorSubject
  1702. -- @description A Subject that tracks its current value. Provides an accessor to retrieve the most
  1703. -- recent pushed value, and all subscribers immediately receive the latest value.
  1704. local BehaviorSubject = setmetatable({}, Subject)
  1705. BehaviorSubject.__index = BehaviorSubject
  1706. BehaviorSubject.__tostring = util.constant('BehaviorSubject')
  1707. --- Creates a new BehaviorSubject.
  1708. -- @arg {*...} value - The initial values.
  1709. -- @returns {BehaviorSubject}
  1710. function BehaviorSubject.create(...)
  1711. local self = {
  1712. observers = {},
  1713. stopped = false
  1714. }
  1715. if select('#', ...) > 0 then
  1716. self.value = util.pack(...)
  1717. end
  1718. return setmetatable(self, BehaviorSubject)
  1719. end
  1720. --- Creates a new Observer and attaches it to the BehaviorSubject. Immediately broadcasts the most
  1721. -- recent value to the Observer.
  1722. -- @arg {function} onNext - Called when the BehaviorSubject produces a value.
  1723. -- @arg {function} onError - Called when the BehaviorSubject terminates due to an error.
  1724. -- @arg {function} onCompleted - Called when the BehaviorSubject completes normally.
  1725. function BehaviorSubject:subscribe(onNext, onError, onCompleted)
  1726. local observer
  1727. if util.isa(onNext, Observer) then
  1728. observer = onNext
  1729. else
  1730. observer = Observer.create(onNext, onError, onCompleted)
  1731. end
  1732. local subscription = Subject.subscribe(self, observer)
  1733. if self.value then
  1734. observer:onNext(util.unpack(self.value))
  1735. end
  1736. return subscription
  1737. end
  1738. --- Pushes zero or more values to the BehaviorSubject. They will be broadcasted to all Observers.
  1739. -- @arg {*...} values
  1740. function BehaviorSubject:onNext(...)
  1741. self.value = util.pack(...)
  1742. return Subject.onNext(self, ...)
  1743. end
  1744. --- Returns the last value emitted by the BehaviorSubject, or the initial value passed to the
  1745. -- constructor if nothing has been emitted yet.
  1746. -- @returns {*...}
  1747. function BehaviorSubject:getValue()
  1748. if self.value ~= nil then
  1749. return util.unpack(self.value)
  1750. end
  1751. end
  1752. BehaviorSubject.__call = BehaviorSubject.onNext
  1753. --- @class ReplaySubject
  1754. -- @description A Subject that provides new Subscribers with some or all of the most recently
  1755. -- produced values upon subscription.
  1756. local ReplaySubject = setmetatable({}, Subject)
  1757. ReplaySubject.__index = ReplaySubject
  1758. ReplaySubject.__tostring = util.constant('ReplaySubject')
  1759. --- Creates a new ReplaySubject.
  1760. -- @arg {number=} bufferSize - The number of values to send to new subscribers. If nil, an infinite
  1761. -- buffer is used (note that this could lead to memory issues).
  1762. -- @returns {ReplaySubject}
  1763. function ReplaySubject.create(n)
  1764. local self = {
  1765. observers = {},
  1766. stopped = false,
  1767. buffer = {},
  1768. bufferSize = n
  1769. }
  1770. return setmetatable(self, ReplaySubject)
  1771. end
  1772. --- Creates a new Observer and attaches it to the ReplaySubject. Immediately broadcasts the most
  1773. -- contents of the buffer to the Observer.
  1774. -- @arg {function} onNext - Called when the ReplaySubject produces a value.
  1775. -- @arg {function} onError - Called when the ReplaySubject terminates due to an error.
  1776. -- @arg {function} onCompleted - Called when the ReplaySubject completes normally.
  1777. function ReplaySubject:subscribe(onNext, onError, onCompleted)
  1778. local observer
  1779. if util.isa(onNext, Observer) then
  1780. observer = onNext
  1781. else
  1782. observer = Observer.create(onNext, onError, onCompleted)
  1783. end
  1784. local subscription = Subject.subscribe(self, observer)
  1785. for i = 1, #self.buffer do
  1786. observer:onNext(util.unpack(self.buffer[i]))
  1787. end
  1788. return subscription
  1789. end
  1790. --- Pushes zero or more values to the ReplaySubject. They will be broadcasted to all Observers.
  1791. -- @arg {*...} values
  1792. function ReplaySubject:onNext(...)
  1793. table.insert(self.buffer, util.pack(...))
  1794. if self.bufferSize and #self.buffer > self.bufferSize then
  1795. table.remove(self.buffer, 1)
  1796. end
  1797. return Subject.onNext(self, ...)
  1798. end
  1799. ReplaySubject.__call = ReplaySubject.onNext
  1800. Observable.wrap = Observable.buffer
  1801. Observable['repeat'] = Observable.replicate
  1802. return {
  1803. util = util,
  1804. Subscription = Subscription,
  1805. Observer = Observer,
  1806. Observable = Observable,
  1807. ImmediateScheduler = ImmediateScheduler,
  1808. CooperativeScheduler = CooperativeScheduler,
  1809. TimeoutScheduler = TimeoutScheduler,
  1810. Subject = Subject,
  1811. AsyncSubject = AsyncSubject,
  1812. BehaviorSubject = BehaviorSubject,
  1813. ReplaySubject = ReplaySubject
  1814. }