threading.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542
  1. """ This module reimplements Python's native threading module using Panda
  2. threading constructs. It's designed as a drop-in replacement for the
  3. threading module for code that works with Panda; it is necessary because
  4. in some compilation models, Panda's threading constructs are
  5. incompatible with the OS-provided threads used by Python's thread
  6. module.
  7. This module implements the threading module with a thin layer over
  8. Panda's threading constructs. As such, the semantics are close to,
  9. but not precisely, the semantics documented for Python's standard
  10. threading module. If you really do require strict adherence to
  11. Python's semantics, see the threading2 module instead.
  12. However, if you don't need such strict adherence to Python's original
  13. semantics, this module is probably a better choice. It is likely to
  14. be slightly faster than the threading2 module (and even slightly faster
  15. than Python's own threading module). It is also better integrated
  16. with Panda's threads, so that Panda's thread debug mechanisms will be
  17. easier to use and understand.
  18. It is permissible to mix-and-match both threading and threading2
  19. within the same application. """
  20. from panda3d import core
  21. from direct.stdpy import thread as _thread
  22. import sys as _sys
  23. import weakref
# The public names exported by this module.
__all__ = [
    'Thread',
    'Lock', 'RLock',
    'Condition',
    'Semaphore', 'BoundedSemaphore',
    'Event',
    'Timer',
    'ThreadError',
    'local',
    'current_thread',
    'main_thread',
    'enumerate', 'active_count',
    'settrace', 'setprofile', 'stack_size',
    'TIMEOUT_MAX',
]
# These names are taken directly from the direct.stdpy.thread module
# (imported above as _thread) and re-exposed under the names used by
# this module.
TIMEOUT_MAX = _thread.TIMEOUT_MAX
local = _thread._local
_newname = _thread._newname
ThreadError = _thread.error
  43. class ThreadBase:
  44. """ A base class for both Thread and ExternalThread in this
  45. module. """
  46. def __init__(self):
  47. pass
  48. def getName(self):
  49. return self.name
  50. def isDaemon(self):
  51. return self.daemon
  52. def setDaemon(self, daemon):
  53. if self.is_alive():
  54. raise RuntimeError
  55. self.__dict__['daemon'] = daemon
  56. def __setattr__(self, key, value):
  57. if key == 'name':
  58. self.setName(value)
  59. elif key == 'ident':
  60. raise AttributeError
  61. elif key == 'daemon':
  62. self.setDaemon(value)
  63. else:
  64. self.__dict__[key] = value
# Copy these static methods from Panda's Thread object onto ThreadBase,
# so they are reachable from every wrapper class in this module.  These
# are useful if you may be running in Panda's SIMPLE_THREADS compilation
# mode.
ThreadBase.forceYield = core.Thread.forceYield
ThreadBase.considerYield = core.Thread.considerYield
  70. class Thread(ThreadBase):
  71. """ This class provides a wrapper around Panda's PythonThread
  72. object. The wrapper is designed to emulate Python's own
  73. threading.Thread object. """
  74. def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None):
  75. ThreadBase.__init__(self)
  76. assert group is None
  77. self.__target = target
  78. self.__args = args
  79. self.__kwargs = kwargs
  80. if not name:
  81. name = _newname()
  82. current = current_thread()
  83. if daemon is not None:
  84. self.__dict__['daemon'] = daemon
  85. else:
  86. self.__dict__['daemon'] = current.daemon
  87. self.__dict__['name'] = name
  88. def call_run():
  89. # As soon as the thread is done, break the circular reference.
  90. try:
  91. self.run()
  92. finally:
  93. self.__thread = None
  94. _thread._remove_thread_id(self.ident)
  95. self.__thread = core.PythonThread(call_run, None, name, name)
  96. threadId = _thread._add_thread(self.__thread, weakref.proxy(self))
  97. self.__dict__['ident'] = threadId
  98. def __del__(self):
  99. # On interpreter shutdown, the _thread module might have
  100. # already been cleaned up.
  101. if _thread and _thread._remove_thread_id:
  102. _thread._remove_thread_id(self.ident)
  103. def is_alive(self):
  104. thread = self.__thread
  105. return thread is not None and thread.is_started()
  106. isAlive = is_alive
  107. def start(self):
  108. thread = self.__thread
  109. if thread is None or thread.is_started():
  110. raise RuntimeError
  111. if not thread.start(core.TPNormal, True):
  112. raise RuntimeError
  113. def run(self):
  114. if _settrace_func:
  115. _sys.settrace(_settrace_func)
  116. if _setprofile_func:
  117. _sys.setprofile(_setprofile_func)
  118. self.__target(*self.__args, **self.__kwargs)
  119. def join(self, timeout = None):
  120. # We don't support a timed join here, sorry.
  121. assert timeout is None
  122. thread = self.__thread
  123. if thread is not None:
  124. thread.join()
  125. # Clear the circular reference.
  126. self.__thread = None
  127. _thread._remove_thread_id(self.ident)
  128. def setName(self, name):
  129. self.__dict__['name'] = name
  130. self.__thread.setName(name)
  131. class ExternalThread(ThreadBase):
  132. """ Returned for a Thread object that wasn't created by this
  133. interface. """
  134. def __init__(self, extThread, threadId):
  135. ThreadBase.__init__(self)
  136. self.__thread = extThread
  137. self.__dict__['daemon'] = True
  138. self.__dict__['name'] = self.__thread.getName()
  139. self.__dict__['ident'] = threadId
  140. def is_alive(self):
  141. return self.__thread.isStarted()
  142. def isAlive(self):
  143. return self.__thread.isStarted()
  144. def start(self):
  145. raise RuntimeError
  146. def run(self):
  147. raise RuntimeError
  148. def join(self, timeout = None):
  149. raise RuntimeError
  150. def setDaemon(self, daemon):
  151. raise RuntimeError
  152. class MainThread(ExternalThread):
  153. """ Returned for the MainThread object. """
  154. def __init__(self, extThread, threadId):
  155. ExternalThread.__init__(self, extThread, threadId)
  156. self.__dict__['daemon'] = False
  157. class Lock(core.Mutex):
  158. """ This class provides a wrapper around Panda's Mutex object.
  159. The wrapper is designed to emulate Python's own threading.Lock
  160. object. """
  161. def __init__(self, name = "PythonLock"):
  162. core.Mutex.__init__(self, name)
  163. def acquire(self, blocking = True):
  164. if blocking:
  165. core.Mutex.acquire(self)
  166. return True
  167. else:
  168. return core.Mutex.tryAcquire(self)
  169. __enter__ = acquire
  170. def __exit__(self, t, v, tb):
  171. self.release()
  172. class RLock(core.ReMutex):
  173. """ This class provides a wrapper around Panda's ReMutex object.
  174. The wrapper is designed to emulate Python's own threading.RLock
  175. object. """
  176. def __init__(self, name = "PythonRLock"):
  177. core.ReMutex.__init__(self, name)
  178. def acquire(self, blocking = True):
  179. if blocking:
  180. core.ReMutex.acquire(self)
  181. return True
  182. else:
  183. return core.ReMutex.tryAcquire(self)
  184. __enter__ = acquire
  185. def __exit__(self, t, v, tb):
  186. self.release()
  187. class Condition(core.ConditionVarFull):
  188. """ This class provides a wrapper around Panda's ConditionVarFull
  189. object. The wrapper is designed to emulate Python's own
  190. threading.Condition object. """
  191. def __init__(self, lock = None):
  192. if not lock:
  193. lock = Lock()
  194. # Panda doesn't support RLock objects used with condition
  195. # variables.
  196. assert isinstance(lock, Lock)
  197. self.__lock = lock
  198. core.ConditionVarFull.__init__(self, self.__lock)
  199. def acquire(self, *args, **kw):
  200. return self.__lock.acquire(*args, **kw)
  201. def release(self):
  202. self.__lock.release()
  203. def wait(self, timeout = None):
  204. if timeout is None:
  205. core.ConditionVarFull.wait(self)
  206. else:
  207. core.ConditionVarFull.wait(self, timeout)
  208. def notifyAll(self):
  209. core.ConditionVarFull.notifyAll(self)
  210. notify_all = notifyAll
  211. __enter__ = acquire
  212. def __exit__(self, t, v, tb):
  213. self.release()
  214. class Semaphore(core.Semaphore):
  215. """ This class provides a wrapper around Panda's Semaphore
  216. object. The wrapper is designed to emulate Python's own
  217. threading.Semaphore object. """
  218. def __init__(self, value = 1):
  219. core.Semaphore.__init__(self, value)
  220. def acquire(self, blocking = True):
  221. if blocking:
  222. core.Semaphore.acquire(self)
  223. return True
  224. else:
  225. return core.Semaphore.tryAcquire(self)
  226. __enter__ = acquire
  227. def __exit__(self, t, v, tb):
  228. self.release()
  229. class BoundedSemaphore(Semaphore):
  230. """ This class provides a wrapper around Panda's Semaphore
  231. object. The wrapper is designed to emulate Python's own
  232. threading.BoundedSemaphore object. """
  233. def __init__(self, value = 1):
  234. self.__max = value
  235. Semaphore.__init__(value)
  236. def release(self):
  237. if self.getCount() > self.__max:
  238. raise ValueError
  239. Semaphore.release(self)
  240. class Event:
  241. """ This class is designed to emulate Python's own threading.Event
  242. object. """
  243. def __init__(self):
  244. self.__lock = core.Mutex("Python Event")
  245. self.__cvar = core.ConditionVarFull(self.__lock)
  246. self.__flag = False
  247. def is_set(self):
  248. return self.__flag
  249. isSet = is_set
  250. def set(self):
  251. self.__lock.acquire()
  252. try:
  253. self.__flag = True
  254. self.__cvar.notifyAll()
  255. finally:
  256. self.__lock.release()
  257. def clear(self):
  258. self.__lock.acquire()
  259. try:
  260. self.__flag = False
  261. finally:
  262. self.__lock.release()
  263. def wait(self, timeout = None):
  264. self.__lock.acquire()
  265. try:
  266. if timeout is None:
  267. while not self.__flag:
  268. self.__cvar.wait()
  269. else:
  270. clock = core.TrueClock.getGlobalPtr()
  271. expires = clock.getShortTime() + timeout
  272. while not self.__flag:
  273. wait = expires - clock.getShortTime()
  274. if wait < 0:
  275. return
  276. self.__cvar.wait(wait)
  277. finally:
  278. self.__lock.release()
  279. class Timer(Thread):
  280. """Call a function after a specified number of seconds:
  281. t = Timer(30.0, f, args=[], kwargs={})
  282. t.start()
  283. t.cancel() # stop the timer's action if it's still waiting
  284. """
  285. def __init__(self, interval, function, args=[], kwargs={}):
  286. Thread.__init__(self)
  287. self.interval = interval
  288. self.function = function
  289. self.args = args
  290. self.kwargs = kwargs
  291. self.finished = Event()
  292. def cancel(self):
  293. """Stop the timer if it hasn't finished yet"""
  294. self.finished.set()
  295. def run(self):
  296. self.finished.wait(self.interval)
  297. if not self.finished.isSet():
  298. self.function(*self.args, **self.kwargs)
  299. self.finished.set()
  300. def _create_thread_wrapper(t, threadId):
  301. """ Creates a thread wrapper for the indicated external thread. """
  302. if isinstance(t, core.MainThread):
  303. pyt = MainThread(t, threadId)
  304. else:
  305. pyt = ExternalThread(t, threadId)
  306. return pyt
  307. def current_thread():
  308. t = core.Thread.getCurrentThread()
  309. return _thread._get_thread_wrapper(t, _create_thread_wrapper)
  310. def main_thread():
  311. t = core.Thread.getMainThread()
  312. return _thread._get_thread_wrapper(t, _create_thread_wrapper)
# camelCase alias for current_thread().
currentThread = current_thread
  314. def enumerate():
  315. tlist = []
  316. _thread._threadsLock.acquire()
  317. try:
  318. for thread, locals, wrapper in list(_thread._threads.values()):
  319. if wrapper and wrapper.is_alive():
  320. tlist.append(wrapper)
  321. return tlist
  322. finally:
  323. _thread._threadsLock.release()
  324. def active_count():
  325. return len(enumerate())
  326. activeCount = active_count
# The trace function registered via settrace(), or None.  Thread.run()
# passes it to sys.settrace() when it is set.
_settrace_func = None
def settrace(func):
    """ Records func as the trace function to be installed in threads
    run by this module. """
    global _settrace_func
    _settrace_func = func
# The profile function registered via setprofile(), or None.
# Thread.run() passes it to sys.setprofile() when it is set.
_setprofile_func = None
def setprofile(func):
    """ Records func as the profile function to be installed in
    threads run by this module. """
    global _setprofile_func
    _setprofile_func = func
def stack_size(size = None):
    """ Not implemented by this module: always raises ThreadError,
    regardless of the size argument. """
    raise ThreadError
  337. if __debug__:
  338. def _test():
  339. from collections import deque
  340. _sleep = core.Thread.sleep
  341. _VERBOSE = False
  342. class _Verbose(object):
  343. def __init__(self, verbose=None):
  344. if verbose is None:
  345. verbose = _VERBOSE
  346. self.__verbose = verbose
  347. def _note(self, format, *args):
  348. if self.__verbose:
  349. format = format % args
  350. format = "%s: %s\n" % (
  351. currentThread().getName(), format)
  352. _sys.stderr.write(format)
  353. class BoundedQueue(_Verbose):
  354. def __init__(self, limit):
  355. _Verbose.__init__(self)
  356. self.mon = Lock(name = "BoundedQueue.mon")
  357. self.rc = Condition(self.mon)
  358. self.wc = Condition(self.mon)
  359. self.limit = limit
  360. self.queue = deque()
  361. def put(self, item):
  362. self.mon.acquire()
  363. while len(self.queue) >= self.limit:
  364. self._note("put(%s): queue full", item)
  365. self.wc.wait()
  366. self.queue.append(item)
  367. self._note("put(%s): appended, length now %d",
  368. item, len(self.queue))
  369. self.rc.notify()
  370. self.mon.release()
  371. def get(self):
  372. self.mon.acquire()
  373. while not self.queue:
  374. self._note("get(): queue empty")
  375. self.rc.wait()
  376. item = self.queue.popleft()
  377. self._note("get(): got %s, %d left", item, len(self.queue))
  378. self.wc.notify()
  379. self.mon.release()
  380. return item
  381. class ProducerThread(Thread):
  382. def __init__(self, queue, quota):
  383. Thread.__init__(self, name="Producer")
  384. self.queue = queue
  385. self.quota = quota
  386. def run(self):
  387. from random import random
  388. counter = 0
  389. while counter < self.quota:
  390. counter = counter + 1
  391. self.queue.put("%s.%d" % (self.getName(), counter))
  392. _sleep(random() * 0.00001)
  393. class ConsumerThread(Thread):
  394. def __init__(self, queue, count):
  395. Thread.__init__(self, name="Consumer")
  396. self.queue = queue
  397. self.count = count
  398. def run(self):
  399. while self.count > 0:
  400. item = self.queue.get()
  401. print(item)
  402. self.count = self.count - 1
  403. NP = 3
  404. QL = 4
  405. NI = 5
  406. Q = BoundedQueue(QL)
  407. P = []
  408. for i in range(NP):
  409. t = ProducerThread(Q, NI)
  410. t.setName("Producer-%d" % (i+1))
  411. P.append(t)
  412. C = ConsumerThread(Q, NI*NP)
  413. for t in P:
  414. t.start()
  415. _sleep(0.000001)
  416. C.start()
  417. for t in P:
  418. t.join()
  419. C.join()
  420. if __name__ == '__main__':
  421. _test()