test_threading2.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. import sys
  2. from collections import deque
  3. from panda3d import core
  4. from direct.stdpy import threading2
  5. import pytest
  6. @pytest.mark.skipif(sys.platform == "emscripten", reason="No threading")
  7. def test_threading2():
  8. class BoundedQueue(threading2._Verbose):
  9. def __init__(self, limit):
  10. threading2._Verbose.__init__(self)
  11. self.mon = threading2.RLock()
  12. self.rc = threading2.Condition(self.mon)
  13. self.wc = threading2.Condition(self.mon)
  14. self.limit = limit
  15. self.queue = deque()
  16. def put(self, item):
  17. self.mon.acquire()
  18. while len(self.queue) >= self.limit:
  19. self._note("put(%s): queue full", item)
  20. self.wc.wait()
  21. self.queue.append(item)
  22. self._note("put(%s): appended, length now %d",
  23. item, len(self.queue))
  24. self.rc.notify()
  25. self.mon.release()
  26. def get(self):
  27. self.mon.acquire()
  28. while not self.queue:
  29. self._note("get(): queue empty")
  30. self.rc.wait()
  31. item = self.queue.popleft()
  32. self._note("get(): got %s, %d left", item, len(self.queue))
  33. self.wc.notify()
  34. self.mon.release()
  35. return item
  36. class ProducerThread(threading2.Thread):
  37. def __init__(self, queue, quota):
  38. threading2.Thread.__init__(self, name="Producer")
  39. self.queue = queue
  40. self.quota = quota
  41. def run(self):
  42. from random import random
  43. counter = 0
  44. while counter < self.quota:
  45. counter = counter + 1
  46. self.queue.put("%s.%d" % (self.getName(), counter))
  47. core.Thread.sleep(random() * 0.00001)
  48. class ConsumerThread(threading2.Thread):
  49. def __init__(self, queue, count):
  50. threading2.Thread.__init__(self, name="Consumer")
  51. self.queue = queue
  52. self.count = count
  53. def run(self):
  54. while self.count > 0:
  55. item = self.queue.get()
  56. print(item)
  57. self.count = self.count - 1
  58. NP = 3
  59. QL = 4
  60. NI = 5
  61. Q = BoundedQueue(QL)
  62. P = []
  63. for i in range(NP):
  64. t = ProducerThread(Q, NI)
  65. t.setName("Producer-%d" % (i+1))
  66. P.append(t)
  67. C = ConsumerThread(Q, NI*NP)
  68. for t in P:
  69. t.start()
  70. core.Thread.sleep(0.000001)
  71. C.start()
  72. for t in P:
  73. t.join()
  74. C.join()