bjorn 10 жил өмнө
parent
commit
f368443041
2 өөрчлөгдсөн 50 нэмэгдсэн , 3 устгасан
  1. 16 2
      doc/README.md
  2. 34 1
      rx.lua

+ 16 - 2
doc/README.md

@@ -160,13 +160,13 @@ Returns:
 
 ---
 
-#### `:concat(observables)`
+#### `:concat(sources)`
 
 Returns a new Observable that produces the values produced by all the specified Observables in the order they are specified.
 
 Arguments:
 
-- `observables` (`Observable...`) - The Observables to concatenate.
+- `sources` (`Observable...`) - The Observables to concatenate.
 
 Returns:
 
@@ -232,6 +232,20 @@ Returns:
 
 ---
 
+#### `:merge(sources)`
+
+Returns a new Observable that produces the values produced by all the specified Observables in the order they are produced.
+
+Arguments:
+
+- `sources` (`Observable...`) - One or more Observables to merge.
+
+Returns:
+
+- `Observable`
+
+---
+
 #### `:pluck(key)`
 
 Returns a new Observable that produces values computed by extracting the given key from the tables produced by the original.

+ 34 - 1
rx.lua

@@ -211,7 +211,7 @@ end
 
 --- Returns a new Observable that produces the values produced by all the specified Observables in
 -- the order they are specified.
--- @arg {Observable...} observables - The Observables to concatenate.
+-- @arg {Observable...} sources - The Observables to concatenate.
 -- @returns {Observable}
 function Observable:concat(other, ...)
   if not other then return self end
@@ -342,6 +342,39 @@ function Observable:map(callback)
   end)
 end
 
+--- Returns a new Observable that produces the values produced by all the specified Observables in
+-- the order they are produced.
+-- @arg {Observable...} sources - One or more Observables to merge.
+-- @returns {Observable}
+function Observable:merge(...)
+  local sources = {...}
+  table.insert(sources, 1, self)
+
+  return Observable.create(function(observer)
+    local function onNext(value)
+      return observer:onNext(value)
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onComplete(i)
+      return function()
+        sources[i] = nil
+
+        if not next(sources) then
+          observer:onComplete()
+        end
+      end
+    end
+
+    for i = 1, #sources do
+      sources[i]:subscribe(onNext, onError, onComplete(i))
+    end
+  end)
+end
+
 --- Returns a new Observable that produces values computed by extracting the given key from the
 -- tables produced by the original.
 -- @arg {function} key - The key to extract from the table.