123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- package sys.thread;
- import eval.luv.Loop;
- import eval.luv.Async;
- import eval.luv.Timer as LuvTimer;
- import haxe.MainLoop;
- /**
- When an event loop has an available event to execute.
- **/
- @:coreApi
- enum NextEventTime {
- /** There's already an event waiting to be executed */
- Now;
- /** No new events are expected. */
- Never;
- /**
- An event is expected to arrive at any time.
- If `time` is specified, then the event will be ready at that time for sure.
- */
- AnyTime(time:Null<Float>);
- /** An event is expected to be ready for execution at `time`. */
- At(time:Float);
- }
- abstract EventHandler(RegularEvent) from RegularEvent to RegularEvent {}
- private class RegularEvent {
- public var timer:Null<LuvTimer>;
- public var event:()->Void;
- public function new(e:()->Void) {
- event = e;
- }
- public function run() {
- event();
- }
- }
- /**
- An event loop implementation used for `sys.thread.Thread`
- **/
- @:coreApi
- class EventLoop {
- @:allow(eval.luv.Loop)
- final handle:Loop;
- final mutex = new Mutex();
- final wakeup:Async;
- var promisedEventsCount = 0;
- var pending:Array<()->Void> = [];
- var started:Bool = false;
- var isMainThread:Bool;
- static var CREATED : Bool;
- public function new():Void {
- isMainThread = !CREATED;
- CREATED = true;
- handle = Loop.init().resolve();
- wakeup = Async.init(handle, consumePending).resolve();
- wakeup.unref();
- }
- /**
- Schedule event for execution every `intervalMs` milliseconds in current loop.
- **/
- public function repeat(event:()->Void, intervalMs:Int):EventHandler {
- var e = new RegularEvent(event);
- mutex.acquire();
- e.timer = LuvTimer.init(handle).resolve();
- e.timer.start(e.run, intervalMs, intervalMs < 1 ? 1 : intervalMs).resolve();
- mutex.release();
- wakeup.send();
- return e;
- }
- /**
- Prevent execution of a previously scheduled event in current loop.
- **/
- public function cancel(eventHandler:EventHandler):Void {
- mutex.acquire();
- (eventHandler:RegularEvent).event = noop;
- pending.push(() -> {
- var timer = (eventHandler:RegularEvent).timer;
- timer.stop().resolve();
- timer.close(noop);
- });
- mutex.release();
- wakeup.send();
- }
- static final noop = function() {}
- /**
- Notify this loop about an upcoming event.
- This makes the thread stay alive and wait for as many events as the number of
- times `.promise()` was called. These events should be added via `.runPromised()`.
- **/
- public function promise():Void {
- mutex.acquire();
- ++promisedEventsCount;
- pending.push(refUnref);
- mutex.release();
- wakeup.send();
- }
- /**
- Execute `event` as soon as possible.
- **/
- public function run(event:()->Void):Void {
- mutex.acquire();
- pending.push(event);
- mutex.release();
- wakeup.send();
- }
- /**
- Add previously promised `event` for execution.
- **/
- public function runPromised(event:()->Void):Void {
- mutex.acquire();
- --promisedEventsCount;
- pending.push(refUnref);
- pending.push(event);
- mutex.release();
- wakeup.send();
- }
- function refUnref():Void {
- if (promisedEventsCount > 0 || (isMainThread && haxe.MainLoop.hasEvents())) {
- wakeup.ref();
- } else {
- wakeup.unref();
- }
- }
- public function progress():NextEventTime {
- if (started) throw "Event loop already started";
- if (handle.run(NOWAIT)) {
- return AnyTime(null);
- } else {
- return Never;
- }
- }
- /**
- Blocks until a new event is added or `timeout` (in seconds) to expires.
- Depending on a target platform this method may also automatically execute arriving
- events while waiting. However if any event is executed it will stop waiting.
- Returns `true` if more events are expected.
- Returns `false` if no more events expected.
- Depending on a target platform this method may be non-reentrant. It must
- not be called from event callbacks.
- **/
- public function wait(?timeout:Float):Bool {
- if (started) throw "Event loop already started";
- if(timeout != null) {
- var timer = LuvTimer.init(handle).resolve();
- timer.start(() -> {
- timer.stop().resolve();
- timer.close(() -> {});
- }, Std.int(timeout * 1000));
- return handle.run(ONCE);
- } else {
- return handle.run(ONCE);
- }
- }
- /**
- Execute all pending events.
- Wait and execute as many events as the number of times `promise()` was called.
- Runs until all repeating events are cancelled and no more events are expected.
- Depending on a target platform this method may be non-reentrant. It must
- not be called from event callbacks.
- **/
- public function loop():Void {
- if (started) throw "Event loop already started";
- started = true;
- consumePending();
- handle.run(DEFAULT);
- }
- function consumePending(?_:Async):Void {
- mutex.acquire();
- var p = pending;
- pending = [];
- mutex.release();
- for(fn in p) fn();
- if (started && isMainThread) {
- var next = @:privateAccess MainLoop.tick();
- if (haxe.MainLoop.hasEvents()) wakeup.send();
- refUnref();
- }
- }
- }
|