Browse Source

Avoid shared coroutines in Observable.fromCoroutine;

Currently, if a function is passed to Observable.fromCoroutine, a
coroutine is created and it is shared among all Observers.  Instead,
use a fresh coroutine for each Observer so all Observers receive the
same values.

When creating an Observable using an existing coroutine, this isn't
possible because the Observable is stateful (calling coroutine.resume
won't always yield the same values).
bjorn 8 years ago
parent
commit
166e61bb5e
4 changed files with 49 additions and 9 deletions
  1. 3 3
      doc/README.md
  2. 6 3
      rx.lua
  3. 6 3
      src/observable.lua
  4. 34 0
      tests/observable.lua

+ 3 - 3
doc/README.md

@@ -18,7 +18,7 @@ RxLua
   - [of](#ofvalues)
   - [of](#ofvalues)
   - [fromRange](#fromrangeinitial-limit-step)
   - [fromRange](#fromrangeinitial-limit-step)
   - [fromTable](#fromtabletable-iterator-keys)
   - [fromTable](#fromtabletable-iterator-keys)
-  - [fromCoroutine](#fromcoroutinecoroutine)
+  - [fromCoroutine](#fromcoroutinefn)
   - [fromFileByLine](#fromfilebylinefilename)
   - [fromFileByLine](#fromfilebylinefilename)
   - [defer](#deferfactory)
   - [defer](#deferfactory)
   - [replicate](#replicatevalue-count)
   - [replicate](#replicatevalue-count)
@@ -250,13 +250,13 @@ Creates an Observable that produces values from a table.
 
 
 ---
 ---
 
 
-#### `.fromCoroutine(coroutine)`
+#### `.fromCoroutine(fn)`
 
 
 Creates an Observable that produces values when the specified coroutine yields.
 Creates an Observable that produces values when the specified coroutine yields.
 
 
 | Name | Type | Default | Description |
 | Name | Type | Default | Description |
 |------|------|---------|-------------|
 |------|------|---------|-------------|
-| `coroutine` | thread |  |  |
+| `fn` | thread|function |  | A coroutine or function to use to generate values.  Note that if a coroutine is used, the values it yields will be shared by all subscribed Observers (influenced by the Scheduler), whereas a new coroutine will be created for each Observer when a function is used. |
 
 
 ---
 ---
 
 

+ 6 - 3
rx.lua

@@ -197,11 +197,14 @@ function Observable.fromTable(t, iterator, keys)
 end
 end
 
 
 --- Creates an Observable that produces values when the specified coroutine yields.
 --- Creates an Observable that produces values when the specified coroutine yields.
--- @arg {thread} coroutine
+-- @arg {thread|function} fn - A coroutine or function to use to generate values.  Note that if a
+--                             coroutine is used, the values it yields will be shared by all
+--                             subscribed Observers (influenced by the Scheduler), whereas a new
+--                             coroutine will be created for each Observer when a function is used.
 -- @returns {Observable}
 -- @returns {Observable}
-function Observable.fromCoroutine(thread, scheduler)
-  thread = type(thread) == 'function' and coroutine.create(thread) or thread
+function Observable.fromCoroutine(fn, scheduler)
   return Observable.create(function(observer)
   return Observable.create(function(observer)
+    local thread = type(fn) == 'function' and coroutine.create(fn) or fn
     return scheduler:schedule(function()
     return scheduler:schedule(function()
       while not observer.stopped do
       while not observer.stopped do
         local success, value = coroutine.resume(thread)
         local success, value = coroutine.resume(thread)

+ 6 - 3
src/observable.lua

@@ -102,11 +102,14 @@ function Observable.fromTable(t, iterator, keys)
 end
 end
 
 
 --- Creates an Observable that produces values when the specified coroutine yields.
 --- Creates an Observable that produces values when the specified coroutine yields.
--- @arg {thread} coroutine
+-- @arg {thread|function} fn - A coroutine or function to use to generate values.  Note that if a
+--                             coroutine is used, the values it yields will be shared by all
+--                             subscribed Observers (influenced by the Scheduler), whereas a new
+--                             coroutine will be created for each Observer when a function is used.
 -- @returns {Observable}
 -- @returns {Observable}
-function Observable.fromCoroutine(thread, scheduler)
-  thread = type(thread) == 'function' and coroutine.create(thread) or thread
+function Observable.fromCoroutine(fn, scheduler)
   return Observable.create(function(observer)
   return Observable.create(function(observer)
+    local thread = type(fn) == 'function' and coroutine.create(fn) or fn
     return scheduler:schedule(function()
     return scheduler:schedule(function()
       while not observer.stopped do
       while not observer.stopped do
         local success, value = coroutine.resume(thread)
         local success, value = coroutine.resume(thread)

+ 34 - 0
tests/observable.lua

@@ -177,6 +177,40 @@ describe('Observable', function()
       until Rx.scheduler:isEmpty()
       until Rx.scheduler:isEmpty()
       expect(onNext).to.equal({{1}, {2}, {3}})
       expect(onNext).to.equal({{1}, {2}, {3}})
     end)
     end)
+
+    it('shares values among Observers when the first argument is a coroutine', function()
+      local coroutine = coroutine.create(function()
+        coroutine.yield(1)
+        coroutine.yield(2)
+        return 3
+      end)
+
+      Rx.scheduler = Rx.CooperativeScheduler.create()
+      local observable = Rx.Observable.fromCoroutine(coroutine, Rx.scheduler)
+      local onNextA = observableSpy(observable)
+      local onNextB = observableSpy(observable)
+      repeat Rx.scheduler:update()
+      until Rx.scheduler:isEmpty()
+      expect(onNextA).to.equal({{2}})
+      expect(onNextB).to.equal({{1}, {3}})
+    end)
+
+    it('uses a unique coroutine for each Observer when the first argument is a function', function()
+      local coroutine = function()
+        coroutine.yield(1)
+        coroutine.yield(2)
+        return 3
+      end
+
+      Rx.scheduler = Rx.CooperativeScheduler.create()
+      local observable = Rx.Observable.fromCoroutine(coroutine, Rx.scheduler)
+      local onNextA = observableSpy(observable)
+      local onNextB = observableSpy(observable)
+      repeat Rx.scheduler:update()
+      until Rx.scheduler:isEmpty()
+      expect(onNextA).to.equal({{1}, {2}, {3}})
+      expect(onNextB).to.equal({{1}, {2}, {3}})
+    end)
   end)
   end)
 
 
   describe('defer', function()
   describe('defer', function()