ConnectionRepository.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559
  1. from pandac.PandaModules import *
  2. from direct.task import Task
  3. from direct.directnotify import DirectNotifyGlobal
  4. from direct.distributed.DoInterestManager import DoInterestManager
  5. from direct.distributed.DoCollectionManager import DoCollectionManager
  6. from PyDatagram import PyDatagram
  7. from PyDatagramIterator import PyDatagramIterator
  8. import types
  9. import imp
  10. class ConnectionRepository(
  11. DoInterestManager, DoCollectionManager, CConnectionRepository):
  12. """
  13. This is a base class for things that know how to establish a
  14. connection (and exchange datagrams) with a gameserver. This
  15. includes ClientRepository and AIRepository.
  16. """
  17. notify = DirectNotifyGlobal.directNotify.newCategory("ConnectionRepository")
  18. taskPriority = -30
  19. CM_HTTP=0
  20. CM_NSPR=1
  21. CM_NATIVE=2
  22. def __init__(self, connectMethod, config, hasOwnerView=False):
  23. assert self.notify.debugCall()
  24. # let the C connection repository know whether we're supporting
  25. # 'owner' views of distributed objects (i.e. 'receives ownrecv',
  26. # 'I own this object and have a separate view of it regardless of
  27. # where it currently is located')
  28. CConnectionRepository.__init__(self, hasOwnerView)
  29. # DoInterestManager.__init__ relies on CConnectionRepository being
  30. # initialized
  31. DoInterestManager.__init__(self)
  32. DoCollectionManager.__init__(self)
  33. self.setPythonRepository(self)
  34. self.config = config
  35. if hasattr(self, 'setVerbose'):
  36. self.setVerbose(self.config.GetBool('verbose-repository', 0))
  37. # Set this to 'http' to establish a connection to the server
  38. # using the HTTPClient interface, which ultimately uses the
  39. # OpenSSL socket library (even though SSL is not involved).
  40. # This is not as robust a socket library as NSPR's, but the
  41. # HTTPClient interface does a good job of negotiating the
  42. # connection over an HTTP proxy if one is in use.
  43. #
  44. # Set it to 'nspr' to use Panda's net interface
  45. # (e.g. QueuedConnectionManager, etc.) to establish the
  46. # connection, which ultimately uses the NSPR socket library.
  47. # This is a much better socket library, but it may be more
  48. # than you need for most applications; and there is no support
  49. # for proxies.
  50. #
  51. # Set it to 'default' to use the HTTPClient interface if a
  52. # proxy is in place, but the NSPR interface if we don't have a
  53. # proxy.
  54. self.connectMethod=connectMethod
  55. self.connectHttp = None
  56. self.http = None
  57. # This DatagramIterator is constructed once, and then re-used
  58. # each time we read a datagram.
  59. self.__di = PyDatagramIterator()
  60. self.recorder = None
  61. # This is the string that is appended to symbols read from the
  62. # DC file. The AIRepository will redefine this to 'AI'.
  63. self.dcSuffix = ''
  64. self._serverAddress = ''
  65. if self.config.GetBool('want-debug-leak', 1):
  66. import gc
  67. gc.set_debug(gc.DEBUG_SAVEALL)
  68. def generateGlobalObject(self, doId, dcname, values=None):
  69. def applyFieldValues(distObj, dclass, values):
  70. for i in range(dclass.getNumInheritedFields()):
  71. field = dclass.getInheritedField(i)
  72. if field.asMolecularField() == None:
  73. value = values.get(field.getName(), None)
  74. if value is None and field.isRequired():
  75. # Gee, this could be better. What would really be
  76. # nicer is to get value from field.getDefaultValue
  77. # or similar, but that returns a binary string, not
  78. # a python tuple, like the following does. If you
  79. # want to change something better, please go ahead.
  80. packer = DCPacker()
  81. packer.beginPack(field)
  82. packer.packDefaultValue()
  83. packer.endPack()
  84. unpacker = DCPacker()
  85. unpacker.setUnpackData(packer.getString())
  86. unpacker.beginUnpack(field)
  87. value = unpacker.unpackObject()
  88. unpacker.endUnpack()
  89. if value is not None:
  90. function = getattr(distObj, field.getName())
  91. if function is not None:
  92. function(*value)
  93. else:
  94. self.notify.error("\n\n\nNot able to find %s.%s"%(
  95. distObj.__class__.__name__, field.getName()))
  96. # Look up the dclass
  97. dclass = self.dclassesByName.get(dcname+self.dcSuffix)
  98. if dclass is None:
  99. print "\n\n\nNeed to define", dcname+self.dcSuffix
  100. dclass = self.dclassesByName.get(dcname+'AI')
  101. if dclass is None:
  102. dclass = self.dclassesByName.get(dcname)
  103. # Create a new distributed object, and put it in the dictionary
  104. #distObj = self.generateWithRequiredFields(dclass, doId, di)
  105. # Construct a new one
  106. classDef = dclass.getClassDef()
  107. if classDef == None:
  108. self.notify.error("Could not create an undefined %s object."%(
  109. dclass.getName()))
  110. distObj = classDef(self)
  111. distObj.dclass = dclass
  112. # Assign it an Id
  113. distObj.doId = doId
  114. # Put the new do in the dictionary
  115. self.doId2do[doId] = distObj
  116. # Update the required fields
  117. distObj.generateInit() # Only called when constructed
  118. distObj.generate()
  119. if values is not None:
  120. applyFieldValues(distObj, dclass, values)
  121. distObj.announceGenerate()
  122. distObj.parentId = 0
  123. distObj.zoneId = 0
  124. # updateRequiredFields calls announceGenerate
  125. return distObj
  126. def readDCFile(self, dcFileNames = None):
  127. """
  128. Reads in the dc files listed in dcFileNames, or if
  129. dcFileNames is None, reads in all of the dc files listed in
  130. the Configrc file.
  131. """
  132. dcFile = self.getDcFile()
  133. dcFile.clear()
  134. self.dclassesByName = {}
  135. self.dclassesByNumber = {}
  136. self.hashVal = 0
  137. if isinstance(dcFileNames, types.StringTypes):
  138. # If we were given a single string, make it a list.
  139. dcFileNames = [dcFileNames]
  140. dcImports = {}
  141. if dcFileNames == None:
  142. readResult = dcFile.readAll()
  143. if not readResult:
  144. self.notify.error("Could not read dc file.")
  145. else:
  146. for dcFileName in dcFileNames:
  147. readResult = dcFile.read(Filename(dcFileName))
  148. if not readResult:
  149. self.notify.error("Could not read dc file: %s" % (dcFileName))
  150. if not dcFile.allObjectsValid():
  151. names = []
  152. for i in range(dcFile.getNumTypedefs()):
  153. td = dcFile.getTypedef(i)
  154. if td.isBogusTypedef():
  155. names.append(td.getName())
  156. nameList = ', '.join(names)
  157. self.notify.error("Undefined types in DC file: " + nameList)
  158. self.hashVal = dcFile.getHash()
  159. # Now import all of the modules required by the DC file.
  160. for n in range(dcFile.getNumImportModules()):
  161. moduleName = dcFile.getImportModule(n)[:]
  162. # Maybe the module name is represented as "moduleName/AI".
  163. suffix = moduleName.split('/')
  164. moduleName = suffix[0]
  165. suffix=suffix[1:]
  166. if self.dcSuffix in suffix:
  167. moduleName += self.dcSuffix
  168. elif self.dcSuffix == 'UD' and 'AI' in suffix: #HACK:
  169. moduleName += 'AI'
  170. importSymbols = []
  171. for i in range(dcFile.getNumImportSymbols(n)):
  172. symbolName = dcFile.getImportSymbol(n, i)
  173. # Maybe the symbol name is represented as "symbolName/AI".
  174. suffix = symbolName.split('/')
  175. symbolName = suffix[0]
  176. suffix=suffix[1:]
  177. if self.dcSuffix in suffix:
  178. symbolName += self.dcSuffix
  179. elif self.dcSuffix == 'UD' and 'AI' in suffix: #HACK:
  180. symbolName += 'AI'
  181. importSymbols.append(symbolName)
  182. self.importModule(dcImports, moduleName, importSymbols)
  183. # Now get the class definition for the classes named in the DC
  184. # file.
  185. for i in range(dcFile.getNumClasses()):
  186. dclass = dcFile.getClass(i)
  187. number = dclass.getNumber()
  188. className = dclass.getName() + self.dcSuffix
  189. # Does the class have a definition defined in the newly
  190. # imported namespace?
  191. classDef = dcImports.get(className)
  192. if classDef is None and self.dcSuffix == 'UD': #HACK:
  193. className = dclass.getName() + 'AI'
  194. classDef = dcImports.get(className)
  195. # Also try it without the dcSuffix.
  196. if classDef == None:
  197. className = dclass.getName()
  198. classDef = dcImports.get(className)
  199. if classDef is None:
  200. self.notify.info("No class definition for %s." % (className))
  201. else:
  202. if type(classDef) == types.ModuleType:
  203. if not hasattr(classDef, className):
  204. self.notify.error("Module %s does not define class %s." % (className, className))
  205. classDef = getattr(classDef, className)
  206. if type(classDef) != types.ClassType and type(classDef) != types.TypeType:
  207. self.notify.error("Symbol %s is not a class name." % (className))
  208. else:
  209. dclass.setClassDef(classDef)
  210. self.dclassesByName[className] = dclass
  211. if number >= 0:
  212. self.dclassesByNumber[number] = dclass
  213. # Owner Views
  214. if self.hasOwnerView():
  215. ownerDcSuffix = self.dcSuffix + 'OV'
  216. # dict of class names (without 'OV') that have owner views
  217. ownerImportSymbols = {}
  218. # Now import all of the modules required by the DC file.
  219. for n in range(dcFile.getNumImportModules()):
  220. moduleName = dcFile.getImportModule(n)
  221. # Maybe the module name is represented as "moduleName/AI".
  222. suffix = moduleName.split('/')
  223. moduleName = suffix[0]
  224. suffix=suffix[1:]
  225. if ownerDcSuffix in suffix:
  226. moduleName = moduleName + ownerDcSuffix
  227. importSymbols = []
  228. for i in range(dcFile.getNumImportSymbols(n)):
  229. symbolName = dcFile.getImportSymbol(n, i)
  230. # Check for the OV suffix
  231. suffix = symbolName.split('/')
  232. symbolName = suffix[0]
  233. suffix=suffix[1:]
  234. if ownerDcSuffix in suffix:
  235. symbolName += ownerDcSuffix
  236. importSymbols.append(symbolName)
  237. ownerImportSymbols[symbolName] = None
  238. self.importModule(dcImports, moduleName, importSymbols)
  239. # Now get the class definition for the owner classes named
  240. # in the DC file.
  241. for i in range(dcFile.getNumClasses()):
  242. dclass = dcFile.getClass(i)
  243. if ((dclass.getName()+ownerDcSuffix) in ownerImportSymbols):
  244. number = dclass.getNumber()
  245. className = dclass.getName() + ownerDcSuffix
  246. # Does the class have a definition defined in the newly
  247. # imported namespace?
  248. classDef = dcImports.get(className)
  249. if classDef is None:
  250. self.notify.error("No class definition for %s." % className)
  251. else:
  252. if type(classDef) == types.ModuleType:
  253. if not hasattr(classDef, className):
  254. self.notify.error("Module %s does not define class %s." % (className, className))
  255. classDef = getattr(classDef, className)
  256. dclass.setOwnerClassDef(classDef)
  257. self.dclassesByName[className] = dclass
  258. def importModule(self, dcImports, moduleName, importSymbols):
  259. """
  260. Imports the indicated moduleName and all of its symbols
  261. into the current namespace. This more-or-less reimplements
  262. the Python import command.
  263. """
  264. module = __import__(moduleName, globals(), locals(), importSymbols)
  265. if importSymbols:
  266. # "from moduleName import symbolName, symbolName, ..."
  267. # Copy just the named symbols into the dictionary.
  268. if importSymbols == ['*']:
  269. # "from moduleName import *"
  270. if hasattr(module, "__all__"):
  271. importSymbols = module.__all__
  272. else:
  273. importSymbols = module.__dict__.keys()
  274. for symbolName in importSymbols:
  275. if hasattr(module, symbolName):
  276. dcImports[symbolName] = getattr(module, symbolName)
  277. else:
  278. raise StandardError, 'Symbol %s not defined in module %s.' % (symbolName, moduleName)
  279. else:
  280. # "import moduleName"
  281. # Copy the root module name into the dictionary.
  282. # Follow the dotted chain down to the actual module.
  283. components = moduleName.split('.')
  284. dcImports[components[0]] = module
  285. def getServerAddress(self):
  286. return self._serverAddress
  287. def connect(self, serverList,
  288. successCallback = None, successArgs = [],
  289. failureCallback = None, failureArgs = []):
  290. """
  291. Attempts to establish a connection to the server. May return
  292. before the connection is established. The two callbacks
  293. represent the two functions to call (and their arguments) on
  294. success or failure, respectively. The failure callback also
  295. gets one additional parameter, which will be passed in first:
  296. the return status code giving reason for failure, if it is
  297. known.
  298. """
  299. ## if self.recorder and self.recorder.isPlaying():
  300. ## # If we have a recorder and it's already in playback mode,
  301. ## # don't actually attempt to connect to a gameserver since
  302. ## # we don't need to. Just let it play back the data.
  303. ## self.notify.info("Not connecting to gameserver; using playback data instead.")
  304. ## self.connectHttp = 1
  305. ## self.tcpConn = SocketStreamRecorder()
  306. ## self.recorder.addRecorder('gameserver', self.tcpConn)
  307. ## self.startReaderPollTask()
  308. ## if successCallback:
  309. ## successCallback(*successArgs)
  310. ## return
  311. hasProxy = 0
  312. if self.checkHttp():
  313. proxies = self.http.getProxiesForUrl(serverList[0])
  314. hasProxy = (proxies != 'DIRECT')
  315. if hasProxy:
  316. self.notify.info("Connecting to gameserver via proxy list: %s" % (proxies))
  317. else:
  318. self.notify.info("Connecting to gameserver directly (no proxy).")
  319. #Redefine the connection to http or nspr in the default case
  320. self.bootedIndex = None
  321. self.bootedText = None
  322. if self.connectMethod == self.CM_HTTP:
  323. # In the HTTP case, we can't just iterate through the list
  324. # of servers, because each server attempt requires
  325. # spawning a request and then coming back later to check
  326. # the success or failure. Instead, we start the ball
  327. # rolling by calling the connect callback, which will call
  328. # itself repeatedly until we establish a connection (or
  329. # run out of servers).
  330. ch = self.http.makeChannel(0)
  331. self.httpConnectCallback(
  332. ch, serverList, 0,
  333. successCallback, successArgs,
  334. failureCallback, failureArgs)
  335. elif self.connectMethod == self.CM_NSPR or (not hasattr(self,"connectNative")):
  336. # Try each of the servers in turn.
  337. for url in serverList:
  338. self.notify.info("Connecting to %s via NSPR interface." % (url.cStr()))
  339. if self.tryConnectNspr(url):
  340. self.startReaderPollTask()
  341. if successCallback:
  342. successCallback(*successArgs)
  343. return
  344. # Failed to connect.
  345. if failureCallback:
  346. failureCallback(0, '', *failureArgs)
  347. elif self.connectMethod == self.CM_NATIVE:
  348. for url in serverList:
  349. self.notify.info("Connecting to %s via Native interface." % (url.cStr()))
  350. if self.connectNative(url):
  351. self.startReaderPollTask()
  352. if successCallback:
  353. successCallback(*successArgs)
  354. return
  355. # Failed to connect.
  356. if failureCallback:
  357. failureCallback(0, '', *failureArgs)
  358. else:
  359. print "uh oh, we aren't using one of the tri-state CM variables"
  360. failureCallback(0, '', *failureArgs)
  361. def disconnect(self):
  362. """
  363. Closes the previously-established connection.
  364. """
  365. self.notify.info("Closing connection to server.")
  366. self._serverAddress = ''
  367. CConnectionRepository.disconnect(self)
  368. self.stopReaderPollTask()
  369. def httpConnectCallback(self, ch, serverList, serverIndex,
  370. successCallback, successArgs,
  371. failureCallback, failureArgs):
  372. if ch.isConnectionReady():
  373. self.setConnectionHttp(ch)
  374. self._serverAddress = serverList[serverIndex-1]
  375. ## if self.recorder:
  376. ## # If we have a recorder, we wrap the connect inside a
  377. ## # SocketStreamRecorder, which will trap incoming data
  378. ## # when the recorder is set to record mode. (It will
  379. ## # also play back data when the recorder is in playback
  380. ## # mode, but in that case we never get this far in the
  381. ## # code, since we just create an empty
  382. ## # SocketStreamRecorder without actually connecting to
  383. ## # the gameserver.)
  384. ## stream = SocketStreamRecorder(self.tcpConn, 1)
  385. ## self.recorder.addRecorder('gameserver', stream)
  386. ## # In this case, we pass ownership of the original
  387. ## # connection to the SocketStreamRecorder object.
  388. ## self.tcpConn.userManagesMemory = 0
  389. ## self.tcpConn = stream
  390. self.startReaderPollTask()
  391. if successCallback:
  392. successCallback(*successArgs)
  393. elif serverIndex < len(serverList):
  394. # No connection yet, but keep trying.
  395. url = serverList[serverIndex]
  396. self.notify.info("Connecting to %s via HTTP interface." % (url.cStr()))
  397. ch.preserveStatus()
  398. ch.beginConnectTo(DocumentSpec(url))
  399. ch.spawnTask(name = 'connect-to-server',
  400. callback = self.httpConnectCallback,
  401. extraArgs = [ch, serverList, serverIndex + 1,
  402. successCallback, successArgs,
  403. failureCallback, failureArgs])
  404. else:
  405. # No more servers to try; we have to give up now.
  406. if failureCallback:
  407. failureCallback(ch.getStatusCode(), ch.getStatusString(),
  408. *failureArgs)
  409. def checkHttp(self):
  410. # Creates an HTTPClient, if possible, if we don't have one
  411. # already. This might fail if the OpenSSL library isn't
  412. # available. Returns the HTTPClient (also self.http), or None
  413. # if not set.
  414. if self.http == None:
  415. try:
  416. self.http = HTTPClient()
  417. except:
  418. pass
  419. return self.http
  420. def startReaderPollTask(self):
  421. print '########## startReaderPollTask'
  422. # Stop any tasks we are running now
  423. self.stopReaderPollTask()
  424. self.accept(CConnectionRepository.getOverflowEventName(),
  425. self.handleReaderOverflow)
  426. taskMgr.add(self.readerPollUntilEmpty, self.uniqueName("readerPollTask"),
  427. priority = self.taskPriority)
  428. def stopReaderPollTask(self):
  429. print '########## stopReaderPollTask'
  430. taskMgr.remove(self.uniqueName("readerPollTask"))
  431. self.ignore(CConnectionRepository.getOverflowEventName())
  432. def readerPollUntilEmpty(self, task):
  433. while self.readerPollOnce():
  434. pass
  435. return Task.cont
  436. def readerPollOnce(self):
  437. if self.checkDatagram():
  438. self.getDatagramIterator(self.__di)
  439. self.handleDatagram(self.__di)
  440. return 1
  441. # Unable to receive a datagram: did we lose the connection?
  442. if not self.isConnected():
  443. self.stopReaderPollTask()
  444. self.lostConnection()
  445. return 0
  446. def handleReaderOverflow(self):
  447. # this is called if the incoming-datagram queue overflowed and
  448. # we lost some data. Override and handle if desired.
  449. pass
  450. def lostConnection(self):
  451. # This should be overrided by a derived class to handle an
  452. # unexpectedly lost connection to the gameserver.
  453. self.notify.warning("Lost connection to gameserver.")
  454. def handleDatagram(self, di):
  455. # This class is meant to be pure virtual, and any classes that
  456. # inherit from it need to make their own handleDatagram method
  457. pass
  458. def send(self, datagram):
  459. # Zero-length datagrams might freak out the server. No point
  460. # in sending them, anyway.
  461. if datagram.getLength() > 0:
  462. if ConnectionRepository.notify.getDebug():
  463. print "ConnectionRepository sending datagram:"
  464. datagram.dumpHex(ostream)
  465. self.sendDatagram(datagram)
  466. # debugging funcs for simulating a network-plug-pull
  467. def pullNetworkPlug(self):
  468. self.notify.warning('*** SIMULATING A NETWORK-PLUG-PULL ***')
  469. self.setSimulatedDisconnect(1)
  470. def networkPlugPulled(self):
  471. return self.getSimulatedDisconnect()
  472. def restoreNetworkPlug(self):
  473. if self.networkPlugPulled():
  474. self.notify.info('*** RESTORING SIMULATED PULLED-NETWORK-PLUG ***')
  475. self.setSimulatedDisconnect(0)