| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- package unit;
- #if neko
- import neko.vm.Thread;
- import neko.vm.Deque;
- import neko.vm.Lock;
- import neko.vm.Tls;
- import neko.vm.Mutex;
- #elseif cpp
- import cpp.vm.Thread;
- import cpp.vm.Deque;
- import cpp.vm.Lock;
- import cpp.vm.Tls;
- import cpp.vm.Mutex;
- #elseif java
- import java.vm.Thread;
- import java.vm.Deque;
- import java.vm.Lock;
- import java.vm.Tls;
- import java.vm.Mutex;
- #end
// Unit test that drives ThreadSort through every combination of creator and
// consumer timing, thread-local storage usage, queue strategy and semaphore
// strategy pairing.
class TestThreads extends Test
{
	// Runs one ThreadSort pass per parameter combination. A pass that completes
	// is recorded through t(true); a pass that throws is reported together
	// with the parameters that produced the failure.
	private function testSort()
	{
		var sorter = new ThreadSort();
		#if java
		sorter.maxVal *= 10;
		#end
		for (creatorWait in [.02, .05, 0])
		{
			for (creatorLoad in [false, true])
			{
				for (consumerWait in [.02, .05, 0])
				{
					for (useTls in [false, true])
					{
						for (q in [new QDeque() #if java, new QLockFree() #end])
						{
							// both orderings, so each strategy gets used as lock1 and as lock2
							for (pair in [[new DequeSemaphore(), new LockSemaphore()], [new LockSemaphore(), new DequeSemaphore()]])
							{
								var lock1 = pair[0];
								var lock2 = pair[1];

								sorter.queue = q;
								sorter.lock1 = lock1;
								sorter.lock2 = lock2;
								sorter.useTls = useTls;
								sorter.creatorWait = creatorWait;
								sorter.creatorLoad = creatorLoad;
								sorter.consumerWait = consumerWait;

								try
								{
									sorter.run();
									t(true);
								}
								catch (e:Dynamic)
								{
									Test.report('Error $e for parameters: $creatorWait, $creatorLoad, $consumerWait, $useTls, $q, $lock1, $lock2');
								}
							}
						}
					}
				}
			}
		}
	}
}
// Producer/consumer stress test. nThreads creator threads push the integers
// 1...maxVal (upper bound exclusive) into `queue`, nThreads consumer threads
// pop them into private arrays, and the main thread finally verifies that
// every value was received exactly once.
class ThreadSort
{
	// params
	// time each creator sleeps before it starts producing
	public var creatorWait:Seconds = .2;
	// should an extra load be performed on the creator?
	public var creatorLoad:Bool = false;
	// time the main thread sleeps before spawning the consumers (skipped when 0)
	public var consumerWait:Seconds = 0;
	// should an extra load be performed on the consumer?
	public var consumerLoad:Bool = false;
	// when true, each creator reads its index back from thread-local storage
	public var useTls:Bool = false;
	// number of creator threads, and also the number of consumer threads
	public var nThreads:Int = 10;
	// exclusive upper bound of the produced values
	public var maxVal:Int = 1000;
	// queue shared by creators and consumers
	public var queue:QueueStrategy<Int>;
	// released once by every creator when it has finished producing
	public var lock1:SemaphoreStrategy;
	// released once by every consumer when it has finished consuming
	public var lock2:SemaphoreStrategy;

	public function new()
	{
	}

	// Executes one complete pass: spawns creators and consumers, waits for
	// both groups to finish, then validates the collected values.
	// Throws a String when a value was consumed twice or never consumed.
	public function run()
	{
		lock1.reset();
		lock2.reset();
		queue.reset();
		lock1.setReleaseCount(nThreads);
		lock2.setReleaseCount(nThreads);

		// Held by the main thread while creators may still be producing.
		// Consumers probe it with tryAcquire() to learn that production is over.
		var finishedMutex = new Mutex();
		finishedMutex.acquire();

		// Interval (in iterations) between the optional extra-load sleeps.
		// Clamped to 1 so that maxVal < 10 does not cause a modulo by zero.
		var loadStep = maxVal >= 10 ? Std.int(maxVal / 10) : 1;

		//spawning creators
		var tls = new Tls();
		for (i in 0...nThreads)
		{
			Thread.create(function() {
				tls.value = i;
				Sys.sleep(creatorWait.secs());
				// index of this creator, optionally round-tripped through TLS
				var slot = useTls ? tls.value : i;
				for (j in 1...maxVal)
				{
					// each creator owns the values congruent to its index modulo nThreads
					if (j % nThreads == slot)
					{
						queue.add(j);
					}
					if (j % loadStep == 0 && creatorLoad)
						Sys.sleep(.01);
				}
				//creator thread finished
				lock1.release();
			});
		}

		//spawning consumers
		if (consumerWait != 0)
			Sys.sleep(consumerWait.secs());
		var arr = [];
		for (i in 0...nThreads)
		{
			var myarr = [];
			arr.push(myarr);
			Thread.create(function() {
				var polls = 0;
				while (true)
				{
					var val = queue.pop();
					if (val != null)
					{
						myarr.push(val);
					}
					else if (finishedMutex.tryAcquire())
					{
						finishedMutex.release();
						// Values may have been queued between the empty pop() above
						// and the finished signal. No creator is running any more, so
						// draining until the queue is empty leaves nothing behind.
						var rest = queue.pop();
						while (rest != null)
						{
							myarr.push(rest);
							rest = queue.pop();
						}
						break;
					}
					if (++polls % loadStep == 0 && consumerLoad)
						Sys.sleep(.01);
				}
				//consumer thread finished
				lock2.release();
			});
		}

		//wait creators to finish
		lock1.block();
		//release finishedMutex
		finishedMutex.release();
		//wait consumers to finish
		lock2.block();

		//start checking
		var mainarr = [];
		// prealloc; this also fills slot maxVal, which no creator ever produces
		mainarr[maxVal] = maxVal;
		for (chunk in arr)
		{
			for (a in chunk)
			{
				var val = mainarr[a];
				if (val == a)
					throw 'Same value detected for $val and $a';
				mainarr[a] = a;
			}
		}
		//no repeats, ok! now make sure nothing is missing
		for (i in 1...mainarr.length)
		{
			if (i != mainarr[i])
				throw 'No value found for $i and ${mainarr[i]}';
		}
	}
}
// Counting-semaphore abstraction used to wait for a group of threads.
// A semaphore may be released by a thread other than the one blocking on it.
private interface SemaphoreStrategy
{
	// Returns the semaphore to its initial state.
	function reset():Void;

	// Sets how many release() calls block() has to wait for.
	function setReleaseCount(i:Int):Void;

	// Signals one release.
	function release():Void;

	// Blocks until all semaphores are released.
	function block():Void;
}
// SemaphoreStrategy backed by a Deque: every release() pushes a token and
// block() pops one token per expected release.
private class DequeSemaphore implements SemaphoreStrategy
{
	// tokens pushed by release()
	var d:Deque<Bool>;
	// number of releases block() still has to wait for
	var c:Int = 0;

	public function new()
	{
		reset();
	}

	// Starts over with an empty deque and an expected release count of zero.
	public function reset()
	{
		d = new Deque();
		c = 0;
	}

	// Sets how many release() calls the next block() waits for.
	public function setReleaseCount(c:Int)
	{
		this.c = c;
	}

	// Pushes one token onto the deque.
	public function release()
	{
		d.push(true);
	}

	// Performs one blocking pop per expected release, then clears the count.
	public function block()
	{
		var pending = c;
		while (pending > 0)
		{
			d.pop(true);
			pending--;
		}
		c = 0;
	}

	@:keep public function toString()
	{
		return "DequeSemaphore";
	}
}
// SemaphoreStrategy backed by a Lock: every release() releases the lock once
// and block() waits on it once per expected release.
private class LockSemaphore implements SemaphoreStrategy
{
	// lock that release() releases and block() waits on
	var l:Lock;
	// number of releases block() still has to wait for
	var c:Int = 0;

	// Creates the semaphore in the same state reset() produces. The lock is
	// deliberately NOT released here: a release at construction would satisfy
	// the first wait() in block() and let it return one release too early.
	public function new()
	{
		this.l = new Lock();
		this.c = 0;
	}

	// Waits on the lock once per expected release, then clears the count.
	public function block()
	{
		for (i in 0...c)
		{
			l.wait();
		}
		this.c = 0;
	}

	// Releases the lock once.
	public function release()
	{
		this.l.release();
	}

	// Sets how many release() calls the next block() waits for.
	public function setReleaseCount(c:Int)
	{
		this.c = c;
	}

	// Replaces the lock with a fresh one and clears the expected release count.
	public function reset()
	{
		this.l = new Lock();
		this.c = 0;
	}

	@:keep public function toString()
	{
		return "LockSemaphore";
	}
}
// Minimal queue abstraction shared by the creator and consumer threads.
private interface QueueStrategy<T>
{
	// Discards the current contents and starts over with an empty queue.
	function reset():Void;

	// Inserts one element.
	function add(t:T):Void;

	// Removes one element without blocking; callers treat a null result as
	// "nothing available right now".
	function pop():Null<T>;
}
// QueueStrategy that forwards every operation to a Deque.
private class QDeque<T> implements QueueStrategy<T>
{
	// underlying deque, replaced on every reset()
	var q:Deque<T>;

	public function new()
	{
		reset();
	}

	// Swaps in a brand-new, empty deque.
	public function reset()
	{
		q = new Deque();
	}

	// Forwards the element to the deque's add().
	public function add(t:T)
	{
		q.add(t);
	}

	// Non-blocking pop: asks the deque for an element without waiting.
	public function pop():Null<T>
	{
		var next = q.pop(false);
		return next;
	}

	@:keep public function toString()
	{
		return "QDeque";
	}
}
#if java
// QueueStrategy that forwards every operation to java.vm.AtomicList.
// Only compiled for the java target.
private class QLockFree<T> implements QueueStrategy<T>
{
	// underlying list, replaced on every reset()
	var q:java.vm.AtomicList<T>;

	public function new()
	{
		reset();
	}

	// Swaps in a brand-new, empty list.
	public function reset()
	{
		q = new java.vm.AtomicList();
	}

	// Forwards the element to the list's add().
	public function add(t:T)
	{
		q.add(t);
	}

	// Forwards to the list's pop().
	// NOTE(review): presumably yields null when the list is empty - confirm
	// against java.vm.AtomicList.
	public function pop():Null<T>
	{
		var next = q.pop();
		return next;
	}

	@:keep public function toString()
	{
		return "QLockFree";
	}
}
#end
// A duration expressed in seconds, stored as a plain Float.
// Any Float converts to it implicitly.
private abstract Seconds(Float) from Float
{
	// The duration in seconds (the underlying value, unchanged).
	public inline function secs():Float
	{
		var seconds:Float = this;
		return seconds;
	}

	// The duration in milliseconds.
	public inline function ms():Float
	{
		var seconds:Float = this;
		return seconds * 1000;
	}
}
|