// EventLoop.hx (2.8 KB)
  1. package sys.thread;
  2. import eval.luv.Loop;
  3. import eval.luv.Async;
  4. import eval.luv.Timer as LuvTimer;
  5. @:coreApi
  6. enum NextEventTime {
  7. Now;
  8. Never;
  9. AnyTime(time:Null<Float>);
  10. At(time:Float);
  11. }
  12. abstract EventHandler(RegularEvent) from RegularEvent to RegularEvent {}
  13. private class RegularEvent {
  14. public var timer:Null<LuvTimer>;
  15. public var event:()->Void;
  16. public function new(e:()->Void) {
  17. event = e;
  18. }
  19. public function run() {
  20. event();
  21. }
  22. }
  23. @:coreApi
  24. class EventLoop {
  25. @:allow(eval.luv.Loop)
  26. final handle:Loop;
  27. final mutex = new Mutex();
  28. final oneTimeEvents = new Array<Null<()->Void>>();
  29. var oneTimeEventsIdx = 0;
  30. final wakeup:Async;
  31. var promisedEventsCount = 0;
  32. var pending:Array<()->Void> = [];
  33. var looping = false;
  34. public function new():Void {
  35. handle = Loop.init().resolve();
  36. wakeup = Async.init(handle, consumePending).resolve();
  37. wakeup.unref();
  38. }
  39. public function repeat(event:()->Void, intervalMs:Int):EventHandler {
  40. var e = new RegularEvent(event);
  41. mutex.acquire();
  42. pending.push(() -> {
  43. e.timer = LuvTimer.init(handle).resolve();
  44. e.timer.start(e.run, intervalMs, intervalMs < 1 ? 1 : intervalMs).resolve();
  45. });
  46. mutex.release();
  47. wakeup.send();
  48. return e;
  49. }
  50. public function cancel(eventHandler:EventHandler):Void {
  51. mutex.acquire();
  52. (eventHandler:RegularEvent).event = noop;
  53. pending.push(() -> {
  54. var timer = (eventHandler:RegularEvent).timer;
  55. timer.stop().resolve();
  56. timer.close(noop);
  57. });
  58. mutex.release();
  59. wakeup.send();
  60. }
  61. static final noop = function() {}
  62. public function promise():Void {
  63. mutex.acquire();
  64. ++promisedEventsCount;
  65. pending.push(refUnref);
  66. mutex.release();
  67. wakeup.send();
  68. }
  69. public function run(event:()->Void):Void {
  70. mutex.acquire();
  71. pending.push(event);
  72. mutex.release();
  73. wakeup.send();
  74. }
  75. public function runPromised(event:()->Void):Void {
  76. mutex.acquire();
  77. --promisedEventsCount;
  78. pending.push(refUnref);
  79. pending.push(event);
  80. mutex.release();
  81. wakeup.send();
  82. }
  83. function refUnref():Void {
  84. if(promisedEventsCount > 0) {
  85. wakeup.ref();
  86. } else {
  87. wakeup.unref();
  88. }
  89. }
  90. public function progress():NextEventTime {
  91. //TODO: throw if loop is already running
  92. if((handle:Loop).run(NOWAIT)) {
  93. return AnyTime(null);
  94. } else {
  95. return Never;
  96. }
  97. }
  98. public function wait(?timeout:Float):Bool {
  99. //TODO: throw if loop is already running
  100. if(timeout == null) {
  101. var timer = LuvTimer.init(handle).resolve();
  102. timer.start(() -> {
  103. timer.stop().resolve();
  104. timer.close(() -> {});
  105. }, Std.int(timeout * 1000));
  106. return (handle:Loop).run(ONCE);
  107. } else {
  108. return (handle:Loop).run(ONCE);
  109. }
  110. }
  111. public function loop():Void {
  112. //TODO: throw if loop is already running
  113. consumePending();
  114. (handle:Loop).run(DEFAULT);
  115. }
  116. function consumePending(?_:Async):Void {
  117. var p = pending;
  118. pending = [];
  119. for(fn in p) fn();
  120. }
  121. }