test_condition_var.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. from panda3d.core import Mutex, ConditionVarFull
  2. from panda3d import core
  3. from direct.stdpy import thread
  4. import pytest
  5. def yield_thread():
  6. # Thread.force_yield() is not enough for true-threading builds, whereas
  7. # time.sleep() does not yield in simple-thread builds. Thread.sleep()
  8. # seems to do the job in all cases, however.
  9. core.Thread.sleep(0.002)
  10. def test_cvar_notify():
  11. # Just tests that notifying without waiting does no harm.
  12. m = Mutex()
  13. cv = ConditionVarFull(m)
  14. cv.notify()
  15. cv.notify_all()
  16. del cv
  17. def test_cvar_notify_locked():
  18. # Tests the same thing, but with the lock held.
  19. m = Mutex()
  20. cv = ConditionVarFull(m)
  21. m.acquire()
  22. cv.notify()
  23. m.release()
  24. m.acquire()
  25. cv.notify_all()
  26. m.release()
  27. del cv
  28. @pytest.mark.parametrize("num_threads", [1, 2, 3, 4])
  29. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  30. reason="Threading support disabled")
  31. def test_cvar_notify_thread(num_threads):
  32. # Tests notify() with some number of threads waiting.
  33. m = Mutex()
  34. cv = ConditionVarFull(m)
  35. # We prematurely notify, so that we can test that it's not doing anything.
  36. m.acquire()
  37. cv.notify()
  38. state = {'waiting': 0}
  39. def wait_thread():
  40. m.acquire()
  41. state['waiting'] += 1
  42. cv.wait()
  43. state['waiting'] -= 1
  44. m.release()
  45. # Start the threads, and yield to it, giving it a chance to mess up.
  46. threads = []
  47. for i in range(num_threads):
  48. thread = core.PythonThread(wait_thread, (), "", "")
  49. thread.start(core.TP_high, True)
  50. # Yield until all of the threads are waiting for the condition variable.
  51. for i in range(1000):
  52. m.release()
  53. yield_thread()
  54. m.acquire()
  55. if state['waiting'] == num_threads:
  56. break
  57. assert state['waiting'] == num_threads
  58. # OK, now signal it, and yield. One thread must be unblocked per notify.
  59. for i in range(num_threads):
  60. cv.notify()
  61. expected_waiters = num_threads - i - 1
  62. for j in range(1000):
  63. m.release()
  64. yield_thread()
  65. m.acquire()
  66. if state['waiting'] == expected_waiters:
  67. break
  68. assert state['waiting'] == expected_waiters
  69. m.release()
  70. for thread in threads:
  71. thread.join()
  72. cv = None
  73. @pytest.mark.parametrize("num_threads", [1, 2, 3, 4])
  74. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  75. reason="Threading support disabled")
  76. def test_cvar_notify_all_threads(num_threads):
  77. # Tests notify_all() with some number of threads waiting.
  78. m = Mutex()
  79. cv = ConditionVarFull(m)
  80. # We prematurely notify, so that we can test that it's not doing anything.
  81. m.acquire()
  82. cv.notify_all()
  83. state = {'waiting': 0}
  84. def wait_thread():
  85. m.acquire()
  86. state['waiting'] += 1
  87. cv.wait()
  88. state['waiting'] -= 1
  89. m.release()
  90. # Start the threads, and yield to it, giving it a chance to mess up.
  91. threads = []
  92. for i in range(num_threads):
  93. thread = core.PythonThread(wait_thread, (), "", "")
  94. thread.start(core.TP_high, True)
  95. # Yield until all of the threads are waiting for the condition variable.
  96. for i in range(1000):
  97. m.release()
  98. yield_thread()
  99. m.acquire()
  100. if state['waiting'] == num_threads:
  101. break
  102. assert state['waiting'] == num_threads
  103. # OK, now signal it, and yield. All threads must unblock.
  104. cv.notify_all()
  105. for i in range(1000):
  106. m.release()
  107. yield_thread()
  108. m.acquire()
  109. if state['waiting'] == 0:
  110. break
  111. assert state['waiting'] == 0
  112. m.release()
  113. for thread in threads:
  114. thread.join()
  115. cv = None