Browse Source

Organize file; Update documentation;

bjorn 10 years ago
parent
commit
1634b4d0fa
2 changed files with 300 additions and 114 deletions
  1. 118 24
      doc/README.md
  2. 182 90
      rx.lua

+ 118 - 24
doc/README.md

@@ -114,6 +114,45 @@ Arguments:
 
 
 ---
 ---
 
 
+#### `:combineLatest(observables, combinator)`
+
+Returns a new Observable that runs a combinator function on the most recent values from a set of Observables whenever any of them produce a new value. The results of the combinator function are produced by the new Observable.
+
+Arguments:
+
+- `observables` (`Observable...`) - One or more Observables to combine.
+- `combinator` (`function`) - A function that combines the latest result from each Observable and returns a single value.
+
+Returns:
+
+- `Observable`
+
+---
+
+#### `:distinct()`
+
+Returns a new Observable that produces the values from the original with duplicates removed.
+
+Returns:
+
+- `Observable`
+
+---
+
+#### `:filter(predicate)`
+
+Returns a new Observable that only produces values of the first that satisfy a predicate.
+
+Arguments:
+
+- `predicate` (`function`) - The predicate to filter values with.
+
+Returns:
+
+- `Observable`
+
+---
+
 #### `:first()`
 #### `:first()`
 
 
 Returns a new Observable that only produces the first result of the original.
 Returns a new Observable that only produces the first result of the original.
@@ -148,14 +187,13 @@ Returns:
 
 
 ---
 ---
 
 
-#### `:reduce(accumulator, seed)`
+#### `:pluck(key)`
 
 
-Returns a new Observable that produces a single value computed by accumulating the results of running a function on each value produced by the original Observable.
+Returns a new Observable that produces values computed by extracting the given key from the tables produced by the original.
 
 
 Arguments:
 Arguments:
 
 
-- `accumulator` (`function`) - Accumulates the values of the original Observable. Will be passed the return value of the last call as the first argument and the current value as the second.
-- `seed` (`*`) - A value to pass to the accumulator the first time it is run.
+- `key` (`function`) - The key to extract from the table.
 
 
 Returns:
 Returns:
 
 
@@ -163,9 +201,14 @@ Returns:
 
 
 ---
 ---
 
 
-#### `:sum()`
+#### `:reduce(accumulator, seed)`
 
 
-Returns a new Observable that produces the sum of the values of the original Observable as a single result.
+Returns a new Observable that produces a single value computed by accumulating the results of running a function on each value produced by the original Observable.
+
+Arguments:
+
+- `accumulator` (`function`) - Accumulates the values of the original Observable. Will be passed the return value of the last call as the first argument and the current value as the second.
+- `seed` (`*`) - A value to pass to the accumulator the first time it is run.
 
 
 Returns:
 Returns:
 
 
@@ -173,14 +216,13 @@ Returns:
 
 
 ---
 ---
 
 
-#### `:combineLatest(observables, combinator)`
+#### `:skip(n)`
 
 
-Returns a new Observable that runs a combinator function on the most recent values from a set of Observables whenever any of them produce a new value. The results of the combinator function are produced by the new Observable.
+Returns a new Observable that skips over a specified number of values produced by the original and produces the rest.
 
 
 Arguments:
 Arguments:
 
 
-- `observables` (`Observable...`) - One or more Observables to combine.
-- `combinator` (`function`) - A function that combines the latest result from each Observable and returns a single value.
+- `[n=1]` (`number`) - The number of values to ignore.
 
 
 Returns:
 Returns:
 
 
@@ -188,9 +230,13 @@ Returns:
 
 
 ---
 ---
 
 
-#### `:distinct()`
+#### `:skipUntil(other)`
 
 
-Returns a new Observable that produces the values from the original with duplicates removed.
+Returns a new Observable that skips over values produced by the original until the specified Observable produces a value.
+
+Arguments:
+
+- `other` (`Observable`) - The Observable that triggers the production of values.
 
 
 Returns:
 Returns:
 
 
@@ -198,13 +244,9 @@ Returns:
 
 
 ---
 ---
 
 
-#### `:takeUntil(other)`
-
-Returns a new Observable that completes when the specified Observable fires.
-
-Arguments:
+#### `:sum()`
 
 
-- `other` (`Observable`) - The Observable that triggers completion of the original.
+Returns a new Observable that produces the sum of the values of the original Observable as a single result.
 
 
 Returns:
 Returns:
 
 
@@ -212,13 +254,13 @@ Returns:
 
 
 ---
 ---
 
 
-#### `:filter(predicate)`
+#### `:take(n)`
 
 
-Returns a new Observable that only produces values of the first that satisfy a predicate.
+Returns a new Observable that only produces the first n results of the original.
 
 
 Arguments:
 Arguments:
 
 
-- `predicate` (`function`) - The predicate to filter values with.
+- `[n=1]` (`number`) - The number of elements to produce before completing.
 
 
 Returns:
 Returns:
 
 
@@ -226,13 +268,13 @@ Returns:
 
 
 ---
 ---
 
 
-#### `:pluck(key)`
+#### `:takeUntil(other)`
 
 
-Returns a new Observable that produces values computed by extracting the given key from the tables produced by the original.
+Returns a new Observable that completes when the specified Observable fires.
 
 
 Arguments:
 Arguments:
 
 
-- `key` (`function`) - The key to extract from the table.
+- `other` (`Observable`) - The Observable that triggers completion of the original.
 
 
 Returns:
 Returns:
 
 
@@ -287,3 +329,55 @@ Arguments:
 
 
 Returns whether or not the Cooperative Scheduler's queue is empty.
 Returns whether or not the Cooperative Scheduler's queue is empty.
 
 
+# Subject
+
+Subjects function both as an Observer and as an Observable. Subjects inherit all Observable functions, including subscribe. Values can also be pushed to the Subject, which will be broadcasted to any subscribed Observers.
+
+---
+
+#### `.create()`
+
+Creates a new Subject.
+
+Returns:
+
+- `Subject`
+
+---
+
+#### `:subscribe(onNext, onError, onComplete)`
+
+Creates a new Observer and attaches it to the Subject.
+
+Arguments:
+
+- `onNext` (`function`) - Called when the Subject produces a value.
+- `onError` (`function`) - Called when the Subject terminates due to an error.
+- `onComplete` (`function`) - Called when the Subject completes normally.
+
+---
+
+#### `:onNext(value)`
+
+Pushes a value to the Subject. It will be broadcasted to all Observers.
+
+Arguments:
+
+- `value` (`*`)
+
+---
+
+#### `:onError(message)`
+
+Signal to all Observers that an error has occurred.
+
+Arguments:
+
+- `[message]` (`string`) - A string describing what went wrong.
+
+---
+
+#### `:onComplete()`
+
+Signal to all Observers that the Subject will not produce any more values.
+

+ 182 - 90
rx.lua

@@ -124,13 +124,66 @@ end
 -- The functions below transform the values produced by an Observable and return a new Observable
 -- The functions below transform the values produced by an Observable and return a new Observable
 -- that produces these values.
 -- that produces these values.
 
 
---- Returns a new Observable that only produces the first result of the original.
+--- Returns a new Observable that runs a combinator function on the most recent values from a set
+-- of Observables whenever any of them produce a new value. The results of the combinator function
+-- are produced by the new Observable.
+-- @arg {Observable...} observables - One or more Observables to combine.
+-- @arg {function} combinator - A function that combines the latest result from each Observable and
+--                              returns a single value.
 -- @returns {Observable}
 -- @returns {Observable}
-function Observable:first()
+function Observable:combineLatest(...)
+  local sources = {...}
+  local combinator = table.remove(sources)
+  table.insert(sources, 1, self)
+
   return Observable.create(function(observer)
   return Observable.create(function(observer)
+    local latest = {}
+    local pending = {unpack(sources)}
+    local completed = {}
+
+    local function onNext(i)
+      return function(value)
+        latest[i] = value
+        pending[i] = nil
+
+        if not next(pending) then
+          observer:onNext(combinator(unpack(latest)))
+        end
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete(i)
+      return function()
+        table.insert(completed, i)
+
+        if #completed == #sources then
+          observer:onComplete()
+        end
+      end
+    end
+
+    for i = 1, #sources do
+      sources[i]:subscribe(onNext(i), onError, onComplete(i))
+    end
+  end)
+end
+
+--- Returns a new Observable that produces the values from the original with duplicates removed.
+-- @returns {Observable}
+function Observable:distinct()
+  return Observable.create(function(observer)
+    local values = {}
+
     local function onNext(x)
     local function onNext(x)
-      observer:onNext(x)
-      return observer:onComplete()
+      if not values[x] then
+        observer:onNext(x)
+      end
+
+      values[x] = true
     end
     end
 
 
     local function onError(e)
     local function onError(e)
@@ -145,6 +198,37 @@ function Observable:first()
   end)
   end)
 end
 end
 
 
+--- Returns a new Observable that only produces values of the first that satisfy a predicate.
+-- @arg {function} predicate - The predicate to filter values with.
+-- @returns {Observable}
+function Observable:filter(predicate)
+  predicate = predicate or identity
+
+  return Observable.create(function(observer)
+    local function onNext(x)
+      if predicate(x) then
+        return observer:onNext(x)
+      end
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      return observer:onComplete(e)
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
+--- Returns a new Observable that only produces the first result of the original.
+-- @returns {Observable}
+function Observable:first()
+  return self:take(1)
+end
+
 --- Returns a new Observable that only produces the last result of the original.
 --- Returns a new Observable that only produces the last result of the original.
 -- @returns {Observable}
 -- @returns {Observable}
 function Observable:last()
 function Observable:last()
@@ -191,6 +275,28 @@ function Observable:map(callback)
   end)
   end)
 end
 end
 
 
+--- Returns a new Observable that produces values computed by extracting the given key from the
+-- tables produced by the original.
+-- @arg {function} key - The key to extract from the table.
+-- @returns {Observable}
+function Observable:pluck(key)
+  return Observable.create(function(observer)
+    local function onNext(t)
+      return observer:onNext(t[key])
+    end
+
+    local function onError(e)
+      return observer:onError(e)
+    end
+
+    local function onComplete()
+      return observer:onComplete()
+    end
+
+    return self:subscribe(onNext, onError, onComplete)
+  end)
+end
+
 --- Returns a new Observable that produces a single value computed by accumulating the results of
 --- Returns a new Observable that produces a single value computed by accumulating the results of
 -- running a function on each value produced by the original Observable.
 -- running a function on each value produced by the original Observable.
 -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
 -- @arg {function} accumulator - Accumulates the values of the original Observable. Will be passed
@@ -220,38 +326,21 @@ function Observable:reduce(accumulator, seed)
   end)
   end)
 end
 end
 
 
---- Returns a new Observable that produces the sum of the values of the original Observable as a
--- single result.
+--- Returns a new Observable that skips over a specified number of values produced by the original
+-- and produces the rest.
+-- @arg {number=1} n - The number of values to ignore.
 -- @returns {Observable}
 -- @returns {Observable}
-function Observable:sum()
-  return self:reduce(function(x, y) return x + y end, 0)
-end
-
---- Returns a new Observable that runs a combinator function on the most recent values from a set
--- of Observables whenever any of them produce a new value. The results of the combinator function
--- are produced by the new Observable.
--- @arg {Observable...} observables - One or more Observables to combine.
--- @arg {function} combinator - A function that combines the latest result from each Observable and
---                              returns a single value.
--- @returns {Observable}
-function Observable:combineLatest(...)
-  local sources = {...}
-  local combinator = table.remove(sources)
-  table.insert(sources, 1, self)
+function Observable:skip(n)
+  n = n or 1
 
 
   return Observable.create(function(observer)
   return Observable.create(function(observer)
-    local latest = {}
-    local pending = {unpack(sources)}
-    local completed = {}
-
-    local function onNext(i)
-      return function(value)
-        latest[i] = value
-        pending[i] = nil
+    local i = 1
 
 
-        if not next(pending) then
-          observer:onNext(combinator(unpack(latest)))
-        end
+    local function onNext(x)
+      if i > n then
+        observer:onNext(x)
+      else
+        i = i + 1
       end
       end
     end
     end
 
 
@@ -259,81 +348,68 @@ function Observable:combineLatest(...)
       return observer:onError(e)
       return observer:onError(e)
     end
     end
 
 
-    local function onComplete(i)
-      return function()
-        table.insert(completed, i)
-
-        if #completed == #sources then
-          observer:onComplete()
-        end
-      end
+    local function onComplete()
+      return observer:onComplete()
     end
     end
 
 
-    for i = 1, #sources do
-      sources[i]:subscribe(onNext(i), onError, onComplete(i))
-    end
+    return self:subscribe(onNext, onError, onComplete)
   end)
   end)
 end
 end
 
 
---- Returns a new Observable that produces the values from the original with duplicates removed.
+--- Returns a new Observable that skips over values produced by the original until the specified
+-- Observable produces a value.
+-- @arg {Observable} other - The Observable that triggers the production of values.
 -- @returns {Observable}
 -- @returns {Observable}
-function Observable:distinct()
+function Observable:skipUntil(other)
   return Observable.create(function(observer)
   return Observable.create(function(observer)
-    local values = {}
-
-    local function onNext(x)
-      if not values[x] then
-        observer:onNext(x)
+    local function trigger()
+      local function onNext(value)
+        return observer:onNext(value)
       end
       end
 
 
-      values[x] = true
-    end
+      local function onError(message)
+        return observer:onNext(message)
+      end
 
 
-    local function onError(e)
-      return observer:onError(e)
-    end
+      local function onComplete()
+        return observer:onComplete()
+      end
 
 
-    local function onComplete()
-      return observer:onComplete()
+      return self:subscribe(onNext, onError, onComplete)
     end
     end
 
 
-    return self:subscribe(onNext, onError, onComplete)
+    other:subscribe(trigger, trigger, trigger)
   end)
   end)
 end
 end
 
 
---- Returns a new Observable that completes when the specified Observable fires.
--- @arg {Observable} other - The Observable that triggers completion of the original.
+--- Returns a new Observable that produces the sum of the values of the original Observable as a
+-- single result.
 -- @returns {Observable}
 -- @returns {Observable}
-function Observable:takeUntil(other)
-  return Observable.create(function(observer)
-    local function onNext(x)
-      return observer:onNext(x)
-    end
+function Observable:sum()
+  return self:reduce(function(x, y) return x + y end, 0)
+end
 
 
-    local function onError(e)
-      return observer:onError(e)
-    end
+--- Returns a new Observable that only produces the first n results of the original.
+-- @arg {number=1} n - The number of elements to produce before completing.
+-- @returns {Observable}
+function Observable:take(n)
+  n = n or 1
 
 
-    local function onComplete()
-      return observer:onComplete()
+  return Observable.create(function(observer)
+    if n <= 0 then
+      observer:onComplete()
+      return
     end
     end
 
 
-    other:subscribe(onComplete, onComplete, onComplete)
+    local i = 1
 
 
-    return self:subscribe(onNext, onError, onComplete)
-  end)
-end
+    local function onNext(x)
+      observer:onNext(x)
 
 
---- Returns a new Observable that only produces values of the first that satisfy a predicate.
--- @arg {function} predicate - The predicate to filter values with.
--- @returns {Observable}
-function Observable:filter(predicate)
-  predicate = predicate or identity
+      i = i + 1
 
 
-  return Observable.create(function(observer)
-    local function onNext(x)
-      if predicate(x) then
-        return observer:onNext(x)
+      if i > n then
+        observer:onComplete()
       end
       end
     end
     end
 
 
@@ -342,21 +418,20 @@ function Observable:filter(predicate)
     end
     end
 
 
     local function onComplete()
     local function onComplete()
-      return observer:onComplete(e)
+      return observer:onComplete()
     end
     end
 
 
     return self:subscribe(onNext, onError, onComplete)
     return self:subscribe(onNext, onError, onComplete)
   end)
   end)
 end
 end
 
 
---- Returns a new Observable that produces values computed by extracting the given key from the
--- tables produced by the original.
--- @arg {function} key - The key to extract from the table.
+--- Returns a new Observable that completes when the specified Observable fires.
+-- @arg {Observable} other - The Observable that triggers completion of the original.
 -- @returns {Observable}
 -- @returns {Observable}
-function Observable:pluck(key)
+function Observable:takeUntil(other)
   return Observable.create(function(observer)
   return Observable.create(function(observer)
-    local function onNext(t)
-      return observer:onNext(t[key])
+    local function onNext(x)
+      return observer:onNext(x)
     end
     end
 
 
     local function onError(e)
     local function onError(e)
@@ -367,6 +442,8 @@ function Observable:pluck(key)
       return observer:onComplete()
       return observer:onComplete()
     end
     end
 
 
+    other:subscribe(onComplete, onComplete, onComplete)
+
     return self:subscribe(onNext, onError, onComplete)
     return self:subscribe(onNext, onError, onComplete)
   end)
   end)
 end
 end
@@ -439,9 +516,15 @@ end
 
 
 Scheduler.Cooperative = Cooperative
 Scheduler.Cooperative = Cooperative
 
 
+--- @class Subject
+-- @description Subjects function both as an Observer and as an Observable. Subjects inherit all
+-- Observable functions, including subscribe. Values can also be pushed to the Subject, which will
+-- be broadcasted to any subscribed Observers.
 local Subject = setmetatable({}, Observable)
 local Subject = setmetatable({}, Observable)
 Subject.__index = Subject
 Subject.__index = Subject
 
 
+--- Creates a new Subject.
+-- @returns {Subject}
 function Subject.create()
 function Subject.create()
   local self = {
   local self = {
     observers = {}
     observers = {}
@@ -450,22 +533,31 @@ function Subject.create()
   return setmetatable(self, Subject)
   return setmetatable(self, Subject)
 end
 end
 
 
+--- Creates a new Observer and attaches it to the Subject.
+-- @arg {function} onNext - Called when the Subject produces a value.
+-- @arg {function} onError - Called when the Subject terminates due to an error.
+-- @arg {function} onComplete - Called when the Subject completes normally.
 function Subject:subscribe(onNext, onError, onComplete)
 function Subject:subscribe(onNext, onError, onComplete)
   table.insert(self.observers, Observer.create(onNext, onError, onComplete))
   table.insert(self.observers, Observer.create(onNext, onError, onComplete))
 end
 end
 
 
+--- Pushes a value to the Subject. It will be broadcasted to all Observers.
+-- @arg {*} value
 function Subject:onNext(value)
 function Subject:onNext(value)
   for i = 1, #self.observers do
   for i = 1, #self.observers do
     self.observers[i]:onNext(value)
     self.observers[i]:onNext(value)
   end
   end
 end
 end
 
 
+--- Signal to all Observers that an error has occurred.
+-- @arg {string=} message - A string describing what went wrong.
 function Subject:onError(message)
 function Subject:onError(message)
   for i = 1, #self.observers do
   for i = 1, #self.observers do
     self.observers[i]:onError(message)
     self.observers[i]:onError(message)
   end
   end
 end
 end
 
 
+--- Signal to all Observers that the Subject will not produce any more values.
 function Subject:onComplete()
 function Subject:onComplete()
   for i = 1, #self.observers do
   for i = 1, #self.observers do
     self.observers[i]:onComplete()
     self.observers[i]:onComplete()