Browse Source

Observable.window;

bjorn 10 years ago
parent
commit
a607a001d3
2 changed files with 44 additions and 0 deletions
  1. 15 0
      doc/README.md
  2. 29 0
      rx.lua

+ 15 - 0
doc/README.md

@@ -38,6 +38,7 @@ RxLua
   - [takeUntil](#takeuntilother)
   - [unpack](#unpack)
   - [unwrap](#unwrap)
+  - [window](#windowsize)
   - [wrap](#wrapsize)
 - [Scheduler](#scheduler)
 - [ImmediateScheduler](#immediatescheduler)
@@ -498,6 +499,20 @@ Returns:
 
 ---
 
+#### `:window(size)`
+
+Returns an Observable that produces a sliding window of the values produced by the original.
+
+Arguments:
+
+- `size` (`number`) - The size of the window. The returned observable will produce this number of the most recent values as multiple arguments to onNext.
+
+Returns:
+
+- `Observable`
+
+---
+
 #### `:wrap(size)`
 
 Returns an Observable that buffers values from the original and produces them as multiple values.

+ 29 - 0
rx.lua

@@ -711,6 +711,35 @@ function Observable:unwrap()
   end)
 end
 
+--- Returns an Observable that produces a sliding window of the values produced by the original.
+-- @arg {number} size - The size of the window. The returned observable will produce this number
+--                      of the most recent values as multiple arguments to onNext.
+-- @returns {Observable}
+function Observable:window(size)
+  return Observable.create(function(observer)
+    local window = {}
+
+    local function onNext(value)
+      table.insert(window, value)
+
+      if #window > size then
+        table.remove(window, 1)
+        observer:onNext(unpack(window))
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
 --- Returns an Observable that buffers values from the original and produces them as multiple
 -- values.
 -- @arg {number} size - The size of the buffer.