test_condition_var.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. from panda3d.core import Mutex, ConditionVarFull
  2. from panda3d import core
  3. from direct.stdpy import thread
  4. import pytest
  5. def yield_thread():
  6. # Thread.force_yield() is not enough for true-threading builds, whereas
  7. # time.sleep() does not yield in simple-thread builds. Thread.sleep()
  8. # seems to do the job in all cases, however.
  9. core.Thread.sleep(0.002)
  10. def test_cvar_notify():
  11. # Just tests that notifying without waiting does no harm.
  12. m = Mutex()
  13. cv = ConditionVarFull(m)
  14. cv.notify()
  15. cv.notify_all()
  16. del cv
  17. def test_cvar_notify_locked():
  18. # Tests the same thing, but with the lock held.
  19. m = Mutex()
  20. cv = ConditionVarFull(m)
  21. m.acquire()
  22. cv.notify()
  23. m.release()
  24. m.acquire()
  25. cv.notify_all()
  26. m.release()
  27. del cv
  28. @pytest.mark.parametrize("num_threads", [1, 2, 3, 4])
  29. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  30. reason="Threading support disabled")
  31. def test_cvar_notify_thread(num_threads):
  32. # Tests notify() with some number of threads waiting.
  33. m = Mutex()
  34. cv = ConditionVarFull(m)
  35. # We prematurely notify, so that we can test that it's not doing anything.
  36. m.acquire()
  37. cv.notify()
  38. state = {'waiting': 0}
  39. def wait_thread():
  40. m.acquire()
  41. state['waiting'] += 1
  42. cv.wait()
  43. state['waiting'] -= 1
  44. m.release()
  45. # Start the threads, and yield to it, giving it a chance to mess up.
  46. threads = []
  47. for i in range(num_threads):
  48. thread = core.PythonThread(wait_thread, (), "", "")
  49. thread.start(core.TP_high, True)
  50. # Yield until all of the threads are waiting for the condition variable.
  51. for i in range(1000):
  52. m.release()
  53. yield_thread()
  54. m.acquire()
  55. if state['waiting'] == num_threads:
  56. break
  57. assert state['waiting'] == num_threads
  58. m.release()
  59. # OK, now signal it, and yield. One thread must be unblocked per notify.
  60. for i in range(num_threads):
  61. cv.notify()
  62. yield_thread()
  63. m.acquire()
  64. assert state['waiting'] == num_threads - i - 1
  65. m.release()
  66. for thread in threads:
  67. thread.join()
  68. cv = None
  69. @pytest.mark.parametrize("num_threads", [1, 2, 3, 4])
  70. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  71. reason="Threading support disabled")
  72. def test_cvar_notify_all_threads(num_threads):
  73. # Tests notify_all() with some number of threads waiting.
  74. m = Mutex()
  75. cv = ConditionVarFull(m)
  76. # We prematurely notify, so that we can test that it's not doing anything.
  77. m.acquire()
  78. cv.notify_all()
  79. state = {'waiting': 0}
  80. def wait_thread():
  81. m.acquire()
  82. state['waiting'] += 1
  83. cv.wait()
  84. state['waiting'] -= 1
  85. m.release()
  86. # Start the threads, and yield to it, giving it a chance to mess up.
  87. threads = []
  88. for i in range(num_threads):
  89. thread = core.PythonThread(wait_thread, (), "", "")
  90. thread.start(core.TP_high, True)
  91. # Yield until all of the threads are waiting for the condition variable.
  92. for i in range(1000):
  93. m.release()
  94. yield_thread()
  95. m.acquire()
  96. if state['waiting'] == num_threads:
  97. break
  98. assert state['waiting'] == num_threads
  99. # OK, now signal it, and yield. All threads must unblock.
  100. cv.notify_all()
  101. for i in range(1000):
  102. m.release()
  103. yield_thread()
  104. m.acquire()
  105. if state['waiting'] == 0:
  106. break
  107. assert state['waiting'] == 0
  108. m.release()
  109. for thread in threads:
  110. thread.join()
  111. cv = None