ソースを参照

Semaphores and condition variables. (#10503)

* Semaphores and condition variables.

* Update Ocaml version

* Fix tests for neko and conditons for C#.

* Fix MacOS CI.

* Disable semaphores and condition variables for eval.
Also revert changes to Ocaml version.

* Implement semaphore using luv.

* Cleanup luv objects.
Remove timing dependent test case that fails sometimes.

* Make C# use System.Threading.Semaphore.
Remove commented out code from the .ml files.

* Remove more unused code.

* Add Semaphore and Condition to cppia HostClasses.
Zeta 3 年 前
コミット
bc6fb5c530

+ 33 - 0
std/cpp/_std/sys/thread/Condition.hx

@@ -0,0 +1,33 @@
+package sys.thread;
+
+@:coreApi
+class Condition {
+	var c:Dynamic;
+	public function new():Void {
+		c = untyped __global__.__hxcpp_condition_create();
+	}
+
+	public function acquire():Void {
+		untyped __global__.__hxcpp_condition_acquire(c);
+	}
+
+	public function tryAcquire():Bool {
+		return untyped __global__.__hxcpp_condition_try_acquire(c);
+	}
+
+	public function release():Void {
+		untyped __global__.__hxcpp_condition_release(c);
+	}
+
+	public function wait():Void {
+		untyped __global__.__hxcpp_condition_wait(c);
+	}
+
+	public function signal():Void {
+		untyped __global__.__hxcpp_condition_signal(c);
+	}
+
+	public function broadcast():Void {
+		untyped __global__.__hxcpp_condition_broadcast(c);
+	}
+}

+ 22 - 0
std/cpp/_std/sys/thread/Semaphore.hx

@@ -0,0 +1,22 @@
+package sys.thread;
+
+@:coreApi
+class Semaphore {
+	var m:Dynamic;
+
+	public function new(value:Int) {
+		m = untyped __global__.__hxcpp_semaphore_create(value);
+	}
+
+	public function acquire():Void {
+		untyped __global__.__hxcpp_semaphore_acquire(m);
+	}
+
+	public function tryAcquire(?timeout:Float):Bool {
+		return untyped __global__.__hxcpp_semaphore_try_acquire(m, timeout == null ? 0 : (timeout:Float));
+	}
+
+	public function release():Void {
+		untyped __global__.__hxcpp_semaphore_release(m);
+	}
+}

+ 2 - 0
std/cpp/cppia/HostClasses.hx

@@ -46,6 +46,8 @@ class HostClasses {
 		"sys.thread.Mutex",
 		"sys.thread.Thread",
 		"sys.thread.Tls",
+		"sys.thread.Semaphore",
+		"sys.thread.Condition",
 		"cpp.vm.ExecutionTrace",
 		"cpp.vm.Gc",
 		"cpp.vm.Profiler",

+ 37 - 0
std/cs/_std/sys/thread/Condition.hx

@@ -0,0 +1,37 @@
+package sys.thread;
+
+import cs.system.threading.Monitor;
+
+@:coreApi
+@:access(sys.thread.Mutex)
+class Condition {
+	final object:cs.system.Object;
+
+	public function new():Void {
+		this.object = new cs.system.Object();
+	}
+
+	public function acquire():Void {
+		Monitor.Enter(object);
+	}
+
+	public function tryAcquire():Bool {
+		return Monitor.TryEnter(object);
+	}
+
+	public function release():Void {
+		Monitor.Exit(object);
+	}
+
+	public function wait():Void {
+		Monitor.Wait(object);
+	}
+
+	public function signal():Void {
+		Monitor.Pulse(object);
+	}
+
+	public function broadcast():Void {
+		Monitor.PulseAll(object);
+	}
+}

+ 22 - 0
std/cs/_std/sys/thread/Semaphore.hx

@@ -0,0 +1,22 @@
+package sys.thread;
+
+@:coreApi
+class Semaphore {
+	final native:cs.system.threading.Semaphore;
+
+	public function new(value:Int):Void {
+		this.native = new cs.system.threading.Semaphore(value, 0x7FFFFFFF);
+	}
+
+	public function acquire():Void {
+		native.WaitOne();
+	}
+
+	public function tryAcquire(?timeout:Float):Bool {
+		return native.WaitOne(timeout == null ? 0 : Std.int(timeout * 1000));
+	}
+
+	public function release():Void {
+		native.Release();
+	}
+}

+ 41 - 0
std/eval/_std/sys/thread/Condition.hx

@@ -0,0 +1,41 @@
+package sys.thread;
+
+@:coreApi class Condition {
+	final cond:eval.luv.Condition;
+	final mutex:eval.luv.Mutex;
+
+	public function new():Void {
+		cond = eval.luv.Condition.init().resolve();
+		mutex = eval.luv.Mutex.init(true).resolve();
+		eval.vm.Gc.finalise(destroy, this);
+	}
+
+	static function destroy(cond:Condition):Void {
+		cond.cond.destroy();
+		cond.mutex.destroy();
+	}
+
+	public function acquire():Void {
+		mutex.lock();
+	}
+
+	public function tryAcquire():Bool {
+		return mutex.tryLock().isOk();
+	}
+
+	public function release():Void {
+		mutex.unlock();
+	}
+
+	public function wait():Void {
+		cond.wait(mutex);
+	}
+
+	public function signal():Void {
+		cond.signal();
+	}
+
+	public function broadcast():Void {
+		cond.broadcast();
+	}
+}

+ 36 - 0
std/eval/_std/sys/thread/Semaphore.hx

@@ -0,0 +1,36 @@
+package sys.thread;
+
+@:coreApi class Semaphore {
+	final native:eval.luv.Semaphore;
+
+	public function new(value:Int):Void {
+		native = eval.luv.Semaphore.init(value).resolve();
+		eval.vm.Gc.finalise(destroy, this);
+	}
+
+    static function destroy(sem:Semaphore):Void {
+        sem.native.destroy();
+    }
+
+	public function acquire():Void {
+		native.wait();
+	}
+
+	public function tryAcquire(?timeout:Float):Bool {
+		if (timeout == null) {
+			return native.tryWait().isOk();
+		} else {
+			var t = Sys.time() + timeout;
+			while (Sys.time() < t) {
+				if (native.tryWait().isOk()) {
+					return true;
+				}
+			}
+			return false;
+		}
+	}
+
+	public function release():Void {
+		native.post();
+	}
+}

+ 32 - 0
std/hl/_std/sys/thread/Condition.hx

@@ -0,0 +1,32 @@
+package sys.thread;
+
+abstract Condition(hl.Abstract<"hl_condition">) {
+	public function new():Void {
+		this = alloc();
+	}
+
+	@:hlNative("std", "condition_acquire")
+	public function acquire():Void {}
+
+	@:hlNative("std", "condition_try_acquire")
+	public function tryAcquire():Bool {
+		return false;
+	}
+
+	@:hlNative("std", "condition_release")
+	public function release():Void {}
+
+	@:hlNative("std", "condition_wait")
+	public function wait():Void {}
+
+	@:hlNative("std", "condition_signal")
+	public function signal():Void {}
+
+	@:hlNative("std", "condition_broadcast")
+	public function broadcast():Void {}
+
+	@:hlNative("std", "condition_alloc")
+	static function alloc():hl.Abstract<"hl_condition"> {
+		return null;
+	}
+}

+ 23 - 0
std/hl/_std/sys/thread/Semaphore.hx

@@ -0,0 +1,23 @@
+package sys.thread;
+
+abstract Semaphore(hl.Abstract<"hl_semaphore">) {
+	public function new(value:Int):Void {
+		this = alloc(value);
+	}
+
+	@:hlNative("std", "semaphore_acquire")
+	public function acquire():Void {}
+
+	@:hlNative("std", "semaphore_release")
+	public function release():Void {}
+
+	@:hlNative("std", "semaphore_try_acquire")
+	public function tryAcquire(?timeout:Float):Bool {
+		return false;
+	}
+
+	@:hlNative("std", "semaphore_alloc")
+	static function alloc(value:Int):hl.Abstract<"hl_semaphore"> {
+		return null;
+	}
+}

+ 45 - 0
std/java/_std/sys/thread/Condition.hx

@@ -0,0 +1,45 @@
+package sys.thread;
+
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition as NativeCondition;
+
+@:access(sys.thread.Mutex)
+@:coreApi
+@:native('haxe.java.vm.Condition')
+class Condition {
+	final lock:ReentrantLock;
+	final native:NativeCondition;
+
+	public function new():Void {
+		this.lock = new ReentrantLock();
+		this.native = lock.newCondition();
+	}
+
+	public function acquire():Void {
+		lock.lock();
+	}
+
+	public function tryAcquire():Bool {
+		return this.lock.tryLock();
+	}
+
+	public function release():Void {
+		lock.unlock();
+	}
+
+	// without the @:native, you get "java.lang.VerifyError: class sys.thread.Condition overrides final method java.lang.Object.wait()V" on jvm
+	// and "wait() in Condition cannot override wait() in Object" from javac
+
+	@:native("waitOn")
+	public function wait():Void {
+		native.await();
+	}
+
+	public function signal():Void {
+		native.signal();
+	}
+
+	public function broadcast():Void {
+		native.signalAll();
+	}
+}

+ 25 - 0
std/java/_std/sys/thread/Semaphore.hx

@@ -0,0 +1,25 @@
+package sys.thread;
+
+import java.util.concurrent.TimeUnit;
+
+@:coreApi
+@:native('haxe.java.vm.Semaphore')
+class Semaphore {
+	final native:java.util.concurrent.Semaphore;
+
+	public function new(value:Int):Void {
+		this.native = new java.util.concurrent.Semaphore(value);
+	}
+
+	public function acquire():Void {
+		native.acquire();
+	}
+
+	public function tryAcquire(?timeout:Float):Bool {
+		return timeout == null ? native.tryAcquire() : native.tryAcquire(haxe.Int64.fromFloat(timeout * 1000000000),TimeUnit.NANOSECONDS);
+	}
+
+	public function release():Void {
+		native.release();
+	}
+}

+ 34 - 0
std/python/_std/sys/thread/Condition.hx

@@ -0,0 +1,34 @@
+package sys.thread;
+
+@:coreApi
+class Condition {
+	final cond:python.lib.threading.Condition;
+
+	public function new():Void {
+		this.cond = new python.lib.threading.Condition();
+	}
+
+	public function acquire():Void {
+		cond.acquire();
+	}
+
+	public function tryAcquire():Bool {
+		return cond.acquire(false);
+	}
+
+	public function release():Void {
+		cond.release();
+	}
+
+	public function wait():Void {
+		cond.wait();
+	}
+
+	public function signal():Void {
+		cond.notify();
+	}
+
+	public function broadcast():Void {
+		cond.notify_all();
+	}
+}

+ 24 - 0
std/python/_std/sys/thread/Semaphore.hx

@@ -0,0 +1,24 @@
+package sys.thread;
+
+import python.lib.threading.Semaphore as NativeSemaphore;
+
+@:coreApi
+class Semaphore {
+	final semaphore:NativeSemaphore;
+
+	public function new(value:Int):Void {
+		this.semaphore = new NativeSemaphore(value);
+	}
+
+	public function acquire():Void {
+		semaphore.acquire();
+	}
+
+	public function tryAcquire(?timeout:Float):Bool {
+		return timeout == null ? semaphore.acquire(false) : semaphore.acquire(true, timeout);
+	}
+
+	public function release():Void {
+		semaphore.release();
+	}
+}

+ 58 - 0
std/sys/thread/Condition.hx

@@ -0,0 +1,58 @@
+package sys.thread;
+
+#if (!target.threaded)
+#error "This class is not available on this target"
+#end
+
+/**
+	Creates a new condition variable.
+	Conditions variables can be used to block one or more threads at the same time,
+	until another thread modifies a shared variable (the condition)
+	and signals the condition variable.
+**/
+@:coreApi extern class Condition {
+	/**
+		Create a new condition variable.
+		A thread that waits on a newly created condition variable will block.
+	**/
+	function new():Void;
+
+	/**
+		Acquires the internal mutex.
+	**/
+	function acquire():Void;
+
+	/**
+		Tries to acquire the internal mutex.
+		@see `Mutex.tryAcquire`
+	**/
+	function tryAcquire():Bool;
+
+	/***
+		Releases the internal mutex.
+	**/
+	function release():Void;
+
+	/**
+		Atomically releases the mutex and blocks until the condition variable pointed is signaled by a call to
+		`signal` or to `broadcast`. When the calling thread becomes unblocked it
+		acquires the internal mutex.
+		The internal mutex should be locked before this function is called.
+	**/
+	function wait():Void;
+
+	/**
+		Unblocks one of the threads that are blocked on the
+		condition variable at the time of the call. If no threads are blocked
+		on the condition variable at the time of the call, the function does nothing.
+	**/
+	function signal():Void;
+
+	/**
+		Unblocks all of the threads that are blocked on the
+		condition variable at the time of the call. If no threads are blocked
+		on the condition variable at the time of the call, the function does
+		nothing.
+	**/
+	function broadcast():Void;
+}

+ 33 - 0
std/sys/thread/Semaphore.hx

@@ -0,0 +1,33 @@
+package sys.thread;
+
+#if (!target.threaded)
+#error "This class is not available on this target"
+#end
+@:coreApi extern class Semaphore {
+	/**
+		Creates a new semaphore with an initial value.
+	**/
+	public function new(value:Int):Void;
+
+	/**
+		Locks the semaphore.
+		If the value of the semaphore is zero, then the thread will block until it is able to lock the semaphore.
+		If the value is non-zero, it is decreased by one.
+	**/
+	public function acquire():Void;
+
+	/**
+		Try to lock the semaphore.
+		If the value of the semaphore is zero, `false` is returned, else the value is increased.
+
+		If `timeout` is specified, this function will block until the thread is able to acquire the semaphore, or the timout expires.
+		`timeout` is in seconds.
+	**/
+	public function tryAcquire(?timeout:Float):Bool;
+
+	/**
+		Release the semaphore.
+		The value of the semaphore is increased by one.
+	**/
+	public function release():Void;
+}

+ 24 - 0
tests/threads/src/cases/TestCondition.hx

@@ -0,0 +1,24 @@
+package cases;
+
+#if !neko
+import sys.thread.Condition;
+import sys.thread.Thread;
+#end
+
+class TestCondition extends utest.Test {
+	#if !neko
+	function test() {
+		final cond = new Condition();
+		final thread = Thread.create(() -> {
+			Sys.sleep(0.01);
+			cond.acquire();
+			cond.signal();
+			cond.release();
+		});
+		cond.acquire();
+		cond.wait();
+		cond.release();
+		utest.Assert.pass();
+	}
+	#end
+}

+ 21 - 0
tests/threads/src/cases/TestSemaphore.hx

@@ -0,0 +1,21 @@
+package cases;
+
+#if !neko
+import sys.thread.Semaphore;
+#end
+
+class TestSemaphore extends utest.Test {
+	#if !neko
+	function test() {
+		var m = new Semaphore(3);
+		m.acquire();
+		m.acquire();
+		isTrue(m.tryAcquire());
+		isFalse(m.tryAcquire());
+		isFalse(m.tryAcquire(0.1));
+		m.release();
+		m.release();
+		m.release();
+	}
+	#end
+}