Bläddra i källkod

Thread pool (#9952)

* sys.thread.ThreadPool

* [jvm] fix test

* IThreadPool

* better tests

* FixedThreadPool

* ElasticThreadPool

* [java] workaround Deque not allowing null items

* [java][jvm] sys.thread.Deque docs about null items

* fix tests
Aleksandr Kuzmenko 4 år sedan
förälder
incheckning
e2ff34c741

+ 4 - 0
std/sys/thread/Deque.hx

@@ -38,11 +38,15 @@ package sys.thread;
 
 	/**
 		Adds an element at the end of `this` Deque.
+
+		(Java,Jvm): throws `java.lang.NullPointerException` if `i` is `null`.
 	**/
 	function add(i:T):Void;
 
 	/**
 		Adds an element at the front of `this` Deque.
+
+		(Java,Jvm): throws `java.lang.NullPointerException` if `i` is `null`.
 	**/
 	function push(i:T):Void;
 

+ 198 - 0
std/sys/thread/ElasticThreadPool.hx

@@ -0,0 +1,198 @@
+/*
+ * Copyright (C)2005-2019 Haxe Foundation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+package sys.thread;
+
+#if (!target.threaded)
+#error "This class is not available on this target"
+#end
+
+import haxe.Exception;
+
+/**
+	Thread pool with a varying amount of threads.
+
+	A new thread is spawned every time a task is submitted while all existing
+	threads are busy.
+**/
+@:coreApi
+class ElasticThreadPool implements IThreadPool {
+	/* Amount of alive threads in this pool. */
+	public var threadsCount(get,null):Int;
+	/* Maximum amount of threads in this pool. */
+	public var maxThreadsCount:Int;
+	/** Indicates if `shutdown` method of this pool has been called. */
+	public var isShutdown(get,never):Bool;
+	var _isShutdown = false;
+	function get_isShutdown():Bool return _isShutdown;
+
+	final pool:Array<Worker> = [];
+	final queue = new Deque<()->Void>();
+	final mutex = new Mutex();
+	final threadTimeout:Float;
+
+	/**
+		Create a new thread pool with `threadsCount` threads.
+
+		If a worker thread does not receive a task for `threadTimeout` seconds it
+		is terminated.
+	**/
+	public function new(maxThreadsCount:Int, threadTimeout:Float = 60):Void {
+		if(maxThreadsCount < 1)
+			throw new ThreadPoolException('ElasticThreadPool needs maxThreadsCount to be at least 1.');
+		this.maxThreadsCount = maxThreadsCount;
+		this.threadTimeout = threadTimeout;
+	}
+
+	/**
+		Submit a task to run in a thread.
+
+		Throws an exception if the pool is shut down.
+	**/
+	public function run(task:()->Void):Void {
+		if(_isShutdown)
+			throw new ThreadPoolException('Task is rejected. Thread pool is shut down.');
+		if(task == null)
+			throw new ThreadPoolException('Task to run must not be null.');
+
+		mutex.acquire();
+		var submitted = false;
+		var deadWorker = null;
+		for(worker in pool) {
+			if(deadWorker == null && worker.dead) {
+				deadWorker = worker;
+			}
+			if(worker.task == null) {
+				submitted = true;
+				worker.wakeup(task);
+				break;
+			}
+		}
+		if(!submitted) {
+			if(deadWorker != null) {
+				deadWorker.wakeup(task);
+			} else if(pool.length < maxThreadsCount) {
+				var worker = new Worker(queue, threadTimeout);
+				pool.push(worker);
+				worker.wakeup(task);
+			} else {
+				queue.add(task);
+			}
+		}
+		mutex.release();
+	}
+
+	/**
+		Initiates a shutdown.
+		All previousely submitted tasks will be executed, but no new tasks will
+		be accepted.
+
+		Multiple calls to this method have no effect.
+	**/
+	public function shutdown():Void {
+		if(_isShutdown) return;
+		mutex.acquire();
+		_isShutdown = true;
+		for(worker in pool) {
+			worker.shutdown();
+		}
+		mutex.release();
+	}
+
+	function get_threadsCount():Int {
+		var result = 0;
+		for(worker in pool)
+			if(!worker.dead)
+				++result;
+		return result;
+	}
+}
+
+private class Worker {
+	public var task(default,null):Null<()->Void>;
+	public var dead(default,null) = false;
+
+	final deathMutex = new Mutex();
+	final waiter = new Lock();
+	final queue:Deque<()->Void>;
+	final timeout:Float;
+	var thread:Thread;
+	var isShutdown = false;
+
+	public function new(queue:Deque<()->Void>, timeout:Float) {
+		this.queue = queue;
+		this.timeout = timeout;
+		start();
+	}
+
+	public function wakeup(task:()->Void) {
+		deathMutex.acquire();
+		if(dead)
+			start();
+		this.task = task;
+		waiter.release();
+		deathMutex.release();
+	}
+
+	public function shutdown() {
+		isShutdown = true;
+		waiter.release();
+	}
+
+	function start() {
+		dead = false;
+		thread = Thread.create(loop);
+	}
+
+	function loop() {
+		try {
+			while(waiter.wait(timeout)) {
+				switch task {
+					case null:
+						if(isShutdown)
+							break;
+					case fn:
+						fn();
+						//if more tasks were added while all threads were busy
+						while(true) {
+							switch queue.pop(false) {
+								case null: break;
+								case fn: fn();
+							}
+						}
+						task = null;
+				}
+			}
+			deathMutex.acquire();
+			//in case a task was submitted right after the lock timed out
+			if(task != null)
+				start()
+			else
+				dead = true;
+			deathMutex.release();
+		} catch(e) {
+			task = null;
+			start();
+			throw e;
+		}
+	}
+}

+ 116 - 0
std/sys/thread/FixedThreadPool.hx

@@ -0,0 +1,116 @@
+/*
+ * Copyright (C)2005-2019 Haxe Foundation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+package sys.thread;
+
+#if (!target.threaded)
+#error "This class is not available on this target"
+#end
+
+import haxe.Exception;
+
+/**
+	Thread pool with a constant amount of threads.
+	Threads in the pool will exist until the pool is explicitly shut down.
+**/
+@:coreApi
+class FixedThreadPool implements IThreadPool {
+	/* Amount of threads in this pool. */
+	public var threadsCount(get,null):Int;
+	function get_threadsCount():Int return threadsCount;
+
+	/** Indicates if `shutdown` method of this pool has been called. */
+	public var isShutdown(get,never):Bool;
+	var _isShutdown = false;
+	function get_isShutdown():Bool return _isShutdown;
+
+	final pool:Array<Worker>;
+	final poolMutex = new Mutex();
+	final queue = new Deque<()->Void>();
+
+	/**
+		Create a new thread pool with `threadsCount` threads.
+	**/
+	public function new(threadsCount:Int):Void {
+		if(threadsCount < 1)
+			throw new ThreadPoolException('FixedThreadPool needs threadsCount to be at least 1.');
+		this.threadsCount = threadsCount;
+		pool = [for(i in 0...threadsCount) new Worker(queue)];
+	}
+
+	/**
+		Submit a task to run in a thread.
+
+		Throws an exception if the pool is shut down.
+	**/
+	public function run(task:()->Void):Void {
+		if(_isShutdown)
+			throw new ThreadPoolException('Task is rejected. Thread pool is shut down.');
+		if(task == null)
+			throw new ThreadPoolException('Task to run must not be null.');
+		queue.add(task);
+	}
+
+	/**
+		Initiates a shutdown.
+		All previousely submitted tasks will be executed, but no new tasks will
+		be accepted.
+
+		Multiple calls to this method have no effect.
+	**/
+	public function shutdown():Void {
+		if(_isShutdown) return;
+		_isShutdown = true;
+		for(_ in pool) {
+			queue.add(shutdownTask);
+		}
+	}
+
+	static function shutdownTask():Void {
+		throw new ShutdownException('');
+	}
+}
+
+private class ShutdownException extends Exception {}
+
+private class Worker {
+	var thread:Thread;
+	final queue:Deque<Null<()->Void>>;
+
+	public function new(queue:Deque<Null<()->Void>>) {
+		this.queue = queue;
+		thread = Thread.create(loop);
+	}
+
+	function loop() {
+		try {
+			while(true) {
+				var task = queue.pop(true);
+				task();
+			}
+		} catch(_:ShutdownException) {
+		} catch(e) {
+			thread = Thread.create(loop);
+			throw e;
+		}
+	}
+}

+ 51 - 0
std/sys/thread/IThreadPool.hx

@@ -0,0 +1,51 @@
+/*
+ * Copyright (C)2005-2019 Haxe Foundation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+package sys.thread;
+
+/**
+	A thread pool interface.
+**/
+interface IThreadPool {
+
+	/** Amount of alive threads in this pool. */
+	var threadsCount(get,never):Int;
+
+	/** Indicates if `shutdown` method of this pool has been called. */
+	var isShutdown(get,never):Bool;
+
+	/**
+		Submit a task to run in a thread.
+
+		Throws an exception if the pool is shut down.
+	**/
+	function run(task:()->Void):Void;
+
+	/**
+		Initiates a shutdown.
+		All previousely submitted tasks will be executed, but no new tasks will
+		be accepted.
+
+		Multiple calls to this method have no effect.
+	**/
+	function shutdown():Void;
+}

+ 6 - 0
std/sys/thread/ThreadPoolException.hx

@@ -0,0 +1,6 @@
+package sys.thread;
+
+import haxe.Exception;
+
+class ThreadPoolException extends Exception {
+}

+ 3 - 5
tests/runci/targets/Cpp.hx

@@ -74,11 +74,9 @@ class Cpp {
 			runCpp("bin/cpp/Main-debug", []);
 		}
 
-		// if (systemName != "Windows") { // TODO: find out why we keep getting "missed async calls" error
-		// 	changeDirectory(threadsDir);
-		// 	runCommand("haxe", ["build.hxml", "-cpp", "export/cpp"]);
-		// 	runCpp("export/cpp/Main");
-		// }
+		changeDirectory(threadsDir);
+		runCommand("haxe", ["build.hxml", "-cpp", "export/cpp"]);
+		runCpp("export/cpp/Main");
 
 		// if (Sys.systemName() == "Mac")
 		// {

+ 3 - 3
tests/runci/targets/Cs.hx

@@ -62,9 +62,9 @@ class Cs {
 		runCommand("haxe", ["compile-cs.hxml",'-D','fast_cast'].concat(args));
 		runCs("bin/cs/bin/Main-Debug.exe", []);
 
-		// changeDirectory(threadsDir);
-		// runCommand("haxe", ["build.hxml", "-cs", "export/cs"]);
-		// runCs("export/cs/bin/Main.exe");
+		changeDirectory(threadsDir);
+		runCommand("haxe", ["build.hxml", "-cs", "export/cs"]);
+		runCs("export/cs/bin/Main.exe");
 
 		changeDirectory(eventLoopDir);
 		runCommand("haxe", ["build.hxml", "--cs", "bin/cs"]);

+ 3 - 3
tests/runci/targets/Hl.hx

@@ -72,9 +72,9 @@ class Hl {
         runCommand("haxe", ["compile-hl.hxml"].concat(args));
         runCommand(hlBinary, ["bin/unit.hl"]);
 
-		// changeDirectory(threadsDir);
-		// runCommand("haxe", ["build.hxml", "-hl", "export/threads.hl"]);
-		// runCommand("hl", ["export/threads.hl"]);
+		changeDirectory(threadsDir);
+		runCommand("haxe", ["build.hxml", "-hl", "export/threads.hl"]);
+		runCommand("hl", ["export/threads.hl"]);
 
         changeDirectory(sysDir);
         runCommand("haxe", ["compile-hl.hxml"].concat(args));

+ 3 - 5
tests/runci/targets/Java.hx

@@ -35,11 +35,9 @@ class Java {
 		runCommand("haxe", ["build.hxml", "-java", "bin/java"].concat(args));
 		runCommand("java", ["-jar", "bin/java/Main.jar"]);
 
-		// changeDirectory(threadsDir);
-		// runCommand("haxe", ["build.hxml", "-java", "export/java"].concat(args));
-		// if (systemName != "Windows") { // #8154
-		// 	runCommand("java", ["-jar", "export/java/Main.jar"]);
-		// }
+		changeDirectory(threadsDir);
+		runCommand("haxe", ["build.hxml", "-java", "export/java"].concat(args));
+		runCommand("java", ["-jar", "export/java/Main.jar"]);
 
 		infoMsg("Testing java-lib extras");
 		changeDirectory('$unitDir/bin');

+ 3 - 5
tests/runci/targets/Jvm.hx

@@ -22,10 +22,8 @@ class Jvm {
 		runCommand("haxe", ["build.hxml", "--jvm", "bin/test.jar"].concat(args));
 		runCommand("java", ["-jar", "bin/test.jar"]);
 
-		// changeDirectory(threadsDir);
-		// runCommand("haxe", ["build.hxml", "--jvm", "export/threads.jar"].concat(args));
-		// if (systemName != "Windows") { // #8154
-		// 	runCommand("java", ["-jar", "export/threads.jar"]);
-		// }
+		changeDirectory(threadsDir);
+		runCommand("haxe", ["build.hxml", "--jvm", "export/threads.jar"].concat(args));
+		runCommand("java", ["-jar", "export/threads.jar"]);
 	}
 }

+ 2 - 2
tests/runci/targets/Macro.hx

@@ -40,7 +40,7 @@ class Macro {
 		changeDirectory(eventLoopDir);
 		runCommand("haxe", ["build.hxml"].concat(args).concat(["--interp"]));
 
-		// changeDirectory(threadsDir);
-		// runCommand("haxe", ["build.hxml", "--interp"]);
+		changeDirectory(threadsDir);
+		runCommand("haxe", ["build.hxml", "--interp"]);
 	}
 }

+ 3 - 3
tests/runci/targets/Neko.hx

@@ -17,8 +17,8 @@ class Neko {
 		runCommand("haxe", ["build.hxml", "--neko", "bin/test.n"]);
 		runCommand("neko", ["bin/test.n"]);
 
-		// changeDirectory(threadsDir);
-		// runCommand("haxe", ["build.hxml", "-neko", "export/threads.n"]);
-		// runCommand("neko", ["export/threads.n"]);
+		changeDirectory(threadsDir);
+		runCommand("haxe", ["build.hxml", "-neko", "export/threads.n"]);
+		runCommand("neko", ["export/threads.n"]);
 	}
 }

+ 4 - 0
tests/runci/targets/Python.hx

@@ -85,5 +85,9 @@ class Python {
 		for (py in pys) {
 			runCommand(py, ["bin/test.py"]);
 		}
+
+		changeDirectory(threadsDir);
+		runCommand("haxe", ["build.hxml", "--python", "export/threads.py"].concat(args));
+		runCommand("python3", ["export/threads.py"]);
 	}
 }

+ 3 - 0
tests/threads/.gitignore

@@ -0,0 +1,3 @@
+bin/*
+export/*
+dump/*

+ 5 - 0
tests/threads/build.hxml

@@ -0,0 +1,5 @@
+--class-path src
+--main Main
+--library utest
+--dce full
+-D analyzer-optimize

+ 11 - 0
tests/threads/src/Main.hx

@@ -0,0 +1,11 @@
+import utest.ui.Report;
+import utest.Runner;
+
+function main() {
+	var runner = new Runner();
+	var report = Report.create(runner);
+	report.displayHeader = AlwaysShowHeader;
+	report.displaySuccessResults = NeverShowSuccessResults;
+	runner.addCases('cases');
+	runner.run();
+}

+ 32 - 0
tests/threads/src/cases/TestElasticThreadPool.hx

@@ -0,0 +1,32 @@
+package cases;
+
+import sys.thread.IThreadPool;
+import sys.thread.ElasticThreadPool;
+
+class TestElasticThreadPool extends misc.TestThreadPoolBase {
+	override function createThreadPool(count:Int):IThreadPool {
+		return new ElasticThreadPool(count);
+	}
+
+	function testThreadTimeout() {
+		var timeout = 0.1;
+		var pool = new ElasticThreadPool(3, timeout);
+
+		for(_ in 0...3)
+			pool.run(() -> Sys.sleep(0.2));
+
+		//by the end of this sleep all threads should be terminated as timed out
+		Sys.sleep(1);
+
+		equals(0, pool.threadsCount);
+
+		//check we still can run tasks after all threads were terminated
+		var lock = new Lock();
+		for(_ in 0...3)
+			pool.run(() -> {
+				Sys.sleep(0.2);
+				lock.release();
+			});
+		assertReleased(lock, 3);
+	}
+}

+ 10 - 0
tests/threads/src/cases/TestFixedThreadPool.hx

@@ -0,0 +1,10 @@
+package cases;
+
+import sys.thread.IThreadPool;
+import sys.thread.FixedThreadPool;
+
+class TestFixedThreadPool extends misc.TestThreadPoolBase {
+	override function createThreadPool(count:Int):IThreadPool {
+		return new FixedThreadPool(count);
+	}
+}

+ 4 - 0
tests/threads/src/import.hx

@@ -0,0 +1,4 @@
+import utest.Assert.*;
+import utest.Async;
+import sys.thread.Thread;
+import sys.thread.Lock;

+ 104 - 0
tests/threads/src/misc/TestThreadPoolBase.hx

@@ -0,0 +1,104 @@
+package misc;
+
+import sys.thread.IThreadPool;
+import sys.thread.ThreadPoolException;
+import sys.thread.Deque;
+import sys.thread.Mutex;
+import haxe.Timer;
+
+abstract class TestThreadPoolBase extends utest.Test {
+
+	abstract function createThreadPool(count:Int):IThreadPool;
+
+	function assertReleased(lock:Lock, releaseCount:Int, ?timeout:Float = 2, ?pos:haxe.PosInfos) {
+		var releases = 0;
+		var timeouts = 0;
+		var timeoutTime = Timer.stamp() + timeout;
+		for(_ in 0...releaseCount) {
+			var timeout = Math.max(timeoutTime - Timer.stamp(), 0.1);
+			if(lock.wait(timeout))
+				++releases
+			else
+				++timeouts;
+		}
+		isTrue(timeouts == 0 && releases == releaseCount, 'Lock was released $releases times out of $releaseCount with $timeouts timeouts', pos);
+	}
+
+	function testCreateRunShutdown() {
+		var threadsCount = 5;
+		var tasksCount = threadsCount * 2;
+		var taskThreads = [];
+		var taskIds = [];
+		var lock = new Lock();
+		var mutex = new Mutex();
+
+		var pool = createThreadPool(threadsCount);
+		for(id in 0...tasksCount) {
+			pool.run(() -> {
+				Sys.sleep(0.2); // keep the thread busy until all the tasks are submitted
+				mutex.acquire();
+				taskThreads.push(Thread.current());
+				taskIds.push(id);
+				mutex.release();
+				lock.release();
+			});
+		}
+
+		//wait for all tasks to finish
+		assertReleased(lock, tasksCount);
+
+		//check we had `tasksCount` unique tasks
+		taskIds.sort(Reflect.compare);
+		var expected = [for(id in 0...tasksCount) id];
+		same(expected, taskIds, 'Expected $expected, but they are $taskIds');
+
+		//check each thread run two tasks
+		for(thread in taskThreads) {
+			var count = 0;
+			for(t in taskThreads)
+				if(t == thread)
+					++count;
+			if(count != 2)
+				fail('Some thread executed $count tasks instead of 2');
+		}
+		equals(threadsCount, pool.threadsCount);
+
+		pool.shutdown();
+		isTrue(pool.isShutdown);
+	}
+
+	function testShutdown_finishesSubmittedTasks() {
+		var tasksCount = 5;
+		var pool = createThreadPool(3);
+		var lock = new Lock();
+		for(_ in 0...tasksCount) {
+			pool.run(() -> {
+				Sys.sleep(0.2);
+				lock.release();
+			});
+		}
+		pool.shutdown();
+
+		assertReleased(lock, tasksCount);
+	}
+
+	function testShutdownRun_exception() {
+		var pool = createThreadPool(1);
+		pool.shutdown();
+		try {
+			pool.run(() -> {});
+			fail();
+		} catch(e:ThreadPoolException) {
+			pass();
+		}
+	}
+
+	function testMultipleShutdown() {
+		var pool = createThreadPool(1);
+		pool.run(() -> Sys.sleep(0.2));
+		pool.shutdown();
+		isTrue(pool.isShutdown);
+		pool.shutdown();
+		pass();
+	}
+}