index.js 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. 'use strict';
  2. var through2 = require('through2');
  3. var ForkStream = require('fork-stream');
  4. var mergeStream = require('merge-stream');
  5. var duplexify = require('duplexify');
  6. module.exports = function (condition, trueStream, falseStream) {
  7. if (!trueStream) {
  8. throw new Error('fork-stream: child action is required');
  9. }
  10. // output stream
  11. var outStream = through2.obj();
  12. // create fork-stream
  13. var forkStream = new ForkStream({
  14. classifier: function (e, cb) {
  15. var ans = !!condition(e);
  16. return cb(null, ans);
  17. }
  18. });
  19. // if condition is true, pipe input to trueStream
  20. forkStream.a.pipe(trueStream);
  21. var mergedStream;
  22. if (falseStream) {
  23. // if there's an 'else' condition
  24. // if condition is false
  25. // pipe input to falseStream
  26. forkStream.b.pipe(falseStream);
  27. // merge output with trueStream's output
  28. mergedStream = mergeStream(falseStream, trueStream);
  29. // redirect falseStream errors to mergedStream
  30. falseStream.on('error', function(err) { mergedStream.emit('error', err); });
  31. } else {
  32. // if there's no 'else' condition
  33. // if condition is false
  34. // merge output with trueStream's output
  35. mergedStream = mergeStream(forkStream.b, trueStream);
  36. }
  37. // redirect trueStream errors to mergedStream
  38. trueStream.on('error', function(err) { mergedStream.emit('error', err); });
  39. // send everything down-stream
  40. mergedStream.pipe(outStream);
  41. // redirect mergedStream errors to outStream
  42. mergedStream.on('error', function(err) { outStream.emit('error', err); });
  43. // consumers write in to forkStream, we write out to outStream
  44. return duplexify.obj(forkStream, outStream);
  45. };