瀏覽代碼

[std] move hxcoro package to hxcoro

And add basic 3rd party tests (eval only for now)
Rudy Ges 5 天之前
父節點
當前提交
ad0b56a397
共有 42 個文件被更改,包括 12 次插入2669 次删除
  1. 0 56
      std/haxe/coro/continuations/RacingContinuation.hx
  2. 0 112
      std/hxcoro/Coro.hx
  3. 0 72
      std/hxcoro/CoroRun.hx
  4. 0 18
      std/hxcoro/components/CoroName.hx
  5. 0 66
      std/hxcoro/concurrent/AtomicInt.hx
  6. 0 68
      std/hxcoro/concurrent/AtomicObject.hx
  7. 0 7
      std/hxcoro/concurrent/CoroMutex.hx
  8. 0 75
      std/hxcoro/concurrent/CoroSemaphore.hx
  9. 0 86
      std/hxcoro/continuations/CancellingContinuation.hx
  10. 0 28
      std/hxcoro/continuations/TimeoutContinuation.hx
  11. 0 92
      std/hxcoro/ds/CircularBuffer.hx
  12. 0 82
      std/hxcoro/ds/ConcurrentCircularBuffer.hx
  13. 0 29
      std/hxcoro/ds/Out.hx
  14. 0 221
      std/hxcoro/ds/PagedDeque.hx
  15. 0 92
      std/hxcoro/ds/channels/Channel.hx
  16. 0 13
      std/hxcoro/ds/channels/IChannelReader.hx
  17. 0 11
      std/hxcoro/ds/channels/IChannelWriter.hx
  18. 0 3
      std/hxcoro/ds/channels/bounded/BoundedChannel.hx
  19. 0 108
      std/hxcoro/ds/channels/bounded/BoundedReader.hx
  20. 0 136
      std/hxcoro/ds/channels/bounded/BoundedWriter.hx
  21. 0 110
      std/hxcoro/ds/channels/bounded/SingleBoundedReader.hx
  22. 0 101
      std/hxcoro/ds/channels/bounded/SingleBoundedWriter.hx
  23. 0 3
      std/hxcoro/ds/channels/unbounded/UnboundedChannel.hx
  24. 0 93
      std/hxcoro/ds/channels/unbounded/UnboundedReader.hx
  25. 0 77
      std/hxcoro/ds/channels/unbounded/UnboundedWriter.hx
  26. 0 9
      std/hxcoro/exceptions/ChannelClosedException.hx
  27. 0 5
      std/hxcoro/exceptions/TimeoutException.hx
  28. 0 4
      std/hxcoro/import.hx
  29. 0 324
      std/hxcoro/task/AbstractTask.hx
  30. 0 221
      std/hxcoro/task/CoroBaseTask.hx
  31. 0 88
      std/hxcoro/task/CoroTask.hx
  32. 0 20
      std/hxcoro/task/ICoroNode.hx
  33. 0 17
      std/hxcoro/task/ICoroTask.hx
  34. 0 8
      std/hxcoro/task/ILocalContext.hx
  35. 0 5
      std/hxcoro/task/NodeLambda.hx
  36. 0 25
      std/hxcoro/task/StartableCoroTask.hx
  37. 0 35
      std/hxcoro/task/node/CoroChildStrategy.hx
  38. 0 33
      std/hxcoro/task/node/CoroScopeStrategy.hx
  39. 0 21
      std/hxcoro/task/node/CoroSupervisorStrategy.hx
  40. 0 13
      std/hxcoro/task/node/INodeStrategy.hx
  41. 0 81
      std/hxcoro/util/Convenience.hx
  42. 12 1
      tests/runci/targets/Macro.hx

+ 0 - 56
std/haxe/coro/continuations/RacingContinuation.hx

@@ -1,56 +0,0 @@
-package haxe.coro.continuations;
-
-import hxcoro.concurrent.AtomicInt;
-import haxe.coro.context.Context;
-import haxe.coro.schedulers.Scheduler;
-import haxe.coro.schedulers.IScheduleObject;
-
-private enum abstract State(Int) to Int {
-	var Active;
-	var Resumed;
-	var Resolved;
-}
-
-class RacingContinuation<T> extends SuspensionResult<T> implements IContinuation<T> implements IScheduleObject {
-	final inputCont:IContinuation<T>;
-
-	var resumeState:AtomicInt;
-
-	public var context(get, never):Context;
-
-	final scheduler:Scheduler;
-
-	public function new(inputCont:IContinuation<T>) {
-		this.inputCont = inputCont;
-		resumeState = new AtomicInt(Active);
-		scheduler = context.get(Scheduler);
-	}
-
-	inline function get_context() {
-		return inputCont.context;
-	}
-
-	public function resume(result:T, error:Exception):Void {
-		this.result = result;
-		this.error = error;
-		if (resumeState.compareExchange(Active, Resumed) != Active) {
-			scheduler.scheduleObject(this);
-		}
-	}
-
-	public function resolve():Void {
-		if (resumeState.compareExchange(Active, Resolved) == Active) {
-			state = Pending;
-		} else {
-			if (error != null) {
-				state = Thrown;
-			} else {
-				state = Returned;
-			}
-		}
-	}
-
-	public function onSchedule() {
-		inputCont.resume(result, error);
-	}
-}

+ 0 - 112
std/hxcoro/Coro.hx

@@ -1,112 +0,0 @@
-package hxcoro;
-
-import hxcoro.continuations.CancellingContinuation;
-import haxe.coro.IContinuation;
-import haxe.coro.ICancellableContinuation;
-import haxe.coro.schedulers.Scheduler;
-import haxe.coro.cancellation.CancellationToken;
-import haxe.exceptions.CancellationException;
-import haxe.exceptions.ArgumentException;
-import hxcoro.task.NodeLambda;
-import hxcoro.task.CoroTask;
-import hxcoro.exceptions.TimeoutException;
-import hxcoro.continuations.TimeoutContinuation;
-
-class Coro {
-	@:coroutine @:coroutine.transformed
-	public static function suspend<T>(func:IContinuation<T>->Void, completion:IContinuation<T>):T {
-		var safe = new haxe.coro.continuations.RacingContinuation(completion);
-		func(safe);
-		safe.resolve();
-		return cast safe;
-	}
-
-	/**
-	 * Suspends a coroutine which will be automatically resumed with a `haxe.exceptions.CancellationException` when cancelled.
-	 * The `ICancellableContinuation` passed to the function allows registering a callback which is invoked on cancellation
-	 * allowing the easy cleanup of resources.
-	 */
-	@:coroutine @:coroutine.transformed public static function suspendCancellable<T>(func:ICancellableContinuation<T>->Void, completion:IContinuation<T>):T {
-		var safe = new CancellingContinuation(completion);
-		func(safe);
-		return cast safe;
-	}
-
-	static function cancellationRequested(cont:IContinuation<Any>) {
-		return cont.context.get(CancellationToken)?.isCancellationRequested();
-	}
-
-	static function delayImpl<T>(ms:Int, cont:ICancellableContinuation<T>) {
-		final handle = cont.context.get(Scheduler).schedule(ms, () -> {
-			cont.callSync();
-		});
-
-		cont.onCancellationRequested = _ -> {
-			handle.close();
-		}
-	}
-
-	@:coroutine @:coroutine.nothrow public static function delay(ms:Int):Void {
-		suspendCancellable(cont -> delayImpl(ms, cont));
-	}
-
-	@:coroutine @:coroutine.nothrow public static function yield():Void {
-		suspendCancellable(cont -> delayImpl(0, cont));
-	}
-
-	@:coroutine static public function scope<T>(lambda:NodeLambda<T>):T {
-		return suspend(cont -> {
-			final context = cont.context;
-			final scope = new CoroTask(context, CoroTask.CoroScopeStrategy);
-			scope.runNodeLambda(lambda);
-			scope.awaitContinuation(cont);
-		});
-	}
-
-	/**
-		Executes `lambda` in a new task, ignoring all child exceptions.
-
-		The task itself can still raise an exception. This is also true when calling
-		`child.await()` on a child that raises an exception.
-	**/
-	@:coroutine static public function supervisor<T>(lambda:NodeLambda<T>):T {
-		return suspend(cont -> {
-			final context = cont.context;
-			final scope = new CoroTask(context, CoroTask.CoroSupervisorStrategy);
-			scope.runNodeLambda(lambda);
-			scope.awaitContinuation(cont);
-		});
-	}
-
-	/**
-	 * Runs the provided lambda with a timeout, if the timeout is exceeded this functions throws `hxcoro.exceptions.TimeoutException`.
-	 * If a timeout of zero is provided the function immediately throws `hxcoro.exceptions.TimeoutException`.
-	 * @param ms Timeout in milliseconds.
-	 * @param lambda Lambda function to execute.
-	 * @throws `hxcoro.exceptions.TimeoutException` If the timeout is exceeded.
-	 * @throws `haxe.ArgumentException` If the `ms` parameter is less than zero.
-	 */
-	@:coroutine public static function timeout<T>(ms:Int, lambda:NodeLambda<T>):T {
-		return suspend(cont -> {
-			if (ms < 0) {
-				cont.failSync(new ArgumentException('timeout must be positive'));
-
-				return;
-			}
-			if (ms == 0) {
-				cont.failSync(new TimeoutException());
-
-				return;
-			}
-
-			final context = cont.context;
-			final scope = new CoroTask(context, CoroTask.CoroScopeStrategy);
-			final handle = context.get(Scheduler).schedule(ms, () -> {
-				scope.cancel(new TimeoutException());
-			});
-
-			scope.runNodeLambda(lambda);
-			scope.awaitContinuation(new TimeoutContinuation(cont, handle));
-		});
-	}
-}

+ 0 - 72
std/hxcoro/CoroRun.hx

@@ -1,72 +0,0 @@
-package hxcoro;
-
-import haxe.coro.Coroutine;
-import haxe.coro.context.Context;
-import haxe.coro.context.IElement;
-import haxe.coro.schedulers.EventLoopScheduler;
-import hxcoro.task.ICoroTask;
-import hxcoro.task.CoroTask;
-import hxcoro.task.StartableCoroTask;
-import hxcoro.task.NodeLambda;
-
-private abstract RunnableContext(ElementTree) {
-	inline function new(tree:ElementTree) {
-		this = tree;
-	}
-
-	public function create<T>(lambda:NodeLambda<T>):IStartableCoroTask<T> {
-		return new StartableCoroTask(new Context(this), lambda, CoroTask.CoroScopeStrategy);
-	}
-
-	public function run<T>(lambda:NodeLambda<T>):T {
-		return CoroRun.runWith(new Context(this), lambda);
-	}
-
-	@:from static function fromAdjustableContext(context:AdjustableContext) {
-		return new RunnableContext(cast context);
-	}
-
-	public function with(...elements:IElement<Any>):RunnableContext {
-		return new AdjustableContext(this.copy()).with(...elements);
-	}
-}
-
-class CoroRun {
-	static var defaultContext(get, null):Context;
-
-	static function get_defaultContext() {
-		if (defaultContext != null) {
-			return defaultContext;
-		}
-		final stackTraceManagerComponent = new haxe.coro.BaseContinuation.StackTraceManager();
-		defaultContext = Context.create(stackTraceManagerComponent);
-		return defaultContext;
-	}
-
-	public static function with(...elements:IElement<Any>):RunnableContext {
-		return defaultContext.clone().with(...elements);
-	}
-
-	static public function run<T>(lambda:Coroutine<() -> T>):T {
-		return runScoped(_ -> lambda());
-	}
-
-	static public function runScoped<T>(lambda:NodeLambda<T>):T {
-		return runWith(defaultContext, lambda);
-	}
-
-	static public function runWith<T>(context:Context, lambda:NodeLambda<T>):T {
-		final schedulerComponent = new EventLoopScheduler();
-		final scope = new CoroTask(context.clone().with(schedulerComponent), CoroTask.CoroScopeStrategy);
-		scope.runNodeLambda(lambda);
-		while (scope.isActive()) {
-			schedulerComponent.run();
-		}
-		switch (scope.getError()) {
-			case null:
-				return scope.get();
-			case error:
-				throw error;
-		}
-	}
-}

+ 0 - 18
std/hxcoro/components/CoroName.hx

@@ -1,18 +0,0 @@
-package hxcoro.components;
-
-import haxe.coro.context.Key;
-import haxe.coro.context.IElement;
-
-class CoroName implements IElement<CoroName> {
-	static public final key = new Key<CoroName>("Name");
-
-	public final name:String;
-
-	public function new(name:String) {
-		this.name = name;
-	}
-
-	public function getKey() {
-		return key;
-	}
-}

+ 0 - 66
std/hxcoro/concurrent/AtomicInt.hx

@@ -1,66 +0,0 @@
-package hxcoro.concurrent;
-
-import haxe.coro.Mutex;
-
-#if (cpp || hl || js || jvm || eval)
-typedef AtomicInt = haxe.atomic.AtomicInt;
-#else
-typedef AtomicInt = AtomicIntImpl;
-
-private class AtomicIntData {
-	public final mutex:Mutex;
-	public var value:Int;
-
-	public function new(value:Int) {
-		this.value = value;
-		mutex = new Mutex();
-	}
-}
-
-abstract AtomicIntImpl(AtomicIntData) {
-	public function new(v:Int) {
-		this = new AtomicIntData(v);
-	}
-
-	public function load() {
-		return this.value;
-	}
-
-	public function compareExchange(expected:Int, replacement:Int) {
-		this.mutex.acquire();
-		if (this.value == expected) {
-			this.value = replacement;
-			this.mutex.release();
-			return expected;
-		} else {
-			final value = this.value;
-			this.mutex.release();
-			return value;
-		}
-	}
-
-	public function sub(b:Int) {
-		this.mutex.acquire();
-		final value = this.value;
-		this.value -= b;
-		this.mutex.release();
-		return value;
-	}
-
-	public function add(b:Int) {
-		this.mutex.acquire();
-		final value = this.value;
-		this.value += b;
-		this.mutex.release();
-		return value;
-	}
-
-	public function store(b:Int) {
-		this.mutex.acquire();
-		final value = this.value;
-		this.value = b;
-		this.mutex.release();
-		return value;
-	}
-}
-#end

+ 0 - 68
std/hxcoro/concurrent/AtomicObject.hx

@@ -1,68 +0,0 @@
-package hxcoro.concurrent;
-
-#if (hl || jvm)
-typedef AtomicObject<T:{}> = haxe.atomic.AtomicObject<T>;
-#else
-import haxe.coro.Mutex;
-
-typedef AtomicObject<T:{}> = AtomicObjectImpl<T>;
-
-private class AtomicObjectImpl<T:{}> {
-	final mutex : Mutex;
-	var object : T;
-
-	public function new(object) {
-		mutex = new Mutex();
-		this.object = object;
-	}
-	
-	public function compareExchange(expected : T, replacement : T) {
-		mutex.acquire();
-
-		return if (object == expected) {
-			object = replacement;
-			mutex.release();
-
-			expected;
-		} else {
-			final current = object;
-
-			mutex.release();
-
-			current;
-		}
-	}
-
-	public function exchange(replacement : T) {
-		mutex.acquire();
-
-		final current = object;
-
-		object = replacement;
-
-		mutex.release();
-
-		return current;
-	}
-
-	public function load() {
-		mutex.acquire();
-
-		final current = object;
-
-		mutex.release();
-
-		return current;
-	}
-
-	public function store(replacement : T) {
-		mutex.acquire();
-
-		object = replacement;
-
-		mutex.release();
-
-		return replacement;
-	}
-}
-#end

+ 0 - 7
std/hxcoro/concurrent/CoroMutex.hx

@@ -1,7 +0,0 @@
-package hxcoro.concurrent;
-
-class CoroMutex extends CoroSemaphore {
-	public function new() {
-		super(1);
-	}
-}

+ 0 - 75
std/hxcoro/concurrent/CoroSemaphore.hx

@@ -1,75 +0,0 @@
-package hxcoro.concurrent;
-
-import haxe.coro.Mutex;
-import hxcoro.task.CoroTask;
-import hxcoro.ds.PagedDeque;
-import haxe.coro.IContinuation;
-import haxe.coro.cancellation.CancellationToken;
-
-class CoroSemaphore {
-	final maxFree:Int;
-	final dequeMutex:Mutex;
-	var deque:Null<PagedDeque<IContinuation<Any>>>;
-	var free:AtomicInt;
-
-	public function new(free:Int) {
-		maxFree = free;
-		dequeMutex = new Mutex();
-		this.free = new AtomicInt(free);
-	}
-
-	@:coroutine public function acquire() {
-		if (free.sub(1) > 0) {
-			return;
-		}
-		suspendCancellable(cont -> {
-			final task = cont.context.get(CoroTask);
-			dequeMutex.acquire();
-			if (deque == null) {
-				deque = new PagedDeque();
-			}
-			deque.push(cont);
-			task.putOnHold(); // TODO: condition this on some heuristic?
-			dequeMutex.release();
-		});
-	}
-
-	public function tryAcquire() {
-		while (true) {
-			var free = free.load();
-			if (free <= 0) {
-				return false;
-			}
-			if (this.free.compareExchange(free, free - 1) == free) {
-				return true;
-			}
-		}
-	}
-
-	public function release() {
-		free.add(1);
-		dequeMutex.acquire();
-		if (deque == null) {
-			dequeMutex.release();
-			return;
-		}
-		while (true) {
-			if (deque.isEmpty()) {
-				// nobody else wants it right now, return
-				dequeMutex.release();
-				return;
-			}
-			// a continuation waits for this mutex, wake it up now
-			final cont = deque.pop();
-			final ct = cont.context.get(CancellationToken);
-			if (ct.isCancellationRequested()) {
-				// ignore, back to the loop
-			} else {
-				// continue normally
-				dequeMutex.release();
-				cont.callAsync();
-				return;
-			}
-		}
-	}
-}

+ 0 - 86
std/hxcoro/continuations/CancellingContinuation.hx

@@ -1,86 +0,0 @@
-package hxcoro.continuations;
-
-import haxe.coro.SuspensionResult;
-import haxe.coro.schedulers.IScheduleObject;
-import hxcoro.concurrent.AtomicInt;
-import haxe.Exception;
-import haxe.exceptions.CancellationException;
-import haxe.coro.IContinuation;
-import haxe.coro.ICancellableContinuation;
-import haxe.coro.context.Context;
-import haxe.coro.schedulers.Scheduler;
-import haxe.coro.cancellation.ICancellationHandle;
-import haxe.coro.cancellation.CancellationToken;
-import haxe.coro.cancellation.ICancellationCallback;
-
-private enum abstract State(Int) to Int {
-	var Active;
-	var Resumed;
-	var Cancelled;
-}
-
-class CancellingContinuation<T> extends SuspensionResult<T> implements ICancellableContinuation<T> implements ICancellationCallback implements IScheduleObject {
-	final resumeState : AtomicInt;
-
-	final cont : IContinuation<T>;
-
-	final handle : ICancellationHandle;
-
-	public var context (get, never) : Context;
-
-	function get_context() {
-		return cont.context;
-	}
-
-	public var onCancellationRequested (default, set) : CancellationException->Void;
-
-	function set_onCancellationRequested(f : CancellationException->Void) {
-		return switch (cont.context.get(CancellationToken).cancellationException) {
-			case null:
-				if (null != onCancellationRequested) {
-					throw new Exception("Callback already registered");
-				}
-
-				onCancellationRequested = f;
-			case exc:
-				f(exc);
-
-				f;
-		}
-	}
-
-	public function new(cont) {
-		this.resumeState  = new AtomicInt(Active);
-		this.cont   = cont;
-		this.handle = this.cont.context.get(CancellationToken).onCancellationRequested(this);
-		this.state  = Pending;
-	}
-
-	public function resume(result:T, error:Exception) {
-		this.result = result;
-		this.error = error;
-		if (resumeState.compareExchange(Active, Resumed) == Active) {
-			handle.close();
-			context.get(Scheduler).scheduleObject(this);
-		} else {
-			cont.failAsync(error.orCancellationException());
-		}
-
-	}
-
-	public function onCancellation(cause:CancellationException) {
-		handle?.close();
-
-		if (resumeState.compareExchange(Active, Cancelled) == Active) {
-			if (null != onCancellationRequested) {
-				onCancellationRequested(cause);
-			}
-
-			cont.failAsync(cause);
-		}
-	}
-
-	public function onSchedule() {
-		cont.resume(result, error);
-	}
-}

+ 0 - 28
std/hxcoro/continuations/TimeoutContinuation.hx

@@ -1,28 +0,0 @@
-package hxcoro.continuations;
-
-import haxe.Exception;
-import haxe.coro.IContinuation;
-import haxe.coro.context.Context;
-import haxe.coro.schedulers.ISchedulerHandle;
-
-class TimeoutContinuation<T> implements IContinuation<T> {
-	final cont : IContinuation<T>;
-	final handle : ISchedulerHandle;
-
-	public var context (get, never) : Context;
-
-	inline function get_context() {
-		return cont.context;
-	}
-
-	public function new(cont, handle) {
-		this.cont   = cont;
-		this.handle = handle;
-	}
-
-	public function resume(value:T, error:Exception) {
-		handle.close();
-
-		cont.resumeAsync(value, error);
-	}
-}

+ 0 - 92
std/hxcoro/ds/CircularBuffer.hx

@@ -1,92 +0,0 @@
-package hxcoro.ds;
-
-import haxe.ds.Vector;
-import haxe.exceptions.ArgumentException;
-
-final class CircularBuffer<T> {
-	final storage : Vector<T>;
-
-	var head : Int;
-
-	var tail : Int;
-
-	public function new(capacity : Int) {
-		if (capacity < 1) {
-			throw new ArgumentException("capacity", "Capacity must be greater than zero");
-		}
-
-		// We need +1 since we do a "full" check by comparing the head and tail.
-		storage = new Vector(capacity + 1);
-		head    = 0;
-		tail    = 0;
-	}
-
-	public function tryPush(v:T) {
-		final nextHead = increment(head);
-
-		return if (tail != nextHead) {
-			storage[head] = v;
-			head = nextHead;
-
-			true;
-		} else {
-			false;
-		}
-	}
-
-	public function tryPeekHead(out:Out<T>) {
-		if (wasEmpty()) {
-			return false;
-		}
-
-		out.set(storage[decrement(head)]);
-
-		return true;
-	}
-
-	public function tryPopHead(out:Out<T>) {
-		if (wasEmpty()) {
-			return false;
-		}
-
-		head = decrement(head);
-
-		out.set(storage[head]);
-
-		return true;
-	}
-
-	public function tryPopTail(out:Out<T>) {
-		if (wasEmpty()) {
-			return false;
-		}
-
-		out.set(storage[tail]);
-
-		tail = increment(tail);
-
-		return true;
-	}
-
-	public function wasEmpty() {
-		return head == tail;
-	}
-
-	public function wasFull() {
-		final nextHead = increment(head);
-
-		return nextHead == tail;
-	}
-
-	inline function increment(v : Int) {
-		return (v + 1) % storage.length;
-	}
-
-	inline function decrement(v : Int) {
-		return if (v == 0) {
-			storage.length - 1;
-		} else {
-			v - 1;
-		}
-	}
-}

+ 0 - 82
std/hxcoro/ds/ConcurrentCircularBuffer.hx

@@ -1,82 +0,0 @@
-package hxcoro.ds;
-
-import haxe.ds.Vector;
-import haxe.exceptions.ArgumentException;
-import hxcoro.concurrent.AtomicInt;
-
-/**
- * Thread safe FIFO circular buffer.
- * 
- * This buffer supports at most a single producer and a single consumer at any one time,
- * the behaviour when multiple produces and consumers act on the buffer is undefined.
- */
-final class ConcurrentCircularBuffer<T> {
-	final storage : Vector<T>;
-
-	final head : AtomicInt;
-
-	final tail : AtomicInt;
-
-	public function new(capacity : Int) {
-		if (capacity < 1) {
-			throw new ArgumentException("capacity", "Capacity must be greater than zero");
-		}
-
-		// We need +1 since we do a "full" check by comparing the head and tail.
-		storage = new Vector(capacity + 1);
-		head    = new AtomicInt(0);
-		tail    = new AtomicInt(0);
-	}
-
-	/**
-	 * Attempts to add an item to the end of the buffer.
-	 * @param v Item to add.
-	 * @returns `true` if the item was added to the buffer, otherwise `false`.
-	 */
-	public function tryPush(v : T) {
-		final currentTail = tail.load();
-		final nextTail    = increment(currentTail);
-
-		if (nextTail != head.load()) {
-			storage[currentTail] = v;
-			tail.store(nextTail);
-			return true;
-		}
-
-		return false;
-	}
-
-	/**
-	 * Attempts to remove an item from the beginning of the buffer.
-	 * @param out If this function returns `true` the removed item will be stored in this out object.
-	 * @returns `true` if an item was removed from the buffer, otherwise `false`.
-	 */
-	public function tryPop(out : Out<T>) {
-		final currentHead = head.load();
-		if (currentHead == tail.load()) {
-			return false;
-		}
-
-		// Note : We should probably wipe the previous value here to prevent references being kept to otherwise dead objects.
-		// is it safe to do a `= null` even if the circular buffer is storing, say, ints?
-		out.set(storage[currentHead]);
-
-		head.store(increment(currentHead));
-
-		return true;
-	}
-
-	public function wasEmpty() {
-		return head.load() == tail.load();
-	}
-
-	public function wasFull() {
-		final nextTail = increment(tail.load());
-
-		return nextTail == head.load();
-	}
-
-	inline function increment(v : Int) {
-		return (v + 1) % storage.length;
-	}
-}

+ 0 - 29
std/hxcoro/ds/Out.hx

@@ -1,29 +0,0 @@
-package hxcoro.ds;
-
-private abstract OutData<T>(Array<T>) from Array<T> {
-	public inline function new() {
-		this = [];
-	}
-
-	public inline function set(v:T):Void {
-		this[0] = v;
-	}
-
-	public inline function get() {
-		return this[0];
-	}
-}
-
-abstract Out<T>(OutData<T>) {
-	public inline function new() {
-		this = new OutData();
-	}
-
-	public inline function get() {
-		return this.get();
-	}
-
-	public inline function set(v:T) {
-		this.set(v);
-	}
-}

+ 0 - 221
std/hxcoro/ds/PagedDeque.hx

@@ -1,221 +0,0 @@
-package hxcoro.ds;
-
-import haxe.ds.Vector;
-import haxe.Exception;
-import hxcoro.ds.Out;
-
-class Page<T> {
-	public final data:Vector<T>;
-	public var numDeleted:Int;
-	public var next:Null<Page<T>>;
-
-	public function new(size:Int) {
-		this.data = new Vector(size);
-		numDeleted = 0;
-	}
-
-	function removeFrom(element:T, startIndex:Int) {
-		for (i in startIndex...data.length - numDeleted) {
-			if (data[i] == element) {
-				blitAt(i);
-				return true;
-			}
-		}
-		return false;
-	}
-
-	public function reset() {
-		numDeleted = 0;
-		next = null;
-	}
-
-	public inline function freeSpace() {
-		return data.length - numDeleted;
-	}
-
-	function blitAt(index:Int) {
-		final toBlit = freeSpace() - index - 1;
-		if (toBlit > 0) {
-			Vector.blit(data, index + 1, data, index, toBlit);
-		}
-		numDeleted++;
-	}
-}
-
-class PagedDeque<T> {
-	final vectorSize:Int;
-	var currentPage:Page<T>;
-	var currentIndex:Int;
-	var lastPage:Page<T>;
-	public var lastIndex(default, null):Int;
-
-	public function new(vectorSize = 8) {
-		this.vectorSize = vectorSize;
-		currentPage = new Page(vectorSize);
-		currentIndex = 0;
-		lastPage = currentPage;
-		lastIndex = 0;
-	}
-
-	inline function getPageDataAt<T>(page:Page<T>, index:Int) {
-		return page.data[index];
-	}
-
-	inline function setPageDataAt<T>(page:Page<T>, index:Int, value:T) {
-		page.data[index - page.numDeleted] = value;
-	}
-
-	public function forEach(f:T->Void) {
-		var currentPage = currentPage;
-		var currentIndex = currentIndex;
-		while (currentPage != lastPage) {
-			while (currentIndex < currentPage.freeSpace()) {
-				f(getPageDataAt(currentPage, currentIndex++));
-			}
-			currentIndex = 0;
-			currentPage = currentPage.next;
-		}
-		while (currentIndex < lastIndex - currentPage.numDeleted) {
-			f(getPageDataAt(currentPage, currentIndex++));
-		}
-	}
-
-	public function mapInPlace(f:T->T) {
-		var currentPage = currentPage;
-		var currentIndex = currentIndex;
-		while (currentPage != lastPage) {
-			while (currentIndex < currentPage.freeSpace()) {
-				setPageDataAt(currentPage, currentIndex, f(getPageDataAt(currentPage, currentIndex++)));
-			}
-			currentIndex = 0;
-			currentPage = currentPage.next;
-		}
-		while (currentIndex < lastIndex) {
-			setPageDataAt(currentPage, currentIndex, f(getPageDataAt(currentPage, currentIndex++)));
-		}
-	}
-
-	public function fold<A>(acc:A, f:(acc:A, elt:T) -> A) {
-		var currentPage = currentPage;
-		var currentIndex = currentIndex;
-		while (currentPage != lastPage) {
-			while (currentIndex < currentPage.freeSpace()) {
-				acc = f(acc, getPageDataAt(currentPage, currentIndex++));
-			}
-			currentIndex = 0;
-			currentPage = currentPage.next;
-		}
-		while (currentIndex < lastIndex) {
-			acc = f(acc, getPageDataAt(currentPage, currentIndex++));
-		}
-		return acc;
-	}
-
-	public function push(x:T) {
-		if (lastIndex == lastPage.freeSpace()) {
-			// current page is full
-			if (lastPage.next == null) {
-				// we have no next page, allocate one
-				lastPage.next = new Page(vectorSize);
-			}
-			lastPage = lastPage.next;
-			lastPage.next = null;
-			lastIndex = 1;
-			setPageDataAt(lastPage, 0, x);
-			return lastPage;
-		}
-		setPageDataAt(lastPage, lastIndex++, x);
-		return lastPage;
-	}
-
-	public function pop() {
-		if (currentIndex == currentPage.freeSpace()) {
-			// end of page, need to swap
-			var nextPage = currentPage.next;
-			if (nextPage == null) {
-				throw new Exception("pop() was called on empty PagedDeque");
-			}
-			if (lastPage.next == null) {
-				// reuse current page as next last page
-				lastPage.next = currentPage;
-				currentPage.next = null;
-				currentPage.reset();
-			}
-			currentPage = nextPage;
-			currentIndex = 1;
-			return getPageDataAt(currentPage, 0);
-		} else if (currentIndex == currentPage.freeSpace() - 1 && currentPage.next == null) {
-			// deque is empty, reset to reuse current page
-			resetCurrent();
-			return getPageDataAt(currentPage, currentPage.freeSpace() - 1);
-		} else {
-			return getPageDataAt(currentPage, currentIndex++);
-		}
-	}
-
-	public function remove(page:Page<T>, element:T) {
-		return if (page == currentPage) {
-			@:privateAccess page.removeFrom(element, currentIndex);
-		} else {
-			@:privateAccess page.removeFrom(element, 0);
-		}
-	}
-
-	public function tryPop(out:Out<T>) {
-		if (isEmpty()) {
-			// TODO: could probably integrate this better in the branches below
-			return false;
-		}
-		if (currentIndex == vectorSize) {
-			// end of page, need to swap
-			var nextPage = currentPage.next;
-			if (lastPage.next == null) {
-				// reuse current page as next last page
-				lastPage.next = currentPage;
-				currentPage.next = null;
-			}
-			currentPage = nextPage;
-			currentIndex = 1;
-			out.set(currentPage.data[0]);
-			return true;
-		} else if (currentIndex == vectorSize - 1 && currentPage.next == null) {
-			// deque is empty, reset to reuse current page
-			currentIndex = 0;
-			lastIndex = 0;
-			out.set(currentPage.data[vectorSize - 1]);
-			return true;
-		} else {
-			out.set(currentPage.data[currentIndex++]);
-			return true;
-		}
-	}
-
-	public function tryPeek(out:Out<T>) {
-		if (isEmpty()) {
-			return false;
-		}
-
-		out.set(getPageDataAt(lastPage, lastIndex - 1));
-
-		return true;
-	}
-
-	public function isEmpty() {
-		while (currentIndex == currentPage.freeSpace()) {
-			if (currentPage.next == null || currentPage == lastPage) {
-				resetCurrent();
-				return true;
-			}
-			currentPage = currentPage.next;
-			currentIndex = 0;
-		}
-
-		return currentPage == lastPage && currentIndex == lastIndex - currentPage.numDeleted;
-	}
-
-	function resetCurrent() {
-		currentIndex = 0;
-		lastIndex = 0;
-		currentPage.reset();
-	}
-}

+ 0 - 92
std/hxcoro/ds/channels/Channel.hx

@@ -1,92 +0,0 @@
-package hxcoro.ds.channels;
-
-import haxe.coro.IContinuation;
-import haxe.exceptions.ArgumentException;
-import hxcoro.ds.Out;
-import hxcoro.ds.PagedDeque;
-import hxcoro.ds.CircularBuffer;
-import hxcoro.ds.channels.bounded.BoundedReader;
-import hxcoro.ds.channels.bounded.BoundedWriter;
-import hxcoro.ds.channels.bounded.SingleBoundedReader;
-import hxcoro.ds.channels.bounded.SingleBoundedWriter;
-import hxcoro.ds.channels.bounded.BoundedChannel;
-import hxcoro.ds.channels.unbounded.UnboundedReader;
-import hxcoro.ds.channels.unbounded.UnboundedWriter;
-import hxcoro.ds.channels.unbounded.UnboundedChannel;
-import hxcoro.concurrent.AtomicObject;
-
-typedef DropCallback<T> = (dropped : T)->Void;
-
-enum FullBehaviour<T> {
-	Wait;
-	DropNewest(f : DropCallback<T>);
-	DropOldest(f : DropCallback<T>);
-	DropWrite(f : DropCallback<T>);
-}
-
-typedef ChannelOptions = {
-	var ?singleReader : Bool;
-
-	var ?singleWriter : Bool;
-}
-
-typedef BoundedChannelOptions<T> = ChannelOptions & {
-	var size : Int;
-
-	var ?writeBehaviour : FullBehaviour<T>;
-}
-
-abstract class Channel<T> {
-
-	public final reader : IChannelReader<T>;
-
-	public final writer : IChannelWriter<T>;
-
-	function new(reader, writer) {
-		this.reader = reader;
-		this.writer = writer;
-	}
-
-	public static function createBounded<T>(options : BoundedChannelOptions<T>):Channel<T> { 
-		if (options.size < 1) {
-			throw new ArgumentException("size");
-		}
-
-		final closed         = new Out();
-		final writeBehaviour = options.writeBehaviour ?? Wait;
-		
-		// TODO : Revisit this single consumer producer implementation once we have threading in and can make some comparisons.
-		// final singleReader   = options.singleReader ?? false;
-		// final singleWriter   = options.singleWriter ?? false;
-		// if (singleReader && singleWriter && writeBehaviour.match(DropNewest(_)) == false && writeBehaviour.match(DropOldest(_)) == false) {
-		// 	final buffer      = new ConcurrentCircularBuffer(options.size);
-		// 	final readWaiter  = new AtomicObject<IContinuation<Bool>>(null);
-		// 	final writeWaiter = new AtomicObject<IContinuation<Bool>>(null);
-			
-		// 	return
-		// 		new BoundedChannel(
-		// 			new SingleBoundedReader(buffer, writeWaiter, readWaiter, closed),
-		// 			new SingleBoundedWriter(buffer, writeWaiter, readWaiter, closed, writeBehaviour));
-		// }
-
-		final buffer       = new CircularBuffer(options.size);
-		final readWaiters  = new PagedDeque();
-		final writeWaiters = new PagedDeque();
-
-		return
-			new BoundedChannel(
-				new BoundedReader(buffer, writeWaiters, readWaiters, closed),
-				new BoundedWriter(buffer, writeWaiters, readWaiters, closed, writeBehaviour));
-	}
-
-	public static function createUnbounded<T>(options : ChannelOptions):Channel<T> {
-		final closed      = new Out();
-		final buffer      = new PagedDeque();
-		final readWaiters = new PagedDeque();
-
-		return
-			new UnboundedChannel(
-				new UnboundedReader(buffer, readWaiters, closed),
-				new UnboundedWriter(buffer, readWaiters, closed));
-	}
-}

+ 0 - 13
std/hxcoro/ds/channels/IChannelReader.hx

@@ -1,13 +0,0 @@
-package hxcoro.ds.channels;
-
-import hxcoro.ds.Out;
-
-interface IChannelReader<T> {
-	function tryRead(out:Out<T>):Bool;
-
-	function tryPeek(out:Out<T>):Bool;
-
-	@:coroutine function read():T;
-
-	@:coroutine function waitForRead():Bool;
-}

+ 0 - 11
std/hxcoro/ds/channels/IChannelWriter.hx

@@ -1,11 +0,0 @@
-package hxcoro.ds.channels;
-
-interface IChannelWriter<T> {
-	function tryWrite(out:T):Bool;
-
-	@:coroutine function write(v:T):Void;
-
-	@:coroutine function waitForWrite():Bool;
-
-	function close():Void;
-}

+ 0 - 3
std/hxcoro/ds/channels/bounded/BoundedChannel.hx

@@ -1,3 +0,0 @@
-package hxcoro.ds.channels.bounded;
-
-class BoundedChannel<T> extends Channel<T> {}

+ 0 - 108
std/hxcoro/ds/channels/bounded/BoundedReader.hx

@@ -1,108 +0,0 @@
-package hxcoro.ds.channels.bounded;
-
-import haxe.Exception;
-import haxe.coro.IContinuation;
-import haxe.coro.context.Context;
-import hxcoro.ds.Out;
-import hxcoro.ds.CircularBuffer;
-import hxcoro.exceptions.ChannelClosedException;
-
-using hxcoro.util.Convenience;
-
-private final class WaitContinuation<T> implements IContinuation<Bool> {
-	final cont : IContinuation<Bool>;
-
-	final buffer : CircularBuffer<T>;
-
-	final closed : Out<Bool>;
-
-	public var context (get, never) : Context;
-
-	function get_context() {
-		return cont.context;
-	}
-
-	public function new(cont, buffer, closed) {
-		this.cont   = cont;
-		this.buffer = buffer;
-		this.closed = closed;
-	}
-
-	public function resume(result:Bool, error:Exception) {
-		if (false == result) {
-			closed.set(false);
-
-			cont.succeedAsync(buffer.wasEmpty());
-		} else {
-			cont.succeedAsync(true);
-		}
-	}
-}
-
-final class BoundedReader<T> implements IChannelReader<T> {
-	final buffer : CircularBuffer<T>;
-
-	final writeWaiters : PagedDeque<IContinuation<Bool>>;
-
-	final readWaiters : PagedDeque<IContinuation<Bool>>;
-
-	final closed : Out<Bool>;
-
-	public function new(buffer, writeWaiters, readWaiters, closed) {
-		this.buffer        = buffer;
-		this.writeWaiters  = writeWaiters;
-		this.readWaiters   = readWaiters;
-		this.closed        = closed;
-	}
-
-	public function tryRead(out:Out<T>):Bool {
-		return if (buffer.tryPopTail(out)) {
-			final cont = new Out();
-			while (writeWaiters.tryPop(cont)) {
-				cont.get().succeedAsync(true);
-			}
-
-			true;
-		} else {
-			false;
-		}
-	}
-
-	public function tryPeek(out:Out<T>):Bool {
-		return buffer.tryPeekHead(out);
-	}
-
-	@:coroutine public function read():T {
-		final out = new Out();
-
-		while (true)
-		{
-			if (waitForRead() == false) {
-				throw new ChannelClosedException();
-			}
-
-			if (tryRead(out)) {
-				return out.get();
-			}
-		}
-	}
-
-	@:coroutine public function waitForRead():Bool {
-		if (buffer.wasEmpty() == false) {
-			return true;
-		}
-
-		if (closed.get()) {
-			return false;
-		}
-
-		return suspendCancellable(cont -> {
-			final obj       = new WaitContinuation(cont, buffer, closed);
-			final hostPage  = readWaiters.push(obj);
-
-			cont.onCancellationRequested = _ -> {
-				readWaiters.remove(hostPage, obj);
-			}
-		});
-	}
-}

+ 0 - 136
std/hxcoro/ds/channels/bounded/BoundedWriter.hx

@@ -1,136 +0,0 @@
-package hxcoro.ds.channels.bounded;
-
-import haxe.Exception;
-import haxe.coro.IContinuation;
-import hxcoro.ds.Out;
-import hxcoro.ds.CircularBuffer;
-import hxcoro.ds.channels.Channel;
-import hxcoro.exceptions.ChannelClosedException;
-
-using hxcoro.util.Convenience;
-
-final class BoundedWriter<T> implements IChannelWriter<T> {
-	final closed : Out<Bool>;
-
-	final buffer : CircularBuffer<T>;
-
-	final writeWaiters : PagedDeque<IContinuation<Bool>>;
-
-	final readWaiters : PagedDeque<IContinuation<Bool>>;
-
-	final behaviour : FullBehaviour<T>;
-
-	public function new(buffer, writeWaiters, readWaiters, closed, behaviour) {
-		this.buffer        = buffer;
-		this.writeWaiters  = writeWaiters;
-		this.readWaiters   = readWaiters;
-		this.closed        = closed;
-		this.behaviour     = behaviour;
-	}
-
-	public function tryWrite(v:T):Bool {
-		if (closed.get()) {
-			return false;
-		}
-
-		return if (buffer.tryPush(v)) {
-			final cont = new Out();
-			while (readWaiters.tryPop(cont)) {
-				cont.get().succeedAsync(true);
-			}
-
-			true;
-		} else {
-			false;
-		}
-	}
-
-	@:coroutine public function write(v:T) {
-		if (tryWrite(v)) {
-			return;
-		}
-
-		switch behaviour {
-			case Wait:
-				while (waitForWrite()) {
-					if (tryWrite(v)) {
-						return;
-					}
-				}
-			case DropNewest(f):
-				final out = new Out();
-
-				while (tryWrite(v) == false) {
-					if (buffer.tryPopHead(out)) {
-						f(out.get());
-					} else {
-						throw new Exception('Failed to drop newest item');
-					}
-				}
-
-				return;
-			case DropOldest(f):
-				final out = new Out();
-
-				while (tryWrite(v) == false) {
-					if (buffer.tryPopTail(out)) {
-						f(out.get());
-					} else {
-						throw new Exception('Failed to drop oldest item');
-					}
-				}
-				
-				return;
-			case DropWrite(f):
-				f(v);
-
-				return;
-		}
-
-		throw new ChannelClosedException();
-	}
-
-	@:coroutine public function waitForWrite():Bool {
-		if (closed.get()) {
-			return false;
-		}
-
-		return if (buffer.wasFull()) {
-			return suspendCancellable(cont -> {
-				final hostPage  = writeWaiters.push(cont);
-
-				cont.onCancellationRequested = _ -> {
-					writeWaiters.remove(hostPage, cont);
-				}
-			});
-		} else {
-			true;
-		}
-	}
-
-	public function close() {
-		if (closed.get()) {
-			return;
-		}
-
-		closed.set(true);
-
-		while (writeWaiters.isEmpty() == false) {
-			switch writeWaiters.pop() {
-				case null:
-					continue;
-				case cont:
-					cont.succeedAsync(false);
-			}
-		};
-
-		while (readWaiters.isEmpty() == false) {
-			switch (readWaiters.pop()) {
-				case null:
-					continue;
-				case cont:
-					cont.succeedAsync(false);
-			}
-		};
-	}
-}

+ 0 - 110
std/hxcoro/ds/channels/bounded/SingleBoundedReader.hx

@@ -1,110 +0,0 @@
-package hxcoro.ds.channels.bounded;
-
-import haxe.Exception;
-import haxe.coro.IContinuation;
-import haxe.coro.context.Context;
-import hxcoro.ds.Out;
-import hxcoro.exceptions.ChannelClosedException;
-import hxcoro.ds.ConcurrentCircularBuffer;
-import hxcoro.concurrent.AtomicObject;
-
-using hxcoro.util.Convenience;
-
-private final class WaitContinuation<T> implements IContinuation<Bool> {
-	final cont : IContinuation<Bool>;
-
-	final buffer : ConcurrentCircularBuffer<T>;
-
-	final closed : Out<Bool>;
-
-	public var context (get, never) : Context;
-
-	function get_context() {
-		return cont.context;
-	}
-
-	public function new(cont, buffer, closed) {
-		this.cont   = cont;
-		this.buffer = buffer;
-		this.closed = closed;
-	}
-
-	public function resume(result:Bool, error:Exception) {
-		if (false == result) {
-			closed.set(false);
-
-			cont.succeedAsync(buffer.wasEmpty());
-		} else {
-			cont.succeedAsync(true);
-		}
-	}
-}
-
-final class SingleBoundedReader<T> implements IChannelReader<T> {
-	final buffer : ConcurrentCircularBuffer<T>;
-
-	final writeWaiter : AtomicObject<IContinuation<Bool>>;
-
-	final readWaiter : AtomicObject<IContinuation<Bool>>;
-
-	final closed : Out<Bool>;
-
-	final readOut : Out<T>;
-
-	public function new(buffer, writeWaiter, readWaiter, closed) {
-		this.buffer      = buffer;
-		this.writeWaiter = writeWaiter;
-		this.readWaiter  = readWaiter;
-		this.closed      = closed;
-		this.readOut     = new Out();
-	}
-
-	public function tryRead(out:Out<T>):Bool {
-		return if (buffer.tryPop(readOut)) {
-
-			writeWaiter.exchange(null)?.succeedAsync(true);
-			
-			true;
-		} else {
-			false;
-		}
-	}
-
-	public function tryPeek(out:Out<T>):Bool {
-		throw new haxe.exceptions.NotImplementedException();
-	}
-
-	@:coroutine public function read():T {
-		final out = new Out();
-
-		while (true)
-		{
-			if (waitForRead() == false) {
-				throw new ChannelClosedException();
-			}
-
-			if (tryRead(out)) {
-				return out.get();
-			}
-		}
-	}
-
-	@:coroutine public function waitForRead():Bool {
-		if (buffer.wasEmpty() == false) {
-			return true;
-		}
-
-		if (closed.get()) {
-			return false;
-		}
-
-		return suspendCancellable(cont -> {
-			final obj       = new WaitContinuation(cont, buffer, closed);
-			final hostPage  = readWaiter.store(obj);
-
-			cont.onCancellationRequested = _ -> {
-				readWaiter.store(null);
-			}
-		});
-	}
-}

+ 0 - 101
std/hxcoro/ds/channels/bounded/SingleBoundedWriter.hx

@@ -1,101 +0,0 @@
-package hxcoro.ds.channels.bounded;
-
-import haxe.Exception;
-import hxcoro.ds.ConcurrentCircularBuffer;
-import hxcoro.concurrent.AtomicObject;
-import haxe.coro.IContinuation;
-import hxcoro.ds.Out;
-import hxcoro.ds.channels.Channel;
-import hxcoro.exceptions.ChannelClosedException;
-
-using hxcoro.util.Convenience;
-
-final class SingleBoundedWriter<T> implements IChannelWriter<T> {
-	final closed : Out<Bool>;
-
-	final buffer : ConcurrentCircularBuffer<T>;
-
-	final writeWaiter : AtomicObject<IContinuation<Bool>>;
-
-	final readWaiter : AtomicObject<IContinuation<Bool>>;
-
-	final behaviour : FullBehaviour<T>;
-
-	final writeOut : Out<T>;
-
-	public function new(buffer, writeWaiter, readWaiter, closed, behaviour) {
-		this.buffer      = buffer;
-		this.writeWaiter = writeWaiter;
-		this.readWaiter  = readWaiter;
-		this.closed      = closed;
-		this.behaviour   = behaviour;
-		this.writeOut    = new Out();
-	}
-
-	public function tryWrite(v:T):Bool {
-		if (closed.get()) {
-			return false;
-		}
-
-		return if (buffer.tryPush(v)) {
-
-			readWaiter.exchange(null)?.succeedAsync(true);
-
-			true;
-		} else {
-			false;
-		}
-	}
-
-	@:coroutine public function write(v:T) {
-		if (tryWrite(v)) {
-			return;
-		}
-
-		switch behaviour {
-			case Wait:
-				while (waitForWrite()) {
-					if (tryWrite(v)) {
-						return;
-					}
-				}
-			case DropWrite(f):
-				f(v);
-
-				return;
-			case _:
-				throw new Exception("Unsupported behaviour mode");
-		}
-
-		throw new ChannelClosedException();
-	}
-
-	@:coroutine public function waitForWrite():Bool {
-		if (closed.get()) {
-			return false;
-		}
-
-		return if (buffer.wasFull()) {
-			suspendCancellable(cont -> {
-				writeWaiter.store(cont);
-
-				cont.onCancellationRequested = _ -> {
-					writeWaiter.store(null);
-				}
-			});
-		} else {
-			true;
-		}
-	}
-
-	public function close() {
-		if (closed.get()) {
-			return;
-		}
-
-		closed.set(true);
-
-		writeWaiter.exchange(null)?.succeedAsync(false);
-		readWaiter.exchange(null)?.succeedAsync(false);
-	}
-}

+ 0 - 3
std/hxcoro/ds/channels/unbounded/UnboundedChannel.hx

@@ -1,3 +0,0 @@
-package hxcoro.ds.channels.unbounded;
-
-class UnboundedChannel<T> extends Channel<T> {}

+ 0 - 93
std/hxcoro/ds/channels/unbounded/UnboundedReader.hx

@@ -1,93 +0,0 @@
-package hxcoro.ds.channels.unbounded;
-
-import haxe.Exception;
-import haxe.coro.IContinuation;
-import haxe.coro.context.Context;
-import hxcoro.ds.Out;
-import hxcoro.exceptions.ChannelClosedException;
-
-private final class WaitContinuation<T> implements IContinuation<Bool> {
-	final cont : IContinuation<Bool>;
-
-	final buffer : PagedDeque<T>;
-
-	final closed : Out<Bool>;
-
-	public var context (get, never) : Context;
-
-	function get_context() {
-		return cont.context;
-	}
-
-	public function new(cont, buffer, closed) {
-		this.cont   = cont;
-		this.buffer = buffer;
-		this.closed = closed;
-	}
-
-	public function resume(result:Bool, error:Exception) {
-		if (false == result) {
-			closed.set(false);
-
-			cont.succeedAsync(buffer.isEmpty());
-		} else {
-			cont.succeedAsync(true);
-		}
-	}
-}
-
-final class UnboundedReader<T> implements IChannelReader<T> {
-	final buffer : PagedDeque<T>;
-
-	final readWaiters : PagedDeque<IContinuation<Bool>>;
-
-	final closed : Out<Bool>;
-
-	public function new(buffer, readWaiters, closed) {
-		this.buffer      = buffer;
-		this.readWaiters = readWaiters;
-		this.closed      = closed;
-	}
-
-	public function tryRead(out:Out<T>):Bool {
-		return buffer.tryPop(out);
-	}
-
-	public function tryPeek(out:Out<T>):Bool {
-		return buffer.tryPeek(out);
-	}
-
-	@:coroutine public function read():T {
-		final out = new Out();
-
-		while (true)
-		{
-			if (waitForRead() == false) {
-				throw new ChannelClosedException();
-			}
-
-			if (tryRead(out)) {
-				return out.get();
-			}
-		}
-	}
-
-	@:coroutine public function waitForRead():Bool {
-		if (buffer.isEmpty() == false) {
-			return true;
-		}
-
-		if (closed.get()) {
-			return false;
-		}
-
-		return suspendCancellable(cont -> {
-			final obj       = new WaitContinuation(cont, buffer, closed);
-			final hostPage  = readWaiters.push(obj);
-
-			cont.onCancellationRequested = _ -> {
-				readWaiters.remove(hostPage, obj);
-			}
-		});
-	}
-}

+ 0 - 77
std/hxcoro/ds/channels/unbounded/UnboundedWriter.hx

@@ -1,77 +0,0 @@
-package hxcoro.ds.channels.unbounded;
-
-import haxe.coro.IContinuation;
-import hxcoro.Coro;
-import hxcoro.ds.Out;
-import hxcoro.ds.PagedDeque;
-import hxcoro.exceptions.ChannelClosedException;
-
-using hxcoro.util.Convenience;
-
-final class UnboundedWriter<T> implements IChannelWriter<T> {
-	final closed : Out<Bool>;
-
-	final buffer : PagedDeque<T>;
-
-	final readWaiters : PagedDeque<IContinuation<Bool>>;
-
-	public function new(buffer, readWaiters, closed) {
-		this.buffer      = buffer;
-		this.readWaiters = readWaiters;
-		this.closed      = closed;
-	}
-
-	public function tryWrite(v:T):Bool {
-		if (closed.get()) {
-			return false;
-		}
-
-		final _ = buffer.push(v);
-
-		final cont = new Out();
-		while (readWaiters.tryPop(cont)) {
-			cont.get().succeedAsync(true);
-		}
-
-		return true;
-	}
-
-	@:coroutine public function waitForWrite():Bool {
-		checkCancellation();
-
-		if (closed.get()) {
-			return false;
-		}
-
-		return true;
-	}
-
-	@:coroutine public function write(v:T) {
-		while (waitForWrite()) {
-			if (tryWrite(v)) {
-				return;
-			}
-		}
-
-		throw new ChannelClosedException();
-	}
-
-	public function close() {
-		if (closed.get()) {
-			return;
-		}
-
-		closed.set(true);
-
-		final cont = new Out();
-		while (readWaiters.tryPop(cont)) {
-			cont.get().succeedAsync(false);
-		}
-	}
-
-	@:coroutine function checkCancellation() {
-		return Coro.suspendCancellable(cont -> {
-			cont.succeedAsync(null);
-		});
-	}
-}

+ 0 - 9
std/hxcoro/exceptions/ChannelClosedException.hx

@@ -1,9 +0,0 @@
-package hxcoro.exceptions;
-
-import haxe.Exception;
-
-class ChannelClosedException extends Exception {
-	public function new() {
-		super('Channel closed');
-	}
-}

+ 0 - 5
std/hxcoro/exceptions/TimeoutException.hx

@@ -1,5 +0,0 @@
-package hxcoro.exceptions;
-
-import haxe.exceptions.CancellationException;
-
-class TimeoutException extends CancellationException {}

+ 0 - 4
std/hxcoro/import.hx

@@ -1,4 +0,0 @@
-package hxcoro;
-
-using hxcoro.util.Convenience;
-import hxcoro.Coro.*;

+ 0 - 324
std/hxcoro/task/AbstractTask.hx

@@ -1,324 +0,0 @@
-package hxcoro.task;
-
-import hxcoro.concurrent.AtomicInt;
-import haxe.coro.cancellation.ICancellationToken;
-import haxe.coro.cancellation.ICancellationHandle;
-import haxe.coro.cancellation.ICancellationCallback;
-import haxe.exceptions.CancellationException;
-import haxe.Exception;
-
-enum abstract TaskState(Int) {
-	final Created;
-	final Running;
-	final Completing;
-	final Completed;
-	final Cancelling;
-	final Cancelled;
-}
-
-private class TaskException extends Exception {}
-
-private class CancellationHandle implements ICancellationHandle {
-	final callback:ICancellationCallback;
-	final task:AbstractTask;
-
-	var closed:Bool;
-
-	public function new(callback, task) {
-		this.callback = callback;
-		this.task = task;
-
-		closed = false;
-	}
-
-	public function run() {
-		if (closed) {
-			return;
-		}
-
-		final error = task.getError();
-		callback.onCancellation(error.orCancellationException());
-
-		closed = true;
-	}
-
-	public function close() {
-		if (closed) {
-			return;
-		}
-		final all = @:privateAccess task.cancellationCallbacks;
-
-		if (all != null) {
-			if (all.length == 1 && all[0] == this) {
-				all.resize(0);
-			} else {
-				all.remove(this);
-			}
-		}
-
-		closed = true;
-	}
-}
-
-private class NoOpCancellationHandle implements ICancellationHandle {
-	public function new() {}
-
-	public function close() {}
-}
-
-/**
-	AbstractTask is the base class for tasks which manages its `TaskState` and children.
-
-	Developer note: it should have no knowledge of any asynchronous behavior or anything related to coroutines,
-	and should be kept in a state where it could even be moved outside the hxcoro package. Also, `state` should
-	be treated like a truly private variable and only be modified from within this class.
-**/
-abstract class AbstractTask<T = Any> implements ICancellationToken {
-	static final atomicId = new AtomicInt(1); // start with 1 so we can use 0 for "no task" situations
-	static final noOpCancellationHandle = new NoOpCancellationHandle();
-
-	final parent:AbstractTask<Any>;
-
-	var children:Null<Array<AbstractTask>>;
-	var cancellationCallbacks:Null<Array<CancellationHandle>>;
-	var state:TaskState;
-	var error:Null<Exception>;
-	var numCompletedChildren:Int;
-	var indexInParent:Int;
-	var allChildrenCompleted:Bool;
-
-	public var id(get, null):Int;
-	public var cancellationException(get, never):Null<CancellationException>;
-
-	inline function get_cancellationException() {
-		return switch state {
-			case Cancelling | Cancelled:
-				error.orCancellationException();
-			case _:
-				null;
-		}
-	}
-
-	public inline function get_id() {
-		return id;
-	}
-
-	/**
-		Creates a new task.
-	**/
-	public function new(parent:Null<AbstractTask>, initialState:TaskState) {
-		id = atomicId.add(1);
-		this.parent = parent;
-		state = Created;
-		children = null;
-		cancellationCallbacks = null;
-		numCompletedChildren = 0;
-		indexInParent = -1;
-		allChildrenCompleted = false;
-		if (parent != null) {
-			parent.addChild(this);
-		}
-		switch (initialState) {
-			case Created:
-			case Running:
-				start();
-			case _:
-				throw new TaskException('Invalid initial state $initialState');
-		}
-	}
-
-	/**
-		Returns the task's error value, if any/
-	**/
-	public function getError() {
-		return error;
-	}
-
-	/**
-		Initiates cancellation of this task and all its children.
-
-		If `cause` is provided, it is set as this task's error value and used to cancel all children.
-
-		If the task cannot be cancelled or has already been cancelled, this function only checks if the
-		task has completed and initiates the appropriate behavior.
-	**/
-	public function cancel(?cause:CancellationException) {
-		switch (state) {
-			case Created | Running | Completing:
-				cause ??= new CancellationException();
-				if (error == null) {
-					error = cause;
-				}
-				state = Cancelling;
-
-				if (null != cancellationCallbacks) {
-					for (h in cancellationCallbacks) {
-						h.run();
-					}
-				}
-
-				cancelChildren(cause);
-				checkCompletion();
-			case _:
-				checkCompletion();
-		}
-	}
-
-	/**
-		Returns `true` if the task is still active. Note that an task that was created but not started yet
-		is considered to be active.
-	**/
-	public function isActive() {
-		return switch (state) {
-			case Completed | Cancelled:
-				false;
-			case _:
-				true;
-		}
-	}
-
-	public function onCancellationRequested(callback:ICancellationCallback):ICancellationHandle {
-		return switch state {
-			case Cancelling | Cancelled:
-				callback.onCancellation(error.orCancellationException());
-
-				return noOpCancellationHandle;
-			case _:
-				final container = cancellationCallbacks ??= [];
-				final handle = new CancellationHandle(callback, this);
-
-				container.push(handle);
-
-				handle;
-		}
-	}
-
-	/**
-		Returns this task's value, if any.
-	**/
-	abstract public function get():Null<T>;
-
-	/**
-		Starts executing this task. Has no effect if the task is already active or has completed.
-	**/
-	public function start() {
-		switch (state) {
-			case Created:
-				state = Running;
-				doStart();
-			case _:
-				return;
-		}
-	}
-
-	public function cancelChildren(?cause:CancellationException) {
-		if (null == children || children.length == 0) {
-			return;
-		}
-
-		cause ??= new CancellationException();
-
-		for (child in children) {
-			if (child != null) {
-				child.cancel(cause);
-			}
-		}
-	}
-
-	final inline function beginCompleting() {
-		state = Completing;
-		startChildren();
-	}
-
-	function startChildren() {
-		if (null == children) {
-			return;
-		}
-
-		for (child in children) {
-			if (child == null) {
-				continue;
-			}
-			switch (child.state) {
-				case Created:
-					child.start();
-				case Cancelled | Completed:
-				case Running | Completing | Cancelling:
-			}
-		}
-	}
-
-	function checkCompletion() {
-		updateChildrenCompletion();
-		if (!allChildrenCompleted) {
-			return;
-		}
-		switch (state) {
-			case Created | Running | Completed | Cancelled:
-				return;
-			case _:
-		}
-		switch (state) {
-			case Completing:
-				state = Completed;
-			case Cancelling:
-				state = Cancelled;
-			case _:
-				throw new TaskException('Invalid state $state in checkCompletion');
-		}
-		complete();
-	}
-
-	function updateChildrenCompletion() {
-		if (allChildrenCompleted) {
-			return;
-		}
-		if (children == null) {
-			allChildrenCompleted = true;
-			childrenCompleted();
-		} else if (numCompletedChildren == children.length) {
-			allChildrenCompleted = true;
-			childrenCompleted();
-		}
-	}
-
-	abstract function doStart():Void;
-
-	abstract function complete():Void;
-
-	abstract function childrenCompleted():Void;
-
-	abstract function childSucceeds(child:AbstractTask):Void;
-
-	abstract function childErrors(child:AbstractTask, cause:Exception):Void;
-
-	abstract function childCancels(child:AbstractTask, cause:CancellationException):Void;
-
-	// called from child
-
-	function childCompletes(child:AbstractTask, processResult:Bool) {
-		numCompletedChildren++;
-		if (processResult) {
-			if (child.error != null) {
-				if (child.error is CancellationException) {
-					childCancels(child, cast child.error);
-				} else {
-					childErrors(child, child.error);
-				}
-			} else {
-				childSucceeds(child);
-			}
-		}
-		updateChildrenCompletion();
-		checkCompletion();
-		if (child.indexInParent >= 0) {
-			children[child.indexInParent] = null;
-		}
-	}
-
-	function addChild(child:AbstractTask) {
-		final container = children ??= [];
-		final index = container.push(child);
-		child.indexInParent = index - 1;
-	}
-}

+ 0 - 221
std/hxcoro/task/CoroBaseTask.hx

@@ -1,221 +0,0 @@
-package hxcoro.task;
-
-import hxcoro.task.CoroTask;
-import hxcoro.task.node.INodeStrategy;
-import hxcoro.task.ICoroTask;
-import hxcoro.task.AbstractTask;
-import hxcoro.task.ICoroNode;
-import haxe.Exception;
-import haxe.exceptions.CancellationException;
-import haxe.coro.IContinuation;
-import haxe.coro.context.Context;
-import haxe.coro.context.Key;
-import haxe.coro.context.IElement;
-import haxe.coro.schedulers.Scheduler;
-import haxe.coro.cancellation.CancellationToken;
-
-private class CoroTaskWith<T> implements ICoroNodeWith {
-	public var context(get, null):Context;
-
-	final task:CoroBaseTask<T>;
-
-	public function new(context:Context, task:CoroBaseTask<T>) {
-		this.context = context;
-		this.task = task;
-	}
-
-	inline function get_context() {
-		return context;
-	}
-
-	public function async<T>(lambda:NodeLambda<T>):ICoroTask<T> {
-		final child = new CoroTaskWithLambda(context, lambda, CoroTask.CoroChildStrategy);
-		context.get(Scheduler).scheduleObject(child);
-		return child;
-	}
-
-	public function lazy<T>(lambda:NodeLambda<T>):IStartableCoroTask<T> {
-		return new StartableCoroTask(context, lambda, CoroTask.CoroChildStrategy);
-	}
-
-	public function with(...elements:IElement<Any>) {
-		return task.with(...elements);
-	}
-}
-
-private class CoroKeys {
-	static public final awaitingChildContinuation = new Key<IContinuation<Any>>("AwaitingChildContinuation");
-}
-
-/**
-	CoroTask provides the basic functionality for coroutine tasks.
-**/
-abstract class CoroBaseTask<T> extends AbstractTask<T> implements ICoroNode implements ICoroTask<T> implements ILocalContext implements IElement<CoroBaseTask<Any>> {
-	/**
-		This task's immutable `Context`.
-	**/
-	public var context(get, null):Context;
-
-	final nodeStrategy:INodeStrategy;
-	var coroLocalContext:Null<AdjustableContext>;
-	var initialContext:Context;
-	var result:Null<T>;
-	var awaitingContinuations:Null<Array<IContinuation<T>>>;
-
-	/**
-		Creates a new task using the provided `context`.
-	**/
-	public function new(context:Context, nodeStrategy:INodeStrategy, initialState:TaskState) {
-		super(context.get(CoroTask), initialState);
-		initialContext = context;
-		this.nodeStrategy = nodeStrategy;
-	}
-
-	inline function get_context() {
-		if (context == null) {
-			context = initialContext.clone().with(this).add(CancellationToken, this);
-		}
-		return context;
-	}
-
-	public function get() {
-		return result;
-	}
-
-	public function getKey() {
-		return CoroTask.key;
-	}
-
-	public function getLocalElement<T>(key:Key<T>):Null<T> {
-		return coroLocalContext?.get(key);
-	}
-
-	public function setLocalElement<T>(key:Key<T>, element:T) {
-		if (coroLocalContext == null) {
-			coroLocalContext = Context.create();
-		}
-		coroLocalContext.add(key, element);
-	}
-
-	/**
-		Indicates that the task has been suspended, which allows it to clean up some of
-		its internal resources. Has no effect on the observable state of the task.
-
-		This function should be called when it is expected that the task might not be resumed
-		for a while, e.g. when waiting on a sparse `Channel` or a contended `Mutex`.
-	**/
-	public function putOnHold() {
-		context = null;
-		if (awaitingContinuations != null && awaitingContinuations.length == 0) {
-			awaitingContinuations = null;
-		}
-		if (cancellationCallbacks != null && cancellationCallbacks.length == 0) {
-			cancellationCallbacks = null;
-		}
-		if (allChildrenCompleted) {
-			children = null;
-		}
-	}
-
-	/**
-		Creates a lazy child task to execute `lambda`. The child task does not execute until its `start`
-		method is called. This occurrs automatically once this task has finished execution.
-	**/
-	public function lazy<T>(lambda:NodeLambda<T>):IStartableCoroTask<T> {
-		return new StartableCoroTask(context, lambda, CoroTask.CoroChildStrategy);
-	}
-
-	/**
-		Creates a child task to execute `lambda` and starts it automatically.
-	**/
-	public function async<T>(lambda:NodeLambda<T>):ICoroTask<T> {
-		final child = new CoroTaskWithLambda<T>(context, lambda, CoroTask.CoroChildStrategy);
-		context.get(Scheduler).scheduleObject(child);
-		return child;
-	}
-
-	/**
-		Returns a copy of this tasks `Context` with `elements` added, which can be used to start child tasks.
-	**/
-	public function with(...elements:IElement<Any>) {
-		return new CoroTaskWith(context.clone().with(...elements), this);
-	}
-
-	/**
-		Resumes `cont` with this task's outcome.
-
-		If this task is no longer active, the continuation is resumed immediately. Otherwise, it is registered
-		to be resumed upon completion.
-
-		This function also starts this task if it has not been started yet.
-	**/
-	public function awaitContinuation(cont:IContinuation<T>) {
-		switch state {
-			case Completed:
-				cont.succeedSync(result);
-			case Cancelled:
-				cont.failSync(error);
-			case _:
-				awaitingContinuations ??= [];
-				awaitingContinuations.push(cont);
-				start();
-		}
-	}
-
-	@:coroutine public function awaitChildren() {
-		if (allChildrenCompleted) {
-			getLocalElement(CoroKeys.awaitingChildContinuation)?.callSync();
-			return;
-		}
-		startChildren();
-		Coro.suspend(cont -> setLocalElement(CoroKeys.awaitingChildContinuation, cont));
-	}
-
-	/**
-		Suspends this task until it completes.
-	**/
-	@:coroutine public function await():T {
-		return Coro.suspend(awaitContinuation);
-	}
-
-	function handleAwaitingContinuations() {
-		if (awaitingContinuations == null) {
-			return;
-		}
-		while (awaitingContinuations.length > 0) {
-			final continuations = awaitingContinuations;
-			awaitingContinuations = [];
-			if (error != null) {
-				for (cont in continuations) {
-					cont.failAsync(error);
-				}
-			} else {
-				for (cont in continuations) {
-					cont.succeedAsync(result);
-				}
-			}
-		}
-	}
-
-	function childrenCompleted() {
-		getLocalElement(CoroKeys.awaitingChildContinuation)?.callSync();
-	}
-
-	// strategy dispatcher
-
-	function complete() {
-		nodeStrategy.complete(this);
-	}
-
-	function childSucceeds(child:AbstractTask) {
-		nodeStrategy.childSucceeds(this, child);
-	}
-
-	function childErrors(child:AbstractTask, cause:Exception) {
-		nodeStrategy.childErrors(this, child, cause);
-	}
-
-	function childCancels(child:AbstractTask, cause:CancellationException) {
-		nodeStrategy.childCancels(this, child, cause);
-	}
-}

+ 0 - 88
std/hxcoro/task/CoroTask.hx

@@ -1,88 +0,0 @@
-package hxcoro.task;
-
-import hxcoro.task.node.CoroChildStrategy;
-import hxcoro.task.node.CoroScopeStrategy;
-import hxcoro.task.node.CoroSupervisorStrategy;
-import hxcoro.task.node.INodeStrategy;
-import hxcoro.task.AbstractTask;
-import haxe.coro.IContinuation;
-import haxe.coro.context.Key;
-import haxe.coro.context.Context;
-import haxe.coro.schedulers.IScheduleObject;
-import haxe.Exception;
-
-class CoroTask<T> extends CoroBaseTask<T> implements IContinuation<T> {
-	public static final key = new Key<CoroBaseTask<Any>>('Task');
-
-	static public final CoroChildStrategy = new CoroChildStrategy();
-	static public final CoroScopeStrategy = new CoroScopeStrategy();
-	static public final CoroSupervisorStrategy = new CoroSupervisorStrategy();
-
-	var wasResumed:Bool;
-
-	public function new(context:Context, nodeStrategy:INodeStrategy, initialState:TaskState = Running) {
-		super(context, nodeStrategy, initialState);
-		wasResumed = true;
-	}
-
-	public function doStart() {
-		wasResumed = false;
-	}
-
-	public function runNodeLambda(lambda:NodeLambda<T>) {
-		final result = lambda(this, this);
-		start();
-		switch result.state {
-			case Pending:
-				return;
-			case Returned:
-				this.succeedSync(result.result);
-			case Thrown:
-				this.failSync(result.error);
-		}
-	}
-
-	/**
-		Resumes the task with the provided `result` and `error`.
-	**/
-	public function resume(result:T, error:Exception) {
-		wasResumed = true;
-		if (error == null) {
-			switch (state) {
-				case Running:
-					this.result = result;
-					beginCompleting();
-				case _:
-			}
-			checkCompletion();
-		} else {
-			if (this.error == null) {
-				this.error = error;
-			}
-			cancel();
-		}
-	}
-
-	override function checkCompletion() {
-		if (!wasResumed) {
-			return;
-		}
-		super.checkCompletion();
-	}
-}
-
-class CoroTaskWithLambda<T> extends CoroTask<T> implements IScheduleObject {
-	final lambda:NodeLambda<T>;
-
-	/**
-		Creates a new task using the provided `context` in order to execute `lambda`.
-	**/
-	public function new(context:Context, lambda:NodeLambda<T>, nodeStrategy:INodeStrategy) {
-		super(context, nodeStrategy);
-		this.lambda = lambda;
-	}
-
-	public function onSchedule() {
-		runNodeLambda(lambda);
-	}
-}

+ 0 - 20
std/hxcoro/task/ICoroNode.hx

@@ -1,20 +0,0 @@
-package hxcoro.task;
-
-import haxe.exceptions.CancellationException;
-import haxe.coro.context.Context;
-import haxe.coro.context.IElement;
-import hxcoro.task.ICoroTask;
-
-interface ICoroNodeWith {
-	var context(get, null):Context;
-	function async<T>(lambda:NodeLambda<T>):ICoroTask<T>;
-	function lazy<T>(lambda:NodeLambda<T>):IStartableCoroTask<T>;
-	function with(...elements:IElement<Any>):ICoroNodeWith;
-}
-
-interface ICoroNode extends ICoroNodeWith extends ILocalContext {
-	var id(get, never):Int;
-	@:coroutine function awaitChildren():Void;
-	function cancel(?cause:CancellationException):Void;
-	function cancelChildren(?cause:CancellationException):Void;
-}

+ 0 - 17
std/hxcoro/task/ICoroTask.hx

@@ -1,17 +0,0 @@
-package hxcoro.task;
-
-import haxe.Exception;
-import haxe.exceptions.CancellationException;
-
-interface ICoroTask<T> extends ILocalContext {
-	var id(get, never):Int;
-	function cancel(?cause:CancellationException):Void;
-	@:coroutine function await():T;
-	function get():T;
-	function getError():Exception;
-	function isActive():Bool;
-}
-
-interface IStartableCoroTask<T> extends ICoroTask<T> {
-	function start():Void;
-}

+ 0 - 8
std/hxcoro/task/ILocalContext.hx

@@ -1,8 +0,0 @@
-package hxcoro.task;
-
-import haxe.coro.context.Key;
-
-interface ILocalContext {
-	function getLocalElement<T>(key:Key<T>):Null<T>;
-	function setLocalElement<T>(key:Key<T>, element:T):Void;
-}

+ 0 - 5
std/hxcoro/task/NodeLambda.hx

@@ -1,5 +0,0 @@
-package hxcoro.task;
-
-import haxe.coro.Coroutine;
-
-typedef NodeLambda<T> = Coroutine<(node:ICoroNode) -> T>;

+ 0 - 25
std/hxcoro/task/StartableCoroTask.hx

@@ -1,25 +0,0 @@
-package hxcoro.task;
-
-import hxcoro.task.node.INodeStrategy;
-import hxcoro.task.ICoroTask;
-import haxe.coro.context.Context;
-
-class StartableCoroTask<T> extends CoroTask<T> implements IStartableCoroTask<T> {
-	final lambda:NodeLambda<T>;
-
-	/**
-		Creates a new task using the provided `context` in order to execute `lambda`.
-	**/
-	public function new(context:Context, lambda:NodeLambda<T>, nodeStrategy:INodeStrategy) {
-		super(context, nodeStrategy, Created);
-		this.lambda = lambda;
-	}
-
-	/**
-		Starts executing this task's `lambda`. Has no effect if the task is already active or has completed.
-	**/
-	override public function doStart() {
-		super.doStart();
-		runNodeLambda(lambda);
-	}
-}

+ 0 - 35
std/hxcoro/task/node/CoroChildStrategy.hx

@@ -1,35 +0,0 @@
-package hxcoro.task.node;
-
-import haxe.Exception;
-import haxe.exceptions.CancellationException;
-
-@:access(hxcoro.task.AbstractTask)
-@:access(hxcoro.task.CoroTask)
-class CoroChildStrategy implements INodeStrategy {
-	public function new() {}
-
-	public function complete<T>(task:CoroBaseTask<T>) {
-		task.parent?.childCompletes(task, true);
-		task.handleAwaitingContinuations();
-	}
-
-	public function childSucceeds<T>(task:CoroBaseTask<T>, child:AbstractTask) {}
-
-	public function childErrors<T>(task:CoroBaseTask<T>, child:AbstractTask, cause:Exception) {
-		switch (task.state) {
-			case Created | Running | Completing:
-				// inherit child error
-				if (task.error == null) {
-					task.error = cause;
-				}
-				task.cancel();
-			case Cancelling:
-				// not sure about this one, what if we cancel normally and then get a real exception?
-			case Completed | Cancelled:
-		}
-	}
-
-	public function childCancels<T>(task:CoroBaseTask<T>, child:AbstractTask, cause:CancellationException) {
-		task.cancel(cause);
-	}
-}

+ 0 - 33
std/hxcoro/task/node/CoroScopeStrategy.hx

@@ -1,33 +0,0 @@
-package hxcoro.task.node;
-
-import haxe.Exception;
-import haxe.exceptions.CancellationException;
-
-@:access(hxcoro.task.AbstractTask)
-@:access(hxcoro.task.CoroTask)
-class CoroScopeStrategy implements INodeStrategy {
-	public function new() {}
-
-	public function complete<T>(task:CoroBaseTask<T>) {
-		task.parent?.childCompletes(task, false);
-		task.handleAwaitingContinuations();
-	}
-
-	public function childSucceeds<T>(task:CoroBaseTask<T>, child:AbstractTask) {}
-
-	public function childErrors<T>(task:CoroBaseTask<T>, child:AbstractTask, cause:Exception) {
-		switch (task.state) {
-			case Created | Running | Completing:
-				// inherit child error
-				if (task.error == null) {
-					task.error = cause;
-				}
-				task.cancel();
-			case Cancelling:
-				// not sure about this one, what if we cancel normally and then get a real exception?
-			case Completed | Cancelled:
-		}
-	}
-
-	public function childCancels<T>(task:CoroBaseTask<T>, child:AbstractTask, cause:CancellationException) {}
-}

+ 0 - 21
std/hxcoro/task/node/CoroSupervisorStrategy.hx

@@ -1,21 +0,0 @@
-package hxcoro.task.node;
-
-import haxe.Exception;
-import haxe.exceptions.CancellationException;
-
-@:access(hxcoro.task.AbstractTask)
-@:access(hxcoro.task.CoroBaseTask)
-class CoroSupervisorStrategy implements INodeStrategy {
-	public function new() {}
-
-	public function complete<T>(task:CoroBaseTask<T>) {
-		task.parent?.childCompletes(task, false);
-		task.handleAwaitingContinuations();
-	}
-
-	public function childSucceeds<T>(task:CoroBaseTask<T>, child:AbstractTask) {}
-
-	public function childErrors<T>(task:CoroBaseTask<T>, child:AbstractTask, cause:Exception) {}
-
-	public function childCancels<T>(task:CoroBaseTask<T>, child:AbstractTask, cause:CancellationException) {}
-}

+ 0 - 13
std/hxcoro/task/node/INodeStrategy.hx

@@ -1,13 +0,0 @@
-package hxcoro.task.node;
-
-import haxe.Exception;
-import haxe.exceptions.CancellationException;
-import hxcoro.task.AbstractTask;
-import hxcoro.task.CoroTask;
-
-interface INodeStrategy {
-	function complete<T>(task:CoroBaseTask<T>):Void;
-	function childSucceeds<T>(task:CoroBaseTask<T>, child:AbstractTask):Void;
-	function childErrors<T>(task:CoroBaseTask<T>, child:AbstractTask, cause:Exception):Void;
-	function childCancels<T>(task:CoroBaseTask<T>, child:AbstractTask, cause:CancellationException):Void;
-}

+ 0 - 81
std/hxcoro/util/Convenience.hx

@@ -1,81 +0,0 @@
-package hxcoro.util;
-
-import haxe.coro.cancellation.ICancellationToken;
-import haxe.exceptions.CancellationException;
-import haxe.coro.schedulers.Scheduler;
-import haxe.Exception;
-import haxe.coro.IContinuation;
-
-/**
-	A set of convenience functions for working with hxcoro data.
-**/
-class Convenience {
-	/**
-		Resumes `cont` with `result` immediately.
-	**/
-	static public inline function succeedSync<T>(cont:IContinuation<T>, result:T) {
-		cont.resume(result, null);
-	}
-
-	/**
-		Resumes `cont` with exception `error` immediately.
-	**/
-	static public inline function failSync<T>(cont:IContinuation<T>, error:Exception) {
-		cont.resume(null, error);
-	}
-
-	/**
-		Schedules `cont` to be resumed with `result`.
-
-		Scheduled functions do not increase the call stack and might be executed in a different
-		thread if the current dispatcher allows that.
-	**/
-	static public inline function succeedAsync<T>(cont:IContinuation<T>, result:T) {
-		resumeAsync(cont, result, null);
-	}
-
-	/**
-		Schedules `cont` to be resumed with exception `error`.
-
-		Scheduled functions do not increase the call stack and might be executed in a different
-		thread if the current dispatcher allows that.
-	**/
-	static public inline function failAsync<T>(cont:IContinuation<T>, error:Exception) {
-		resumeAsync(cont, null, error);
-	}
-
-	/**
-		Calls `cont` without any values immediately.
-	**/
-	static public inline function callSync<T>(cont:IContinuation<T>) {
-		cont.resume(null, null);
-	}
-
-	/**
-		Schedules `cont` to be resumed without any values.
-
-		Scheduled functions do not increase the call stack and might be executed in a different
-		thread if the current dispatcher allows that.
-	**/
-	static public inline function callAsync<T>(cont:IContinuation<T>) {
-		resumeAsync(cont, null, null);
-	}
-
-	/**
-		Schedules `cont` to be resumed with result `result` and exception `error`.
-
-		Scheduled functions do not increase the call stack and might be executed in a different
-		thread if the current dispatcher allows that.
-	**/
-	static public inline function resumeAsync<T>(cont:IContinuation<T>, result:T, error:Exception) {
-		cont.context.get(Scheduler).schedule(0, () -> cont.resume(result, error));
-	}
-
-	static public inline function orCancellationException(exc:Exception):CancellationException {
-		return exc is CancellationException ? cast exc : new CancellationException();
-	}
-
-	static public inline function isCancellationRequested(ct:ICancellationToken) {
-		return ct.cancellationException != null;
-	}
-}

+ 12 - 1
tests/runci/targets/Macro.hx

@@ -50,16 +50,27 @@ class Macro {
 
 
 		deleteDirectoryRecursively(partyDir);
 		deleteDirectoryRecursively(partyDir);
 		runCommand("mkdir", [partyDir]);
 		runCommand("mkdir", [partyDir]);
-		changeDirectory(partyDir);
 		party();
 		party();
 	}
 	}
 
 
 	static function party() {
 	static function party() {
+		changeDirectory(partyDir);
 		runCommand("git", ["clone", "https://github.com/haxetink/tink_core", "tink_core"]);
 		runCommand("git", ["clone", "https://github.com/haxetink/tink_core", "tink_core"]);
 		changeDirectory("tink_core");
 		changeDirectory("tink_core");
 		runCommand("haxelib", ["newrepo"]);
 		runCommand("haxelib", ["newrepo"]);
 		runCommand("haxelib", ["install", "tests.hxml", "--always"]);
 		runCommand("haxelib", ["install", "tests.hxml", "--always"]);
 		runCommand("haxelib", ["dev", "tink_core", "."]);
 		runCommand("haxelib", ["dev", "tink_core", "."]);
 		runCommand("haxe", ["tests.hxml", "-w", "-WDeprecated", "--interp", "--macro", "addMetadata('@:exclude','Futures','testDelay')"]);
 		runCommand("haxe", ["tests.hxml", "-w", "-WDeprecated", "--interp", "--macro", "addMetadata('@:exclude','Futures','testDelay')"]);
+
+		changeDirectory(partyDir);
+		runCommand("git", ["clone", "https://github.com/HaxeFoundation/hxcoro", "hxcoro"]);
+		changeDirectory("hxcoro");
+		runCommand("git", ["checkout", "setup-haxelib"]); // TODO: remove once merged
+		runCommand("haxelib", ["newrepo"]);
+		runCommand("haxelib", ["git", "utest", "https://github.com/Aidan63/utest.git", "coro"]);
+		runCommand("haxelib", ["dev", "hxcoro", "."]);
+		runCommand("haxe", ["--cwd", "tests", "build-eval.hxml"]);
+		runCommand("haxe", ["--cwd", "tests", "build-eval.hxml", "--hxb", "bin/test.hxb"]);
+		runCommand("haxe", ["--cwd", "tests", "build-eval.hxml", "--hxb-lib", "bin/test.hxb"]);
 	}
 	}
 }
 }