Explorar o código

Observable.amb;

bjorn %!s(int64=9) %!d(string=hai) anos
pai
achega
cfe875978f
Modificáronse 6 ficheiros con 157 adicións e 0 borrados
  1. 11 0
      doc/README.md
  2. 58 0
      rx.lua
  3. 49 0
      src/observable.lua
  4. 10 0
      src/subjects/subject.lua
  5. 28 0
      tests/amb.lua
  6. 1 0
      tests/observable.lua

+ 11 - 0
doc/README.md

@@ -21,6 +21,7 @@ RxLua
   - [fromCoroutine](#fromcoroutinecoroutine)
   - [dump](#dumpname-formatter)
   - [all](#allpredicate)
+  - [amb](#ambobservables)
   - [combineLatest](#combinelatestobservables-combinator)
   - [compact](#compact)
   - [concat](#concatsources)
@@ -246,6 +247,16 @@ Determine whether all items emitted by an Observable meet some criteria.
 
 ---
 
+#### `.amb(observables)`
+
+Given a set of Observables, produces values from only the first one to produce a value.
+
+| Name | Type | Default | Description |
+|------|------|---------|-------------|
+| `observables` | Observable... |  |  |
+
+---
+
 #### `:combineLatest(observables, combinator)`
 
 Returns a new Observable that runs a combinator function on the most recent values from a set of Observables whenever any of them produce a new value. The results of the combinator function are produced by the new Observable.

+ 58 - 0
rx.lua

@@ -247,6 +247,55 @@ function Observable:all(predicate)
   end)
 end
 
+--- Given a set of Observables, produces values from only the first one to produce a value.
+-- @arg {Observable...} observables
+-- @returns {Observable}
+function Observable.amb(a, b, ...)
+  if not a or not b then return a end
+
+  return Observable.create(function(observer)
+    local subscriptionA, subscriptionB
+
+    local function onNextA(...)
+      if subscriptionB then subscriptionB:unsubscribe() end
+      observer:onNext(...)
+    end
+
+    local function onErrorA(e)
+      if subscriptionB then subscriptionB:unsubscribe() end
+      observer:onError(e)
+    end
+
+    local function onCompletedA()
+      if subscriptionB then subscriptionB:unsubscribe() end
+      observer:onCompleted()
+    end
+
+    local function onNextB(...)
+      if subscriptionA then subscriptionA:unsubscribe() end
+      observer:onNext(...)
+    end
+
+    local function onErrorB(e)
+      if subscriptionA then subscriptionA:unsubscribe() end
+      observer:onError(e)
+    end
+
+    local function onCompletedB()
+      if subscriptionA then subscriptionA:unsubscribe() end
+      observer:onCompleted()
+    end
+
+    subscriptionA = a:subscribe(onNextA, onErrorA, onCompletedA)
+    subscriptionB = b:subscribe(onNextB, onErrorB, onCompletedB)
+
+    return Subscription.create(function()
+      subscriptionA:unsubscribe()
+      subscriptionB:unsubscribe()
+    end)
+  end):amb(...)
+end
+
 --- Returns a new Observable that runs a combinator function on the most recent values from a set
 -- of Observables whenever any of them produce a new value. The results of the combinator function
 -- are produced by the new Observable.
@@ -1180,6 +1229,15 @@ function Subject:subscribe(onNext, onError, onCompleted)
   end
 
   table.insert(self.observers, observer)
+
+  return Subscription.create(function()
+    for i = 1, #self.observers do
+      if self.observers[i] == observer then
+        table.remove(self.observers, i)
+        return
+      end
+    end
+  end)
 end
 
 --- Pushes zero or more values to the Subject. They will be broadcasted to all Observers.

+ 49 - 0
src/observable.lua

@@ -162,6 +162,55 @@ function Observable:all(predicate)
   end)
 end
 
+--- Given a set of Observables, produces values from only the first one to produce a value.
+-- @arg {Observable...} observables
+-- @returns {Observable}
+function Observable.amb(a, b, ...)
+  if not a or not b then return a end
+
+  return Observable.create(function(observer)
+    local subscriptionA, subscriptionB
+
+    local function onNextA(...)
+      if subscriptionB then subscriptionB:unsubscribe() end
+      observer:onNext(...)
+    end
+
+    local function onErrorA(e)
+      if subscriptionB then subscriptionB:unsubscribe() end
+      observer:onError(e)
+    end
+
+    local function onCompletedA()
+      if subscriptionB then subscriptionB:unsubscribe() end
+      observer:onCompleted()
+    end
+
+    local function onNextB(...)
+      if subscriptionA then subscriptionA:unsubscribe() end
+      observer:onNext(...)
+    end
+
+    local function onErrorB(e)
+      if subscriptionA then subscriptionA:unsubscribe() end
+      observer:onError(e)
+    end
+
+    local function onCompletedB()
+      if subscriptionA then subscriptionA:unsubscribe() end
+      observer:onCompleted()
+    end
+
+    subscriptionA = a:subscribe(onNextA, onErrorA, onCompletedA)
+    subscriptionB = b:subscribe(onNextB, onErrorB, onCompletedB)
+
+    return Subscription.create(function()
+      subscriptionA:unsubscribe()
+      subscriptionB:unsubscribe()
+    end)
+  end):amb(...)
+end
+
 --- Returns a new Observable that runs a combinator function on the most recent values from a set
 -- of Observables whenever any of them produce a new value. The results of the combinator function
 -- are produced by the new Observable.

+ 10 - 0
src/subjects/subject.lua

@@ -1,4 +1,5 @@
 local Observable = require 'observable'
+local Subscription = require 'subscription'
 local util = require 'util'
 
 --- @class Subject
@@ -35,6 +36,15 @@ function Subject:subscribe(onNext, onError, onCompleted)
   end
 
   table.insert(self.observers, observer)
+
+  return Subscription.create(function()
+    for i = 1, #self.observers do
+      if self.observers[i] == observer then
+        table.remove(self.observers, i)
+        return
+      end
+    end
+  end)
 end
 
 --- Pushes zero or more values to the Subject. They will be broadcasted to all Observers.

+ 28 - 0
tests/amb.lua

@@ -0,0 +1,28 @@
+describe('amb', function()
+  it('returns nil if it is passed nil', function()
+    expect(Rx.Observable.amb()).to.equal(nil)
+  end)
+
+  it('returns the Observable unchanged if it is the only one supplied', function()
+    expect(Rx.Observable.amb(Rx.Observable.fromRange(3))).to.produce(1, 2, 3)
+  end)
+
+  it('produces values from the first Observable to produce a value', function()
+    local a = Rx.Subject.create()
+    local b = Rx.Subject.create()
+    local onNext = spy()
+    local observer = Rx.Observer.create(onNext)
+    local amb = a:amb(b):subscribe(observer)
+
+    b:onNext(4)
+    a:onNext(1)
+    b:onNext(5)
+    b:onNext(6)
+    b:onCompleted()
+    a:onNext(2)
+    a:onNext(3)
+    a:onCompleted()
+
+    expect(onNext).to.equal({{4}, {5}, {6}})
+  end)
+end)

+ 1 - 0
tests/observable.lua

@@ -178,6 +178,7 @@ describe('Observable', function()
   end)
 
   dofile('tests/all.lua')
+  dofile('tests/amb.lua')
   dofile('tests/combineLatest.lua')
   dofile('tests/compact.lua')
   dofile('tests/concat.lua')