Browse Source

Separate operators into separate files;

bjorn 9 years ago
parent
commit
0974478aec

+ 0 - 984
src/observable.lua

@@ -136,988 +136,4 @@ function Observable:dump(name, formatter)
   return self:subscribe(onNext, onError, onCompleted)
   return self:subscribe(onNext, onError, onCompleted)
 end
 end
 
 
---- Determine whether all items emitted by an Observable meet some criteria.
--- @arg {function=identity} predicate - The predicate used to evaluate objects.
-function Observable:all(predicate)
-  predicate = predicate or util.identity
-
-  return Observable.create(function(observer)
-    local function onNext(...)
-      if not predicate(...) then
-        observer:onNext(false)
-        observer:onCompleted()
-      end
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      observer:onNext(true)
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Given a set of Observables, produces values from only the first one to produce a value.
--- @arg {Observable...} observables
--- @returns {Observable}
-function Observable.amb(a, b, ...)
-  if not a or not b then return a end
-
-  return Observable.create(function(observer)
-    local subscriptionA, subscriptionB
-
-    local function onNextA(...)
-      if subscriptionB then subscriptionB:unsubscribe() end
-      observer:onNext(...)
-    end
-
-    local function onErrorA(e)
-      if subscriptionB then subscriptionB:unsubscribe() end
-      observer:onError(e)
-    end
-
-    local function onCompletedA()
-      if subscriptionB then subscriptionB:unsubscribe() end
-      observer:onCompleted()
-    end
-
-    local function onNextB(...)
-      if subscriptionA then subscriptionA:unsubscribe() end
-      observer:onNext(...)
-    end
-
-    local function onErrorB(e)
-      if subscriptionA then subscriptionA:unsubscribe() end
-      observer:onError(e)
-    end
-
-    local function onCompletedB()
-      if subscriptionA then subscriptionA:unsubscribe() end
-      observer:onCompleted()
-    end
-
-    subscriptionA = a:subscribe(onNextA, onErrorA, onCompletedA)
-    subscriptionB = b:subscribe(onNextB, onErrorB, onCompletedB)
-
-    return Subscription.create(function()
-      subscriptionA:unsubscribe()
-      subscriptionB:unsubscribe()
-    end)
-  end):amb(...)
-end
-
---- Returns an Observable that produces the average of all values produced by the original.
--- @returns {Observable}
-function Observable:average()
-  return Observable.create(function(observer)
-    local sum, count = 0, 0
-
-    local function onNext(value)
-      sum = sum + value
-      count = count + 1
-    end
-
-    local function onError(e)
-      observer:onError(e)
-    end
-
-    local function onCompleted()
-      if count > 0 then
-        observer:onNext(sum / count)
-      end
-
-      observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns an Observable that buffers values from the original and produces them as multiple
--- values.
--- @arg {number} size - The size of the buffer.
-function Observable:buffer(size)
-  return Observable.create(function(observer)
-    local buffer = {}
-
-    local function emit()
-      if #buffer > 0 then
-        observer:onNext(util.unpack(buffer))
-        buffer = {}
-      end
-    end
-
-    local function onNext(...)
-      local values = {...}
-      for i = 1, #values do
-        table.insert(buffer, values[i])
-        if #buffer >= size then
-          emit()
-        end
-      end
-    end
-
-    local function onError(message)
-      emit()
-      return observer:onError(message)
-    end
-
-    local function onCompleted()
-      emit()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns an Observable that intercepts any errors from the previous and replace them with values
--- produced by a new Observable.
--- @arg {function|Observable} handler - An Observable or a function that returns an Observable to
---                                       replace the source Observable in the event of an error.
--- @returns {Observable}
-function Observable:catch(handler)
-  handler = handler and (type(handler) == 'function' and handler or util.constant(handler))
-
-  return Observable.create(function(observer)
-    local subscription
-
-    local function onNext(...)
-      return observer:onNext(...)
-    end
-
-    local function onError(e)
-      if not handler then
-        return observer:onCompleted()
-      end
-
-      local continue = handler(e)
-      if continue then
-        if subscription then subscription:unsubscribe() end
-        continue:subscribe(observer)
-      else
-        observer:onError(e)
-      end
-    end
-
-    local function onCompleted()
-      observer:onCompleted()
-    end
-
-    subscription = self:subscribe(onNext, onError, onCompleted)
-    return subscription
-  end)
-end
-
---- Returns a new Observable that runs a combinator function on the most recent values from a set
--- of Observables whenever any of them produce a new value. The results of the combinator function
--- are produced by the new Observable.
--- @arg {Observable...} observables - One or more Observables to combine.
--- @arg {function} combinator - A function that combines the latest result from each Observable and
---                              returns a single value.
--- @returns {Observable}
-function Observable:combineLatest(...)
-  local sources = {...}
-  local combinator = table.remove(sources)
-  if type(combinator) ~= 'function' then
-    table.insert(sources, combinator)
-    combinator = function(...) return ... end
-  end
-  table.insert(sources, 1, self)
-
-  return Observable.create(function(observer)
-    local latest = {}
-    local pending = {util.unpack(sources)}
-    local completed = {}
-
-    local function onNext(i)
-      return function(value)
-        latest[i] = value
-        pending[i] = nil
-
-        if not next(pending) then
-          observer:onNext(combinator(util.unpack(latest)))
-        end
-      end
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted(i)
-      return function()
-        table.insert(completed, i)
-
-        if #completed == #sources then
-          observer:onCompleted()
-        end
-      end
-    end
-
-    for i = 1, #sources do
-      sources[i]:subscribe(onNext(i), onError, onCompleted(i))
-    end
-  end)
-end
-
---- Returns a new Observable that produces the values of the first with falsy values removed.
--- @returns {Observable}
-function Observable:compact()
-  return self:filter(util.identity)
-end
-
---- Returns a new Observable that produces the values produced by all the specified Observables in
--- the order they are specified.
--- @arg {Observable...} sources - The Observables to concatenate.
--- @returns {Observable}
-function Observable:concat(other, ...)
-  if not other then return self end
-
-  local others = {...}
-
-  return Observable.create(function(observer)
-    local function onNext(...)
-      return observer:onNext(...)
-    end
-
-    local function onError(message)
-      return observer:onError(message)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    local function chain()
-      return other:concat(util.unpack(others)):subscribe(onNext, onError, onCompleted)
-    end
-
-    return self:subscribe(onNext, onError, chain)
-  end)
-end
-
---- Returns a new Observable that produces a single boolean value representing whether or not the
--- specified value was produced by the original.
--- @arg {*} value - The value to search for.  == is used for equality testing.
--- @returns {Observable}
-function Observable:contains(value)
-  return Observable.create(function(observer)
-    local subscription
-
-    local function onNext(...)
-      local args = util.pack(...)
-
-      if #args == 0 and value == nil then
-        observer:onNext(true)
-        if subscription then subscription:unsubscribe() end
-        return observer:onCompleted()
-      end
-
-      for i = 1, #args do
-        if args[i] == value then
-          observer:onNext(true)
-        if subscription then subscription:unsubscribe() end
-          return observer:onCompleted()
-        end
-      end
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      observer:onNext(false)
-      return observer:onCompleted()
-    end
-
-    subscription = self:subscribe(onNext, onError, onCompleted)
-    return subscription
-  end)
-end
-
---- Returns an Observable that produces a single value representing the number of values produced
--- by the source value that satisfy an optional predicate.
--- @arg {function=} predicate - The predicate used to match values.
-function Observable:count(predicate)
-  predicate = predicate or util.constant(true)
-
-  return Observable.create(function(observer)
-    local count = 0
-
-    local function onNext(...)
-      if predicate(...) then
-        count = count + 1
-      end
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      observer:onNext(count)
-      observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that produces the values from the original with duplicates removed.
--- @returns {Observable}
-function Observable:distinct()
-  return Observable.create(function(observer)
-    local values = {}
-
-    local function onNext(x)
-      if not values[x] then
-        observer:onNext(x)
-      end
-
-      values[x] = true
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns an Observable that only produces values from the original if they are different from
--- the previous value.
--- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
--- @returns {Observable}
-function Observable:distinctUntilChanged(comparator)
-  comparator = comparator or util.eq
-
-  return Observable.create(function(observer)
-    local first = true
-    local currentValue = nil
-
-    local function onNext(value, ...)
-      if first or not comparator(value, currentValue) then
-        observer:onNext(value, ...)
-        currentValue = value
-        first = false
-      end
-    end
-
-    local function onError(message)
-      return observer:onError(onError)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that only produces values of the first that satisfy a predicate.
--- @arg {function} predicate - The predicate used to filter values.
--- @returns {Observable}
-function Observable:filter(predicate)
-  predicate = predicate or util.identity
-
-  return Observable.create(function(observer)
-    local function onNext(...)
-      if predicate(...) then
-        return observer:onNext(...)
-      end
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted(e)
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that produces the first value of the original that satisfies a
--- predicate.
--- @arg {function} predicate - The predicate used to find a value.
-function Observable:find(predicate)
-  predicate = predicate or util.identity
-
-  return Observable.create(function(observer)
-    local function onNext(...)
-      if predicate(...) then
-        observer:onNext(...)
-        return observer:onCompleted()
-      end
-    end
-
-    local function onError(message)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that only produces the first result of the original.
--- @returns {Observable}
-function Observable:first()
-  return self:take(1)
-end
-
---- Returns a new Observable that subscribes to the Observables produced by the original and
--- produces their values.
--- @returns {Observable}
-function Observable:flatten()
-  return Observable.create(function(observer)
-    local function onError(message)
-      return observer:onError(message)
-    end
-
-    local function onNext(observable)
-      local function innerOnNext(...)
-        observer:onNext(...)
-      end
-
-      observable:subscribe(innerOnNext, onError, util.noop)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that only produces the last result of the original.
--- @returns {Observable}
-function Observable:last()
-  return Observable.create(function(observer)
-    local value
-    local empty = true
-
-    local function onNext(...)
-      value = {...}
-      empty = false
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      if not empty then
-        observer:onNext(util.unpack(value or {}))
-      end
-
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that produces the values of the original transformed by a function.
--- @arg {function} callback - The function to transform values from the original Observable.
--- @returns {Observable}
-function Observable:map(callback)
-  return Observable.create(function(observer)
-    callback = callback or util.identity
-
-    local function onNext(...)
-      return observer:onNext(callback(...))
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that produces the maximum value produced by the original.
--- @returns {Observable}
-function Observable:max()
-  return self:reduce(math.max)
-end
-
---- Returns a new Observable that produces the values produced by all the specified Observables in
--- the order they are produced.
--- @arg {Observable...} sources - One or more Observables to merge.
--- @returns {Observable}
-function Observable:merge(...)
-  local sources = {...}
-  table.insert(sources, 1, self)
-
-  return Observable.create(function(observer)
-    local function onNext(...)
-      return observer:onNext(...)
-    end
-
-    local function onError(message)
-      return observer:onError(message)
-    end
-
-    local function onCompleted(i)
-      return function()
-        sources[i] = nil
-
-        if not next(sources) then
-          observer:onCompleted()
-        end
-      end
-    end
-
-    for i = 1, #sources do
-      sources[i]:subscribe(onNext, onError, onCompleted(i))
-    end
-  end)
-end
-
---- Returns a new Observable that produces the minimum value produced by the original.
--- @returns {Observable}
-function Observable:min()
-  return self:reduce(math.min)
-end
-
---- Returns an Observable that produces the values of the original inside tables.
--- @returns {Observable}
-function Observable:pack()
-  return self:map(util.pack)
-end
-
---- Returns two Observables: one that produces values for which the predicate returns truthy for,
--- and another that produces values for which the predicate returns falsy.
--- @arg {function} predicate - The predicate used to partition the values.
--- @returns {Observable}
--- @returns {Observable}
-function Observable:partition(predicate)
-  return self:filter(predicate), self:reject(predicate)
-end
-
---- Returns a new Observable that produces values computed by extracting the given keys from the
--- tables produced by the original.
--- @arg {string...} keys - The key to extract from the table. Multiple keys can be specified to
---                         recursively pluck values from nested tables.
--- @returns {Observable}
-function Observable:pluck(key, ...)
-  if not key then return self end
-
-  return Observable.create(function(observer)
-    local function onNext(t)
-      return observer:onNext(t[key])
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end):pluck(...)
-end
-
---- Returns a new Observable that produces a single value computed by accumulating the results of
--- running a function on each value produced by the original Observable.
--- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
---                               the return value of the last call as the first argument and the
---                               current values as the rest of the arguments.
--- @arg {*} seed - A value to pass to the accumulator the first time it is run.
--- @returns {Observable}
-function Observable:reduce(accumulator, seed)
-  return Observable.create(function(observer)
-    local result = seed
-    local first = true
-
-    local function onNext(...)
-      if first and seed == nil then
-        result = ...
-        first = false
-      else
-        result = accumulator(result, ...)
-      end
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      observer:onNext(result)
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that produces values from the original which do not satisfy a
--- predicate.
--- @arg {function} predicate - The predicate used to reject values.
--- @returns {Observable}
-function Observable:reject(predicate)
-  predicate = predicate or util.identity
-
-  return Observable.create(function(observer)
-    local function onNext(...)
-      if not predicate(...) then
-        return observer:onNext(...)
-      end
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted(e)
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that skips over a specified number of values produced by the original
--- and produces the rest.
--- @arg {number=1} n - The number of values to ignore.
--- @returns {Observable}
-function Observable:skip(n)
-  n = n or 1
-
-  return Observable.create(function(observer)
-    local i = 1
-
-    local function onNext(...)
-      if i > n then
-        observer:onNext(...)
-      else
-        i = i + 1
-      end
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that skips over values produced by the original until the specified
--- Observable produces a value.
--- @arg {Observable} other - The Observable that triggers the production of values.
--- @returns {Observable}
-function Observable:skipUntil(other)
-  return Observable.create(function(observer)
-    local triggered = false
-    local function trigger()
-      triggered = true
-    end
-
-    other:subscribe(trigger, trigger, trigger)
-
-    local function onNext(...)
-      if triggered then
-        observer:onNext(...)
-      end
-    end
-
-    local function onError()
-      if triggered then
-        observer:onError()
-      end
-    end
-
-    local function onCompleted()
-      if triggered then
-        observer:onCompleted()
-      end
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that skips elements until the predicate returns falsy for one of them.
--- @arg {function} predicate - The predicate used to continue skipping values.
--- @returns {Observable}
-function Observable:skipWhile(predicate)
-  predicate = predicate or util.identity
-
-  return Observable.create(function(observer)
-    local skipping = true
-
-    local function onNext(...)
-      if skipping then
-        skipping = predicate(...)
-      end
-
-      if not skipping then
-        return observer:onNext(...)
-      end
-    end
-
-    local function onError(message)
-      return observer:onError(message)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that only produces the first n results of the original.
--- @arg {number=1} n - The number of elements to produce before completing.
--- @returns {Observable}
-function Observable:take(n)
-  n = n or 1
-
-  return Observable.create(function(observer)
-    if n <= 0 then
-      observer:onCompleted()
-      return
-    end
-
-    local i = 1
-
-    local function onNext(...)
-      observer:onNext(...)
-
-      i = i + 1
-
-      if i > n then
-        observer:onCompleted()
-      end
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that completes when the specified Observable fires.
--- @arg {Observable} other - The Observable that triggers completion of the original.
--- @returns {Observable}
-function Observable:takeUntil(other)
-  return Observable.create(function(observer)
-    local function onNext(...)
-      return observer:onNext(...)
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    other:subscribe(onCompleted, onCompleted, onCompleted)
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns a new Observable that produces elements until the predicate returns falsy.
--- @arg {function} predicate - The predicate used to continue production of values.
--- @returns {Observable}
-function Observable:takeWhile(predicate)
-  predicate = predicate or util.identity
-
-  return Observable.create(function(observer)
-    local taking = true
-
-    local function onNext(...)
-      if taking then
-        taking = predicate(...)
-
-        if taking then
-          return observer:onNext(...)
-        else
-          return observer:onCompleted()
-        end
-      end
-    end
-
-    local function onError(message)
-      return observer:onError(message)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Runs a function each time this Observable has activity. Similar to subscribe but does not
--- create a subscription.
--- @arg {function=} onNext - Run when the Observable produces values.
--- @arg {function=} onError - Run when the Observable encounters a problem.
--- @arg {function=} onCompleted - Run when the Observable completes.
--- @returns {Observable}
-function Observable:tap(_onNext, _onError, _onCompleted)
-  _onNext = _onNext or util.noop
-  _onError = _onError or util.noop
-  _onCompleted = _onCompleted or util.noop
-
-  return Observable.create(function(observer)
-    local function onNext(...)
-      _onNext(...)
-      return observer:onNext(...)
-    end
-
-    local function onError(message)
-      _onError(message)
-      return observer:onError(message)
-    end
-
-    local function onCompleted()
-      _onCompleted()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns an Observable that unpacks the tables produced by the original.
--- @returns {Observable}
-function Observable:unpack()
-  return self:map(util.unpack)
-end
-
---- Returns an Observable that takes any values produced by the original that consist of multiple
--- return values and produces each value individually.
--- @returns {Observable}
-function Observable:unwrap()
-  return Observable.create(function(observer)
-    local function onNext(...)
-      local values = {...}
-      for i = 1, #values do
-        observer:onNext(values[i])
-      end
-    end
-
-    local function onError(message)
-      return observer:onError(message)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns an Observable that produces a sliding window of the values produced by the original.
--- @arg {number} size - The size of the window. The returned observable will produce this number
---                      of the most recent values as multiple arguments to onNext.
--- @returns {Observable}
-function Observable:window(size)
-  return Observable.create(function(observer)
-    local window = {}
-
-    local function onNext(value)
-      table.insert(window, value)
-
-      if #window >= size then
-        observer:onNext(util.unpack(window))
-        table.remove(window, 1)
-      end
-    end
-
-    local function onError(message)
-      return observer:onError(message)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
---- Returns an Observable that produces values from the original along with the most recently
--- produced value from all other specified Observables. Note that only the first argument from each
--- source Observable is used.
--- @arg {Observable...} sources - The Observables to include the most recent values from.
--- @returns {Observable}
-function Observable:with(...)
-  local sources = {...}
-
-  return Observable.create(function(observer)
-    local latest = setmetatable({}, {__len = util.constant(#sources)})
-
-    local function setLatest(i)
-      return function(value)
-        latest[i] = value
-      end
-    end
-
-    local function onNext(value)
-      return observer:onNext(value, util.unpack(latest))
-    end
-
-    local function onError(e)
-      return observer:onError(e)
-    end
-
-    local function onCompleted()
-      return observer:onCompleted()
-    end
-
-    for i = 1, #sources do
-      sources[i]:subscribe(setLatest(i), util.noop, util.noop)
-    end
-
-    return self:subscribe(onNext, onError, onCompleted)
-  end)
-end
-
 return Observable
 return Observable

+ 29 - 0
src/operators/all.lua

@@ -0,0 +1,29 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Determine whether all items emitted by an Observable meet some criteria.
+-- @arg {function=identity} predicate - The predicate used to evaluate objects.
+function Observable:all(predicate)
+  predicate = predicate or util.identity
+
+  return Observable.create(function(observer)
+    local function onNext(...)
+      if not predicate(...) then
+        observer:onNext(false)
+        observer:onCompleted()
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      observer:onNext(true)
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 51 - 0
src/operators/amb.lua

@@ -0,0 +1,51 @@
+local Observable = require 'observable'
+
+--- Given a set of Observables, produces values from only the first one to produce a value.
+-- @arg {Observable...} observables
+-- @returns {Observable}
+function Observable.amb(a, b, ...)
+  if not a or not b then return a end
+
+  return Observable.create(function(observer)
+    local subscriptionA, subscriptionB
+
+    local function onNextA(...)
+      if subscriptionB then subscriptionB:unsubscribe() end
+      observer:onNext(...)
+    end
+
+    local function onErrorA(e)
+      if subscriptionB then subscriptionB:unsubscribe() end
+      observer:onError(e)
+    end
+
+    local function onCompletedA()
+      if subscriptionB then subscriptionB:unsubscribe() end
+      observer:onCompleted()
+    end
+
+    local function onNextB(...)
+      if subscriptionA then subscriptionA:unsubscribe() end
+      observer:onNext(...)
+    end
+
+    local function onErrorB(e)
+      if subscriptionA then subscriptionA:unsubscribe() end
+      observer:onError(e)
+    end
+
+    local function onCompletedB()
+      if subscriptionA then subscriptionA:unsubscribe() end
+      observer:onCompleted()
+    end
+
+    subscriptionA = a:subscribe(onNextA, onErrorA, onCompletedA)
+    subscriptionB = b:subscribe(onNextB, onErrorB, onCompletedB)
+
+    return Subscription.create(function()
+      subscriptionA:unsubscribe()
+      subscriptionB:unsubscribe()
+    end)
+  end):amb(...)
+end
+

+ 29 - 0
src/operators/average.lua

@@ -0,0 +1,29 @@
+local Observable = require 'observable'
+
+--- Returns an Observable that produces the average of all values produced by the original.
+-- @returns {Observable}
+function Observable:average()
+  return Observable.create(function(observer)
+    local sum, count = 0, 0
+
+    local function onNext(value)
+      sum = sum + value
+      count = count + 1
+    end
+
+    local function onError(e)
+      observer:onError(e)
+    end
+
+    local function onCompleted()
+      if count > 0 then
+        observer:onNext(sum / count)
+      end
+
+      observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 41 - 0
src/operators/buffer.lua

@@ -0,0 +1,41 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns an Observable that buffers values from the original and produces them as multiple
+-- values.
+-- @arg {number} size - The size of the buffer.
+function Observable:buffer(size)
+  return Observable.create(function(observer)
+    local buffer = {}
+
+    local function emit()
+      if #buffer > 0 then
+        observer:onNext(util.unpack(buffer))
+        buffer = {}
+      end
+    end
+
+    local function onNext(...)
+      local values = {...}
+      for i = 1, #values do
+        table.insert(buffer, values[i])
+        if #buffer >= size then
+          emit()
+        end
+      end
+    end
+
+    local function onError(message)
+      emit()
+      return observer:onError(message)
+    end
+
+    local function onCompleted()
+      emit()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 41 - 0
src/operators/catch.lua

@@ -0,0 +1,41 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns an Observable that intercepts any errors from the previous and replace them with values
+-- produced by a new Observable.
+-- @arg {function|Observable} handler - An Observable or a function that returns an Observable to
+--                                       replace the source Observable in the event of an error.
+-- @returns {Observable}
+function Observable:catch(handler)
+  handler = handler and (type(handler) == 'function' and handler or util.constant(handler))
+
+  return Observable.create(function(observer)
+    local subscription
+
+    local function onNext(...)
+      return observer:onNext(...)
+    end
+
+    local function onError(e)
+      if not handler then
+        return observer:onCompleted()
+      end
+
+      local continue = handler(e)
+      if continue then
+        if subscription then subscription:unsubscribe() end
+        continue:subscribe(observer)
+      else
+        observer:onError(e)
+      end
+    end
+
+    local function onCompleted()
+      observer:onCompleted()
+    end
+
+    subscription = self:subscribe(onNext, onError, onCompleted)
+    return subscription
+  end)
+end
+

+ 55 - 0
src/operators/combineLatest.lua

@@ -0,0 +1,55 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns a new Observable that runs a combinator function on the most recent values from a set
+-- of Observables whenever any of them produce a new value. The results of the combinator function
+-- are produced by the new Observable.
+-- @arg {Observable...} observables - One or more Observables to combine.
+-- @arg {function} combinator - A function that combines the latest result from each Observable and
+--                              returns a single value.
+-- @returns {Observable}
+function Observable:combineLatest(...)
+  local sources = {...}
+  local combinator = table.remove(sources)
+  if type(combinator) ~= 'function' then
+    table.insert(sources, combinator)
+    combinator = function(...) return ... end
+  end
+  table.insert(sources, 1, self)
+
+  return Observable.create(function(observer)
+    local latest = {}
+    local pending = {util.unpack(sources)}
+    local completed = {}
+
+    local function onNext(i)
+      return function(value)
+        latest[i] = value
+        pending[i] = nil
+
+        if not next(pending) then
+          observer:onNext(combinator(util.unpack(latest)))
+        end
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted(i)
+      return function()
+        table.insert(completed, i)
+
+        if #completed == #sources then
+          observer:onCompleted()
+        end
+      end
+    end
+
+    for i = 1, #sources do
+      sources[i]:subscribe(onNext(i), onError, onCompleted(i))
+    end
+  end)
+end
+

+ 9 - 0
src/operators/compact.lua

@@ -0,0 +1,9 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns a new Observable that produces the values of the first with falsy values removed.
+-- @returns {Observable}
+function Observable:compact()
+  return self:filter(util.identity)
+end
+

+ 33 - 0
src/operators/concat.lua

@@ -0,0 +1,33 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns a new Observable that produces the values produced by all the specified Observables in
+-- the order they are specified.
+-- @arg {Observable...} sources - The Observables to concatenate.
+-- @returns {Observable}
+function Observable:concat(other, ...)
+  if not other then return self end
+
+  local others = {...}
+
+  return Observable.create(function(observer)
+    local function onNext(...)
+      return observer:onNext(...)
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    local function chain()
+      return other:concat(util.unpack(others)):subscribe(onNext, onError, onCompleted)
+    end
+
+    return self:subscribe(onNext, onError, chain)
+  end)
+end
+

+ 43 - 0
src/operators/contains.lua

@@ -0,0 +1,43 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns a new Observable that produces a single boolean value representing whether or not the
+-- specified value was produced by the original.
+-- @arg {*} value - The value to search for.  == is used for equality testing.
+-- @returns {Observable}
+function Observable:contains(value)
+  return Observable.create(function(observer)
+    local subscription
+
+    local function onNext(...)
+      local args = util.pack(...)
+
+      if #args == 0 and value == nil then
+        observer:onNext(true)
+        if subscription then subscription:unsubscribe() end
+        return observer:onCompleted()
+      end
+
+      for i = 1, #args do
+        if args[i] == value then
+          observer:onNext(true)
+        if subscription then subscription:unsubscribe() end
+          return observer:onCompleted()
+        end
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      observer:onNext(false)
+      return observer:onCompleted()
+    end
+
+    subscription = self:subscribe(onNext, onError, onCompleted)
+    return subscription
+  end)
+end
+

+ 31 - 0
src/operators/count.lua

@@ -0,0 +1,31 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns an Observable that produces a single value representing the number of values produced
+-- by the source value that satisfy an optional predicate.
+-- @arg {function=} predicate - The predicate used to match values.
+function Observable:count(predicate)
+  predicate = predicate or util.constant(true)
+
+  return Observable.create(function(observer)
+    local count = 0
+
+    local function onNext(...)
+      if predicate(...) then
+        count = count + 1
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      observer:onNext(count)
+      observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 28 - 0
src/operators/distinct.lua

@@ -0,0 +1,28 @@
+local Observable = require 'observable'
+
+--- Returns a new Observable that produces the values from the original with duplicates removed.
+-- @returns {Observable}
+function Observable:distinct()
+  return Observable.create(function(observer)
+    local values = {}
+
+    local function onNext(x)
+      if not values[x] then
+        observer:onNext(x)
+      end
+
+      values[x] = true
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 34 - 0
src/operators/distinctUntilChanged.lua

@@ -0,0 +1,34 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns an Observable that only produces values from the original if they are different from
+-- the previous value.
+-- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
+-- @returns {Observable}
+function Observable:distinctUntilChanged(comparator)
+  comparator = comparator or util.eq
+
+  return Observable.create(function(observer)
+    local first = true
+    local currentValue = nil
+
+    local function onNext(value, ...)
+      if first or not comparator(value, currentValue) then
+        observer:onNext(value, ...)
+        currentValue = value
+        first = false
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(onError)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 28 - 0
src/operators/filter.lua

@@ -0,0 +1,28 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns a new Observable that only produces values of the first that satisfy a predicate.
+-- @arg {function} predicate - The predicate used to filter values.
+-- @returns {Observable}
+function Observable:filter(predicate)
+  predicate = predicate or util.identity
+
+  return Observable.create(function(observer)
+    local function onNext(...)
+      if predicate(...) then
+        return observer:onNext(...)
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted(e)
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 29 - 0
src/operators/find.lua

@@ -0,0 +1,29 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns a new Observable that produces the first value of the original that satisfies a
+-- predicate.
+-- @arg {function} predicate - The predicate used to find a value.
+function Observable:find(predicate)
+  predicate = predicate or util.identity
+
+  return Observable.create(function(observer)
+    local function onNext(...)
+      if predicate(...) then
+        observer:onNext(...)
+        return observer:onCompleted()
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 8 - 0
src/operators/first.lua

@@ -0,0 +1,8 @@
+local Observable = require 'observable'
+
+--- Returns a new Observable that only produces the first result of the original.
+-- @returns {Observable}
+function Observable:first()
+  return self:take(1)
+end
+

+ 28 - 0
src/operators/flatten.lua

@@ -0,0 +1,28 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns a new Observable that subscribes to the Observables produced by the original and
+-- produces their values.
+-- @returns {Observable}
+function Observable:flatten()
+  return Observable.create(function(observer)
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onNext(observable)
+      local function innerOnNext(...)
+        observer:onNext(...)
+      end
+
+      observable:subscribe(innerOnNext, onError, util.noop)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 31 - 0
src/operators/last.lua

@@ -0,0 +1,31 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns a new Observable that only produces the last result of the original.
+-- @returns {Observable}
+function Observable:last()
+  return Observable.create(function(observer)
+    local value
+    local empty = true
+
+    local function onNext(...)
+      value = {...}
+      empty = false
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      if not empty then
+        observer:onNext(util.unpack(value or {}))
+      end
+
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 26 - 0
src/operators/map.lua

@@ -0,0 +1,26 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns a new Observable that produces the values of the original transformed by a function.
+-- @arg {function} callback - The function to transform values from the original Observable.
+-- @returns {Observable}
+function Observable:map(callback)
+  return Observable.create(function(observer)
+    callback = callback or util.identity
+
+    local function onNext(...)
+      return observer:onNext(callback(...))
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 8 - 0
src/operators/max.lua

@@ -0,0 +1,8 @@
+local Observable = require 'observable'
+
+--- Returns a new Observable that produces the maximum value produced by the original.
+-- @returns {Observable}
+function Observable:max()
+  return self:reduce(math.max)
+end
+

+ 35 - 0
src/operators/merge.lua

@@ -0,0 +1,35 @@
+local Observable = require 'observable'
+
+--- Returns a new Observable that produces the values produced by all the specified Observables in
+-- the order they are produced.
+-- @arg {Observable...} sources - One or more Observables to merge.
+-- @returns {Observable}
+function Observable:merge(...)
+  local sources = {...}
+  table.insert(sources, 1, self)
+
+  return Observable.create(function(observer)
+    local function onNext(...)
+      return observer:onNext(...)
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onCompleted(i)
+      return function()
+        sources[i] = nil
+
+        if not next(sources) then
+          observer:onCompleted()
+        end
+      end
+    end
+
+    for i = 1, #sources do
+      sources[i]:subscribe(onNext, onError, onCompleted(i))
+    end
+  end)
+end
+

+ 8 - 0
src/operators/min.lua

@@ -0,0 +1,8 @@
+local Observable = require 'observable'
+
+--- Returns a new Observable that produces the minimum value produced by the original.
+-- @returns {Observable}
+function Observable:min()
+  return self:reduce(math.min)
+end
+

+ 9 - 0
src/operators/pack.lua

@@ -0,0 +1,9 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns an Observable that produces the values of the original inside tables.
+-- @returns {Observable}
+function Observable:pack()
+  return self:map(util.pack)
+end
+

+ 11 - 0
src/operators/partition.lua

@@ -0,0 +1,11 @@
+local Observable = require 'observable'
+
+--- Returns two Observables: one that produces values for which the predicate returns truthy for,
+-- and another that produces values for which the predicate returns falsy.
+-- @arg {function} predicate - The predicate used to partition the values.
+-- @returns {Observable}
+-- @returns {Observable}
+function Observable:partition(predicate)
+  return self:filter(predicate), self:reject(predicate)
+end
+

+ 27 - 0
src/operators/pluck.lua

@@ -0,0 +1,27 @@
+local Observable = require 'observable'
+
+--- Returns a new Observable that produces values computed by extracting the given keys from the
+-- tables produced by the original.
+-- @arg {string...} keys - The key to extract from the table. Multiple keys can be specified to
+--                         recursively pluck values from nested tables.
+-- @returns {Observable}
+function Observable:pluck(key, ...)
+  if not key then return self end
+
+  return Observable.create(function(observer)
+    local function onNext(t)
+      return observer:onNext(t[key])
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end):pluck(...)
+end
+

+ 36 - 0
src/operators/reduce.lua

@@ -0,0 +1,36 @@
+local Observable = require 'observable'
+
+--- Returns a new Observable that produces a single value computed by accumulating the results of
+-- running a function on each value produced by the original Observable.
+-- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
+--                               the return value of the last call as the first argument and the
+--                               current values as the rest of the arguments.
+-- @arg {*} seed - A value to pass to the accumulator the first time it is run.
+-- @returns {Observable}
+function Observable:reduce(accumulator, seed)
+  return Observable.create(function(observer)
+    local result = seed
+    local first = true
+
+    local function onNext(...)
+      if first and seed == nil then
+        result = ...
+        first = false
+      else
+        result = accumulator(result, ...)
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      observer:onNext(result)
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 29 - 0
src/operators/reject.lua

@@ -0,0 +1,29 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns a new Observable that produces values from the original which do not satisfy a
+-- predicate.
+-- @arg {function} predicate - The predicate used to reject values.
+-- @returns {Observable}
+function Observable:reject(predicate)
+  predicate = predicate or util.identity
+
+  return Observable.create(function(observer)
+    local function onNext(...)
+      if not predicate(...) then
+        return observer:onNext(...)
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted(e)
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 32 - 0
src/operators/skip.lua

@@ -0,0 +1,32 @@
+local Observable = require 'observable'
+
+--- Returns a new Observable that skips over a specified number of values produced by the original
+-- and produces the rest.
+-- @arg {number=1} n - The number of values to ignore.
+-- @returns {Observable}
+function Observable:skip(n)
+  n = n or 1
+
+  return Observable.create(function(observer)
+    local i = 1
+
+    local function onNext(...)
+      if i > n then
+        observer:onNext(...)
+      else
+        i = i + 1
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 37 - 0
src/operators/skipUntil.lua

@@ -0,0 +1,37 @@
+local Observable = require 'observable'
+
+--- Returns a new Observable that skips over values produced by the original until the specified
+-- Observable produces a value.
+-- @arg {Observable} other - The Observable that triggers the production of values.
+-- @returns {Observable}
+function Observable:skipUntil(other)
+  return Observable.create(function(observer)
+    local triggered = false
+    local function trigger()
+      triggered = true
+    end
+
+    other:subscribe(trigger, trigger, trigger)
+
+    local function onNext(...)
+      if triggered then
+        observer:onNext(...)
+      end
+    end
+
+    local function onError()
+      if triggered then
+        observer:onError()
+      end
+    end
+
+    local function onCompleted()
+      if triggered then
+        observer:onCompleted()
+      end
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 34 - 0
src/operators/skipWhile.lua

@@ -0,0 +1,34 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns a new Observable that skips elements until the predicate returns falsy for one of them.
+-- @arg {function} predicate - The predicate used to continue skipping values.
+-- @returns {Observable}
+function Observable:skipWhile(predicate)
+  predicate = predicate or util.identity
+
+  return Observable.create(function(observer)
+    local skipping = true
+
+    local function onNext(...)
+      if skipping then
+        skipping = predicate(...)
+      end
+
+      if not skipping then
+        return observer:onNext(...)
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 38 - 0
src/operators/take.lua

@@ -0,0 +1,38 @@
+local Observable = require 'observable'
+
+--- Returns a new Observable that only produces the first n results of the original.
+-- @arg {number=1} n - The number of elements to produce before completing.
+-- @returns {Observable}
+function Observable:take(n)
+  n = n or 1
+
+  return Observable.create(function(observer)
+    if n <= 0 then
+      observer:onCompleted()
+      return
+    end
+
+    local i = 1
+
+    local function onNext(...)
+      observer:onNext(...)
+
+      i = i + 1
+
+      if i > n then
+        observer:onCompleted()
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 25 - 0
src/operators/takeUntil.lua

@@ -0,0 +1,25 @@
+local Observable = require 'observable'
+
+--- Returns a new Observable that completes when the specified Observable fires.
+-- @arg {Observable} other - The Observable that triggers completion of the original.
+-- @returns {Observable}
+function Observable:takeUntil(other)
+  return Observable.create(function(observer)
+    local function onNext(...)
+      return observer:onNext(...)
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    other:subscribe(onCompleted, onCompleted, onCompleted)
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 36 - 0
src/operators/takeWhile.lua

@@ -0,0 +1,36 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns a new Observable that produces elements until the predicate returns falsy.
+-- @arg {function} predicate - The predicate used to continue production of values.
+-- @returns {Observable}
+function Observable:takeWhile(predicate)
+  predicate = predicate or util.identity
+
+  return Observable.create(function(observer)
+    local taking = true
+
+    local function onNext(...)
+      if taking then
+        taking = predicate(...)
+
+        if taking then
+          return observer:onNext(...)
+        else
+          return observer:onCompleted()
+        end
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 34 - 0
src/operators/tap.lua

@@ -0,0 +1,34 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Runs a function each time this Observable has activity. Similar to subscribe but does not
+-- create a subscription.
+-- @arg {function=} onNext - Run when the Observable produces values.
+-- @arg {function=} onError - Run when the Observable encounters a problem.
+-- @arg {function=} onCompleted - Run when the Observable completes.
+-- @returns {Observable}
+function Observable:tap(_onNext, _onError, _onCompleted)
+  _onNext = _onNext or util.noop
+  _onError = _onError or util.noop
+  _onCompleted = _onCompleted or util.noop
+
+  return Observable.create(function(observer)
+    local function onNext(...)
+      _onNext(...)
+      return observer:onNext(...)
+    end
+
+    local function onError(message)
+      _onError(message)
+      return observer:onError(message)
+    end
+
+    local function onCompleted()
+      _onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 9 - 0
src/operators/unpack.lua

@@ -0,0 +1,9 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns an Observable that unpacks the tables produced by the original.
+-- @returns {Observable}
+function Observable:unpack()
+  return self:map(util.unpack)
+end
+

+ 26 - 0
src/operators/unwrap.lua

@@ -0,0 +1,26 @@
+local Observable = require 'observable'
+
+--- Returns an Observable that takes any values produced by the original that consist of multiple
+-- return values and produces each value individually.
+-- @returns {Observable}
+function Observable:unwrap()
+  return Observable.create(function(observer)
+    local function onNext(...)
+      local values = {...}
+      for i = 1, #values do
+        observer:onNext(values[i])
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 32 - 0
src/operators/window.lua

@@ -0,0 +1,32 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns an Observable that produces a sliding window of the values produced by the original.
+-- @arg {number} size - The size of the window. The returned observable will produce this number
+--                      of the most recent values as multiple arguments to onNext.
+-- @returns {Observable}
+function Observable:window(size)
+  return Observable.create(function(observer)
+    local window = {}
+
+    local function onNext(value)
+      table.insert(window, value)
+
+      if #window >= size then
+        observer:onNext(util.unpack(window))
+        table.remove(window, 1)
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 40 - 0
src/operators/with.lua

@@ -0,0 +1,40 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- Returns an Observable that produces values from the original along with the most recently
+-- produced value from all other specified Observables. Note that only the first argument from each
+-- source Observable is used.
+-- @arg {Observable...} sources - The Observables to include the most recent values from.
+-- @returns {Observable}
+function Observable:with(...)
+  local sources = {...}
+
+  return Observable.create(function(observer)
+    local latest = setmetatable({}, {__len = util.constant(#sources)})
+
+    local function setLatest(i)
+      return function(value)
+        latest[i] = value
+      end
+    end
+
+    local function onNext(value)
+      return observer:onNext(value, util.unpack(latest))
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onCompleted()
+      return observer:onCompleted()
+    end
+
+    for i = 1, #sources do
+      sources[i]:subscribe(setLatest(i), util.noop, util.noop)
+    end
+
+    return self:subscribe(onNext, onError, onCompleted)
+  end)
+end
+

+ 37 - 0
tools/concat.lua

@@ -6,6 +6,43 @@ local files = {
   'src/subscription.lua',
   'src/subscription.lua',
   'src/observer.lua',
   'src/observer.lua',
   'src/observable.lua',
   'src/observable.lua',
+  'src/operators/all.lua',
+  'src/operators/amb.lua',
+  'src/operators/average.lua',
+  'src/operators/buffer.lua',
+  'src/operators/catch.lua',
+  'src/operators/combineLatest.lua',
+  'src/operators/compact.lua',
+  'src/operators/concat.lua',
+  'src/operators/contains.lua',
+  'src/operators/count.lua',
+  'src/operators/distinct.lua',
+  'src/operators/distinctUntilChanged.lua',
+  'src/operators/filter.lua',
+  'src/operators/find.lua',
+  'src/operators/first.lua',
+  'src/operators/flatten.lua',
+  'src/operators/last.lua',
+  'src/operators/map.lua',
+  'src/operators/max.lua',
+  'src/operators/merge.lua',
+  'src/operators/min.lua',
+  'src/operators/pack.lua',
+  'src/operators/partition.lua',
+  'src/operators/pluck.lua',
+  'src/operators/reduce.lua',
+  'src/operators/reject.lua',
+  'src/operators/skip.lua',
+  'src/operators/skipUntil.lua',
+  'src/operators/skipWhile.lua',
+  'src/operators/take.lua',
+  'src/operators/takeUntil.lua',
+  'src/operators/takeWhile.lua',
+  'src/operators/tap.lua',
+  'src/operators/unpack.lua',
+  'src/operators/unwrap.lua',
+  'src/operators/window.lua',
+  'src/operators/with.lua',
   'src/schedulers/immediatescheduler.lua',
   'src/schedulers/immediatescheduler.lua',
   'src/schedulers/cooperativescheduler.lua',
   'src/schedulers/cooperativescheduler.lua',
   'src/subjects/subject.lua',
   'src/subjects/subject.lua',