test_pythontask.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. from panda3d.core import AsyncTaskManager, PythonTask
  2. from contextlib import contextmanager
  3. import pytest
  4. import types
  5. import sys
  6. import gc
  7. @contextmanager
  8. def gc_disabled():
  9. gc.disable()
  10. gc.collect()
  11. gc.freeze()
  12. gc.set_debug(gc.DEBUG_SAVEALL)
  13. try:
  14. yield
  15. finally:
  16. gc.set_debug(0)
  17. gc.garbage.clear()
  18. gc.unfreeze()
  19. gc.collect()
  20. gc.enable()
  21. def test_pythontask_property_builtin():
  22. task = PythonTask()
  23. # Read-write property
  24. assert task.name == ""
  25. task.name = "ABC"
  26. # Read-only property
  27. assert task.dt == 0.0
  28. with pytest.raises(AttributeError):
  29. task.dt = 1.0
  30. assert task.dt == 0.0
  31. # Non-existent property
  32. with pytest.raises(AttributeError):
  33. task.abc
  34. def test_pythontask_property_custom():
  35. task = PythonTask()
  36. assert not hasattr(task, 'custom_field')
  37. task.custom_field = 1.0
  38. assert hasattr(task, 'custom_field')
  39. assert task.custom_field == 1.0
  40. task.custom_field = 2.0
  41. assert task.custom_field == 2.0
  42. del task.custom_field
  43. assert not hasattr(task, 'custom_field')
  44. def test_pythontask_property_override():
  45. task = PythonTask()
  46. assert isinstance(task.gather, types.BuiltinMethodType)
  47. task.gather = 123
  48. assert task.gather == 123
  49. del task.gather
  50. assert isinstance(task.gather, types.BuiltinMethodType)
  51. def test_pythontask_dict_get():
  52. task = PythonTask()
  53. d = task.__dict__
  54. rc1 = sys.getrefcount(d)
  55. task.__dict__
  56. task.__dict__
  57. rc2 = sys.getrefcount(d)
  58. assert rc1 == rc2
  59. def test_pythontask_dict_set():
  60. task = PythonTask()
  61. d = {}
  62. rc1 = sys.getrefcount(d)
  63. task.__dict__ = d
  64. rc2 = sys.getrefcount(d)
  65. assert rc1 + 1 == rc2
  66. task.__dict__ = {}
  67. rc2 = sys.getrefcount(d)
  68. assert rc1 == rc2
  69. def test_pythontask_cycle():
  70. with gc_disabled():
  71. task = PythonTask()
  72. assert gc.is_tracked(task)
  73. task.marker = 'test_pythontask_cycle'
  74. task.prop = task
  75. del task
  76. gc.collect()
  77. assert len(gc.garbage) > 0
  78. for g in gc.garbage:
  79. if isinstance(g, PythonTask) and \
  80. getattr(g, 'marker', None) == 'test_pythontask_cycle':
  81. break
  82. else:
  83. pytest.fail('not found in garbage')
  84. def test_task_persistent_wrapper():
  85. task_mgr = AsyncTaskManager.get_global_ptr()
  86. task_chain = task_mgr.make_task_chain("test_task_persistent_wrapper")
  87. # Create a subclass of PythonTask
  88. class PythonTaskSubclassTest(PythonTask):
  89. pass
  90. def task_main(task):
  91. # Set our result to be the input task we got into this function
  92. task.set_result(task)
  93. # Verify we got the subclass
  94. assert isinstance(task, PythonTaskSubclassTest)
  95. return task.done
  96. done_callback_reached = False
  97. def done_callback(task):
  98. # Verify we got the subclass
  99. assert isinstance(task, PythonTaskSubclassTest)
  100. nonlocal done_callback_reached
  101. done_callback_reached = True
  102. task = PythonTaskSubclassTest(task_main)
  103. task.set_task_chain(task_chain.name)
  104. task.set_upon_death(done_callback)
  105. task_mgr.add(task)
  106. task_chain.wait_for_tasks()
  107. assert task.done()
  108. assert not task.cancelled()
  109. # Verify the task passed into the function is the same task we created
  110. assert task.result() is task
  111. # Verify the done callback worked and was tested
  112. assert done_callback_reached