test_threading.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. import sys
  2. from panda3d import core
  3. from direct.stdpy import threading
  4. import pytest
  5. def test_threading_error():
  6. with pytest.raises(threading.ThreadError):
  7. threading.stack_size()
  8. @pytest.mark.skipif(sys.platform == "emscripten", reason="No threading")
  9. def test_threading():
  10. from collections import deque
  11. _sleep = core.Thread.sleep
  12. _VERBOSE = False
  13. class _Verbose(object):
  14. def __init__(self, verbose=None):
  15. if verbose is None:
  16. verbose = _VERBOSE
  17. self.__verbose = verbose
  18. def _note(self, format, *args):
  19. if self.__verbose:
  20. format = format % args
  21. format = "%s: %s\n" % (
  22. threading.currentThread().getName(), format)
  23. sys.stderr.write(format)
  24. class BoundedQueue(_Verbose):
  25. def __init__(self, limit):
  26. _Verbose.__init__(self)
  27. self.mon = threading.Lock(name = "BoundedQueue.mon")
  28. self.rc = threading.Condition(self.mon)
  29. self.wc = threading.Condition(self.mon)
  30. self.limit = limit
  31. self.queue = deque()
  32. def put(self, item):
  33. self.mon.acquire()
  34. while len(self.queue) >= self.limit:
  35. self._note("put(%s): queue full", item)
  36. self.wc.wait()
  37. self.queue.append(item)
  38. self._note("put(%s): appended, length now %d",
  39. item, len(self.queue))
  40. self.rc.notify()
  41. self.mon.release()
  42. def get(self):
  43. self.mon.acquire()
  44. while not self.queue:
  45. self._note("get(): queue empty")
  46. self.rc.wait()
  47. item = self.queue.popleft()
  48. self._note("get(): got %s, %d left", item, len(self.queue))
  49. self.wc.notify()
  50. self.mon.release()
  51. return item
  52. class ProducerThread(threading.Thread):
  53. def __init__(self, queue, quota):
  54. threading.Thread.__init__(self, name="Producer")
  55. self.queue = queue
  56. self.quota = quota
  57. def run(self):
  58. from random import random
  59. counter = 0
  60. while counter < self.quota:
  61. counter = counter + 1
  62. self.queue.put("%s.%d" % (self.getName(), counter))
  63. _sleep(random() * 0.00001)
  64. class ConsumerThread(threading.Thread):
  65. def __init__(self, queue, count):
  66. threading.Thread.__init__(self, name="Consumer")
  67. self.queue = queue
  68. self.count = count
  69. def run(self):
  70. while self.count > 0:
  71. item = self.queue.get()
  72. print(item)
  73. self.count = self.count - 1
  74. NP = 3
  75. QL = 4
  76. NI = 5
  77. Q = BoundedQueue(QL)
  78. P = []
  79. for i in range(NP):
  80. t = ProducerThread(Q, NI)
  81. t.setName("Producer-%d" % (i+1))
  82. P.append(t)
  83. C = ConsumerThread(Q, NI*NP)
  84. for t in P:
  85. t.start()
  86. _sleep(0.000001)
  87. C.start()
  88. for t in P:
  89. t.join()
  90. C.join()