test_condition_var.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. from panda3d.core import Mutex, ConditionVar
  2. from panda3d import core
  3. from direct.stdpy import thread
  4. import pytest
  5. def yield_thread():
  6. # Thread.force_yield() is not enough for true-threading builds, whereas
  7. # time.sleep() does not yield in simple-thread builds. Thread.sleep()
  8. # seems to do the job in all cases, however.
  9. core.Thread.sleep(0.002)
  10. def test_cvar_notify():
  11. # Just tests that notifying without waiting does no harm.
  12. m = Mutex()
  13. cv = ConditionVar(m)
  14. cv.notify()
  15. cv.notify_all()
  16. del cv
  17. def test_cvar_notify_locked():
  18. # Tests the same thing, but with the lock held.
  19. m = Mutex()
  20. cv = ConditionVar(m)
  21. with m:
  22. cv.notify()
  23. with m:
  24. cv.notify_all()
  25. del cv
  26. @pytest.mark.parametrize("num_threads", [1, 2, 3, 4])
  27. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  28. reason="Threading support disabled")
  29. def test_cvar_notify_thread(num_threads):
  30. # Tests notify() with some number of threads waiting.
  31. m = Mutex()
  32. cv = ConditionVar(m)
  33. # We prematurely notify, so that we can test that it's not doing anything.
  34. m.acquire()
  35. cv.notify()
  36. state = {'waiting': 0}
  37. def wait_thread():
  38. m.acquire()
  39. state['waiting'] += 1
  40. cv.wait()
  41. state['waiting'] -= 1
  42. m.release()
  43. # Start the threads, and yield to it, giving it a chance to mess up.
  44. threads = []
  45. for i in range(num_threads):
  46. thread = core.PythonThread(wait_thread, (), "", "")
  47. thread.start(core.TP_high, True)
  48. # Yield until all of the threads are waiting for the condition variable.
  49. for i in range(1000):
  50. m.release()
  51. yield_thread()
  52. m.acquire()
  53. if state['waiting'] == num_threads:
  54. break
  55. assert state['waiting'] == num_threads
  56. # OK, now signal it, and yield. One thread must be unblocked per notify.
  57. for i in range(num_threads):
  58. cv.notify()
  59. expected_waiters = num_threads - i - 1
  60. for j in range(1000):
  61. m.release()
  62. yield_thread()
  63. m.acquire()
  64. if state['waiting'] == expected_waiters:
  65. break
  66. assert state['waiting'] == expected_waiters
  67. m.release()
  68. for thread in threads:
  69. thread.join()
  70. cv = None
  71. @pytest.mark.parametrize("num_threads", [1, 2, 3, 4])
  72. @pytest.mark.skipif(not core.Thread.is_threading_supported(),
  73. reason="Threading support disabled")
  74. def test_cvar_notify_all_threads(num_threads):
  75. # Tests notify_all() with some number of threads waiting.
  76. m = Mutex()
  77. cv = ConditionVar(m)
  78. # We prematurely notify, so that we can test that it's not doing anything.
  79. m.acquire()
  80. cv.notify_all()
  81. state = {'waiting': 0}
  82. def wait_thread():
  83. m.acquire()
  84. state['waiting'] += 1
  85. cv.wait()
  86. state['waiting'] -= 1
  87. m.release()
  88. # Start the threads, and yield to it, giving it a chance to mess up.
  89. threads = []
  90. for i in range(num_threads):
  91. thread = core.PythonThread(wait_thread, (), "", "")
  92. thread.start(core.TP_high, True)
  93. # Yield until all of the threads are waiting for the condition variable.
  94. for i in range(1000):
  95. m.release()
  96. yield_thread()
  97. m.acquire()
  98. if state['waiting'] == num_threads:
  99. break
  100. assert state['waiting'] == num_threads
  101. # OK, now signal it, and yield. All threads must unblock.
  102. cv.notify_all()
  103. for i in range(1000):
  104. m.release()
  105. yield_thread()
  106. m.acquire()
  107. if state['waiting'] == 0:
  108. break
  109. assert state['waiting'] == 0
  110. m.release()
  111. for thread in threads:
  112. thread.join()
  113. cv = None