EventLoop.hx 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. package haxe;
  2. import haxe.EntryPoint;
  3. class Event {
  4. var prev : Event;
  5. var next : Event;
  6. var callb : Void -> Void;
  7. /**
  8. The EventLoop our event is part of.
  9. **/
  10. public var loop(default,null) : EventLoop;
  11. /**
  12. The event priority. Events will be executed in order of priority (highest first).
  13. **/
  14. public var priority : Int;
  15. /**
  16. Tells if an event is blocking. It means the event loop won't return from `loop()` until this event has been stopped.
  17. **/
  18. public var isBlocking : Bool = true;
  19. var toRemove : Bool;
  20. var nextRun : Float = Math.NEGATIVE_INFINITY;
  21. function new(loop, p) {
  22. this.loop = loop;
  23. this.priority = p;
  24. }
  25. /**
  26. Delay the execution of the event for the given time, in seconds.
  27. If t is null, the event will be run at next event loop.
  28. **/
  29. public function delay(t:Null<Float>,fromLastRun=false) {
  30. nextRun = t == null ? Math.NEGATIVE_INFINITY : (fromLastRun && Math.isFinite(nextRun) ? nextRun : haxe.Timer.stamp()) + t;
  31. }
  32. /**
  33. Stop this event from repeating.
  34. **/
  35. public function stop() {
  36. @:privateAccess loop.stop(this);
  37. }
  38. /**
  39. Start the event with the given callback function.
  40. **/
  41. public function start( callb : Void -> Void ) {
  42. this.callb = callb;
  43. @:privateAccess loop.start(this);
  44. }
  45. /**
  46. Tells if the event has been stopped.
  47. **/
  48. public function isStopped() {
  49. return toRemove || (prev == null && @:privateAccess loop.events != this);
  50. }
  51. }
  52. /**
  53. Handles async events for all threads
  54. **/
  55. @:access(haxe.Event)
  56. class EventLoop {
  57. /**
  58. This is the main thread event loop.
  59. **/
  60. public static var main(get,null) : EventLoop;
  61. /**
  62. This is the current thread event loop. For platforms that doesn't support threads
  63. it is the same as `main`.
  64. **/
  65. public static var current(get,never) : EventLoop;
  66. var events : Event;
  67. var inLoop : Bool;
  68. var hasPendingRemove : Bool;
  69. var promiseCount : Int = 0;
  70. #if target.threaded
  71. var mutex : sys.thread.Mutex;
  72. var lockTime : sys.thread.Lock;
  73. /**
  74. The reference thread for this loop. If set, the loop can only be run within this thread.
  75. **/
  76. public var thread : sys.thread.Thread;
  77. #end
  78. #if hl
  79. var uvLoop : hl.uv.Loop;
  80. var inUV : Bool;
  81. #end
  82. public function new() {
  83. #if target.threaded
  84. mutex = new sys.thread.Mutex();
  85. lockTime = new sys.thread.Lock();
  86. #end
  87. }
  88. #if hl
  89. function getUVLoop() {
  90. if( uvLoop == null ) {
  91. if( this == main )
  92. uvLoop = @:privateAccess hl.uv.Loop.default_loop();
  93. else {
  94. #if (hl_ver < version("1.16.0"))
  95. throw "Using libUV multithread requires -D hl-ver=1.16.0";
  96. #else
  97. uvLoop = hl.uv.Loop.create();
  98. #end
  99. }
  100. }
  101. return uvLoop;
  102. }
  103. #end
  104. /**
  105. This should be called after you are finished with a custom event loop.
  106. It is already automatically called for threads loops.
  107. **/
  108. public function dispose() {
  109. #if hl
  110. if( uvLoop != null && uvLoop.close() != 0 ) Sys.println("Some async handlers have not been closed");
  111. #end
  112. }
  113. /**
  114. Runs until all the blocking events have been stopped.
  115. If this is called on the main thread, also wait for all blocking threads to finish.
  116. **/
  117. public function loop() {
  118. checkThread();
  119. while( hasEvents(true) || promiseCount > 0 || (this == main && hasRunningThreads()) ) {
  120. var time = getNextTick();
  121. #if hl
  122. // disable wait if we have our uvloop alive
  123. if( uvLoop != null && time > 0 && uvLoop.alive() > 0 )
  124. time = -1;
  125. #end
  126. if( time > 0 ) {
  127. wait(time);
  128. continue;
  129. }
  130. loopOnce(false);
  131. }
  132. }
  133. /**
  134. Promise a possible future event addition. This will prevent the `loop()` from terminating until the matching number of `deliver()` calls have been made.
  135. **/
  136. public function promise() {
  137. lock();
  138. promiseCount++;
  139. unlock();
  140. }
  141. /**
  142. Deliver after a `promise()`. This will throw an exception if more `deliver()` calls has been made than corresponding `promise()` calls.
  143. **/
  144. public function deliver() {
  145. lock();
  146. if( promiseCount == 0 ) {
  147. unlock();
  148. throw "Too many calls to deliver()";
  149. }
  150. promiseCount--;
  151. unlock();
  152. if( promiseCount == 0 )
  153. wakeup();
  154. }
  155. inline function wakeup() {
  156. #if target.threaded
  157. lockTime.release();
  158. #end
  159. }
  160. inline function wait( time : Float ) {
  161. #if target.threaded
  162. lockTime.wait(time);
  163. #elseif sys
  164. Sys.sleep(time);
  165. #end
  166. }
  167. inline function lock() {
  168. #if target.threaded
  169. mutex.acquire();
  170. #end
  171. }
  172. inline function unlock() {
  173. #if target.threaded
  174. mutex.release();
  175. #end
  176. }
  177. function checkThread() {
  178. #if target.threaded
  179. if( thread != null && thread != sys.thread.Thread.current() ) throw "You can't run this loop from a different thread";
  180. #end
  181. }
  182. /**
  183. Perform an update of pending events.
  184. By default, an event loop from a thread can only be triggered from this thread.
  185. You can set `threadCheck` to false in the rare cases you might want otherwise.
  186. **/
  187. public function loopOnce( threadCheck = true ) {
  188. if( threadCheck )
  189. checkThread();
  190. #if hl
  191. if( inUV ) throw "You cannot callback EventLoop.loop() while in uv event callback";
  192. #end
  193. lock();
  194. sortEvents();
  195. var current = events; // protect from further add()
  196. inLoop = true;
  197. unlock();
  198. #if hl
  199. if( uvLoop != null ) {
  200. inUV = true;
  201. uvLoop.run(NoWait);
  202. inUV = false;
  203. }
  204. #end
  205. // if inLoop turns false, stop because we had reentrency
  206. var time = haxe.Timer.stamp();
  207. while( inLoop && current != null ) {
  208. var n = current.next;
  209. if( current.nextRun <= time && !current.toRemove )
  210. current.callb();
  211. current = n;
  212. }
  213. lock();
  214. inLoop = false;
  215. if( hasPendingRemove ) {
  216. hasPendingRemove = false;
  217. var e = events;
  218. while( e != null ) {
  219. var n = e.next;
  220. if( e.toRemove ) stop(e);
  221. e = n;
  222. }
  223. }
  224. unlock();
  225. }
  226. /**
  227. Add a callback to be run at each loop of the event loop.
  228. **/
  229. public function add( callb : Void -> Void, priority = 0 ) : Event {
  230. var e = new Event(this,priority);
  231. e.start(callb);
  232. return e;
  233. }
  234. /**
  235. Similar to `add` but will return the Event before it's started.
  236. This is useful if you wish to hold a reference of another thread Event loop
  237. before it runs.
  238. **/
  239. public function addAsync( priority = 0 ) {
  240. return new Event(this,priority);
  241. }
  242. /**
  243. Add a callback to be run every `delay` seconds until stopped
  244. **/
  245. public function addTimer( callb : Void -> Void, delay : Float, priority = 0 ) : Event {
  246. var e : Event = new Event(this,priority);
  247. e.delay(delay);
  248. e.start(function() { e.delay(delay,true); callb(); });
  249. return e;
  250. }
  251. @:deprecated @:noCompletion public function repeat( callb, delay : Int ) {
  252. return addTimer(callb,delay/1000);
  253. }
  254. @:deprecated @:noCompletion public function cancel( e : Event ) {
  255. e.stop();
  256. }
  257. /**
  258. Add a function to be run once at next loop of the event loop.
  259. **/
  260. public function run( callb : Void -> Void, priority = 0 ) : Event {
  261. var e : Event = new Event(this,priority);
  262. e.start(function() { e.stop(); callb(); });
  263. return e;
  264. }
  265. function start( e : Event ) {
  266. lock();
  267. e.toRemove = false;
  268. if( !e.isStopped() ) {
  269. unlock();
  270. return;
  271. }
  272. if( events != null )
  273. events.prev = e;
  274. e.next = events;
  275. events = e;
  276. wakeup();
  277. unlock();
  278. }
  279. function stop( e : Event ) {
  280. lock();
  281. if( inLoop ) {
  282. // prevent remove while in loopOnce()
  283. e.toRemove = true;
  284. hasPendingRemove = true;
  285. unlock();
  286. return;
  287. }
  288. e.toRemove = false;
  289. if( e.isStopped() ) {
  290. unlock();
  291. return;
  292. }
  293. if( events == e )
  294. events = e.next;
  295. else if( e.prev != null )
  296. e.prev.next = e.next;
  297. if( e.next != null ) {
  298. e.next.prev = e.prev;
  299. e.next = null;
  300. }
  301. e.prev = null;
  302. wakeup();
  303. unlock();
  304. }
  305. function getNextTick() : Float {
  306. lock();
  307. if( events == null ) {
  308. unlock();
  309. return 1e9;
  310. }
  311. var now = haxe.Timer.stamp();
  312. var e = events;
  313. var next = Math.POSITIVE_INFINITY;
  314. while( e != null ) {
  315. if( e.nextRun <= now ) {
  316. unlock();
  317. return -1;
  318. }
  319. if( e.nextRun < next )
  320. next = e.nextRun;
  321. e = e.next;
  322. }
  323. unlock();
  324. return next - now;
  325. }
  326. function sortEvents() {
  327. // pending = haxe.ds.ListSort.sort(pending, function(e1, e2) return e1.nextRun > e2.nextRun ? -1 : 1);
  328. // we can't use directly ListSort because it requires prev/next to be public, which we don't want here
  329. // we do then a manual inline, this also allow use to do a Float comparison of nextRun
  330. lock();
  331. var list = events;
  332. if (list == null) {
  333. unlock();
  334. return;
  335. }
  336. var insize = 1, nmerges, psize = 0, qsize = 0;
  337. var p, q, e, tail:Event;
  338. while (true) {
  339. p = list;
  340. list = null;
  341. tail = null;
  342. nmerges = 0;
  343. while (p != null) {
  344. nmerges++;
  345. q = p;
  346. psize = 0;
  347. for (i in 0...insize) {
  348. psize++;
  349. q = q.next;
  350. if (q == null)
  351. break;
  352. }
  353. qsize = insize;
  354. while (psize > 0 || (qsize > 0 && q != null)) {
  355. if (psize == 0) {
  356. e = q;
  357. q = q.next;
  358. qsize--;
  359. } else if (qsize == 0
  360. || q == null
  361. || (p.priority > q.priority || (p.priority == q.priority && p.nextRun <= q.nextRun))) {
  362. e = p;
  363. p = p.next;
  364. psize--;
  365. } else {
  366. e = q;
  367. q = q.next;
  368. qsize--;
  369. }
  370. if (tail != null)
  371. tail.next = e;
  372. else
  373. list = e;
  374. e.prev = tail;
  375. tail = e;
  376. }
  377. p = q;
  378. }
  379. tail.next = null;
  380. if (nmerges <= 1)
  381. break;
  382. insize *= 2;
  383. }
  384. list.prev = null; // not cycling
  385. events = list;
  386. unlock();
  387. }
  388. /**
  389. Tells if we currently have blocking unfinished threads.
  390. **/
  391. public static function hasRunningThreads() {
  392. #if !target.threaded
  393. return false;
  394. #else
  395. return @:privateAccess sys.thread.Thread.hasBlocking();
  396. #end
  397. }
  398. /**
  399. Tells if the event loop has remaining events.
  400. If blocking is set to true, only check if it has remaining blocking events.
  401. **/
  402. public function hasEvents( blocking : Bool = true ) {
  403. #if hl
  404. if( uvLoop != null && uvLoop.alive() > 0 )
  405. return true;
  406. #end
  407. if( !blocking )
  408. return events != null;
  409. lock();
  410. var e = events;
  411. while( e != null ) {
  412. if( e.isBlocking ) {
  413. unlock();
  414. return true;
  415. }
  416. e = e.next;
  417. }
  418. unlock();
  419. return false;
  420. }
  421. /**
  422. Add a task to be run either on another thread or as part of the main event loop if the
  423. platform does not support threads.
  424. **/
  425. public static function addTask( f : Void -> Void, blocking = true ) {
  426. #if target.threaded
  427. sys.thread.Thread.create(f).isBlocking = blocking;
  428. #else
  429. main.add(f).isBlocking = blocking;
  430. #end
  431. }
  432. static function get_current() {
  433. #if target.threaded
  434. var events = sys.thread.Thread.current().events;
  435. if( events == null ) throw "The current thread doesn't have an event loop.";
  436. return events;
  437. #else
  438. return main;
  439. #end
  440. }
  441. static function get_main() {
  442. if( main == null ) main = new EventLoop();
  443. return main;
  444. }
  445. }