Pārlūkot izejas kodu

[eval] fix event loop (#11028)

* [eval] port MainLoop changes to eval impl

* [eval] fix nullcheck

* [eval] add doc from core MainLoop for clarity

* [tests] add hl and eval targets to eventLoop tests

* Revert "[tests] disable for now"

This reverts commit fa582e38663b9d974d6def72db1a5dc1c49eed37.

* [tests] sleep at the end shouldn't be needed

* [tests] update eventLoop tests, make it actually run for hl, cpp, eval

Note that the output is still not checked for cpp and hl

* [eval] cleanup event loop start

* [tests]  add test for 10832

* [tests] add expected test result
Rudy Ges 2 gadi atpakaļ
vecāks
revīzija
9717436a87

+ 83 - 17
std/eval/_std/sys/thread/EventLoop.hx

@@ -3,12 +3,23 @@ package sys.thread;
 import eval.luv.Loop;
 import eval.luv.Async;
 import eval.luv.Timer as LuvTimer;
+import haxe.MainLoop;
 
+/**
+	When an event loop has an available event to execute.
+**/
 @:coreApi
 enum NextEventTime {
+	/** There's already an event waiting to be executed */
 	Now;
+	/** No new events are expected. */
 	Never;
+	/**
+		An event is expected to arrive at any time.
+		If `time` is specified, then the event will be ready at that time for sure.
+	*/
 	AnyTime(time:Null<Float>);
+	/** An event is expected to be ready for execution at `time`. */
 	At(time:Float);
 }
 
@@ -27,37 +38,47 @@ private class RegularEvent {
 	}
 }
 
+/**
+	An event loop implementation used for `sys.thread.Thread`
+**/
 @:coreApi
 class EventLoop {
 	@:allow(eval.luv.Loop)
 	final handle:Loop;
 
 	final mutex = new Mutex();
-	final oneTimeEvents = new Array<Null<()->Void>>();
-	var oneTimeEventsIdx = 0;
 	final wakeup:Async;
 	var promisedEventsCount = 0;
 	var pending:Array<()->Void> = [];
-	var looping = false;
+	var started:Bool = false;
+
+	var isMainThread:Bool;
+	static var CREATED : Bool;
 
 	public function new():Void {
+		isMainThread = !CREATED;
+		CREATED = true;
 		handle = Loop.init().resolve();
 		wakeup = Async.init(handle, consumePending).resolve();
 		wakeup.unref();
 	}
 
+	/**
+		Schedule event for execution every `intervalMs` milliseconds in current loop.
+	**/
 	public function repeat(event:()->Void, intervalMs:Int):EventHandler {
 		var e = new RegularEvent(event);
 		mutex.acquire();
-		pending.push(() -> {
-			e.timer = LuvTimer.init(handle).resolve();
-			e.timer.start(e.run, intervalMs, intervalMs < 1 ? 1 : intervalMs).resolve();
-		});
+		e.timer = LuvTimer.init(handle).resolve();
+		e.timer.start(e.run, intervalMs, intervalMs < 1 ? 1 : intervalMs).resolve();
 		mutex.release();
 		wakeup.send();
 		return e;
 	}
 
+	/**
+		Prevent execution of a previously scheduled event in current loop.
+	**/
 	public function cancel(eventHandler:EventHandler):Void {
 		mutex.acquire();
 		(eventHandler:RegularEvent).event = noop;
@@ -71,6 +92,11 @@ class EventLoop {
 	}
 	static final noop = function() {}
 
+	/**
+		Notify this loop about an upcoming event.
+		This makes the thread stay alive and wait for as many events as the number of
+		times `.promise()` was called. These events should be added via `.runPromised()`.
+	**/
 	public function promise():Void {
 		mutex.acquire();
 		++promisedEventsCount;
@@ -79,6 +105,9 @@ class EventLoop {
 		wakeup.send();
 	}
 
+	/**
+		Execute `event` as soon as possible.
+	**/
 	public function run(event:()->Void):Void {
 		mutex.acquire();
 		pending.push(event);
@@ -86,6 +115,9 @@ class EventLoop {
 		wakeup.send();
 	}
 
+	/**
+		Add previously promised `event` for execution.
+	**/
 	public function runPromised(event:()->Void):Void {
 		mutex.acquire();
 		--promisedEventsCount;
@@ -96,7 +128,7 @@ class EventLoop {
 	}
 
 	function refUnref():Void {
-		if(promisedEventsCount > 0) {
+		if (promisedEventsCount > 0 || (isMainThread && haxe.MainLoop.hasEvents())) {
 			wakeup.ref();
 		} else {
 			wakeup.unref();
@@ -104,37 +136,71 @@ class EventLoop {
 	}
 
 	public function progress():NextEventTime {
-		//TODO: throw if loop is already running
-		if((handle:Loop).run(NOWAIT)) {
+		if (started) throw "Event loop already started";
+
+		if (handle.run(NOWAIT)) {
 			return AnyTime(null);
 		} else {
 			return Never;
 		}
 	}
 
+	/**
+		Blocks until a new event is added or `timeout` (in seconds) to expires.
+
+		Depending on a target platform this method may also automatically execute arriving
+		events while waiting. However if any event is executed it will stop waiting.
+
+		Returns `true` if more events are expected.
+		Returns `false` if no more events expected.
+
+		Depending on a target platform this method may be non-reentrant. It must
+		not be called from event callbacks.
+	**/
 	public function wait(?timeout:Float):Bool {
-		//TODO: throw if loop is already running
-		if(timeout == null) {
+		if (started) throw "Event loop already started";
+
+		if(timeout != null) {
 			var timer = LuvTimer.init(handle).resolve();
 			timer.start(() -> {
 				timer.stop().resolve();
 				timer.close(() -> {});
 			}, Std.int(timeout * 1000));
-			return (handle:Loop).run(ONCE);
+			return handle.run(ONCE);
 		} else {
-			return (handle:Loop).run(ONCE);
+			return handle.run(ONCE);
 		}
 	}
 
+	/**
+		Execute all pending events.
+		Wait and execute as many events as the number of times `promise()` was called.
+		Runs until all repeating events are cancelled and no more events are expected.
+
+		Depending on a target platform this method may be non-reentrant. It must
+		not be called from event callbacks.
+	**/
 	public function loop():Void {
-		//TODO: throw if loop is already running
+		if (started) throw "Event loop already started";
+		started = true;
 		consumePending();
-		(handle:Loop).run(DEFAULT);
+		handle.run(DEFAULT);
 	}
 
 	function consumePending(?_:Async):Void {
+		mutex.acquire();
 		var p = pending;
 		pending = [];
+		mutex.release();
 		for(fn in p) fn();
+
+		if (started && isMainThread) {
+			var next = @:privateAccess MainLoop.tick();
+			if (haxe.MainLoop.hasEvents()) {
+				wakeup.send();
+			} else {
+				refUnref();
+			}
+		}
 	}
-}
+}

+ 1 - 4
tests/misc/eventLoop/Main.hx

@@ -48,10 +48,7 @@ class Main {
 				trace(String.fromCharCode("A".code + count++));
 				if( count == 5 ) event.stop();
 			});
-			#if sys
-			Sys.sleep(3);
-			#end
 		});
 	}
 
-}
+}

+ 3 - 0
tests/misc/eventLoop/build-cpp.hxml

@@ -0,0 +1,3 @@
+--main Main
+--dce full
+-cpp cpp

+ 23 - 0
tests/misc/eventLoop/build-cpp.hxml.stdout.disabled

@@ -0,0 +1,23 @@
+Main.hx:18: BEFORE1
+Main.hx:35: BEFORE2
+Main.hx:26: T0
+Main.hx:48: A
+Main.hx:7: 0
+Main.hx:48: B
+Main.hx:7: 1
+Main.hx:48: C
+Main.hx:7: 2
+Main.hx:48: D
+Main.hx:7: 3
+Main.hx:48: E
+Main.hx:7: 4
+Main.hx:7: 5
+Main.hx:7: 6
+Main.hx:7: 7
+Main.hx:7: 8
+Main.hx:7: 9
+Main.hx:10: false
+Main.hx:26: T1
+Main.hx:26: T2
+Main.hx:26: T3
+Main.hx:26: T4

+ 3 - 0
tests/misc/eventLoop/build-hl.hxml

@@ -0,0 +1,3 @@
+--main Main
+--dce full
+-hl eventLoop.hl

+ 23 - 0
tests/misc/eventLoop/build-hl.hxml.stdout.disabled

@@ -0,0 +1,23 @@
+Main.hx:18: BEFORE1
+Main.hx:35: BEFORE2
+Main.hx:26: T0
+Main.hx:48: A
+Main.hx:7: 0
+Main.hx:48: B
+Main.hx:7: 1
+Main.hx:48: C
+Main.hx:7: 2
+Main.hx:48: D
+Main.hx:7: 3
+Main.hx:48: E
+Main.hx:7: 4
+Main.hx:7: 5
+Main.hx:7: 6
+Main.hx:7: 7
+Main.hx:7: 8
+Main.hx:7: 9
+Main.hx:10: false
+Main.hx:26: T1
+Main.hx:26: T2
+Main.hx:26: T3
+Main.hx:26: T4

+ 21 - 0
tests/misc/projects/Issue10832/Main.hx

@@ -0,0 +1,21 @@
+import haxe.MainLoop;
+
+class Main {
+	static function main() {
+		trace('main');
+		var event = null;
+		var done 	= false;
+		event = MainLoop.add(
+			() -> {
+				if(!done){
+					done = true;
+					trace('ok');
+				}
+				if(event !=null){
+					trace("stop");
+					event.stop();
+				}
+			}
+		);
+	}
+}

+ 2 - 0
tests/misc/projects/Issue10832/compile.hxml

@@ -0,0 +1,2 @@
+--main Main
+--interp

+ 3 - 0
tests/misc/projects/Issue10832/compile.hxml.stdout

@@ -0,0 +1,3 @@
+Main.hx:5: main
+Main.hx:12: ok
+Main.hx:15: stop

+ 54 - 0
tests/misc/projects/eventLoop/Main.hx

@@ -0,0 +1,54 @@
+class Main {
+
+	static function main() {
+		var event : haxe.MainLoop.MainEvent = null;
+		var count = 0;
+		event = haxe.MainLoop.add(function() {
+			trace(count++);
+			if( count == 10 ) {
+				event.stop();
+				trace(haxe.MainLoop.hasEvents());
+			}
+		});
+
+		#if sys
+		// if we don't use native timer, this should execute before
+		var t = new haxe.Timer(100);
+		t.run = function() {
+			trace("BEFORE1");
+			t.stop();
+		};
+		#end
+
+		var t = new haxe.Timer(200);
+		var count = 0;
+		t.run = function() {
+			trace("T" + count++);
+			if( count == 5 )
+				t.stop();
+		};
+
+		#if sys
+		// if we don't use native timer, this should execute before
+		var t = new haxe.Timer(100);
+		t.run = function() {
+			trace("BEFORE2");
+			t.stop();
+		};
+		#end
+
+		#if sys
+		Sys.sleep(0.3);
+		#end
+
+		haxe.MainLoop.addThread(function() {
+			var event : haxe.MainLoop.MainEvent = null;
+			var count = 0;
+			event = haxe.MainLoop.add(function() {
+				trace(String.fromCharCode("A".code + count++));
+				if( count == 5 ) event.stop();
+			});
+		});
+	}
+
+}

+ 3 - 0
tests/misc/projects/eventLoop/compile.hxml

@@ -0,0 +1,3 @@
+--main Main
+--dce full
+--interp

+ 23 - 0
tests/misc/projects/eventLoop/compile.hxml.stdout.disabled

@@ -0,0 +1,23 @@
+Main.hx:18: BEFORE1
+Main.hx:35: BEFORE2
+Main.hx:26: T0
+Main.hx:48: A
+Main.hx:7: 0
+Main.hx:48: B
+Main.hx:7: 1
+Main.hx:48: C
+Main.hx:7: 2
+Main.hx:48: D
+Main.hx:7: 3
+Main.hx:48: E
+Main.hx:7: 4
+Main.hx:7: 5
+Main.hx:7: 6
+Main.hx:7: 7
+Main.hx:7: 8
+Main.hx:7: 9
+Main.hx:10: false
+Main.hx:26: T1
+Main.hx:26: T2
+Main.hx:26: T3
+Main.hx:26: T4

+ 5 - 0
tests/runci/targets/Cpp.hx

@@ -83,6 +83,11 @@ class Cpp {
 			runCpp("export/cpp/Main");
 		}
 
+		changeDirectory(getMiscSubDir("eventLoop"));
+		runCommand("haxe", ["build-cpp.hxml"]);
+		// TODO: check output like misc tests do
+		runCpp("cpp/Main");
+
 		// if (Sys.systemName() == "Mac")
 		// {
 		// 	changeDirectory(getMiscSubDir("cppObjc"));

+ 5 - 0
tests/runci/targets/Hl.hx

@@ -91,6 +91,11 @@ class Hl {
 		runCommand("haxe", ["compile-hl.hxml"].concat(args));
 		runSysTest(hlBinary, ["bin/hl/sys.hl"]);
 
+		changeDirectory(getMiscSubDir("eventLoop"));
+		runCommand("haxe", ["build-hl.hxml"]);
+		// TODO: check output like misc tests do
+		runCommand(hlBinary, ["eventLoop.hl"]);
+
 		changeDirectory(miscHlDir);
 		runCommand("haxe", ["run.hxml"]);
 	}

+ 32 - 32
tests/threads/src/cases/TestMainLoop.hx

@@ -11,37 +11,37 @@ class TestMainLoop extends utest.Test {
 	});
 	static var staticInit = false;
 
-	// function testWorksInStaticInits_issue10114(async:Async) {
-	// 	var mainThread = Thread.current();
-	// 	var checkAttempts = 3;
-	// 	function check() {
-	// 		checkAttempts++;
-	// 		if(staticInit) {
-	// 			pass();
-	// 			async.done();
-	// 		} else if(checkAttempts > 0) {
-	// 			checkAttempts--;
-	// 			mainThread.events.run(check);
-	// 		} else {
-	// 			fail();
-	// 			async.done();
-	// 		}
-	// 	}
-	// 	mainThread.events.run(check);
-	// }
+	function testWorksInStaticInits_issue10114(async:Async) {
+		var mainThread = Thread.current();
+		var checkAttempts = 3;
+		function check() {
+			checkAttempts++;
+			if(staticInit) {
+				pass();
+				async.done();
+			} else if(checkAttempts > 0) {
+				checkAttempts--;
+				mainThread.events.run(check);
+			} else {
+				fail();
+				async.done();
+			}
+		}
+		mainThread.events.run(check);
+	}
 
-	// function testNewAction_immediately(async:Async) {
-	// 	var e1:MainEvent = null;
-	// 	e1 = MainLoop.add(() -> {
-	// 		e1.stop();
-	// 		var e2:MainEvent = null;
-	// 		e2 = MainLoop.add(() -> {
-	// 			e2.stop();
-	// 			pass();
-	// 			async.done();
-	// 		});
-	// 		e2.delay(0);
-	// 	});
-	// 	e1.delay(0);
-	// }
+	function testNewAction_immediately(async:Async) {
+		var e1:MainEvent = null;
+		e1 = MainLoop.add(() -> {
+			e1.stop();
+			var e2:MainEvent = null;
+			e2 = MainLoop.add(() -> {
+				e2.stop();
+				pass();
+				async.done();
+			});
+			e2.delay(0);
+		});
+		e1.delay(0);
+	}
 }