Przeglądaj źródła

Threading API for Python (#9754)

* Define target.threaded for Python

* Isolate Issue4878 test for Python

* Stub class for sys.thread.Mutex

* Implement Thread.create() for python

* Use python's Thread.start(), not .run()

* Implement Mutex with Python's RLock

* Fix Mutex.tryAcquire() for Python

* Implement thread.Deque for Python

* Isolate DequeBrackets test

* Fix Deque.pop() for python

* Implement rest of Thread for python

* Implement Lock for Python

* Simplify python Lock using Semaphore

Co-authored-by: Jonas Malaco <[email protected]>

* Make python Mutex an abstract of RLock

Co-authored-by: Jonas Malaco <[email protected]>

* Only catch IndexError in Deque.pop()

* Make python Deque.pop() atomic

* Implement Tls for python

* Use Mutex, not Lock, in python Deque.pop

* Use python Condition instead of hop loop in Deque

Co-authored-by: Jonas Malaco <[email protected]>

* Run all thread tests for Python

* Clean threads after they finish

* Fix last commit

* Requested changes to Thread.hx

Co-authored-by: Jonas Malaco <[email protected]>
Co-authored-by: Jonas Malaco <[email protected]>
Nat Quayle Nelson 5 lat temu
rodzic
commit
7108322cee

+ 1 - 0
src/context/common.ml

@@ -604,6 +604,7 @@ let get_config com =
 			pf_static = false;
 			pf_capture_policy = CPLoopVars;
 			pf_uses_utf16 = false;
+			pf_supports_threads = true;
 			pf_exceptions = { default_config.pf_exceptions with
 				ec_native_throws = [
 					["python";"Exceptions"],"BaseException";

+ 83 - 0
std/python/_std/sys/thread/Deque.hx

@@ -0,0 +1,83 @@
+/*
+ * Copyright (C)2005-2019 Haxe Foundation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+package sys.thread;
+
+using python.internal.UBuiltins;
+
+class Deque<T> {
+	var deque:NativeDeque<T>;
+	var lock:NativeCondition;
+
+	public function new() {
+		deque = new NativeDeque<T>();
+		lock = new NativeCondition();
+	}
+
+	public function add(i:T) {
+		lock.acquire();
+		deque.append(i);
+		lock.notify();
+		lock.release();
+	}
+
+	public function push(i:T) {
+		lock.acquire();
+		deque.appendleft(i);
+		lock.notify();
+		lock.release();
+	}
+
+	public function pop(block:Bool):Null<T> {
+		var ret = null;
+		lock.acquire();
+		if (block) {
+			lock.wait_for(() -> deque.bool());
+			ret = deque.popleft();
+		} else if (deque.bool()) {
+			ret = deque.popleft();
+		}
+		lock.release();
+		return ret;
+	}
+}
+
+@:pythonImport("collections", "deque")
+@:native("deque")
+extern class NativeDeque<T> {
+	function new();
+	function append(x:T):Void;
+	function appendleft(x:T):Void;
+	function popleft():T;
+}
+
+@:pythonImport("threading", "Condition")
+@:native("Condition")
+private extern class NativeCondition {
+	function new(?lock:Dynamic);
+	function acquire(blocking:Bool = true, timeout:Float = -1):Bool;
+	function release():Void;
+	function wait(?timeout:Float):Bool;
+	function wait_for(predicate:()->Bool, ?timeout:Float):Bool;
+	function notify(n:Int = 1):Void;
+	function notify_all():Void;
+}

+ 42 - 0
std/python/_std/sys/thread/Lock.hx

@@ -0,0 +1,42 @@
+/*
+ * Copyright (C)2005-2019 Haxe Foundation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+package sys.thread;
+
+@:forward(release)
+abstract Lock(NativeSemaphore) {
+	public inline function new() {
+		this = new NativeSemaphore(0);
+	}
+
+	public inline function wait(?timeout:Float):Bool {
+		return this.acquire(true, timeout);
+	}
+}
+
+@:pythonImport("threading", "Semaphore")
+@:native("Lock")
+private extern class NativeSemaphore {
+	function new(value:Int);
+	function acquire(blocking:Bool = true, ?timeout:Float):Bool;
+	function release():Void;
+}

+ 45 - 0
std/python/_std/sys/thread/Mutex.hx

@@ -0,0 +1,45 @@
+/*
+ * Copyright (C)2005-2019 Haxe Foundation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+package sys.thread;
+
+@:forward("release")
+abstract Mutex(NativeRLock) {
+	inline public function new() {
+		this = new NativeRLock();
+	}
+
+	inline public function acquire():Void {
+		this.acquire(true);
+	}
+
+	inline public function tryAcquire():Bool {
+		return this.acquire(false);
+	}
+}
+
+@:pythonImport("threading", "RLock")
+extern class NativeRLock {
+	function new():Void;
+	function acquire(blocking:Bool):Bool;
+	function release():Void;
+}

+ 94 - 0
std/python/_std/sys/thread/Thread.hx

@@ -0,0 +1,94 @@
+/*
+ * Copyright (C)2005-2019 Haxe Foundation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+package sys.thread;
+
+class Thread {
+	var nativeThread: NativeThread;
+	var messages: Deque<Dynamic>;
+
+	static var threads = new haxe.ds.ObjectMap<NativeThread, Thread>();
+	static var threadsMutex: Mutex = new Mutex();
+	static var mainThread: Thread;
+
+	private function new(t:NativeThread) {
+		nativeThread = t;
+		messages = new Deque<Dynamic>();
+	}
+
+	public function sendMessage(msg:Dynamic):Void {
+		messages.add(msg);
+	}
+
+	public static function current():Thread {
+		threadsMutex.acquire();
+		var ct = PyThreadingAPI.current_thread();
+		if (ct == PyThreadingAPI.main_thread()) {
+			if (mainThread == null) mainThread = new Thread(ct);
+			threadsMutex.release();
+			return mainThread;
+		}
+		// If the current thread was not created via the haxe API, it can still be wrapped
+		if (!threads.exists(ct)) {
+			threads.set(ct, new Thread(ct));
+		}
+		var t = threads.get(ct);
+		threadsMutex.release();
+		return t;
+	}
+
+	public static function create(callb:Void->Void):Thread {
+		var nt:NativeThread = null;
+		// Wrap the callback so it will clear the thread reference once the thread is finished
+		var wrappedCallB = () -> { 
+			callb();
+			threadsMutex.acquire();
+			threads.remove(nt);
+			threadsMutex.release();
+		}
+		nt = new NativeThread(null, wrappedCallB);
+		var t = new Thread(nt);
+		threadsMutex.acquire();
+		threads.set(nt, t);
+		threadsMutex.release();
+		nt.start();
+		return t;
+	}
+
+	public static function readMessage(block:Bool):Dynamic {
+		return current().messages.pop(block);
+	}
+}
+
+@:pythonImport("threading", "Thread")
+@:native("Thread")
+private extern class NativeThread {
+	function new(group:Dynamic, target:Void->Void);
+	function start():Void;
+}
+
+@:pythonImport("threading")
+@:native("threading")
+private extern class PyThreadingAPI {
+	static function current_thread():NativeThread;
+	static function main_thread():NativeThread;
+}

+ 30 - 0
std/python/_std/sys/thread/Tls.hx

@@ -0,0 +1,30 @@
+/*
+ * Copyright (C)2005-2019 Haxe Foundation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+package sys.thread;
+
+@:pythonImport("threading", "local")
+@:native("local")
+extern class Tls<T> {
+	function new():Void;
+	var value(default, default):T;
+}

+ 1 - 1
tests/threads/src/cases/Issue3767.hx

@@ -6,7 +6,7 @@ import utest.ITest;
 class Issue3767 implements ITest {
 	public function new() { }
 
-	#if java
+	#if (java || python)
 
 	@:timeout(5000)
 	function testBasicLock(async:utest.Async) {

+ 1 - 1
tests/threads/src/cases/Issue4878.hx

@@ -7,7 +7,7 @@ import utest.ITest;
 class Issue4878 implements ITest {
 	public function new() { }
 
-	#if java
+	#if (java || python)
 
 	@:timeout(5000)
 	function test(async:Async) {