Browse Source

Separate source into src directory; Minor fixes;

bjorn 9 years ago
parent
commit
04f925dfa8

+ 88 - 83
rx.lua

@@ -1,18 +1,22 @@
-local rx
+-- RxLua v0.0.1
+-- https://github.com/bjornbytes/rxlua
+-- MIT License
 
-local pack = table.pack or function(...) return { n = select('#', ...), ... } end
-local unpack = table.unpack or unpack
-local function eq(x, y) return x == y end
-local function noop() end
-local function identity(x) return x end
-local function constant(x) return function() return x end end
+local util = {}
+
+util.pack = table.pack or function(...) return { n = select('#', ...), ... } end
+util.unpack = table.unpack or unpack
+util.eq = function(x, y) return x == y end
+util.noop = function() end
+util.identity = function(x) return x end
+util.constant = function(x) return function() return x end end
 
 --- @class Subscription
--- @description A handle representing the link between an Observer and an Observable, as well as
--- any work required to clean up after the Observable completes or the Observer unsubscribes.
+-- @description A handle representing the link between an Observer and an Observable, as well as any
+-- work required to clean up after the Observable completes or the Observer unsubscribes.
 local Subscription = {}
 Subscription.__index = Subscription
-Subscription.__tostring = constant('Subscription')
+Subscription.__tostring = util.constant('Subscription')
 
 --- Creates a new Subscription.
 -- @arg {function=} action - The action to run when the subscription is unsubscribed. It will only
@@ -20,7 +24,7 @@ Subscription.__tostring = constant('Subscription')
 -- @returns {Subscription}
 function Subscription.create(action)
   local self = {
-    action = action or noop,
+    action = action or util.noop,
     unsubscribed = false
   }
 
@@ -38,7 +42,7 @@ end
 -- @description Observers are simple objects that receive values from Observables.
 local Observer = {}
 Observer.__index = Observer
-Observer.__tostring = constant('Observer')
+Observer.__tostring = util.constant('Observer')
 
 --- Creates a new Observer.
 -- @arg {function=} onNext - Called when the Observable produces a value.
@@ -47,9 +51,9 @@ Observer.__tostring = constant('Observer')
 -- @returns {Observer}
 function Observer.create(onNext, onError, onComplete)
   local self = {
-    _onNext = onNext or noop,
+    _onNext = onNext or util.noop,
     _onError = onError or error,
-    _onComplete = onComplete or noop,
+    _onComplete = onComplete or util.noop,
     stopped = false
   }
 
@@ -85,7 +89,7 @@ end
 -- @description Observables push values to Observers.
 local Observable = {}
 Observable.__index = Observable
-Observable.__tostring = constant('Observable')
+Observable.__tostring = util.constant('Observable')
 
 --- Creates a new Observable.
 -- @arg {function} subscribe - The subscription function that produces values.
@@ -161,10 +165,10 @@ end
 --- Creates an Observable that produces values when the specified coroutine yields.
 -- @arg {thread} coroutine
 -- @returns {Observable}
-function Observable.fromCoroutine(thread)
+function Observable.fromCoroutine(thread, scheduler)
   thread = type(thread) == 'function' and coroutine.create(thread) or thread
   return Observable.create(function(observer)
-    return rx.scheduler:schedule(function()
+    return scheduler:schedule(function()
       while not observer.stopped do
         local success, value = coroutine.resume(thread)
 
@@ -198,15 +202,12 @@ function Observable:dump(name, formatter)
   return self:subscribe(onNext, onError, onComplete)
 end
 
--- The functions below transform the values produced by an Observable and return a new Observable
--- that produces these values.
-
 --- Returns an Observable that only produces values from the original if they are different from
 -- the previous value.
 -- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
 -- @returns {Observable}
 function Observable:changes(comparator)
-  comparator = comparator or eq
+  comparator = comparator or util.eq
 
   return Observable.create(function(observer)
     local first = true
@@ -242,11 +243,15 @@ end
 function Observable:combine(...)
   local sources = {...}
   local combinator = table.remove(sources)
+  if type(combinator) ~= 'function' then
+    table.insert(sources, combinator)
+    combinator = function(...) return ... end
+  end
   table.insert(sources, 1, self)
 
   return Observable.create(function(observer)
     local latest = {}
-    local pending = {unpack(sources)}
+    local pending = {util.unpack(sources)}
     local completed = {}
 
     local function onNext(i)
@@ -255,7 +260,7 @@ function Observable:combine(...)
         pending[i] = nil
 
         if not next(pending) then
-          observer:onNext(combinator(unpack(latest)))
+          observer:onNext(combinator(util.unpack(latest)))
         end
       end
     end
@@ -283,7 +288,7 @@ end
 --- Returns a new Observable that produces the values of the first with falsy values removed.
 -- @returns {Observable}
 function Observable:compact()
-  return self:filter(identity)
+  return self:filter(util.identity)
 end
 
 --- Returns a new Observable that produces the values produced by all the specified Observables in
@@ -309,7 +314,7 @@ function Observable:concat(other, ...)
     end
 
     local function chain()
-      return other:concat(unpack(others)):subscribe(onNext, onError, onComplete)
+      return other:concat(util.unpack(others)):subscribe(onNext, onError, onComplete)
     end
 
     return self:subscribe(onNext, onError, chain)
@@ -346,7 +351,7 @@ end
 -- @arg {function} predicate - The predicate used to filter values.
 -- @returns {Observable}
 function Observable:filter(predicate)
-  predicate = predicate or identity
+  predicate = predicate or util.identity
 
   return Observable.create(function(observer)
     local function onNext(...)
@@ -371,7 +376,7 @@ end
 -- predicate.
 -- @arg {function} predicate - The predicate used to find a value.
 function Observable:find(predicate)
-  predicate = predicate or identity
+  predicate = predicate or util.identity
 
   return Observable.create(function(observer)
     local function onNext(...)
@@ -413,7 +418,7 @@ function Observable:flatten()
         observer:onNext(...)
       end
 
-      observable:subscribe(innerOnNext, onError, noop)
+      observable:subscribe(innerOnNext, onError, util.noop)
     end
 
     local function onComplete()
@@ -442,7 +447,7 @@ function Observable:last()
 
     local function onComplete()
       if not empty then
-        observer:onNext(unpack(value or {}))
+        observer:onNext(util.unpack(value or {}))
       end
 
       return observer:onComplete()
@@ -457,7 +462,7 @@ end
 -- @returns {Observable}
 function Observable:map(callback)
   return Observable.create(function(observer)
-    callback = callback or identity
+    callback = callback or util.identity
 
     local function onNext(...)
       return observer:onNext(callback(...))
@@ -523,7 +528,7 @@ end
 --- Returns an Observable that produces the values of the original inside tables.
 -- @returns {Observable}
 function Observable:pack()
-  return self:map(pack)
+  return self:map(util.pack)
 end
 
 --- Returns two Observables: one that produces values for which the predicate returns truthy for,
@@ -599,7 +604,7 @@ end
 -- @arg {function} predicate - The predicate used to reject values.
 -- @returns {Observable}
 function Observable:reject(predicate)
-  predicate = predicate or identity
+  predicate = predicate or util.identity
 
   return Observable.create(function(observer)
     local function onNext(...)
@@ -689,7 +694,7 @@ end
 -- @arg {function} predicate - The predicate used to continue skipping values.
 -- @returns {Observable}
 function Observable:skipWhile(predicate)
-  predicate = predicate or identity
+  predicate = predicate or util.identity
 
   return Observable.create(function(observer)
     local skipping = true
@@ -779,7 +784,7 @@ end
 -- @arg {function} predicate - The predicate used to continue production of values.
 -- @returns {Observable}
 function Observable:takeWhile(predicate)
-  predicate = predicate or identity
+  predicate = predicate or util.identity
 
   return Observable.create(function(observer)
     local taking = true
@@ -815,7 +820,9 @@ end
 -- @arg {function=} onComplete - Run when the Observable completes.
 -- @returns {Observable}
 function Observable:tap(_onNext, _onError, _onComplete)
-  _onNext, _onError, _onComplete = _onNext or noop, _onError or noop, _onComplete or noop
+  _onNext = _onNext or util.noop
+  _onError = _onError or util.noop
+  _onComplete = _onComplete or util.noop
 
   return Observable.create(function(observer)
     local function onNext(...)
@@ -837,10 +844,10 @@ function Observable:tap(_onNext, _onError, _onComplete)
   end)
 end
 
---- Returns an Observable that unpacks the tables produced by the original.
+--- Returns an Observable that util.unpacks the tables produced by the original.
 -- @returns {Observable}
 function Observable:unpack()
-  return self:map(unpack)
+  return self:map(util.unpack)
 end
 
 --- Returns an Observable that takes any values produced by the original that consist of multiple
@@ -879,7 +886,7 @@ function Observable:window(size)
       table.insert(window, value)
 
       if #window >= size then
-        observer:onNext(unpack(window))
+        observer:onNext(util.unpack(window))
         table.remove(window, 1)
       end
     end
@@ -905,7 +912,7 @@ function Observable:with(...)
   local sources = {...}
 
   return Observable.create(function(observer)
-    local latest = setmetatable({}, {__len = constant(#sources)})
+    local latest = setmetatable({}, {__len = util.constant(#sources)})
 
     local function setLatest(i)
       return function(value)
@@ -914,7 +921,7 @@ function Observable:with(...)
     end
 
     local function onNext(value)
-      return observer:onNext(value, unpack(latest))
+      return observer:onNext(value, util.unpack(latest))
     end
 
     local function onError(e)
@@ -926,7 +933,7 @@ function Observable:with(...)
     end
 
     for i = 1, #sources do
-      sources[i]:subscribe(setLatest(i), noop, noop)
+      sources[i]:subscribe(setLatest(i), util.noop, util.noop)
     end
 
     return self:subscribe(onNext, onError, onComplete)
@@ -942,7 +949,7 @@ function Observable:wrap(size)
 
     local function emit()
       if #buffer > 0 then
-        observer:onNext(unpack(buffer))
+        observer:onNext(util.unpack(buffer))
         buffer = {}
       end
     end
@@ -971,47 +978,41 @@ function Observable:wrap(size)
   end)
 end
 
---- @class Scheduler
--- @description Schedulers manage groups of Observables.
-local Scheduler = {}
-
 --- @class ImmediateScheduler
 -- @description Schedules Observables by running all operations immediately.
-local Immediate = {}
-Immediate.__index = Immediate
-Immediate.__tostring = constant('ImmediateScheduler')
-
---- Creates a new Immediate Scheduler.
--- @returns {Scheduler.Immediate}
-function Immediate.create()
-  return setmetatable({}, Immediate)
+local ImmediateScheduler = {}
+ImmediateScheduler.__index = ImmediateScheduler
+ImmediateScheduler.__tostring = util.constant('ImmediateScheduler')
+
+--- Creates a new ImmediateScheduler.
+-- @returns {ImmediateScheduler}
+function ImmediateScheduler.create()
+  return setmetatable({}, ImmediateScheduler)
 end
 
 --- Schedules a function to be run on the scheduler. It is executed immediately.
 -- @arg {function} action - The function to execute.
-function Immediate:schedule(action)
+function ImmediateScheduler:schedule(action)
   action()
 end
 
-Scheduler.Immediate = Immediate
-
 --- @class CooperativeScheduler
 -- @description Manages Observables using coroutines and a virtual clock that must be updated
 -- manually.
-local Cooperative = {}
-Cooperative.__index = Cooperative
-Cooperative.__tostring = constant('CooperativeScheduler')
+local CooperativeScheduler = {}
+CooperativeScheduler.__index = CooperativeScheduler
+CooperativeScheduler.__tostring = util.constant('CooperativeScheduler')
 
---- Creates a new Cooperative Scheduler.
+--- Creates a new CooperativeScheduler.
 -- @arg {number=0} currentTime - A time to start the scheduler at.
--- @returns {Scheduler.Cooperative}
-function Cooperative.create(currentTime)
+-- @returns {Scheduler.CooperativeScheduler}
+function CooperativeScheduler.create(currentTime)
   local self = {
     tasks = {},
     currentTime = currentTime or 0
   }
 
-  return setmetatable(self, Cooperative)
+  return setmetatable(self, CooperativeScheduler)
 end
 
 --- Schedules a function to be run after an optional delay.
@@ -1019,7 +1020,7 @@ end
 --                          coroutine may yield execution back to the scheduler with an optional
 --                          number, which will put it to sleep for a time period.
 -- @arg {number=0} delay - Delay execution of the action by a time period.
-function Cooperative:schedule(action, delay)
+function CooperativeScheduler:schedule(action, delay)
   local task = {
     thread = coroutine.create(action),
     due = self.currentTime + (delay or 0)
@@ -1027,12 +1028,12 @@ function Cooperative:schedule(action, delay)
 
   table.insert(self.tasks, task)
 
-  return rx.Subscription.create(function()
+  return Subscription.create(function()
     return self:unschedule(task)
   end)
 end
 
-function Cooperative:unschedule(task)
+function CooperativeScheduler:unschedule(task)
   for i = 1, #self.tasks do
     if self.tasks[i] == task then
       table.remove(self.tasks, i)
@@ -1040,12 +1041,12 @@ function Cooperative:unschedule(task)
   end
 end
 
---- Triggers an update of the Cooperative Scheduler. The clock will be advanced and the scheduler
+--- Triggers an update of the CooperativeScheduler. The clock will be advanced and the scheduler
 -- will run any coroutines that are due to be run.
 -- @arg {number=0} delta - An amount of time to advance the clock by. It is common to pass in the
 --                         time in seconds or milliseconds elapsed since this function was last
 --                         called.
-function Cooperative:update(delta)
+function CooperativeScheduler:update(delta)
   self.currentTime = self.currentTime + (delta or 0)
 
   for i = #self.tasks, 1, -1 do
@@ -1067,20 +1068,18 @@ function Cooperative:update(delta)
   end
 end
 
---- Returns whether or not the Cooperative Scheduler's queue is empty.
-function Cooperative:isEmpty()
+--- Returns whether or not the CooperativeScheduler's queue is empty.
+function CooperativeScheduler:isEmpty()
   return not next(self.tasks)
 end
 
-Scheduler.Cooperative = Cooperative
-
 --- @class Subject
 -- @description Subjects function both as an Observer and as an Observable. Subjects inherit all
 -- Observable functions, including subscribe. Values can also be pushed to the Subject, which will
 -- be broadcasted to any subscribed Observers.
 local Subject = setmetatable({}, Observable)
 Subject.__index = Subject
-Subject.__tostring = constant('Subject')
+Subject.__tostring = util.constant('Subject')
 
 --- Creates a new Subject.
 -- @returns {Subject}
@@ -1150,7 +1149,7 @@ Subject.__call = Subject.onNext
 -- recent pushed value, and all subscribers immediately receive the latest value.
 local BehaviorSubject = setmetatable({}, Subject)
 BehaviorSubject.__index = BehaviorSubject
-BehaviorSubject.__tostring = constant('BehaviorSubject')
+BehaviorSubject.__tostring = util.constant('BehaviorSubject')
 
 --- Creates a new BehaviorSubject.
 -- @arg {*...} value - The initial values.
@@ -1162,7 +1161,7 @@ function BehaviorSubject.create(...)
   }
 
   if select('#', ...) > 0 then
-    self.value = pack(...)
+    self.value = util.pack(...)
   end
 
   return setmetatable(self, BehaviorSubject)
@@ -1184,18 +1183,24 @@ end
 --- Pushes zero or more values to the BehaviorSubject. They will be broadcasted to all Observers.
 -- @arg {*...} values
 function BehaviorSubject:onNext(...)
-  self.value = pack(...)
+  self.value = util.pack(...)
   return Subject.onNext(self, ...)
 end
 
-rx = {
+--- Returns the last value emitted by the Subject, or the initial value passed to the constructor
+-- if nothing has been emitted yet.
+-- @returns {*...}
+function BehaviorSubject:getValue()
+  return self.value and util.unpack(self.value)
+end
+
+return {
+  util = util,
   Subscription = Subscription,
   Observer = Observer,
   Observable = Observable,
-  Scheduler = Scheduler,
-  scheduler = Scheduler.Immediate.create(),
+  ImmediateScheduler = ImmediateScheduler,
+  CooperativeScheduler = CooperativeScheduler,
   Subject = Subject,
   BehaviorSubject = BehaviorSubject
-}
-
-return rx
+}

+ 896 - 0
src/observable.lua

@@ -0,0 +1,896 @@
+local util = require 'util'
+
+--- @class Observable
+-- @description Observables push values to Observers.
+local Observable = {}
+Observable.__index = Observable
+Observable.__tostring = util.constant('Observable')
+
+--- Creates a new Observable.
+-- @arg {function} subscribe - The subscription function that produces values.
+-- @returns {Observable}
+function Observable.create(subscribe)
+  local self = {
+    _subscribe = subscribe
+  }
+
+  return setmetatable(self, Observable)
+end
+
+--- Shorthand for creating an Observer and passing it to this Observable's subscription function.
+-- @arg {function} onNext - Called when the Observable produces a value.
+-- @arg {function} onError - Called when the Observable terminates due to an error.
+-- @arg {function} onComplete - Called when the Observable completes normally.
+function Observable:subscribe(onNext, onError, onComplete)
+  if type(onNext) == 'table' then
+    return self._subscribe(onNext)
+  else
+    return self._subscribe(Observer.create(onNext, onError, onComplete))
+  end
+end
+
+--- Creates an Observable that produces a single value.
+-- @arg {*} value
+-- @returns {Observable}
+function Observable.fromValue(value)
+  return Observable.create(function(observer)
+    observer:onNext(value)
+    observer:onComplete()
+  end)
+end
+
+--- Creates an Observable that produces a range of values in a manner similar to a Lua for loop.
+-- @arg {number} initial - The first value of the range, or the upper limit if no other arguments
+--                         are specified.
+-- @arg {number=} limit - The second value of the range.
+-- @arg {number=1} step - An amount to increment the value by each iteration.
+-- @returns {Observable}
+function Observable.fromRange(initial, limit, step)
+  if not limit and not step then
+    initial, limit = 1, initial
+  end
+
+  step = step or 1
+
+  return Observable.create(function(observer)
+    for i = initial, limit, step do
+      observer:onNext(i)
+    end
+
+    observer:onComplete()
+  end)
+end
+
+--- Creates an Observable that produces values from a table.
+-- @arg {table} table - The table used to create the Observable.
+-- @arg {function=pairs} iterator - An iterator used to iterate the table, e.g. pairs or ipairs.
+-- @arg {boolean} keys - Whether or not to also emit the keys of the table.
+-- @returns {Observable}
+function Observable.fromTable(t, iterator, keys)
+  iterator = iterator or pairs
+  return Observable.create(function(observer)
+    for key, value in iterator(t) do
+      observer:onNext(value, keys and key or nil)
+    end
+
+    observer:onComplete()
+  end)
+end
+
+--- Creates an Observable that produces values when the specified coroutine yields.
+-- @arg {thread} coroutine
+-- @returns {Observable}
+function Observable.fromCoroutine(thread, scheduler)
+  thread = type(thread) == 'function' and coroutine.create(thread) or thread
+  return Observable.create(function(observer)
+    return scheduler:schedule(function()
+      while not observer.stopped do
+        local success, value = coroutine.resume(thread)
+
+        if success then
+          observer:onNext(value)
+        else
+          return observer:onError(value)
+        end
+
+        if coroutine.status(thread) == 'dead' then
+          return observer:onComplete()
+        end
+
+        coroutine.yield()
+      end
+    end)
+  end)
+end
+
+--- Subscribes to this Observable and prints values it produces.
+-- @arg {string=} name - Prefixes the printed messages with a name.
+-- @arg {function=tostring} formatter - A function that formats one or more values to be printed.
+function Observable:dump(name, formatter)
+  name = name and (name .. ' ') or ''
+  formatter = formatter or tostring
+
+  local onNext = function(...) print(name .. 'onNext: ' .. formatter(...)) end
+  local onError = function(e) print(name .. 'onError: ' .. e) end
+  local onComplete = function() print(name .. 'onComplete') end
+
+  return self:subscribe(onNext, onError, onComplete)
+end
+
+--- Returns an Observable that only produces values from the original if they are different from
+-- the previous value.
+-- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
+-- @returns {Observable}
+function Observable:changes(comparator)
+  comparator = comparator or util.eq
+
+  return Observable.create(function(observer)
+    local first = true
+    local currentValue = nil
+
+    local function onNext(value, ...)
+      if first or not comparator(value, currentValue) then
+        observer:onNext(value, ...)
+        currentValue = value
+        first = false
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(onError)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that runs a combinator function on the most recent values from a set
+-- of Observables whenever any of them produce a new value. The results of the combinator function
+-- are produced by the new Observable.
+-- @arg {Observable...} observables - One or more Observables to combine.
+-- @arg {function} combinator - A function that combines the latest result from each Observable and
+--                              returns a single value.
+-- @returns {Observable}
+function Observable:combine(...)
+  local sources = {...}
+  local combinator = table.remove(sources)
+  if type(combinator) ~= 'function' then
+    table.insert(sources, combinator)
+    combinator = function(...) return ... end
+  end
+  table.insert(sources, 1, self)
+
+  return Observable.create(function(observer)
+    local latest = {}
+    local pending = {util.unpack(sources)}
+    local completed = {}
+
+    local function onNext(i)
+      return function(value)
+        latest[i] = value
+        pending[i] = nil
+
+        if not next(pending) then
+          observer:onNext(combinator(util.unpack(latest)))
+        end
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete(i)
+      return function()
+        table.insert(completed, i)
+
+        if #completed == #sources then
+          observer:onComplete()
+        end
+      end
+    end
+
+    for i = 1, #sources do
+      sources[i]:subscribe(onNext(i), onError, onComplete(i))
+    end
+  end)
+end
+
+--- Returns a new Observable that produces the values of the first with falsy values removed.
+-- @returns {Observable}
+function Observable:compact()
+  return self:filter(util.identity)
+end
+
+--- Returns a new Observable that produces the values produced by all the specified Observables in
+-- the order they are specified.
+-- @arg {Observable...} sources - The Observables to concatenate.
+-- @returns {Observable}
+function Observable:concat(other, ...)
+  if not other then return self end
+
+  local others = {...}
+
+  return Observable.create(function(observer)
+    local function onNext(...)
+      return observer:onNext(...)
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    local function chain()
+      return other:concat(util.unpack(others)):subscribe(onNext, onError, onComplete)
+    end
+
+    return self:subscribe(onNext, onError, chain)
+  end)
+end
+
+--- Returns a new Observable that produces the values from the original with duplicates removed.
+-- @returns {Observable}
+function Observable:distinct()
+  return Observable.create(function(observer)
+    local values = {}
+
+    local function onNext(x)
+      if not values[x] then
+        observer:onNext(x)
+      end
+
+      values[x] = true
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that only produces values of the first that satisfy a predicate.
+-- @arg {function} predicate - The predicate used to filter values.
+-- @returns {Observable}
+function Observable:filter(predicate)
+  predicate = predicate or util.identity
+
+  return Observable.create(function(observer)
+    local function onNext(...)
+      if predicate(...) then
+        return observer:onNext(...)
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      return observer:onComplete(e)
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that produces the first value of the original that satisfies a
+-- predicate.
+-- @arg {function} predicate - The predicate used to find a value.
+function Observable:find(predicate)
+  predicate = predicate or util.identity
+
+  return Observable.create(function(observer)
+    local function onNext(...)
+      if predicate(...) then
+        observer:onNext(...)
+        return observer:onComplete()
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that only produces the first result of the original.
+-- @returns {Observable}
+function Observable:first()
+  return self:take(1)
+end
+
+--- Returns a new Observable that subscribes to the Observables produced by the original and
+-- produces their values.
+-- @returns {Observable}
+function Observable:flatten()
+  return Observable.create(function(observer)
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onNext(observable)
+      local function innerOnNext(...)
+        observer:onNext(...)
+      end
+
+      observable:subscribe(innerOnNext, onError, util.noop)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that only produces the last result of the original.
+-- @returns {Observable}
+function Observable:last()
+  return Observable.create(function(observer)
+    local value
+    local empty = true
+
+    local function onNext(...)
+      value = {...}
+      empty = false
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      if not empty then
+        observer:onNext(util.unpack(value or {}))
+      end
+
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that produces the values of the original transformed by a function.
+-- @arg {function} callback - The function to transform values from the original Observable.
+-- @returns {Observable}
+function Observable:map(callback)
+  return Observable.create(function(observer)
+    callback = callback or util.identity
+
+    local function onNext(...)
+      return observer:onNext(callback(...))
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that produces the maximum value produced by the original.
+-- @returns {Observable}
+function Observable:max()
+  return self:reduce(math.max)
+end
+
+--- Returns a new Observable that produces the values produced by all the specified Observables in
+-- the order they are produced.
+-- @arg {Observable...} sources - One or more Observables to merge.
+-- @returns {Observable}
+function Observable:merge(...)
+  local sources = {...}
+  table.insert(sources, 1, self)
+
+  return Observable.create(function(observer)
+    local function onNext(...)
+      return observer:onNext(...)
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onComplete(i)
+      return function()
+        sources[i] = nil
+
+        if not next(sources) then
+          observer:onComplete()
+        end
+      end
+    end
+
+    for i = 1, #sources do
+      sources[i]:subscribe(onNext, onError, onComplete(i))
+    end
+  end)
+end
+
+--- Returns a new Observable that produces the minimum value produced by the original.
+-- @returns {Observable}
+function Observable:min()
+  return self:reduce(math.min)
+end
+
+--- Returns an Observable that produces the values of the original inside tables.
+-- @returns {Observable}
+function Observable:pack()
+  return self:map(util.pack)
+end
+
+--- Returns two Observables: one that produces values for which the predicate returns truthy for,
+-- and another that produces values for which the predicate returns falsy.
+-- @arg {function} predicate - The predicate used to partition the values.
+-- @returns {Observable}
+-- @returns {Observable}
+function Observable:partition(predicate)
+  return self:filter(predicate), self:reject(predicate)
+end
+
+--- Returns a new Observable that produces values computed by extracting the given keys from the
+-- tables produced by the original.
+-- @arg {string...} keys - The key to extract from the table. Multiple keys can be specified to
+--                         recursively pluck values from nested tables.
+-- @returns {Observable}
+function Observable:pluck(key, ...)
+  if not key then return self end
+
+  return Observable.create(function(observer)
+    local function onNext(t)
+      return observer:onNext(t[key])
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end):pluck(...)
+end
+
+--- Returns a new Observable that produces a single value computed by accumulating the results of
+-- running a function on each value produced by the original Observable.
+-- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
+--                               the return value of the last call as the first argument and the
+--                               current values as the rest of the arguments.
+-- @arg {*} seed - A value to pass to the accumulator the first time it is run.
+-- @returns {Observable}
+function Observable:reduce(accumulator, seed)
+  return Observable.create(function(observer)
+    local result = seed
+    local first = true
+
+    local function onNext(...)
+      if first and seed == nil then
+        result = ...
+        first = false
+      else
+        result = accumulator(result, ...)
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      observer:onNext(result)
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that produces values from the original which do not satisfy a
+-- predicate.
+-- @arg {function} predicate - The predicate used to reject values.
+-- @returns {Observable}
+function Observable:reject(predicate)
+  predicate = predicate or util.identity
+
+  return Observable.create(function(observer)
+    local function onNext(...)
+      if not predicate(...) then
+        return observer:onNext(...)
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      return observer:onComplete(e)
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that skips over a specified number of values produced by the original
+-- and produces the rest.
+-- @arg {number=1} n - The number of values to ignore.
+-- @returns {Observable}
+function Observable:skip(n)
+  n = n or 1
+
+  return Observable.create(function(observer)
+    local i = 1
+
+    local function onNext(...)
+      if i > n then
+        observer:onNext(...)
+      else
+        i = i + 1
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that skips over values produced by the original until the specified
+-- Observable produces a value.
+-- @arg {Observable} other - The Observable that triggers the production of values.
+-- @returns {Observable}
+function Observable:skipUntil(other)
+  return Observable.create(function(observer)
+    local triggered = false
+    local function trigger()
+      triggered = true
+    end
+
+    other:subscribe(trigger, trigger, trigger)
+
+    local function onNext(...)
+      if triggered then
+        observer:onNext(...)
+      end
+    end
+
+    local function onError()
+      if triggered then
+        observer:onError()
+      end
+    end
+
+    local function onComplete()
+      if triggered then
+        observer:onComplete()
+      end
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that skips elements until the predicate returns falsy for one of them.
+-- @arg {function} predicate - The predicate used to continue skipping values.
+-- @returns {Observable}
+function Observable:skipWhile(predicate)
+  predicate = predicate or util.identity
+
+  return Observable.create(function(observer)
+    local skipping = true
+
+    local function onNext(...)
+      if skipping then
+        skipping = predicate(...)
+      end
+
+      if not skipping then
+        return observer:onNext(...)
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that only produces the first n results of the original.
+-- @arg {number=1} n - The number of elements to produce before completing.
+-- @returns {Observable}
+function Observable:take(n)
+  n = n or 1
+
+  return Observable.create(function(observer)
+    if n <= 0 then
+      observer:onComplete()
+      return
+    end
+
+    local i = 1
+
+    local function onNext(...)
+      observer:onNext(...)
+
+      i = i + 1
+
+      if i > n then
+        observer:onComplete()
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that completes when the specified Observable fires.
+-- @arg {Observable} other - The Observable that triggers completion of the original.
+-- @returns {Observable}
+function Observable:takeUntil(other)
+  return Observable.create(function(observer)
+    local function onNext(...)
+      return observer:onNext(...)
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    other:subscribe(onComplete, onComplete, onComplete)
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that produces elements until the predicate returns falsy.
+-- @arg {function} predicate - The predicate used to continue production of values.
+-- @returns {Observable}
+function Observable:takeWhile(predicate)
+  predicate = predicate or util.identity
+
+  return Observable.create(function(observer)
+    local taking = true
+
+    local function onNext(...)
+      if taking then
+        taking = predicate(...)
+
+        if taking then
+          return observer:onNext(...)
+        else
+          return observer:onComplete()
+        end
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Runs a function each time this Observable has activity. Similar to subscribe but does not
+-- create a subscription.
+-- @arg {function=} onNext - Run when the Observable produces values.
+-- @arg {function=} onError - Run when the Observable encounters a problem.
+-- @arg {function=} onComplete - Run when the Observable completes.
+-- @returns {Observable}
+function Observable:tap(_onNext, _onError, _onComplete)
+  _onNext = _onNext or util.noop
+  _onError = _onError or util.noop
+  _onComplete = _onComplete or util.noop
+
+  return Observable.create(function(observer)
+    local function onNext(...)
+      _onNext(...)
+      return observer:onNext(...)
+    end
+
+    local function onError(message)
+      _onError(message)
+      return observer:onError(message)
+    end
+
+    local function onComplete()
+      _onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns an Observable that util.unpacks the tables produced by the original.
+-- @returns {Observable}
+function Observable:unpack()
+  return self:map(util.unpack)
+end
+
+--- Returns an Observable that takes any values produced by the original that consist of multiple
+-- return values and produces each value individually.
+-- @returns {Observable}
+function Observable:unwrap()
+  return Observable.create(function(observer)
+    local function onNext(...)
+      local values = {...}
+      for i = 1, #values do
+        observer:onNext(values[i])
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns an Observable that produces a sliding window of the values produced by the original.
+-- @arg {number} size - The size of the window. The returned observable will produce this number
+--                      of the most recent values as multiple arguments to onNext.
+-- @returns {Observable}
+function Observable:window(size)
+  return Observable.create(function(observer)
+    local window = {}
+
+    local function onNext(value)
+      table.insert(window, value)
+
+      if #window >= size then
+        observer:onNext(util.unpack(window))
+        table.remove(window, 1)
+      end
+    end
+
+    local function onError(message)
+      return observer:onError(message)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns an Observable that produces values from the original along with the most recently
+-- produced value from all other specified Observables. Note that only the first argument from each
+-- source Observable is used.
+-- @arg {Observable...} sources - The Observables to include the most recent values from.
+-- @returns {Observable}
+function Observable:with(...)
+  local sources = {...}
+
+  return Observable.create(function(observer)
+    local latest = setmetatable({}, {__len = util.constant(#sources)})
+
+    local function setLatest(i)
+      return function(value)
+        latest[i] = value
+      end
+    end
+
+    local function onNext(value)
+      return observer:onNext(value, util.unpack(latest))
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    for i = 1, #sources do
+      sources[i]:subscribe(setLatest(i), util.noop, util.noop)
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns an Observable that buffers values from the original and produces them as multiple
+-- values.
+-- @arg {number} size - The size of the buffer.
+function Observable:wrap(size)
+  return Observable.create(function(observer)
+    local buffer = {}
+
+    local function emit()
+      if #buffer > 0 then
+        observer:onNext(util.unpack(buffer))
+        buffer = {}
+      end
+    end
+
+    local function onNext(...)
+      local values = {...}
+      for i = 1, #values do
+        table.insert(buffer, values[i])
+        if #buffer >= size then
+          emit()
+        end
+      end
+    end
+
+    local function onError(message)
+      emit()
+      return observer:onError(message)
+    end
+
+    local function onComplete()
+      emit()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+return Observable

+ 50 - 0
src/observer.lua

@@ -0,0 +1,50 @@
+local util = require 'util'
+
+--- @class Observer
+-- @description Observers are simple objects that receive values from Observables.
+local Observer = {}
+Observer.__index = Observer
+Observer.__tostring = util.constant('Observer')
+
+--- Creates a new Observer.
+-- @arg {function=} onNext - Called when the Observable produces a value.
+-- @arg {function=} onError - Called when the Observable terminates due to an error.
+-- @arg {function=} onComplete - Called when the Observable completes normally.
+-- @returns {Observer}
+function Observer.create(onNext, onError, onComplete)
+  local self = {
+    _onNext = onNext or util.noop,
+    _onError = onError or error,
+    _onComplete = onComplete or util.noop,
+    stopped = false
+  }
+
+  return setmetatable(self, Observer)
+end
+
+--- Pushes zero or more values to the Observer.
+-- @arg {*...} values
+function Observer:onNext(...)
+  if not self.stopped then
+    self._onNext(...)
+  end
+end
+
+--- Notify the Observer that an error has occurred.
+-- @arg {string=} message - A string describing what went wrong.
+function Observer:onError(message)
+  if not self.stopped then
+    self.stopped = true
+    self._onError(message)
+  end
+end
+
+--- Notify the Observer that the sequence has completed and will produce no more values.
+function Observer:onComplete()
+  if not self.stopped then
+    self.stopped = true
+    self._onComplete()
+  end
+end
+
+return Observer

+ 81 - 0
src/schedulers/cooperativescheduler.lua

@@ -0,0 +1,81 @@
+local util = require 'util'
+local Subscription = require 'subscription'
+
+--- @class CooperativeScheduler
+-- @description Manages Observables using coroutines and a virtual clock that must be updated
+-- manually.
+local CooperativeScheduler = {}
+CooperativeScheduler.__index = CooperativeScheduler
+CooperativeScheduler.__tostring = util.constant('CooperativeScheduler')
+
+--- Creates a new CooperativeScheduler.
+-- @arg {number=0} currentTime - A time to start the scheduler at.
+-- @returns {Scheduler.CooperativeScheduler}
+function CooperativeScheduler.create(currentTime)
+  local self = {
+    tasks = {},
+    currentTime = currentTime or 0
+  }
+
+  return setmetatable(self, CooperativeScheduler)
+end
+
+--- Schedules a function to be run after an optional delay.
+-- @arg {function} action - The function to execute. Will be converted into a coroutine. The
+--                          coroutine may yield execution back to the scheduler with an optional
+--                          number, which will put it to sleep for a time period.
+-- @arg {number=0} delay - Delay execution of the action by a time period.
+function CooperativeScheduler:schedule(action, delay)
+  local task = {
+    thread = coroutine.create(action),
+    due = self.currentTime + (delay or 0)
+  }
+
+  table.insert(self.tasks, task)
+
+  return Subscription.create(function()
+    return self:unschedule(task)
+  end)
+end
+
+function CooperativeScheduler:unschedule(task)
+  for i = 1, #self.tasks do
+    if self.tasks[i] == task then
+      table.remove(self.tasks, i)
+    end
+  end
+end
+
+--- Triggers an update of the CooperativeScheduler. The clock will be advanced and the scheduler
+-- will run any coroutines that are due to be run.
+-- @arg {number=0} delta - An amount of time to advance the clock by. It is common to pass in the
+--                         time in seconds or milliseconds elapsed since this function was last
+--                         called.
+function CooperativeScheduler:update(delta)
+  self.currentTime = self.currentTime + (delta or 0)
+
+  for i = #self.tasks, 1, -1 do
+    local task = self.tasks[i]
+
+    if self.currentTime >= task.due then
+      local success, delay = coroutine.resume(task.thread)
+
+      if success then
+        task.due = math.max(task.due + (delay or 0), self.currentTime)
+      else
+        error(delay)
+      end
+
+      if coroutine.status(task.thread) == 'dead' then
+        table.remove(self.tasks, i)
+      end
+    end
+  end
+end
+
+--- Returns whether or not the CooperativeScheduler's queue is empty.
+function CooperativeScheduler:isEmpty()
+  return not next(self.tasks)
+end
+
+return CooperativeScheduler

+ 21 - 0
src/schedulers/immediatescheduler.lua

@@ -0,0 +1,21 @@
+local util = require 'util'
+
+--- @class ImmediateScheduler
+-- @description Schedules Observables by running all operations immediately.
+local ImmediateScheduler = {}
+ImmediateScheduler.__index = ImmediateScheduler
+ImmediateScheduler.__tostring = util.constant('ImmediateScheduler')
+
+--- Creates a new ImmediateScheduler.
+-- @returns {ImmediateScheduler}
+function ImmediateScheduler.create()
+  return setmetatable({}, ImmediateScheduler)
+end
+
+--- Schedules a function to be run on the scheduler. It is executed immediately.
+-- @arg {function} action - The function to execute.
+function ImmediateScheduler:schedule(action)
+  action()
+end
+
+return ImmediateScheduler

+ 54 - 0
src/subjects/behaviorsubject.lua

@@ -0,0 +1,54 @@
+local Subject = require 'subjects/subject'
+local util = require 'util'
+
+--- @class BehaviorSubject
+-- @description A Subject that tracks its current value. Provides an accessor to retrieve the most
+-- recent pushed value, and all subscribers immediately receive the latest value.
+local BehaviorSubject = setmetatable({}, Subject)
+BehaviorSubject.__index = BehaviorSubject
+BehaviorSubject.__tostring = util.constant('BehaviorSubject')
+
+--- Creates a new BehaviorSubject.
+-- @arg {*...} value - The initial values.
+-- @returns {Subject}
+function BehaviorSubject.create(...)
+  local self = {
+    observers = {},
+    stopped = false
+  }
+
+  if select('#', ...) > 0 then
+    self.value = util.pack(...)
+  end
+
+  return setmetatable(self, BehaviorSubject)
+end
+
+--- Creates a new Observer and attaches it to the Subject. Immediately broadcasts the most recent
+-- value to the Observer.
+-- @arg {function} onNext - Called when the Subject produces a value.
+-- @arg {function} onError - Called when the Subject terminates due to an error.
+-- @arg {function} onComplete - Called when the Subject completes normally.
+function BehaviorSubject:subscribe(onNext, onError, onComplete)
+  local observer = Observer.create(onNext, onError, onComplete)
+  Subject.subscribe(self, observer)
+  if self.value then
+    observer:onNext(unpack(self.value))
+  end
+end
+
+--- Pushes zero or more values to the BehaviorSubject. They will be broadcasted to all Observers.
+-- @arg {*...} values
+function BehaviorSubject:onNext(...)
+  self.value = util.pack(...)
+  return Subject.onNext(self, ...)
+end
+
+--- Returns the last value emitted by the Subject, or the initial value passed to the constructor
+-- if nothing has been emitted yet.
+-- @returns {*...}
+function BehaviorSubject:getValue()
+  return self.value and util.unpack(self.value)
+end
+
+return BehaviorSubject

+ 75 - 0
src/subjects/subject.lua

@@ -0,0 +1,75 @@
+local Observable = require 'observable'
+local util = require 'util'
+
+--- @class Subject
+-- @description Subjects function both as an Observer and as an Observable. Subjects inherit all
+-- Observable functions, including subscribe. Values can also be pushed to the Subject, which will
+-- be broadcasted to any subscribed Observers.
+local Subject = setmetatable({}, Observable)
+Subject.__index = Subject
+Subject.__tostring = util.constant('Subject')
+
+--- Creates a new Subject.
+-- @returns {Subject}
+function Subject.create()
+  local self = {
+    observers = {},
+    stopped = false
+  }
+
+  return setmetatable(self, Subject)
+end
+
+--- Creates a new Observer and attaches it to the Subject.
+-- @arg {function|table} onNext|observer - A function called when the Subject produces a value or
+--                                         an existing Observer to attach to the Subject.
+-- @arg {function} onError - Called when the Subject terminates due to an error.
+-- @arg {function} onComplete - Called when the Subject completes normally.
+function Subject:subscribe(onNext, onError, onComplete)
+  local observer
+
+  if type(onNext) == 'table' then
+    observer = onNext
+  else
+    observer = Observer.create(onNext, onError, onComplete)
+  end
+
+  table.insert(self.observers, observer)
+end
+
+--- Pushes zero or more values to the Subject. They will be broadcasted to all Observers.
+-- @arg {*...} values
+function Subject:onNext(...)
+  if not self.stopped then
+    for i = 1, #self.observers do
+      self.observers[i]:onNext(...)
+    end
+  end
+end
+
+--- Signal to all Observers that an error has occurred.
+-- @arg {string=} message - A string describing what went wrong.
+function Subject:onError(message)
+  if not self.stopped then
+    for i = 1, #self.observers do
+      self.observers[i]:onError(message)
+    end
+
+    self.stopped = true
+  end
+end
+
+--- Signal to all Observers that the Subject will not produce any more values.
+function Subject:onComplete()
+  if not self.stopped then
+    for i = 1, #self.observers do
+      self.observers[i]:onComplete()
+    end
+
+    self.stopped = true
+  end
+end
+
+Subject.__call = Subject.onNext
+
+return Subject

+ 30 - 0
src/subscription.lua

@@ -0,0 +1,30 @@
+local util = require 'util'
+
+--- @class Subscription
+-- @description A handle representing the link between an Observer and an Observable, as well as any
+-- work required to clean up after the Observable completes or the Observer unsubscribes.
+local Subscription = {}
+Subscription.__index = Subscription
+Subscription.__tostring = util.constant('Subscription')
+
+--- Creates a new Subscription.
+-- @arg {function=} action - The action to run when the subscription is unsubscribed. It will only
+--                           be run once.
+-- @returns {Subscription}
+function Subscription.create(action)
+  local self = {
+    action = action or util.noop,
+    unsubscribed = false
+  }
+
+  return setmetatable(self, Subscription)
+end
+
+--- Unsubscribes the subscription, performing any necessary cleanup work.
+function Subscription:unsubscribe()
+  if self.unsubscribed then return end
+  self.action(self)
+  self.unsubscribed = true
+end
+
+return Subscription

+ 10 - 0
src/util.lua

@@ -0,0 +1,10 @@
+local util = {}
+
+util.pack = table.pack or function(...) return { n = select('#', ...), ... } end
+util.unpack = table.unpack or unpack
+util.eq = function(x, y) return x == y end
+util.noop = function() end
+util.identity = function(x) return x end
+util.constant = function(x) return function() return x end end
+
+return util

+ 1 - 1
tests/combine.lua

@@ -9,7 +9,7 @@ describe('combine', function()
     local observableB = Rx.Observable.fromValue('b')
     local observableC = Rx.Observable.fromValue('c')
     local combinator = spy()
-    Rx.Observable.combine(observableA, observableB, observableC, combinator):subscribe()
+    Rx.Observable.combine(observableA, observableB, observableC, function(...) combinator(...) end):subscribe()
     expect(combinator).to.equal({{'a', 'b', 'c'}})
   end)
 

+ 4 - 4
tests/observable.lua

@@ -123,8 +123,8 @@ describe('Observable', function()
         return 3
       end)
 
-      Rx.scheduler = Rx.Scheduler.Cooperative.create()
-      local observable = Rx.Observable.fromCoroutine(coroutine)
+      Rx.scheduler = Rx.CooperativeScheduler.create()
+      local observable = Rx.Observable.fromCoroutine(coroutine, Rx.scheduler)
       local onNext, onError, onComplete = observableSpy(observable)
       repeat Rx.scheduler:update()
       until Rx.scheduler:isEmpty()
@@ -138,8 +138,8 @@ describe('Observable', function()
         return 3
       end
 
-      Rx.scheduler = Rx.Scheduler.Cooperative.create()
-      local observable = Rx.Observable.fromCoroutine(coroutine)
+      Rx.scheduler = Rx.CooperativeScheduler.create()
+      local observable = Rx.Observable.fromCoroutine(coroutine, Rx.scheduler)
       local onNext, onError, onComplete = observableSpy(observable)
       repeat Rx.scheduler:update()
       until Rx.scheduler:isEmpty()

+ 56 - 0
tools/concat.lua

@@ -0,0 +1,56 @@
+-- Horrible script to concatenate everything in /src into a single rx.lua file.
+-- Usage: lua tools/concat.lua [dest=rx.lua]
+
+local files = {
+  'src/util.lua',
+  'src/subscription.lua',
+  'src/observer.lua',
+  'src/observable.lua',
+  'src/schedulers/immediatescheduler.lua',
+  'src/schedulers/cooperativescheduler.lua',
+  'src/subjects/subject.lua',
+  'src/subjects/behaviorsubject.lua'
+}
+
+local header = [[
+-- RxLua v0.0.1
+-- https://github.com/bjornbytes/rxlua
+-- MIT License
+
+]]
+
+local footer = [[return {
+  util = util,
+  Subscription = Subscription,
+  Observer = Observer,
+  Observable = Observable,
+  ImmediateScheduler = ImmediateScheduler,
+  CooperativeScheduler = CooperativeScheduler,
+  Subject = Subject,
+  BehaviorSubject = BehaviorSubject
+}]]
+
+local output = ''
+
+for _, filename in ipairs(files) do
+  local file = io.open(filename)
+
+  if not file then
+    error('error opening "' .. filename .. '"')
+  end
+
+  local str = file:read('*all')
+  file:close()
+
+  str = '\n' .. str .. '\n'
+  str = str:gsub('\n(local[^\n]+require.[^\n]+)', '')
+  str = str:gsub('\n(return[^\n]+)', '')
+  str = str:gsub('^%s+', ''):gsub('%s+$', '')
+  output = output .. str .. '\n\n'
+end
+
+local outputFile = arg[1] or 'rx.lua'
+local file = io.open(outputFile, 'w')
+if file then
+  file:write(header .. output .. footer)
+end