bjorn 9 роки тому
батько
коміт
e4cad53294
6 змінених файлів з 198 додано та 42 видалено
  1. 10 10
      doc/README.md
  2. 96 32
      rx.lua
  3. 66 0
      src/operators/zip.lua
  4. 1 0
      tests/observable.lua
  5. 24 0
      tests/zip.lua
  6. 1 0
      tools/concat.lua

+ 10 - 10
doc/README.md

@@ -56,9 +56,9 @@ RxLua
   - [retry](#retrycount)
   - [scan](#scanaccumulator-seed)
   - [skip](#skipn)
+  - [skipLast](#skiplastcount)
   - [skipUntil](#skipuntilother)
   - [skipWhile](#skipwhilepredicate)
-  - [skipLast](#skiplastcount)
   - [startWith](#startwithvalues)
   - [sum](#sum)
   - [switch](#switch)
@@ -578,33 +578,33 @@ Returns a new Observable that skips over a specified number of values produced b
 
 ---
 
-#### `:skipUntil(other)`
+#### `:skipLast(count)`
 
-Returns a new Observable that skips over values produced by the original until the specified Observable produces a value.
+Returns an Observable that omits a specified number of values from the end of the original Observable.
 
 | Name | Type | Default | Description |
 |------|------|---------|-------------|
-| `other` | Observable |  | The Observable that triggers the production of values. |
+| `count` | number |  | The number of items to omit from the end. |
 
 ---
 
-#### `:skipWhile(predicate)`
+#### `:skipUntil(other)`
 
-Returns a new Observable that skips elements until the predicate returns falsy for one of them.
+Returns a new Observable that skips over values produced by the original until the specified Observable produces a value.
 
 | Name | Type | Default | Description |
 |------|------|---------|-------------|
-| `predicate` | function |  | The predicate used to continue skipping values. |
+| `other` | Observable |  | The Observable that triggers the production of values. |
 
 ---
 
-#### `:skipLast(count)`
+#### `:skipWhile(predicate)`
 
-Returns an Observable that omits a specified number of values from the end of the original Observable.
+Returns a new Observable that skips elements until the predicate returns falsy for one of them.
 
 | Name | Type | Default | Description |
 |------|------|---------|-------------|
-| `count` | number |  | The number of items to omit from the end. |
+| `predicate` | function |  | The predicate used to continue skipping values. |
 
 ---
 

+ 96 - 32
rx.lua

@@ -1151,6 +1151,38 @@ function Observable:skip(n)
   end)
 end
 
+--- Returns an Observable that omits a specified number of values from the end of the original
+-- Observable.
+-- @arg {number} count - The number of items to omit from the end.
+-- @returns {Observable}
+function Observable:skipLast(count)
+  local buffer = {}
+  return Observable.create(function(observer)
+    local function emit()
+      if #buffer > count and buffer[1] then
+        local values = table.remove(buffer, 1)
+        observer:onNext(util.unpack(values))
+      end
+    end
+
+    local function onNext(...)
+      emit()
+      table.insert(buffer, util.pack(...))
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onCompleted()
+      emit()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+
 --- Returns a new Observable that skips over values produced by the original until the specified
 -- Observable produces a value.
 -- @arg {Observable} other - The Observable that triggers the production of values.
@@ -1217,38 +1249,6 @@ function Observable:skipWhile(predicate)
   end)
 end
 
---- Returns an Observable that omits a specified number of values from the end of the original
--- Observable.
--- @arg {number} count - The number of items to omit from the end.
--- @returns {Observable}
-function Observable:skipLast(count)
-  local buffer = {}
-  return Observable.create(function(observer)
-    local function emit()
-      if #buffer > count and buffer[1] then
-        local values = table.remove(buffer, 1)
-        observer:onNext(util.unpack(values))
-      end
-    end
-
-    local function onNext(...)
-      emit()
-      table.insert(buffer, util.pack(...))
-    end
-
-    local function onError(message)
-      return observer:onError(message)
-    end
-
-    local function onCompleted()
-      emit()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
 --- Returns a new Observable that produces the specified values followed by all elements produced by
 -- the source Observable.
 -- @arg {*...} values - The values to produce before the Observable begins producing values
@@ -1549,6 +1549,70 @@ function Observable:with(...)
   end)
 end
 
+--- Returns an Observable that merges the values produced by the source Observables by grouping them
+-- by their index.  The first onNext event contains the first value of all of the sources, the
+-- second onNext event contains the second value of all of the sources, and so on.  onNext is called
+-- a number of times equal to the number of values produced by the Observable that produces the
+-- fewest number of values.
+-- @arg {Observable...} sources - The Observables to zip.
+-- @returns {Observable}
+function Observable.zip(...)
+  local sources = util.pack(...)
+  local count = #sources
+
+  return Observable.create(function(observer)
+    local values = {}
+    local active = {}
+    for i = 1, count do
+      values[i] = {n = 0}
+      active[i] = true
+    end
+
+    local function onNext(i)
+      return function(value)
+        table.insert(values[i], value)
+        values[i].n = values[i].n + 1
+
+        local ready = true
+        for i = 1, count do
+          if values[i].n == 0 then
+            ready = false
+            break
+          end
+        end
+
+        if ready then
+          local payload = {}
+
+          for i = 1, count do
+            payload[i] = table.remove(values[i], 1)
+            values[i].n = values[i].n - 1
+          end
+
+          observer:onNext(util.unpack(payload))
+        end
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onCompleted(i)
+      return function()
+        active[i] = nil
+        if not next(active) or values[i].n == 0 then
+          return observer:onCompleted()
+        end
+      end
+    end
+
+    for i = 1, count do
+      sources[i]:subscribe(onNext(i), onError, onCompleted(i))
+    end
+  end)
+end
+
 --- @class ImmediateScheduler
 -- @description Schedules Observables by running all operations immediately.
 local ImmediateScheduler = {}

+ 66 - 0
src/operators/zip.lua

@@ -0,0 +1,66 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns an Observable that merges the values produced by the source Observables by grouping them
+-- by their index.  The first onNext event contains the first value of all of the sources, the
+-- second onNext event contains the second value of all of the sources, and so on.  onNext is called
+-- a number of times equal to the number of values produced by the Observable that produces the
+-- fewest number of values.
+-- @arg {Observable...} sources - The Observables to zip.
+-- @returns {Observable}
+function Observable.zip(...)
+  local sources = util.pack(...)
+  local count = #sources
+
+  return Observable.create(function(observer)
+    local values = {}
+    local active = {}
+    for i = 1, count do
+      values[i] = {n = 0}
+      active[i] = true
+    end
+
+    local function onNext(i)
+      return function(value)
+        table.insert(values[i], value)
+        values[i].n = values[i].n + 1
+
+        local ready = true
+        for i = 1, count do
+          if values[i].n == 0 then
+            ready = false
+            break
+          end
+        end
+
+        if ready then
+          local payload = {}
+
+          for i = 1, count do
+            payload[i] = table.remove(values[i], 1)
+            values[i].n = values[i].n - 1
+          end
+
+          observer:onNext(util.unpack(payload))
+        end
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onCompleted(i)
+      return function()
+        active[i] = nil
+        if not next(active) or values[i].n == 0 then
+          return observer:onCompleted()
+        end
+      end
+    end
+
+    for i = 1, count do
+      sources[i]:subscribe(onNext(i), onError, onCompleted(i))
+    end
+  end)
+end

+ 1 - 0
tests/observable.lua

@@ -277,4 +277,5 @@ describe('Observable', function()
   dofile('tests/unwrap.lua')
   dofile('tests/window.lua')
   dofile('tests/with.lua')
+  dofile('tests/zip.lua')
 end)

+ 24 - 0
tests/zip.lua

@@ -0,0 +1,24 @@
+describe('zip', function()
+  it('behaves as an identity function if only one Observable argument is specified', function()
+    expect(Rx.Observable.fromRange(1, 5):zip()).to.produce(1, 2, 3, 4, 5)
+  end)
+
+  it('groups values produced by the sources by their index', function()
+    local observableA = Rx.Observable.fromRange(1, 3)
+    local observableB = Rx.Observable.fromRange(2, 4)
+    local observableC = Rx.Observable.fromRange(3, 5)
+    expect(Rx.Observable.zip(observableA, observableB, observableC)).to.produce({{1, 2, 3}, {2, 3, 4}, {3, 4, 5}})
+  end)
+
+  it('tolerates nils', function()
+    local observableA = Rx.Observable.create(function(observer)
+      observer:onNext(nil)
+      observer:onNext(nil)
+      observer:onNext(nil)
+      observer:onCompleted()
+    end)
+    local observableB = Rx.Observable.fromRange(3)
+    local onNext = observableSpy(Rx.Observable.zip(observableA, observableB))
+    expect(#onNext).to.equal(3)
+  end)
+end)

+ 1 - 0
tools/concat.lua

@@ -55,6 +55,7 @@ local files = {
   'src/operators/unwrap.lua',
   'src/operators/window.lua',
   'src/operators/with.lua',
+  'src/operators/zip.lua',
   'src/schedulers/immediatescheduler.lua',
   'src/schedulers/cooperativescheduler.lua',
   'src/subjects/subject.lua',