ContainerLeakDetector.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. from direct.directnotify.DirectNotifyGlobal import directNotify
  2. from direct.showbase.PythonUtil import Queue, invertDictLossless
  3. from direct.showbase.PythonUtil import itype, serialNum, safeRepr
  4. from direct.showbase.Job import Job
  5. import types, weakref, random, __builtin__
  6. class CheckContainers(Job):
  7. """
  8. Job to check container sizes and find potential leaks; sub-job of ContainerLeakDetector
  9. """
  10. def __init__(self, name, leakDetector, index):
  11. Job.__init__(self, name)
  12. self._leakDetector = leakDetector
  13. self.notify = self._leakDetector.notify
  14. self._index = index
  15. def getPriority(self):
  16. return Job.Priorities.Normal
  17. def run(self):
  18. self._leakDetector._index2containerName2len[self._index] = {}
  19. self._leakDetector.notify.debug(repr(self._leakDetector._id2ref))
  20. ids = self._leakDetector.getContainerIds()
  21. # record the current len of each container
  22. for id in ids:
  23. yield None
  24. try:
  25. container = self._leakDetector.getContainerById(id)
  26. except Exception, e:
  27. # this container no longer exists
  28. self.notify.debug('container %s no longer exists; caught exception in getContainerById (%s)' % (name, e))
  29. self._leakDetector.removeContainerById(id)
  30. continue
  31. if container is None:
  32. # this container no longer exists
  33. self.notify.debug('container %s no longer exists; getContainerById returned None (%s)' % (name, e))
  34. self._leakDetector.removeContainerById(id)
  35. continue
  36. cLen = len(container)
  37. name = self._leakDetector.getContainerNameById(id)
  38. self._leakDetector._index2containerName2len[self._index][name] = cLen
  39. # compare the current len of each container to past lens
  40. if self._index > 0:
  41. idx2name2len = self._leakDetector._index2containerName2len
  42. for name in idx2name2len[self._index]:
  43. yield None
  44. if name in idx2name2len[self._index-1]:
  45. diff = idx2name2len[self._index][name] - idx2name2len[self._index-1][name]
  46. if diff > 0:
  47. if diff > idx2name2len[self._index-1][name]:
  48. minutes = (self._leakDetector._index2delay[self._index] -
  49. self._leakDetector._index2delay[self._index-1]) / 60.
  50. self.notify.warning('container %s grew > 200%% in %s minutes' % (name, minutes))
  51. if self._index > 3:
  52. diff2 = idx2name2len[self._index-1][name] - idx2name2len[self._index-2][name]
  53. diff3 = idx2name2len[self._index-2][name] - idx2name2len[self._index-3][name]
  54. if self._index <= 5:
  55. msg = ('%s consistently increased in length over the last 3 periods (currently %s items)' %
  56. (name, idx2name2len[self._index][name]))
  57. self.notify.warning(msg)
  58. else:
  59. # if size has consistently increased over the last 5 checks, send out a warning
  60. diff4 = idx2name2len[self._index-3][name] - idx2name2len[self._index-4][name]
  61. diff5 = idx2name2len[self._index-4][name] - idx2name2len[self._index-5][name]
  62. if diff > 0 and diff2 > 0 and diff3 > 0 and diff4 > 0 and diff5 > 0:
  63. msg = ('%s consistently increased in length over the last 5 periods (currently %s items), notifying system' %
  64. (name, idx2name2len[self._index][name]))
  65. self.notify.warning(msg)
  66. messenger.send(self._leakDetector.getLeakEvent(), [msg])
  67. yield Job.Done
  68. class NoDictKey:
  69. pass
  70. class Indirection:
  71. # represents the indirection that brings you from a container to an element of the container
  72. # stored as a string to be used as part of an eval
  73. # each dictionary dereference is individually eval'd since the dict key might have been garbage-collected
  74. class GarbageCollectedDictKey(Exception):
  75. pass
  76. def __init__(self, evalStr=None, dictKey=NoDictKey):
  77. # if this is a dictionary lookup, pass dictKey instead of evalStr
  78. self.evalStr = evalStr
  79. self.dictKey = NoDictKey
  80. if dictKey is not NoDictKey:
  81. # if we can repr/eval the key, store it as an evalStr
  82. keyRepr = repr(dictKey)
  83. useEval = False
  84. try:
  85. keyEval = eval(keyRepr)
  86. useEval = True
  87. except:
  88. pass
  89. if useEval:
  90. # check to make sure the eval succeeded
  91. if hash(keyEval) != hash(dictKey):
  92. useEval = False
  93. if useEval:
  94. # eval/repr succeeded, store as an evalStr
  95. self.evalStr = '[%s]' % keyRepr
  96. else:
  97. # store a weakref to the key
  98. self.dictKey = weakref.ref(dictKey)
  99. def isDictKey(self):
  100. return self.dictKey is not NoDictKey
  101. def dereferenceDictKey(self, parentDict):
  102. key = self.dictKey()
  103. if key is None:
  104. raise Indirection.GarbageCollectedDictKey()
  105. return parentDict[key]
  106. def getString(self, nextIndirection=None):
  107. # return our contribution to the name of the object
  108. if self.evalStr is not None:
  109. # if we're an instance dict and the next indirection is not a dict key,
  110. # skip over this one (obj.__dict__[keyName] == obj.keyName)
  111. if nextIndirection is not None and self.evalStr == '.__dict__':
  112. return ''
  113. return self.evalStr
  114. # we're stored as a dict key
  115. # this might not eval, but that's OK, we're not using this string to find
  116. # the object, we dereference the parent dict
  117. key = self.dictKey()
  118. if key is None:
  119. return '<garbage-collected dict key>'
  120. return safeRepr(key)
  121. def __repr__(self):
  122. return self.getString()
  123. class ContainerRef:
  124. """
  125. stores a reference to a container in a way that does not prevent garbage
  126. collection of the container
  127. stored as a series of 'indirections' (obj.foo -> '.foo', dict[key] -> '[key]', etc.)
  128. """
  129. class FailedEval(Exception):
  130. pass
  131. # whatever this is set to will be the default ContainerRef
  132. BaseRef = None
  133. def __init__(self, other=None, indirection=None):
  134. self._indirections = []
  135. # if no other passed in, try ContainerRef.BaseRef
  136. if other is None:
  137. other = ContainerRef.BaseRef
  138. if other is not None:
  139. for ind in other._indirections:
  140. self.addIndirection(ind)
  141. if indirection:
  142. self.addIndirection(indirection)
  143. def addIndirection(self, indirection):
  144. self._indirections.append(indirection)
  145. def _getContainerByEval(self, evalStr):
  146. try:
  147. container = eval(evalStr)
  148. except NameError, ne:
  149. return None
  150. return container
  151. def _evalWithObj(self, evalStr, curObj=None):
  152. # eval an evalStr, optionally based off of an existing object
  153. if curObj is not None:
  154. # eval('curObj.foo.bar.someDict')
  155. evalStr = 'curObj%s' % evalStr
  156. return self._getContainerByEval(evalStr)
  157. def getContainer(self):
  158. # try to get a handle on the container by eval'ing and looking things
  159. # up in dictionaries, depending on the type of each indirection
  160. #import pdb;pdb.set_trace()
  161. evalStr = ''
  162. curObj = None
  163. curIndirection = None
  164. nextIndirection = None
  165. for indirection in self._indirections:
  166. if not indirection.isDictKey():
  167. # build up a string to be eval'd
  168. evalStr += indirection.getString()
  169. else:
  170. curObj = self._evalWithObj(evalStr, curObj)
  171. if curObj is None:
  172. raise FailedEval(evalStr)
  173. # try to look up this key in the curObj dictionary
  174. curObj = indirection.dereferenceDictKey(curObj)
  175. evalStr = ''
  176. return self._evalWithObj(evalStr, curObj)
  177. def __repr__(self):
  178. str = ''
  179. curIndirection = None
  180. nextIndirection = None
  181. for i in xrange(len(self._indirections)):
  182. curIndirection = self._indirections[i]
  183. if i < len(self._indirections)-1:
  184. nextIndirection = self._indirections[i+1]
  185. else:
  186. nextIndirection = None
  187. str += curIndirection.getString(nextIndirection=nextIndirection)
  188. return str
  189. class ContainerLeakDetector(Job):
  190. """
  191. Low-priority Python object-graph walker that looks for leaking containers.
  192. To reduce memory usage, this does a random walk of the Python objects to
  193. discover containers rather than keep a set of all visited objects.
  194. Checks container sizes at ever-increasing intervals.
  195. """
  196. notify = directNotify.newCategory("ContainerLeakDetector")
  197. # set of containers that should not be examined
  198. PrivateIds = set()
  199. def __init__(self, name, firstCheckDelay = None):
  200. Job.__init__(self, name)
  201. self._serialNum = serialNum()
  202. self._priority = (Job.Priorities.Low + Job.Priorities.Normal) / 2
  203. self._checkContainersJob = None
  204. # run first check after one hour
  205. if firstCheckDelay is None:
  206. firstCheckDelay = 60. * 15.
  207. self._nextCheckDelay = firstCheckDelay
  208. self._index2containerName2len = {}
  209. self._index2delay = {}
  210. # set up our data structures
  211. self._id2ref = {}
  212. # set up the base/starting object
  213. self._nameContainer(__builtin__.__dict__, ContainerRef(indirection=Indirection(evalStr='__builtin__.__dict__')))
  214. try:
  215. base
  216. except:
  217. pass
  218. else:
  219. ContainerRef.BaseRef = ContainerRef(indirection=Indirection(evalStr='base.__dict__'))
  220. self._nameContainer(base.__dict__, ContainerRef.BaseRef)
  221. try:
  222. simbase
  223. except:
  224. pass
  225. else:
  226. ContainerRef.BaseRef = ContainerRef(indirection=Indirection(evalStr='simbase.__dict__'))
  227. self._nameContainer(simbase.__dict__, ContainerRef.BaseRef)
  228. self._curObjRef = ContainerRef()
  229. jobMgr.add(self)
  230. ContainerLeakDetector.PrivateIds.update(set([
  231. id(ContainerLeakDetector.PrivateIds),
  232. id(self._id2ref),
  233. ]))
  234. def destroy(self):
  235. if self._checkContainersJob is not None:
  236. jobMgr.remove(self._checkContainersJob)
  237. self._checkContainersJob = None
  238. del self._id2ref
  239. del self._index2containerName2len
  240. del self._index2delay
  241. def getPriority(self):
  242. return self._priority
  243. def getCheckTaskName(self):
  244. return 'checkForLeakingContainers-%s' % self._serialNum
  245. def getLeakEvent(self):
  246. # passes description string as argument
  247. return 'containerLeakDetected-%s' % self._serialNum
  248. def getContainerIds(self):
  249. return self._id2ref.keys()
  250. def getContainerById(self, id):
  251. return self._id2ref[id].getContainer()
  252. def getContainerNameById(self, id):
  253. return repr(self._id2ref[id])
  254. def removeContainerById(self, id):
  255. del self._id2ref[id]
  256. def run(self):
  257. taskMgr.doMethodLater(self._nextCheckDelay, self._checkForLeaks,
  258. self.getCheckTaskName())
  259. while True:
  260. # yield up here instead of at the end, since we skip back to the
  261. # top of the while loop from various points
  262. yield None
  263. #import pdb;pdb.set_trace()
  264. curObj = None
  265. if self._curObjRef is None:
  266. self._curObjRef = random.choice(self._id2ref.values())
  267. try:
  268. curObj = self._curObjRef.getContainer()
  269. except:
  270. self.notify.debug('lost current container: %s' % self._curObjRef)
  271. while len(self._id2ref):
  272. _id = random.choice(self._id2ref.keys())
  273. curObj = self.getContainerById(_id)
  274. if curObj is not None:
  275. break
  276. # container is no longer valid
  277. del self._id2ref[_id]
  278. self._curObjRef = self._id2ref[_id]
  279. #print '%s: %s, %s' % (id(curObj), type(curObj), self._id2ref[id(curObj)])
  280. self.notify.debug('--> %s' % self._curObjRef)
  281. # keep a copy of this obj's eval str, it might not be in _id2ref
  282. curObjRef = self._curObjRef
  283. # if we hit a dead end, start over at a container we know about
  284. self._curObjRef = None
  285. try:
  286. if curObj.__class__.__name__ == 'method-wrapper':
  287. continue
  288. except:
  289. pass
  290. if type(curObj) in (types.StringType, types.UnicodeType):
  291. continue
  292. if type(curObj) in (types.ModuleType, types.InstanceType):
  293. child = curObj.__dict__
  294. if not self._isDeadEnd(child):
  295. self._curObjRef = ContainerRef(curObjRef, indirection=Indirection(evalStr='.__dict__'))
  296. if self._isContainer(child):
  297. self._nameContainer(child, self._curObjRef)
  298. continue
  299. if type(curObj) is types.DictType:
  300. key = None
  301. attr = None
  302. keys = curObj.keys()
  303. # we will continue traversing the object graph via the last container
  304. # in the list; shuffle the list to randomize the traversal
  305. random.shuffle(keys)
  306. for key in keys:
  307. try:
  308. attr = curObj[key]
  309. except KeyError, e:
  310. self.notify.warning('could not index into %s with key %s' % (curObjRef, key))
  311. continue
  312. if not self._isDeadEnd(attr):
  313. if curObj is __builtin__.__dict__:
  314. indirection=Indirection(evalStr=key)
  315. else:
  316. indirection=Indirection(dictKey=key)
  317. self._curObjRef = ContainerRef(curObjRef, indirection=indirection)
  318. if self._isContainer(attr):
  319. self._nameContainer(attr, self._curObjRef)
  320. del key
  321. del attr
  322. continue
  323. if type(curObj) is not types.FileType:
  324. try:
  325. itr = iter(curObj)
  326. except:
  327. pass
  328. else:
  329. try:
  330. index = 0
  331. attrs = []
  332. while 1:
  333. try:
  334. attr = itr.next()
  335. except:
  336. # some custom classes don't do well when iterated
  337. attr = None
  338. break
  339. attrs.append(attr)
  340. # we will continue traversing the object graph via the last container
  341. # in the list; shuffle the list to randomize the traversal
  342. random.shuffle(attrs)
  343. for attr in attrs:
  344. if not self._isDeadEnd(attr):
  345. self._curObjRef = ContainerRef(curObjRef, indirection=Indirection(evalStr='[%s]' % index))
  346. if self._isContainer(attr):
  347. self._nameContainer(attr, self._curObjRef)
  348. index += 1
  349. del attr
  350. except StopIteration, e:
  351. pass
  352. del itr
  353. continue
  354. try:
  355. childNames = dir(curObj)
  356. except:
  357. pass
  358. else:
  359. childName = None
  360. child = None
  361. # we will continue traversing the object graph via the last container
  362. # in the list; shuffle the list to randomize the traversal
  363. random.shuffle(childNames)
  364. for childName in childNames:
  365. child = getattr(curObj, childName)
  366. if not self._isDeadEnd(child):
  367. self._curObjRef = ContainerRef(curObjRef, indirection=Indirection(evalStr='.%s' % childName))
  368. if self._isContainer(child):
  369. self._nameContainer(child, self._curObjRef)
  370. del childName
  371. del child
  372. continue
  373. yield Job.Done
  374. def _isDeadEnd(self, obj):
  375. if type(obj) in (types.BooleanType, types.BuiltinFunctionType,
  376. types.BuiltinMethodType, types.ComplexType,
  377. types.FloatType, types.IntType, types.LongType,
  378. types.NoneType, types.NotImplementedType,
  379. types.TypeType, types.CodeType, types.FunctionType):
  380. return True
  381. # if it's an internal object, ignore it
  382. if id(obj) in ContainerLeakDetector.PrivateIds:
  383. return True
  384. if id(obj) in self._id2ref:
  385. return True
  386. return False
  387. def _isContainer(self, obj):
  388. try:
  389. len(obj)
  390. except:
  391. return False
  392. return True
  393. def _nameContainer(self, cont, objRef):
  394. if self.notify.getDebug():
  395. self.notify.debug('_nameContainer: %s' % objRef)
  396. #printStack()
  397. contId = id(cont)
  398. # if this container is new, or the objRef repr is shorter than what we already have,
  399. # put it in the table
  400. if contId not in self._id2ref or len(repr(objRef)) < len(repr(self._id2ref[contId])):
  401. self._id2ref[contId] = objRef
  402. def _checkForLeaks(self, task=None):
  403. self._index2delay[len(self._index2containerName2len)] = self._nextCheckDelay
  404. self._checkContainersJob = CheckContainers(
  405. '%s-checkForLeaks' % self.getJobName(), self, len(self._index2containerName2len))
  406. jobMgr.add(self._checkContainersJob)
  407. self._nextCheckDelay *= 2
  408. taskMgr.doMethodLater(self._nextCheckDelay, self._checkForLeaks,
  409. self.getCheckTaskName())