Browse Source

Add Observable:wrap;

bjorn 10 years ago
parent
commit
698403d37d
2 changed files with 49 additions and 0 deletions
  1. 11 0
      doc/README.md
  2. 38 0
      rx.lua

+ 11 - 0
doc/README.md

@@ -36,6 +36,7 @@
   - [takeUntil](#takeuntilother)
   - [takeUntil](#takeuntilother)
   - [unpack](#unpack)
   - [unpack](#unpack)
   - [unwrap](#unwrap)
   - [unwrap](#unwrap)
+  - [wrap](#wrapsize)
 - [Scheduler](#scheduler)
 - [Scheduler](#scheduler)
 - [CooperativeScheduler](#cooperativescheduler)
 - [CooperativeScheduler](#cooperativescheduler)
   - [create](#createcurrenttime)
   - [create](#createcurrenttime)
@@ -499,6 +500,16 @@ Returns:
 
 
 - `Observable`
 - `Observable`
 
 
+---
+
+#### `:wrap(size)`
+
+Returns an Observable that buffers values from the original and produces them as multiple values.
+
+Arguments:
+
+- `size` (`number`) - The size of the buffer.
+
 # Scheduler
 # Scheduler
 
 
 Schedulers manage groups of Observables.
 Schedulers manage groups of Observables.

+ 38 - 0
rx.lua

@@ -713,6 +713,44 @@ function Observable:unwrap()
   end)
   end)
 end
 end
 
 
+--- Returns an Observable that buffers values from the original and produces them as multiple
+-- values.
+-- @arg {number} size - The size of the buffer.
+function Observable:wrap(size)
+  return Observable.create(function(observer)
+    local buffer = {}
+
+    local function emit()
+      if #buffer > 0 then
+        observer:onNext(unpack(buffer))
+        buffer = {}
+      end
+    end
+
+    local function onNext(...)
+      local values = {...}
+      for i = 1, #values do
+        table.insert(buffer, values[i])
+        if #buffer >= size then
+          return emit()
+        end
+      end
+    end
+
+    local function onError(message)
+      emit()
+      return observer:onError(message)
+    end
+
+    local function onComplete()
+      emit()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
 --- @class Scheduler
 --- @class Scheduler
 -- @description Schedulers manage groups of Observables.
 -- @description Schedulers manage groups of Observables.
 local Scheduler = {}
 local Scheduler = {}