Pārlūkot izejas kodu

More thread fixes (#8009)

* [java] use LinkedBlockingDeque instead of waneck-Deque

closes #7996

* [std] support all threaded targets in haxe.EntryPoint

except HL because it has no Lock

* [eval] catch `Sys_exit` from threads

* [hl] add Deque-based Lock implementation

* [tests] activate thread tests again, but give them a timeout

* [tests] move all threads tests to tests/threads

* [eval] don't ignore Sys.exit

* [eval] check if yielding before delaying does anything

* [tests] neko is too slow

* [tests] don't allow lock to time-out

* [tests] allow timeout on Deque test

* [tests] don't run timing-related thread tests, they only pass on Java

* [tests] don't run threads test on eval for now

There seem to be issues with the scheduler which cause the tests to time-out.

* [tests] don't run threads tests on neko either
Simon Krajewski 6 gadi atpakaļ
vecāks
revīzija
b50279d858

+ 12 - 2
src/macro/eval/evalStdLib.ml

@@ -1440,7 +1440,8 @@ module StdLock = struct
 		| None ->
 			begin match timeout with
 				| VNull ->
-					Option.get (Deque.pop lock.ldeque true)
+					ignore(Deque.pop lock.ldeque true);
+					vtrue
 				| _ ->
 					let target_time = (Sys.time()) +. num timeout in
 					loop target_time
@@ -2597,7 +2598,13 @@ module StdSys = struct
 
 	let setTimeLocale = vfun1 (fun _ -> vfalse)
 
-	let sleep = vfun1 (fun f -> Thread.delay (num f); vnull)
+	let sleep = vfun1 (fun f ->
+		let time = Sys.time() in
+		Thread.yield();
+		let diff = Sys.time() -. time in
+		Thread.delay ((num f) -. diff);
+		vnull
+	)
 
 	let stderr = vfun0 (fun () ->
 		encode_instance key_sys_io_FileOutput ~kind:(IOutChannel stderr)
@@ -3200,6 +3207,9 @@ let init_constructors builtins =
 						let msg = get_exc_error_message ctx v stack p in
 						prerr_endline msg;
 						close();
+					| Sys_exit i ->
+						close();
+						exit i;
 					| exc ->
 						close();
 						raise exc

+ 35 - 48
std/haxe/EntryPoint.hx

@@ -1,36 +1,28 @@
 package haxe;
 
-#if (neko && !macro && !interp)
+#if (target.threaded)
 import sys.thread.Lock;
 import sys.thread.Mutex;
 import sys.thread.Thread;
-#elseif cpp
-import sys.thread.Lock;
-import sys.thread.Mutex;
-import sys.thread.Thread;
-#elseif java
-import java.vm.Lock;
-import java.vm.Mutex;
-import java.vm.Thread;
 #elseif sys
 private class Lock {
-	public function new() {
-	}
-	public inline function release() {
-	}
-	public inline function wait( ?t : Float ) {
-	}
+	public function new() {}
+
+	public inline function release() {}
+
+	public inline function wait(?t:Float) {}
 }
+
 private class Mutex {
-	public function new() {
-	}
-	public inline function acquire() {
-	}
-	public inline function release() {
-	}
+	public function new() {}
+
+	public inline function acquire() {}
+
+	public inline function release() {}
 }
+
 private class Thread {
-	public static function create( f : Void -> Void ) {
+	public static function create(f:Void->Void) {
 		f();
 	}
 }
@@ -41,14 +33,12 @@ private class Thread {
 	This class can be redefined by custom frameworks so they can handle their own main loop logic.
 **/
 class EntryPoint {
-
 	#if sys
 	static var sleepLock = new Lock();
 	static var mutex = new Mutex();
 	#end
 	static var pending = new Array<Void->Void>();
-
-	public static var threadCount(default,null) : Int = 0;
+	public static var threadCount(default, null):Int = 0;
 
 	/**
 		Wakeup a sleeping run()
@@ -59,7 +49,7 @@ class EntryPoint {
 		#end
 	}
 
-	public static function runInMainThread( f : Void -> Void ) {
+	public static function runInMainThread(f:Void->Void) {
 		#if sys
 		mutex.acquire();
 		pending.push(f);
@@ -70,7 +60,7 @@ class EntryPoint {
 		#end
 	}
 
-	public static function addThread( f : Void -> Void ) {
+	public static function addThread(f:Void->Void) {
 		#if sys
 		mutex.acquire();
 		threadCount++;
@@ -79,18 +69,22 @@ class EntryPoint {
 			f();
 			mutex.acquire();
 			threadCount--;
-			if( threadCount == 0 ) wakeup();
+			if (threadCount == 0)
+				wakeup();
 			mutex.release();
 		});
 		#else
 		threadCount++;
-		pending.push(function() { f(); threadCount--; } );
+		pending.push(function() {
+			f();
+			threadCount--;
+		});
 		#end
 	}
 
-	static function processEvents() : Float {
+	static function processEvents():Float {
 		// flush all pending calls
-		while( true ) {
+		while (true) {
 			#if sys
 			mutex.acquire();
 			var f = pending.shift();
@@ -98,11 +92,12 @@ class EntryPoint {
 			#else
 			var f = pending.shift();
 			#end
-			if( f == null ) break;
+			if (f == null)
+				break;
 			f();
 		}
 		var time = @:privateAccess MainLoop.tick();
-		if( !MainLoop.hasEvents() && threadCount == 0 )
+		if (!MainLoop.hasEvents() && threadCount == 0)
 			return -1;
 		return time;
 	}
@@ -112,37 +107,29 @@ class EntryPoint {
 	**/
 	@:keep public static function run() @:privateAccess {
 		#if js
-
 		var nextTick = processEvents();
 
 		#if nodejs
-		if( nextTick < 0 )
+		if (nextTick < 0)
 			return;
-		(untyped setTimeout)(run,nextTick);
+		(untyped setTimeout) (run, nextTick);
 		#else
-		var window : Dynamic = js.Browser.window;
-		var rqf : Dynamic = window.requestAnimationFrame ||
-			window.webkitRequestAnimationFrame ||
-			window.mozRequestAnimationFrame;
+		var window:Dynamic = js.Browser.window;
+		var rqf:Dynamic = window.requestAnimationFrame || window.webkitRequestAnimationFrame || window.mozRequestAnimationFrame;
 		rqf(run);
 		#end
-
 		#elseif flash
-
 		flash.Lib.current.stage.addEventListener(flash.events.Event.ENTER_FRAME, function(_) processEvents());
-
 		#elseif sys
-		while( true ) {
+		while (true) {
 			var nextTick = processEvents();
-			if( nextTick < 0 )
+			if (nextTick < 0)
 				break;
-			if( nextTick > 0 )
+			if (nextTick > 0)
 				sleepLock.wait(nextTick); // wait until nextTick or wakeup() call
 		}
 		#else
-
 		// no implementation available, let's exit immediately
-
 		#end
 	}
 }

+ 50 - 0
std/hl/_std/sys/thread/Lock.hx

@@ -0,0 +1,50 @@
+/*
+ * Copyright (C)2005-2019 Haxe Foundation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+package sys.thread;
+
+@:coreApi
+class Lock {
+	var deque:sys.thread.Deque<Bool>;
+
+	public function new():Void {
+		deque = new Deque<Null<Bool>>();
+	}
+
+	public function wait(?timeout:Float):Bool {
+		if (timeout == null) {
+			deque.pop(true);
+			return true;
+		}
+		var targetTime = haxe.Timer.stamp() + timeout;
+		do {
+			if (deque.pop(false) != null) {
+				return true;
+			}
+		} while (haxe.Timer.stamp() < targetTime);
+		return false;
+	}
+
+	public function release():Void {
+		deque.push(true);
+	}
+}

+ 16 - 57
std/java/_std/sys/thread/Deque.hx

@@ -19,74 +19,33 @@
  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
  * DEALINGS IN THE SOFTWARE.
  */
+
 package sys.thread;
+
 import java.Lib;
 
 @:coreApi
 @:native('haxe.java.vm.Deque')
-@:nativeGen class Deque<T>
-{
-	@:private var head:Node<T>;
-	@:private var tail:Node<T>;
-
-	public function new()
-	{
-		this.head = this.tail = new Node(null);
-	}
+@:nativeGen class Deque<T> {
+	var lbd:java.util.concurrent.LinkedBlockingDeque<T>;
 
-	public function add(i : T):Void
-	{
-		var n = new Node(i);
-		untyped __lock__(this,
-		{
-			tail.next = n;
-			tail = n;
-			try { untyped this.notify(); } catch(e:Dynamic) { throw e; }
-		});
+	public function new() {
+		lbd = new java.util.concurrent.LinkedBlockingDeque<T>();
 	}
 
-	public function push(i : T):Void
-	{
-		var n = new Node(i);
-		untyped __lock__(this,
-		{
-			n.next = head.next;
-			head.next = n;
-			try { untyped this.notify(); } catch(e:Dynamic) { throw e; }
-		});
+	public function add(i:T):Void {
+		lbd.add(i);
 	}
 
-	public function pop(block : Bool) : Null<T>
-	{
-		var ret = null;
-		untyped __lock__(this, {
-			var n = null;
-			do {
-				n = head.next;
-				if (n != null)
-				{
-					ret = n.value;
-					n.value = null;
-					head = n;
-				} else if (block) {
-					//block
-					try { untyped this.wait(); } catch(e:Dynamic) { throw e; }
-				}
-			} while( block && n == null );
-		});
-		return ret;
+	public function push(i:T):Void {
+		lbd.push(i);
 	}
-}
-
-@:native('haxe.java.vm.DequeNode')
-@:nativeGen
-class Node<T>
-{
-	public var value:T;
-	public var next:Node<T>;
 
-	public function new(val)
-	{
-		this.value = val;
+	public inline function pop(block:Bool):Null<T> {
+		return if (block) {
+			lbd.take();
+		} else {
+			lbd.poll();
+		}
 	}
 }

+ 3 - 3
tests/runci/targets/Cpp.hx

@@ -68,9 +68,9 @@ class Cpp {
 		runCommand("haxe", ["compile-cpp.hxml"]);
 		runCpp("bin/cpp/Main-debug", []);
 
-		// changeDirectory(threadsDir);
-		// runCommand("haxe", ["build.hxml", "-cpp", "export/cpp"]);
-		// runCpp("export/cpp/Main");
+		changeDirectory(threadsDir);
+		runCommand("haxe", ["build.hxml", "-cpp", "export/cpp"]);
+		runCpp("export/cpp/Main");
 
 		// if (Sys.systemName() == "Mac")
 		// {

+ 3 - 3
tests/runci/targets/Java.hx

@@ -23,9 +23,9 @@ class Java {
 		runCommand("haxe", ["compile-java.hxml"]);
 		runCommand("java", ["-jar", "bin/java/Main-Debug.jar"]);
 
-		// changeDirectory(threadsDir);
-		// runCommand("haxe", ["build.hxml", "-java", "export/java"]);
-		// runCommand("java", ["-jar", "export/java/Main.jar"]);
+		changeDirectory(threadsDir);
+		runCommand("haxe", ["build.hxml", "-java", "export/java"]);
+		runCommand("java", ["-jar", "export/java/Main.jar"]);
 
 		infoMsg("Testing java-lib extras");
 		changeDirectory('$unitDir/bin');

+ 1 - 7
tests/threads/src/Main.hx

@@ -1,16 +1,10 @@
-import cases.WeirdTreeSum;
 import utest.Runner;
 import utest.ui.Report;
 
 class Main {
 	static function main() {
 		var runner = new Runner();
-		runner.addCase(new cases.WeirdTreeSum());
-		#if !hl // no Lock
-		#if !java // Deque broken?
-		runner.addCase(new cases.DequeBrackets());
-		#end
-		#end
+		runner.addCases("cases");
 		var report = Report.create(runner);
 		report.displayHeader = AlwaysShowHeader;
 		report.displaySuccessResults = NeverShowSuccessResults;

+ 64 - 60
tests/threads/src/cases/DequeBrackets.hx

@@ -12,70 +12,74 @@ class DequeBrackets implements ITest {
 		one is placed in front, the closing one in the back. This is going
 		to result in something like `([{<>}])` which we check for at the end.
 	**/
-	public function test() {
-		Sys.println("Running DequeBrackets");
-		var deque = new Deque();
-		var dequeMutex = new Mutex();
-		function add(open:String, close:String) {
-			dequeMutex.acquire();
-			deque.push(open);
-			deque.add(close);
-			dequeMutex.release();
-		}
-
-		var pairs = [
-			{open: "(", close: ")"},
-			{open: "[", close: "]"},
-			{open: "{", close: "}"},
-			{open: "<", close: ">"}
-		];
-		var iterationsPerThread = 100;
-
-		var lock = new Lock();
-		var self = Thread.current();
+	@:timeout(2000)
+	public function test(async:utest.Async) {
 		Thread.create(() -> {
-			for (_ in 0...pairs.length) {
-				Assert.isTrue(lock.wait(2.));
+			Sys.println("Running DequeBrackets");
+			var deque = new Deque();
+			var dequeMutex = new Mutex();
+			function add(open:String, close:String) {
+				dequeMutex.acquire();
+				deque.push(open);
+				deque.add(close);
+				dequeMutex.release();
 			}
-			self.sendMessage("done");
-		});
-		var threads = [];
-		for (pair in pairs) {
-			threads.push(Thread.create(() -> {
-				Thread.readMessage(true);
-				for (_ in 0...iterationsPerThread) {
-					add(pair.open, pair.close);
-					Sys.sleep(0.001); // sleep a bit to increase chaos
+
+			var pairs = [
+				{open: "(", close: ")"},
+				{open: "[", close: "]"},
+				{open: "{", close: "}"},
+				{open: "<", close: ">"}
+			];
+			var iterationsPerThread = 100;
+
+			var lock = new Lock();
+			var self = Thread.current();
+			Thread.create(() -> {
+				for (_ in 0...pairs.length) {
+					Assert.isTrue(lock.wait());
 				}
-				lock.release();
-			}));
-		}
-		for (thread in threads) {
-			thread.sendMessage("go");
-		}
-		switch (Thread.readMessage(true)) {
-			case "done":
-			case s:
-				Assert.fail("Unexpected message: " + s);
-		}
-		var stack = new GenericStack<String>();
-		function pop() {
-			return deque.pop(false);
-		}
-		for (_ in 0...pairs.length * iterationsPerThread) {
-			stack.add(pop());
-		}
-		for (elt in stack) {
-			var expected = switch (elt) {
-				case "(": ")";
-				case "<": ">";
-				case "{": "}";
-				case "[": "]";
+				self.sendMessage("done");
+			});
+			var threads = [];
+			for (pair in pairs) {
+				threads.push(Thread.create(() -> {
+					Thread.readMessage(true);
+					for (_ in 0...iterationsPerThread) {
+						add(pair.open, pair.close);
+						Sys.sleep(0.0001); // sleep a bit to increase chaos
+					}
+					lock.release();
+				}));
+			}
+			for (thread in threads) {
+				thread.sendMessage("go");
+			}
+			switch (Thread.readMessage(true)) {
+				case "done":
 				case s:
-					Assert.fail("Unexpected " + s);
-					s;
+					Assert.fail("Unexpected message: " + s);
+			}
+			var stack = new GenericStack<String>();
+			function pop() {
+				return deque.pop(false);
+			}
+			for (_ in 0...pairs.length * iterationsPerThread) {
+				stack.add(pop());
 			}
-			Assert.equals(expected, pop());
-		}
+			for (elt in stack) {
+				var expected = switch (elt) {
+					case "(": ")";
+					case "<": ">";
+					case "{": "}";
+					case "[": "]";
+					case s:
+						Assert.fail("Unexpected " + s);
+						s;
+				}
+				Assert.equals(expected, pop());
+			}
+			async.done();
+		});
 	}
 }

+ 51 - 0
tests/threads/src/cases/Issue3767.hx

@@ -0,0 +1,51 @@
+package cases;
+
+import utest.Assert;
+import utest.ITest;
+
+class Issue3767 implements ITest {
+	public function new() { }
+
+	#if java
+
+	@:timeout(5000)
+	function testBasicLock(async:utest.Async) {
+		Thread.create(() -> {
+			var lock = new Lock();
+			//it starts locked
+			Assert.isFalse(lock.wait(0.001));
+			lock.release();
+			Assert.isTrue(lock.wait(.001));
+			Assert.isFalse(lock.wait(.001));
+			Assert.isFalse(lock.wait(.001));
+
+			lock.release();
+			Assert.isTrue(lock.wait());
+			lock.release();
+			lock.release();
+			lock.release();
+			Assert.isTrue(lock.wait());
+			Assert.isTrue(lock.wait(.001));
+			Assert.isTrue(lock.wait());
+			Assert.isFalse(lock.wait(.001));
+
+			var cur = Sys.time();
+			Thread.create(function()
+			{
+				Sys.sleep(.01);
+				lock.release();
+			});
+			Assert.isTrue(lock.wait(2.0));
+			Assert.isTrue( (Sys.time() - cur) < 2 );
+			Thread.create(function()
+			{
+				Sys.sleep(.01);
+				lock.release();
+			});
+			Assert.isTrue(lock.wait());
+			async.done();
+		});
+	}
+
+	#end
+}

+ 32 - 0
tests/threads/src/cases/Issue4878.hx

@@ -0,0 +1,32 @@
+package cases;
+
+import utest.Async;
+import utest.Assert;
+import utest.ITest;
+
+class Issue4878 implements ITest {
+	public function new() { }
+
+	#if java
+
+	@:timeout(5000)
+	function test(async:Async) {
+		Thread.create(() -> {
+			var mutex = new Mutex();
+			Thread.create(function() {
+				mutex.acquire();
+				mutex.acquire();
+				mutex.release();
+				Sys.sleep(.2);
+				mutex.release();
+			});
+			Sys.sleep(0.05);
+			Assert.isFalse(mutex.tryAcquire());
+			Sys.sleep(.3);
+			Assert.isTrue(mutex.tryAcquire());
+			async.done();
+		});
+	}
+
+	#end
+}

+ 16 - 9
tests/unit/src/unit/TestThreads.hx → tests/threads/src/cases/TestThreads.hx

@@ -1,16 +1,23 @@
-package unit;
+package cases;
+
+import utest.Async;
 import utest.Assert;
-import sys.thread.Thread;
-import sys.thread.Deque;
-import sys.thread.Lock;
-import sys.thread.Tls;
-import sys.thread.Mutex;
 
-class TestThreads extends Test
+class TestThreads implements utest.ITest
 {
+	public function new() { }
+
+	@:timeout(40000)
+	function testSort(async:Async) {
+		Thread.create(() -> {
+			doTestSort();
+			async.done();
+		});
+	}
 
-	private function testSort()
+	private function doTestSort()
 	{
+		Sys.println("Running TestThreads");
 		var ts = new ThreadSort();
 #if java
 		ts.maxVal *= 10;
@@ -33,7 +40,7 @@ class TestThreads extends Test
 									try
 									{
 										ts.run();
-										t(true);
+										Assert.pass("ok");
 									}
 									catch(e:Dynamic)
 									{

+ 14 - 10
tests/threads/src/cases/WeirdTreeSum.hx

@@ -57,16 +57,20 @@ typedef TreeNode<T> = {
 class WeirdTreeSum implements utest.ITest {
 	public function new() {}
 
-	public function test() {
-		Sys.println("Running WeirdTreeSum");
-		var fileContent = File.getContent("res/tree1.txt");
-		var buf = new StringBuf();
-		buf.add("(1)\n");
-		for (i in 0...10) {
-			buf.add(fileContent + "\n");
-		}
-		var tree = parseTree(buf.toString().trim())[0];
-		compare(tree);
+	@:timeout(2000)
+	public function test(async:utest.Async) {
+		Thread.create(() -> {
+			Sys.println("Running WeirdTreeSum");
+			var fileContent = File.getContent("res/tree1.txt");
+			var buf = new StringBuf();
+			buf.add("(1)\n");
+			for (i in 0...10) {
+				buf.add(fileContent + "\n");
+			}
+			var tree = parseTree(buf.toString().trim())[0];
+			compare(tree);
+			async.done();
+		});
 	}
 
 	static function compare(tree:Tree<Int>) {

+ 0 - 6
tests/unit/src/unit/TestMain.hx

@@ -108,12 +108,6 @@ class TestMain {
 			//new TestRemoting(),
 		];
 
-		#if ( (java || neko || cpp || eval) && !macro)
-		if (Sys.getEnv("TRAVIS") != null) {
-			classes.push(new TestThreads());
-		}
-		#end
-
 		TestIssues.addIssueClasses("src/unit/issues", "unit.issues");
 		TestIssues.addIssueClasses("src/unit/hxcpp_issues", "unit.hxcpp_issues");
 

+ 0 - 46
tests/unit/src/unit/issues/Issue3767.hx

@@ -1,46 +0,0 @@
-package unit.issues;
-import unit.Test;
-#if (java || neko || cpp)
-import sys.thread.*;
-#end
-
-class Issue3767 extends Test
-{
-#if (java || (neko && !interp && !macro) || (cpp && !emscripten))
-	function testBasicLock()
-	{
-		var lock = new Lock();
-		//it starts locked
-		f(lock.wait(0.001));
-		lock.release();
-		t(lock.wait(.001));
-		f(lock.wait(.001));
-		f(lock.wait(.001));
-
-		lock.release();
-		t(lock.wait());
-		lock.release();
-		lock.release();
-		lock.release();
-		t(lock.wait());
-		t(lock.wait(.001));
-		t(lock.wait());
-		f(lock.wait(.001));
-
-		var cur = Sys.time();
-		Thread.create(function()
-		{
-			Sys.sleep(.01);
-			lock.release();
-		});
-		t(lock.wait(2.0));
-		t( (Sys.time() - cur) < 2 );
-		Thread.create(function()
-		{
-			Sys.sleep(.01);
-			lock.release();
-		});
-		t(lock.wait());
-	}
-#end
-}

+ 0 - 24
tests/unit/src/unit/issues/Issue4878.hx

@@ -1,24 +0,0 @@
-package unit.issues;
-#if java
-import sys.thread.Thread;
-import sys.thread.Mutex;
-#end
-
-class Issue4878 extends Test {
-	#if java
-  function test() {
-    var mutex = new Mutex();
-    var thread = Thread.create(function() {
-      mutex.acquire();
-      mutex.acquire();
-      mutex.release();
-      Sys.sleep(.2);
-      mutex.release();
-    });
-    Sys.sleep(0.05);
-    f(mutex.tryAcquire());
-    Sys.sleep(.3);
-    t(mutex.tryAcquire());
-  }
-  #end
-}