# ClientRepositoryBase.py (24 KB)
from panda3d.core import *
from panda3d.direct import *
from .MsgTypes import *
from direct.task import Task
from direct.directnotify import DirectNotifyGlobal
from . import CRCache
from direct.distributed.CRDataCache import CRDataCache
from direct.distributed.ConnectionRepository import ConnectionRepository
from direct.showbase import PythonUtil
from . import ParentMgr
from . import RelatedObjectMgr
import time
from .ClockDelta import *
class ClientRepositoryBase(ConnectionRepository):
    """
    This maintains a client-side connection with a Panda server.

    This base class exists to collect the common code between
    ClientRepository, which is the CMU-provided, open-source version
    of the client repository code, and OTPClientRepository, which is
    the VR Studio's implementation of the same.
    """
    # Notify category shared by every instance for log output.
    notify = DirectNotifyGlobal.directNotify.newCategory("ClientRepositoryBase")
  23. def __init__(self, dcFileNames = None, dcSuffix = '',
  24. connectMethod = None, threadedNet = None):
  25. if connectMethod is None:
  26. connectMethod = self.CM_HTTP
  27. ConnectionRepository.__init__(self, connectMethod, base.config, hasOwnerView = True, threadedNet = threadedNet)
  28. self.dcSuffix = dcSuffix
  29. if hasattr(self, 'setVerbose'):
  30. if self.config.GetBool('verbose-clientrepository'):
  31. self.setVerbose(1)
  32. self.context=100000
  33. self.setClientDatagram(1)
  34. self.deferredGenerates = []
  35. self.deferredDoIds = {}
  36. self.lastGenerate = 0
  37. self.setDeferInterval(base.config.GetDouble('deferred-generate-interval', 0.2))
  38. self.noDefer = False # Set this True to temporarily disable deferring.
  39. self.recorder = base.recorder
  40. self.readDCFile(dcFileNames)
  41. self.cache=CRCache.CRCache()
  42. self.doDataCache = CRDataCache()
  43. self.cacheOwner=CRCache.CRCache()
  44. self.serverDelta = 0
  45. self.bootedIndex = None
  46. self.bootedText = None
  47. # create a parentMgr to handle distributed reparents
  48. # this used to be 'token2nodePath'
  49. self.parentMgr = ParentMgr.ParentMgr()
  50. # The RelatedObjectMgr helps distributed objects find each
  51. # other.
  52. self.relatedObjectMgr = RelatedObjectMgr.RelatedObjectMgr(self)
  53. # This will be filled in when a TimeManager is created.
  54. self.timeManager = None
  55. # Keep track of how recently we last sent a heartbeat message.
  56. # We want to keep these coming at heartbeatInterval seconds.
  57. self.heartbeatInterval = base.config.GetDouble('heartbeat-interval', 10)
  58. self.heartbeatStarted = 0
  59. self.lastHeartbeat = 0
  60. self._delayDeletedDOs = {}
  61. self.specialNameNumber = 0
  62. def setDeferInterval(self, deferInterval):
  63. """Specifies the minimum amount of time, in seconds, that must
  64. elapse before generating any two DistributedObjects whose
  65. class type is marked "deferrable". Set this to 0 to indicate
  66. no deferring will occur."""
  67. self.deferInterval = deferInterval
  68. self.setHandleCUpdates(self.deferInterval == 0)
  69. if self.deferredGenerates:
  70. taskMgr.remove('deferredGenerate')
  71. taskMgr.doMethodLater(self.deferInterval, self.doDeferredGenerate, 'deferredGenerate')
    ## def queryObjectAll(self, doID, context=0):
    ##     """
    ##     Get a one-time snapshot look at the object.
    ##     """
    ##     assert self.notify.debugStateCall(self)
    ##     # Create a message
    ##     datagram = PyDatagram()
    ##     datagram.addServerHeader(
    ##         doID, localAvatar.getDoId(), 2020)
    ##     # A context that can be used to index the response if needed
    ##     datagram.addUint32(context)
    ##     self.send(datagram)
    ##     # Make sure the message gets there.
    ##     self.flush()
  86. def specialName(self, label):
  87. name = ("SpecialName %s %s" % (self.specialNameNumber, label))
  88. self.specialNameNumber += 1
  89. return name
  90. def getTables(self, ownerView):
  91. if ownerView:
  92. return self.doId2ownerView, self.cacheOwner
  93. else:
  94. return self.doId2do, self.cache
  95. def _getMsgName(self, msgId):
  96. # we might get a list of message names, use the first one
  97. return makeList(MsgId2Names.get(msgId, 'UNKNOWN MESSAGE: %s' % msgId))[0]
  98. def allocateContext(self):
  99. self.context+=1
  100. return self.context
  101. def setServerDelta(self, delta):
  102. """
  103. Indicates the approximate difference in seconds between the
  104. client's clock and the server's clock, in universal time (not
  105. including timezone shifts). This is mainly useful for
  106. reporting synchronization information to the logs; don't
  107. depend on it for any precise timing requirements.
  108. Also see Notify.setServerDelta(), which also accounts for a
  109. timezone shift.
  110. """
  111. self.serverDelta = delta
  112. def getServerDelta(self):
  113. return self.serverDelta
  114. def getServerTimeOfDay(self):
  115. """
  116. Returns the current time of day (seconds elapsed since the
  117. 1972 epoch) according to the server's clock. This is in GMT,
  118. and hence is irrespective of timezones.
  119. The value is computed based on the client's clock and the
  120. known delta from the server's clock, which is not terribly
  121. precisely measured and may drift slightly after startup, but
  122. it should be accurate plus or minus a couple of seconds.
  123. """
  124. return time.time() + self.serverDelta
  125. def doGenerate(self, parentId, zoneId, classId, doId, di):
  126. # Look up the dclass
  127. assert parentId == self.GameGlobalsId or parentId in self.doId2do
  128. dclass = self.dclassesByNumber[classId]
  129. assert(self.notify.debug("performing generate for %s %s" % (dclass.getName(), doId)))
  130. dclass.startGenerate()
  131. # Create a new distributed object, and put it in the dictionary
  132. distObj = self.generateWithRequiredOtherFields(dclass, doId, di, parentId, zoneId)
  133. dclass.stopGenerate()
  134. def flushGenerates(self):
  135. """ Forces all pending generates to be performed immediately. """
  136. while self.deferredGenerates:
  137. msgType, extra = self.deferredGenerates[0]
  138. del self.deferredGenerates[0]
  139. self.replayDeferredGenerate(msgType, extra)
  140. taskMgr.remove('deferredGenerate')
  141. def replayDeferredGenerate(self, msgType, extra):
  142. """ Override this to do something appropriate with deferred
  143. "generate" messages when they are replayed().
  144. """
  145. if msgType == CLIENT_ENTER_OBJECT_REQUIRED_OTHER:
  146. # It's a generate message.
  147. doId = extra
  148. if doId in self.deferredDoIds:
  149. args, deferrable, dg, updates = self.deferredDoIds[doId]
  150. del self.deferredDoIds[doId]
  151. self.doGenerate(*args)
  152. if deferrable:
  153. self.lastGenerate = globalClock.getFrameTime()
  154. for dg, di in updates:
  155. # non-DC updates that need to be played back in-order are
  156. # stored as (msgType, (dg, di))
  157. if type(di) is tuple:
  158. msgType = dg
  159. dg, di = di
  160. self.replayDeferredGenerate(msgType, (dg, di))
  161. else:
  162. # ovUpdated is set to True since its OV
  163. # is assumbed to have occured when the
  164. # deferred update was originally received
  165. self.__doUpdate(doId, di, True)
  166. else:
  167. self.notify.warning("Ignoring deferred message %s" % (msgType))
  168. def doDeferredGenerate(self, task):
  169. """ This is the task that generates an object on the deferred
  170. queue. """
  171. now = globalClock.getFrameTime()
  172. while self.deferredGenerates:
  173. if now - self.lastGenerate < self.deferInterval:
  174. # Come back later.
  175. return Task.again
  176. # Generate the next deferred object.
  177. msgType, extra = self.deferredGenerates[0]
  178. del self.deferredGenerates[0]
  179. self.replayDeferredGenerate(msgType, extra)
  180. # All objects are generaetd.
  181. return Task.done
  182. def generateWithRequiredFields(self, dclass, doId, di, parentId, zoneId):
  183. if doId in self.doId2do:
  184. # ...it is in our dictionary.
  185. # Just update it.
  186. distObj = self.doId2do[doId]
  187. assert distObj.dclass == dclass
  188. distObj.generate()
  189. distObj.setLocation(parentId, zoneId)
  190. distObj.updateRequiredFields(dclass, di)
  191. # updateRequiredFields calls announceGenerate
  192. elif self.cache.contains(doId):
  193. # ...it is in the cache.
  194. # Pull it out of the cache:
  195. distObj = self.cache.retrieve(doId)
  196. assert distObj.dclass == dclass
  197. # put it in the dictionary:
  198. self.doId2do[doId] = distObj
  199. # and update it.
  200. distObj.generate()
  201. # make sure we don't have a stale location
  202. distObj.parentId = None
  203. distObj.zoneId = None
  204. distObj.setLocation(parentId, zoneId)
  205. distObj.updateRequiredFields(dclass, di)
  206. # updateRequiredFields calls announceGenerate
  207. else:
  208. # ...it is not in the dictionary or the cache.
  209. # Construct a new one
  210. classDef = dclass.getClassDef()
  211. if classDef == None:
  212. self.notify.error("Could not create an undefined %s object." % (dclass.getName()))
  213. distObj = classDef(self)
  214. distObj.dclass = dclass
  215. # Assign it an Id
  216. distObj.doId = doId
  217. # Put the new do in the dictionary
  218. self.doId2do[doId] = distObj
  219. # Update the required fields
  220. distObj.generateInit() # Only called when constructed
  221. distObj._retrieveCachedData()
  222. distObj.generate()
  223. distObj.setLocation(parentId, zoneId)
  224. distObj.updateRequiredFields(dclass, di)
  225. # updateRequiredFields calls announceGenerate
  226. self.notify.debug("New DO:%s, dclass:%s" % (doId, dclass.getName()))
  227. return distObj
  228. def generateWithRequiredOtherFields(self, dclass, doId, di,
  229. parentId = None, zoneId = None):
  230. if doId in self.doId2do:
  231. # ...it is in our dictionary.
  232. # Just update it.
  233. distObj = self.doId2do[doId]
  234. assert distObj.dclass == dclass
  235. distObj.generate()
  236. distObj.setLocation(parentId, zoneId)
  237. distObj.updateRequiredOtherFields(dclass, di)
  238. # updateRequiredOtherFields calls announceGenerate
  239. elif self.cache.contains(doId):
  240. # ...it is in the cache.
  241. # Pull it out of the cache:
  242. distObj = self.cache.retrieve(doId)
  243. assert distObj.dclass == dclass
  244. # put it in the dictionary:
  245. self.doId2do[doId] = distObj
  246. # and update it.
  247. distObj.generate()
  248. # make sure we don't have a stale location
  249. distObj.parentId = None
  250. distObj.zoneId = None
  251. distObj.setLocation(parentId, zoneId)
  252. distObj.updateRequiredOtherFields(dclass, di)
  253. # updateRequiredOtherFields calls announceGenerate
  254. else:
  255. # ...it is not in the dictionary or the cache.
  256. # Construct a new one
  257. classDef = dclass.getClassDef()
  258. if classDef == None:
  259. self.notify.error("Could not create an undefined %s object." % (dclass.getName()))
  260. distObj = classDef(self)
  261. distObj.dclass = dclass
  262. # Assign it an Id
  263. distObj.doId = doId
  264. # Put the new do in the dictionary
  265. self.doId2do[doId] = distObj
  266. # Update the required fields
  267. distObj.generateInit() # Only called when constructed
  268. distObj._retrieveCachedData()
  269. distObj.generate()
  270. distObj.setLocation(parentId, zoneId)
  271. distObj.updateRequiredOtherFields(dclass, di)
  272. # updateRequiredOtherFields calls announceGenerate
  273. return distObj
  274. def generateWithRequiredOtherFieldsOwner(self, dclass, doId, di):
  275. if doId in self.doId2ownerView:
  276. # ...it is in our dictionary.
  277. # Just update it.
  278. self.notify.error('duplicate owner generate for %s (%s)' % (
  279. doId, dclass.getName()))
  280. distObj = self.doId2ownerView[doId]
  281. assert distObj.dclass == dclass
  282. distObj.generate()
  283. distObj.updateRequiredOtherFields(dclass, di)
  284. # updateRequiredOtherFields calls announceGenerate
  285. elif self.cacheOwner.contains(doId):
  286. # ...it is in the cache.
  287. # Pull it out of the cache:
  288. distObj = self.cacheOwner.retrieve(doId)
  289. assert distObj.dclass == dclass
  290. # put it in the dictionary:
  291. self.doId2ownerView[doId] = distObj
  292. # and update it.
  293. distObj.generate()
  294. distObj.updateRequiredOtherFields(dclass, di)
  295. # updateRequiredOtherFields calls announceGenerate
  296. else:
  297. # ...it is not in the dictionary or the cache.
  298. # Construct a new one
  299. classDef = dclass.getOwnerClassDef()
  300. if classDef == None:
  301. self.notify.error("Could not create an undefined %s object. Have you created an owner view?" % (dclass.getName()))
  302. distObj = classDef(self)
  303. distObj.dclass = dclass
  304. # Assign it an Id
  305. distObj.doId = doId
  306. # Put the new do in the dictionary
  307. self.doId2ownerView[doId] = distObj
  308. # Update the required fields
  309. distObj.generateInit() # Only called when constructed
  310. distObj.generate()
  311. distObj.updateRequiredOtherFields(dclass, di)
  312. # updateRequiredOtherFields calls announceGenerate
  313. return distObj
  314. def disableDoId(self, doId, ownerView=False):
  315. table, cache = self.getTables(ownerView)
  316. # Make sure the object exists
  317. if doId in table:
  318. # Look up the object
  319. distObj = table[doId]
  320. # remove the object from the dictionary
  321. del table[doId]
  322. # Only cache the object if it is a "cacheable" type
  323. # object; this way we don't clutter up the caches with
  324. # trivial objects that don't benefit from caching.
  325. # also don't try to cache an object that is delayDeleted
  326. cached = False
  327. if distObj.getCacheable() and distObj.getDelayDeleteCount() <= 0:
  328. cached = cache.cache(distObj)
  329. if not cached:
  330. distObj.deleteOrDelay()
  331. if distObj.getDelayDeleteCount() <= 0:
  332. # make sure we're not leaking
  333. distObj.detectLeaks()
  334. elif doId in self.deferredDoIds:
  335. # The object had been deferred. Great; we don't even have
  336. # to generate it now.
  337. del self.deferredDoIds[doId]
  338. i = self.deferredGenerates.index((CLIENT_ENTER_OBJECT_REQUIRED_OTHER, doId))
  339. del self.deferredGenerates[i]
  340. if len(self.deferredGenerates) == 0:
  341. taskMgr.remove('deferredGenerate')
  342. else:
  343. self._logFailedDisable(doId, ownerView)
  344. def _logFailedDisable(self, doId, ownerView):
  345. self.notify.warning(
  346. "Disable failed. DistObj "
  347. + str(doId) +
  348. " is not in dictionary, ownerView=%s" % ownerView)
  349. def handleDelete(self, di):
  350. # overridden by ClientRepository
  351. assert 0
  352. def handleUpdateField(self, di):
  353. """
  354. This method is called when a CLIENT_OBJECT_UPDATE_FIELD
  355. message is received; it decodes the update, unpacks the
  356. arguments, and calls the corresponding method on the indicated
  357. DistributedObject.
  358. In fact, this method is exactly duplicated by the C++ method
  359. cConnectionRepository::handle_update_field(), which was
  360. written to optimize the message loop by handling all of the
  361. CLIENT_OBJECT_UPDATE_FIELD messages in C++. That means that
  362. nowadays, this Python method will probably never be called,
  363. since UPDATE_FIELD messages will not even be passed to the
  364. Python message handlers. But this method remains for
  365. documentation purposes, and also as a "just in case" handler
  366. in case we ever do come across a situation in the future in
  367. which python might handle the UPDATE_FIELD message.
  368. """
  369. # Get the DO Id
  370. doId = di.getUint32()
  371. ovUpdated = self.__doUpdateOwner(doId, di)
  372. if doId in self.deferredDoIds:
  373. # This object hasn't really been generated yet. Sit on
  374. # the update.
  375. args, deferrable, dg0, updates = self.deferredDoIds[doId]
  376. # Keep a copy of the datagram, and move the di to the copy
  377. dg = Datagram(di.getDatagram())
  378. di = DatagramIterator(dg, di.getCurrentIndex())
  379. updates.append((dg, di))
  380. else:
  381. # This object has been fully generated. It's OK to update.
  382. self.__doUpdate(doId, di, ovUpdated)
  383. def __doUpdate(self, doId, di, ovUpdated):
  384. # Find the DO
  385. do = self.doId2do.get(doId)
  386. if do is not None:
  387. # Let the dclass finish the job
  388. do.dclass.receiveUpdate(do, di)
  389. elif not ovUpdated:
  390. # this next bit is looking for avatar handles so that if you get an update
  391. # for an avatar that isn't in your doId2do table but there is a
  392. # avatar handle for that object then it's messages will be forwarded to that
  393. # object. We are currently using that for whisper echoing
  394. # if you need a more general perpose system consider registering proxy objects on
  395. # a dict and adding the avatar handles to that dict when they are created
  396. # then change/remove the old method. I didn't do that because I couldn't think
  397. # of a use for it. -JML
  398. try :
  399. handle = self.identifyAvatar(doId)
  400. if handle:
  401. dclass = self.dclassesByName[handle.dclassName]
  402. dclass.receiveUpdate(handle, di)
  403. else:
  404. self.notify.warning(
  405. "Asked to update non-existent DistObj " + str(doId))
  406. except:
  407. self.notify.warning(
  408. "Asked to update non-existent DistObj " + str(doId) + "and failed to find it")
  409. def __doUpdateOwner(self, doId, di):
  410. ovObj = self.doId2ownerView.get(doId)
  411. if ovObj:
  412. odg = Datagram(di.getDatagram())
  413. odi = DatagramIterator(odg, di.getCurrentIndex())
  414. ovObj.dclass.receiveUpdate(ovObj, odi)
  415. return True
  416. return False
  417. def handleGoGetLost(self, di):
  418. # The server told us it's about to drop the connection on us.
  419. # Get ready!
  420. if (di.getRemainingSize() > 0):
  421. self.bootedIndex = di.getUint16()
  422. self.bootedText = di.getString()
  423. self.notify.warning(
  424. "Server is booting us out (%d): %s" % (self.bootedIndex, self.bootedText))
  425. else:
  426. self.bootedIndex = None
  427. self.bootedText = None
  428. self.notify.warning(
  429. "Server is booting us out with no explanation.")
  430. # disconnect now, don't wait for send/recv to fail
  431. self.stopReaderPollTask()
  432. self.lostConnection()
  433. def handleServerHeartbeat(self, di):
  434. # Got a heartbeat message from the server.
  435. if base.config.GetBool('server-heartbeat-info', 1):
  436. self.notify.info("Server heartbeat.")
  437. def handleSystemMessage(self, di):
  438. # Got a system message from the server.
  439. message = di.getString()
  440. self.notify.info('Message from server: %s' % (message))
  441. return message
  442. def handleSystemMessageAknowledge(self, di):
  443. # Got a system message from the server.
  444. message = di.getString()
  445. self.notify.info('Message with aknowledge from server: %s' % (message))
  446. messenger.send("system message aknowledge", [message])
  447. return message
  448. def getObjectsOfClass(self, objClass):
  449. """ returns dict of doId:object, containing all objects
  450. that inherit from 'class'. returned dict is safely mutable. """
  451. doDict = {}
  452. for doId, do in self.doId2do.items():
  453. if isinstance(do, objClass):
  454. doDict[doId] = do
  455. return doDict
  456. def getObjectsOfExactClass(self, objClass):
  457. """ returns dict of doId:object, containing all objects that
  458. are exactly of type 'class' (neglecting inheritance). returned
  459. dict is safely mutable. """
  460. doDict = {}
  461. for doId, do in self.doId2do.items():
  462. if do.__class__ == objClass:
  463. doDict[doId] = do
  464. return doDict
  465. def considerHeartbeat(self):
  466. """Send a heartbeat message if we haven't sent one recently."""
  467. if not self.heartbeatStarted:
  468. self.notify.debug("Heartbeats not started; not sending.")
  469. return
  470. elapsed = globalClock.getRealTime() - self.lastHeartbeat
  471. if elapsed < 0 or elapsed > self.heartbeatInterval:
  472. # It's time to send the heartbeat again (or maybe someone
  473. # reset the clock back).
  474. self.notify.info("Sending heartbeat mid-frame.")
  475. self.startHeartbeat()
  476. def stopHeartbeat(self):
  477. taskMgr.remove("heartBeat")
  478. self.heartbeatStarted = 0
  479. def startHeartbeat(self):
  480. self.stopHeartbeat()
  481. self.heartbeatStarted = 1
  482. self.sendHeartbeat()
  483. self.waitForNextHeartBeat()
  484. def sendHeartbeatTask(self, task):
  485. self.sendHeartbeat()
  486. return Task.again
  487. def waitForNextHeartBeat(self):
  488. taskMgr.doMethodLater(self.heartbeatInterval, self.sendHeartbeatTask,
  489. "heartBeat", taskChain = 'net')
  490. def replaceMethod(self, oldMethod, newFunction):
  491. return 0
  492. def getWorld(self, doId):
  493. # Get the world node for this object
  494. obj = self.doId2do[doId]
  495. worldNP = obj.getParent()
  496. while 1:
  497. nextNP = worldNP.getParent()
  498. if nextNP == render:
  499. break
  500. elif worldNP.isEmpty():
  501. return None
  502. return worldNP
  503. def isLive(self):
  504. if base.config.GetBool('force-live', 0):
  505. return True
  506. return not (__dev__ or launcher.isTestServer())
  507. def isLocalId(self, id):
  508. # By default, no ID's are local. See also
  509. # ClientRepository.isLocalId().
  510. return 0
    # Methods for tracking delayDeletes.
  512. def _addDelayDeletedDO(self, do):
  513. # use the id of the object, it's possible to have multiple DelayDeleted instances
  514. # with identical doIds if an object gets deleted then re-generated
  515. key = id(do)
  516. assert key not in self._delayDeletedDOs
  517. self._delayDeletedDOs[key] = do
  518. def _removeDelayDeletedDO(self, do):
  519. key = id(do)
  520. del self._delayDeletedDOs[key]
  521. def printDelayDeletes(self):
  522. print('DelayDeletes:')
  523. print('=============')
  524. for obj in self._delayDeletedDOs.values():
  525. print('%s\t%s (%s)\tdelayDeletes=%s' % (
  526. obj.doId, safeRepr(obj), itype(obj), obj.getDelayDeleteNames()))