| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542 |
- """ This module reimplements Python's native threading module using Panda
- threading constructs. It's designed as a drop-in replacement for the
- threading module for code that works with Panda; it is necessary because
- in some compilation models, Panda's threading constructs are
- incompatible with the OS-provided threads used by Python's thread
- module.
- This module implements the threading module with a thin layer over
- Panda's threading constructs. As such, the semantics are close to,
- but not precisely, the semantics documented for Python's standard
- threading module. If you really do require strict adherence to
- Python's semantics, see the threading2 module instead.
- However, if you don't need such strict adherence to Python's original
- semantics, this module is probably a better choice. It is likely to
- be slighly faster than the threading2 module (and even slightly faster
- than Python's own threading module). It is also better integrated
- with Panda's threads, so that Panda's thread debug mechanisms will be
- easier to use and understand.
- It is permissible to mix-and-match both threading and threading2
- within the same application. """
- from panda3d import core
- from direct.stdpy import thread as _thread
- import sys as _sys
- import weakref
- __all__ = [
- 'Thread',
- 'Lock', 'RLock',
- 'Condition',
- 'Semaphore', 'BoundedSemaphore',
- 'Event',
- 'Timer',
- 'ThreadError',
- 'local',
- 'current_thread',
- 'main_thread',
- 'enumerate', 'active_count',
- 'settrace', 'setprofile', 'stack_size',
- 'TIMEOUT_MAX',
- ]
- TIMEOUT_MAX = _thread.TIMEOUT_MAX
- local = _thread._local
- _newname = _thread._newname
- ThreadError = _thread.error
- class ThreadBase:
- """ A base class for both Thread and ExternalThread in this
- module. """
- def __init__(self):
- pass
- def getName(self):
- return self.name
- def isDaemon(self):
- return self.daemon
- def setDaemon(self, daemon):
- if self.is_alive():
- raise RuntimeError
- self.__dict__['daemon'] = daemon
- def __setattr__(self, key, value):
- if key == 'name':
- self.setName(value)
- elif key == 'ident':
- raise AttributeError
- elif key == 'daemon':
- self.setDaemon(value)
- else:
- self.__dict__[key] = value
- # Copy these static methods from Panda's Thread object. These are
- # useful if you may be running in Panda's SIMPLE_THREADS compilation
- # mode.
- ThreadBase.forceYield = core.Thread.forceYield
- ThreadBase.considerYield = core.Thread.considerYield
- class Thread(ThreadBase):
- """ This class provides a wrapper around Panda's PythonThread
- object. The wrapper is designed to emulate Python's own
- threading.Thread object. """
- def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None):
- ThreadBase.__init__(self)
- assert group is None
- self.__target = target
- self.__args = args
- self.__kwargs = kwargs
- if not name:
- name = _newname()
- current = current_thread()
- if daemon is not None:
- self.__dict__['daemon'] = daemon
- else:
- self.__dict__['daemon'] = current.daemon
- self.__dict__['name'] = name
- def call_run():
- # As soon as the thread is done, break the circular reference.
- try:
- self.run()
- finally:
- self.__thread = None
- _thread._remove_thread_id(self.ident)
- self.__thread = core.PythonThread(call_run, None, name, name)
- threadId = _thread._add_thread(self.__thread, weakref.proxy(self))
- self.__dict__['ident'] = threadId
- def __del__(self):
- # On interpreter shutdown, the _thread module might have
- # already been cleaned up.
- if _thread and _thread._remove_thread_id:
- _thread._remove_thread_id(self.ident)
- def is_alive(self):
- thread = self.__thread
- return thread is not None and thread.is_started()
- isAlive = is_alive
- def start(self):
- thread = self.__thread
- if thread is None or thread.is_started():
- raise RuntimeError
- if not thread.start(core.TPNormal, True):
- raise RuntimeError
- def run(self):
- if _settrace_func:
- _sys.settrace(_settrace_func)
- if _setprofile_func:
- _sys.setprofile(_setprofile_func)
- self.__target(*self.__args, **self.__kwargs)
- def join(self, timeout = None):
- # We don't support a timed join here, sorry.
- assert timeout is None
- thread = self.__thread
- if thread is not None:
- thread.join()
- # Clear the circular reference.
- self.__thread = None
- _thread._remove_thread_id(self.ident)
- def setName(self, name):
- self.__dict__['name'] = name
- self.__thread.setName(name)
- class ExternalThread(ThreadBase):
- """ Returned for a Thread object that wasn't created by this
- interface. """
- def __init__(self, extThread, threadId):
- ThreadBase.__init__(self)
- self.__thread = extThread
- self.__dict__['daemon'] = True
- self.__dict__['name'] = self.__thread.getName()
- self.__dict__['ident'] = threadId
- def is_alive(self):
- return self.__thread.isStarted()
- def isAlive(self):
- return self.__thread.isStarted()
- def start(self):
- raise RuntimeError
- def run(self):
- raise RuntimeError
- def join(self, timeout = None):
- raise RuntimeError
- def setDaemon(self, daemon):
- raise RuntimeError
- class MainThread(ExternalThread):
- """ Returned for the MainThread object. """
- def __init__(self, extThread, threadId):
- ExternalThread.__init__(self, extThread, threadId)
- self.__dict__['daemon'] = False
- class Lock(core.Mutex):
- """ This class provides a wrapper around Panda's Mutex object.
- The wrapper is designed to emulate Python's own threading.Lock
- object. """
- def __init__(self, name = "PythonLock"):
- core.Mutex.__init__(self, name)
- class RLock(core.ReMutex):
- """ This class provides a wrapper around Panda's ReMutex object.
- The wrapper is designed to emulate Python's own threading.RLock
- object. """
- def __init__(self, name = "PythonRLock"):
- core.ReMutex.__init__(self, name)
- class Condition(core.ConditionVar):
- """ This class provides a wrapper around Panda's ConditionVar
- object. The wrapper is designed to emulate Python's own
- threading.Condition object. """
- def __init__(self, lock = None):
- if not lock:
- lock = Lock()
- # Panda doesn't support RLock objects used with condition
- # variables.
- assert isinstance(lock, Lock)
- self.__lock = lock
- core.ConditionVar.__init__(self, self.__lock)
- def acquire(self, *args, **kw):
- return self.__lock.acquire(*args, **kw)
- def release(self):
- self.__lock.release()
- def wait(self, timeout = None):
- if timeout is None:
- core.ConditionVar.wait(self)
- else:
- core.ConditionVar.wait(self, timeout)
- def notifyAll(self):
- core.ConditionVar.notifyAll(self)
- notify_all = notifyAll
- __enter__ = acquire
- def __exit__(self, t, v, tb):
- self.release()
- class Semaphore(core.Semaphore):
- """ This class provides a wrapper around Panda's Semaphore
- object. The wrapper is designed to emulate Python's own
- threading.Semaphore object. """
- def __init__(self, value = 1):
- core.Semaphore.__init__(self, value)
- def acquire(self, blocking = True):
- if blocking:
- core.Semaphore.acquire(self)
- return True
- else:
- return core.Semaphore.tryAcquire(self)
- __enter__ = acquire
- def __exit__(self, t, v, tb):
- self.release()
- class BoundedSemaphore(Semaphore):
- """ This class provides a wrapper around Panda's Semaphore
- object. The wrapper is designed to emulate Python's own
- threading.BoundedSemaphore object. """
- def __init__(self, value = 1):
- self.__max = value
- Semaphore.__init__(value)
- def release(self):
- if self.getCount() > self.__max:
- raise ValueError
- Semaphore.release(self)
- class Event:
- """ This class is designed to emulate Python's own threading.Event
- object. """
- def __init__(self):
- self.__lock = core.Mutex("Python Event")
- self.__cvar = core.ConditionVar(self.__lock)
- self.__flag = False
- def is_set(self):
- return self.__flag
- isSet = is_set
- def set(self):
- self.__lock.acquire()
- try:
- self.__flag = True
- self.__cvar.notifyAll()
- finally:
- self.__lock.release()
- def clear(self):
- self.__lock.acquire()
- try:
- self.__flag = False
- finally:
- self.__lock.release()
- def wait(self, timeout = None):
- self.__lock.acquire()
- try:
- if timeout is None:
- while not self.__flag:
- self.__cvar.wait()
- else:
- clock = core.TrueClock.getGlobalPtr()
- expires = clock.getShortTime() + timeout
- while not self.__flag:
- wait = expires - clock.getShortTime()
- if wait < 0:
- return
- self.__cvar.wait(wait)
- finally:
- self.__lock.release()
- class Timer(Thread):
- """Call a function after a specified number of seconds:
- t = Timer(30.0, f, args=[], kwargs={})
- t.start()
- t.cancel() # stop the timer's action if it's still waiting
- """
- def __init__(self, interval, function, args=[], kwargs={}):
- Thread.__init__(self)
- self.interval = interval
- self.function = function
- self.args = args
- self.kwargs = kwargs
- self.finished = Event()
- def cancel(self):
- """Stop the timer if it hasn't finished yet"""
- self.finished.set()
- def run(self):
- self.finished.wait(self.interval)
- if not self.finished.isSet():
- self.function(*self.args, **self.kwargs)
- self.finished.set()
- def _create_thread_wrapper(t, threadId):
- """ Creates a thread wrapper for the indicated external thread. """
- if isinstance(t, core.MainThread):
- pyt = MainThread(t, threadId)
- else:
- pyt = ExternalThread(t, threadId)
- return pyt
- def current_thread():
- t = core.Thread.getCurrentThread()
- return _thread._get_thread_wrapper(t, _create_thread_wrapper)
- def main_thread():
- t = core.Thread.getMainThread()
- return _thread._get_thread_wrapper(t, _create_thread_wrapper)
- currentThread = current_thread
- def enumerate():
- tlist = []
- _thread._threadsLock.acquire()
- try:
- for thread, locals, wrapper in list(_thread._threads.values()):
- if wrapper and wrapper.is_alive():
- tlist.append(wrapper)
- return tlist
- finally:
- _thread._threadsLock.release()
- def active_count():
- return len(enumerate())
- activeCount = active_count
- _settrace_func = None
- def settrace(func):
- global _settrace_func
- _settrace_func = func
- _setprofile_func = None
- def setprofile(func):
- global _setprofile_func
- _setprofile_func = func
- def stack_size(size = None):
- raise ThreadError
- if __debug__:
- def _test():
- from collections import deque
- _sleep = core.Thread.sleep
- _VERBOSE = False
- class _Verbose(object):
- def __init__(self, verbose=None):
- if verbose is None:
- verbose = _VERBOSE
- self.__verbose = verbose
- def _note(self, format, *args):
- if self.__verbose:
- format = format % args
- format = "%s: %s\n" % (
- currentThread().getName(), format)
- _sys.stderr.write(format)
- class BoundedQueue(_Verbose):
- def __init__(self, limit):
- _Verbose.__init__(self)
- self.mon = Lock(name = "BoundedQueue.mon")
- self.rc = Condition(self.mon)
- self.wc = Condition(self.mon)
- self.limit = limit
- self.queue = deque()
- def put(self, item):
- self.mon.acquire()
- while len(self.queue) >= self.limit:
- self._note("put(%s): queue full", item)
- self.wc.wait()
- self.queue.append(item)
- self._note("put(%s): appended, length now %d",
- item, len(self.queue))
- self.rc.notify()
- self.mon.release()
- def get(self):
- self.mon.acquire()
- while not self.queue:
- self._note("get(): queue empty")
- self.rc.wait()
- item = self.queue.popleft()
- self._note("get(): got %s, %d left", item, len(self.queue))
- self.wc.notify()
- self.mon.release()
- return item
- class ProducerThread(Thread):
- def __init__(self, queue, quota):
- Thread.__init__(self, name="Producer")
- self.queue = queue
- self.quota = quota
- def run(self):
- from random import random
- counter = 0
- while counter < self.quota:
- counter = counter + 1
- self.queue.put("%s.%d" % (self.getName(), counter))
- _sleep(random() * 0.00001)
- class ConsumerThread(Thread):
- def __init__(self, queue, count):
- Thread.__init__(self, name="Consumer")
- self.queue = queue
- self.count = count
- def run(self):
- while self.count > 0:
- item = self.queue.get()
- print(item)
- self.count = self.count - 1
- NP = 3
- QL = 4
- NI = 5
- Q = BoundedQueue(QL)
- P = []
- for i in range(NP):
- t = ProducerThread(Q, NI)
- t.setName("Producer-%d" % (i+1))
- P.append(t)
- C = ConsumerThread(Q, NI*NP)
- for t in P:
- t.start()
- _sleep(0.000001)
- C.start()
- for t in P:
- t.join()
- C.join()
- if __name__ == '__main__':
- _test()
|