EventLoop.hx 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package sys.thread;
  2. /**
  3. When an event loop has an available event to execute.
  4. **/
  5. enum NextEventTime {
  6. /** There's already an event waiting to be executed */
  7. Now;
  8. /** No new events are expected. */
  9. Never;
  10. /**
  11. An event is expected to arrive at any time.
  12. If `time` is specified, then the event will be ready at that time for sure.
  13. */
  14. AnyTime(time:Null<Float>);
  15. /** An event is expected to be ready for execution at `time`. */
  16. At(time:Float);
  17. }
  18. /**
  19. An event loop implementation used for `sys.thread.Thread`
  20. **/
  21. @:coreApi
  22. class EventLoop {
  23. final mutex = new Mutex();
  24. final oneTimeEvents = new Array<Null<()->Void>>();
  25. var oneTimeEventsIdx = 0;
  26. final waitLock = new Lock();
  27. var promisedEventsCount = 0;
  28. var regularEvents:Null<RegularEvent>;
  29. var lastRegularEvent:Null<RegularEvent>;
  30. var isMainThread:Bool;
  31. static var CREATED : Bool;
  32. public function new():Void {
  33. isMainThread = !CREATED;
  34. CREATED = true;
  35. }
  36. /**
  37. Schedule event for execution every `intervalMs` milliseconds in current loop.
  38. **/
  39. public function repeat(event:()->Void, intervalMs:Int):EventHandler {
  40. mutex.acquire();
  41. var interval = 0.001 * intervalMs;
  42. var event = new RegularEvent(event, Sys.time() + interval, interval);
  43. inline insertEventByTime(event);
  44. waitLock.release();
  45. mutex.release();
  46. return event;
  47. }
  48. function insertEventByTime(event:RegularEvent):Void {
  49. switch regularEvents {
  50. case null:
  51. regularEvents = event;
  52. lastRegularEvent = null;
  53. case current:
  54. var previous = null;
  55. if (lastRegularEvent != null && lastRegularEvent.nextRunTime <= event.nextRunTime) {
  56. lastRegularEvent.next = event;
  57. event.previous = lastRegularEvent;
  58. lastRegularEvent = event;
  59. return;
  60. }
  61. while(true) {
  62. if(current == null) {
  63. previous.next = event;
  64. event.previous = previous;
  65. lastRegularEvent = event;
  66. break;
  67. } else if(event.nextRunTime < current.nextRunTime) {
  68. event.next = current;
  69. current.previous = event;
  70. switch previous {
  71. case null:
  72. regularEvents = event;
  73. case _:
  74. event.previous = previous;
  75. previous.next = event;
  76. current.previous = event;
  77. }
  78. break;
  79. } else {
  80. previous = current;
  81. current = current.next;
  82. }
  83. }
  84. }
  85. }
  86. /**
  87. Prevent execution of a previously scheduled event in current loop.
  88. **/
  89. public function cancel(eventHandler:EventHandler):Void {
  90. mutex.acquire();
  91. var event:RegularEvent = eventHandler;
  92. event.cancelled = true;
  93. if(regularEvents == event) {
  94. regularEvents = event.next;
  95. }
  96. switch event.next {
  97. case null:
  98. case e: e.previous = event.previous;
  99. }
  100. switch event.previous {
  101. case null:
  102. case e: e.next = event.next;
  103. }
  104. event.next = event.previous = null;
  105. mutex.release();
  106. }
  107. /**
  108. Notify this loop about an upcoming event.
  109. This makes the thread stay alive and wait for as many events as the number of
  110. times `.promise()` was called. These events should be added via `.runPromised()`.
  111. **/
  112. public function promise():Void {
  113. mutex.acquire();
  114. ++promisedEventsCount;
  115. mutex.release();
  116. }
  117. /**
  118. Execute `event` as soon as possible.
  119. **/
  120. public function run(event:()->Void):Void {
  121. mutex.acquire();
  122. oneTimeEvents[oneTimeEventsIdx++] = event;
  123. waitLock.release();
  124. mutex.release();
  125. }
  126. /**
  127. Add previously promised `event` for execution.
  128. **/
  129. public function runPromised(event:()->Void):Void {
  130. mutex.acquire();
  131. oneTimeEvents[oneTimeEventsIdx++] = event;
  132. --promisedEventsCount;
  133. waitLock.release();
  134. mutex.release();
  135. }
  136. /**
  137. Executes all pending events.
  138. The returned time stamps can be used with `Sys.time()` for calculations.
  139. Depending on a target platform this method may be non-reentrant. It must
  140. not be called from event callbacks.
  141. **/
  142. public function progress():NextEventTime {
  143. return switch __progress(Sys.time(), [], []) {
  144. case {nextEventAt:-2}: Now;
  145. case {nextEventAt:-1, anyTime:false}: Never;
  146. case {nextEventAt:-1, anyTime:true}: AnyTime(null);
  147. case {nextEventAt:time, anyTime:true}: AnyTime(time);
  148. case {nextEventAt:time, anyTime:false}: At(time);
  149. }
  150. }
  151. /**
  152. Blocks until a new event is added or `timeout` (in seconds) to expires.
  153. Depending on a target platform this method may also automatically execute arriving
  154. events while waiting. However if any event is executed it will stop waiting.
  155. Returns `true` if more events are expected.
  156. Returns `false` if no more events expected.
  157. Depending on a target platform this method may be non-reentrant. It must
  158. not be called from event callbacks.
  159. **/
  160. public function wait(?timeout:Float):Bool {
  161. return waitLock.wait(timeout);
  162. }
  163. /**
  164. Execute all pending events.
  165. Wait and execute as many events as the number of times `promise()` was called.
  166. Runs until all repeating events are cancelled and no more events are expected.
  167. Depending on a target platform this method may be non-reentrant. It must
  168. not be called from event callbacks.
  169. **/
  170. public function loop():Void {
  171. var recycleRegular = [];
  172. var recycleOneTimers = [];
  173. while(true) {
  174. var r = __progress(Sys.time(), recycleRegular, recycleOneTimers);
  175. switch r {
  176. case {nextEventAt:-2}:
  177. case {nextEventAt:-1, anyTime:false}:
  178. break;
  179. case {nextEventAt:-1, anyTime:true}:
  180. waitLock.wait();
  181. case {nextEventAt:time}:
  182. var timeout = time - Sys.time();
  183. waitLock.wait(Math.max(0, timeout));
  184. }
  185. }
  186. }
  187. /**
  188. `.progress` implementation with a reusable array for internal usage.
  189. The `nextEventAt` field of the return value denotes when the next event
  190. is expected to run:
  191. * -1 - never
  192. * -2 - now
  193. * other values - at specified time
  194. **/
  195. inline function __progress(now:Float, recycleRegular:Array<RegularEvent>, recycleOneTimers:Array<()->Void>):{nextEventAt:Float, anyTime:Bool} {
  196. var regularsToRun = recycleRegular;
  197. var eventsToRunIdx = 0;
  198. // When the next event is expected to run
  199. var nextEventAt:Float = -1;
  200. mutex.acquire();
  201. //reset waitLock
  202. while(waitLock.wait(0.0)) {}
  203. // Collect regular events to run
  204. var current = regularEvents;
  205. while(current != null) {
  206. if(current.nextRunTime <= now) {
  207. regularsToRun[eventsToRunIdx++] = current;
  208. current.nextRunTime += current.interval;
  209. nextEventAt = -2;
  210. } else if(nextEventAt == -1 || current.nextRunTime < nextEventAt) {
  211. nextEventAt = current.nextRunTime;
  212. }
  213. current = current.next;
  214. }
  215. mutex.release();
  216. // Run regular events
  217. for(i in 0...eventsToRunIdx) {
  218. if(!regularsToRun[i].cancelled)
  219. regularsToRun[i].run();
  220. regularsToRun[i] = null;
  221. }
  222. eventsToRunIdx = 0;
  223. var oneTimersToRun = recycleOneTimers;
  224. // Collect pending one-time events
  225. mutex.acquire();
  226. for(i => event in oneTimeEvents) {
  227. switch event {
  228. case null:
  229. break;
  230. case _:
  231. oneTimersToRun[eventsToRunIdx++] = event;
  232. oneTimeEvents[i] = null;
  233. }
  234. }
  235. oneTimeEventsIdx = 0;
  236. var hasPromisedEvents = promisedEventsCount > 0;
  237. mutex.release();
  238. //run events
  239. for(i in 0...eventsToRunIdx) {
  240. oneTimersToRun[i]();
  241. oneTimersToRun[i] = null;
  242. }
  243. // run main events
  244. if( isMainThread ) {
  245. var next = @:privateAccess haxe.MainLoop.tick();
  246. if( haxe.MainLoop.hasEvents() ) {
  247. eventsToRunIdx++;
  248. if( nextEventAt > next )
  249. nextEventAt = next;
  250. }
  251. }
  252. // Some events were executed. They could add new events to run.
  253. if(eventsToRunIdx > 0) {
  254. nextEventAt = -2;
  255. }
  256. return {nextEventAt:nextEventAt, anyTime:hasPromisedEvents}
  257. }
  258. }
  259. abstract EventHandler(RegularEvent) from RegularEvent to RegularEvent {}
  260. private class RegularEvent {
  261. public var nextRunTime:Float;
  262. public final interval:Float;
  263. public final run:()->Void;
  264. public var next:Null<RegularEvent>;
  265. public var previous:Null<RegularEvent>;
  266. public var cancelled:Bool = false;
  267. public function new(run:()->Void, nextRunTime:Float, interval:Float) {
  268. this.run = run;
  269. this.nextRunTime = nextRunTime;
  270. this.interval = interval;
  271. }
  272. }