test_threading.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import sys
  2. from panda3d import core
  3. from direct.stdpy import threading
  4. import pytest
  5. def test_threading_error():
  6. with pytest.raises(threading.ThreadError):
  7. threading.stack_size()
  8. @pytest.mark.skipif(sys.platform == "emscripten", reason="No threading")
  9. @pytest.mark.skipif(not core.Thread.is_threading_supported(), reason="No threading")
  10. def test_threading():
  11. from collections import deque
  12. _sleep = core.Thread.sleep
  13. _VERBOSE = False
  14. class _Verbose(object):
  15. def __init__(self, verbose=None):
  16. if verbose is None:
  17. verbose = _VERBOSE
  18. self.__verbose = verbose
  19. def _note(self, format, *args):
  20. if self.__verbose:
  21. format = format % args
  22. format = "%s: %s\n" % (
  23. threading.currentThread().getName(), format)
  24. sys.stderr.write(format)
  25. class BoundedQueue(_Verbose):
  26. def __init__(self, limit):
  27. _Verbose.__init__(self)
  28. self.mon = threading.Lock(name = "BoundedQueue.mon")
  29. self.rc = threading.Condition(self.mon)
  30. self.wc = threading.Condition(self.mon)
  31. self.limit = limit
  32. self.queue = deque()
  33. def put(self, item):
  34. self.mon.acquire()
  35. while len(self.queue) >= self.limit:
  36. self._note("put(%s): queue full", item)
  37. self.wc.wait()
  38. self.queue.append(item)
  39. self._note("put(%s): appended, length now %d",
  40. item, len(self.queue))
  41. self.rc.notify()
  42. self.mon.release()
  43. def get(self):
  44. self.mon.acquire()
  45. while not self.queue:
  46. self._note("get(): queue empty")
  47. self.rc.wait()
  48. item = self.queue.popleft()
  49. self._note("get(): got %s, %d left", item, len(self.queue))
  50. self.wc.notify()
  51. self.mon.release()
  52. return item
  53. class ProducerThread(threading.Thread):
  54. def __init__(self, queue, quota):
  55. threading.Thread.__init__(self, name="Producer")
  56. self.queue = queue
  57. self.quota = quota
  58. def run(self):
  59. from random import random
  60. counter = 0
  61. while counter < self.quota:
  62. counter = counter + 1
  63. self.queue.put("%s.%d" % (self.getName(), counter))
  64. _sleep(random() * 0.00001)
  65. class ConsumerThread(threading.Thread):
  66. def __init__(self, queue, count):
  67. threading.Thread.__init__(self, name="Consumer")
  68. self.queue = queue
  69. self.count = count
  70. def run(self):
  71. while self.count > 0:
  72. item = self.queue.get()
  73. print(item)
  74. self.count = self.count - 1
  75. NP = 3
  76. QL = 4
  77. NI = 5
  78. Q = BoundedQueue(QL)
  79. P = []
  80. for i in range(NP):
  81. t = ProducerThread(Q, NI)
  82. t.setName("Producer-%d" % (i+1))
  83. P.append(t)
  84. C = ConsumerThread(Q, NI*NP)
  85. for t in P:
  86. t.start()
  87. _sleep(0.000001)
  88. C.start()
  89. for t in P:
  90. t.join()
  91. C.join()