test_threading.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import sys
  2. from panda3d import core
  3. from direct.stdpy import threading
  4. import pytest
  5. def test_threading_error():
  6. with pytest.raises(threading.ThreadError):
  7. threading.stack_size()
  8. def test_threading():
  9. from collections import deque
  10. _sleep = core.Thread.sleep
  11. _VERBOSE = False
  12. class _Verbose(object):
  13. def __init__(self, verbose=None):
  14. if verbose is None:
  15. verbose = _VERBOSE
  16. self.__verbose = verbose
  17. def _note(self, format, *args):
  18. if self.__verbose:
  19. format = format % args
  20. format = "%s: %s\n" % (
  21. threading.currentThread().getName(), format)
  22. sys.stderr.write(format)
  23. class BoundedQueue(_Verbose):
  24. def __init__(self, limit):
  25. _Verbose.__init__(self)
  26. self.mon = threading.Lock(name = "BoundedQueue.mon")
  27. self.rc = threading.Condition(self.mon)
  28. self.wc = threading.Condition(self.mon)
  29. self.limit = limit
  30. self.queue = deque()
  31. def put(self, item):
  32. self.mon.acquire()
  33. while len(self.queue) >= self.limit:
  34. self._note("put(%s): queue full", item)
  35. self.wc.wait()
  36. self.queue.append(item)
  37. self._note("put(%s): appended, length now %d",
  38. item, len(self.queue))
  39. self.rc.notify()
  40. self.mon.release()
  41. def get(self):
  42. self.mon.acquire()
  43. while not self.queue:
  44. self._note("get(): queue empty")
  45. self.rc.wait()
  46. item = self.queue.popleft()
  47. self._note("get(): got %s, %d left", item, len(self.queue))
  48. self.wc.notify()
  49. self.mon.release()
  50. return item
  51. class ProducerThread(threading.Thread):
  52. def __init__(self, queue, quota):
  53. threading.Thread.__init__(self, name="Producer")
  54. self.queue = queue
  55. self.quota = quota
  56. def run(self):
  57. from random import random
  58. counter = 0
  59. while counter < self.quota:
  60. counter = counter + 1
  61. self.queue.put("%s.%d" % (self.getName(), counter))
  62. _sleep(random() * 0.00001)
  63. class ConsumerThread(threading.Thread):
  64. def __init__(self, queue, count):
  65. threading.Thread.__init__(self, name="Consumer")
  66. self.queue = queue
  67. self.count = count
  68. def run(self):
  69. while self.count > 0:
  70. item = self.queue.get()
  71. print(item)
  72. self.count = self.count - 1
  73. NP = 3
  74. QL = 4
  75. NI = 5
  76. Q = BoundedQueue(QL)
  77. P = []
  78. for i in range(NP):
  79. t = ProducerThread(Q, NI)
  80. t.setName("Producer-%d" % (i+1))
  81. P.append(t)
  82. C = ConsumerThread(Q, NI*NP)
  83. for t in P:
  84. t.start()
  85. _sleep(0.000001)
  86. C.start()
  87. for t in P:
  88. t.join()
  89. C.join()