EventLoop.hx 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package sys.thread;
  2. /**
  3. When an event loop has an available event to execute.
  4. **/
  5. enum NextEventTime {
  6. /** There's already an event waiting to be executed */
  7. Now;
  8. /** No new events are expected. */
  9. Never;
  10. /**
  11. An event is expected to arrive at any time.
  12. If `time` is specified, then the event will be ready at that time for sure.
  13. */
  14. AnyTime(time:Null<Float>);
  15. /** An event is expected to be ready for execution at `time`. */
  16. At(time:Float);
  17. }
  18. /**
  19. An event loop implementation used for `sys.thread.Thread`
  20. **/
  21. class EventLoop {
  22. final mutex = new Mutex();
  23. final oneTimeEvents = new Array<Null<()->Void>>();
  24. var oneTimeEventsIdx = 0;
  25. final waitLock = new Lock();
  26. var promisedEventsCount = 0;
  27. var regularEvents:Null<RegularEvent>;
  28. public function new():Void {}
  29. /**
  30. Schedule event for execution every `intervalMs` milliseconds in current loop.
  31. **/
  32. public function repeat(event:()->Void, intervalMs:Int):EventHandler {
  33. mutex.acquire();
  34. var interval = 0.001 * intervalMs;
  35. var event = new RegularEvent(event, Sys.time() + interval, interval);
  36. switch regularEvents {
  37. case null:
  38. case current:
  39. event.next = current;
  40. current.previous = event;
  41. }
  42. regularEvents = event;
  43. waitLock.release();
  44. mutex.release();
  45. return event;
  46. }
  47. /**
  48. Prevent execution of a previousely scheduled event in current loop.
  49. **/
  50. public function cancel(eventHandler:EventHandler):Void {
  51. mutex.acquire();
  52. var event:RegularEvent = eventHandler;
  53. if(regularEvents == event) {
  54. regularEvents = event.next;
  55. }
  56. switch event.next {
  57. case null:
  58. case e: e.previous = event.previous;
  59. }
  60. switch event.previous {
  61. case null:
  62. case e: e.next = event.next;
  63. }
  64. mutex.release();
  65. }
  66. /**
  67. Notify this loop about an upcoming event.
  68. This makes the thread to stay alive and wait for as many events as many times
  69. `.promise()` was called. These events should be added via `.runPromised()`
  70. **/
  71. public function promise():Void {
  72. mutex.acquire();
  73. ++promisedEventsCount;
  74. mutex.release();
  75. }
  76. /**
  77. Execute `event` as soon as possible.
  78. **/
  79. public function run(event:()->Void):Void {
  80. mutex.acquire();
  81. oneTimeEvents[oneTimeEventsIdx++] = event;
  82. waitLock.release();
  83. mutex.release();
  84. }
  85. /**
  86. Add previously promised `event` for execution.
  87. **/
  88. public function runPromised(event:()->Void):Void {
  89. mutex.acquire();
  90. oneTimeEvents[oneTimeEventsIdx++] = event;
  91. --promisedEventsCount;
  92. waitLock.release();
  93. mutex.release();
  94. }
  95. /**
  96. Executes all pending events.
  97. The returned time stamps can be used with `Sys.time()` for calculations.
  98. **/
  99. public function progress():NextEventTime {
  100. return switch __progress(Sys.time(), []) {
  101. case {nextEventAt:-2}: Now;
  102. case {nextEventAt:-1, anyTime:false}: Never;
  103. case {nextEventAt:-1, anyTime:true}: AnyTime(null);
  104. case {nextEventAt:time, anyTime:true}: AnyTime(time);
  105. case {nextEventAt:time, anyTime:false}: At(time);
  106. }
  107. }
  108. /**
  109. Waits for a new event to be added, or `timeout` (in seconds) to expire.
  110. Returns `true` if an event was added and `false` if a timeout occurs.
  111. **/
  112. public function wait(?timeout:Float) {
  113. return waitLock.wait(timeout);
  114. }
  115. /**
  116. Execute all pending events.
  117. Wait and execute as many events as many times `promiseEvent()` was called.
  118. Runs until all repeating events are cancelled and no more events is expected.
  119. **/
  120. public function loop():Void {
  121. var events = [];
  122. while(true) {
  123. var r = __progress(Sys.time(), events);
  124. switch r {
  125. case {nextEventAt:-2}:
  126. case {nextEventAt:-1, anyTime:false}:
  127. break;
  128. case {nextEventAt:-1, anyTime:true}:
  129. waitLock.wait();
  130. case {nextEventAt:time}:
  131. var timeout = time - Sys.time();
  132. waitLock.wait(Math.max(0, timeout));
  133. }
  134. }
  135. }
  136. /**
  137. `.pogress` implementation with a resuable array for internal usage.
  138. The `nextEventAt` field of the return value denotes when the next event
  139. is expected to run:
  140. * -1 - never
  141. * -2 - now
  142. * other values - at specified time
  143. **/
  144. inline function __progress(now:Float, recycle:Array<()->Void>):{nextEventAt:Float, anyTime:Bool} {
  145. var eventsToRun = recycle;
  146. var eventsToRunIdx = 0;
  147. // When the next event is expected to run
  148. var nextEventAt:Float = -1;
  149. mutex.acquire();
  150. //reset waitLock
  151. while(waitLock.wait(0.0)) {}
  152. // Collect regular events to run
  153. var current = regularEvents;
  154. while(current != null) {
  155. if(current.nextRunTime <= now) {
  156. eventsToRun[eventsToRunIdx++] = current.run;
  157. current.nextRunTime += current.interval;
  158. nextEventAt = -2;
  159. } else if(nextEventAt == -1 || current.nextRunTime < nextEventAt) {
  160. nextEventAt = current.nextRunTime;
  161. }
  162. current = current.next;
  163. }
  164. mutex.release();
  165. // Run regular events
  166. for(i in 0...eventsToRunIdx) {
  167. eventsToRun[i]();
  168. eventsToRun[i] = null;
  169. }
  170. eventsToRunIdx = 0;
  171. // Collect pending one-time events
  172. mutex.acquire();
  173. for(i => event in oneTimeEvents) {
  174. switch event {
  175. case null:
  176. break;
  177. case _:
  178. eventsToRun[eventsToRunIdx++] = event;
  179. oneTimeEvents[i] = null;
  180. }
  181. }
  182. oneTimeEventsIdx = 0;
  183. var hasPromisedEvents = promisedEventsCount > 0;
  184. mutex.release();
  185. //run events
  186. for(i in 0...eventsToRunIdx) {
  187. eventsToRun[i]();
  188. eventsToRun[i] = null;
  189. }
  190. // Some events were executed. They could add new events to run.
  191. if(eventsToRunIdx > 0) {
  192. nextEventAt = -2;
  193. }
  194. return {nextEventAt:nextEventAt, anyTime:hasPromisedEvents}
  195. }
  196. }
  197. abstract EventHandler(RegularEvent) from RegularEvent to RegularEvent {}
  198. private class RegularEvent {
  199. public var nextRunTime:Float;
  200. public final interval:Float;
  201. public final run:()->Void;
  202. public var next:Null<RegularEvent>;
  203. public var previous:Null<RegularEvent>;
  204. public function new(run:()->Void, nextRunTime:Float, interval:Float) {
  205. this.run = run;
  206. this.nextRunTime = nextRunTime;
  207. this.interval = interval;
  208. }
  209. }