123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- var stream = require("stream");
- var ForkStream = module.exports = function ForkStream(options) {
- options = options || {};
- options.objectMode = true;
- stream.Writable.call(this, options);
- if (options.classifier) {
- this._classifier = options.classifier;
- }
- this.a = new stream.Readable(options);
- this.b = new stream.Readable(options);
- var self = this;
- var resume = function resume() {
- if (self.resume) {
- var r = self.resume;
- self.resume = null;
- r.call(null);
- }
- };
- this.a._read = resume;
- this.b._read = resume;
- this.on("finish", function() {
- self.a.push(null);
- self.b.push(null);
- });
- };
- ForkStream.prototype = Object.create(stream.Writable.prototype, {constructor: {value: ForkStream}});
- ForkStream.prototype._classifier = function(e, done) {
- return done(null, !!e);
- };
- ForkStream.prototype._write = function _write(input, encoding, done) {
- var self = this;
- this._classifier.call(null, input, function(err, res) {
- if (err) {
- return done(err);
- }
- var out = res ? self.a : self.b;
- if (out.push(input)) {
- return done();
- } else {
- self.resume = done;
- }
- });
- };
|