test_Task.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592
  1. import pytest
  2. from panda3d import core
  3. from direct.task import Task
  4. TASK_NAME = 'Arbitrary task name'
  5. TASK_CHAIN_NAME = 'Arbitrary task chain name'
  6. def DUMMY_FUNCTION(*_):
  7. pass
  8. @pytest.fixture
  9. def task_manager():
  10. manager = Task.TaskManager()
  11. manager.mgr = core.AsyncTaskManager('Test manager')
  12. manager.clock = core.ClockObject()
  13. manager.setupTaskChain('default', tickClock=True)
  14. manager.finalInit()
  15. yield manager
  16. manager.destroy()
  17. def test_sequence(task_manager):
  18. numbers = []
  19. def append_1(task):
  20. numbers.append(1)
  21. def append_2(task):
  22. numbers.append(2)
  23. sequence = Task.sequence(core.PythonTask(append_1), core.PythonTask(append_2))
  24. task_manager.add(sequence)
  25. for _ in range(3):
  26. task_manager.step()
  27. assert not task_manager.getTasks()
  28. assert numbers == [1, 2]
  29. def test_loop(task_manager):
  30. numbers = []
  31. def append_1(task):
  32. numbers.append(1)
  33. def append_2(task):
  34. numbers.append(2)
  35. loop = Task.loop(core.PythonTask(append_1), core.PythonTask(append_2))
  36. task_manager.add(loop)
  37. for _ in range(5):
  38. task_manager.step()
  39. assert numbers == [1, 2, 1, 2]
  40. def test_get_current_task(task_manager):
  41. def check_current_task(task):
  42. assert task_manager.getCurrentTask().name == TASK_NAME
  43. task_manager.add(check_current_task, TASK_NAME)
  44. assert len(task_manager.getTasks()) == 1
  45. assert task_manager.getCurrentTask() is None
  46. task_manager.step()
  47. assert len(task_manager.getTasks()) == 0
  48. assert task_manager.getCurrentTask() is None
  49. def test_has_task_chain(task_manager):
  50. assert not task_manager.hasTaskChain(TASK_CHAIN_NAME)
  51. task_manager.setupTaskChain(TASK_CHAIN_NAME)
  52. assert task_manager.hasTaskChain(TASK_CHAIN_NAME)
  53. def test_done(task_manager):
  54. # run-once task
  55. tm = task_manager
  56. l = []
  57. def _testDone(task, l=l):
  58. l.append(None)
  59. return task.done
  60. tm.add(_testDone, 'testDone')
  61. tm.step()
  62. assert len(l) == 1
  63. tm.step()
  64. assert len(l) == 1
  65. def test_remove_by_name(task_manager):
  66. # remove by name
  67. tm = task_manager
  68. def _testRemoveByName(task):
  69. return task.done
  70. tm.add(_testRemoveByName, 'testRemoveByName')
  71. assert tm.remove('testRemoveByName') == 1
  72. assert tm.remove('testRemoveByName') == 0
  73. def test_duplicate_named_tasks(task_manager):
  74. # duplicate named tasks
  75. tm = task_manager
  76. def _testDupNamedTasks(task):
  77. return task.done
  78. tm.add(_testDupNamedTasks, 'testDupNamedTasks')
  79. tm.add(_testDupNamedTasks, 'testDupNamedTasks')
  80. assert tm.remove('testRemoveByName') == 0
  81. def test_continued_task(task_manager):
  82. # continued task
  83. tm = task_manager
  84. l = []
  85. def _testCont(task, l = l):
  86. l.append(None)
  87. return task.cont
  88. tm.add(_testCont, 'testCont')
  89. tm.step()
  90. assert len(l) == 1
  91. tm.step()
  92. assert len(l) == 2
  93. tm.remove('testCont')
  94. def test_continue_until_done(task_manager):
  95. # continue until done task
  96. tm = task_manager
  97. l = []
  98. def _testContDone(task, l = l):
  99. l.append(None)
  100. if len(l) >= 2:
  101. return task.done
  102. else:
  103. return task.cont
  104. tm.add(_testContDone, 'testContDone')
  105. tm.step()
  106. assert len(l) == 1
  107. tm.step()
  108. assert len(l) == 2
  109. tm.step()
  110. assert len(l) == 2
  111. assert not tm.hasTaskNamed('testContDone')
  112. def test_has_task_named(task_manager):
  113. # hasTaskNamed
  114. tm = task_manager
  115. def _testHasTaskNamed(task):
  116. return task.done
  117. tm.add(_testHasTaskNamed, 'testHasTaskNamed')
  118. assert tm.hasTaskNamed('testHasTaskNamed')
  119. tm.step()
  120. assert not tm.hasTaskNamed('testHasTaskNamed')
  121. def test_task_sort(task_manager):
  122. # task sort
  123. tm = task_manager
  124. l = []
  125. def _testPri1(task, l = l):
  126. l.append(1)
  127. return task.cont
  128. def _testPri2(task, l = l):
  129. l.append(2)
  130. return task.cont
  131. tm.add(_testPri1, 'testPri1', sort = 1)
  132. tm.add(_testPri2, 'testPri2', sort = 2)
  133. tm.step()
  134. assert len(l) == 2
  135. assert l == [1, 2,]
  136. tm.step()
  137. assert len(l) == 4
  138. assert l == [1, 2, 1, 2,]
  139. tm.remove('testPri1')
  140. tm.remove('testPri2')
  141. def test_extra_args(task_manager):
  142. # task extraArgs
  143. tm = task_manager
  144. l = []
  145. def _testExtraArgs(arg1, arg2, l=l):
  146. l.extend([arg1, arg2,])
  147. return Task.done
  148. tm.add(_testExtraArgs, 'testExtraArgs', extraArgs=[4,5])
  149. tm.step()
  150. assert len(l) == 2
  151. assert l == [4, 5,]
  152. def test_append_task(task_manager):
  153. # task appendTask
  154. tm = task_manager
  155. l = []
  156. def _testAppendTask(arg1, arg2, task, l=l):
  157. l.extend([arg1, arg2,])
  158. return task.done
  159. tm.add(_testAppendTask, '_testAppendTask', extraArgs=[4,5], appendTask=True)
  160. tm.step()
  161. assert len(l) == 2
  162. assert l == [4, 5,]
  163. def test_task_upon_death(task_manager):
  164. # task uponDeath
  165. tm = task_manager
  166. l = []
  167. def _uponDeathFunc(task, l=l):
  168. l.append(task.name)
  169. def _testUponDeath(task):
  170. return Task.done
  171. tm.add(_testUponDeath, 'testUponDeath', uponDeath=_uponDeathFunc)
  172. tm.step()
  173. assert len(l) == 1
  174. assert l == ['testUponDeath']
  175. def test_task_owner(task_manager):
  176. # task owner
  177. tm = task_manager
  178. class _TaskOwner:
  179. def _addTask(self, task):
  180. self.addedTaskName = task.name
  181. def _clearTask(self, task):
  182. self.clearedTaskName = task.name
  183. to = _TaskOwner()
  184. l = []
  185. def _testOwner(task):
  186. return Task.done
  187. tm.add(_testOwner, 'testOwner', owner=to)
  188. tm.step()
  189. assert getattr(to, 'addedTaskName', None) == 'testOwner'
  190. assert getattr(to, 'clearedTaskName', None) == 'testOwner'
  191. def test_do_laters(task_manager):
  192. tm = task_manager
  193. doLaterTests = [0,]
  194. # doLater
  195. l = []
  196. def _testDoLater1(task, l=l):
  197. l.append(1)
  198. def _testDoLater2(task, l=l):
  199. l.append(2)
  200. def _monitorDoLater(task, tm=tm, l=l, doLaterTests=doLaterTests):
  201. if task.time > .03:
  202. assert l == [1, 2,]
  203. doLaterTests[0] -= 1
  204. return task.done
  205. return task.cont
  206. tm.doMethodLater(.01, _testDoLater1, 'testDoLater1')
  207. tm.doMethodLater(.02, _testDoLater2, 'testDoLater2')
  208. doLaterTests[0] += 1
  209. # make sure we run this task after the doLaters if they all occur on the same frame
  210. tm.add(_monitorDoLater, 'monitorDoLater', sort=10)
  211. _testDoLater1 = None
  212. _testDoLater2 = None
  213. _monitorDoLater = None
  214. # doLater sort
  215. l = []
  216. def _testDoLaterPri1(task, l=l):
  217. l.append(1)
  218. def _testDoLaterPri2(task, l=l):
  219. l.append(2)
  220. def _monitorDoLaterPri(task, tm=tm, l=l, doLaterTests=doLaterTests):
  221. if task.time > .02:
  222. assert l == [1, 2,]
  223. doLaterTests[0] -= 1
  224. return task.done
  225. return task.cont
  226. tm.doMethodLater(.01, _testDoLaterPri1, 'testDoLaterPri1', sort=1)
  227. tm.doMethodLater(.01, _testDoLaterPri2, 'testDoLaterPri2', sort=2)
  228. doLaterTests[0] += 1
  229. # make sure we run this task after the doLaters if they all occur on the same frame
  230. tm.add(_monitorDoLaterPri, 'monitorDoLaterPri', sort=10)
  231. _testDoLaterPri1 = None
  232. _testDoLaterPri2 = None
  233. _monitorDoLaterPri = None
  234. # doLater extraArgs
  235. l = []
  236. def _testDoLaterExtraArgs(arg1, l=l):
  237. l.append(arg1)
  238. def _monitorDoLaterExtraArgs(task, tm=tm, l=l, doLaterTests=doLaterTests):
  239. if task.time > .02:
  240. assert l == [3,]
  241. doLaterTests[0] -= 1
  242. return task.done
  243. return task.cont
  244. tm.doMethodLater(.01, _testDoLaterExtraArgs, 'testDoLaterExtraArgs', extraArgs=[3,])
  245. doLaterTests[0] += 1
  246. # make sure we run this task after the doLaters if they all occur on the same frame
  247. tm.add(_monitorDoLaterExtraArgs, 'monitorDoLaterExtraArgs', sort=10)
  248. _testDoLaterExtraArgs = None
  249. _monitorDoLaterExtraArgs = None
  250. # doLater appendTask
  251. l = []
  252. def _testDoLaterAppendTask(arg1, task, l=l):
  253. assert task.name == 'testDoLaterAppendTask'
  254. l.append(arg1)
  255. def _monitorDoLaterAppendTask(task, tm=tm, l=l, doLaterTests=doLaterTests):
  256. if task.time > .02:
  257. assert l == [4,]
  258. doLaterTests[0] -= 1
  259. return task.done
  260. return task.cont
  261. tm.doMethodLater(.01, _testDoLaterAppendTask, 'testDoLaterAppendTask',
  262. extraArgs=[4,], appendTask=True)
  263. doLaterTests[0] += 1
  264. # make sure we run this task after the doLaters if they all occur on the same frame
  265. tm.add(_monitorDoLaterAppendTask, 'monitorDoLaterAppendTask', sort=10)
  266. _testDoLaterAppendTask = None
  267. _monitorDoLaterAppendTask = None
  268. # doLater uponDeath
  269. l = []
  270. def _testUponDeathFunc(task, l=l):
  271. assert task.name == 'testDoLaterUponDeath'
  272. l.append(10)
  273. def _testDoLaterUponDeath(arg1, l=l):
  274. return Task.done
  275. def _monitorDoLaterUponDeath(task, tm=tm, l=l, doLaterTests=doLaterTests):
  276. if task.time > .02:
  277. assert l == [10,]
  278. doLaterTests[0] -= 1
  279. return task.done
  280. return task.cont
  281. tm.doMethodLater(.01, _testDoLaterUponDeath, 'testDoLaterUponDeath',
  282. uponDeath=_testUponDeathFunc)
  283. doLaterTests[0] += 1
  284. # make sure we run this task after the doLaters if they all occur on the same frame
  285. tm.add(_monitorDoLaterUponDeath, 'monitorDoLaterUponDeath', sort=10)
  286. _testUponDeathFunc = None
  287. _testDoLaterUponDeath = None
  288. _monitorDoLaterUponDeath = None
  289. # doLater owner
  290. class _DoLaterOwner:
  291. def _addTask(self, task):
  292. self.addedTaskName = task.name
  293. def _clearTask(self, task):
  294. self.clearedTaskName = task.name
  295. doLaterOwner = _DoLaterOwner()
  296. l = []
  297. def _testDoLaterOwner(l=l):
  298. pass
  299. def _monitorDoLaterOwner(task, tm=tm, l=l, doLaterOwner=doLaterOwner,
  300. doLaterTests=doLaterTests):
  301. if task.time > .02:
  302. assert getattr(doLaterOwner, 'addedTaskName', None) == 'testDoLaterOwner'
  303. assert getattr(doLaterOwner, 'clearedTaskName', None) == 'testDoLaterOwner'
  304. doLaterTests[0] -= 1
  305. return task.done
  306. return task.cont
  307. tm.doMethodLater(.01, _testDoLaterOwner, 'testDoLaterOwner',
  308. owner=doLaterOwner)
  309. doLaterTests[0] += 1
  310. # make sure we run this task after the doLaters if they all occur on the same frame
  311. tm.add(_monitorDoLaterOwner, 'monitorDoLaterOwner', sort=10)
  312. _testDoLaterOwner = None
  313. _monitorDoLaterOwner = None
  314. del doLaterOwner
  315. _DoLaterOwner = None
  316. # run the doLater tests
  317. while doLaterTests[0] > 0:
  318. tm.step()
  319. del doLaterTests
  320. def test_get_tasks(task_manager):
  321. tm = task_manager
  322. # getTasks
  323. def _testGetTasks(task):
  324. return task.cont
  325. # No doLaterProcessor in the new world.
  326. assert len(tm.getTasks()) == 0
  327. tm.add(_testGetTasks, 'testGetTasks1')
  328. assert len(tm.getTasks()) == 1
  329. assert (tm.getTasks()[0].name == 'testGetTasks1' or
  330. tm.getTasks()[1].name == 'testGetTasks1')
  331. tm.add(_testGetTasks, 'testGetTasks2')
  332. tm.add(_testGetTasks, 'testGetTasks3')
  333. assert len(tm.getTasks()) == 3
  334. tm.remove('testGetTasks2')
  335. assert len(tm.getTasks()) == 2
  336. tm.remove('testGetTasks1')
  337. tm.remove('testGetTasks3')
  338. assert len(tm.getTasks()) == 0
  339. def test_get_do_laters(task_manager):
  340. tm = task_manager
  341. # getDoLaters
  342. def _testGetDoLaters():
  343. pass
  344. assert len(tm.getDoLaters()) == 0
  345. tm.doMethodLater(.1, _testGetDoLaters, 'testDoLater1')
  346. assert len(tm.getDoLaters()) == 1
  347. assert tm.getDoLaters()[0].name == 'testDoLater1'
  348. tm.doMethodLater(.1, _testGetDoLaters, 'testDoLater2')
  349. tm.doMethodLater(.1, _testGetDoLaters, 'testDoLater3')
  350. assert len(tm.getDoLaters()) == 3
  351. tm.remove('testDoLater2')
  352. assert len(tm.getDoLaters()) == 2
  353. tm.remove('testDoLater1')
  354. tm.remove('testDoLater3')
  355. assert len(tm.getDoLaters()) == 0
  356. def test_get_all_tasks(task_manager):
  357. active_task = task_manager.add(DUMMY_FUNCTION, delay=None)
  358. sleeping_task = task_manager.add(DUMMY_FUNCTION, delay=1)
  359. assert task_manager.getTasks() == [active_task]
  360. assert task_manager.getDoLaters() == [sleeping_task]
  361. assert task_manager.getAllTasks() in ([active_task, sleeping_task], [sleeping_task, active_task])
  362. def test_duplicate_named_do_laters(task_manager):
  363. tm = task_manager
  364. # duplicate named doLaters removed via taskMgr.remove
  365. def _testDupNameDoLaters():
  366. pass
  367. # the doLaterProcessor is always running
  368. tm.doMethodLater(.1, _testDupNameDoLaters, 'testDupNameDoLater')
  369. tm.doMethodLater(.1, _testDupNameDoLaters, 'testDupNameDoLater')
  370. assert len(tm.getDoLaters()) == 2
  371. tm.remove('testDupNameDoLater')
  372. assert len(tm.getDoLaters()) == 0
  373. def test_duplicate_named_do_laters_remove(task_manager):
  374. tm = task_manager
  375. # duplicate named doLaters removed via remove()
  376. def _testDupNameDoLatersRemove():
  377. pass
  378. # the doLaterProcessor is always running
  379. dl1 = tm.doMethodLater(.1, _testDupNameDoLatersRemove, 'testDupNameDoLaterRemove')
  380. dl2 = tm.doMethodLater(.1, _testDupNameDoLatersRemove, 'testDupNameDoLaterRemove')
  381. assert len(tm.getDoLaters()) == 2
  382. dl2.remove()
  383. assert len(tm.getDoLaters()) == 1
  384. dl1.remove()
  385. assert len(tm.getDoLaters()) == 0
  386. def test_get_tasks_named(task_manager):
  387. tm = task_manager
  388. # getTasksNamed
  389. def _testGetTasksNamed(task):
  390. return task.cont
  391. assert len(tm.getTasksNamed('testGetTasksNamed')) == 0
  392. tm.add(_testGetTasksNamed, 'testGetTasksNamed')
  393. assert len(tm.getTasksNamed('testGetTasksNamed')) == 1
  394. assert tm.getTasksNamed('testGetTasksNamed')[0].name == 'testGetTasksNamed'
  395. tm.add(_testGetTasksNamed, 'testGetTasksNamed')
  396. tm.add(_testGetTasksNamed, 'testGetTasksNamed')
  397. assert len(tm.getTasksNamed('testGetTasksNamed')) == 3
  398. tm.remove('testGetTasksNamed')
  399. assert len(tm.getTasksNamed('testGetTasksNamed')) == 0
  400. def test_get_tasks_matching(task_manager):
  401. task_manager.add(DUMMY_FUNCTION, 'task_1')
  402. task_manager.add(DUMMY_FUNCTION, 'task_2')
  403. task_manager.add(DUMMY_FUNCTION, 'another_task')
  404. assert len(task_manager.getTasksMatching('task_?')) == 2
  405. assert len(task_manager.getTasksMatching('*_task')) == 1
  406. assert len(task_manager.getTasksMatching('*task*')) == 3
  407. def test_remove_tasks_matching(task_manager):
  408. tm = task_manager
  409. # removeTasksMatching
  410. def _testRemoveTasksMatching(task):
  411. return task.cont
  412. tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching')
  413. assert len(tm.getTasksNamed('testRemoveTasksMatching')) == 1
  414. tm.removeTasksMatching('testRemoveTasksMatching')
  415. assert len(tm.getTasksNamed('testRemoveTasksMatching')) == 0
  416. tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching1')
  417. tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching2')
  418. assert len(tm.getTasksNamed('testRemoveTasksMatching1')) == 1
  419. assert len(tm.getTasksNamed('testRemoveTasksMatching2')) == 1
  420. tm.removeTasksMatching('testRemoveTasksMatching*')
  421. assert len(tm.getTasksNamed('testRemoveTasksMatching1')) == 0
  422. assert len(tm.getTasksNamed('testRemoveTasksMatching2')) == 0
  423. tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching1a')
  424. tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching2a')
  425. assert len(tm.getTasksNamed('testRemoveTasksMatching1a')) == 1
  426. assert len(tm.getTasksNamed('testRemoveTasksMatching2a')) == 1
  427. tm.removeTasksMatching('testRemoveTasksMatching?a')
  428. assert len(tm.getTasksNamed('testRemoveTasksMatching1a')) == 0
  429. assert len(tm.getTasksNamed('testRemoveTasksMatching2a')) == 0
  430. def test_task_obj(task_manager):
  431. tm = task_manager
  432. # create Task object and add to mgr
  433. l = []
  434. def _testTaskObj(task, l=l):
  435. l.append(None)
  436. return task.cont
  437. t = Task.Task(_testTaskObj)
  438. tm.add(t, 'testTaskObj')
  439. tm.step()
  440. assert len(l) == 1
  441. tm.step()
  442. assert len(l) == 2
  443. tm.remove('testTaskObj')
  444. tm.step()
  445. assert len(l) == 2
  446. def test_task_remove(task_manager):
  447. tm = task_manager
  448. # remove Task via task.remove()
  449. l = []
  450. def _testTaskObjRemove(task, l=l):
  451. l.append(None)
  452. return task.cont
  453. t = Task.Task(_testTaskObjRemove)
  454. tm.add(t, 'testTaskObjRemove')
  455. tm.step()
  456. assert len(l) == 1
  457. tm.step()
  458. assert len(l) == 2
  459. t.remove()
  460. tm.step()
  461. assert len(l) == 2
  462. del t
  463. def test_task_get_sort(task_manager):
  464. tm = task_manager
  465. # set/get Task sort
  466. l = []
  467. def _testTaskObjSort(arg, task, l=l):
  468. l.append(arg)
  469. return task.cont
  470. t1 = Task.Task(_testTaskObjSort)
  471. t2 = Task.Task(_testTaskObjSort)
  472. tm.add(t1, 'testTaskObjSort1', extraArgs=['a',], appendTask=True, sort=1)
  473. tm.add(t2, 'testTaskObjSort2', extraArgs=['b',], appendTask=True, sort=2)
  474. tm.step()
  475. assert len(l) == 2
  476. assert l == ['a', 'b']
  477. assert t1.getSort() == 1
  478. assert t2.getSort() == 2
  479. t1.setSort(3)
  480. assert t1.getSort() == 3
  481. tm.step()
  482. assert len(l) == 4
  483. assert l == ['a', 'b', 'b', 'a',]
  484. t1.remove()
  485. t2.remove()
  486. tm.step()
  487. assert len(l) == 4