threading.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542
  1. """ This module reimplements Python's native threading module using Panda
  2. threading constructs. It's designed as a drop-in replacement for the
  3. threading module for code that works with Panda; it is necessary because
  4. in some compilation models, Panda's threading constructs are
  5. incompatible with the OS-provided threads used by Python's thread
  6. module.
  7. This module implements the threading module with a thin layer over
  8. Panda's threading constructs. As such, the semantics are close to,
  9. but not precisely, the semantics documented for Python's standard
  10. threading module. If you really do require strict adherence to
  11. Python's semantics, see the threading2 module instead.
  12. However, if you don't need such strict adherence to Python's original
  13. semantics, this module is probably a better choice. It is likely to
  14. be slightly faster than the threading2 module (and even slightly faster
  15. than Python's own threading module). It is also better integrated
  16. with Panda's threads, so that Panda's thread debug mechanisms will be
  17. easier to use and understand.
  18. It is permissible to mix-and-match both threading and threading2
  19. within the same application. """
  20. from panda3d import core
  21. from direct.stdpy import thread as _thread
  22. import sys as _sys
  23. import weakref
# The names exported by a star-import of this module.  They mirror the
# public names of Python's standard threading module.
__all__ = [
    'Thread',
    'Lock', 'RLock',
    'Condition',
    'Semaphore', 'BoundedSemaphore',
    'Event',
    'Timer',
    'ThreadError',
    'local',
    'current_thread',
    'main_thread',
    'enumerate', 'active_count',
    'settrace', 'setprofile', 'stack_size',
    'TIMEOUT_MAX',
]

# The following names are borrowed directly from the direct.stdpy.thread
# module rather than being reimplemented here.
TIMEOUT_MAX = _thread.TIMEOUT_MAX
local = _thread._local
# Used by Thread.__init__ to generate a name when none is supplied.
_newname = _thread._newname
ThreadError = _thread.error
  43. class ThreadBase:
  44. """ A base class for both Thread and ExternalThread in this
  45. module. """
  46. def __init__(self):
  47. pass
  48. def getName(self):
  49. return self.name
  50. def isDaemon(self):
  51. return self.daemon
  52. def setDaemon(self, daemon):
  53. if self.is_alive():
  54. raise RuntimeError
  55. self.__dict__['daemon'] = daemon
  56. def __setattr__(self, key, value):
  57. if key == 'name':
  58. self.setName(value)
  59. elif key == 'ident':
  60. raise AttributeError
  61. elif key == 'daemon':
  62. self.setDaemon(value)
  63. else:
  64. self.__dict__[key] = value
# Copy these static methods from Panda's Thread object.  These are
# useful if you may be running in Panda's SIMPLE_THREADS compilation
# mode.  Assigning them on ThreadBase makes them available on the
# Thread and ExternalThread subclasses as well.
ThreadBase.forceYield = core.Thread.forceYield
ThreadBase.considerYield = core.Thread.considerYield
  70. class Thread(ThreadBase):
  71. """ This class provides a wrapper around Panda's PythonThread
  72. object. The wrapper is designed to emulate Python's own
  73. threading.Thread object. """
  74. def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None):
  75. ThreadBase.__init__(self)
  76. assert group is None
  77. self.__target = target
  78. self.__args = args
  79. self.__kwargs = kwargs
  80. if not name:
  81. name = _newname()
  82. current = current_thread()
  83. if daemon is not None:
  84. self.__dict__['daemon'] = daemon
  85. else:
  86. self.__dict__['daemon'] = current.daemon
  87. self.__dict__['name'] = name
  88. def call_run():
  89. # As soon as the thread is done, break the circular reference.
  90. try:
  91. self.run()
  92. finally:
  93. self.__thread = None
  94. _thread._remove_thread_id(self.ident)
  95. self.__thread = core.PythonThread(call_run, None, name, name)
  96. threadId = _thread._add_thread(self.__thread, weakref.proxy(self))
  97. self.__dict__['ident'] = threadId
  98. def __del__(self):
  99. # On interpreter shutdown, the _thread module might have
  100. # already been cleaned up.
  101. if _thread and _thread._remove_thread_id:
  102. _thread._remove_thread_id(self.ident)
  103. def is_alive(self):
  104. thread = self.__thread
  105. return thread is not None and thread.is_started()
  106. isAlive = is_alive
  107. def start(self):
  108. thread = self.__thread
  109. if thread is None or thread.is_started():
  110. raise RuntimeError
  111. if not thread.start(core.TPNormal, True):
  112. raise RuntimeError
  113. def run(self):
  114. if _settrace_func:
  115. _sys.settrace(_settrace_func)
  116. if _setprofile_func:
  117. _sys.setprofile(_setprofile_func)
  118. self.__target(*self.__args, **self.__kwargs)
  119. def join(self, timeout = None):
  120. # We don't support a timed join here, sorry.
  121. assert timeout is None
  122. thread = self.__thread
  123. if thread is not None:
  124. thread.join()
  125. # Clear the circular reference.
  126. self.__thread = None
  127. _thread._remove_thread_id(self.ident)
  128. def setName(self, name):
  129. self.__dict__['name'] = name
  130. self.__thread.setName(name)
  131. class ExternalThread(ThreadBase):
  132. """ Returned for a Thread object that wasn't created by this
  133. interface. """
  134. def __init__(self, extThread, threadId):
  135. ThreadBase.__init__(self)
  136. self.__thread = extThread
  137. self.__dict__['daemon'] = True
  138. self.__dict__['name'] = self.__thread.getName()
  139. self.__dict__['ident'] = threadId
  140. def is_alive(self):
  141. return self.__thread.isStarted()
  142. def isAlive(self):
  143. return self.__thread.isStarted()
  144. def start(self):
  145. raise RuntimeError
  146. def run(self):
  147. raise RuntimeError
  148. def join(self, timeout = None):
  149. raise RuntimeError
  150. def setDaemon(self, daemon):
  151. raise RuntimeError
  152. class MainThread(ExternalThread):
  153. """ Returned for the MainThread object. """
  154. def __init__(self, extThread, threadId):
  155. ExternalThread.__init__(self, extThread, threadId)
  156. self.__dict__['daemon'] = False
class Lock(core.Mutex):
    """ This class provides a wrapper around Panda's Mutex object.
    The wrapper is designed to emulate Python's own threading.Lock
    object. """

    def __init__(self, name = "PythonLock"):
        # The name is passed through to the Mutex constructor; unlike
        # Python's threading.Lock, it may be given by the caller.
        core.Mutex.__init__(self, name)
class RLock(core.ReMutex):
    """ This class provides a wrapper around Panda's ReMutex object.
    The wrapper is designed to emulate Python's own threading.RLock
    object. """

    def __init__(self, name = "PythonRLock"):
        # The name is passed through to the ReMutex constructor; unlike
        # Python's threading.RLock, it may be given by the caller.
        core.ReMutex.__init__(self, name)
  169. class Condition(core.ConditionVar):
  170. """ This class provides a wrapper around Panda's ConditionVar
  171. object. The wrapper is designed to emulate Python's own
  172. threading.Condition object. """
  173. def __init__(self, lock = None):
  174. if not lock:
  175. lock = Lock()
  176. # Panda doesn't support RLock objects used with condition
  177. # variables.
  178. assert isinstance(lock, Lock)
  179. self.__lock = lock
  180. core.ConditionVar.__init__(self, self.__lock)
  181. def acquire(self, *args, **kw):
  182. return self.__lock.acquire(*args, **kw)
  183. def release(self):
  184. self.__lock.release()
  185. def wait(self, timeout = None):
  186. if timeout is None:
  187. core.ConditionVar.wait(self)
  188. else:
  189. core.ConditionVar.wait(self, timeout)
  190. def notifyAll(self):
  191. core.ConditionVar.notifyAll(self)
  192. notify_all = notifyAll
  193. __enter__ = acquire
  194. def __exit__(self, t, v, tb):
  195. self.release()
  196. class Semaphore(core.Semaphore):
  197. """ This class provides a wrapper around Panda's Semaphore
  198. object. The wrapper is designed to emulate Python's own
  199. threading.Semaphore object. """
  200. def __init__(self, value = 1):
  201. core.Semaphore.__init__(self, value)
  202. def acquire(self, blocking = True):
  203. if blocking:
  204. core.Semaphore.acquire(self)
  205. return True
  206. else:
  207. return core.Semaphore.tryAcquire(self)
  208. __enter__ = acquire
  209. def __exit__(self, t, v, tb):
  210. self.release()
  211. class BoundedSemaphore(Semaphore):
  212. """ This class provides a wrapper around Panda's Semaphore
  213. object. The wrapper is designed to emulate Python's own
  214. threading.BoundedSemaphore object. """
  215. def __init__(self, value = 1):
  216. self.__max = value
  217. Semaphore.__init__(value)
  218. def release(self):
  219. if self.getCount() > self.__max:
  220. raise ValueError
  221. Semaphore.release(self)
  222. class Event:
  223. """ This class is designed to emulate Python's own threading.Event
  224. object. """
  225. def __init__(self):
  226. self.__lock = core.Mutex("Python Event")
  227. self.__cvar = core.ConditionVar(self.__lock)
  228. self.__flag = False
  229. def is_set(self):
  230. return self.__flag
  231. isSet = is_set
  232. def set(self):
  233. self.__lock.acquire()
  234. try:
  235. self.__flag = True
  236. self.__cvar.notifyAll()
  237. finally:
  238. self.__lock.release()
  239. def clear(self):
  240. self.__lock.acquire()
  241. try:
  242. self.__flag = False
  243. finally:
  244. self.__lock.release()
  245. def wait(self, timeout = None):
  246. self.__lock.acquire()
  247. try:
  248. if timeout is None:
  249. while not self.__flag:
  250. self.__cvar.wait()
  251. else:
  252. clock = core.TrueClock.getGlobalPtr()
  253. expires = clock.getShortTime() + timeout
  254. while not self.__flag:
  255. wait = expires - clock.getShortTime()
  256. if wait < 0:
  257. return
  258. self.__cvar.wait(wait)
  259. finally:
  260. self.__lock.release()
  261. class Timer(Thread):
  262. """Call a function after a specified number of seconds:
  263. t = Timer(30.0, f, args=[], kwargs={})
  264. t.start()
  265. t.cancel() # stop the timer's action if it's still waiting
  266. """
  267. def __init__(self, interval, function, args=[], kwargs={}):
  268. Thread.__init__(self)
  269. self.interval = interval
  270. self.function = function
  271. self.args = args
  272. self.kwargs = kwargs
  273. self.finished = Event()
  274. def cancel(self):
  275. """Stop the timer if it hasn't finished yet"""
  276. self.finished.set()
  277. def run(self):
  278. self.finished.wait(self.interval)
  279. if not self.finished.isSet():
  280. self.function(*self.args, **self.kwargs)
  281. self.finished.set()
  282. def _create_thread_wrapper(t, threadId):
  283. """ Creates a thread wrapper for the indicated external thread. """
  284. if isinstance(t, core.MainThread):
  285. pyt = MainThread(t, threadId)
  286. else:
  287. pyt = ExternalThread(t, threadId)
  288. return pyt
  289. def current_thread():
  290. t = core.Thread.getCurrentThread()
  291. return _thread._get_thread_wrapper(t, _create_thread_wrapper)
  292. def main_thread():
  293. t = core.Thread.getMainThread()
  294. return _thread._get_thread_wrapper(t, _create_thread_wrapper)
  295. currentThread = current_thread
  296. def enumerate():
  297. tlist = []
  298. _thread._threadsLock.acquire()
  299. try:
  300. for thread, locals, wrapper in list(_thread._threads.values()):
  301. if wrapper and wrapper.is_alive():
  302. tlist.append(wrapper)
  303. return tlist
  304. finally:
  305. _thread._threadsLock.release()
  306. def active_count():
  307. return len(enumerate())
  308. activeCount = active_count
  309. _settrace_func = None
  310. def settrace(func):
  311. global _settrace_func
  312. _settrace_func = func
  313. _setprofile_func = None
  314. def setprofile(func):
  315. global _setprofile_func
  316. _setprofile_func = func
  317. def stack_size(size = None):
  318. raise ThreadError
if __debug__:
    def _test():
        """ A self-test for this module: several producer threads feed
        items through a bounded queue to a single consumer thread,
        which prints each item it receives. """
        from collections import deque

        _sleep = core.Thread.sleep

        # Set this to True to have the queue report its activity on
        # stderr.
        _VERBOSE = False

        class _Verbose(object):
            """ Mixin that provides an optional diagnostic _note()
            method. """

            def __init__(self, verbose=None):
                if verbose is None:
                    verbose = _VERBOSE
                self.__verbose = verbose

            def _note(self, format, *args):
                # Writes the formatted message to stderr, prefixed with
                # the name of the calling thread, if verbose is on.
                if self.__verbose:
                    format = format % args
                    format = "%s: %s\n" % (
                        currentThread().getName(), format)
                    _sys.stderr.write(format)

        class BoundedQueue(_Verbose):
            """ A queue holding at most limit items.  put() blocks while
            the queue is full and get() blocks while it is empty. """

            def __init__(self, limit):
                _Verbose.__init__(self)
                # Both conditions share the one lock: rc is waited on
                # by get() while the queue is empty, wc by put() while
                # it is full.
                self.mon = Lock(name = "BoundedQueue.mon")
                self.rc = Condition(self.mon)
                self.wc = Condition(self.mon)
                self.limit = limit
                self.queue = deque()

            def put(self, item):
                self.mon.acquire()
                while len(self.queue) >= self.limit:
                    self._note("put(%s): queue full", item)
                    self.wc.wait()
                self.queue.append(item)
                self._note("put(%s): appended, length now %d",
                           item, len(self.queue))
                # Notify rc, the condition get() waits on.
                self.rc.notify()
                self.mon.release()

            def get(self):
                self.mon.acquire()
                while not self.queue:
                    self._note("get(): queue empty")
                    self.rc.wait()
                item = self.queue.popleft()
                self._note("get(): got %s, %d left", item, len(self.queue))
                # Notify wc, the condition put() waits on.
                self.wc.notify()
                self.mon.release()
                return item

        class ProducerThread(Thread):
            """ Puts quota numbered items on the queue, sleeping for a
            short random time after each one. """

            def __init__(self, queue, quota):
                Thread.__init__(self, name="Producer")
                self.queue = queue
                self.quota = quota

            def run(self):
                from random import random
                counter = 0
                while counter < self.quota:
                    counter = counter + 1
                    self.queue.put("%s.%d" % (self.getName(), counter))
                    _sleep(random() * 0.00001)

        class ConsumerThread(Thread):
            """ Takes count items off the queue, printing each one. """

            def __init__(self, queue, count):
                Thread.__init__(self, name="Consumer")
                self.queue = queue
                self.count = count

            def run(self):
                while self.count > 0:
                    item = self.queue.get()
                    print(item)
                    self.count = self.count - 1

        # NP producer threads, a queue limit of QL, and NI items per
        # producer.
        NP = 3
        QL = 4
        NI = 5

        Q = BoundedQueue(QL)
        P = []
        for i in range(NP):
            t = ProducerThread(Q, NI)
            t.setName("Producer-%d" % (i+1))
            P.append(t)
        # The consumer takes exactly as many items as the producers
        # put in total.
        C = ConsumerThread(Q, NI*NP)
        for t in P:
            t.start()
            _sleep(0.000001)
        C.start()
        for t in P:
            t.join()
        C.join()

    # Run the self-test when this module is executed as a script.
    if __name__ == '__main__':
        _test()