// EventLoop.hx (6.3 KB)
  1. package sys.thread;
  /**
      Describes when an event loop will next have an event available to execute.
  **/
  enum NextEventTime {
      /** An event is already waiting and can be executed right away. */
      Now;

      /** No further events are expected. */
      Never;

      /**
          An event may arrive at any moment.
          If `time` is specified, an event is guaranteed to be ready at that time.
      */
      AnyTime(time:Null<Float>);

      /** An event is expected to become ready for execution at `time`. */
      At(time:Float);
  }
  /**
      Handle type of an event loop, represented by an `Int` at runtime.
      No implicit conversions from or to `Int` are declared.
  **/
  abstract EventLoopHandle(Int) {}
  /**
      An event loop implementation used for `sys.thread.Thread`
  **/
  @:coreApi
  class EventLoop {
      public final handle:EventLoopHandle = cast 0;

      /** Guards the event collections and `promisedEventsCount`. */
      final mutex = new Mutex();
      /** One-time events waiting to be executed; unused slots hold `null`. */
      final oneTimeEvents = new Array<Null<()->Void>>();
      /** Index of the next free slot in `oneTimeEvents`. */
      var oneTimeEventsIdx = 0;
      /** Released every time an event is added so that a waiting loop wakes up. */
      final waitLock = new Lock();
      /** Number of promised events which have not been delivered yet. */
      var promisedEventsCount = 0;
      /** Head of the doubly linked list of repeating events. */
      var regularEvents:Null<RegularEvent>;

      public function new():Void {}

      /**
          Schedule event for execution every `intervalMs` milliseconds in current loop.

          The first execution is scheduled `intervalMs` milliseconds from now.
          Returns a handler which can be passed to `cancel()`.
      **/
      public function repeat(event:()->Void, intervalMs:Int):EventHandler {
          mutex.acquire();
          // Internally all timings are kept in seconds to match `Sys.time()`.
          var interval = 0.001 * intervalMs;
          var regular = new RegularEvent(event, Sys.time() + interval, interval);
          // Insert the new event at the head of the list.
          switch regularEvents {
              case null:
              case current:
                  regular.next = current;
                  current.previous = regular;
          }
          regularEvents = regular;
          // Wake up the loop so it can take the new event into account.
          waitLock.release();
          mutex.release();
          return regular;
      }

      /**
          Prevent execution of a previously scheduled event in current loop.

          Cancelling the same handler more than once is a no-op.
          An event which was already collected for execution by a `progress()`
          or `loop()` pass running at the same time may still be executed once.
      **/
      public function cancel(eventHandler:EventHandler):Void {
          mutex.acquire();
          var event:RegularEvent = eventHandler;
          if(regularEvents == event) {
              regularEvents = event.next;
          }
          switch event.next {
              case null:
              case e: e.previous = event.previous;
          }
          switch event.previous {
              case null:
              case e: e.next = event.next;
          }
          // Detach the node completely. Without this a repeated `cancel()` of the
          // same handler would re-link its stale neighbours into the list and
          // could bring back events which were cancelled in the meantime.
          event.next = event.previous = null;
          mutex.release();
      }

      /**
          Notify this loop about an upcoming event.
          This makes the thread stay alive and wait for as many events as the number
          of times `.promise()` was called. These events should be added via `.runPromised()`
      **/
      public function promise():Void {
          mutex.acquire();
          ++promisedEventsCount;
          mutex.release();
      }

      /**
          Execute `event` as soon as possible.
      **/
      public function run(event:()->Void):Void {
          mutex.acquire();
          oneTimeEvents[oneTimeEventsIdx++] = event;
          waitLock.release();
          mutex.release();
      }

      /**
          Add previously promised `event` for execution.
      **/
      public function runPromised(event:()->Void):Void {
          mutex.acquire();
          oneTimeEvents[oneTimeEventsIdx++] = event;
          --promisedEventsCount;
          waitLock.release();
          mutex.release();
      }

      /**
          Executes all pending events.

          The returned time stamps can be used with `Sys.time()` for calculations.

          Depending on a target platform this method may be non-reentrant. It must
          not be called from event callbacks.
      **/
      public function progress():NextEventTime {
          return switch __progress(Sys.time(), []) {
              case {nextEventAt:-2}: Now;
              case {nextEventAt:-1, anyTime:false}: Never;
              case {nextEventAt:-1, anyTime:true}: AnyTime(null);
              case {nextEventAt:time, anyTime:true}: AnyTime(time);
              case {nextEventAt:time, anyTime:false}: At(time);
          }
      }

      /**
          Blocks until a new event is added or `timeout` (in seconds) expires.

          Depending on a target platform this method may also automatically execute arriving
          events while waiting. However if any event is executed it will stop waiting.

          Returns `true` if more events are expected.
          Returns `false` if no more events are expected.
          This implementation returns the result of waiting on the internal lock.

          Depending on a target platform this method may be non-reentrant. It must
          not be called from event callbacks.
      **/
      public function wait(?timeout:Float):Bool {
          return waitLock.wait(timeout);
      }

      /**
          Execute all pending events.
          Wait and execute as many events as the number of times `promise()` was called.
          Runs until all repeating events are cancelled and no more events are expected.

          Depending on a target platform this method may be non-reentrant. It must
          not be called from event callbacks.
      **/
      public function loop():Void {
          // Reused between iterations to avoid allocating a new array every pass.
          var events = [];
          while(true) {
              var r = __progress(Sys.time(), events);
              switch r {
                  // Something was executed: check for more events immediately.
                  case {nextEventAt:-2}:
                  // Nothing scheduled and nothing promised: the loop is done.
                  case {nextEventAt:-1, anyTime:false}:
                      break;
                  // Only promised events are outstanding: sleep until one is added.
                  case {nextEventAt:-1, anyTime:true}:
                      waitLock.wait();
                  // Sleep until the next scheduled event or until a new event is added.
                  case {nextEventAt:time}:
                      var timeout = time - Sys.time();
                      waitLock.wait(Math.max(0, timeout));
              }
          }
      }

      /**
          `.progress` implementation with a reusable array for internal usage.
          The `nextEventAt` field of the return value denotes when the next event
          is expected to run:
          * -1 - never
          * -2 - now
          * other values - at specified time

          The `anyTime` field of the return value is `true` if there are promised
          events which have not been added yet.
      **/
      inline function __progress(now:Float, recycle:Array<()->Void>):{nextEventAt:Float, anyTime:Bool} {
          var eventsToRun = recycle;
          var eventsToRunIdx = 0;
          // When the next event is expected to run
          var nextEventAt:Float = -1;

          mutex.acquire();
          // Reset waitLock: consume every release accumulated so far
          while(waitLock.wait(0.0)) {}
          // Collect regular events to run
          var current = regularEvents;
          while(current != null) {
              if(current.nextRunTime <= now) {
                  eventsToRun[eventsToRunIdx++] = current.run;
                  current.nextRunTime += current.interval;
                  nextEventAt = -2;
              } else if(nextEventAt == -1 || current.nextRunTime < nextEventAt) {
                  nextEventAt = current.nextRunTime;
              }
              current = current.next;
          }
          mutex.release();

          // Run regular events outside of the mutex, so callbacks can use this loop
          for(i in 0...eventsToRunIdx) {
              eventsToRun[i]();
              eventsToRun[i] = null;
          }
          eventsToRunIdx = 0;

          // Collect pending one-time events
          mutex.acquire();
          for(i => event in oneTimeEvents) {
              switch event {
                  // The first empty slot marks the end of pending events
                  case null:
                      break;
                  case _:
                      eventsToRun[eventsToRunIdx++] = event;
                      oneTimeEvents[i] = null;
              }
          }
          oneTimeEventsIdx = 0;
          var hasPromisedEvents = promisedEventsCount > 0;
          mutex.release();

          // Run one-time events
          for(i in 0...eventsToRunIdx) {
              eventsToRun[i]();
              eventsToRun[i] = null;
          }

          // Some events were executed. They could add new events to run.
          if(eventsToRunIdx > 0) {
              nextEventAt = -2;
          }
          return {nextEventAt:nextEventAt, anyTime:hasPromisedEvents}
      }
  }
  /**
      Handler of a repeating event, as returned by `EventLoop.repeat()` and
      accepted by `EventLoop.cancel()`.
      Converts implicitly from and to the underlying `RegularEvent`.
  **/
  abstract EventHandler(RegularEvent) from RegularEvent to RegularEvent {}
  /**
      A repeating event, stored as a node of a doubly linked list.
  **/
  private class RegularEvent {
      /** Callback to invoke each time the event is due. */
      public final run:()->Void;
      /** Amount added to `nextRunTime` each time the event is picked for execution. */
      public final interval:Float;
      /** Time stamp at which the event becomes due; compared against `Sys.time()`. */
      public var nextRunTime:Float;
      /** Preceding node of the list, or `null` if there is none. */
      public var previous:Null<RegularEvent>;
      /** Following node of the list, or `null` if there is none. */
      public var next:Null<RegularEvent>;

      /**
          Creates an unlinked event which invokes `run`, becomes due at
          `nextRunTime` and is rescheduled in steps of `interval`.
      **/
      public function new(run:()->Void, nextRunTime:Float, interval:Float) {
          this.interval = interval;
          this.nextRunTime = nextRunTime;
          this.run = run;
      }
  }
  221. }