Bläddra i källkod

Observable.elementAt;

bjorn 9 år sedan
förälder
incheckning
241c4062af
6 ändrade filer med 115 tillägg och 0 borttagningar
  1. 11 0
      doc/README.md
  2. 33 0
      rx.lua
  3. 34 0
      src/operators/elementAt.lua
  4. 35 0
      tests/elementAt.lua
  5. 1 0
      tests/observable.lua
  6. 1 0
      tools/concat.lua

+ 11 - 0
doc/README.md

@@ -34,6 +34,7 @@ RxLua
   - [defaultIfEmpty](#defaultifemptyvalues)
   - [distinct](#distinct)
   - [distinctUntilChanged](#distinctuntilchangedcomparator)
+  - [elementAt](#elementatindex)
   - [filter](#filterpredicate)
   - [find](#findpredicate)
   - [first](#first)
@@ -373,6 +374,16 @@ Returns an Observable that only produces values from the original if they are di
 
 ---
 
+#### `:elementAt(index)`
+
+Returns an Observable that produces the nth element produced by the source Observable.
+
+| Name | Type | Default | Description |
+|------|------|---------|-------------|
+| `index` | number |  | The index of the item, with an index of 1 representing the first. |
+
+---
+
 #### `:filter(predicate)`
 
 Returns a new Observable that only produces values of the first that satisfy a predicate.

+ 33 - 0
rx.lua

@@ -656,6 +656,39 @@ function Observable:distinctUntilChanged(comparator)
   end)
 end
 
+--- Returns an Observable that produces the nth element produced by the source Observable.
+-- @arg {number} index - The index of the item, with an index of 1 representing the first.
+-- @returns {Observable}
+function Observable:elementAt(index)
+  return Observable.create(function(observer)
+    local subscription
+    local i = 1
+
+    local function onNext(...)
+      if i == index then
+        observer:onNext(...)
+        observer:onCompleted()
+        if subscription then
+          subscription:unsubscribe()
+        end
+      else
+        i = i + 1
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    subscription = self:subscribe(onNext, onError, onCompleted)
+    return subscription
+  end)
+end
+
 --- Returns a new Observable that only produces values of the first that satisfy a predicate.
 -- @arg {function} predicate - The predicate used to filter values.
 -- @returns {Observable}

+ 34 - 0
src/operators/elementAt.lua

@@ -0,0 +1,34 @@
+local Observable = require 'observable'
+
+--- Returns an Observable that produces the nth element produced by the source Observable.
+-- @arg {number} index - The index of the item, with an index of 1 representing the first.
+-- @returns {Observable}
+function Observable:elementAt(index)
+  return Observable.create(function(observer)
+    local subscription
+    local i = 1
+
+    local function onNext(...)
+      if i == index then
+        observer:onNext(...)
+        observer:onCompleted()
+        if subscription then
+          subscription:unsubscribe()
+        end
+      else
+        i = i + 1
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    subscription = self:subscribe(onNext, onError, onCompleted)
+    return subscription
+  end)
+end

+ 35 - 0
tests/elementAt.lua

@@ -0,0 +1,35 @@
+describe('elementAt', function()
+  it('errors when its parent errors', function()
+    expect(Rx.Observable.throw():elementAt(1).subscribe).to.fail()
+  end)
+
+  it('chains subscriptions', function()
+    local subscription = Rx.Subscription.create()
+    local observable = Rx.Observable.create(function() return subscription end)
+    expect(observable:subscribe()).to.equal(subscription)
+    expect(observable:elementAt(1):subscribe()).to.equal(subscription)
+  end)
+
+  it('errors if no index is specified', function()
+    expect(Rx.Observable.fromValue(1):elementAt().subscribe).to.fail()
+  end)
+
+  it('produces no values if the specified index is less than one', function()
+    expect(Rx.Observable.fromValue(1):elementAt(0)).to.produce.nothing()
+    expect(Rx.Observable.fromValue(1):elementAt(-1)).to.produce.nothing()
+  end)
+
+  it('produces no values if the specified index is greater than the number of elements produced by the source', function()
+    expect(Rx.Observable.fromValue(1):elementAt(2)).to.produce.nothing()
+  end)
+
+  it('produces all values produced by the source at the specified index', function()
+    local observable = Rx.Observable.create(function(observer)
+      observer:onNext(1, 2, 3)
+      observer:onNext(4, 5, 6)
+      observer:onCompleted()
+    end)
+
+    expect(observable:elementAt(2)).to.produce({{4, 5, 6}})
+  end)
+end)

+ 1 - 0
tests/observable.lua

@@ -226,6 +226,7 @@ describe('Observable', function()
   dofile('tests/defaultIfEmpty.lua')
   dofile('tests/distinct.lua')
   dofile('tests/distinctUntilChanged.lua')
+  dofile('tests/elementAt.lua')
   dofile('tests/filter.lua')
   dofile('tests/find.lua')
   dofile('tests/first.lua')

+ 1 - 0
tools/concat.lua

@@ -19,6 +19,7 @@ local files = {
   'src/operators/defaultIfEmpty.lua',
   'src/operators/distinct.lua',
   'src/operators/distinctUntilChanged.lua',
+  'src/operators/elementAt.lua',
   'src/operators/filter.lua',
   'src/operators/find.lua',
   'src/operators/first.lua',