Browse Source

haxe package

Aurel Bílý 6 years ago
parent
commit
616b89f141

+ 116 - 0
std/haxe/Error.hx

@@ -0,0 +1,116 @@
+package haxe;
+
+import asys.uv.UVErrorType;
+import haxe.PosInfos;
+
+/**
+	Common class for errors.
+**/
+class Error {
+	function get_message():String {
+		return (switch (type) {
+			case UVError(UVErrorType.E2BIG): "argument list too long";
+			case UVError(UVErrorType.EACCES): "permission denied";
+			case UVError(UVErrorType.EADDRINUSE): "address already in use";
+			case UVError(UVErrorType.EADDRNOTAVAIL): "address not available";
+			case UVError(UVErrorType.EAFNOSUPPORT): "address family not supported";
+			case UVError(UVErrorType.EAGAIN): "resource temporarily unavailable";
+			case UVError(UVErrorType.EAI_ADDRFAMILY): "address family not supported";
+			case UVError(UVErrorType.EAI_AGAIN): "temporary failure";
+			case UVError(UVErrorType.EAI_BADFLAGS): "bad ai_flags value";
+			case UVError(UVErrorType.EAI_BADHINTS): "invalid value for hints";
+			case UVError(UVErrorType.EAI_CANCELED): "request canceled";
+			case UVError(UVErrorType.EAI_FAIL): "permanent failure";
+			case UVError(UVErrorType.EAI_FAMILY): "ai_family not supported";
+			case UVError(UVErrorType.EAI_MEMORY): "out of memory";
+			case UVError(UVErrorType.EAI_NODATA): "no address";
+			case UVError(UVErrorType.EAI_NONAME): "unknown node or service";
+			case UVError(UVErrorType.EAI_OVERFLOW): "argument buffer overflow";
+			case UVError(UVErrorType.EAI_PROTOCOL): "resolved protocol is unknown";
+			case UVError(UVErrorType.EAI_SERVICE): "service not available for socket type";
+			case UVError(UVErrorType.EAI_SOCKTYPE): "socket type not supported";
+			case UVError(UVErrorType.EALREADY): "connection already in progress";
+			case UVError(UVErrorType.EBADF): "bad file descriptor";
+			case UVError(UVErrorType.EBUSY): "resource busy or locked";
+			case UVError(UVErrorType.ECANCELED): "operation canceled";
+			case UVError(UVErrorType.ECHARSET): "invalid Unicode character";
+			case UVError(UVErrorType.ECONNABORTED): "software caused connection abort";
+			case UVError(UVErrorType.ECONNREFUSED): "connection refused";
+			case UVError(UVErrorType.ECONNRESET): "connection reset by peer";
+			case UVError(UVErrorType.EDESTADDRREQ): "destination address required";
+			case UVError(UVErrorType.EEXIST): "file already exists";
+			case UVError(UVErrorType.EFAULT): "bad address in system call argument";
+			case UVError(UVErrorType.EFBIG): "file too large";
+			case UVError(UVErrorType.EHOSTUNREACH): "host is unreachable";
+			case UVError(UVErrorType.EINTR): "interrupted system call";
+			case UVError(UVErrorType.EINVAL): "invalid argument";
+			case UVError(UVErrorType.EIO): "i/o error";
+			case UVError(UVErrorType.EISCONN): "socket is already connected";
+			case UVError(UVErrorType.EISDIR): "illegal operation on a directory";
+			case UVError(UVErrorType.ELOOP): "too many symbolic links encountered";
+			case UVError(UVErrorType.EMFILE): "too many open files";
+			case UVError(UVErrorType.EMSGSIZE): "message too long";
+			case UVError(UVErrorType.ENAMETOOLONG): "name too long";
+			case UVError(UVErrorType.ENETDOWN): "network is down";
+			case UVError(UVErrorType.ENETUNREACH): "network is unreachable";
+			case UVError(UVErrorType.ENFILE): "file table overflow";
+			case UVError(UVErrorType.ENOBUFS): "no buffer space available";
+			case UVError(UVErrorType.ENODEV): "no such device";
+			case UVError(UVErrorType.ENOENT): "no such file or directory";
+			case UVError(UVErrorType.ENOMEM): "not enough memory";
+			case UVError(UVErrorType.ENONET): "machine is not on the network";
+			case UVError(UVErrorType.ENOPROTOOPT): "protocol not available";
+			case UVError(UVErrorType.ENOSPC): "no space left on device";
+			case UVError(UVErrorType.ENOSYS): "function not implemented";
+			case UVError(UVErrorType.ENOTCONN): "socket is not connected";
+			case UVError(UVErrorType.ENOTDIR): "not a directory";
+			case UVError(UVErrorType.ENOTEMPTY): "directory not empty";
+			case UVError(UVErrorType.ENOTSOCK): "socket operation on non-socket";
+			case UVError(UVErrorType.ENOTSUP): "operation not supported on socket";
+			case UVError(UVErrorType.EPERM): "operation not permitted";
+			case UVError(UVErrorType.EPIPE): "broken pipe";
+			case UVError(UVErrorType.EPROTO): "protocol error";
+			case UVError(UVErrorType.EPROTONOSUPPORT): "protocol not supported";
+			case UVError(UVErrorType.EPROTOTYPE): "protocol wrong type for socket";
+			case UVError(UVErrorType.ERANGE): "result too large";
+			case UVError(UVErrorType.EROFS): "read-only file system";
+			case UVError(UVErrorType.ESHUTDOWN): "cannot send after transport endpoint shutdown";
+			case UVError(UVErrorType.ESPIPE): "invalid seek";
+			case UVError(UVErrorType.ESRCH): "no such process";
+			case UVError(UVErrorType.ETIMEDOUT): "connection timed out";
+			case UVError(UVErrorType.ETXTBSY): "text file is busy";
+			case UVError(UVErrorType.EXDEV): "cross-device link not permitted";
+			case UVError(UVErrorType.UNKNOWN): "unknown error";
+			case UVError(UVErrorType.EOF): "end of file";
+			case UVError(UVErrorType.ENXIO): "no such device or address";
+			case UVError(UVErrorType.EMLINK): "too many links";
+			case UVError(UVErrorType.EHOSTDOWN): "host is down";
+			case UVError(UVErrorType.EOTHER): "other UV error";
+			case _: "unknown error";
+		});
+	}
+
+	/**
+		A human-readable representation of the error.
+	**/
+	public var message(get, never):String;
+
+	/**
+		Position where the error was thrown. By default, this is the place where the error is constructed.
+	**/
+	public final posInfos:PosInfos;
+
+	/**
+		Error type, usable for discerning error types with `switch` statements.
+	**/
+	public final type:ErrorType;
+
+	public function new(type:ErrorType, ?posInfos:PosInfos) {
+		this.type = type;
+		this.posInfos = posInfos;
+	}
+
+	public function toString():String {
+		return '$message at $posInfos';
+	}
+}

+ 5 - 0
std/haxe/ErrorType.hx

@@ -0,0 +1,5 @@
+package haxe;
+
+enum ErrorType {
+	UVError(errno:asys.uv.UVErrorType);
+}

+ 10 - 0
std/haxe/NoData.hx

@@ -0,0 +1,10 @@
+package haxe;
+
+/**
+	Data type used to indicate the absence of a value, especially in types with
+	type parameters.
+**/
+abstract NoData(Int) {
+	public inline function new()
+		this = 0;
+}

+ 42 - 0
std/haxe/async/ArraySignal.hx

@@ -0,0 +1,42 @@
+package haxe.async;
+
+/**
+	Basic implementation of a `haxe.async.Signal`. Uses an array for storing
+	listeners for the signal.
+**/
+class ArraySignal<T> implements Signal<T> {
+	final listeners:Array<Listener<T>> = [];
+
+	function get_listenerCount():Int {
+		return listeners.length;
+	}
+
+	public var listenerCount(get, never):Int;
+
+	public function new() {}
+
+	public function on(listener:Listener<T>):Void {
+		listeners.push(listener);
+	}
+
+	public function once(listener:Listener<T>):Void {
+		listeners.push(function wrapped(data:T):Void {
+			listeners.remove(wrapped);
+			listener(data);
+		});
+	}
+
+	public function off(?listener:Listener<T>):Void {
+		if (listener != null) {
+			listeners.remove(listener);
+		} else {
+			listeners.resize(0);
+		}
+	}
+
+	public function emit(data:T):Void {
+		for (listener in listeners) {
+			listener(data);
+		}
+	}
+}

+ 65 - 0
std/haxe/async/Callback.hx

@@ -0,0 +1,65 @@
+package haxe.async;
+
+import haxe.Error;
+import haxe.NoData;
+
+typedef CallbackData<T> = (?error:Error, ?result:T) -> Void;
+
+/**
+	A callback. All callbacks in the standard library are functions which accept
+	two arguments: an error (`haxe.Error`) and a result (`T`). If error is 
+	non-`null`, result must be `null`. The callback type is declared in 	`CallbackData`.
+
+	This abstract defines multiple `@:from` conversions to improve readability of
+	callback code.
+**/
+@:callable
+abstract Callback<T>(CallbackData<T>) from CallbackData<T> {
+	/**
+		Returns a callback of the same type as `cb` which is guaranteed to be
+		non-`null`. If `cb` is given and is not `null` it is returned directly.
+		If `cb` is `null` a dummy callback which does nothing is returned instead.
+	**/
+	public static function nonNull<T>(?cb:Callback<T>):Callback<T> {
+		if (cb == null)
+			return (_, _) -> {};
+		return cb;
+	}
+
+	/**
+		Wraps a function which takes a single optional `haxe.Error` argument into
+		a callback of type `Callback<NoData>`. Allows:
+
+		```haxe
+		var cb:Callback<NoData> = (?err) -> trace("error!", err);
+		```
+	**/
+	@:from public static inline function fromOptionalErrorOnly(f:(?error:Error) -> Void):Callback<NoData> {
+		return (?err:Error, ?result:NoData) -> f(err);
+	}
+
+	/**
+		Wraps a function which takes a single `haxe.Error` argument into a callback
+		of type `Callback<NoData>`. Allows:
+
+		```haxe
+		var cb:Callback<NoData> = (err) -> trace("error!", err);
+		```
+	**/
+	@:from public static inline function fromErrorOnly(f:(error:Error) -> Void):Callback<NoData> {
+		return (?err:Error, ?result:NoData) -> f(err);
+	}
+
+	/*
+	// this should not be encouraged, may mess up from(Optional)ErrorOnly
+	@:from static inline function fromResultOnly<T>(f:(?result:T) -> Void):Callback<T> return (?err:Error, ?result:T) -> f(result);
+	*/
+
+	/**
+		Wraps a callback function declared without `?` (optional) arguments into a
+		callback.
+	**/
+	@:from public static inline function fromErrorResult<T>(f:(error:Error, result:T) -> Void):Callback<T> {
+		return (?err:Error, ?result:T) -> f(err, result);
+	}
+}

+ 11 - 0
std/haxe/async/Defer.hx

@@ -0,0 +1,11 @@
+package haxe.async;
+
+class Defer {
+	/**
+		Schedules the given function to run during the next processing tick.
+		Convenience shortcut for `Timer.delay(f, 0)`.
+	**/
+	public static inline function nextTick(f:() -> Void):asys.Timer {
+		return asys.Timer.delay(f, 0);
+	}
+}

+ 19 - 0
std/haxe/async/Listener.hx

@@ -0,0 +1,19 @@
+package haxe.async;
+
+import haxe.NoData;
+
+typedef ListenerData<T> = (data:T) -> Void;
+
+/**
+	Signal listener. A signal listener is a function which accepts one argument
+	and has a `Void` return type.
+**/
+@:callable
+abstract Listener<T>(ListenerData<T>) from ListenerData<T> {
+	/**
+		This function allows a listener to a `Signal<NoData>` to be defined as a
+		function which accepts no arguments.
+	**/
+	@:from static inline function fromNoArguments(f:() -> Void):Listener<NoData>
+		return(data:NoData) -> f();
+}

+ 38 - 0
std/haxe/async/Signal.hx

@@ -0,0 +1,38 @@
+package haxe.async;
+
+/**
+	Signals are a type-safe system to emit events. A signal will calls its
+	listeners whenever _something_ (the event that the signal represents) happens,
+	passing along any relevant associated data.
+
+	Signals which have no associated data should use `haxe.NoData` as their type
+	parameter.
+**/
+interface Signal<T> {
+	/**
+		Number of listeners to `this` signal.
+	**/
+	var listenerCount(get, never):Int;
+
+	/**
+		Adds a listener to `this` signal, which will be called for all signal
+		emissions until it is removed with `off`.
+	**/
+	function on(listener:Listener<T>):Void;
+
+	/**
+		Adds a listener to `this` signal, which will be called only once, the next
+		time the signal emits.
+	**/
+	function once(listener:Listener<T>):Void;
+
+	/**
+		Removes the given listener from `this` signal.
+	**/
+	function off(?listener:Listener<T>):Void;
+
+	/**
+		Emits `data` to all current listeners of `this` signal.
+	**/
+	function emit(data:T):Void;
+}

+ 51 - 0
std/haxe/async/WrappedSignal.hx

@@ -0,0 +1,51 @@
+package haxe.async;
+
+import haxe.NoData;
+
+/**
+	An implementation of `haxe.async.Signal` which will listen for changes in its
+	listeners. This is useful when a class changes its behavior depending on
+	whether there are any listeners to some of its signals, e.g. a `Readable`
+	stream will not emit data signals when there are no data handlers.
+**/
+class WrappedSignal<T> implements Signal<T> {
+	final listeners:Array<Listener<T>> = [];
+	public final changeSignal:Signal<NoData> = new ArraySignal<NoData>();
+
+	function get_listenerCount():Int {
+		return listeners.length;
+	}
+
+	public var listenerCount(get, never):Int;
+
+	public function new() {}
+
+	public function on(listener:Listener<T>):Void {
+		listeners.push(listener);
+		changeSignal.emit(new NoData());
+	}
+
+	public function once(listener:Listener<T>):Void {
+		listeners.push(function wrapped(data:T):Void {
+			listeners.remove(wrapped);
+			changeSignal.emit(new NoData());
+			listener(data);
+		});
+		changeSignal.emit(new NoData());
+	}
+
+	public function off(?listener:Listener<T>):Void {
+		if (listener != null) {
+			listeners.remove(listener);
+		} else {
+			listeners.resize(0);
+		}
+		changeSignal.emit(new NoData());
+	}
+
+	public function emit(data:T):Void {
+		for (listener in listeners) {
+			listener(data);
+		}
+	}
+}

+ 137 - 0
std/haxe/io/Duplex.hx

@@ -0,0 +1,137 @@
+package haxe.io;
+
+import haxe.Error;
+import haxe.NoData;
+import haxe.async.*;
+import haxe.ds.List;
+import haxe.io.Readable.ReadResult;
+
+/**
+	A stream which is both readable and writable.
+
+	This is an abstract base class that should never be used directly. Instead,
+	child classes should override the `internalRead` and `internalWrite` methods.
+	See `haxe.io.Readable` and `haxe.io.Writable`.
+**/
+@:access(haxe.io.Readable)
+@:access(haxe.io.Writable)
+class Duplex implements IReadable implements IWritable {
+	public final dataSignal:Signal<Bytes>;
+	public final endSignal:Signal<NoData>;
+	public final errorSignal:Signal<Error>;
+	public final pauseSignal:Signal<NoData>;
+	public final resumeSignal:Signal<NoData>;
+
+	public final drainSignal:Signal<NoData>;
+	public final finishSignal:Signal<NoData>;
+	public final pipeSignal:Signal<IReadable>;
+	public final unpipeSignal:Signal<IReadable>;
+
+	final input:Writable;
+	final output:Readable;
+	final inputBuffer:List<Bytes>;
+	final outputBuffer:List<Bytes>;
+
+	function get_inputBufferLength() {
+		return input.bufferLength;
+	}
+	var inputBufferLength(get, never):Int;
+
+	function get_outputBufferLength() {
+		return output.bufferLength;
+	}
+	var outputBufferLength(get, never):Int;
+
+	function new() {
+		input = new DuplexWritable(this);
+		output = new DuplexReadable(this);
+		dataSignal = output.dataSignal;
+		endSignal = output.endSignal;
+		errorSignal = output.errorSignal;
+		pauseSignal = output.pauseSignal;
+		resumeSignal = output.resumeSignal;
+		drainSignal = input.drainSignal;
+		finishSignal = input.finishSignal;
+		pipeSignal = input.pipeSignal;
+		unpipeSignal = input.unpipeSignal;
+		inputBuffer = input.buffer;
+		outputBuffer = output.buffer;
+	}
+
+	// override by implementing classes
+	function internalRead(remaining:Int):ReadResult {
+		throw "not implemented";
+	}
+
+	function internalWrite():Void {
+		throw "not implemented";
+	}
+
+	inline function pop():Bytes {
+		return input.pop();
+	}
+
+	inline function push(chunk:Bytes):Void {
+		output.push(chunk);
+	}
+
+	inline function asyncRead(chunks:Array<Bytes>, eof:Bool):Void {
+		output.asyncRead(chunks, eof);
+	}
+
+	public inline function write(chunk:Bytes):Bool {
+		return input.write(chunk);
+	}
+
+	public function end():Void {
+		input.end();
+		output.asyncRead(null, true);
+	}
+
+	public inline function pause():Void {
+		output.pause();
+	}
+
+	public inline function resume():Void {
+		output.resume();
+	}
+
+	public inline function pipe(to:IWritable):Void {
+		output.pipe(to);
+	}
+
+	public inline function cork():Void {
+		input.cork();
+	}
+
+	public inline function uncork():Void {
+		input.uncork();
+	}
+}
+
+@:access(haxe.io.Duplex)
+private class DuplexWritable extends Writable {
+	final parent:Duplex;
+
+	public function new(parent:Duplex) {
+		this.parent = parent;
+	}
+
+	override function internalWrite():Void {
+		parent.internalWrite();
+	}
+}
+
+@:access(haxe.io.Duplex)
+private class DuplexReadable extends Readable {
+	final parent:Duplex;
+
+	public function new(parent:Duplex) {
+		super();
+		this.parent = parent;
+	}
+
+	override function internalRead(remaining):ReadResult {
+		return parent.internalRead(remaining);
+	}
+}

+ 51 - 0
std/haxe/io/FilePath.hx

@@ -0,0 +1,51 @@
+package haxe.io;
+
+/**
+	Represents a relative or absolute file path.
+**/
+abstract FilePath(String) from String {
+	@:from public static function encode(bytes:Bytes):FilePath {
+		// TODO: standard UTF-8 decoding, except any invalid bytes is replaced
+		// with (for example) U+FFFD, followed by the byte itself as a codepoint
+		return null;
+	}
+
+	public function decode():Bytes {
+		return null;
+	}
+
+	/**
+		The components of `this` path.
+	**/
+	public var components(get, never):Array<FilePath>;
+
+	private function get_components():Array<FilePath> {
+		return this.split("/");
+	}
+
+	@:op(A / B)
+	public function addComponent(other:FilePath):FilePath {
+		return this + "/" + other.get_raw();
+	}
+
+	private function get_raw():String
+		return this;
+
+	#if hl
+	private function decodeNative():hl.Bytes {
+		return @:privateAccess this.toUtf8();
+	}
+
+	private static function encodeNative(data:hl.Bytes):FilePath {
+		return ((@:privateAccess String.fromUCS2(data)) : FilePath);
+	}
+	#elseif neko
+	private function decodeNative():neko.NativeString {
+		return neko.NativeString.ofString(this);
+	}
+
+	private static function encodeNative(data:neko.NativeString):FilePath {
+		return (neko.NativeString.toString(data) : FilePath);
+	}
+	#end
+}

+ 12 - 0
std/haxe/io/IDuplex.hx

@@ -0,0 +1,12 @@
+package haxe.io;
+
+/**
+	A stream which is both readable and writable.
+
+	This interface should be used wherever an object that is both readable and
+	writable is expected, regardless of a specific implementation. See `Duplex`
+	for an abstract base class that can be used to implement an `IDuplex`.
+
+	See also `IReadable` and `IWritable`.
+**/
+interface IDuplex extends IReadable extends IWritable {}

+ 63 - 0
std/haxe/io/IReadable.hx

@@ -0,0 +1,63 @@
+package haxe.io;
+
+import haxe.Error;
+import haxe.NoData;
+import haxe.async.*;
+
+/**
+	A readable stream.
+
+	This interface should be used wherever an object that is readable is
+	expected, regardless of a specific implementation. See `Readable` for an
+	abstract base class that can be used to implement an `IReadable`.
+**/
+interface IReadable {
+	/**
+		Emitted whenever a chunk of data is available.
+	**/
+	final dataSignal:Signal<Bytes>;
+
+	/**
+		Emitted when the stream is finished. No further signals will be emitted by
+		`this` instance after `endSignal` is emitted.
+	**/
+	final endSignal:Signal<NoData>;
+
+	/**
+		Emitted for any error that occurs during reading.
+	**/
+	final errorSignal:Signal<Error>;
+
+	/**
+		Emitted when `this` stream is paused.
+	**/
+	final pauseSignal:Signal<NoData>;
+
+	/**
+		Emitted when `this` stream is resumed.
+	**/
+	final resumeSignal:Signal<NoData>;
+
+	/**
+		Resumes flow of data. Note that this method is called automatically
+		whenever listeners to either `dataSignal` or `endSignal` are added.
+	**/
+	function resume():Void;
+
+	/**
+		Pauses flow of data.
+	**/
+	function pause():Void;
+
+	/**
+		Pipes the data from `this` stream to `target`.
+	**/
+	function pipe(target:IWritable):Void;
+
+	/**
+		Indicates to `this` stream that an additional `amount` bytes should be read
+		from the underlying data source. Note that the actual data will arrive via
+		`dataSignal`.
+	**/
+	// function read(amount:Int):Void;
+}

+ 15 - 0
std/haxe/io/IWritable.hx

@@ -0,0 +1,15 @@
+package haxe.io;
+
+import haxe.NoData;
+import haxe.async.Signal;
+
+interface IWritable {
+	final drainSignal:Signal<NoData>;
+	final finishSignal:Signal<NoData>;
+	final pipeSignal:Signal<IReadable>;
+	final unpipeSignal:Signal<IReadable>;
+	function write(chunk:Bytes):Bool;
+	function end():Void;
+	function cork():Void;
+	function uncork():Void;
+}

+ 240 - 0
std/haxe/io/Readable.hx

@@ -0,0 +1,240 @@
+package haxe.io;
+
+import haxe.Error;
+import haxe.NoData;
+import haxe.async.*;
+import haxe.ds.List;
+
+/**
+	A readable stream.
+
+	This is an abstract base class that should never be used directly. Instead,
+	subclasses should override the `internalRead` method.
+**/
+class Readable implements IReadable {
+	/**
+		See `IReadable.dataSignal`.
+	**/
+	public final dataSignal:Signal<Bytes>;
+
+	/**
+		See `IReadable.endSignal`.
+	**/
+	public final endSignal:Signal<NoData>;
+
+	/**
+		See `IReadable.errorSignal`.
+	**/
+	public final errorSignal:Signal<Error> = new ArraySignal();
+
+	/**
+		See `IReadable.pauseSignal`.
+	**/
+	public final pauseSignal:Signal<NoData> = new ArraySignal();
+
+	/**
+		See `IReadable.resumeSignal`.
+	**/
+	public final resumeSignal:Signal<NoData> = new ArraySignal();
+
+	/**
+		High water mark. `Readable` will call `internalRead` pre-emptively to fill
+		up the internal buffer up to this value when possible. Set to `0` to
+		disable pre-emptive reading.
+	**/
+	public var highWaterMark = 8192;
+
+	/**
+		Total amount of data currently in the internal buffer, in bytes.
+	**/
+	public var bufferLength(default, null) = 0;
+
+	/**
+		Whether data is flowing at the moment. When flowing, data signals will be
+		emitted and the internal buffer will be empty.
+	**/
+	public var flowing(default, null) = false;
+
+	/**
+		Whether this stream is finished. When `true`, no further signals will be
+		emmited by `this` instance.
+	**/
+	public var done(default, null) = false;
+
+	var buffer = new List<Bytes>();
+	var deferred:asys.Timer;
+	var willEof = false;
+
+	@:dox(show)
+	function new(?highWaterMark:Int = 8192) {
+		this.highWaterMark = highWaterMark;
+		var dataSignal = new WrappedSignal<Bytes>();
+		dataSignal.changeSignal.on(() -> {
+			if (dataSignal.listenerCount > 0)
+				resume();
+		});
+		this.dataSignal = dataSignal;
+		var endSignal = new WrappedSignal<NoData>();
+		endSignal.changeSignal.on(() -> {
+			if (endSignal.listenerCount > 0)
+				resume();
+		});
+		this.endSignal = endSignal;
+	}
+
+	inline function shouldFlow():Bool {
+		return !done && (dataSignal.listenerCount > 0 || endSignal.listenerCount > 0);
+	}
+
+	function process():Void {
+		deferred = null;
+		if (!shouldFlow())
+			flowing = false;
+		if (!flowing)
+			return;
+
+		var reschedule = false;
+
+		// pre-emptive read until HWM
+		if (!willEof && !done)
+			while (bufferLength < highWaterMark) {
+				switch (internalRead(highWaterMark - bufferLength)) {
+					case None:
+						break;
+					case Data(chunks, eof):
+						reschedule = true;
+						for (chunk in chunks)
+							push(chunk);
+						if (eof) {
+							willEof = true;
+							break;
+						}
+				}
+			}
+
+		// emit data
+		while (buffer.length > 0 && flowing && shouldFlow()) {
+			reschedule = true;
+			dataSignal.emit(pop());
+		}
+
+		if (willEof) {
+			endSignal.emit(new NoData());
+			flowing = false;
+			done = true;
+			return;
+		}
+
+		if (!shouldFlow())
+			flowing = false;
+		else if (reschedule)
+			scheduleProcess();
+	}
+
+	inline function scheduleProcess():Void {
+		if (deferred == null)
+			deferred = Defer.nextTick(process);
+	}
+
+	function push(chunk:Bytes):Bool {
+		if (done)
+			throw "stream already done";
+		buffer.add(chunk);
+		bufferLength += chunk.length;
+		return bufferLength < highWaterMark;
+	}
+
+	/**
+		This method should be used internally from `internalRead` to provide data
+		resulting from asynchronous operations. The arguments to this method are
+		the same as `ReadableResult.Data`. See `internalRead` for more details.
+	**/
+	@:dox(show)
+	function asyncRead(chunks:Array<Bytes>, eof:Bool):Void {
+		if (done || willEof)
+			throw "stream already done";
+		if (chunks != null)
+			for (chunk in chunks)
+				push(chunk);
+		if (eof)
+			willEof = true;
+		if (chunks != null || eof)
+			scheduleProcess();
+	}
+
+	function pop():Bytes {
+		if (done)
+			throw "stream already done";
+		var chunk = buffer.pop();
+		bufferLength -= chunk.length;
+		return chunk;
+	}
+
+	/**
+		This method should be overridden by a subclass.
+
+		This method will be called as needed by `Readable`. The `remaining`
+		argument is an indication of how much data is needed to fill the internal
+		buffer up to the high water mark, or the current requested amount of data.
+		This method is called in a cycle until the read cycle is stopped with a
+		`None` return or an EOF is indicated, as described below.
+
+		If a call to this method returns `None`, the current read cycle is
+		ended. This value should be returned when there is no data available at the
+		moment, but a read request was scheduled and will later be fulfilled by a
+		call to `asyncRead`.
+
+		If a call to this method returns `Data(chunks, eof)`, `chunks` will be
+		added to the internal buffer. If `eof` is `true`, the read cycle is ended
+		and the readable stream signals an EOF (end-of-file). After an EOF, no
+		further calls will be made. `chunks` should not be an empty array if `eof`
+		is `false`.
+
+		Code inside this method should only call `asyncRead` (asynchronously from
+		a callback) or provide data using the return value.
+	**/
+	@:dox(show)
+	function internalRead(remaining:Int):ReadResult {
+		throw "not implemented";
+	}
+
+	/**
+		See `IReadable.resume`.
+	**/
+	public function resume():Void {
+		if (done)
+			return;
+		if (!flowing) {
+			resumeSignal.emit(new NoData());
+			flowing = true;
+			scheduleProcess();
+		}
+	}
+
+	/**
+		See `IReadable.pause`.
+	**/
+	public function pause():Void {
+		if (done)
+			return;
+		if (flowing) {
+			pauseSignal.emit(new NoData());
+			flowing = false;
+		}
+	}
+
+	/**
+		See `IReadable.pipe`.
+	**/
+	public function pipe(to:IWritable):Void {
+		throw "!";
+	}
+}
+
+/**
+	See `Readable.internalRead`.
+**/
+enum ReadResult {
+	None;
+	Data(chunks:Array<Bytes>, eof:Bool);
+}

+ 21 - 0
std/haxe/io/StreamTools.hx

@@ -0,0 +1,21 @@
+package haxe.io;
+
+class StreamTools {
+	/**
+		Creates a pipeline out of the given streams. `input` is piped to the first
+		element in `intermediate`, which is piped to the next element in
+		`intermediate`, and so on, until the last stream is piped to `output`. If
+		`intermediate` is `null`, it is treated as an empty array and `input` is
+		connected directly to `output`.
+	**/
+	public static function pipeline(input:IReadable, ?intermediate:Array<IDuplex>, output:IWritable):Void {
+		if (intermediate == null || intermediate.length == 0)
+			return input.pipe(output);
+
+		input.pipe(intermediate[0]);
+		for (i in 0...intermediate.length - 1) {
+			intermediate[i].pipe(intermediate[i + 1]);
+		}
+		intermediate[intermediate.length - 1].pipe(output);
+	}
+}

+ 94 - 0
std/haxe/io/Transform.hx

@@ -0,0 +1,94 @@
+package haxe.io;
+
+import haxe.Error;
+import haxe.NoData;
+import haxe.async.*;
+
+@:access(haxe.io.Readable)
+@:access(haxe.io.Writable)
+class Transform implements IReadable implements IWritable {
+	public final dataSignal:Signal<Bytes>;
+	public final endSignal:Signal<NoData>;
+	public final errorSignal:Signal<Error>;
+	public final pauseSignal:Signal<NoData>;
+	public final resumeSignal:Signal<NoData>;
+
+	public final drainSignal:Signal<NoData>;
+	public final finishSignal:Signal<NoData>;
+	public final pipeSignal:Signal<IReadable>;
+	public final unpipeSignal:Signal<IReadable>;
+
+	final input:Writable;
+	final output:Readable;
+
+	var transforming:Bool = false;
+
+	function new() {
+		input = new TransformWritable(this);
+		output = @:privateAccess new Readable(0);
+		dataSignal = output.dataSignal;
+		endSignal = output.endSignal;
+		errorSignal = output.errorSignal;
+		pauseSignal = output.pauseSignal;
+		resumeSignal = output.resumeSignal;
+		drainSignal = input.drainSignal;
+		finishSignal = input.finishSignal;
+		pipeSignal = input.pipeSignal;
+		unpipeSignal = input.unpipeSignal;
+	}
+
+	function internalTransform(chunk:Bytes):Void {
+		throw "not implemented";
+	}
+
+	function push(chunk:Bytes):Void {
+		transforming = false;
+		output.asyncRead([chunk], false);
+		input.internalWrite();
+	}
+
+	public inline function write(chunk:Bytes):Bool {
+		return input.write(chunk);
+	}
+
+	public function end():Void {
+		input.end();
+		output.asyncRead(null, true);
+	}
+
+	public inline function pause():Void {
+		output.pause();
+	}
+
+	public inline function resume():Void {
+		output.resume();
+	}
+
+	public inline function pipe(to:IWritable):Void {
+		output.pipe(to);
+	}
+
+	public inline function cork():Void {
+		input.cork();
+	}
+
+	public inline function uncork():Void {
+		input.uncork();
+	}
+}
+
+@:access(haxe.io.Transform)
+private class TransformWritable extends Writable {
+	final parent:Transform;
+
+	public function new(parent:Transform) {
+		this.parent = parent;
+	}
+
+	override function internalWrite():Void {
+		if (buffer.length > 0) {
+			parent.transforming = true;
+			parent.internalTransform(pop());
+		}
+	}
+}

+ 91 - 0
std/haxe/io/Writable.hx

@@ -0,0 +1,91 @@
+package haxe.io;
+
+import haxe.NoData;
+import haxe.async.*;
+import haxe.ds.List;
+
+/**
+	A writable stream.
+
+	This is an abstract base class that should never be used directly. Instead,
+	subclasses should override the `internalWrite` method.
+**/
+class Writable implements IWritable {
+	public final drainSignal:Signal<NoData> = new ArraySignal<NoData>();
+	public final finishSignal:Signal<NoData> = new ArraySignal<NoData>();
+	public final pipeSignal:Signal<IReadable> = new ArraySignal<IReadable>();
+	public final unpipeSignal:Signal<IReadable> = new ArraySignal<IReadable>();
+
+	public var highWaterMark = 8192;
+	public var bufferLength(default, null) = 0;
+	public var corkCount(default, null) = 0;
+	public var done(default, null) = false;
+
+	var willDrain = false;
+	var willFinish = false;
+	var deferred:asys.Timer;
+	var buffer = new List<Bytes>();
+
+	// for use by implementing classes
+	function pop():Bytes {
+		var chunk = buffer.pop();
+		bufferLength -= chunk.length;
+		if (willDrain && buffer.length == 0) {
+			willDrain = false;
+			if (deferred == null)
+				deferred = Defer.nextTick(() -> {
+					deferred = null;
+					drainSignal.emit(new NoData());
+				});
+		}
+		if (willFinish && buffer.length == 0) {
+			willFinish = false;
+			Defer.nextTick(() -> finishSignal.emit(new NoData()));
+		}
+		return chunk;
+	}
+
+	// override by implementing classes
+	function internalWrite():Void {
+		throw "not implemented";
+	}
+
+	// for producers
+	public function write(chunk:Bytes):Bool {
+		if (done)
+			throw "stream already done";
+		buffer.add(chunk);
+		bufferLength += chunk.length;
+		if (corkCount <= 0)
+			internalWrite();
+		if (bufferLength >= highWaterMark) {
+			willDrain = true;
+			return false;
+		}
+		return true;
+	}
+
+	public function end():Void {
+		corkCount = 0;
+		if (buffer.length > 0)
+			internalWrite();
+		if (buffer.length > 0)
+			willFinish = true;
+		else
+			finishSignal.emit(new NoData());
+		done = true;
+	}
+
+	public function cork():Void {
+		if (done)
+			return;
+		corkCount++;
+	}
+
+	public function uncork():Void {
+		if (done || corkCount <= 0)
+			return;
+		if (--corkCount == 0 && buffer.length > 0)
+			internalWrite();
+	}
+}