EventLoop.hx 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. package sys.thread;
  2. /**
  3. When an event loop has an available event to execute.
  4. **/
  5. enum NextEventTime {
  6. /** There's already an event waiting to be executed */
  7. Now;
  8. /** No new events are expected. */
  9. Never;
  10. /**
  11. An event is expected to arrive at any time.
  12. If `time` is specified, then the event will be ready at that time for sure.
  13. */
  14. AnyTime(time:Null<Float>);
  15. /** An event is expected to be ready for execution at `time`. */
  16. At(time:Float);
  17. }
  18. /**
  19. An event loop implementation used for `sys.thread.Thread`
  20. **/
  21. @:coreApi
  22. class EventLoop {
  23. final mutex = new Mutex();
  24. final oneTimeEvents = new Array<Null<()->Void>>();
  25. var oneTimeEventsIdx = 0;
  26. final waitLock = new Lock();
  27. var promisedEventsCount = 0;
  28. var regularEvents:Null<RegularEvent>;
  29. public function new():Void {}
  30. /**
  31. Schedule event for execution every `intervalMs` milliseconds in current loop.
  32. **/
  33. public function repeat(event:()->Void, intervalMs:Int):EventHandler {
  34. mutex.acquire();
  35. var interval = 0.001 * intervalMs;
  36. var event = new RegularEvent(event, Sys.time() + interval, interval);
  37. inline insertEventByTime(event);
  38. waitLock.release();
  39. mutex.release();
  40. return event;
  41. }
  42. function insertEventByTime(event:RegularEvent):Void {
  43. switch regularEvents {
  44. case null:
  45. regularEvents = event;
  46. case current:
  47. var previous = null;
  48. while(true) {
  49. if(current == null) {
  50. previous.next = event;
  51. event.previous = previous;
  52. break;
  53. } else if(event.nextRunTime < current.nextRunTime) {
  54. event.next = current;
  55. current.previous = event;
  56. switch previous {
  57. case null:
  58. regularEvents = event;
  59. case _:
  60. event.previous = previous;
  61. previous.next = event;
  62. current.previous = event;
  63. }
  64. break;
  65. } else {
  66. previous = current;
  67. current = current.next;
  68. }
  69. }
  70. }
  71. }
  72. /**
  73. Prevent execution of a previously scheduled event in current loop.
  74. **/
  75. public function cancel(eventHandler:EventHandler):Void {
  76. mutex.acquire();
  77. var event:RegularEvent = eventHandler;
  78. event.cancelled = true;
  79. if(regularEvents == event) {
  80. regularEvents = event.next;
  81. }
  82. switch event.next {
  83. case null:
  84. case e: e.previous = event.previous;
  85. }
  86. switch event.previous {
  87. case null:
  88. case e: e.next = event.next;
  89. }
  90. event.next = event.previous = null;
  91. mutex.release();
  92. }
  93. /**
  94. Notify this loop about an upcoming event.
  95. This makes the thread to stay alive and wait for as many events as many times
  96. `.promise()` was called. These events should be added via `.runPromised()`
  97. **/
  98. public function promise():Void {
  99. mutex.acquire();
  100. ++promisedEventsCount;
  101. mutex.release();
  102. }
  103. /**
  104. Execute `event` as soon as possible.
  105. **/
  106. public function run(event:()->Void):Void {
  107. mutex.acquire();
  108. oneTimeEvents[oneTimeEventsIdx++] = event;
  109. waitLock.release();
  110. mutex.release();
  111. }
  112. /**
  113. Add previously promised `event` for execution.
  114. **/
  115. public function runPromised(event:()->Void):Void {
  116. mutex.acquire();
  117. oneTimeEvents[oneTimeEventsIdx++] = event;
  118. --promisedEventsCount;
  119. waitLock.release();
  120. mutex.release();
  121. }
  122. /**
  123. Executes all pending events.
  124. The returned time stamps can be used with `Sys.time()` for calculations.
  125. Depending on a target platform this method may be non-reentrant. It must
  126. not be called from event callbacks.
  127. **/
  128. public function progress():NextEventTime {
  129. return switch __progress(Sys.time(), [], []) {
  130. case {nextEventAt:-2}: Now;
  131. case {nextEventAt:-1, anyTime:false}: Never;
  132. case {nextEventAt:-1, anyTime:true}: AnyTime(null);
  133. case {nextEventAt:time, anyTime:true}: AnyTime(time);
  134. case {nextEventAt:time, anyTime:false}: At(time);
  135. }
  136. }
  137. /**
  138. Blocks until a new event is added or `timeout` (in seconds) to expires.
  139. Depending on a target platform this method may also automatically execute arriving
  140. events while waiting. However if any event is executed it will stop waiting.
  141. Returns `true` if more events are expected.
  142. Returns `false` if no more events expected.
  143. Depending on a target platform this method may be non-reentrant. It must
  144. not be called from event callbacks.
  145. **/
  146. public function wait(?timeout:Float):Bool {
  147. return waitLock.wait(timeout);
  148. }
  149. /**
  150. Execute all pending events.
  151. Wait and execute as many events as many times `promiseEvent()` was called.
  152. Runs until all repeating events are cancelled and no more events is expected.
  153. Depending on a target platform this method may be non-reentrant. It must
  154. not be called from event callbacks.
  155. **/
  156. public function loop():Void {
  157. var recycleRegular = [];
  158. var recycleOneTimers = [];
  159. while(true) {
  160. var r = __progress(Sys.time(), recycleRegular, recycleOneTimers);
  161. switch r {
  162. case {nextEventAt:-2}:
  163. case {nextEventAt:-1, anyTime:false}:
  164. break;
  165. case {nextEventAt:-1, anyTime:true}:
  166. waitLock.wait();
  167. case {nextEventAt:time}:
  168. var timeout = time - Sys.time();
  169. waitLock.wait(Math.max(0, timeout));
  170. }
  171. }
  172. }
  173. /**
  174. `.progress` implementation with a reusable array for internal usage.
  175. The `nextEventAt` field of the return value denotes when the next event
  176. is expected to run:
  177. * -1 - never
  178. * -2 - now
  179. * other values - at specified time
  180. **/
  181. inline function __progress(now:Float, recycleRegular:Array<RegularEvent>, recycleOneTimers:Array<()->Void>):{nextEventAt:Float, anyTime:Bool} {
  182. var regularsToRun = recycleRegular;
  183. var eventsToRunIdx = 0;
  184. // When the next event is expected to run
  185. var nextEventAt:Float = -1;
  186. mutex.acquire();
  187. //reset waitLock
  188. while(waitLock.wait(0.0)) {}
  189. // Collect regular events to run
  190. var current = regularEvents;
  191. while(current != null) {
  192. if(current.nextRunTime <= now) {
  193. regularsToRun[eventsToRunIdx++] = current;
  194. current.nextRunTime += current.interval;
  195. nextEventAt = -2;
  196. } else if(nextEventAt == -1 || current.nextRunTime < nextEventAt) {
  197. nextEventAt = current.nextRunTime;
  198. }
  199. current = current.next;
  200. }
  201. mutex.release();
  202. // Run regular events
  203. for(i in 0...eventsToRunIdx) {
  204. if(!regularsToRun[i].cancelled)
  205. regularsToRun[i].run();
  206. regularsToRun[i] = null;
  207. }
  208. eventsToRunIdx = 0;
  209. var oneTimersToRun = recycleOneTimers;
  210. // Collect pending one-time events
  211. mutex.acquire();
  212. for(i => event in oneTimeEvents) {
  213. switch event {
  214. case null:
  215. break;
  216. case _:
  217. oneTimersToRun[eventsToRunIdx++] = event;
  218. oneTimeEvents[i] = null;
  219. }
  220. }
  221. oneTimeEventsIdx = 0;
  222. var hasPromisedEvents = promisedEventsCount > 0;
  223. mutex.release();
  224. //run events
  225. for(i in 0...eventsToRunIdx) {
  226. oneTimersToRun[i]();
  227. oneTimersToRun[i] = null;
  228. }
  229. // Some events were executed. They could add new events to run.
  230. if(eventsToRunIdx > 0) {
  231. nextEventAt = -2;
  232. }
  233. return {nextEventAt:nextEventAt, anyTime:hasPromisedEvents}
  234. }
  235. }
  236. abstract EventHandler(RegularEvent) from RegularEvent to RegularEvent {}
  237. private class RegularEvent {
  238. public var nextRunTime:Float;
  239. public final interval:Float;
  240. public final run:()->Void;
  241. public var next:Null<RegularEvent>;
  242. public var previous:Null<RegularEvent>;
  243. public var cancelled:Bool = false;
  244. public function new(run:()->Void, nextRunTime:Float, interval:Float) {
  245. this.run = run;
  246. this.nextRunTime = nextRunTime;
  247. this.interval = interval;
  248. }
  249. }