| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027 |
- from pandac.PandaModules import PStatCollector
- from direct.directnotify.DirectNotifyGlobal import directNotify
- from direct.showbase.PythonUtil import Queue, invertDictLossless, makeFlywheelGen
- from direct.showbase.PythonUtil import itype, serialNum, safeRepr, fastRepr
- from direct.showbase.Job import Job
- import types, weakref, random, __builtin__
def _createContainerLeak():
    """
    Debugging aid: deliberately grow a container so the leak detector has
    something to find.

    Defines a task function and runs it once; every run re-schedules itself
    with taskMgr.doMethodLater (delay 10) under a fresh serial-numbered name.
    """
    def leakContainer(task=None):
        baseObj = getBase()
        if not hasattr(baseObj, 'leakContainer'):
            baseObj.leakContainer = {}
        leaked = baseObj.leakContainer

        # use tuples as keys since they can't be weakref'd, and use an instance
        # since it can't be repr/eval'd
        # that will force the leak detector to hold a normal 'non-weak' reference
        class LeakKey:
            pass
        leaked[(LeakKey(),)] = {}

        # test the non-weakref object reference handling: one time in a hundred,
        # drop a randomly-chosen entry again
        if random.random() < .01:
            victim = random.choice(leaked.keys())
            ContainerLeakDetector.notify.debug(
                'removing reference to leakContainer key %s so it will be garbage-collected' % safeRepr(victim))
            del leaked[victim]

        nextTaskName = 'leakContainer-%s' % serialNum()
        taskMgr.doMethodLater(10, leakContainer, nextTaskName)
        if not task:
            # called directly (not by the task manager), nothing to report
            return None
        return task.done

    leakContainer()
def _createTaskLeak():
    """
    Debugging aid: deliberately pile up tasks and doLaters.

    Each run of the inner leakTask adds one task and one doLater (delay
    1 << 31) under unique names, then re-schedules itself with
    taskMgr.doMethodLater (delay 10).
    """
    leakTaskName = uniqueName('leakedTask')
    leakDoLaterName = uniqueName('leakedDoLater')

    def nullTask(task=None):
        return task.cont

    def nullDoLater(task=None):
        return task.done

    def leakTask(task=None, leakTaskName=leakTaskName):
        # the return value is not used; the call itself is kept as in the original
        getBase()
        addedTaskName = uniqueName(leakTaskName)
        taskMgr.add(nullTask, addedTaskName)
        addedDoLaterName = uniqueName(leakDoLaterName)
        taskMgr.doMethodLater(1 << 31, nullDoLater, addedDoLaterName)
        rescheduleName = 'doLeakTask-%s' % serialNum()
        taskMgr.doMethodLater(10, leakTask, rescheduleName)
        if not task:
            # called directly (not by the task manager), nothing to report
            return None
        return task.done

    leakTask()
class NoDictKey:
    """Sentinel meaning 'no dictionary key was supplied'; compared by identity
    (a plain None cannot be used since None is itself a legal dict key)."""
class Indirection:
    """
    Represents the indirection that brings you from a container to an element of the container.
    Stored as a string to be used as part of an eval, or as a key to be looked up in a dict.
    Each dictionary dereference is individually eval'd since the dict key might have been
    garbage-collected
    TODO: store string components that are duplicates of strings in the actual system so that
    Python will keep one copy and reduce memory usage
    """

    def __init__(self, evalStr=None, dictKey=NoDictKey):
        """
        evalStr: source fragment for this step, e.g. '.__dict__' or '[3]'
        dictKey: if this is a dictionary lookup, pass dictKey instead of evalStr

        A dictKey whose repr can be eval'd back into an object with the same
        hash is stored as the evalStr '[<repr>]'. Otherwise the key is held
        through a weakref if possible, and by normal reference as a last resort.
        """
        self.evalStr = evalStr
        self.dictKey = NoDictKey
        # is the dictKey a weak reference?
        self._isWeakRef = False
        self._refCount = 0
        if dictKey is not NoDictKey:
            # if we can repr/eval the key, store it as an evalStr
            keyRepr = safeRepr(dictKey)
            try:
                # the round trip only counts if the eval'd object hashes like
                # the original key. hash() is inside the try because a repr may
                # evaluate to something unhashable (e.g. a repr of '[]'); that
                # must be treated as a failed round trip, not as a crash.
                useEval = (hash(eval(keyRepr)) == hash(dictKey))
            except:
                # eval of an arbitrary repr can raise anything at all
                useEval = False
            if useEval:
                # eval/repr succeeded, store as an evalStr
                self.evalStr = '[%s]' % keyRepr
            else:
                try:
                    # store a weakref to the key
                    self.dictKey = weakref.ref(dictKey)
                    self._isWeakRef = True
                except TypeError:
                    ContainerLeakDetector.notify.debug('could not weakref dict key %s' % keyRepr)
                    self.dictKey = dictKey
                    self._isWeakRef = False

    def destroy(self):
        # re-entrant
        # drops our hold on the dict key (weak or strong)
        self.dictKey = NoDictKey

    def acquire(self):
        # register one more user of this indirection
        self._refCount += 1

    def release(self):
        # unregister one user; the last one out destroys the indirection
        self._refCount -= 1
        if self._refCount == 0:
            self.destroy()

    def isDictKey(self):
        # is this an indirection through a dictionary?
        return self.dictKey is not NoDictKey

    def _getNonWeakDictKey(self):
        # return the actual key object, or a placeholder string if the key was
        # only weakly held and has since been garbage-collected
        if not self._isWeakRef:
            return self.dictKey
        key = self.dictKey()
        if key is None:
            return '<garbage-collected dict key>'
        return key

    def dereferenceDictKey(self, parentDict):
        # look ourselves up in parentDict
        key = self._getNonWeakDictKey()
        # objects in __builtin__ will have parentDict==None
        if parentDict is None:
            return key
        return parentDict[key]

    def getString(self, prevIndirection=None, nextIndirection=None):
        # return our contribution to the full name of an object
        instanceDictStr = '.__dict__'
        # was the previous indirection an instance dict?
        prevIsInstanceDict = (prevIndirection is not None and
                              prevIndirection.evalStr is not None and
                              prevIndirection.evalStr.endswith(instanceDictStr))
        if self.evalStr is not None:
            # if we're an instance dict, skip over this one (obj.__dict__[keyName] == obj.keyName)
            if nextIndirection is not None and self.evalStr.endswith(instanceDictStr):
                return self.evalStr[:-len(instanceDictStr)]
            # if the previous indirection was an instance dict, change our syntax from ['key'] to .key
            if prevIsInstanceDict:
                return '.%s' % self.evalStr[2:-2]
            return self.evalStr
        # we're stored as a dict key
        keyRepr = safeRepr(self._getNonWeakDictKey())
        # if the previous indirection was an instance dict, change our syntax from ['key'] to .key
        if prevIsInstanceDict:
            return '.%s' % keyRepr
        return '[%s]' % keyRepr

    def __repr__(self):
        return self.getString()
class ObjectRef:
    """
    stores a reference to a container in a way that does not prevent garbage
    collection of the container if possible
    stored as a series of 'indirections' (obj.foo -> '.foo', dict[key] -> '[key]', etc.)
    """
    notify = directNotify.newCategory("ObjectRef")

    class FailedEval(Exception):
        # raised by getContainerGen when an intermediate eval yields nothing
        pass

    def __init__(self, indirection, objId, other=None):
        """
        indirection: the final Indirection step of this reference
        objId: id() of the object this reference leads to (never the object itself)
        other: optional ObjectRef to build on; its indirections and cached
               object ids are copied before 'indirection' is appended
        """
        self._indirections = []
        # this is a cache of the ids of our component objects
        self._objIds = set()
        # are we building off of an existing ref?
        if other is not None:
            self._objIds = set(other._objIds)
            self._indirections.extend(other._indirections)
        self._indirections.append(indirection)

        # make sure we're not storing a reference to the actual object,
        # that could cause a memory leak
        assert type(objId) in (types.IntType, types.LongType)
        # prevent cycles (i.e. base.loader.base.loader)
        assert objId not in self._objIds
        self._objIds.add(objId)

        # make sure our indirections don't get destroyed while we're using them
        for ind in self._indirections:
            ind.acquire()
        self.notify.debug(repr(self))

    def destroy(self):
        # give up the hold taken on every indirection in __init__
        for indirection in self._indirections:
            indirection.release()
        del self._indirections

    def getNumIndirections(self):
        return len(self._indirections)

    def goesThrough(self, obj):
        # since we cache the ids involved in this reference,
        # this isn't perfect, for example if base.myObject is reassigned
        # to a different object after this Ref was created this would return
        # false, allowing a ref to base.myObject.otherObject.myObject
        return id(obj) in self._objIds

    def _getContainerByEval(self, evalStr, curObj=None):
        # NOTE: the parameter must stay named 'curObj', the string that gets
        # eval'd below refers to it by that name
        if curObj is not None:
            # eval('curObj.foo.bar.someDict')
            evalStr = 'curObj%s' % evalStr
        else:
            # this eval is not based off of curObj, use the global__builtin__ namespace
            # put __builtin__ at the start if it's not already there
            bis = '__builtin__'
            if evalStr[:len(bis)] != bis:
                evalStr = '%s.%s' % (bis, evalStr)
        try:
            container = eval(evalStr)
        except (NameError, AttributeError):
            # the name or attribute no longer exists
            return None
        return container

    def getContainerGen(self, getInstance=False):
        """
        Generator: yields None while working, and the container (or None if
        the final eval could not resolve it) as its last value.

        try to get a handle on the container by eval'ing and looking things
        up in dictionaries, depending on the type of each indirection
        if getInstance is True, will return instance instead of instance dict

        Raises ObjectRef.FailedEval if the object that a dictionary lookup is
        based on cannot be resolved; lookup errors (e.g. KeyError) propagate.
        """
        #import pdb;pdb.set_trace()
        evalStr = ''
        curObj = None
        # make sure the indirections don't go away on us
        indirections = self._indirections
        for indirection in indirections:
            indirection.acquire()
        try:
            for indirection in indirections:
                yield None
                if not indirection.isDictKey():
                    # build up a string to be eval'd
                    evalStr += indirection.getString()
                else:
                    curObj = self._getContainerByEval(evalStr, curObj=curObj)
                    if curObj is None:
                        # FailedEval lives in the class namespace, it is not
                        # visible as a bare name from inside a method
                        raise ObjectRef.FailedEval(evalStr)
                    # try to look up this key in the curObj dictionary
                    curObj = indirection.dereferenceDictKey(curObj)
                    evalStr = ''
        except:
            # balance the acquire() calls above before letting the error out,
            # otherwise the indirections' ref counts never get back to zero
            for indirection in indirections:
                indirection.release()
            raise

        for indirection in indirections:
            yield None
            indirection.release()

        if getInstance:
            lenDict = len('.__dict__')
            if evalStr[-lenDict:] == '.__dict__':
                evalStr = evalStr[:-lenDict]

        # TODO: check that this is still the object we originally pointed to
        yield self._getContainerByEval(evalStr, curObj=curObj)

    def getEvalStrGen(self, getInstance=False):
        """
        Generator: yields None while working, and the human-readable name of
        this reference (all indirection strings joined) as its last value.
        if getInstance is True, a trailing '.__dict__' is stripped
        """
        # (local is not called 'str' so that the builtin stays usable)
        evalStr = ''
        # make sure the indirections don't go away on us
        indirections = self._indirections
        for indirection in indirections:
            indirection.acquire()
        numIndirections = len(indirections)
        for i in xrange(numIndirections):
            yield None
            # each indirection needs to see its neighbors to pick its syntax
            if i > 0:
                prevIndirection = indirections[i-1]
            else:
                prevIndirection = None
            if i < numIndirections-1:
                nextIndirection = indirections[i+1]
            else:
                nextIndirection = None
            evalStr += indirections[i].getString(prevIndirection=prevIndirection,
                                                 nextIndirection=nextIndirection)

        if getInstance:
            lenDict = len('.__dict__')
            if evalStr[-lenDict:] == '.__dict__':
                evalStr = evalStr[:-lenDict]

        for indirection in indirections:
            yield None
            indirection.release()
        yield evalStr

    def __repr__(self):
        # run the name generator to completion, its last value is the name
        for result in self.getEvalStrGen():
            pass
        return result
- class FindContainers(Job):
- """
- Explore the Python graph, looking for objects that support __len__()
- """
- def __init__(self, name, leakDetector):
- Job.__init__(self, name)
- self._leakDetector = leakDetector
- self._id2ref = self._leakDetector._id2ref
- # these hold objects that we should start traversals from often and not-as-often,
- # respectively
- self._id2baseStartRef = {}
- self._id2discoveredStartRef = {}
- # these are working copies so that our iterations aren't disturbed by changes to the
- # definitive ref sets
- self._baseStartRefWorkingList = ScratchPad(refGen=nullGen(),
- source=self._id2baseStartRef)
- self._discoveredStartRefWorkingList = ScratchPad(refGen=nullGen(),
- source=self._id2discoveredStartRef)
- self.notify = self._leakDetector.notify
- ContainerLeakDetector.addPrivateObj(self.__dict__)
- # set up the base containers, the ones that hold most objects
- ref = ObjectRef(Indirection(evalStr='__builtin__.__dict__'), id(__builtin__.__dict__))
- self._id2baseStartRef[id(__builtin__.__dict__)] = ref
- # container for objects that want to make sure they are found by
- # the object exploration algorithm, including objects that exist
- # just to measure things such as C++ memory usage, scene graph size,
- # framerate, etc. See LeakDetectors.py
- if not hasattr(__builtin__, "leakDetectors"):
- __builtin__.leakDetectors = {}
- ref = ObjectRef(Indirection(evalStr='leakDetectors'), id(leakDetectors))
- self._id2baseStartRef[id(leakDetectors)] = ref
- for i in self._addContainerGen(__builtin__.__dict__, ref):
- pass
- try:
- base
- except:
- pass
- else:
- ref = ObjectRef(Indirection(evalStr='base.__dict__'), id(base.__dict__))
- self._id2baseStartRef[id(base.__dict__)] = ref
- for i in self._addContainerGen(base.__dict__, ref):
- pass
- try:
- simbase
- except:
- pass
- else:
- ref = ObjectRef(Indirection(evalStr='simbase.__dict__'), id(simbase.__dict__))
- self._id2baseStartRef[id(simbase.__dict__)] = ref
- for i in self._addContainerGen(simbase.__dict__, ref):
- pass
- def destroy(self):
- ContainerLeakDetector.removePrivateObj(self.__dict__)
- Job.destroy(self)
- def getPriority(self):
- return Job.Priorities.Low
- @staticmethod
- def getStartObjAffinity(startObj):
- # how good of a starting object is this object for traversing the object graph?
- try:
- return len(startObj)
- except:
- return 1
-
- def _isDeadEnd(self, obj, objName=None):
- if type(obj) in (types.BooleanType, types.BuiltinFunctionType,
- types.BuiltinMethodType, types.ComplexType,
- types.FloatType, types.IntType, types.LongType,
- types.NoneType, types.NotImplementedType,
- types.TypeType, types.CodeType, types.FunctionType,
- types.StringType, types.UnicodeType,
- types.TupleType):
- return True
- # if it's an internal object, ignore it
- if id(obj) in ContainerLeakDetector.PrivateIds:
- return True
- # prevent crashes in objects that define __cmp__ and don't handle strings
- if type(objName) == types.StringType and objName in ('im_self', 'im_class'):
- return True
- try:
- className = obj.__class__.__name__
- except:
- pass
- else:
- # prevent infinite recursion in built-in containers related to methods
- if className == 'method-wrapper':
- return True
- return False
- def _hasLength(self, obj):
- return hasattr(obj, '__len__')
- def _addContainerGen(self, cont, objRef):
- contId = id(cont)
- # if this container is new, or the objRef repr is shorter than what we already have,
- # put it in the table
- if contId in self._id2ref:
- for existingRepr in self._id2ref[contId].getEvalStrGen():
- yield None
- for newRepr in objRef.getEvalStrGen():
- yield None
- if contId not in self._id2ref or len(newRepr) < len(existingRepr):
- if contId in self._id2ref:
- self._leakDetector.removeContainerById(contId)
- self._id2ref[contId] = objRef
- def _addDiscoveredStartRef(self, obj, ref):
- # we've discovered an object that can be used to start an object graph traversal
- objId = id(obj)
- if objId in self._id2discoveredStartRef:
- existingRef = self._id2discoveredStartRef[objId]
- if type(existingRef) not in (types.IntType, types.LongType):
- if (existingRef.getNumIndirections() >=
- ref.getNumIndirections()):
- # the ref that we already have is more concise than the new ref
- return
- if objId in self._id2ref:
- if (self._id2ref[objId].getNumIndirections() >=
- ref.getNumIndirections()):
- # the ref that we already have is more concise than the new ref
- return
- storedItem = ref
- # if we already are storing a reference to this object, don't store a second reference
- if objId in self._id2ref:
- storedItem = objId
- self._id2discoveredStartRef[objId] = storedItem
- def run(self):
- try:
- # this yields a different set of start refs every time we start a new traversal
- # force creation of a new workingListSelector inside the while loop right off the bat
- workingListSelector = nullGen()
- # this holds the current step of the current traversal
- curObjRef = None
- while True:
- # yield up here instead of at the end, since we skip back to the
- # top of the while loop from various points
- yield None
- #import pdb;pdb.set_trace()
- if curObjRef is None:
- # choose an object to start a traversal from
- try:
- startRefWorkingList = workingListSelector.next()
- except StopIteration:
- # do relative # of traversals on each set based on how many refs it contains
- baseLen = len(self._baseStartRefWorkingList.source)
- discLen = len(self._discoveredStartRefWorkingList.source)
- minLen = float(max(1, min(baseLen, discLen)))
- # this will cut down the traversals of the larger set by 2/3
- minLen *= 3.
- workingListSelector = flywheel([self._baseStartRefWorkingList, self._discoveredStartRefWorkingList],
- [baseLen/minLen, discLen/minLen])
- yield None
- continue
- # grab the next start ref from this sequence and see if it's still valid
- while True:
- yield None
- try:
- curObjRef = startRefWorkingList.refGen.next()
- break
- except StopIteration:
- # we've run out of refs, grab a new set
- if len(startRefWorkingList.source) == 0:
- # ref set is empty, choose another
- break
- # make a generator that yields containers a # of times that is
- # proportional to their length
- for fw in makeFlywheelGen(
- startRefWorkingList.source.values(),
- countFunc=lambda x: self.getStartObjAffinity(x),
- scale=.05):
- yield None
- startRefWorkingList.refGen = fw
- if curObjRef is None:
- # this ref set is empty, choose another
- # the base set should never be empty (__builtin__ etc.)
- continue
- # do we need to go look up the object in _id2ref? sometimes we do that
- # to avoid storing multiple redundant refs to a single item
- if type(curObjRef) in (types.IntType, types.LongType):
- startId = curObjRef
- curObjRef = None
- try:
- for containerRef in self._leakDetector.getContainerByIdGen(startId):
- yield None
- except:
- # ref is invalid
- self.notify.debug('invalid startRef, stored as id %s' % startId)
- self._leakDetector.removeContainerById(startId)
- continue
- curObjRef = containerRef
- try:
- for curObj in curObjRef.getContainerGen():
- yield None
- except:
- self.notify.debug('lost current container, ref.getContainerGen() failed')
- # that container is gone, try again
- curObjRef = None
- continue
- self.notify.debug('--> %s' % curObjRef)
- #import pdb;pdb.set_trace()
- # store a copy of the current objRef
- parentObjRef = curObjRef
- # if we hit a dead end, start over from another container
- curObjRef = None
- if hasattr(curObj, '__dict__'):
- child = curObj.__dict__
- hasLength = self._hasLength(child)
- notDeadEnd = not self._isDeadEnd(child)
- if hasLength or notDeadEnd:
- # prevent cycles in the references (i.e. base.loader.base)
- if not parentObjRef.goesThrough(child):
- objRef = ObjectRef(Indirection(evalStr='.__dict__'),
- id(child), parentObjRef)
- yield None
- if hasLength:
- for i in self._addContainerGen(child, objRef):
- yield None
- if notDeadEnd:
- self._addDiscoveredStartRef(child, objRef)
- curObjRef = objRef
- continue
- if type(curObj) is types.DictType:
- key = None
- attr = None
- keys = curObj.keys()
- # we will continue traversing the object graph via one key of the dict,
- # choose it at random without taking a big chunk of CPU time
- numKeysLeft = len(keys) + 1
- for key in keys:
- yield None
- numKeysLeft -= 1
- try:
- attr = curObj[key]
- except KeyError, e:
- # this is OK because we are yielding during the iteration
- self.notify.debug('could not index into %s with key %s' % (
- parentObjRef, safeRepr(key)))
- continue
- hasLength = self._hasLength(attr)
- notDeadEnd = False
- # if we haven't picked the next ref, check if this one is a candidate
- if curObjRef is None:
- notDeadEnd = not self._isDeadEnd(attr, key)
- if hasLength or notDeadEnd:
- # prevent cycles in the references (i.e. base.loader.base)
- if not parentObjRef.goesThrough(curObj[key]):
- if curObj is __builtin__.__dict__:
- objRef = ObjectRef(Indirection(evalStr='%s' % key),
- id(curObj[key]))
- else:
- objRef = ObjectRef(Indirection(dictKey=key),
- id(curObj[key]), parentObjRef)
- yield None
- if hasLength:
- for i in self._addContainerGen(attr, objRef):
- yield None
- if notDeadEnd:
- self._addDiscoveredStartRef(attr, objRef)
- if curObjRef is None and random.randrange(numKeysLeft) == 0:
- curObjRef = objRef
- del key
- del attr
- continue
- try:
- childNames = dir(curObj)
- except:
- pass
- else:
- try:
- index = -1
- attrs = []
- while 1:
- yield None
- try:
- attr = itr.next()
- except:
- # some custom classes don't do well when iterated
- attr = None
- break
- attrs.append(attr)
- # we will continue traversing the object graph via one attr,
- # choose it at random without taking a big chunk of CPU time
- numAttrsLeft = len(attrs) + 1
- for attr in attrs:
- yield None
- index += 1
- numAttrsLeft -= 1
- hasLength = self._hasLength(attr)
- notDeadEnd = False
- if curObjRef is None:
- notDeadEnd = not self._isDeadEnd(attr)
- if hasLength or notDeadEnd:
- # prevent cycles in the references (i.e. base.loader.base)
- if not parentObjRef.goesThrough(curObj[index]):
- objRef = ObjectRef(Indirection(evalStr='[%s]' % index),
- id(curObj[index]), parentObjRef)
- yield None
- if hasLength:
- for i in self._addContainerGen(attr, objRef):
- yield None
- if notDeadEnd:
- self._addDiscoveredStartRef(attr, objRef)
- if curObjRef is None and random.randrange(numAttrsLeft) == 0:
- curObjRef = objRef
- del attr
- except StopIteration, e:
- pass
- del itr
- continue
- except Exception, e:
- print 'FindContainers job caught exception: %s' % e
- if __dev__:
- raise
- yield Job.Done
- class CheckContainers(Job):
- """
- Job to check container sizes and find potential leaks; sub-job of ContainerLeakDetector
- """
- ReprItems = 5
-
- def __init__(self, name, leakDetector, index):
- Job.__init__(self, name)
- self._leakDetector = leakDetector
- self.notify = self._leakDetector.notify
- self._index = index
- ContainerLeakDetector.addPrivateObj(self.__dict__)
- def destroy(self):
- ContainerLeakDetector.removePrivateObj(self.__dict__)
- Job.destroy(self)
- def getPriority(self):
- return Job.Priorities.Normal
-
- def run(self):
- try:
- self._leakDetector._index2containerId2len[self._index] = {}
- ids = self._leakDetector.getContainerIds()
- # record the current len of each container
- for objId in ids:
- yield None
- try:
- for result in self._leakDetector.getContainerByIdGen(objId):
- yield None
- container = result
- except Exception, e:
- # this container no longer exists
- if self.notify.getDebug():
- for contName in self._leakDetector.getContainerNameByIdGen(objId):
- yield None
- self.notify.debug(
- '%s no longer exists; caught exception in getContainerById (%s)' % (
- contName, e))
- self._leakDetector.removeContainerById(objId)
- continue
- if container is None:
- # this container no longer exists
- if self.notify.getDebug():
- for contName in self._leakDetector.getContainerNameByIdGen(objId):
- yield None
- self.notify.debug('%s no longer exists; getContainerById returned None' %
- contName)
- self._leakDetector.removeContainerById(objId)
- continue
- try:
- cLen = len(container)
- except Exception, e:
- # this container no longer exists
- if self.notify.getDebug():
- for contName in self._leakDetector.getContainerNameByIdGen(objId):
- yield None
- self.notify.debug(
- '%s is no longer a container, it is now %s (%s)' %
- (contName, safeRepr(container), e))
- self._leakDetector.removeContainerById(objId)
- continue
- self._leakDetector._index2containerId2len[self._index][objId] = cLen
- # compare the current len of each container to past lens
- if self._index > 0:
- idx2id2len = self._leakDetector._index2containerId2len
- for objId in idx2id2len[self._index]:
- yield None
- if objId in idx2id2len[self._index-1]:
- diff = idx2id2len[self._index][objId] - idx2id2len[self._index-1][objId]
- """
- # this check is too spammy
- if diff > 20:
- if diff > idx2id2len[self._index-1][objId]:
- minutes = (self._leakDetector._index2delay[self._index] -
- self._leakDetector._index2delay[self._index-1]) / 60.
- name = self._leakDetector.getContainerNameById(objId)
- if idx2id2len[self._index-1][objId] != 0:
- percent = 100. * (float(diff) / float(idx2id2len[self._index-1][objId]))
- try:
- for container in self._leakDetector.getContainerByIdGen(objId):
- yield None
- except:
- # TODO
- self.notify.debug('caught exception in getContainerByIdGen (1)')
- else:
- self.notify.warning(
- '%s (%s) grew %.2f%% in %.2f minutes (%s items at last measurement, current contents: %s)' % (
- name, itype(container), percent, minutes, idx2id2len[self._index][objId],
- fastRepr(container, maxLen=CheckContainers.ReprItems)))
- yield None
- """
- if (self._index > 2 and
- objId in idx2id2len[self._index-2] and
- objId in idx2id2len[self._index-3]):
- diff2 = idx2id2len[self._index-1][objId] - idx2id2len[self._index-2][objId]
- diff3 = idx2id2len[self._index-2][objId] - idx2id2len[self._index-3][objId]
- if self._index <= 4:
- if diff > 0 and diff2 > 0 and diff3 > 0:
- name = self._leakDetector.getContainerNameById(objId)
- try:
- for container in self._leakDetector.getContainerByIdGen(objId):
- yield None
- except:
- # TODO
- self.notify.debug('caught exception in getContainerByIdGen (2)')
- else:
- msg = ('%s (%s) consistently increased in size over the last '
- '3 periods (%s items at last measurement, current contents: %s)' %
- (name, itype(container), idx2id2len[self._index][objId],
- fastRepr(container, maxLen=CheckContainers.ReprItems)))
- self.notify.warning(msg)
- yield None
- elif (objId in idx2id2len[self._index-4] and
- objId in idx2id2len[self._index-5]):
- # if size has consistently increased over the last 5 checks,
- # send out a warning
- diff4 = idx2id2len[self._index-3][objId] - idx2id2len[self._index-4][objId]
- diff5 = idx2id2len[self._index-4][objId] - idx2id2len[self._index-5][objId]
- if diff > 0 and diff2 > 0 and diff3 > 0 and diff4 > 0 and diff5 > 0:
- name = self._leakDetector.getContainerNameById(objId)
- try:
- for container in self._leakDetector.getContainerByIdGen(objId):
- yield None
- except:
- # TODO
- self.notify.debug('caught exception in getContainerByIdGen (3)')
- else:
- msg = ('leak detected: %s (%s) consistently increased in size over the last '
- '5 periods (%s items at last measurement, current contents: %s)' %
- (name, itype(container), idx2id2len[self._index][objId],
- fastRepr(container, maxLen=CheckContainers.ReprItems)))
- self.notify.warning(msg)
- yield None
- messenger.send(self._leakDetector.getLeakEvent(), [container, name])
- if config.GetBool('pdb-on-leak-detect', 0):
- import pdb;pdb.set_trace()
- pass
- except Exception, e:
- print 'CheckContainers job caught exception: %s' % e
- if __dev__:
- raise
- yield Job.Done
- class FPTObjsOfType(Job):
- def __init__(self, name, leakDetector, otn, doneCallback=None):
- Job.__init__(self, name)
- self._leakDetector = leakDetector
- self.notify = self._leakDetector.notify
- self._otn = otn
- self._doneCallback = doneCallback
- self._ldde = self._leakDetector._getDestroyEvent()
- self.accept(self._ldde, self._handleLDDestroy)
- ContainerLeakDetector.addPrivateObj(self.__dict__)
- def destroy(self):
- self.ignore(self._ldde)
- self._leakDetector = None
- self._doneCallback = None
- ContainerLeakDetector.removePrivateObj(self.__dict__)
- Job.destroy(self)
- def _handleLDDestroy(self):
- self.destroy()
-
- def getPriority(self):
- return Job.Priorities.High
-
- def run(self):
- ids = self._leakDetector.getContainerIds()
- try:
- for id in ids:
- getInstance = (self._otn.lower() not in 'dict')
- yield None
- try:
- for container in self._leakDetector.getContainerByIdGen(
- id, getInstance=getInstance):
- yield None
- except:
- pass
- else:
- if hasattr(container, '__class__'):
- cName = container.__class__.__name__
- else:
- cName = container.__name__
- if (self._otn.lower() in cName.lower()):
- try:
- for ptc in self._leakDetector.getContainerNameByIdGen(
- id, getInstance=getInstance):
- yield None
- except:
- pass
- else:
- print 'GPTC(' + self._otn + '):' + self.getJobName() + ': ' + ptc
- except Exception, e:
- print 'FPTObjsOfType job caught exception: %s' % e
- if __dev__:
- raise
- yield Job.Done
- def finished(self):
- if self._doneCallback:
- self._doneCallback(self)
- class PruneObjectRefs(Job):
- """
- Job to destroy any container refs that are no longer valid.
- Checks validity by asking for each container
- """
- def __init__(self, name, leakDetector):
- Job.__init__(self, name)
- self._leakDetector = leakDetector
- self.notify = self._leakDetector.notify
- ContainerLeakDetector.addPrivateObj(self.__dict__)
- def destroy(self):
- ContainerLeakDetector.removePrivateObj(self.__dict__)
- Job.destroy(self)
- def getPriority(self):
- return Job.Priorities.Normal
-
- def run(self):
- try:
- ids = self._leakDetector.getContainerIds()
- for id in ids:
- yield None
- try:
- for container in self._leakDetector.getContainerByIdGen(id):
- yield None
- except:
- # reference is invalid, remove it
- self._leakDetector.removeContainerById(id)
- _id2baseStartRef = self._leakDetector._findContainersJob._id2baseStartRef
- ids = _id2baseStartRef.keys()
- for id in ids:
- yield None
- try:
- for container in _id2baseStartRef[id].getContainerGen():
- yield None
- except:
- # reference is invalid, remove it
- del _id2baseStartRef[id]
- _id2discoveredStartRef = self._leakDetector._findContainersJob._id2discoveredStartRef
- ids = _id2discoveredStartRef.keys()
- for id in ids:
- yield None
- try:
- for container in _id2discoveredStartRef[id].getContainerGen():
- yield None
- except:
- # reference is invalid, remove it
- del _id2discoveredStartRef[id]
- except Exception, e:
- print 'PruneObjectRefs job caught exception: %s' % e
- if __dev__:
- raise
- yield Job.Done
class ContainerLeakDetector(Job):
    """
    Low-priority Python object-graph walker that looks for leaking containers.
    To reduce memory usage, this does a random walk of the Python objects to
    discover containers rather than keep a set of all visited objects; it may
    visit the same object many times but eventually it will discover every object.
    Checks container sizes at ever-increasing intervals.
    """
    notify = directNotify.newCategory("ContainerLeakDetector")
    # set of ids of containers that should not be examined
    PrivateIds = set()

    def __init__(self, name, firstCheckDelay=None):
        """
        Set up the detector tables, read the tuning config variables and
        register this job with the job manager at minimum priority.

        name: job name, handed to Job.__init__
        firstCheckDelay: delay before the first real leak check; when None,
            60. * 15. is used (the value is later passed to
            taskMgr.doMethodLater as its delay)
        """
        Job.__init__(self, name)
        self._serialNum = serialNum()

        # sub-jobs are created lazily; None means 'not created yet'
        self._findContainersJob = None
        self._checkContainersJob = None
        self._pruneContainersJob = None

        if firstCheckDelay is None:
            firstCheckDelay = 60. * 15.
        # divide by two, since the first check just takes length measurements and
        # doesn't check for leaks
        self._nextCheckDelay = firstCheckDelay / 2.
        self._checkDelayScale = config.GetFloat('leak-detector-check-delay-scale', 1.5)
        self._pruneTaskPeriod = config.GetFloat('leak-detector-prune-period', 60. * 30.)

        # main dict of id(container)->containerRef
        self._id2ref = {}
        # storage for results of check-container job
        self._index2containerId2len = {}
        self._index2delay = {}

        # optional, config-driven artificial leaks
        if config.GetBool('leak-container', 0):
            _createContainerLeak()
        if config.GetBool('leak-tasks', 0):
            _createTaskLeak()

        # don't check our own tables for leaks
        ContainerLeakDetector.addPrivateObj(ContainerLeakDetector.PrivateIds)
        ContainerLeakDetector.addPrivateObj(self.__dict__)

        self.setPriority(Job.Priorities.Min)
        jobMgr.add(self)

    def destroy(self):
        """
        Send the destroy event, stop listening for events, cancel the pending
        check/prune tasks, remove the sub-jobs from the job manager and drop
        the detector's tables.
        """
        messenger.send(self._getDestroyEvent())
        self.ignoreAll()

        # cancel the scheduled doMethodLaters; left alone they would fire on
        # the destroyed detector and access the tables deleted below
        taskMgr.remove(self._getCheckTaskName())
        taskMgr.remove(self._getPruneTaskName())

        if self._pruneContainersJob is not None:
            jobMgr.remove(self._pruneContainersJob)
            self._pruneContainersJob = None
        if self._checkContainersJob is not None:
            jobMgr.remove(self._checkContainersJob)
            self._checkContainersJob = None
        # the find job only exists once run() has started
        if self._findContainersJob is not None:
            jobMgr.remove(self._findContainersJob)
            self._findContainersJob = None

        del self._id2ref
        del self._index2containerId2len
        del self._index2delay

    def _getDestroyEvent(self):
        # sent when leak detector is about to be destroyed
        return 'cldDestroy-%s' % self._serialNum

    def getLeakEvent(self):
        # sent when a leak is detected
        # passes description string as argument
        return 'containerLeakDetected-%s' % self._serialNum

    @classmethod
    def addPrivateObj(cls, obj):
        """Exclude obj from examination by recording its id in PrivateIds."""
        cls.PrivateIds.add(id(obj))

    @classmethod
    def removePrivateObj(cls, obj):
        """Remove obj's id from PrivateIds; raises KeyError if it was not added."""
        cls.PrivateIds.remove(id(obj))

    def _getCheckTaskName(self):
        # name of the doMethodLater task that triggers the next leak check
        return 'checkForLeakingContainers-%s' % self._serialNum

    def _getPruneTaskName(self):
        # name of the doMethodLater task that triggers the next pruning pass
        return 'pruneLeakingContainerRefs-%s' % self._serialNum

    def getContainerIds(self):
        """
        Return a list of the ids of all known containers.

        An explicit list is built so that callers can remove containers while
        iterating over the result, regardless of whether dict.keys() is a
        list or a live view.
        """
        return list(self._id2ref.keys())

    def getContainerByIdGen(self, id, **kwArgs):
        # return a generator to look up a container
        return self._id2ref[id].getContainerGen(**kwArgs)

    def getContainerById(self, id):
        """
        Run the container-lookup generator for id to completion and return
        the last value it yielded (None if it yielded nothing).
        Raises KeyError if id is not a known container id.
        """
        result = None
        for result in self._id2ref[id].getContainerGen():
            pass
        return result

    def getContainerNameByIdGen(self, id, **kwArgs):
        # return the eval-string generator of the ref stored for this id
        return self._id2ref[id].getEvalStrGen(**kwArgs)

    def getContainerNameById(self, id):
        """Return repr() of the ref stored for id, or a placeholder if unknown."""
        if id in self._id2ref:
            return repr(self._id2ref[id])
        return '<unknown container>'

    def removeContainerById(self, id):
        """Destroy and forget the ref stored for id; no-op for unknown ids."""
        if id in self._id2ref:
            self._id2ref[id].destroy()
            del self._id2ref[id]

    def run(self):
        """
        Job generator: start the container-finding job, schedule the first
        leak check and pruning pass, then sleep forever.
        """
        # start looking for containers
        self._findContainersJob = FindContainers(
            '%s-findContainers' % self.getJobName(), self)
        jobMgr.add(self._findContainersJob)

        self._scheduleNextLeakCheck()
        self._scheduleNextPruning()

        # all further work happens in the sub-jobs and scheduled tasks
        while True:
            yield Job.Sleep

    def getPathsToContainers(self, name, ot, doneCallback=None):
        """
        Create an FPTObjsOfType job from (name, self, ot, doneCallback), add
        it to the job manager and return it.
        """
        j = FPTObjsOfType(name, self, ot, doneCallback)
        jobMgr.add(j)
        return j

    def _scheduleNextLeakCheck(self):
        """Schedule the next leak check and scale up the delay for the one after."""
        taskMgr.doMethodLater(self._nextCheckDelay, self._checkForLeaks,
                              self._getCheckTaskName())
        # delay between checks
        # fib: 1 1 2 3 5 8 13 21 34 55 89
        # * 2.: 1 2 4 8 16 32 64 128 256 512 1024
        # * 1.5: 1 1.5 2.3 3.4 5.1 7.6 11.4 17.1 25.6 38.4 57.7
        #
        # delay from job start
        # fib: 1 2 4 7 12 20 33 54 88 143 232
        # * 2.: 1 3 7 15 31 63 127 255 511 1023 2047
        # * 1.5: 1 2.5 4.75 8.1 13.2 20.8 32.2 49.3 74.9 113.3 171
        self._nextCheckDelay = self._nextCheckDelay * self._checkDelayScale

    def _checkForLeaks(self, task=None):
        """
        Start a CheckContainers job for the next check index; the following
        check is scheduled when that job reports it has finished.

        task: the task object when invoked by the task manager, or None when
            called directly
        Returns task.done, or None when no task was supplied.
        """
        checkIndex = len(self._index2containerId2len)
        # remember which delay was in effect for this check
        self._index2delay[checkIndex] = self._nextCheckDelay
        self._checkContainersJob = CheckContainers(
            '%s-checkForLeaks' % self.getJobName(), self, checkIndex)
        self.acceptOnce(self._checkContainersJob.getFinishedEvent(),
                        self._scheduleNextLeakCheck)
        jobMgr.add(self._checkContainersJob)
        if task is None:
            return None
        return task.done

    def _scheduleNextPruning(self):
        """Schedule the next pruning pass after the configured prune period."""
        taskMgr.doMethodLater(self._pruneTaskPeriod, self._pruneObjectRefs,
                              self._getPruneTaskName())

    def _pruneObjectRefs(self, task=None):
        """
        Start a PruneObjectRefs job; the following pruning pass is scheduled
        when that job reports it has finished.

        task: the task object when invoked by the task manager, or None when
            called directly
        Returns task.done, or None when no task was supplied.
        """
        self._pruneContainersJob = PruneObjectRefs(
            '%s-pruneObjectRefs' % self.getJobName(), self)
        self.acceptOnce(self._pruneContainersJob.getFinishedEvent(),
                        self._scheduleNextPruning)
        jobMgr.add(self._pruneContainersJob)
        if task is None:
            return None
        return task.done
|