test_futures.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. from panda3d import core
  2. import pytest
  3. import time
  4. import sys
  5. if sys.version_info >= (3, 8):
  6. from asyncio.exceptions import TimeoutError, CancelledError
  7. else:
  8. from concurrent.futures._base import TimeoutError, CancelledError
  9. class MockFuture:
  10. _asyncio_future_blocking = False
  11. _state = 'PENDING'
  12. _cancel_return = False
  13. _result = None
  14. def __await__(self):
  15. while self._state == 'PENDING':
  16. yield self
  17. return self.result()
  18. def done(self):
  19. return self._state != 'PENDING'
  20. def cancelled(self):
  21. return self._state == 'CANCELLED'
  22. def cancel(self):
  23. return self._cancel_return
  24. def result(self):
  25. if self._state == 'CANCELLED':
  26. raise CancelledError
  27. return self._result
  28. def test_future_cancelled():
  29. fut = core.AsyncFuture()
  30. assert not fut.done()
  31. assert not fut.cancelled()
  32. fut.cancel()
  33. assert fut.done()
  34. assert fut.cancelled()
  35. with pytest.raises(CancelledError):
  36. fut.result()
  37. # Works more than once
  38. with pytest.raises(CancelledError):
  39. fut.result()
  40. def test_future_timeout():
  41. fut = core.AsyncFuture()
  42. with pytest.raises(TimeoutError):
  43. fut.result(0.001)
  44. # Works more than once
  45. with pytest.raises(TimeoutError):
  46. fut.result(0.001)
  47. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  48. reason="Threading support disabled")
  49. def test_future_wait():
  50. threading = pytest.importorskip("direct.stdpy.threading")
  51. fut = core.AsyncFuture()
  52. # Launch a thread to set the result value.
  53. def thread_main():
  54. time.sleep(0.001)
  55. fut.set_result(None)
  56. thread = threading.Thread(target=thread_main)
  57. assert not fut.done()
  58. thread.start()
  59. assert fut.result() is None
  60. assert fut.done()
  61. assert not fut.cancelled()
  62. assert fut.result() is None
  63. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  64. reason="Threading support disabled")
  65. def test_future_wait_cancel():
  66. threading = pytest.importorskip("direct.stdpy.threading")
  67. fut = core.AsyncFuture()
  68. # Launch a thread to cancel the future.
  69. def thread_main():
  70. time.sleep(0.001)
  71. fut.cancel()
  72. thread = threading.Thread(target=thread_main)
  73. assert not fut.done()
  74. thread.start()
  75. with pytest.raises(CancelledError):
  76. fut.result()
  77. assert fut.done()
  78. assert fut.cancelled()
  79. with pytest.raises(CancelledError):
  80. fut.result()
  81. def test_task_cancel():
  82. task_mgr = core.AsyncTaskManager.get_global_ptr()
  83. task = core.PythonTask(lambda task: task.done)
  84. task_mgr.add(task)
  85. assert not task.done()
  86. task_mgr.remove(task)
  87. assert task.done()
  88. assert task.cancelled()
  89. with pytest.raises(CancelledError):
  90. task.result()
  91. def test_task_cancel_during_run():
  92. task_mgr = core.AsyncTaskManager.get_global_ptr()
  93. task_chain = task_mgr.make_task_chain("test_task_cancel_during_run")
  94. def task_main(task):
  95. task.remove()
  96. # It won't yet be marked done until after it returns.
  97. assert not task.done()
  98. return task.done
  99. task = core.PythonTask(task_main)
  100. task.set_task_chain(task_chain.name)
  101. task_mgr.add(task)
  102. task_chain.wait_for_tasks()
  103. assert task.done()
  104. assert task.cancelled()
  105. with pytest.raises(CancelledError):
  106. task.result()
  107. def test_task_cancel_waiting():
  108. # Calling result() in a threaded task chain should cancel the future being
  109. # waited on if the surrounding task is cancelled.
  110. task_mgr = core.AsyncTaskManager.get_global_ptr()
  111. task_chain = task_mgr.make_task_chain("test_task_cancel_waiting")
  112. task_chain.set_num_threads(1)
  113. fut = core.AsyncFuture()
  114. async def task_main(task):
  115. # This will block the thread this task is in until the future is done,
  116. # or until the task is cancelled (which implicitly cancels the future).
  117. fut.result()
  118. return task.done
  119. task = core.PythonTask(task_main, 'task_main')
  120. task.set_task_chain(task_chain.name)
  121. task_mgr.add(task)
  122. task_chain.start_threads()
  123. try:
  124. assert not task.done()
  125. fut.cancel()
  126. task.wait()
  127. assert task.cancelled()
  128. assert fut.cancelled()
  129. finally:
  130. task_chain.stop_threads()
  131. def test_task_cancel_awaiting():
  132. task_mgr = core.AsyncTaskManager.get_global_ptr()
  133. task_chain = task_mgr.make_task_chain("test_task_cancel_awaiting")
  134. fut = core.AsyncFuture()
  135. async def task_main(task):
  136. await fut
  137. return task.done
  138. task = core.PythonTask(task_main, 'task_main')
  139. task.set_task_chain(task_chain.name)
  140. task_mgr.add(task)
  141. task_chain.poll()
  142. assert not task.done()
  143. task_chain.poll()
  144. assert not task.done()
  145. task.cancel()
  146. task_chain.poll()
  147. assert task.done()
  148. assert task.cancelled()
  149. assert fut.done()
  150. assert fut.cancelled()
  151. def test_task_result():
  152. task_mgr = core.AsyncTaskManager.get_global_ptr()
  153. task_chain = task_mgr.make_task_chain("test_task_result")
  154. def task_main(task):
  155. task.set_result(42)
  156. # It won't yet be marked done until after it returns.
  157. assert not task.done()
  158. return core.PythonTask.done
  159. task = core.PythonTask(task_main)
  160. task.set_task_chain(task_chain.name)
  161. task_mgr.add(task)
  162. task_chain.wait_for_tasks()
  163. assert task.done()
  164. assert not task.cancelled()
  165. assert task.result() == 42
  166. def test_coro_await_coro():
  167. # Await another coro in a coro.
  168. fut = core.AsyncFuture()
  169. async def coro2():
  170. await fut
  171. async def coro_main():
  172. await coro2()
  173. task = core.PythonTask(coro_main())
  174. task_mgr = core.AsyncTaskManager.get_global_ptr()
  175. task_mgr.add(task)
  176. for i in range(5):
  177. task_mgr.poll()
  178. assert not task.done()
  179. fut.set_result(None)
  180. task_mgr.poll()
  181. assert task.done()
  182. assert not task.cancelled()
  183. def test_coro_await_cancel_resistant_coro():
  184. # Await another coro in a coro, but cancel the outer.
  185. fut = core.AsyncFuture()
  186. cancelled_caught = [0]
  187. keep_going = [False]
  188. async def cancel_resistant_coro():
  189. while not fut.done():
  190. try:
  191. await core.AsyncFuture.shield(fut)
  192. except CancelledError as ex:
  193. cancelled_caught[0] += 1
  194. async def coro_main():
  195. await cancel_resistant_coro()
  196. task = core.PythonTask(coro_main(), 'coro_main')
  197. task_mgr = core.AsyncTaskManager.get_global_ptr()
  198. task_mgr.add(task)
  199. assert not task.done()
  200. task_mgr.poll()
  201. assert not task.done()
  202. # No cancelling it once it started...
  203. for i in range(3):
  204. assert task.cancel()
  205. assert not task.done()
  206. for j in range(3):
  207. task_mgr.poll()
  208. assert not task.done()
  209. assert cancelled_caught[0] == 3
  210. fut.set_result(None)
  211. task_mgr.poll()
  212. assert task.done()
  213. assert not task.cancelled()
  214. def test_coro_await_external():
  215. # Await an external future in a coro.
  216. fut = MockFuture()
  217. fut._result = 12345
  218. res = []
  219. async def coro_main():
  220. res.append(await fut)
  221. task = core.PythonTask(coro_main(), 'coro_main')
  222. task_mgr = core.AsyncTaskManager.get_global_ptr()
  223. task_mgr.add(task)
  224. for i in range(5):
  225. task_mgr.poll()
  226. assert not task.done()
  227. fut._state = 'FINISHED'
  228. task_mgr.poll()
  229. assert task.done()
  230. assert not task.cancelled()
  231. assert res == [12345]
  232. def test_coro_await_external_cancel_inner():
  233. # Cancel external future being awaited by a coro.
  234. fut = MockFuture()
  235. async def coro_main():
  236. await fut
  237. task = core.PythonTask(coro_main(), 'coro_main')
  238. task_mgr = core.AsyncTaskManager.get_global_ptr()
  239. task_mgr.add(task)
  240. for i in range(5):
  241. task_mgr.poll()
  242. assert not task.done()
  243. fut._state = 'CANCELLED'
  244. assert not task.done()
  245. task_mgr.poll()
  246. assert task.done()
  247. assert task.cancelled()
  248. def test_coro_await_external_cancel_outer():
  249. # Cancel task that is awaiting external future.
  250. fut = MockFuture()
  251. result = []
  252. async def coro_main():
  253. result.append(await fut)
  254. task = core.PythonTask(coro_main(), 'coro_main')
  255. task_mgr = core.AsyncTaskManager.get_global_ptr()
  256. task_mgr.add(task)
  257. for i in range(5):
  258. task_mgr.poll()
  259. assert not task.done()
  260. fut._state = 'CANCELLED'
  261. assert not task.done()
  262. task_mgr.poll()
  263. assert task.done()
  264. assert task.cancelled()
  265. def test_coro_exception():
  266. task_mgr = core.AsyncTaskManager.get_global_ptr()
  267. task_chain = task_mgr.make_task_chain("test_coro_exception")
  268. def coro_main():
  269. raise RuntimeError
  270. yield None
  271. task = core.PythonTask(coro_main())
  272. task.set_task_chain(task_chain.name)
  273. task_mgr.add(task)
  274. task_chain.wait_for_tasks()
  275. assert task.done()
  276. assert not task.cancelled()
  277. with pytest.raises(RuntimeError):
  278. task.result()
  279. def test_future_result():
  280. # Cancelled
  281. fut = core.AsyncFuture()
  282. assert not fut.done()
  283. fut.cancel()
  284. with pytest.raises(CancelledError):
  285. fut.result()
  286. # None
  287. fut = core.AsyncFuture()
  288. fut.set_result(None)
  289. assert fut.done()
  290. assert fut.result() is None
  291. # Store int
  292. fut = core.AsyncFuture()
  293. fut.set_result(123)
  294. assert fut.result() == 123
  295. # Store string
  296. fut = core.AsyncFuture()
  297. fut.set_result("test\000\u1234")
  298. assert fut.result() == "test\000\u1234"
  299. # Store TypedWritableReferenceCount
  300. tex = core.Texture()
  301. rc = tex.get_ref_count()
  302. fut = core.AsyncFuture()
  303. fut.set_result(tex)
  304. assert tex.get_ref_count() == rc + 1
  305. assert fut.result() == tex
  306. assert tex.get_ref_count() == rc + 1
  307. assert fut.result() == tex
  308. assert tex.get_ref_count() == rc + 1
  309. fut = None
  310. assert tex.get_ref_count() == rc
  311. # Store EventParameter (no longer gets unwrapped)
  312. ep = core.EventParameter(0.5)
  313. fut = core.AsyncFuture()
  314. fut.set_result(ep)
  315. assert fut.result() is ep
  316. assert fut.result() is ep
  317. # Store TypedObject
  318. dg = core.Datagram(b"test")
  319. fut = core.AsyncFuture()
  320. fut.set_result(dg)
  321. assert fut.result() == dg
  322. assert fut.result() == dg
  323. # Store arbitrary Python object
  324. obj = object()
  325. rc = sys.getrefcount(obj)
  326. fut = core.AsyncFuture()
  327. fut.set_result(obj)
  328. assert sys.getrefcount(obj) == rc + 1
  329. assert fut.result() is obj
  330. assert sys.getrefcount(obj) == rc + 1
  331. assert fut.result() is obj
  332. assert sys.getrefcount(obj) == rc + 1
  333. fut = None
  334. assert sys.getrefcount(obj) == rc
  335. def test_future_gather():
  336. fut1 = core.AsyncFuture()
  337. fut2 = core.AsyncFuture()
  338. # 0 and 1 arguments are special
  339. assert core.AsyncFuture.gather().done()
  340. assert core.AsyncFuture.gather(fut1) == fut1
  341. # Gathering not-done futures
  342. gather = core.AsyncFuture.gather(fut1, fut2)
  343. assert not gather.done()
  344. # One future done
  345. fut1.set_result(1)
  346. assert not gather.done()
  347. # Two futures done
  348. fut2.set_result(2)
  349. assert gather.done()
  350. assert not gather.cancelled()
  351. assert tuple(gather.result()) == (1, 2)
  352. def test_future_gather_cancel_inner():
  353. fut1 = core.AsyncFuture()
  354. fut2 = core.AsyncFuture()
  355. # Gathering not-done futures
  356. gather = core.AsyncFuture.gather(fut1, fut2)
  357. assert not gather.done()
  358. # One future cancelled
  359. fut1.cancel()
  360. assert not gather.done()
  361. # Two futures cancelled
  362. fut2.set_result(2)
  363. assert gather.done()
  364. assert not gather.cancelled()
  365. with pytest.raises(CancelledError):
  366. assert gather.result()
  367. def test_future_gather_cancel_outer():
  368. fut1 = core.AsyncFuture()
  369. fut2 = core.AsyncFuture()
  370. # Gathering not-done futures
  371. gather = core.AsyncFuture.gather(fut1, fut2)
  372. assert not gather.done()
  373. assert gather.cancel()
  374. assert gather.done()
  375. assert gather.cancelled()
  376. with pytest.raises(CancelledError):
  377. assert gather.result()
  378. def test_future_shield():
  379. # An already done future is returned as-is (no cancellation can occur)
  380. inner = core.AsyncFuture()
  381. inner.set_result(None)
  382. outer = core.AsyncFuture.shield(inner)
  383. assert inner == outer
  384. # Normally finishing future
  385. inner = core.AsyncFuture()
  386. outer = core.AsyncFuture.shield(inner)
  387. assert not outer.done()
  388. inner.set_result(None)
  389. assert outer.done()
  390. assert not outer.cancelled()
  391. assert inner.result() is None
  392. # Normally finishing future with result
  393. inner = core.AsyncFuture()
  394. outer = core.AsyncFuture.shield(inner)
  395. assert not outer.done()
  396. inner.set_result(123)
  397. assert outer.done()
  398. assert not outer.cancelled()
  399. assert inner.result() == 123
  400. # Cancelled inner future does propagate cancellation outward
  401. inner = core.AsyncFuture()
  402. outer = core.AsyncFuture.shield(inner)
  403. assert not outer.done()
  404. inner.cancel()
  405. assert outer.done()
  406. assert outer.cancelled()
  407. # Finished outer future does nothing to inner
  408. inner = core.AsyncFuture()
  409. outer = core.AsyncFuture.shield(inner)
  410. outer.set_result(None)
  411. assert not inner.done()
  412. inner.cancel()
  413. assert not outer.cancelled()
  414. # Cancelled outer future does nothing to inner
  415. inner = core.AsyncFuture()
  416. outer = core.AsyncFuture.shield(inner)
  417. outer.cancel()
  418. assert not inner.done()
  419. inner.cancel()
  420. # Can be shielded multiple times
  421. inner = core.AsyncFuture()
  422. outer1 = core.AsyncFuture.shield(inner)
  423. outer2 = core.AsyncFuture.shield(inner)
  424. outer1.cancel()
  425. assert not inner.done()
  426. assert not outer2.done()
  427. inner.cancel()
  428. assert outer1.done()
  429. assert outer2.done()
  430. def test_future_done_callback():
  431. fut = core.AsyncFuture()
  432. # Use the list hack since Python 2 doesn't have the "nonlocal" keyword.
  433. called = [False]
  434. def on_done(arg):
  435. assert arg == fut
  436. called[0] = True
  437. fut.add_done_callback(on_done)
  438. fut.cancel()
  439. assert fut.done()
  440. task_mgr = core.AsyncTaskManager.get_global_ptr()
  441. task_mgr.poll()
  442. assert called[0]
  443. def test_future_done_callback_already_done():
  444. # Same as above, but with the future already done when add_done_callback
  445. # is called.
  446. fut = core.AsyncFuture()
  447. fut.cancel()
  448. assert fut.done()
  449. # Use the list hack since Python 2 doesn't have the "nonlocal" keyword.
  450. called = [False]
  451. def on_done(arg):
  452. assert arg == fut
  453. called[0] = True
  454. fut.add_done_callback(on_done)
  455. task_mgr = core.AsyncTaskManager.get_global_ptr()
  456. task_mgr.poll()
  457. assert called[0]
  458. def test_event_future():
  459. queue = core.EventQueue()
  460. handler = core.EventHandler(queue)
  461. fut = handler.get_future("test")
  462. # If we ask again, we should get the same one.
  463. assert handler.get_future("test") == fut
  464. event = core.Event("test")
  465. handler.dispatch_event(event)
  466. assert fut.done()
  467. assert not fut.cancelled()
  468. assert fut.result() == event
  469. def test_event_future_cancel():
  470. # This is a very strange thing to do, but it's possible, so let's make
  471. # sure it gives defined behavior.
  472. queue = core.EventQueue()
  473. handler = core.EventHandler(queue)
  474. fut = handler.get_future("test")
  475. fut.cancel()
  476. assert fut.done()
  477. assert fut.cancelled()
  478. event = core.Event("test")
  479. handler.dispatch_event(event)
  480. assert fut.done()
  481. assert fut.cancelled()
  482. def test_event_future_cancel2():
  483. queue = core.EventQueue()
  484. handler = core.EventHandler(queue)
  485. # Make sure we get a new future if we cancelled the first one.
  486. fut = handler.get_future("test")
  487. fut.cancel()
  488. fut2 = handler.get_future("test")
  489. assert fut != fut2
  490. assert fut.done()
  491. assert fut.cancelled()
  492. assert not fut2.done()
  493. assert not fut2.cancelled()