test_futures.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683
  1. from panda3d import core
  2. import pytest
  3. import time
  4. import sys
  5. from asyncio.exceptions import TimeoutError, CancelledError
  6. class MockFuture:
  7. _asyncio_future_blocking = False
  8. _state = 'PENDING'
  9. _cancel_return = False
  10. _result = None
  11. def __await__(self):
  12. while self._state == 'PENDING':
  13. yield self
  14. return self.result()
  15. def done(self):
  16. return self._state != 'PENDING'
  17. def cancelled(self):
  18. return self._state == 'CANCELLED'
  19. def cancel(self):
  20. return self._cancel_return
  21. def result(self):
  22. if self._state == 'CANCELLED':
  23. raise CancelledError
  24. return self._result
  25. def test_future_cancelled():
  26. fut = core.AsyncFuture()
  27. assert not fut.done()
  28. assert not fut.cancelled()
  29. fut.cancel()
  30. assert fut.done()
  31. assert fut.cancelled()
  32. with pytest.raises(CancelledError):
  33. fut.result()
  34. # Works more than once
  35. with pytest.raises(CancelledError):
  36. fut.result()
  37. def test_future_timeout():
  38. fut = core.AsyncFuture()
  39. with pytest.raises(TimeoutError):
  40. fut.result(0.001)
  41. # Works more than once
  42. with pytest.raises(TimeoutError):
  43. fut.result(0.001)
  44. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  45. reason="Threading support disabled")
  46. def test_future_wait():
  47. threading = pytest.importorskip("direct.stdpy.threading")
  48. fut = core.AsyncFuture()
  49. # Launch a thread to set the result value.
  50. def thread_main():
  51. time.sleep(0.001)
  52. fut.set_result(None)
  53. thread = threading.Thread(target=thread_main)
  54. assert not fut.done()
  55. thread.start()
  56. assert fut.result() is None
  57. assert fut.done()
  58. assert not fut.cancelled()
  59. assert fut.result() is None
  60. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  61. reason="Threading support disabled")
  62. def test_future_wait_timeout():
  63. threading = pytest.importorskip("direct.stdpy.threading")
  64. fut = core.AsyncFuture()
  65. # Launch a thread to set the result value.
  66. def thread_main():
  67. time.sleep(0.001)
  68. fut.set_result(None)
  69. thread = threading.Thread(target=thread_main)
  70. assert not fut.done()
  71. thread.start()
  72. assert fut.result(1.0) is None
  73. assert fut.done()
  74. assert not fut.cancelled()
  75. assert fut.result() is None
  76. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  77. reason="Threading support disabled")
  78. def test_future_wait_cancel():
  79. threading = pytest.importorskip("direct.stdpy.threading")
  80. fut = core.AsyncFuture()
  81. # Launch a thread to cancel the future.
  82. def thread_main():
  83. time.sleep(0.001)
  84. fut.cancel()
  85. thread = threading.Thread(target=thread_main)
  86. assert not fut.done()
  87. thread.start()
  88. with pytest.raises(CancelledError):
  89. fut.result()
  90. assert fut.done()
  91. assert fut.cancelled()
  92. with pytest.raises(CancelledError):
  93. fut.result()
  94. def test_task_cancel():
  95. task_mgr = core.AsyncTaskManager.get_global_ptr()
  96. task = core.PythonTask(lambda task: task.done)
  97. task_mgr.add(task)
  98. assert not task.done()
  99. task_mgr.remove(task)
  100. assert task.done()
  101. assert task.cancelled()
  102. with pytest.raises(CancelledError):
  103. task.result()
  104. def test_task_cancel_during_run():
  105. task_mgr = core.AsyncTaskManager.get_global_ptr()
  106. task_chain = task_mgr.make_task_chain("test_task_cancel_during_run")
  107. def task_main(task):
  108. task.remove()
  109. # It won't yet be marked done until after it returns.
  110. assert not task.done()
  111. return task.done
  112. task = core.PythonTask(task_main)
  113. task.set_task_chain(task_chain.name)
  114. task_mgr.add(task)
  115. task_chain.wait_for_tasks()
  116. assert task.done()
  117. assert task.cancelled()
  118. with pytest.raises(CancelledError):
  119. task.result()
  120. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  121. reason="Threading support disabled")
  122. def test_task_cancel_waiting():
  123. # Calling result() in a threaded task chain should cancel the future being
  124. # waited on if the surrounding task is cancelled.
  125. task_mgr = core.AsyncTaskManager.get_global_ptr()
  126. task_chain = task_mgr.make_task_chain("test_task_cancel_waiting")
  127. task_chain.set_num_threads(1)
  128. fut = core.AsyncFuture()
  129. async def task_main(task):
  130. # This will block the thread this task is in until the future is done,
  131. # or until the task is cancelled (which implicitly cancels the future).
  132. fut.result()
  133. return task.done
  134. task = core.PythonTask(task_main, 'task_main')
  135. task.set_task_chain(task_chain.name)
  136. task_mgr.add(task)
  137. task_chain.start_threads()
  138. try:
  139. assert not task.done()
  140. fut.cancel()
  141. task.wait()
  142. assert task.cancelled()
  143. assert fut.cancelled()
  144. finally:
  145. task_chain.stop_threads()
  146. def test_task_cancel_awaiting():
  147. task_mgr = core.AsyncTaskManager.get_global_ptr()
  148. task_chain = task_mgr.make_task_chain("test_task_cancel_awaiting")
  149. fut = core.AsyncFuture()
  150. async def task_main(task):
  151. await fut
  152. return task.done
  153. task = core.PythonTask(task_main, 'task_main')
  154. task.set_task_chain(task_chain.name)
  155. task_mgr.add(task)
  156. task_chain.poll()
  157. assert not task.done()
  158. task_chain.poll()
  159. assert not task.done()
  160. task.cancel()
  161. task_chain.poll()
  162. assert task.done()
  163. assert task.cancelled()
  164. assert fut.done()
  165. assert fut.cancelled()
  166. def test_task_result():
  167. task_mgr = core.AsyncTaskManager.get_global_ptr()
  168. task_chain = task_mgr.make_task_chain("test_task_result")
  169. def task_main(task):
  170. task.set_result(42)
  171. # It won't yet be marked done until after it returns.
  172. assert not task.done()
  173. return core.PythonTask.done
  174. task = core.PythonTask(task_main)
  175. task.set_task_chain(task_chain.name)
  176. task_mgr.add(task)
  177. task_chain.wait_for_tasks()
  178. assert task.done()
  179. assert not task.cancelled()
  180. assert task.result() == 42
  181. def test_coro_await_coro():
  182. # Await another coro in a coro.
  183. fut = core.AsyncFuture()
  184. async def coro2():
  185. await fut
  186. async def coro_main():
  187. await coro2()
  188. task = core.PythonTask(coro_main())
  189. task_mgr = core.AsyncTaskManager.get_global_ptr()
  190. task_mgr.add(task)
  191. for i in range(5):
  192. task_mgr.poll()
  193. assert not task.done()
  194. fut.set_result(None)
  195. task_mgr.poll()
  196. assert task.done()
  197. assert not task.cancelled()
  198. def test_coro_await_cancel_resistant_coro():
  199. # Await another coro in a coro, but cancel the outer.
  200. fut = core.AsyncFuture()
  201. cancelled_caught = [0]
  202. keep_going = [False]
  203. async def cancel_resistant_coro():
  204. while not fut.done():
  205. try:
  206. await core.AsyncFuture.shield(fut)
  207. except CancelledError as ex:
  208. cancelled_caught[0] += 1
  209. async def coro_main():
  210. await cancel_resistant_coro()
  211. task = core.PythonTask(coro_main(), 'coro_main')
  212. task_mgr = core.AsyncTaskManager.get_global_ptr()
  213. task_mgr.add(task)
  214. assert not task.done()
  215. task_mgr.poll()
  216. assert not task.done()
  217. # No cancelling it once it started...
  218. for i in range(3):
  219. assert task.cancel()
  220. assert not task.done()
  221. for j in range(3):
  222. task_mgr.poll()
  223. assert not task.done()
  224. assert cancelled_caught[0] == 3
  225. fut.set_result(None)
  226. task_mgr.poll()
  227. assert task.done()
  228. assert not task.cancelled()
  229. def test_coro_await_external():
  230. # Await an external future in a coro.
  231. fut = MockFuture()
  232. fut._result = 12345
  233. res = []
  234. async def coro_main():
  235. res.append(await fut)
  236. task = core.PythonTask(coro_main(), 'coro_main')
  237. task_mgr = core.AsyncTaskManager.get_global_ptr()
  238. task_mgr.add(task)
  239. for i in range(5):
  240. task_mgr.poll()
  241. assert not task.done()
  242. fut._state = 'FINISHED'
  243. task_mgr.poll()
  244. assert task.done()
  245. assert not task.cancelled()
  246. assert res == [12345]
  247. def test_coro_await_external_cancel_inner():
  248. # Cancel external future being awaited by a coro.
  249. fut = MockFuture()
  250. async def coro_main():
  251. await fut
  252. task = core.PythonTask(coro_main(), 'coro_main')
  253. task_mgr = core.AsyncTaskManager.get_global_ptr()
  254. task_mgr.add(task)
  255. for i in range(5):
  256. task_mgr.poll()
  257. assert not task.done()
  258. fut._state = 'CANCELLED'
  259. assert not task.done()
  260. task_mgr.poll()
  261. assert task.done()
  262. assert task.cancelled()
  263. def test_coro_await_external_cancel_outer():
  264. # Cancel task that is awaiting external future.
  265. fut = MockFuture()
  266. result = []
  267. async def coro_main():
  268. result.append(await fut)
  269. task = core.PythonTask(coro_main(), 'coro_main')
  270. task_mgr = core.AsyncTaskManager.get_global_ptr()
  271. task_mgr.add(task)
  272. for i in range(5):
  273. task_mgr.poll()
  274. assert not task.done()
  275. fut._state = 'CANCELLED'
  276. assert not task.done()
  277. task_mgr.poll()
  278. assert task.done()
  279. assert task.cancelled()
  280. def test_coro_exception():
  281. task_mgr = core.AsyncTaskManager.get_global_ptr()
  282. task_chain = task_mgr.make_task_chain("test_coro_exception")
  283. def coro_main():
  284. raise RuntimeError
  285. yield None
  286. task = core.PythonTask(coro_main())
  287. task.set_task_chain(task_chain.name)
  288. task_mgr.add(task)
  289. task_chain.wait_for_tasks()
  290. assert task.done()
  291. assert not task.cancelled()
  292. with pytest.raises(RuntimeError):
  293. task.result()
  294. def test_future_result():
  295. # Cancelled
  296. fut = core.AsyncFuture()
  297. assert not fut.done()
  298. fut.cancel()
  299. with pytest.raises(CancelledError):
  300. fut.result()
  301. # None
  302. fut = core.AsyncFuture()
  303. fut.set_result(None)
  304. assert fut.done()
  305. assert fut.result() is None
  306. # Store int
  307. fut = core.AsyncFuture()
  308. fut.set_result(123)
  309. assert fut.result() == 123
  310. # Store string
  311. fut = core.AsyncFuture()
  312. fut.set_result("test\000\u1234")
  313. assert fut.result() == "test\000\u1234"
  314. # Store TypedWritableReferenceCount
  315. tex = core.Texture()
  316. rc = tex.get_ref_count()
  317. fut = core.AsyncFuture()
  318. fut.set_result(tex)
  319. assert tex.get_ref_count() == rc + 1
  320. assert fut.result() == tex
  321. assert tex.get_ref_count() == rc + 1
  322. assert fut.result() == tex
  323. assert tex.get_ref_count() == rc + 1
  324. fut = None
  325. assert tex.get_ref_count() == rc
  326. # Store EventParameter (no longer gets unwrapped)
  327. ep = core.EventParameter(0.5)
  328. fut = core.AsyncFuture()
  329. fut.set_result(ep)
  330. assert fut.result() is ep
  331. assert fut.result() is ep
  332. # Store TypedObject
  333. dg = core.Datagram(b"test")
  334. fut = core.AsyncFuture()
  335. fut.set_result(dg)
  336. assert fut.result() == dg
  337. assert fut.result() == dg
  338. # Store arbitrary Python object
  339. obj = object()
  340. rc = sys.getrefcount(obj)
  341. fut = core.AsyncFuture()
  342. fut.set_result(obj)
  343. assert sys.getrefcount(obj) == rc + 1
  344. assert fut.result() is obj
  345. assert sys.getrefcount(obj) == rc + 1
  346. assert fut.result() is obj
  347. assert sys.getrefcount(obj) == rc + 1
  348. fut = None
  349. assert sys.getrefcount(obj) == rc
  350. def test_future_gather():
  351. fut1 = core.AsyncFuture()
  352. fut2 = core.AsyncFuture()
  353. # 0 and 1 arguments are special
  354. assert core.AsyncFuture.gather().done()
  355. assert core.AsyncFuture.gather(fut1) == fut1
  356. # Gathering not-done futures
  357. gather = core.AsyncFuture.gather(fut1, fut2)
  358. assert not gather.done()
  359. # One future done
  360. fut1.set_result(1)
  361. assert not gather.done()
  362. # Two futures done
  363. fut2.set_result(2)
  364. assert gather.done()
  365. assert not gather.cancelled()
  366. assert tuple(gather.result()) == (1, 2)
  367. def test_future_gather_cancel_inner():
  368. fut1 = core.AsyncFuture()
  369. fut2 = core.AsyncFuture()
  370. # Gathering not-done futures
  371. gather = core.AsyncFuture.gather(fut1, fut2)
  372. assert not gather.done()
  373. # One future cancelled
  374. fut1.cancel()
  375. assert not gather.done()
  376. # Two futures cancelled
  377. fut2.set_result(2)
  378. assert gather.done()
  379. assert not gather.cancelled()
  380. with pytest.raises(CancelledError):
  381. assert gather.result()
  382. def test_future_gather_cancel_outer():
  383. fut1 = core.AsyncFuture()
  384. fut2 = core.AsyncFuture()
  385. # Gathering not-done futures
  386. gather = core.AsyncFuture.gather(fut1, fut2)
  387. assert not gather.done()
  388. assert gather.cancel()
  389. assert gather.done()
  390. assert gather.cancelled()
  391. with pytest.raises(CancelledError):
  392. assert gather.result()
  393. def test_future_shield():
  394. # An already done future is returned as-is (no cancellation can occur)
  395. inner = core.AsyncFuture()
  396. inner.set_result(None)
  397. outer = core.AsyncFuture.shield(inner)
  398. assert inner == outer
  399. # Normally finishing future
  400. inner = core.AsyncFuture()
  401. outer = core.AsyncFuture.shield(inner)
  402. assert not outer.done()
  403. inner.set_result(None)
  404. assert outer.done()
  405. assert not outer.cancelled()
  406. assert inner.result() is None
  407. # Normally finishing future with result
  408. inner = core.AsyncFuture()
  409. outer = core.AsyncFuture.shield(inner)
  410. assert not outer.done()
  411. inner.set_result(123)
  412. assert outer.done()
  413. assert not outer.cancelled()
  414. assert inner.result() == 123
  415. # Cancelled inner future does propagate cancellation outward
  416. inner = core.AsyncFuture()
  417. outer = core.AsyncFuture.shield(inner)
  418. assert not outer.done()
  419. inner.cancel()
  420. assert outer.done()
  421. assert outer.cancelled()
  422. # Finished outer future does nothing to inner
  423. inner = core.AsyncFuture()
  424. outer = core.AsyncFuture.shield(inner)
  425. outer.set_result(None)
  426. assert not inner.done()
  427. inner.cancel()
  428. assert not outer.cancelled()
  429. # Cancelled outer future does nothing to inner
  430. inner = core.AsyncFuture()
  431. outer = core.AsyncFuture.shield(inner)
  432. outer.cancel()
  433. assert not inner.done()
  434. inner.cancel()
  435. # Can be shielded multiple times
  436. inner = core.AsyncFuture()
  437. outer1 = core.AsyncFuture.shield(inner)
  438. outer2 = core.AsyncFuture.shield(inner)
  439. outer1.cancel()
  440. assert not inner.done()
  441. assert not outer2.done()
  442. inner.cancel()
  443. assert outer1.done()
  444. assert outer2.done()
  445. def test_future_done_callback():
  446. fut = core.AsyncFuture()
  447. # Use the list hack since Python 2 doesn't have the "nonlocal" keyword.
  448. called = [False]
  449. def on_done(arg):
  450. assert arg == fut
  451. called[0] = True
  452. fut.add_done_callback(on_done)
  453. fut.cancel()
  454. assert fut.done()
  455. task_mgr = core.AsyncTaskManager.get_global_ptr()
  456. task_mgr.poll()
  457. assert called[0]
  458. def test_future_done_callback_already_done():
  459. # Same as above, but with the future already done when add_done_callback
  460. # is called.
  461. fut = core.AsyncFuture()
  462. fut.cancel()
  463. assert fut.done()
  464. # Use the list hack since Python 2 doesn't have the "nonlocal" keyword.
  465. called = [False]
  466. def on_done(arg):
  467. assert arg == fut
  468. called[0] = True
  469. fut.add_done_callback(on_done)
  470. task_mgr = core.AsyncTaskManager.get_global_ptr()
  471. task_mgr.poll()
  472. assert called[0]
  473. def test_event_future():
  474. queue = core.EventQueue()
  475. handler = core.EventHandler(queue)
  476. fut = handler.get_future("test")
  477. # If we ask again, we should get the same one.
  478. assert handler.get_future("test") == fut
  479. event = core.Event("test")
  480. handler.dispatch_event(event)
  481. assert fut.done()
  482. assert not fut.cancelled()
  483. assert fut.result() == event
  484. def test_event_future_cancel():
  485. # This is a very strange thing to do, but it's possible, so let's make
  486. # sure it gives defined behavior.
  487. queue = core.EventQueue()
  488. handler = core.EventHandler(queue)
  489. fut = handler.get_future("test")
  490. fut.cancel()
  491. assert fut.done()
  492. assert fut.cancelled()
  493. event = core.Event("test")
  494. handler.dispatch_event(event)
  495. assert fut.done()
  496. assert fut.cancelled()
  497. def test_event_future_cancel2():
  498. queue = core.EventQueue()
  499. handler = core.EventHandler(queue)
  500. # Make sure we get a new future if we cancelled the first one.
  501. fut = handler.get_future("test")
  502. fut.cancel()
  503. fut2 = handler.get_future("test")
  504. assert fut != fut2
  505. assert fut.done()
  506. assert fut.cancelled()
  507. assert not fut2.done()
  508. assert not fut2.cancelled()