ClientRepository.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. """ClientRepository module: contains the ClientRepository class"""
  2. from PandaModules import *
  3. from TaskManagerGlobal import *
  4. from MsgTypes import *
  5. from ShowBaseGlobal import *
  6. import Task
  7. import DirectNotifyGlobal
  8. import ClientDistClass
  9. import CRCache
  10. # The repository must import all known types of Distributed Objects
  11. #import DistributedObject
  12. #import DistributedToon
  13. import DirectObject
  14. class ClientRepository(DirectObject.DirectObject):
  15. notify = DirectNotifyGlobal.directNotify.newCategory("ClientRepository")
  16. TASK_PRIORITY = -30
  17. def __init__(self, dcFileName):
  18. self.number2cdc={}
  19. self.name2cdc={}
  20. self.doId2do={}
  21. self.doId2cdc={}
  22. self.parseDcFile(dcFileName)
  23. self.cache=CRCache.CRCache()
  24. # Set this true to establish a connection to the server using
  25. # the HTTPClient interface, which ultimately uses the OpenSSL
  26. # socket library (even though SSL is not involved). This is
  27. # not as robust a socket library as NSPR's, but the HTTPClient
  28. # interface does a good job of negotiating the connection over
  29. # an HTTP proxy if one is in use.
  30. # Set it false to use Panda's net interface
  31. # (e.g. QueuedConnectionManager, etc.) to establish the
  32. # connection, which ultimately uses the NSPR socket library.
  33. # This is a much better socket library, but it may be more
  34. # than you need for most applications; and the proxy support
  35. # is weak.
  36. self.connectHttp = base.config.GetBool('connect-http', 1)
  37. self.bootedIndex = None
  38. self.bootedText = None
  39. self.tcpConn = None
  40. return None
  41. def parseDcFile(self, dcFileName):
  42. self.dcFile = DCFile()
  43. readResult = self.dcFile.read(dcFileName)
  44. if not readResult:
  45. self.notify.error("Could not read dcfile: " + dcFileName)
  46. self.hashVal = self.dcFile.getHash()
  47. return self.parseDcClasses(self.dcFile)
  48. def parseDcClasses(self, dcFile):
  49. numClasses = dcFile.getNumClasses()
  50. for i in range(0, numClasses):
  51. # Create a clientDistClass from the dcClass
  52. dcClass = dcFile.getClass(i)
  53. clientDistClass = ClientDistClass.ClientDistClass(dcClass)
  54. # List the cdc in the number and name dictionaries
  55. self.number2cdc[dcClass.getNumber()]=clientDistClass
  56. self.name2cdc[dcClass.getName()]=clientDistClass
  57. return None
  58. def connect(self, serverURL,
  59. successCallback = None, successArgs = [],
  60. failureCallback = None, failureArgs = []):
  61. """
  62. Attempts to establish a connection to the server. May return
  63. before the connection is established. The two callbacks
  64. represent the two functions to call (and their arguments) on
  65. success or failure, respectively. The failure callback also
  66. gets one additional parameter, which will be passed in first:
  67. the return status code giving reason for failure, if it is
  68. known.
  69. """
  70. self.bootedIndex = None
  71. self.bootedText = None
  72. if self.connectHttp:
  73. ch = self.http.makeChannel(0)
  74. ch.beginConnectTo(serverURL)
  75. ch.spawnTask(name = 'connect-to-server',
  76. callback = self.httpConnectCallback,
  77. extraArgs = [ch, successCallback, successArgs,
  78. failureCallback, failureArgs])
  79. else:
  80. self.qcm = QueuedConnectionManager()
  81. # A big old 20 second timeout.
  82. gameServerTimeoutMs = base.config.GetInt("game-server-timeout-ms",
  83. 20000)
  84. if self.hasProxy:
  85. url = self.proxy
  86. else:
  87. url = serverURL
  88. self.tcpConn = self.qcm.openTCPClientConnection(
  89. url.getServer(), url.getPort(),
  90. gameServerTimeoutMs)
  91. if self.tcpConn:
  92. self.tcpConn.setNoDelay(1)
  93. self.qcr=QueuedConnectionReader(self.qcm, 0)
  94. self.qcr.addConnection(self.tcpConn)
  95. self.cw=ConnectionWriter(self.qcm, 0)
  96. if self.hasProxy:
  97. # Now we send an http CONNECT message on that
  98. # connection to initiate a connection to the real
  99. # game server
  100. realGameServer = (serverURL.getServer() + ":" + str(serverURL.getPort()))
  101. connectString = "CONNECT " + realGameServer + " HTTP/1.0\012\012"
  102. datagram = Datagram()
  103. # Use appendData and sendRaw so we do not send the length of the string
  104. datagram.appendData(connectString)
  105. self.notify.info("Sending CONNECT string: " + connectString)
  106. self.cw.setRawMode(1)
  107. self.qcr.setRawMode(1)
  108. self.notify.info("done set raw mode")
  109. self.send(datagram)
  110. self.notify.info("done send datagram")
  111. # Find the end of the http response, then call callback
  112. self.findRawString(["\015\012", "\015\015"],
  113. self.proxyConnectCallback, [successCallback, successArgs])
  114. self.notify.info("done find raw string")
  115. # Now start the raw reader poll task and look for
  116. # the HTTP response When this is finished, it will
  117. # call the connect callback just like the non
  118. # proxy case
  119. self.startRawReaderPollTask()
  120. self.notify.info("done start raw reader poll task")
  121. else:
  122. # no proxy. We're done connecting.
  123. self.startReaderPollTask()
  124. if successCallback:
  125. successCallback(*successArgs)
  126. else:
  127. # Failed to connect.
  128. if failureCallback:
  129. failureCallback(0, *failureArgs)
  130. def httpConnectCallback(self, ch, successCallback, successArgs,
  131. failureCallback, failureArgs):
  132. if ch.isConnectionReady():
  133. self.tcpConn = ch.getConnection()
  134. self.tcpConn.userManagesMemory = 1
  135. self.startReaderPollTask()
  136. if successCallback:
  137. successCallback(*successArgs)
  138. else:
  139. # Failed to connect.
  140. if failureCallback:
  141. failureCallback(ch.getStatusCode(), *failureArgs)
  142. def proxyConnectCallback(self, successCallback, successArgs):
  143. # Make sure we are not in raw mode anymore
  144. self.cw.setRawMode(0)
  145. self.qcr.setRawMode(0)
  146. self.stopRawReaderPollTask()
  147. if successCallback:
  148. successCallback(*successArgs)
  149. def startRawReaderPollTask(self):
  150. # Stop any tasks we are running now
  151. self.stopRawReaderPollTask()
  152. self.stopReaderPollTask()
  153. task = Task.Task(self.rawReaderPollUntilEmpty)
  154. # Start with empty string
  155. task.currentRawString = ""
  156. taskMgr.add(task, "rawReaderPollTask", priority=self.TASK_PRIORITY)
  157. return None
  158. def stopRawReaderPollTask(self):
  159. taskMgr.remove("rawReaderPollTask")
  160. return None
  161. def rawReaderPollUntilEmpty(self, task):
  162. while self.rawReaderPollOnce():
  163. pass
  164. return Task.cont
  165. def rawReaderPollOnce(self):
  166. self.notify.debug("rawReaderPollOnce")
  167. self.ensureValidConnection()
  168. availGetVal = self.qcr.dataAvailable()
  169. if availGetVal:
  170. datagram = NetDatagram()
  171. readRetVal = self.qcr.getData(datagram)
  172. if readRetVal:
  173. str = datagram.getMessage()
  174. self.notify.debug("rawReaderPollOnce: found str: " + str)
  175. self.handleRawString(str)
  176. else:
  177. ClientRepository.notify.warning("getData returned false")
  178. return availGetVal
  179. def handleRawString(self, str):
  180. self.notify.info("handleRawString: str = <%s>" % (str))
  181. self.currentRawString += str
  182. self.notify.info("currentRawString = <%s>" % (self.currentRawString))
  183. # Look in all the match strings to see if we got it yet
  184. for matchString in self.rawStringMatchList:
  185. if (self.currentRawString.find(matchString) >= 0):
  186. self.rawStringCallback(*self.rawStringExtraArgs)
  187. return
  188. def findRawString(self, matchList, callback, extraArgs = []):
  189. self.currentRawString = ""
  190. self.rawStringMatchList = matchList
  191. self.rawStringCallback = callback
  192. self.rawStringExtraArgs = extraArgs
  193. def startReaderPollTask(self):
  194. # Stop any tasks we are running now
  195. self.stopReaderPollTask()
  196. taskMgr.add(self.readerPollUntilEmpty, "readerPollTask",
  197. priority=self.TASK_PRIORITY)
  198. return None
  199. def stopReaderPollTask(self):
  200. taskMgr.remove("readerPollTask")
  201. return None
  202. def readerPollUntilEmpty(self, task):
  203. while self.readerPollOnce():
  204. pass
  205. return Task.cont
  206. def readerPollOnce(self):
  207. if self.connectHttp:
  208. datagram = Datagram()
  209. if self.tcpConn.receiveDatagram(datagram):
  210. self.handleDatagram(datagram)
  211. return 1
  212. # Unable to receive a datagram: did we lose the connection?
  213. if self.tcpConn.isClosed():
  214. self.tcpConn = None
  215. self.stopReaderPollTask()
  216. self.loginFSM.request("noConnection")
  217. return 0
  218. else:
  219. self.ensureValidConnection()
  220. if self.qcr.dataAvailable():
  221. datagram = NetDatagram()
  222. if self.qcr.getData(datagram):
  223. self.handleDatagram(datagram)
  224. return 1
  225. return 0
  226. def ensureValidConnection(self):
  227. # Was the connection reset?
  228. if self.connectHttp:
  229. pass
  230. else:
  231. if self.qcm.resetConnectionAvailable():
  232. resetConnectionPointer = PointerToConnection()
  233. if self.qcm.getResetConnection(resetConnectionPointer):
  234. resetConn = resetConnectionPointer.p()
  235. self.qcm.closeConnection(resetConn)
  236. if self.tcpConn.this == resetConn.this:
  237. self.tcpConn = None
  238. self.stopReaderPollTask()
  239. self.loginFSM.request("noConnection")
  240. else:
  241. self.notify.warning("Lost unknown connection.")
  242. return None
  243. def handleDatagram(self, datagram):
  244. # This class is meant to be pure virtual, and any classes that
  245. # inherit from it need to make their own handleDatagram method
  246. pass
  247. def handleGenerateWithRequired(self, di):
  248. # Get the class Id
  249. classId = di.getArg(STUint16);
  250. # Get the DO Id
  251. doId = di.getArg(STUint32)
  252. # Look up the cdc
  253. cdc = self.number2cdc[classId]
  254. # Create a new distributed object, and put it in the dictionary
  255. distObj = self.generateWithRequiredFields(cdc, doId, di)
  256. return None
  257. def handleGenerateWithRequiredOther(self, di):
  258. # Get the class Id
  259. classId = di.getArg(STUint16);
  260. # Get the DO Id
  261. doId = di.getArg(STUint32)
  262. # Look up the cdc
  263. cdc = self.number2cdc[classId]
  264. # Create a new distributed object, and put it in the dictionary
  265. distObj = self.generateWithRequiredOtherFields(cdc, doId, di)
  266. return None
  267. def handleQuietZoneGenerateWithRequired(self, di):
  268. # Special handler for quiet zone generates -- we need to filter
  269. # Get the class Id
  270. classId = di.getArg(STUint16);
  271. # Get the DO Id
  272. doId = di.getArg(STUint32)
  273. # Look up the cdc
  274. cdc = self.number2cdc[classId]
  275. # If the class is a neverDisable class (which implies uberzone) we
  276. # should go ahead and generate it even though we are in the quiet zone
  277. if cdc.constructor.neverDisable:
  278. # Create a new distributed object, and put it in the dictionary
  279. distObj = self.generateWithRequiredFields(cdc, doId, di)
  280. return None
  281. def handleQuietZoneGenerateWithRequiredOther(self, di):
  282. # Special handler for quiet zone generates -- we need to filter
  283. # Get the class Id
  284. classId = di.getArg(STUint16);
  285. # Get the DO Id
  286. doId = di.getArg(STUint32)
  287. # Look up the cdc
  288. cdc = self.number2cdc[classId]
  289. # If the class is a neverDisable class (which implies uberzone) we
  290. # should go ahead and generate it even though we are in the quiet zone
  291. if cdc.constructor.neverDisable:
  292. # Create a new distributed object, and put it in the dictionary
  293. distObj = self.generateWithRequiredOtherFields(cdc, doId, di)
  294. return None
  295. def generateWithRequiredFields(self, cdc, doId, di):
  296. # Is it in our dictionary?
  297. if self.doId2do.has_key(doId):
  298. # If so, just update it.
  299. distObj = self.doId2do[doId]
  300. distObj.generate()
  301. distObj.updateRequiredFields(cdc, di)
  302. distObj.announceGenerate()
  303. # Is it in the cache? If so, pull it out, put it in the dictionaries,
  304. # and update it.
  305. elif self.cache.contains(doId):
  306. # If so, pull it out of the cache...
  307. distObj = self.cache.retrieve(doId)
  308. # put it in both dictionaries...
  309. self.doId2do[doId] = distObj
  310. self.doId2cdc[doId] = cdc
  311. # and update it.
  312. distObj.generate()
  313. distObj.updateRequiredFields(cdc, di)
  314. distObj.announceGenerate()
  315. # If it is not in the dictionary or the cache, then...
  316. else:
  317. # Construct a new one
  318. distObj = cdc.constructor(self)
  319. # Assign it an Id
  320. distObj.doId = doId
  321. # Put the new do in both dictionaries
  322. self.doId2do[doId] = distObj
  323. self.doId2cdc[doId] = cdc
  324. # Update the required fields
  325. distObj.generateInit() # Only called when constructed
  326. distObj.generate()
  327. distObj.updateRequiredFields(cdc, di)
  328. distObj.announceGenerate()
  329. return distObj
  330. def generateWithRequiredOtherFields(self, cdc, doId, di):
  331. # Is it in our dictionary?
  332. if self.doId2do.has_key(doId):
  333. # If so, just update it.
  334. distObj = self.doId2do[doId]
  335. distObj.generate()
  336. distObj.updateRequiredOtherFields(cdc, di)
  337. distObj.announceGenerate()
  338. # Is it in the cache? If so, pull it out, put it in the dictionaries,
  339. # and update it.
  340. elif self.cache.contains(doId):
  341. # If so, pull it out of the cache...
  342. distObj = self.cache.retrieve(doId)
  343. # put it in both dictionaries...
  344. self.doId2do[doId] = distObj
  345. self.doId2cdc[doId] = cdc
  346. # and update it.
  347. distObj.generate()
  348. distObj.updateRequiredOtherFields(cdc, di)
  349. distObj.announceGenerate()
  350. # If it is not in the dictionary or the cache, then...
  351. else:
  352. # Construct a new one
  353. distObj = cdc.constructor(self)
  354. # Assign it an Id
  355. distObj.doId = doId
  356. # Put the new do in both dictionaries
  357. self.doId2do[doId] = distObj
  358. self.doId2cdc[doId] = cdc
  359. # Update the required fields
  360. distObj.generateInit() # Only called when constructed
  361. distObj.generate()
  362. distObj.updateRequiredOtherFields(cdc, di)
  363. distObj.announceGenerate()
  364. return distObj
  365. def handleDisable(self, di):
  366. # Get the DO Id
  367. doId = di.getArg(STUint32)
  368. # disable it.
  369. self.disableDoId(doId)
  370. return None
  371. def disableDoId(self, doId):
  372. # Make sure the object exists
  373. if self.doId2do.has_key(doId):
  374. # Look up the object
  375. distObj = self.doId2do[doId]
  376. # remove the object from both dictionaries
  377. del(self.doId2do[doId])
  378. del(self.doId2cdc[doId])
  379. assert(len(self.doId2do) == len(self.doId2cdc))
  380. # Only cache the object if it is a "cacheable" type
  381. # object; this way we don't clutter up the caches with
  382. # trivial objects that don't benefit from caching.
  383. if distObj.getCacheable():
  384. self.cache.cache(distObj)
  385. else:
  386. distObj.deleteOrDelay()
  387. else:
  388. ClientRepository.notify.warning("Disable failed. DistObj " +
  389. str(doId) +
  390. " is not in dictionary")
  391. return None
  392. def handleDelete(self, di):
  393. # Get the DO Id
  394. doId = di.getArg(STUint32)
  395. self.deleteObject(doId)
  396. def deleteObject(self, doId):
  397. """deleteObject(self, doId)
  398. Removes the object from the client's view of the world. This
  399. should normally not be called except in the case of error
  400. recovery, since the server will normally be responsible for
  401. deleting and disabling objects as they go out of scope.
  402. After this is called, future updates by server on this object
  403. will be ignored (with a warning message). The object will
  404. become valid again the next time the server sends a generate
  405. message for this doId.
  406. This is not a distributed message and does not delete the
  407. object on the server or on any other client.
  408. """
  409. # If it is in the dictionaries, remove it.
  410. if self.doId2do.has_key(doId):
  411. obj = self.doId2do[doId]
  412. # Remove it from the dictionaries
  413. del(self.doId2do[doId])
  414. del(self.doId2cdc[doId])
  415. # Sanity check the dictionaries
  416. assert(len(self.doId2do) == len(self.doId2cdc))
  417. # Disable, announce, and delete the object itself...
  418. # unless delayDelete is on...
  419. obj.deleteOrDelay()
  420. # If it is in the cache, remove it.
  421. elif self.cache.contains(doId):
  422. self.cache.delete(doId)
  423. # Otherwise, ignore it
  424. else:
  425. ClientRepository.notify.warning(
  426. "Asked to delete non-existent DistObj " + str(doId))
  427. return None
  428. def handleUpdateField(self, di):
  429. # Get the DO Id
  430. doId = di.getArg(STUint32)
  431. #print("Updating " + str(doId))
  432. # Find the DO
  433. do = self.doId2do.get(doId)
  434. cdc = self.doId2cdc.get(doId)
  435. if (do != None and cdc != None):
  436. # Let the cdc finish the job
  437. cdc.updateField(do, di)
  438. else:
  439. ClientRepository.notify.warning(
  440. "Asked to update non-existent DistObj " + str(doId))
  441. return None
  442. def handleGoGetLost(self, di):
  443. # The server told us it's about to drop the connection on us.
  444. # Get ready!
  445. if (di.getRemainingSize() > 0):
  446. self.bootedIndex = di.getUint16()
  447. self.bootedText = di.getString()
  448. ClientRepository.notify.warning(
  449. "Server is booting us out (%d): %s" % (self.bootedIndex, self.bootedText))
  450. else:
  451. self.bootedIndex = None
  452. self.bootedText = None
  453. ClientRepository.notify.warning(
  454. "Server is booting us out with no explanation.")
  455. def handleUnexpectedMsgType(self, msgType, di):
  456. if msgType == CLIENT_GO_GET_LOST:
  457. self.handleGoGetLost(di)
  458. else:
  459. currentLoginState = self.loginFSM.getCurrentState()
  460. if currentLoginState:
  461. currentLoginStateName = currentLoginState.getName()
  462. else:
  463. currentLoginStateName = "None"
  464. currentGameState = self.gameFSM.getCurrentState()
  465. if currentGameState:
  466. currentGameStateName = currentGameState.getName()
  467. else:
  468. currentGameStateName = "None"
  469. ClientRepository.notify.warning(
  470. "Ignoring unexpected message type: " +
  471. str(msgType) +
  472. " login state: " +
  473. currentLoginStateName +
  474. " game state: " +
  475. currentGameStateName)
  476. return None
  477. def sendSetShardMsg(self, shardId):
  478. datagram = Datagram()
  479. # Add message type
  480. datagram.addUint16(CLIENT_SET_SHARD)
  481. # Add shard id
  482. datagram.addUint32(shardId)
  483. # send the message
  484. self.send(datagram)
  485. return None
  486. def sendSetZoneMsg(self, zoneId):
  487. datagram = Datagram()
  488. # Add message type
  489. datagram.addUint16(CLIENT_SET_ZONE)
  490. # Add zone id
  491. datagram.addUint16(zoneId)
  492. # send the message
  493. self.send(datagram)
  494. return None
  495. def sendUpdate(self, do, fieldName, args, sendToId = None):
  496. # Get the DO id
  497. doId = do.doId
  498. # Get the cdc
  499. cdc = self.doId2cdc.get(doId, None)
  500. if cdc:
  501. # Let the cdc finish the job
  502. cdc.sendUpdate(self, do, fieldName, args, sendToId)
  503. def send(self, datagram):
  504. if self.notify.getDebug():
  505. print "ClientRepository sending datagram:"
  506. datagram.dumpHex(ostream)
  507. if not self.tcpConn:
  508. self.notify.warning("Unable to send message after connection is closed.")
  509. return
  510. if self.connectHttp:
  511. if not self.tcpConn.sendDatagram(datagram):
  512. self.notify.warning("Could not send datagram.")
  513. else:
  514. self.cw.send(datagram, self.tcpConn)
  515. return None
  516. def replaceMethod(self, oldMethod, newFunction):
  517. foundIt = 0
  518. import new
  519. # Iterate over the ClientDistClasses
  520. for cdc in self.number2cdc.values():
  521. # Iterate over the ClientDistUpdates
  522. for cdu in cdc.allCDU:
  523. method = cdu.func
  524. # See if this is a match
  525. if (method and (method.im_func == oldMethod)):
  526. # Create a new unbound method out of this new function
  527. newMethod = new.instancemethod(newFunction,
  528. method.im_self,
  529. method.im_class)
  530. # Set the new method on the cdu
  531. cdu.func = newMethod
  532. foundIt = 1
  533. return foundIt