Przeglądaj źródła

[threads] changed haxe.EntryPoint and MainLoop to be proxies for sys.thread.EventLoop (fixes #9901)

Aleksandr Kuzmenko 4 lat temu
rodzic
commit
3070d07a0a

+ 7 - 3
src/typing/finalization.ml

@@ -55,17 +55,21 @@ let get_main ctx types =
 		in
 		(* add haxe.EntryPoint.run() call *)
 		let add_entry_point_run main =
-			(try
+			try
 				main :: [call_static (["haxe"],"EntryPoint") "run"]
 			with Not_found ->
 				[main]
-			)
+		and add_entry_point_init main =
+			try
+				main :: [call_static (["haxe"],"EntryPoint") "init"]
+			with Not_found ->
+				[main]
 		in
 		(* add calls for event loop *)
 		let add_event_loop main =
 			(try
 				let thread = (["sys";"thread";"_Thread"],"Thread_Impl_") in
-				call_static thread "initEventLoop" :: add_entry_point_run main @ [call_static thread "processEvents"]
+				call_static thread "initEventLoop" :: add_entry_point_init main @ [call_static thread "processEvents"]
 			with Not_found ->
 				[main]
 			)

+ 29 - 19
std/haxe/EntryPoint.hx

@@ -34,8 +34,15 @@ private class Thread {
 **/
 class EntryPoint {
 	#if sys
-	static var sleepLock = new Lock();
-	static var mutex = new Mutex();
+		static var mutex = new Mutex();
+		#if (target.threaded && !cppia)
+			static var mainThread:Thread;
+			@:keep static function init() {
+				mainThread = Thread.current();
+			}
+		#else
+			static var sleepLock = new Lock();
+		#end
 	#end
 	static var pending = new Array<Void->Void>();
 	public static var threadCount(default, null):Int = 0;
@@ -44,17 +51,21 @@ class EntryPoint {
 		Wakeup a sleeping `run()`
 	**/
 	public static function wakeup() {
-		#if sys
+		#if (sys && !(target.threaded && !cppia))
 		sleepLock.release();
 		#end
 	}
 
 	public static function runInMainThread(f:Void->Void) {
 		#if sys
-		mutex.acquire();
-		pending.push(f);
-		mutex.release();
-		wakeup();
+			#if (target.threaded && !cppia)
+				mainThread.events.run(f);
+			#else
+				mutex.acquire();
+				pending.push(f);
+				mutex.release();
+				wakeup();
+			#end
 		#else
 		pending.push(f);
 		#end
@@ -65,6 +76,9 @@ class EntryPoint {
 		mutex.acquire();
 		threadCount++;
 		mutex.release();
+		#if (target.threaded && !cppia)
+			mainThread.events.promise();
+		#end
 		Thread.create(function() {
 			f();
 			mutex.acquire();
@@ -72,6 +86,9 @@ class EntryPoint {
 			if (threadCount == 0)
 				wakeup();
 			mutex.release();
+			#if (target.threaded && !cppia)
+				mainThread.events.runPromised(() -> {});
+			#end
 		});
 		#else
 		threadCount++;
@@ -83,6 +100,9 @@ class EntryPoint {
 	}
 
 	static function processEvents():Float {
+		#if (target.threaded && !cppia)
+		return -1;
+		#else
 		// flush all pending calls
 		while (true) {
 			#if sys
@@ -100,6 +120,7 @@ class EntryPoint {
 		if (!MainLoop.hasEvents() && threadCount == 0)
 			return -1;
 		return time;
+		#end
 	}
 
 	/**
@@ -131,18 +152,7 @@ class EntryPoint {
 		#elseif flash
 		flash.Lib.current.stage.addEventListener(flash.events.Event.ENTER_FRAME, function(_) processEvents());
 		#elseif (target.threaded && !cppia)
-		var mainThread = Thread.current();
-		var handler = null;
-		function progress() {
-			if(handler != null) {
-				mainThread.events.cancel(handler);
-			}
-			var nextTick = processEvents();
-			if (nextTick > 0) {
-				handler = mainThread.events.repeat(progress, Std.int(nextTick * 1000.0));
-			}
-		}
-		progress();
+		//everything is delegated to sys.thread.EventLoop
 		#elseif sys
 		while (true) {
 			var nextTick = processEvents();

+ 32 - 0
std/haxe/MainLoop.hx

@@ -1,6 +1,10 @@
 package haxe;
 
 import haxe.EntryPoint;
+#if (target.threaded && !cppia)
+import sys.thread.EventLoop;
+import sys.thread.Thread;
+#end
 
 class MainEvent {
 	var f:Void->Void;
@@ -56,6 +60,14 @@ class MainEvent {
 
 @:access(haxe.MainEvent)
 class MainLoop {
+	#if (target.threaded && !cppia)
+	static var eventLoopHandler:Null<EventHandler>;
+	static var mutex = new sys.thread.Mutex();
+	static var mainThread(get,never):Thread;
+	static inline function get_mainThread():Thread
+		return @:privateAccess EntryPoint.mainThread;
+	#end
+
 	static var pending:MainEvent;
 
 	public static var threadCount(get, never):Int;
@@ -93,9 +105,29 @@ class MainLoop {
 			head.prev = e;
 		e.next = head;
 		pending = e;
+		injectIntoEventLoop(0);
 		return e;
 	}
 
+	static function injectIntoEventLoop(waitMs:Int) {
+		#if (target.threaded && !cppia)
+			mutex.acquire();
+			if(eventLoopHandler != null) {
+				mainThread.events.cancel(eventLoopHandler);
+			}
+			eventLoopHandler = mainThread.events.repeat(
+				() -> {
+					var wait = tick();
+					if(hasEvents()) {
+						injectIntoEventLoop(Std.int(wait * 1000));
+					}
+				},
+				waitMs
+			);
+			mutex.release();
+		#end
+	}
+
 	static function sortEvents() {
 		// pending = haxe.ds.ListSort.sort(pending, function(e1, e2) return e1.nextRun > e2.nextRun ? -1 : 1);
 		// we can't use directly ListSort because it requires prev/next to be public, which we don't want here

+ 22 - 0
tests/eventLoop/src/cases/TestMainLoop.hx

@@ -0,0 +1,22 @@
+package cases;
+
+import haxe.MainLoop;
+
+@:timeout(10000)
+@:depends(cases.TestEvents)
+class TestMainLoop extends utest.Test {
+	function testNewAction_immediately(async:Async) {
+		var e1:MainEvent = null;
+		e1 = MainLoop.add(() -> {
+			e1.stop();
+			var e2:MainEvent = null;
+			e2 = MainLoop.add(() -> {
+				e2.stop();
+				pass();
+				async.done();
+			});
+			e2.delay(0);
+		});
+		e1.delay(0);
+	}
+}