# test_futures.py (19 KB)
  1. from panda3d import core
  2. from asyncio.exceptions import TimeoutError, CancelledError
  3. import pytest
  4. import time
  5. import sys
  6. import weakref
  7. class MockFuture:
  8. _asyncio_future_blocking = False
  9. _state = 'PENDING'
  10. _cancel_return = False
  11. _result = None
  12. def __await__(self):
  13. while self._state == 'PENDING':
  14. yield self
  15. return self.result()
  16. def done(self):
  17. return self._state != 'PENDING'
  18. def cancelled(self):
  19. return self._state == 'CANCELLED'
  20. def cancel(self):
  21. return self._cancel_return
  22. def result(self):
  23. if self._state == 'CANCELLED':
  24. raise CancelledError
  25. return self._result
  26. def check_result(fut, expected):
  27. """Asserts the result of the future is the expected value."""
  28. if fut.result() != expected:
  29. return False
  30. # Make sure that await also returns the values properly
  31. with pytest.raises(StopIteration) as e:
  32. next(fut.__await__())
  33. if e.value.value != expected:
  34. return False
  35. return True
  36. def test_future_await_send():
  37. fut = core.AsyncFuture()
  38. i = fut.__await__()
  39. assert i.send(None) == fut
  40. assert i.send(None) == fut
  41. assert i.send(None) == fut
  42. assert i.send(None) == fut
  43. fut.set_result(123)
  44. with pytest.raises(StopIteration) as e:
  45. i.send(None)
  46. assert e.value.value == 123
  47. def test_future_await_throw():
  48. fut = core.AsyncFuture()
  49. i = fut.__await__()
  50. with pytest.raises(RuntimeError):
  51. i.throw(RuntimeError)
  52. def test_future_cancelled():
  53. fut = core.AsyncFuture()
  54. assert not fut.done()
  55. assert not fut.cancelled()
  56. fut.cancel()
  57. assert fut.done()
  58. assert fut.cancelled()
  59. with pytest.raises(CancelledError):
  60. fut.result()
  61. # Works more than once
  62. with pytest.raises(CancelledError):
  63. fut.result()
  64. def test_future_timeout():
  65. fut = core.AsyncFuture()
  66. with pytest.raises(TimeoutError):
  67. fut.result(0.001)
  68. # Works more than once
  69. with pytest.raises(TimeoutError):
  70. fut.result(0.001)
  71. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  72. reason="Threading support disabled")
  73. def test_future_wait():
  74. threading = pytest.importorskip("direct.stdpy.threading")
  75. fut = core.AsyncFuture()
  76. # Launch a thread to set the result value.
  77. def thread_main():
  78. time.sleep(0.001)
  79. fut.set_result(None)
  80. thread = threading.Thread(target=thread_main)
  81. assert not fut.done()
  82. thread.start()
  83. assert fut.result() is None
  84. assert fut.done()
  85. assert not fut.cancelled()
  86. assert fut.result() is None
  87. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  88. reason="Threading support disabled")
  89. def test_future_wait_timeout():
  90. threading = pytest.importorskip("direct.stdpy.threading")
  91. fut = core.AsyncFuture()
  92. # Launch a thread to set the result value.
  93. def thread_main():
  94. time.sleep(0.001)
  95. fut.set_result(None)
  96. thread = threading.Thread(target=thread_main)
  97. assert not fut.done()
  98. thread.start()
  99. assert fut.result(1.0) is None
  100. assert fut.done()
  101. assert not fut.cancelled()
  102. assert fut.result() is None
  103. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  104. reason="Threading support disabled")
  105. def test_future_wait_cancel():
  106. threading = pytest.importorskip("direct.stdpy.threading")
  107. fut = core.AsyncFuture()
  108. # Launch a thread to cancel the future.
  109. def thread_main():
  110. time.sleep(0.001)
  111. fut.cancel()
  112. thread = threading.Thread(target=thread_main)
  113. assert not fut.done()
  114. thread.start()
  115. with pytest.raises(CancelledError):
  116. fut.result()
  117. assert fut.done()
  118. assert fut.cancelled()
  119. with pytest.raises(CancelledError):
  120. fut.result()
  121. def test_task_remove():
  122. task_mgr = core.AsyncTaskManager.get_global_ptr()
  123. task = core.PythonTask(lambda task: task.done)
  124. task_mgr.add(task)
  125. assert not task.done()
  126. task_mgr.remove(task)
  127. assert not task.is_alive()
  128. assert task.done()
  129. assert task.cancelled()
  130. with pytest.raises(CancelledError):
  131. task.result()
  132. def test_task_cancel():
  133. task_mgr = core.AsyncTaskManager.get_global_ptr()
  134. task = core.PythonTask(lambda task: task.done)
  135. task_mgr.add(task)
  136. assert not task.done()
  137. task.cancel()
  138. assert not task.is_alive()
  139. assert task.done()
  140. assert task.cancelled()
  141. with pytest.raises(CancelledError):
  142. task.result()
  143. def test_task_cancel_during_run():
  144. task_mgr = core.AsyncTaskManager.get_global_ptr()
  145. task_chain = task_mgr.make_task_chain("test_task_cancel_during_run")
  146. def task_main(task):
  147. task.remove()
  148. # It won't yet be marked done until after it returns.
  149. assert not task.done()
  150. return task.done
  151. task = core.PythonTask(task_main)
  152. task.set_task_chain(task_chain.name)
  153. task_mgr.add(task)
  154. task_chain.wait_for_tasks()
  155. assert task.done()
  156. assert task.cancelled()
  157. with pytest.raises(CancelledError):
  158. task.result()
  159. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  160. reason="Threading support disabled")
  161. def test_task_cancel_waiting():
  162. # Calling result() in a threaded task chain should cancel the future being
  163. # waited on if the surrounding task is cancelled.
  164. task_mgr = core.AsyncTaskManager.get_global_ptr()
  165. task_chain = task_mgr.make_task_chain("test_task_cancel_waiting")
  166. task_chain.set_num_threads(1)
  167. fut = core.AsyncFuture()
  168. async def task_main(task):
  169. # This will block the thread this task is in until the future is done,
  170. # or until the task is cancelled (which implicitly cancels the future).
  171. fut.result()
  172. return task.done
  173. task = core.PythonTask(task_main, 'task_main')
  174. task.set_task_chain(task_chain.name)
  175. task_mgr.add(task)
  176. task_chain.start_threads()
  177. try:
  178. assert not task.done()
  179. fut.cancel()
  180. task.wait()
  181. assert task.cancelled()
  182. assert fut.cancelled()
  183. finally:
  184. task_chain.stop_threads()
  185. def test_task_cancel_awaiting():
  186. task_mgr = core.AsyncTaskManager.get_global_ptr()
  187. task_chain = task_mgr.make_task_chain("test_task_cancel_awaiting")
  188. fut = core.AsyncFuture()
  189. async def task_main(task):
  190. await fut
  191. return task.done
  192. task = core.PythonTask(task_main, 'task_main')
  193. task.set_task_chain(task_chain.name)
  194. task_mgr.add(task)
  195. task_chain.poll()
  196. assert not task.done()
  197. task_chain.poll()
  198. assert not task.done()
  199. task.cancel()
  200. task_chain.poll()
  201. assert task.done()
  202. assert task.cancelled()
  203. assert fut.done()
  204. assert fut.cancelled()
  205. def test_task_result():
  206. task_mgr = core.AsyncTaskManager.get_global_ptr()
  207. task_chain = task_mgr.make_task_chain("test_task_result")
  208. def task_main(task):
  209. task.set_result(42)
  210. # It won't yet be marked done until after it returns.
  211. assert not task.done()
  212. return core.PythonTask.done
  213. task = core.PythonTask(task_main)
  214. task.set_task_chain(task_chain.name)
  215. task_mgr.add(task)
  216. task_chain.wait_for_tasks()
  217. assert task.done()
  218. assert not task.cancelled()
  219. assert task.result() == 42
  220. def test_coro_await_coro():
  221. # Await another coro in a coro.
  222. fut = core.AsyncFuture()
  223. async def coro2():
  224. await fut
  225. async def coro_main():
  226. await coro2()
  227. task = core.PythonTask(coro_main())
  228. task_mgr = core.AsyncTaskManager.get_global_ptr()
  229. task_mgr.add(task)
  230. for i in range(5):
  231. task_mgr.poll()
  232. assert not task.done()
  233. fut.set_result(None)
  234. task_mgr.poll()
  235. assert task.done()
  236. assert not task.cancelled()
  237. def test_coro_await_cancel_resistant_coro():
  238. # Await another coro in a coro, but cancel the outer.
  239. fut = core.AsyncFuture()
  240. cancelled_caught = [0]
  241. keep_going = [False]
  242. async def cancel_resistant_coro():
  243. while not fut.done():
  244. try:
  245. await core.AsyncFuture.shield(fut)
  246. except CancelledError as ex:
  247. cancelled_caught[0] += 1
  248. async def coro_main():
  249. await cancel_resistant_coro()
  250. task = core.PythonTask(coro_main(), 'coro_main')
  251. task_mgr = core.AsyncTaskManager.get_global_ptr()
  252. task_mgr.add(task)
  253. assert not task.done()
  254. task_mgr.poll()
  255. assert not task.done()
  256. # No cancelling it once it started...
  257. for i in range(3):
  258. assert task.cancel()
  259. assert not task.done()
  260. for j in range(3):
  261. task_mgr.poll()
  262. assert not task.done()
  263. assert cancelled_caught[0] == 3
  264. fut.set_result(None)
  265. task_mgr.poll()
  266. assert task.done()
  267. assert not task.cancelled()
  268. def test_coro_await_external():
  269. # Await an external future in a coro.
  270. fut = MockFuture()
  271. fut._result = 12345
  272. res = []
  273. async def coro_main():
  274. res.append(await fut)
  275. task = core.PythonTask(coro_main(), 'coro_main')
  276. task_mgr = core.AsyncTaskManager.get_global_ptr()
  277. task_mgr.add(task)
  278. for i in range(5):
  279. task_mgr.poll()
  280. assert not task.done()
  281. fut._state = 'FINISHED'
  282. task_mgr.poll()
  283. assert task.done()
  284. assert not task.cancelled()
  285. assert res == [12345]
  286. def test_coro_await_external_cancel_inner():
  287. # Cancel external future being awaited by a coro.
  288. fut = MockFuture()
  289. async def coro_main():
  290. await fut
  291. task = core.PythonTask(coro_main(), 'coro_main')
  292. task_mgr = core.AsyncTaskManager.get_global_ptr()
  293. task_mgr.add(task)
  294. for i in range(5):
  295. task_mgr.poll()
  296. assert not task.done()
  297. fut._state = 'CANCELLED'
  298. assert not task.done()
  299. task_mgr.poll()
  300. assert task.done()
  301. assert task.cancelled()
  302. def test_coro_await_external_cancel_outer():
  303. # Cancel task that is awaiting external future.
  304. fut = MockFuture()
  305. result = []
  306. async def coro_main():
  307. result.append(await fut)
  308. task = core.PythonTask(coro_main(), 'coro_main')
  309. task_mgr = core.AsyncTaskManager.get_global_ptr()
  310. task_mgr.add(task)
  311. for i in range(5):
  312. task_mgr.poll()
  313. assert not task.done()
  314. fut._state = 'CANCELLED'
  315. assert not task.done()
  316. task_mgr.poll()
  317. assert task.done()
  318. assert task.cancelled()
  319. def test_coro_exception():
  320. task_mgr = core.AsyncTaskManager.get_global_ptr()
  321. task_chain = task_mgr.make_task_chain("test_coro_exception")
  322. def coro_main():
  323. raise RuntimeError
  324. yield None
  325. task = core.PythonTask(coro_main())
  326. task.set_task_chain(task_chain.name)
  327. task_mgr.add(task)
  328. task_chain.wait_for_tasks()
  329. assert task.done()
  330. assert not task.cancelled()
  331. with pytest.raises(RuntimeError):
  332. task.result()
  333. def test_future_result():
  334. # Cancelled
  335. fut = core.AsyncFuture()
  336. assert not fut.done()
  337. fut.cancel()
  338. with pytest.raises(CancelledError):
  339. fut.result()
  340. # None
  341. fut = core.AsyncFuture()
  342. fut.set_result(None)
  343. assert fut.done()
  344. assert fut.result() is None
  345. # Store int
  346. fut = core.AsyncFuture()
  347. fut.set_result(123)
  348. assert fut.result() == 123
  349. # Store string
  350. fut = core.AsyncFuture()
  351. fut.set_result("test\000\u1234")
  352. assert fut.result() == "test\000\u1234"
  353. # Store TypedWritableReferenceCount
  354. tex = core.Texture()
  355. rc = tex.get_ref_count()
  356. fut = core.AsyncFuture()
  357. fut.set_result(tex)
  358. assert tex.get_ref_count() == rc + 1
  359. assert fut.result() == tex
  360. assert tex.get_ref_count() == rc + 1
  361. assert fut.result() == tex
  362. assert tex.get_ref_count() == rc + 1
  363. fut = None
  364. assert tex.get_ref_count() == rc
  365. # Store EventParameter (no longer gets unwrapped)
  366. ep = core.EventParameter(0.5)
  367. fut = core.AsyncFuture()
  368. fut.set_result(ep)
  369. assert check_result(fut, ep)
  370. assert check_result(fut, ep)
  371. # Store TypedObject
  372. dg = core.Datagram(b"test")
  373. fut = core.AsyncFuture()
  374. fut.set_result(dg)
  375. assert check_result(fut, dg)
  376. assert check_result(fut, dg)
  377. # Store tuple
  378. fut = core.AsyncFuture()
  379. fut.set_result((1, 2))
  380. assert check_result(fut, (1, 2))
  381. # Store arbitrary Python object
  382. obj = object()
  383. rc = sys.getrefcount(obj)
  384. fut = core.AsyncFuture()
  385. fut.set_result(obj)
  386. assert sys.getrefcount(obj) == rc + 1
  387. assert fut.result() is obj
  388. assert sys.getrefcount(obj) == rc + 1
  389. assert fut.result() is obj
  390. assert sys.getrefcount(obj) == rc + 1
  391. fut = None
  392. assert sys.getrefcount(obj) == rc
  393. def test_future_gather():
  394. fut1 = core.AsyncFuture()
  395. fut2 = core.AsyncFuture()
  396. # 0 and 1 arguments are special
  397. assert core.AsyncFuture.gather().done()
  398. assert core.AsyncFuture.gather(fut1) == fut1
  399. # Gathering not-done futures
  400. gather = core.AsyncFuture.gather(fut1, fut2)
  401. assert not gather.done()
  402. # One future done
  403. fut1.set_result(1)
  404. assert not gather.done()
  405. # Two futures done
  406. fut2.set_result(2)
  407. assert gather.done()
  408. assert not gather.cancelled()
  409. assert check_result(gather, [1, 2])
  410. def test_future_gather_cancel_inner():
  411. fut1 = core.AsyncFuture()
  412. fut2 = core.AsyncFuture()
  413. # Gathering not-done futures
  414. gather = core.AsyncFuture.gather(fut1, fut2)
  415. assert not gather.done()
  416. # One future cancelled
  417. fut1.cancel()
  418. assert not gather.done()
  419. # Two futures cancelled
  420. fut2.set_result(2)
  421. assert gather.done()
  422. assert not gather.cancelled()
  423. with pytest.raises(CancelledError):
  424. assert gather.result()
  425. def test_future_gather_cancel_outer():
  426. fut1 = core.AsyncFuture()
  427. fut2 = core.AsyncFuture()
  428. # Gathering not-done futures
  429. gather = core.AsyncFuture.gather(fut1, fut2)
  430. assert not gather.done()
  431. assert gather.cancel()
  432. assert gather.done()
  433. assert gather.cancelled()
  434. with pytest.raises(CancelledError):
  435. assert gather.result()
  436. def test_future_shield():
  437. # An already done future is returned as-is (no cancellation can occur)
  438. inner = core.AsyncFuture()
  439. inner.set_result(None)
  440. outer = core.AsyncFuture.shield(inner)
  441. assert inner == outer
  442. # Normally finishing future
  443. inner = core.AsyncFuture()
  444. outer = core.AsyncFuture.shield(inner)
  445. assert not outer.done()
  446. inner.set_result(None)
  447. assert outer.done()
  448. assert not outer.cancelled()
  449. assert inner.result() is None
  450. # Normally finishing future with result
  451. inner = core.AsyncFuture()
  452. outer = core.AsyncFuture.shield(inner)
  453. assert not outer.done()
  454. inner.set_result(123)
  455. assert outer.done()
  456. assert not outer.cancelled()
  457. assert inner.result() == 123
  458. # Cancelled inner future does propagate cancellation outward
  459. inner = core.AsyncFuture()
  460. outer = core.AsyncFuture.shield(inner)
  461. assert not outer.done()
  462. inner.cancel()
  463. assert outer.done()
  464. assert outer.cancelled()
  465. # Finished outer future does nothing to inner
  466. inner = core.AsyncFuture()
  467. outer = core.AsyncFuture.shield(inner)
  468. outer.set_result(None)
  469. assert not inner.done()
  470. inner.cancel()
  471. assert not outer.cancelled()
  472. # Cancelled outer future does nothing to inner
  473. inner = core.AsyncFuture()
  474. outer = core.AsyncFuture.shield(inner)
  475. outer.cancel()
  476. assert not inner.done()
  477. inner.cancel()
  478. # Can be shielded multiple times
  479. inner = core.AsyncFuture()
  480. outer1 = core.AsyncFuture.shield(inner)
  481. outer2 = core.AsyncFuture.shield(inner)
  482. outer1.cancel()
  483. assert not inner.done()
  484. assert not outer2.done()
  485. inner.cancel()
  486. assert outer1.done()
  487. assert outer2.done()
  488. def test_future_done_callback():
  489. fut = core.AsyncFuture()
  490. # Use the list hack since Python 2 doesn't have the "nonlocal" keyword.
  491. called = [False]
  492. def on_done(arg):
  493. assert arg == fut
  494. called[0] = True
  495. fut.add_done_callback(on_done)
  496. fut.cancel()
  497. assert fut.done()
  498. task_mgr = core.AsyncTaskManager.get_global_ptr()
  499. task_mgr.poll()
  500. assert called[0]
  501. def test_future_done_callback_already_done():
  502. # Same as above, but with the future already done when add_done_callback
  503. # is called.
  504. fut = core.AsyncFuture()
  505. fut.cancel()
  506. assert fut.done()
  507. # Use the list hack since Python 2 doesn't have the "nonlocal" keyword.
  508. called = [False]
  509. def on_done(arg):
  510. assert arg == fut
  511. called[0] = True
  512. fut.add_done_callback(on_done)
  513. task_mgr = core.AsyncTaskManager.get_global_ptr()
  514. task_mgr.poll()
  515. assert called[0]
  516. def test_event_future():
  517. queue = core.EventQueue()
  518. handler = core.EventHandler(queue)
  519. fut = handler.get_future("test")
  520. # If we ask again, we should get the same one.
  521. assert handler.get_future("test") == fut
  522. event = core.Event("test")
  523. handler.dispatch_event(event)
  524. assert fut.done()
  525. assert not fut.cancelled()
  526. assert fut.result() == event
  527. def test_event_future_cancel():
  528. # This is a very strange thing to do, but it's possible, so let's make
  529. # sure it gives defined behavior.
  530. queue = core.EventQueue()
  531. handler = core.EventHandler(queue)
  532. fut = handler.get_future("test")
  533. fut.cancel()
  534. assert fut.done()
  535. assert fut.cancelled()
  536. event = core.Event("test")
  537. handler.dispatch_event(event)
  538. assert fut.done()
  539. assert fut.cancelled()
  540. def test_event_future_cancel2():
  541. queue = core.EventQueue()
  542. handler = core.EventHandler(queue)
  543. # Make sure we get a new future if we cancelled the first one.
  544. fut = handler.get_future("test")
  545. fut.cancel()
  546. fut2 = handler.get_future("test")
  547. assert fut != fut2
  548. assert fut.done()
  549. assert fut.cancelled()
  550. assert not fut2.done()
  551. assert not fut2.cancelled()
  552. def test_task_manager_cleanup_non_panda_future():
  553. future = MockFuture()
  554. # Create a weakref so we can verify the future was cleaned up.
  555. future_ref = weakref.ref(future)
  556. async def coro_main():
  557. return await future
  558. task = core.PythonTask(coro_main(), 'coro_main')
  559. task_mgr = core.AsyncTaskManager.get_global_ptr()
  560. task_mgr.add(task)
  561. # Poll the task_mgr so the PythonTask starts polling on future.done()
  562. task_mgr.poll()
  563. future._result = 'Done123'
  564. future._state = 'DONE'
  565. # Drop our reference to the future, `task` should hold the only reference
  566. del future
  567. # Recognize the future has completed
  568. task_mgr.poll()
  569. # Verify the task was completed.
  570. assert task.result() == 'Done123'
  571. del coro_main # this should break the last strong reference to the mock future
  572. assert future_ref() is None, "MockFuture was not cleaned up!"
  573. def test_await_future_result_cleanup():
  574. # Create a simple future and an object for it to return
  575. future = core.AsyncFuture()
  576. class TestObject:
  577. pass
  578. test_result = TestObject()
  579. future_result_ref = weakref.ref(test_result)
  580. # Setup an async environment and dispatch it to a task chain to await the future
  581. async def coro_main():
  582. nonlocal test_result
  583. future.set_result(test_result)
  584. del test_result
  585. await future
  586. task = core.PythonTask(coro_main(), 'coro_main')
  587. task_mgr = core.AsyncTaskManager.get_global_ptr()
  588. task_mgr.add(task)
  589. # Poll the task_mgr so the PythonTask starts polling on future.done()
  590. task_mgr.poll()
  591. # Break all possible references to the future object
  592. del task
  593. del coro_main
  594. del future # this should break the last strong reference to the future
  595. assert future_result_ref() is None, "TestObject was not cleaned up!"