ContainerLeakDetector.py 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027
  1. from pandac.PandaModules import PStatCollector
  2. from direct.directnotify.DirectNotifyGlobal import directNotify
  3. from direct.showbase.PythonUtil import Queue, invertDictLossless, makeFlywheelGen
  4. from direct.showbase.PythonUtil import itype, serialNum, safeRepr, fastRepr
  5. from direct.showbase.Job import Job
  6. import types, weakref, random, __builtin__
  7. def _createContainerLeak():
  8. def leakContainer(task=None):
  9. base = getBase()
  10. if not hasattr(base, 'leakContainer'):
  11. base.leakContainer = {}
  12. # use tuples as keys since they can't be weakref'd, and use an instance
  13. # since it can't be repr/eval'd
  14. # that will force the leak detector to hold a normal 'non-weak' reference
  15. class LeakKey:
  16. pass
  17. base.leakContainer[(LeakKey(),)] = {}
  18. # test the non-weakref object reference handling
  19. if random.random() < .01:
  20. key = random.choice(base.leakContainer.keys())
  21. ContainerLeakDetector.notify.debug(
  22. 'removing reference to leakContainer key %s so it will be garbage-collected' % safeRepr(key))
  23. del base.leakContainer[key]
  24. taskMgr.doMethodLater(10, leakContainer, 'leakContainer-%s' % serialNum())
  25. if task:
  26. return task.done
  27. leakContainer()
  28. def _createTaskLeak():
  29. leakTaskName = uniqueName('leakedTask')
  30. leakDoLaterName = uniqueName('leakedDoLater')
  31. def nullTask(task=None):
  32. return task.cont
  33. def nullDoLater(task=None):
  34. return task.done
  35. def leakTask(task=None, leakTaskName=leakTaskName):
  36. base = getBase()
  37. taskMgr.add(nullTask, uniqueName(leakTaskName))
  38. taskMgr.doMethodLater(1 << 31, nullDoLater, uniqueName(leakDoLaterName))
  39. taskMgr.doMethodLater(10, leakTask, 'doLeakTask-%s' % serialNum())
  40. if task:
  41. return task.done
  42. leakTask()
  43. class NoDictKey:
  44. pass
  45. class Indirection:
  46. """
  47. Represents the indirection that brings you from a container to an element of the container.
  48. Stored as a string to be used as part of an eval, or as a key to be looked up in a dict.
  49. Each dictionary dereference is individually eval'd since the dict key might have been
  50. garbage-collected
  51. TODO: store string components that are duplicates of strings in the actual system so that
  52. Python will keep one copy and reduce memory usage
  53. """
  54. def __init__(self, evalStr=None, dictKey=NoDictKey):
  55. # if this is a dictionary lookup, pass dictKey instead of evalStr
  56. self.evalStr = evalStr
  57. self.dictKey = NoDictKey
  58. # is the dictKey a weak reference?
  59. self._isWeakRef = False
  60. self._refCount = 0
  61. if dictKey is not NoDictKey:
  62. # if we can repr/eval the key, store it as an evalStr
  63. keyRepr = safeRepr(dictKey)
  64. useEval = False
  65. try:
  66. keyEval = eval(keyRepr)
  67. useEval = True
  68. except:
  69. pass
  70. if useEval:
  71. # check to make sure the eval succeeded
  72. if hash(keyEval) != hash(dictKey):
  73. useEval = False
  74. if useEval:
  75. # eval/repr succeeded, store as an evalStr
  76. self.evalStr = '[%s]' % keyRepr
  77. else:
  78. try:
  79. # store a weakref to the key
  80. self.dictKey = weakref.ref(dictKey)
  81. self._isWeakRef = True
  82. except TypeError, e:
  83. ContainerLeakDetector.notify.debug('could not weakref dict key %s' % keyRepr)
  84. self.dictKey = dictKey
  85. self._isWeakRef = False
  86. def destroy(self):
  87. # re-entrant
  88. self.dictKey = NoDictKey
  89. def acquire(self):
  90. self._refCount += 1
  91. def release(self):
  92. self._refCount -= 1
  93. if self._refCount == 0:
  94. self.destroy()
  95. def isDictKey(self):
  96. # is this an indirection through a dictionary?
  97. return self.dictKey is not NoDictKey
  98. def _getNonWeakDictKey(self):
  99. if not self._isWeakRef:
  100. return self.dictKey
  101. else:
  102. key = self.dictKey()
  103. if key is None:
  104. return '<garbage-collected dict key>'
  105. return key
  106. def dereferenceDictKey(self, parentDict):
  107. # look ourselves up in parentDict
  108. key = self._getNonWeakDictKey()
  109. # objects in __builtin__ will have parentDict==None
  110. if parentDict is None:
  111. return key
  112. return parentDict[key]
  113. def getString(self, prevIndirection=None, nextIndirection=None):
  114. # return our contribution to the full name of an object
  115. instanceDictStr = '.__dict__'
  116. if self.evalStr is not None:
  117. # if we're an instance dict, skip over this one (obj.__dict__[keyName] == obj.keyName)
  118. if nextIndirection is not None and self.evalStr[-len(instanceDictStr):] == instanceDictStr:
  119. return self.evalStr[:-len(instanceDictStr)]
  120. # if the previous indirection was an instance dict, change our syntax from ['key'] to .key
  121. if prevIndirection is not None and prevIndirection.evalStr is not None:
  122. if prevIndirection.evalStr[-len(instanceDictStr):] == instanceDictStr:
  123. return '.%s' % self.evalStr[2:-2]
  124. return self.evalStr
  125. # we're stored as a dict key
  126. keyRepr = safeRepr(self._getNonWeakDictKey())
  127. # if the previous indirection was an instance dict, change our syntax from ['key'] to .key
  128. if prevIndirection is not None and prevIndirection.evalStr is not None:
  129. if prevIndirection.evalStr[-len(instanceDictStr):] == instanceDictStr:
  130. return '.%s' % keyRepr
  131. return '[%s]' % keyRepr
  132. def __repr__(self):
  133. return self.getString()
  134. class ObjectRef:
  135. """
  136. stores a reference to a container in a way that does not prevent garbage
  137. collection of the container if possible
  138. stored as a series of 'indirections' (obj.foo -> '.foo', dict[key] -> '[key]', etc.)
  139. """
  140. notify = directNotify.newCategory("ObjectRef")
  141. class FailedEval(Exception):
  142. pass
  143. def __init__(self, indirection, objId, other=None):
  144. self._indirections = []
  145. # this is a cache of the ids of our component objects
  146. self._objIds = set()
  147. # are we building off of an existing ref?
  148. if other is not None:
  149. self._objIds = set(other._objIds)
  150. for ind in other._indirections:
  151. self._indirections.append(ind)
  152. self._indirections.append(indirection)
  153. # make sure we're not storing a reference to the actual object,
  154. # that could cause a memory leak
  155. assert type(objId) in (types.IntType, types.LongType)
  156. # prevent cycles (i.e. base.loader.base.loader)
  157. assert objId not in self._objIds
  158. self._objIds.add(objId)
  159. # make sure our indirections don't get destroyed while we're using them
  160. for ind in self._indirections:
  161. ind.acquire()
  162. self.notify.debug(repr(self))
  163. def destroy(self):
  164. for indirection in self._indirections:
  165. indirection.release()
  166. del self._indirections
  167. def getNumIndirections(self):
  168. return len(self._indirections)
  169. def goesThrough(self, obj):
  170. # since we cache the ids involved in this reference,
  171. # this isn't perfect, for example if base.myObject is reassigned
  172. # to a different object after this Ref was created this would return
  173. # false, allowing a ref to base.myObject.otherObject.myObject
  174. return id(obj) in self._objIds
  175. def _getContainerByEval(self, evalStr, curObj=None):
  176. if curObj is not None:
  177. # eval('curObj.foo.bar.someDict')
  178. evalStr = 'curObj%s' % evalStr
  179. else:
  180. # this eval is not based off of curObj, use the global__builtin__ namespace
  181. # put __builtin__ at the start if it's not already there
  182. bis = '__builtin__'
  183. if evalStr[:len(bis)] != bis:
  184. evalStr = '%s.%s' % (bis, evalStr)
  185. try:
  186. container = eval(evalStr)
  187. except NameError, ne:
  188. return None
  189. except AttributeError, ne:
  190. return None
  191. return container
  192. def getContainerGen(self, getInstance=False):
  193. # try to get a handle on the container by eval'ing and looking things
  194. # up in dictionaries, depending on the type of each indirection
  195. # if getInstance is True, will return instance instead of instance dict
  196. #import pdb;pdb.set_trace()
  197. evalStr = ''
  198. curObj = None
  199. # make sure the indirections don't go away on us
  200. indirections = self._indirections
  201. for indirection in indirections:
  202. indirection.acquire()
  203. for indirection in indirections:
  204. yield None
  205. if not indirection.isDictKey():
  206. # build up a string to be eval'd
  207. evalStr += indirection.getString()
  208. else:
  209. curObj = self._getContainerByEval(evalStr, curObj=curObj)
  210. if curObj is None:
  211. raise FailedEval(evalStr)
  212. # try to look up this key in the curObj dictionary
  213. curObj = indirection.dereferenceDictKey(curObj)
  214. evalStr = ''
  215. for indirection in indirections:
  216. yield None
  217. indirection.release()
  218. if getInstance:
  219. lenDict = len('.__dict__')
  220. if evalStr[-lenDict:] == '.__dict__':
  221. evalStr = evalStr[:-lenDict]
  222. # TODO: check that this is still the object we originally pointed to
  223. yield self._getContainerByEval(evalStr, curObj=curObj)
  224. def getEvalStrGen(self, getInstance=False):
  225. str = ''
  226. prevIndirection = None
  227. curIndirection = None
  228. nextIndirection = None
  229. # make sure the indirections don't go away on us
  230. indirections = self._indirections
  231. for indirection in indirections:
  232. indirection.acquire()
  233. for i in xrange(len(indirections)):
  234. yield None
  235. if i > 0:
  236. prevIndirection = indirections[i-1]
  237. else:
  238. prevIndirection = None
  239. curIndirection = indirections[i]
  240. if i < len(indirections)-1:
  241. nextIndirection = indirections[i+1]
  242. else:
  243. nextIndirection = None
  244. str += curIndirection.getString(prevIndirection=prevIndirection,
  245. nextIndirection=nextIndirection)
  246. if getInstance:
  247. lenDict = len('.__dict__')
  248. if str[-lenDict:] == '.__dict__':
  249. str = str[:-lenDict]
  250. for indirection in indirections:
  251. yield None
  252. indirection.release()
  253. yield str
  254. def __repr__(self):
  255. for result in self.getEvalStrGen():
  256. pass
  257. return result
  258. class FindContainers(Job):
  259. """
  260. Explore the Python graph, looking for objects that support __len__()
  261. """
  262. def __init__(self, name, leakDetector):
  263. Job.__init__(self, name)
  264. self._leakDetector = leakDetector
  265. self._id2ref = self._leakDetector._id2ref
  266. # these hold objects that we should start traversals from often and not-as-often,
  267. # respectively
  268. self._id2baseStartRef = {}
  269. self._id2discoveredStartRef = {}
  270. # these are working copies so that our iterations aren't disturbed by changes to the
  271. # definitive ref sets
  272. self._baseStartRefWorkingList = ScratchPad(refGen=nullGen(),
  273. source=self._id2baseStartRef)
  274. self._discoveredStartRefWorkingList = ScratchPad(refGen=nullGen(),
  275. source=self._id2discoveredStartRef)
  276. self.notify = self._leakDetector.notify
  277. ContainerLeakDetector.addPrivateObj(self.__dict__)
  278. # set up the base containers, the ones that hold most objects
  279. ref = ObjectRef(Indirection(evalStr='__builtin__.__dict__'), id(__builtin__.__dict__))
  280. self._id2baseStartRef[id(__builtin__.__dict__)] = ref
  281. # container for objects that want to make sure they are found by
  282. # the object exploration algorithm, including objects that exist
  283. # just to measure things such as C++ memory usage, scene graph size,
  284. # framerate, etc. See LeakDetectors.py
  285. if not hasattr(__builtin__, "leakDetectors"):
  286. __builtin__.leakDetectors = {}
  287. ref = ObjectRef(Indirection(evalStr='leakDetectors'), id(leakDetectors))
  288. self._id2baseStartRef[id(leakDetectors)] = ref
  289. for i in self._addContainerGen(__builtin__.__dict__, ref):
  290. pass
  291. try:
  292. base
  293. except:
  294. pass
  295. else:
  296. ref = ObjectRef(Indirection(evalStr='base.__dict__'), id(base.__dict__))
  297. self._id2baseStartRef[id(base.__dict__)] = ref
  298. for i in self._addContainerGen(base.__dict__, ref):
  299. pass
  300. try:
  301. simbase
  302. except:
  303. pass
  304. else:
  305. ref = ObjectRef(Indirection(evalStr='simbase.__dict__'), id(simbase.__dict__))
  306. self._id2baseStartRef[id(simbase.__dict__)] = ref
  307. for i in self._addContainerGen(simbase.__dict__, ref):
  308. pass
  309. def destroy(self):
  310. ContainerLeakDetector.removePrivateObj(self.__dict__)
  311. Job.destroy(self)
  312. def getPriority(self):
  313. return Job.Priorities.Low
  314. @staticmethod
  315. def getStartObjAffinity(startObj):
  316. # how good of a starting object is this object for traversing the object graph?
  317. try:
  318. return len(startObj)
  319. except:
  320. return 1
  321. def _isDeadEnd(self, obj, objName=None):
  322. if type(obj) in (types.BooleanType, types.BuiltinFunctionType,
  323. types.BuiltinMethodType, types.ComplexType,
  324. types.FloatType, types.IntType, types.LongType,
  325. types.NoneType, types.NotImplementedType,
  326. types.TypeType, types.CodeType, types.FunctionType,
  327. types.StringType, types.UnicodeType,
  328. types.TupleType):
  329. return True
  330. # if it's an internal object, ignore it
  331. if id(obj) in ContainerLeakDetector.PrivateIds:
  332. return True
  333. # prevent crashes in objects that define __cmp__ and don't handle strings
  334. if type(objName) == types.StringType and objName in ('im_self', 'im_class'):
  335. return True
  336. try:
  337. className = obj.__class__.__name__
  338. except:
  339. pass
  340. else:
  341. # prevent infinite recursion in built-in containers related to methods
  342. if className == 'method-wrapper':
  343. return True
  344. return False
  345. def _hasLength(self, obj):
  346. return hasattr(obj, '__len__')
  347. def _addContainerGen(self, cont, objRef):
  348. contId = id(cont)
  349. # if this container is new, or the objRef repr is shorter than what we already have,
  350. # put it in the table
  351. if contId in self._id2ref:
  352. for existingRepr in self._id2ref[contId].getEvalStrGen():
  353. yield None
  354. for newRepr in objRef.getEvalStrGen():
  355. yield None
  356. if contId not in self._id2ref or len(newRepr) < len(existingRepr):
  357. if contId in self._id2ref:
  358. self._leakDetector.removeContainerById(contId)
  359. self._id2ref[contId] = objRef
  360. def _addDiscoveredStartRef(self, obj, ref):
  361. # we've discovered an object that can be used to start an object graph traversal
  362. objId = id(obj)
  363. if objId in self._id2discoveredStartRef:
  364. existingRef = self._id2discoveredStartRef[objId]
  365. if type(existingRef) not in (types.IntType, types.LongType):
  366. if (existingRef.getNumIndirections() >=
  367. ref.getNumIndirections()):
  368. # the ref that we already have is more concise than the new ref
  369. return
  370. if objId in self._id2ref:
  371. if (self._id2ref[objId].getNumIndirections() >=
  372. ref.getNumIndirections()):
  373. # the ref that we already have is more concise than the new ref
  374. return
  375. storedItem = ref
  376. # if we already are storing a reference to this object, don't store a second reference
  377. if objId in self._id2ref:
  378. storedItem = objId
  379. self._id2discoveredStartRef[objId] = storedItem
  380. def run(self):
  381. try:
  382. # this yields a different set of start refs every time we start a new traversal
  383. # force creation of a new workingListSelector inside the while loop right off the bat
  384. workingListSelector = nullGen()
  385. # this holds the current step of the current traversal
  386. curObjRef = None
  387. while True:
  388. # yield up here instead of at the end, since we skip back to the
  389. # top of the while loop from various points
  390. yield None
  391. #import pdb;pdb.set_trace()
  392. if curObjRef is None:
  393. # choose an object to start a traversal from
  394. try:
  395. startRefWorkingList = workingListSelector.next()
  396. except StopIteration:
  397. # do relative # of traversals on each set based on how many refs it contains
  398. baseLen = len(self._baseStartRefWorkingList.source)
  399. discLen = len(self._discoveredStartRefWorkingList.source)
  400. minLen = float(max(1, min(baseLen, discLen)))
  401. # this will cut down the traversals of the larger set by 2/3
  402. minLen *= 3.
  403. workingListSelector = flywheel([self._baseStartRefWorkingList, self._discoveredStartRefWorkingList],
  404. [baseLen/minLen, discLen/minLen])
  405. yield None
  406. continue
  407. # grab the next start ref from this sequence and see if it's still valid
  408. while True:
  409. yield None
  410. try:
  411. curObjRef = startRefWorkingList.refGen.next()
  412. break
  413. except StopIteration:
  414. # we've run out of refs, grab a new set
  415. if len(startRefWorkingList.source) == 0:
  416. # ref set is empty, choose another
  417. break
  418. # make a generator that yields containers a # of times that is
  419. # proportional to their length
  420. for fw in makeFlywheelGen(
  421. startRefWorkingList.source.values(),
  422. countFunc=lambda x: self.getStartObjAffinity(x),
  423. scale=.05):
  424. yield None
  425. startRefWorkingList.refGen = fw
  426. if curObjRef is None:
  427. # this ref set is empty, choose another
  428. # the base set should never be empty (__builtin__ etc.)
  429. continue
  430. # do we need to go look up the object in _id2ref? sometimes we do that
  431. # to avoid storing multiple redundant refs to a single item
  432. if type(curObjRef) in (types.IntType, types.LongType):
  433. startId = curObjRef
  434. curObjRef = None
  435. try:
  436. for containerRef in self._leakDetector.getContainerByIdGen(startId):
  437. yield None
  438. except:
  439. # ref is invalid
  440. self.notify.debug('invalid startRef, stored as id %s' % startId)
  441. self._leakDetector.removeContainerById(startId)
  442. continue
  443. curObjRef = containerRef
  444. try:
  445. for curObj in curObjRef.getContainerGen():
  446. yield None
  447. except:
  448. self.notify.debug('lost current container, ref.getContainerGen() failed')
  449. # that container is gone, try again
  450. curObjRef = None
  451. continue
  452. self.notify.debug('--> %s' % curObjRef)
  453. #import pdb;pdb.set_trace()
  454. # store a copy of the current objRef
  455. parentObjRef = curObjRef
  456. # if we hit a dead end, start over from another container
  457. curObjRef = None
  458. if hasattr(curObj, '__dict__'):
  459. child = curObj.__dict__
  460. hasLength = self._hasLength(child)
  461. notDeadEnd = not self._isDeadEnd(child)
  462. if hasLength or notDeadEnd:
  463. # prevent cycles in the references (i.e. base.loader.base)
  464. if not parentObjRef.goesThrough(child):
  465. objRef = ObjectRef(Indirection(evalStr='.__dict__'),
  466. id(child), parentObjRef)
  467. yield None
  468. if hasLength:
  469. for i in self._addContainerGen(child, objRef):
  470. yield None
  471. if notDeadEnd:
  472. self._addDiscoveredStartRef(child, objRef)
  473. curObjRef = objRef
  474. continue
  475. if type(curObj) is types.DictType:
  476. key = None
  477. attr = None
  478. keys = curObj.keys()
  479. # we will continue traversing the object graph via one key of the dict,
  480. # choose it at random without taking a big chunk of CPU time
  481. numKeysLeft = len(keys) + 1
  482. for key in keys:
  483. yield None
  484. numKeysLeft -= 1
  485. try:
  486. attr = curObj[key]
  487. except KeyError, e:
  488. # this is OK because we are yielding during the iteration
  489. self.notify.debug('could not index into %s with key %s' % (
  490. parentObjRef, safeRepr(key)))
  491. continue
  492. hasLength = self._hasLength(attr)
  493. notDeadEnd = False
  494. # if we haven't picked the next ref, check if this one is a candidate
  495. if curObjRef is None:
  496. notDeadEnd = not self._isDeadEnd(attr, key)
  497. if hasLength or notDeadEnd:
  498. # prevent cycles in the references (i.e. base.loader.base)
  499. if not parentObjRef.goesThrough(curObj[key]):
  500. if curObj is __builtin__.__dict__:
  501. objRef = ObjectRef(Indirection(evalStr='%s' % key),
  502. id(curObj[key]))
  503. else:
  504. objRef = ObjectRef(Indirection(dictKey=key),
  505. id(curObj[key]), parentObjRef)
  506. yield None
  507. if hasLength:
  508. for i in self._addContainerGen(attr, objRef):
  509. yield None
  510. if notDeadEnd:
  511. self._addDiscoveredStartRef(attr, objRef)
  512. if curObjRef is None and random.randrange(numKeysLeft) == 0:
  513. curObjRef = objRef
  514. del key
  515. del attr
  516. continue
  517. try:
  518. childNames = dir(curObj)
  519. except:
  520. pass
  521. else:
  522. try:
  523. index = -1
  524. attrs = []
  525. while 1:
  526. yield None
  527. try:
  528. attr = itr.next()
  529. except:
  530. # some custom classes don't do well when iterated
  531. attr = None
  532. break
  533. attrs.append(attr)
  534. # we will continue traversing the object graph via one attr,
  535. # choose it at random without taking a big chunk of CPU time
  536. numAttrsLeft = len(attrs) + 1
  537. for attr in attrs:
  538. yield None
  539. index += 1
  540. numAttrsLeft -= 1
  541. hasLength = self._hasLength(attr)
  542. notDeadEnd = False
  543. if curObjRef is None:
  544. notDeadEnd = not self._isDeadEnd(attr)
  545. if hasLength or notDeadEnd:
  546. # prevent cycles in the references (i.e. base.loader.base)
  547. if not parentObjRef.goesThrough(curObj[index]):
  548. objRef = ObjectRef(Indirection(evalStr='[%s]' % index),
  549. id(curObj[index]), parentObjRef)
  550. yield None
  551. if hasLength:
  552. for i in self._addContainerGen(attr, objRef):
  553. yield None
  554. if notDeadEnd:
  555. self._addDiscoveredStartRef(attr, objRef)
  556. if curObjRef is None and random.randrange(numAttrsLeft) == 0:
  557. curObjRef = objRef
  558. del attr
  559. except StopIteration, e:
  560. pass
  561. del itr
  562. continue
  563. except Exception, e:
  564. print 'FindContainers job caught exception: %s' % e
  565. if __dev__:
  566. raise
  567. yield Job.Done
class CheckContainers(Job):
    """
    Job to check container sizes and find potential leaks; sub-job of ContainerLeakDetector

    Every instance is given an index. run() records the current len() of
    each known container under that index in the leak detector's
    _index2containerId2len table and compares the result with the tables
    recorded under the preceding indices.
    """
    # maxLen handed to fastRepr when container contents are put into a warning
    ReprItems = 5

    def __init__(self, name, leakDetector, index):
        # name: job name, passed on to Job
        # leakDetector: the owning leak detector, its notify category is shared
        # index: slot of _index2containerId2len that this check writes to
        Job.__init__(self, name)
        self._leakDetector = leakDetector
        self.notify = self._leakDetector.notify
        self._index = index
        ContainerLeakDetector.addPrivateObj(self.__dict__)

    def destroy(self):
        ContainerLeakDetector.removePrivateObj(self.__dict__)
        Job.destroy(self)

    def getPriority(self):
        return Job.Priorities.Normal

    def run(self):
        # Job generator: yields None between small steps of work and
        # Job.Done at the end. An exception ends the check early; it is
        # printed, and re-raised if __dev__ is set.
        try:
            self._leakDetector._index2containerId2len[self._index] = {}
            ids = self._leakDetector.getContainerIds()
            # record the current len of each container
            for objId in ids:
                yield None
                try:
                    # the last value produced by the generator is the container
                    for result in self._leakDetector.getContainerByIdGen(objId):
                        yield None
                    container = result
                except Exception, e:
                    # this container no longer exists
                    if self.notify.getDebug():
                        for contName in self._leakDetector.getContainerNameByIdGen(objId):
                            yield None
                        self.notify.debug(
                            '%s no longer exists; caught exception in getContainerById (%s)' % (
                            contName, e))
                    self._leakDetector.removeContainerById(objId)
                    continue
                if container is None:
                    # this container no longer exists
                    if self.notify.getDebug():
                        for contName in self._leakDetector.getContainerNameByIdGen(objId):
                            yield None
                        self.notify.debug('%s no longer exists; getContainerById returned None' %
                                          contName)
                    self._leakDetector.removeContainerById(objId)
                    continue
                try:
                    cLen = len(container)
                except Exception, e:
                    # this container no longer exists
                    if self.notify.getDebug():
                        for contName in self._leakDetector.getContainerNameByIdGen(objId):
                            yield None
                        self.notify.debug(
                            '%s is no longer a container, it is now %s (%s)' %
                            (contName, safeRepr(container), e))
                    self._leakDetector.removeContainerById(objId)
                    continue
                self._leakDetector._index2containerId2len[self._index][objId] = cLen
            # compare the current len of each container to past lens
            if self._index > 0:
                idx2id2len = self._leakDetector._index2containerId2len
                for objId in idx2id2len[self._index]:
                    yield None
                    if objId in idx2id2len[self._index-1]:
                        # growth since the previous check
                        diff = idx2id2len[self._index][objId] - idx2id2len[self._index-1][objId]
                        # the string literal below is a disabled check that
                        # was left in place, it is never executed
                        """
                        # this check is too spammy
                        if diff > 20:
                            if diff > idx2id2len[self._index-1][objId]:
                                minutes = (self._leakDetector._index2delay[self._index] -
                                           self._leakDetector._index2delay[self._index-1]) / 60.
                                name = self._leakDetector.getContainerNameById(objId)
                                if idx2id2len[self._index-1][objId] != 0:
                                    percent = 100. * (float(diff) / float(idx2id2len[self._index-1][objId]))
                                    try:
                                        for container in self._leakDetector.getContainerByIdGen(objId):
                                            yield None
                                    except:
                                        # TODO
                                        self.notify.debug('caught exception in getContainerByIdGen (1)')
                                    else:
                                        self.notify.warning(
                                            '%s (%s) grew %.2f%% in %.2f minutes (%s items at last measurement, current contents: %s)' % (
                                            name, itype(container), percent, minutes, idx2id2len[self._index][objId],
                                            fastRepr(container, maxLen=CheckContainers.ReprItems)))
                                    yield None
                        """
                        # the container has to be present in the two checks
                        # before the previous one as well
                        if (self._index > 2 and
                            objId in idx2id2len[self._index-2] and
                            objId in idx2id2len[self._index-3]):
                            diff2 = idx2id2len[self._index-1][objId] - idx2id2len[self._index-2][objId]
                            diff3 = idx2id2len[self._index-2][objId] - idx2id2len[self._index-3][objId]
                            if self._index <= 4:
                                # fewer than five periods recorded so far,
                                # report growth over the last three
                                if diff > 0 and diff2 > 0 and diff3 > 0:
                                    name = self._leakDetector.getContainerNameById(objId)
                                    try:
                                        for container in self._leakDetector.getContainerByIdGen(objId):
                                            yield None
                                    except:
                                        # TODO
                                        self.notify.debug('caught exception in getContainerByIdGen (2)')
                                    else:
                                        msg = ('%s (%s) consistently increased in size over the last '
                                               '3 periods (%s items at last measurement, current contents: %s)' %
                                               (name, itype(container), idx2id2len[self._index][objId],
                                                fastRepr(container, maxLen=CheckContainers.ReprItems)))
                                        self.notify.warning(msg)
                                        yield None
                            elif (objId in idx2id2len[self._index-4] and
                                  objId in idx2id2len[self._index-5]):
                                # if size has consistently increased over the last 5 checks,
                                # send out a warning
                                diff4 = idx2id2len[self._index-3][objId] - idx2id2len[self._index-4][objId]
                                diff5 = idx2id2len[self._index-4][objId] - idx2id2len[self._index-5][objId]
                                if diff > 0 and diff2 > 0 and diff3 > 0 and diff4 > 0 and diff5 > 0:
                                    name = self._leakDetector.getContainerNameById(objId)
                                    try:
                                        for container in self._leakDetector.getContainerByIdGen(objId):
                                            yield None
                                    except:
                                        # TODO
                                        self.notify.debug('caught exception in getContainerByIdGen (3)')
                                    else:
                                        msg = ('leak detected: %s (%s) consistently increased in size over the last '
                                               '5 periods (%s items at last measurement, current contents: %s)' %
                                               (name, itype(container), idx2id2len[self._index][objId],
                                                fastRepr(container, maxLen=CheckContainers.ReprItems)))
                                        self.notify.warning(msg)
                                        yield None
                                        # announce the leak, passing the container and its name
                                        messenger.send(self._leakDetector.getLeakEvent(), [container, name])
                                        # optional debugger break, controlled by a config variable
                                        if config.GetBool('pdb-on-leak-detect', 0):
                                            import pdb;pdb.set_trace()
                                            pass
        except Exception, e:
            print 'CheckContainers job caught exception: %s' % e
            if __dev__:
                raise
        yield Job.Done
  707. class FPTObjsOfType(Job):
  708. def __init__(self, name, leakDetector, otn, doneCallback=None):
  709. Job.__init__(self, name)
  710. self._leakDetector = leakDetector
  711. self.notify = self._leakDetector.notify
  712. self._otn = otn
  713. self._doneCallback = doneCallback
  714. self._ldde = self._leakDetector._getDestroyEvent()
  715. self.accept(self._ldde, self._handleLDDestroy)
  716. ContainerLeakDetector.addPrivateObj(self.__dict__)
  717. def destroy(self):
  718. self.ignore(self._ldde)
  719. self._leakDetector = None
  720. self._doneCallback = None
  721. ContainerLeakDetector.removePrivateObj(self.__dict__)
  722. Job.destroy(self)
  723. def _handleLDDestroy(self):
  724. self.destroy()
  725. def getPriority(self):
  726. return Job.Priorities.High
  727. def run(self):
  728. ids = self._leakDetector.getContainerIds()
  729. try:
  730. for id in ids:
  731. getInstance = (self._otn.lower() not in 'dict')
  732. yield None
  733. try:
  734. for container in self._leakDetector.getContainerByIdGen(
  735. id, getInstance=getInstance):
  736. yield None
  737. except:
  738. pass
  739. else:
  740. if hasattr(container, '__class__'):
  741. cName = container.__class__.__name__
  742. else:
  743. cName = container.__name__
  744. if (self._otn.lower() in cName.lower()):
  745. try:
  746. for ptc in self._leakDetector.getContainerNameByIdGen(
  747. id, getInstance=getInstance):
  748. yield None
  749. except:
  750. pass
  751. else:
  752. print 'GPTC(' + self._otn + '):' + self.getJobName() + ': ' + ptc
  753. except Exception, e:
  754. print 'FPTObjsOfType job caught exception: %s' % e
  755. if __dev__:
  756. raise
  757. yield Job.Done
  758. def finished(self):
  759. if self._doneCallback:
  760. self._doneCallback(self)
  761. class PruneObjectRefs(Job):
  762. """
  763. Job to destroy any container refs that are no longer valid.
  764. Checks validity by asking for each container
  765. """
  766. def __init__(self, name, leakDetector):
  767. Job.__init__(self, name)
  768. self._leakDetector = leakDetector
  769. self.notify = self._leakDetector.notify
  770. ContainerLeakDetector.addPrivateObj(self.__dict__)
  771. def destroy(self):
  772. ContainerLeakDetector.removePrivateObj(self.__dict__)
  773. Job.destroy(self)
  774. def getPriority(self):
  775. return Job.Priorities.Normal
  776. def run(self):
  777. try:
  778. ids = self._leakDetector.getContainerIds()
  779. for id in ids:
  780. yield None
  781. try:
  782. for container in self._leakDetector.getContainerByIdGen(id):
  783. yield None
  784. except:
  785. # reference is invalid, remove it
  786. self._leakDetector.removeContainerById(id)
  787. _id2baseStartRef = self._leakDetector._findContainersJob._id2baseStartRef
  788. ids = _id2baseStartRef.keys()
  789. for id in ids:
  790. yield None
  791. try:
  792. for container in _id2baseStartRef[id].getContainerGen():
  793. yield None
  794. except:
  795. # reference is invalid, remove it
  796. del _id2baseStartRef[id]
  797. _id2discoveredStartRef = self._leakDetector._findContainersJob._id2discoveredStartRef
  798. ids = _id2discoveredStartRef.keys()
  799. for id in ids:
  800. yield None
  801. try:
  802. for container in _id2discoveredStartRef[id].getContainerGen():
  803. yield None
  804. except:
  805. # reference is invalid, remove it
  806. del _id2discoveredStartRef[id]
  807. except Exception, e:
  808. print 'PruneObjectRefs job caught exception: %s' % e
  809. if __dev__:
  810. raise
  811. yield Job.Done
  812. class ContainerLeakDetector(Job):
  813. """
  814. Low-priority Python object-graph walker that looks for leaking containers.
  815. To reduce memory usage, this does a random walk of the Python objects to
  816. discover containers rather than keep a set of all visited objects; it may
  817. visit the same object many times but eventually it will discover every object.
  818. Checks container sizes at ever-increasing intervals.
  819. """
  820. notify = directNotify.newCategory("ContainerLeakDetector")
  821. # set of containers that should not be examined
  822. PrivateIds = set()
  823. def __init__(self, name, firstCheckDelay = None):
  824. Job.__init__(self, name)
  825. self._serialNum = serialNum()
  826. self._findContainersJob = None
  827. self._checkContainersJob = None
  828. self._pruneContainersJob = None
  829. if firstCheckDelay is None:
  830. firstCheckDelay = 60. * 15.
  831. # divide by two, since the first check just takes length measurements and
  832. # doesn't check for leaks
  833. self._nextCheckDelay = firstCheckDelay/2.
  834. self._checkDelayScale = config.GetFloat('leak-detector-check-delay-scale', 1.5)
  835. self._pruneTaskPeriod = config.GetFloat('leak-detector-prune-period', 60. * 30.)
  836. # main dict of id(container)->containerRef
  837. self._id2ref = {}
  838. # storage for results of check-container job
  839. self._index2containerId2len = {}
  840. self._index2delay = {}
  841. if config.GetBool('leak-container', 0):
  842. _createContainerLeak()
  843. if config.GetBool('leak-tasks', 0):
  844. _createTaskLeak()
  845. # don't check our own tables for leaks
  846. ContainerLeakDetector.addPrivateObj(ContainerLeakDetector.PrivateIds)
  847. ContainerLeakDetector.addPrivateObj(self.__dict__)
  848. self.setPriority(Job.Priorities.Min)
  849. jobMgr.add(self)
  850. def destroy(self):
  851. messenger.send(self._getDestroyEvent())
  852. self.ignoreAll()
  853. if self._pruneContainersJob is not None:
  854. jobMgr.remove(self._pruneContainersJob)
  855. self._pruneContainersJob = None
  856. if self._checkContainersJob is not None:
  857. jobMgr.remove(self._checkContainersJob)
  858. self._checkContainersJob = None
  859. jobMgr.remove(self._findContainersJob)
  860. self._findContainersJob = None
  861. del self._id2ref
  862. del self._index2containerId2len
  863. del self._index2delay
  864. def _getDestroyEvent(self):
  865. # sent when leak detector is about to be destroyed
  866. return 'cldDestroy-%s' % self._serialNum
  867. def getLeakEvent(self):
  868. # sent when a leak is detected
  869. # passes description string as argument
  870. return 'containerLeakDetected-%s' % self._serialNum
  871. @classmethod
  872. def addPrivateObj(cls, obj):
  873. cls.PrivateIds.add(id(obj))
  874. @classmethod
  875. def removePrivateObj(cls, obj):
  876. cls.PrivateIds.remove(id(obj))
  877. def _getCheckTaskName(self):
  878. return 'checkForLeakingContainers-%s' % self._serialNum
  879. def _getPruneTaskName(self):
  880. return 'pruneLeakingContainerRefs-%s' % self._serialNum
  881. def getContainerIds(self):
  882. return self._id2ref.keys()
  883. def getContainerByIdGen(self, id, **kwArgs):
  884. # return a generator to look up a container
  885. return self._id2ref[id].getContainerGen(**kwArgs)
  886. def getContainerById(self, id):
  887. for result in self._id2ref[id].getContainerGen():
  888. pass
  889. return result
  890. def getContainerNameByIdGen(self, id, **kwArgs):
  891. return self._id2ref[id].getEvalStrGen(**kwArgs)
  892. def getContainerNameById(self, id):
  893. if id in self._id2ref:
  894. return repr(self._id2ref[id])
  895. return '<unknown container>'
  896. def removeContainerById(self, id):
  897. if id in self._id2ref:
  898. self._id2ref[id].destroy()
  899. del self._id2ref[id]
  900. def run(self):
  901. # start looking for containers
  902. self._findContainersJob = FindContainers(
  903. '%s-findContainers' % self.getJobName(), self)
  904. jobMgr.add(self._findContainersJob)
  905. self._scheduleNextLeakCheck()
  906. self._scheduleNextPruning()
  907. while True:
  908. yield Job.Sleep
  909. def getPathsToContainers(self, name, ot, doneCallback=None):
  910. j = FPTObjsOfType(name, self, ot, doneCallback)
  911. jobMgr.add(j)
  912. return j
  913. def _scheduleNextLeakCheck(self):
  914. taskMgr.doMethodLater(self._nextCheckDelay, self._checkForLeaks,
  915. self._getCheckTaskName())
  916. # delay between checks
  917. # fib: 1 1 2 3 5 8 13 21 34 55 89
  918. # * 2.: 1 2 4 8 16 32 64 128 256 512 1024
  919. # * 1.5: 1 1.5 2.3 3.4 5.1 7.6 11.4 17.1 25.6 38.4 57.7
  920. #
  921. # delay from job start
  922. # fib: 1 2 4 7 12 20 33 54 88 143 232
  923. # * 2.: 1 3 7 15 31 63 127 255 511 1023 2047
  924. # * 1.5: 1 2.5 4.75 8.1 13.2 20.8 32.2 49.3 74.9 113.3 171
  925. self._nextCheckDelay = self._nextCheckDelay * self._checkDelayScale
  926. def _checkForLeaks(self, task=None):
  927. self._index2delay[len(self._index2containerId2len)] = self._nextCheckDelay
  928. self._checkContainersJob = CheckContainers(
  929. '%s-checkForLeaks' % self.getJobName(), self, len(self._index2containerId2len))
  930. self.acceptOnce(self._checkContainersJob.getFinishedEvent(),
  931. self._scheduleNextLeakCheck)
  932. jobMgr.add(self._checkContainersJob)
  933. return task.done
  934. def _scheduleNextPruning(self):
  935. taskMgr.doMethodLater(self._pruneTaskPeriod, self._pruneObjectRefs,
  936. self._getPruneTaskName())
  937. def _pruneObjectRefs(self, task=None):
  938. self._pruneContainersJob = PruneObjectRefs(
  939. '%s-pruneObjectRefs' % self.getJobName(), self)
  940. self.acceptOnce(self._pruneContainersJob.getFinishedEvent(),
  941. self._scheduleNextPruning)
  942. jobMgr.add(self._pruneContainersJob)
  943. return task.done