EventLoop.hx 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package sys.thread;
  2. import eval.luv.Loop;
  3. import eval.luv.Async;
  4. import eval.luv.Timer as LuvTimer;
  5. import haxe.MainLoop;
  /**
    Describes when an event loop will next have an event available to execute.
  **/
  @:coreApi
  enum NextEventTime {
    /** An event is already queued and waiting to be executed. */
    Now;

    /** The loop does not expect any further events. */
    Never;

    /**
      An event may arrive at any moment.
      When `time` is not `null`, an event is guaranteed to be ready at that time.
    **/
    AnyTime(time:Null<Float>);

    /** An event is expected to become ready for execution at `time`. */
    At(time:Float);
  }
  /**
    Handle returned by `EventLoop.repeat` and accepted by `EventLoop.cancel`.
    It wraps a `RegularEvent` and converts to and from it implicitly.
  **/
  abstract EventHandler(RegularEvent) from RegularEvent to RegularEvent {
  }
  /**
    Mutable record pairing a callback with the timer that drives it.
  **/
  private class RegularEvent {
    /** Callback invoked by `run()`. It is public and mutable, so it can be replaced after construction. */
    public var event:()->Void;

    /** Timer associated with this event; the constructor does not assign it. */
    public var timer:Null<LuvTimer>;

    /**
      @param e Callback to store in `event`.
    **/
    public function new(e:()->Void) {
      this.event = e;
    }

    /** Calls whatever callback is stored in `event` at the moment of the call. */
    public function run() {
      this.event();
    }
  }
  /**
    An event loop implementation used for `sys.thread.Thread`.

    Callbacks submitted through `run`, `runPromised`, `promise` and `cancel` are
    appended to `pending` under `mutex`. The `wakeup` async handle is then
    signalled so that `consumePending` drains the queue.
  **/
  @:coreApi
  class EventLoop {
    /** Shared do-nothing callback. */
    static final noop = function() {}

    /** Set by the first constructed loop; that loop treats itself as the main thread's loop. */
    static var CREATED:Bool = false;

    @:allow(eval.luv.Loop)
    final handle:Loop;

    /** Guards `pending` and `promisedEventsCount`. */
    final mutex = new Mutex();

    /** Async handle whose callback is `consumePending`. */
    final wakeup:Async;

    /** Number of `promise()` calls not yet matched by `runPromised()`. */
    var promisedEventsCount = 0;

    /** Callbacks queued for the next `consumePending` run. */
    var pending:Array<()->Void> = [];

    /** Set to `true` by `loop()`; nothing in this class sets it back to `false`. */
    var started:Bool = false;

    /** `true` only for the first `EventLoop` instance created (see `CREATED`). */
    var isMainThread:Bool;

    /**
      Create a loop with its own luv loop handle and wakeup handle.
      The wakeup handle starts unreferenced.
    **/
    public function new():Void {
      isMainThread = !CREATED;
      CREATED = true;
      handle = Loop.init().resolve();
      wakeup = Async.init(handle, consumePending).resolve();
      wakeup.unref();
    }

    /**
      Schedule event for execution every `intervalMs` milliseconds in current loop.

      `intervalMs` is used as the initial delay; the repeat period is `intervalMs`
      raised to a minimum of 1.

      @return A handler that can be passed to `cancel()`.
    **/
    public function repeat(event:()->Void, intervalMs:Int):EventHandler {
      var e = new RegularEvent(event);
      mutex.acquire();
      try {
        var timer = LuvTimer.init(handle).resolve();
        e.timer = timer;
        timer.start(e.run, intervalMs, intervalMs < 1 ? 1 : intervalMs).resolve();
      } catch (err:haxe.Exception) {
        // `resolve()` may throw; do not leave the mutex locked behind us.
        mutex.release();
        throw err;
      }
      mutex.release();
      wakeup.send();
      return e;
    }

    /**
      Prevent execution of a previously scheduled event in current loop.

      The callback is replaced with a no-op immediately. Stopping and closing the
      timer is queued to run from `consumePending`. Cancelling the same handler
      more than once is harmless: the timer is closed only the first time.
    **/
    public function cancel(eventHandler:EventHandler):Void {
      var regular:RegularEvent = eventHandler;
      mutex.acquire();
      regular.event = noop;
      pending.push(() -> {
        var timer = regular.timer;
        // Already cancelled (or never had a timer): nothing to stop or close.
        if (timer == null) return;
        // Forget the timer first so a later cancel cannot close it again.
        regular.timer = null;
        timer.stop().resolve();
        timer.close(noop);
      });
      mutex.release();
      wakeup.send();
    }

    /**
      Notify this loop about an upcoming event.
      This makes the thread stay alive and wait for as many events as the number of
      times `.promise()` was called. These events should be added via `.runPromised()`.
    **/
    public function promise():Void {
      mutex.acquire();
      ++promisedEventsCount;
      pending.push(refUnref);
      mutex.release();
      wakeup.send();
    }

    /**
      Execute `event` as soon as possible.
    **/
    public function run(event:()->Void):Void {
      mutex.acquire();
      pending.push(event);
      mutex.release();
      wakeup.send();
    }

    /**
      Add previously promised `event` for execution.
    **/
    public function runPromised(event:()->Void):Void {
      mutex.acquire();
      --promisedEventsCount;
      // Re-evaluate the wakeup reference before the event itself runs.
      pending.push(refUnref);
      pending.push(event);
      mutex.release();
      wakeup.send();
    }

    /**
      Reference the wakeup handle while promised events are outstanding, or while
      this is the main-thread loop and `haxe.MainLoop` has events. Otherwise
      unreference it.
    **/
    function refUnref():Void {
      if (promisedEventsCount > 0 || (isMainThread && MainLoop.hasEvents())) {
        wakeup.ref();
      } else {
        wakeup.unref();
      }
    }

    /**
      Run the underlying loop once in `NOWAIT` mode.

      Returns `AnyTime(null)` if that run reports `true`, `Never` otherwise.
      Throws if `loop()` has already been called on this instance.
    **/
    public function progress():NextEventTime {
      if (started) throw "Event loop already started";
      return handle.run(NOWAIT) ? AnyTime(null) : Never;
    }

    /**
      Blocks until a new event is added or `timeout` (in seconds) expires.

      Depending on a target platform this method may also automatically execute arriving
      events while waiting. However if any event is executed it will stop waiting.

      Returns `true` if more events are expected.
      Returns `false` if no more events expected.

      Depending on a target platform this method may be non-reentrant. It must
      not be called from event callbacks.

      Throws if `loop()` has already been called on this instance.
    **/
    public function wait(?timeout:Float):Bool {
      if (started) throw "Event loop already started";
      if (timeout == null) {
        return handle.run(ONCE);
      }
      var timer = LuvTimer.init(handle).resolve();
      var disposed = false;
      // Stops and closes the timeout timer exactly once, whether it fired or not.
      function dispose() {
        if (disposed) return;
        disposed = true;
        timer.stop().resolve();
        timer.close(noop);
      }
      timer.start(dispose, Std.int(timeout * 1000)).resolve();
      var moreExpected = handle.run(ONCE);
      // If the run returned before the timeout fired, the timer would otherwise
      // stay registered in the loop after this call.
      dispose();
      return moreExpected;
    }

    /**
      Execute all pending events.
      Wait and execute as many events as the number of times `promise()` was called.
      Runs until all repeating events are cancelled and no more events are expected.

      Depending on a target platform this method may be non-reentrant. It must
      not be called from event callbacks.

      Throws if it has already been called on this instance.
    **/
    public function loop():Void {
      if (started) throw "Event loop already started";
      started = true;
      consumePending();
      handle.run(DEFAULT);
    }

    /**
      Drain `pending` and run every queued callback in insertion order.

      The queue is swapped out under `mutex` and the callbacks run without it held.
      Once `loop()` has started on the main-thread loop, this also ticks
      `haxe.MainLoop`, signals `wakeup` again while it still has events, and
      refreshes the wakeup reference.
    **/
    function consumePending(?_:Async):Void {
      mutex.acquire();
      var batch = pending;
      pending = [];
      mutex.release();
      for (fn in batch) fn();
      if (started && isMainThread) {
        @:privateAccess MainLoop.tick();
        if (MainLoop.hasEvents()) wakeup.send();
        refUnref();
      }
    }
  }