index.js 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. var stream = require("stream");
  /**
   * Writable object-mode stream that routes every chunk written to it into one
   * of two readable outputs, exposed as the `a` and `b` properties.
   *
   * A classifier decides the destination of each chunk: a truthy verdict sends
   * the chunk to `a`, a falsy verdict sends it to `b`. When the writable side
   * finishes, both outputs are ended.
   *
   * May be called with or without `new`.
   *
   * @param {Object} [options] Stream options; handed to the Writable base and to
   *   both Readable outputs. `objectMode` is always forced to `true`. The object
   *   passed in is copied, never modified.
   * @param {Function} [options.classifier] `function (chunk, done)`; must call
   *   `done(err, verdict)`. Overrides the default truthiness classifier.
   */
  var ForkStream = module.exports = function ForkStream(options) {
    if (!(this instanceof ForkStream)) {
      return new ForkStream(options);
    }

    // Work on a private copy so the caller's options object is left untouched
    // (it may be shared between several streams, or frozen). `for...in` over
    // `undefined`/`null` is a no-op, so a missing argument needs no special case.
    var settings = {};
    var key;
    for (key in options) {
      settings[key] = options[key];
    }
    settings.objectMode = true;

    stream.Writable.call(this, settings);

    if (settings.classifier) {
      this._classifier = settings.classifier;
    }

    this.a = new stream.Readable(settings);
    this.b = new stream.Readable(settings);

    var self = this;

    // Shared `_read` handler for both outputs: if a write is parked because an
    // output reported backpressure, release it now that a consumer wants data.
    // The slot is cleared before the call so the callback runs at most once.
    var resume = function resume() {
      var pending = self.resume;
      if (pending) {
        self.resume = null;
        pending.call(null);
      }
    };

    this.a._read = resume;
    this.b._read = resume;

    // Once every write has been flushed, signal end-of-stream on both outputs.
    this.on("finish", function() {
      self.a.push(null);
      self.b.push(null);
    });
  };

  ForkStream.prototype = Object.create(stream.Writable.prototype, {constructor: {value: ForkStream}});

  /**
   * Default classifier: truthy chunks are reported as `true` (output `a`),
   * falsy chunks as `false` (output `b`).
   *
   * @param {*} e The chunk to classify.
   * @param {Function} done Callback invoked as `done(null, verdict)`.
   * @returns {*} Whatever `done` returns.
   */
  ForkStream.prototype._classifier = function(e, done) {
    return done(null, !!e);
  };

  /**
   * Writable implementation: classifies `input` and pushes it to `a` or `b`.
   *
   * @param {*} input The chunk being written.
   * @param {string} encoding Unused; chunks are forwarded as-is.
   * @param {Function} done Write-completion callback. Called with the error if
   *   the classifier reports one, immediately if the chosen output accepted the
   *   chunk without backpressure, or later from an output's `_read` otherwise.
   */
  ForkStream.prototype._write = function _write(input, encoding, done) {
    var self = this;

    this._classifier.call(null, input, function(err, res) {
      if (err) {
        return done(err);
      }

      var out = res ? self.a : self.b;

      if (out.push(input)) {
        return done();
      }

      // The output's buffer is full: park the completion callback until one of
      // the outputs asks for more data.
      self.resume = done;
    });
  };