test_futures.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. from panda3d import core
  2. import pytest
  3. import time
  4. import sys
  5. if sys.version_info >= (3, 8):
  6. from asyncio.exceptions import TimeoutError, CancelledError
  7. else:
  8. from concurrent.futures._base import TimeoutError, CancelledError
  9. def test_future_cancelled():
  10. fut = core.AsyncFuture()
  11. assert not fut.done()
  12. assert not fut.cancelled()
  13. fut.cancel()
  14. assert fut.done()
  15. assert fut.cancelled()
  16. with pytest.raises(CancelledError):
  17. fut.result()
  18. # Works more than once
  19. with pytest.raises(CancelledError):
  20. fut.result()
  21. def test_future_timeout():
  22. fut = core.AsyncFuture()
  23. with pytest.raises(TimeoutError):
  24. fut.result(0.001)
  25. # Works more than once
  26. with pytest.raises(TimeoutError):
  27. fut.result(0.001)
  28. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  29. reason="Threading support disabled")
  30. def test_future_wait():
  31. threading = pytest.importorskip("direct.stdpy.threading")
  32. fut = core.AsyncFuture()
  33. # Launch a thread to set the result value.
  34. def thread_main():
  35. time.sleep(0.001)
  36. fut.set_result(None)
  37. thread = threading.Thread(target=thread_main)
  38. assert not fut.done()
  39. thread.start()
  40. assert fut.result() is None
  41. assert fut.done()
  42. assert not fut.cancelled()
  43. assert fut.result() is None
  44. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  45. reason="Threading support disabled")
  46. def test_future_wait_cancel():
  47. threading = pytest.importorskip("direct.stdpy.threading")
  48. fut = core.AsyncFuture()
  49. # Launch a thread to cancel the future.
  50. def thread_main():
  51. time.sleep(0.001)
  52. fut.cancel()
  53. thread = threading.Thread(target=thread_main)
  54. assert not fut.done()
  55. thread.start()
  56. with pytest.raises(CancelledError):
  57. fut.result()
  58. assert fut.done()
  59. assert fut.cancelled()
  60. with pytest.raises(CancelledError):
  61. fut.result()
  62. def test_task_cancel():
  63. task_mgr = core.AsyncTaskManager.get_global_ptr()
  64. task = core.PythonTask(lambda task: task.done)
  65. task_mgr.add(task)
  66. assert not task.done()
  67. task_mgr.remove(task)
  68. assert task.done()
  69. assert task.cancelled()
  70. with pytest.raises(CancelledError):
  71. task.result()
  72. def test_task_cancel_during_run():
  73. task_mgr = core.AsyncTaskManager.get_global_ptr()
  74. task_chain = task_mgr.make_task_chain("test_task_cancel_during_run")
  75. def task_main(task):
  76. task.remove()
  77. # It won't yet be marked done until after it returns.
  78. assert not task.done()
  79. return task.done
  80. task = core.PythonTask(task_main)
  81. task.set_task_chain(task_chain.name)
  82. task_mgr.add(task)
  83. task_chain.wait_for_tasks()
  84. assert task.done()
  85. assert task.cancelled()
  86. with pytest.raises(CancelledError):
  87. task.result()
  88. def test_task_result():
  89. task_mgr = core.AsyncTaskManager.get_global_ptr()
  90. task_chain = task_mgr.make_task_chain("test_task_result")
  91. def task_main(task):
  92. task.set_result(42)
  93. # It won't yet be marked done until after it returns.
  94. assert not task.done()
  95. return core.PythonTask.done
  96. task = core.PythonTask(task_main)
  97. task.set_task_chain(task_chain.name)
  98. task_mgr.add(task)
  99. task_chain.wait_for_tasks()
  100. assert task.done()
  101. assert not task.cancelled()
  102. assert task.result() == 42
  103. def test_coro_exception():
  104. task_mgr = core.AsyncTaskManager.get_global_ptr()
  105. task_chain = task_mgr.make_task_chain("test_coro_exception")
  106. def coro_main():
  107. raise RuntimeError
  108. yield None
  109. task = core.PythonTask(coro_main())
  110. task.set_task_chain(task_chain.name)
  111. task_mgr.add(task)
  112. task_chain.wait_for_tasks()
  113. assert task.done()
  114. assert not task.cancelled()
  115. with pytest.raises(RuntimeError):
  116. task.result()
  117. def test_future_gather():
  118. fut1 = core.AsyncFuture()
  119. fut2 = core.AsyncFuture()
  120. # 0 and 1 arguments are special
  121. assert core.AsyncFuture.gather().done()
  122. assert core.AsyncFuture.gather(fut1) == fut1
  123. # Gathering not-done futures
  124. gather = core.AsyncFuture.gather(fut1, fut2)
  125. assert not gather.done()
  126. # One future done
  127. fut1.set_result(1)
  128. assert not gather.done()
  129. # Two futures done
  130. fut2.set_result(2)
  131. assert gather.done()
  132. assert not gather.cancelled()
  133. assert tuple(gather.result()) == (1, 2)
  134. def test_future_gather_cancel_inner():
  135. fut1 = core.AsyncFuture()
  136. fut2 = core.AsyncFuture()
  137. # Gathering not-done futures
  138. gather = core.AsyncFuture.gather(fut1, fut2)
  139. assert not gather.done()
  140. # One future cancelled
  141. fut1.cancel()
  142. assert not gather.done()
  143. # Two futures cancelled
  144. fut2.set_result(2)
  145. assert gather.done()
  146. assert not gather.cancelled()
  147. with pytest.raises(CancelledError):
  148. assert gather.result()
  149. def test_future_gather_cancel_outer():
  150. fut1 = core.AsyncFuture()
  151. fut2 = core.AsyncFuture()
  152. # Gathering not-done futures
  153. gather = core.AsyncFuture.gather(fut1, fut2)
  154. assert not gather.done()
  155. assert gather.cancel()
  156. assert gather.done()
  157. assert gather.cancelled()
  158. with pytest.raises(CancelledError):
  159. assert gather.result()
  160. def test_future_done_callback():
  161. fut = core.AsyncFuture()
  162. # Use the list hack since Python 2 doesn't have the "nonlocal" keyword.
  163. called = [False]
  164. def on_done(arg):
  165. assert arg == fut
  166. called[0] = True
  167. fut.add_done_callback(on_done)
  168. fut.cancel()
  169. assert fut.done()
  170. task_mgr = core.AsyncTaskManager.get_global_ptr()
  171. task_mgr.poll()
  172. assert called[0]
  173. def test_future_done_callback_already_done():
  174. # Same as above, but with the future already done when add_done_callback
  175. # is called.
  176. fut = core.AsyncFuture()
  177. fut.cancel()
  178. assert fut.done()
  179. # Use the list hack since Python 2 doesn't have the "nonlocal" keyword.
  180. called = [False]
  181. def on_done(arg):
  182. assert arg == fut
  183. called[0] = True
  184. fut.add_done_callback(on_done)
  185. task_mgr = core.AsyncTaskManager.get_global_ptr()
  186. task_mgr.poll()
  187. assert called[0]
  188. def test_event_future():
  189. queue = core.EventQueue()
  190. handler = core.EventHandler(queue)
  191. fut = handler.get_future("test")
  192. # If we ask again, we should get the same one.
  193. assert handler.get_future("test") == fut
  194. event = core.Event("test")
  195. handler.dispatch_event(event)
  196. assert fut.done()
  197. assert not fut.cancelled()
  198. assert fut.result() == event
  199. def test_event_future_cancel():
  200. # This is a very strange thing to do, but it's possible, so let's make
  201. # sure it gives defined behavior.
  202. queue = core.EventQueue()
  203. handler = core.EventHandler(queue)
  204. fut = handler.get_future("test")
  205. fut.cancel()
  206. assert fut.done()
  207. assert fut.cancelled()
  208. event = core.Event("test")
  209. handler.dispatch_event(event)
  210. assert fut.done()
  211. assert fut.cancelled()
  212. def test_event_future_cancel2():
  213. queue = core.EventQueue()
  214. handler = core.EventHandler(queue)
  215. # Make sure we get a new future if we cancelled the first one.
  216. fut = handler.get_future("test")
  217. fut.cancel()
  218. fut2 = handler.get_future("test")
  219. assert fut != fut2
  220. assert fut.done()
  221. assert fut.cancelled()
  222. assert not fut2.done()
  223. assert not fut2.cancelled()