test_futures.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736
  1. from panda3d import core
  2. from asyncio.exceptions import TimeoutError, CancelledError
  3. import pytest
  4. import time
  5. import sys
  6. import weakref
  7. class MockFuture:
  8. _asyncio_future_blocking = False
  9. _state = 'PENDING'
  10. _cancel_return = False
  11. _result = None
  12. def __await__(self):
  13. while self._state == 'PENDING':
  14. yield self
  15. return self.result()
  16. def done(self):
  17. return self._state != 'PENDING'
  18. def cancelled(self):
  19. return self._state == 'CANCELLED'
  20. def cancel(self):
  21. return self._cancel_return
  22. def result(self):
  23. if self._state == 'CANCELLED':
  24. raise CancelledError
  25. return self._result
  26. def test_future_cancelled():
  27. fut = core.AsyncFuture()
  28. assert not fut.done()
  29. assert not fut.cancelled()
  30. fut.cancel()
  31. assert fut.done()
  32. assert fut.cancelled()
  33. with pytest.raises(CancelledError):
  34. fut.result()
  35. # Works more than once
  36. with pytest.raises(CancelledError):
  37. fut.result()
  38. def test_future_timeout():
  39. fut = core.AsyncFuture()
  40. with pytest.raises(TimeoutError):
  41. fut.result(0.001)
  42. # Works more than once
  43. with pytest.raises(TimeoutError):
  44. fut.result(0.001)
  45. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  46. reason="Threading support disabled")
  47. def test_future_wait():
  48. threading = pytest.importorskip("direct.stdpy.threading")
  49. fut = core.AsyncFuture()
  50. # Launch a thread to set the result value.
  51. def thread_main():
  52. time.sleep(0.001)
  53. fut.set_result(None)
  54. thread = threading.Thread(target=thread_main)
  55. assert not fut.done()
  56. thread.start()
  57. assert fut.result() is None
  58. assert fut.done()
  59. assert not fut.cancelled()
  60. assert fut.result() is None
  61. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  62. reason="Threading support disabled")
  63. def test_future_wait_timeout():
  64. threading = pytest.importorskip("direct.stdpy.threading")
  65. fut = core.AsyncFuture()
  66. # Launch a thread to set the result value.
  67. def thread_main():
  68. time.sleep(0.001)
  69. fut.set_result(None)
  70. thread = threading.Thread(target=thread_main)
  71. assert not fut.done()
  72. thread.start()
  73. assert fut.result(1.0) is None
  74. assert fut.done()
  75. assert not fut.cancelled()
  76. assert fut.result() is None
  77. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  78. reason="Threading support disabled")
  79. def test_future_wait_cancel():
  80. threading = pytest.importorskip("direct.stdpy.threading")
  81. fut = core.AsyncFuture()
  82. # Launch a thread to cancel the future.
  83. def thread_main():
  84. time.sleep(0.001)
  85. fut.cancel()
  86. thread = threading.Thread(target=thread_main)
  87. assert not fut.done()
  88. thread.start()
  89. with pytest.raises(CancelledError):
  90. fut.result()
  91. assert fut.done()
  92. assert fut.cancelled()
  93. with pytest.raises(CancelledError):
  94. fut.result()
  95. def test_task_cancel():
  96. task_mgr = core.AsyncTaskManager.get_global_ptr()
  97. task = core.PythonTask(lambda task: task.done)
  98. task_mgr.add(task)
  99. assert not task.done()
  100. task_mgr.remove(task)
  101. assert task.done()
  102. assert task.cancelled()
  103. with pytest.raises(CancelledError):
  104. task.result()
  105. def test_task_cancel_during_run():
  106. task_mgr = core.AsyncTaskManager.get_global_ptr()
  107. task_chain = task_mgr.make_task_chain("test_task_cancel_during_run")
  108. def task_main(task):
  109. task.remove()
  110. # It won't yet be marked done until after it returns.
  111. assert not task.done()
  112. return task.done
  113. task = core.PythonTask(task_main)
  114. task.set_task_chain(task_chain.name)
  115. task_mgr.add(task)
  116. task_chain.wait_for_tasks()
  117. assert task.done()
  118. assert task.cancelled()
  119. with pytest.raises(CancelledError):
  120. task.result()
  121. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  122. reason="Threading support disabled")
  123. def test_task_cancel_waiting():
  124. # Calling result() in a threaded task chain should cancel the future being
  125. # waited on if the surrounding task is cancelled.
  126. task_mgr = core.AsyncTaskManager.get_global_ptr()
  127. task_chain = task_mgr.make_task_chain("test_task_cancel_waiting")
  128. task_chain.set_num_threads(1)
  129. fut = core.AsyncFuture()
  130. async def task_main(task):
  131. # This will block the thread this task is in until the future is done,
  132. # or until the task is cancelled (which implicitly cancels the future).
  133. fut.result()
  134. return task.done
  135. task = core.PythonTask(task_main, 'task_main')
  136. task.set_task_chain(task_chain.name)
  137. task_mgr.add(task)
  138. task_chain.start_threads()
  139. try:
  140. assert not task.done()
  141. fut.cancel()
  142. task.wait()
  143. assert task.cancelled()
  144. assert fut.cancelled()
  145. finally:
  146. task_chain.stop_threads()
  147. def test_task_cancel_awaiting():
  148. task_mgr = core.AsyncTaskManager.get_global_ptr()
  149. task_chain = task_mgr.make_task_chain("test_task_cancel_awaiting")
  150. fut = core.AsyncFuture()
  151. async def task_main(task):
  152. await fut
  153. return task.done
  154. task = core.PythonTask(task_main, 'task_main')
  155. task.set_task_chain(task_chain.name)
  156. task_mgr.add(task)
  157. task_chain.poll()
  158. assert not task.done()
  159. task_chain.poll()
  160. assert not task.done()
  161. task.cancel()
  162. task_chain.poll()
  163. assert task.done()
  164. assert task.cancelled()
  165. assert fut.done()
  166. assert fut.cancelled()
  167. def test_task_result():
  168. task_mgr = core.AsyncTaskManager.get_global_ptr()
  169. task_chain = task_mgr.make_task_chain("test_task_result")
  170. def task_main(task):
  171. task.set_result(42)
  172. # It won't yet be marked done until after it returns.
  173. assert not task.done()
  174. return core.PythonTask.done
  175. task = core.PythonTask(task_main)
  176. task.set_task_chain(task_chain.name)
  177. task_mgr.add(task)
  178. task_chain.wait_for_tasks()
  179. assert task.done()
  180. assert not task.cancelled()
  181. assert task.result() == 42
  182. def test_coro_await_coro():
  183. # Await another coro in a coro.
  184. fut = core.AsyncFuture()
  185. async def coro2():
  186. await fut
  187. async def coro_main():
  188. await coro2()
  189. task = core.PythonTask(coro_main())
  190. task_mgr = core.AsyncTaskManager.get_global_ptr()
  191. task_mgr.add(task)
  192. for i in range(5):
  193. task_mgr.poll()
  194. assert not task.done()
  195. fut.set_result(None)
  196. task_mgr.poll()
  197. assert task.done()
  198. assert not task.cancelled()
  199. def test_coro_await_cancel_resistant_coro():
  200. # Await another coro in a coro, but cancel the outer.
  201. fut = core.AsyncFuture()
  202. cancelled_caught = [0]
  203. keep_going = [False]
  204. async def cancel_resistant_coro():
  205. while not fut.done():
  206. try:
  207. await core.AsyncFuture.shield(fut)
  208. except CancelledError as ex:
  209. cancelled_caught[0] += 1
  210. async def coro_main():
  211. await cancel_resistant_coro()
  212. task = core.PythonTask(coro_main(), 'coro_main')
  213. task_mgr = core.AsyncTaskManager.get_global_ptr()
  214. task_mgr.add(task)
  215. assert not task.done()
  216. task_mgr.poll()
  217. assert not task.done()
  218. # No cancelling it once it started...
  219. for i in range(3):
  220. assert task.cancel()
  221. assert not task.done()
  222. for j in range(3):
  223. task_mgr.poll()
  224. assert not task.done()
  225. assert cancelled_caught[0] == 3
  226. fut.set_result(None)
  227. task_mgr.poll()
  228. assert task.done()
  229. assert not task.cancelled()
  230. def test_coro_await_external():
  231. # Await an external future in a coro.
  232. fut = MockFuture()
  233. fut._result = 12345
  234. res = []
  235. async def coro_main():
  236. res.append(await fut)
  237. task = core.PythonTask(coro_main(), 'coro_main')
  238. task_mgr = core.AsyncTaskManager.get_global_ptr()
  239. task_mgr.add(task)
  240. for i in range(5):
  241. task_mgr.poll()
  242. assert not task.done()
  243. fut._state = 'FINISHED'
  244. task_mgr.poll()
  245. assert task.done()
  246. assert not task.cancelled()
  247. assert res == [12345]
  248. def test_coro_await_external_cancel_inner():
  249. # Cancel external future being awaited by a coro.
  250. fut = MockFuture()
  251. async def coro_main():
  252. await fut
  253. task = core.PythonTask(coro_main(), 'coro_main')
  254. task_mgr = core.AsyncTaskManager.get_global_ptr()
  255. task_mgr.add(task)
  256. for i in range(5):
  257. task_mgr.poll()
  258. assert not task.done()
  259. fut._state = 'CANCELLED'
  260. assert not task.done()
  261. task_mgr.poll()
  262. assert task.done()
  263. assert task.cancelled()
  264. def test_coro_await_external_cancel_outer():
  265. # Cancel task that is awaiting external future.
  266. fut = MockFuture()
  267. result = []
  268. async def coro_main():
  269. result.append(await fut)
  270. task = core.PythonTask(coro_main(), 'coro_main')
  271. task_mgr = core.AsyncTaskManager.get_global_ptr()
  272. task_mgr.add(task)
  273. for i in range(5):
  274. task_mgr.poll()
  275. assert not task.done()
  276. fut._state = 'CANCELLED'
  277. assert not task.done()
  278. task_mgr.poll()
  279. assert task.done()
  280. assert task.cancelled()
  281. def test_coro_exception():
  282. task_mgr = core.AsyncTaskManager.get_global_ptr()
  283. task_chain = task_mgr.make_task_chain("test_coro_exception")
  284. def coro_main():
  285. raise RuntimeError
  286. yield None
  287. task = core.PythonTask(coro_main())
  288. task.set_task_chain(task_chain.name)
  289. task_mgr.add(task)
  290. task_chain.wait_for_tasks()
  291. assert task.done()
  292. assert not task.cancelled()
  293. with pytest.raises(RuntimeError):
  294. task.result()
  295. def test_future_result():
  296. # Cancelled
  297. fut = core.AsyncFuture()
  298. assert not fut.done()
  299. fut.cancel()
  300. with pytest.raises(CancelledError):
  301. fut.result()
  302. # None
  303. fut = core.AsyncFuture()
  304. fut.set_result(None)
  305. assert fut.done()
  306. assert fut.result() is None
  307. # Store int
  308. fut = core.AsyncFuture()
  309. fut.set_result(123)
  310. assert fut.result() == 123
  311. # Store string
  312. fut = core.AsyncFuture()
  313. fut.set_result("test\000\u1234")
  314. assert fut.result() == "test\000\u1234"
  315. # Store TypedWritableReferenceCount
  316. tex = core.Texture()
  317. rc = tex.get_ref_count()
  318. fut = core.AsyncFuture()
  319. fut.set_result(tex)
  320. assert tex.get_ref_count() == rc + 1
  321. assert fut.result() == tex
  322. assert tex.get_ref_count() == rc + 1
  323. assert fut.result() == tex
  324. assert tex.get_ref_count() == rc + 1
  325. fut = None
  326. assert tex.get_ref_count() == rc
  327. # Store EventParameter (no longer gets unwrapped)
  328. ep = core.EventParameter(0.5)
  329. fut = core.AsyncFuture()
  330. fut.set_result(ep)
  331. assert fut.result() is ep
  332. assert fut.result() is ep
  333. # Store TypedObject
  334. dg = core.Datagram(b"test")
  335. fut = core.AsyncFuture()
  336. fut.set_result(dg)
  337. assert fut.result() == dg
  338. assert fut.result() == dg
  339. # Store arbitrary Python object
  340. obj = object()
  341. rc = sys.getrefcount(obj)
  342. fut = core.AsyncFuture()
  343. fut.set_result(obj)
  344. assert sys.getrefcount(obj) == rc + 1
  345. assert fut.result() is obj
  346. assert sys.getrefcount(obj) == rc + 1
  347. assert fut.result() is obj
  348. assert sys.getrefcount(obj) == rc + 1
  349. fut = None
  350. assert sys.getrefcount(obj) == rc
  351. def test_future_gather():
  352. fut1 = core.AsyncFuture()
  353. fut2 = core.AsyncFuture()
  354. # 0 and 1 arguments are special
  355. assert core.AsyncFuture.gather().done()
  356. assert core.AsyncFuture.gather(fut1) == fut1
  357. # Gathering not-done futures
  358. gather = core.AsyncFuture.gather(fut1, fut2)
  359. assert not gather.done()
  360. # One future done
  361. fut1.set_result(1)
  362. assert not gather.done()
  363. # Two futures done
  364. fut2.set_result(2)
  365. assert gather.done()
  366. assert not gather.cancelled()
  367. assert tuple(gather.result()) == (1, 2)
  368. def test_future_gather_cancel_inner():
  369. fut1 = core.AsyncFuture()
  370. fut2 = core.AsyncFuture()
  371. # Gathering not-done futures
  372. gather = core.AsyncFuture.gather(fut1, fut2)
  373. assert not gather.done()
  374. # One future cancelled
  375. fut1.cancel()
  376. assert not gather.done()
  377. # Two futures cancelled
  378. fut2.set_result(2)
  379. assert gather.done()
  380. assert not gather.cancelled()
  381. with pytest.raises(CancelledError):
  382. assert gather.result()
  383. def test_future_gather_cancel_outer():
  384. fut1 = core.AsyncFuture()
  385. fut2 = core.AsyncFuture()
  386. # Gathering not-done futures
  387. gather = core.AsyncFuture.gather(fut1, fut2)
  388. assert not gather.done()
  389. assert gather.cancel()
  390. assert gather.done()
  391. assert gather.cancelled()
  392. with pytest.raises(CancelledError):
  393. assert gather.result()
  394. def test_future_shield():
  395. # An already done future is returned as-is (no cancellation can occur)
  396. inner = core.AsyncFuture()
  397. inner.set_result(None)
  398. outer = core.AsyncFuture.shield(inner)
  399. assert inner == outer
  400. # Normally finishing future
  401. inner = core.AsyncFuture()
  402. outer = core.AsyncFuture.shield(inner)
  403. assert not outer.done()
  404. inner.set_result(None)
  405. assert outer.done()
  406. assert not outer.cancelled()
  407. assert inner.result() is None
  408. # Normally finishing future with result
  409. inner = core.AsyncFuture()
  410. outer = core.AsyncFuture.shield(inner)
  411. assert not outer.done()
  412. inner.set_result(123)
  413. assert outer.done()
  414. assert not outer.cancelled()
  415. assert inner.result() == 123
  416. # Cancelled inner future does propagate cancellation outward
  417. inner = core.AsyncFuture()
  418. outer = core.AsyncFuture.shield(inner)
  419. assert not outer.done()
  420. inner.cancel()
  421. assert outer.done()
  422. assert outer.cancelled()
  423. # Finished outer future does nothing to inner
  424. inner = core.AsyncFuture()
  425. outer = core.AsyncFuture.shield(inner)
  426. outer.set_result(None)
  427. assert not inner.done()
  428. inner.cancel()
  429. assert not outer.cancelled()
  430. # Cancelled outer future does nothing to inner
  431. inner = core.AsyncFuture()
  432. outer = core.AsyncFuture.shield(inner)
  433. outer.cancel()
  434. assert not inner.done()
  435. inner.cancel()
  436. # Can be shielded multiple times
  437. inner = core.AsyncFuture()
  438. outer1 = core.AsyncFuture.shield(inner)
  439. outer2 = core.AsyncFuture.shield(inner)
  440. outer1.cancel()
  441. assert not inner.done()
  442. assert not outer2.done()
  443. inner.cancel()
  444. assert outer1.done()
  445. assert outer2.done()
  446. def test_future_done_callback():
  447. fut = core.AsyncFuture()
  448. # Use the list hack since Python 2 doesn't have the "nonlocal" keyword.
  449. called = [False]
  450. def on_done(arg):
  451. assert arg == fut
  452. called[0] = True
  453. fut.add_done_callback(on_done)
  454. fut.cancel()
  455. assert fut.done()
  456. task_mgr = core.AsyncTaskManager.get_global_ptr()
  457. task_mgr.poll()
  458. assert called[0]
  459. def test_future_done_callback_already_done():
  460. # Same as above, but with the future already done when add_done_callback
  461. # is called.
  462. fut = core.AsyncFuture()
  463. fut.cancel()
  464. assert fut.done()
  465. # Use the list hack since Python 2 doesn't have the "nonlocal" keyword.
  466. called = [False]
  467. def on_done(arg):
  468. assert arg == fut
  469. called[0] = True
  470. fut.add_done_callback(on_done)
  471. task_mgr = core.AsyncTaskManager.get_global_ptr()
  472. task_mgr.poll()
  473. assert called[0]
  474. def test_event_future():
  475. queue = core.EventQueue()
  476. handler = core.EventHandler(queue)
  477. fut = handler.get_future("test")
  478. # If we ask again, we should get the same one.
  479. assert handler.get_future("test") == fut
  480. event = core.Event("test")
  481. handler.dispatch_event(event)
  482. assert fut.done()
  483. assert not fut.cancelled()
  484. assert fut.result() == event
  485. def test_event_future_cancel():
  486. # This is a very strange thing to do, but it's possible, so let's make
  487. # sure it gives defined behavior.
  488. queue = core.EventQueue()
  489. handler = core.EventHandler(queue)
  490. fut = handler.get_future("test")
  491. fut.cancel()
  492. assert fut.done()
  493. assert fut.cancelled()
  494. event = core.Event("test")
  495. handler.dispatch_event(event)
  496. assert fut.done()
  497. assert fut.cancelled()
  498. def test_event_future_cancel2():
  499. queue = core.EventQueue()
  500. handler = core.EventHandler(queue)
  501. # Make sure we get a new future if we cancelled the first one.
  502. fut = handler.get_future("test")
  503. fut.cancel()
  504. fut2 = handler.get_future("test")
  505. assert fut != fut2
  506. assert fut.done()
  507. assert fut.cancelled()
  508. assert not fut2.done()
  509. assert not fut2.cancelled()
  510. def test_task_manager_cleanup_non_panda_future():
  511. future = MockFuture()
  512. # Create a weakref so we can verify the future was cleaned up.
  513. future_ref = weakref.ref(future)
  514. async def coro_main():
  515. return await future
  516. task = core.PythonTask(coro_main(), 'coro_main')
  517. task_mgr = core.AsyncTaskManager.get_global_ptr()
  518. task_mgr.add(task)
  519. # Poll the task_mgr so the PythonTask starts polling on future.done()
  520. task_mgr.poll()
  521. future._result = 'Done123'
  522. future._state = 'DONE'
  523. # Drop our reference to the future, `task` should hold the only reference
  524. del future
  525. # Recognize the future has completed
  526. task_mgr.poll()
  527. # Verify the task was completed.
  528. assert task.result() == 'Done123'
  529. del coro_main # this should break the last strong reference to the mock future
  530. assert future_ref() is None, "MockFuture was not cleaned up!"
  531. def test_await_future_result_cleanup():
  532. # Create a simple future and an object for it to return
  533. future = core.AsyncFuture()
  534. class TestObject:
  535. pass
  536. test_result = TestObject()
  537. future_result_ref = weakref.ref(test_result)
  538. # Setup an async environment and dispatch it to a task chain to await the future
  539. async def coro_main():
  540. nonlocal test_result
  541. future.set_result(test_result)
  542. del test_result
  543. await future
  544. task = core.PythonTask(coro_main(), 'coro_main')
  545. task_mgr = core.AsyncTaskManager.get_global_ptr()
  546. task_mgr.add(task)
  547. # Poll the task_mgr so the PythonTask starts polling on future.done()
  548. task_mgr.poll()
  549. # Break all possible references to the future object
  550. del task
  551. del coro_main
  552. del future # this should break the last strong reference to the future
  553. assert future_result_ref() is None, "TestObject was not cleaned up!"