# ConnectionRepository.py
from PandaModules import *
import Task
import DirectNotifyGlobal
import DirectObject

  5. class ConnectionRepository(DirectObject.DirectObject):
  6. """
  7. This is a base class for things that know how to establish a
  8. connection (and exchange datagrams) with a gameserver. This
  9. includes ClientRepository and AIRepository.
  10. """
  11. notify = DirectNotifyGlobal.directNotify.newCategory("ConnectionRepository")
  12. taskPriority = -30
  13. def __init__(self, config):
  14. DirectObject.DirectObject.__init__(self)
  15. self.config = config
  16. # Set this to 'http' to establish a connection to the server
  17. # using the HTTPClient interface, which ultimately uses the
  18. # OpenSSL socket library (even though SSL is not involved).
  19. # This is not as robust a socket library as NSPR's, but the
  20. # HTTPClient interface does a good job of negotiating the
  21. # connection over an HTTP proxy if one is in use.
  22. # Set it to 'nspr' to use Panda's net interface
  23. # (e.g. QueuedConnectionManager, etc.) to establish the
  24. # connection, which ultimately uses the NSPR socket library.
  25. # This is a much better socket library, but it may be more
  26. # than you need for most applications; and there is no support
  27. # for proxies.
  28. # Set it to 'default' to use the HTTPClient interface if a
  29. # proxy is in place, but the NSPR interface if we don't have a
  30. # proxy.
  31. self.connectMethod = self.config.GetString('connect-method', 'default')
  32. self.connectHttp = None
  33. self.http = None
  34. self.qcm = None
  35. self.cw = None
  36. self.tcpConn = None
  37. self.recorder = None
  38. # Reader statistics
  39. self.rsDatagramCount = 0
  40. self.rsUpdateObjs = {}
  41. self.rsLastUpdate = 0
  42. self.rsDoReport = self.config.GetBool('reader-statistics', 0)
  43. self.rsUpdateInterval = self.config.GetDouble('reader-statistics-interval', 10)
  44. def connect(self, serverList,
  45. successCallback = None, successArgs = [],
  46. failureCallback = None, failureArgs = []):
  47. """
  48. Attempts to establish a connection to the server. May return
  49. before the connection is established. The two callbacks
  50. represent the two functions to call (and their arguments) on
  51. success or failure, respectively. The failure callback also
  52. gets one additional parameter, which will be passed in first:
  53. the return status code giving reason for failure, if it is
  54. known.
  55. """
  56. if self.recorder and self.recorder.isPlaying():
  57. # If we have a recorder and it's already in playback mode,
  58. # don't actually attempt to connect to a gameserver since
  59. # we don't need to. Just let it play back the data.
  60. self.notify.info("Not connecting to gameserver; using playback data instead.")
  61. self.connectHttp = 1
  62. self.tcpConn = SocketStreamRecorder()
  63. self.recorder.addRecorder('gameserver', self.tcpConn)
  64. self.startReaderPollTask()
  65. if successCallback:
  66. successCallback(*successArgs)
  67. return
  68. hasProxy = 0
  69. if self.checkHttp():
  70. proxies = self.http.getProxiesForUrl(serverList[0])
  71. hasProxy = (proxies != 'DIRECT')
  72. if hasProxy:
  73. self.notify.info("Connecting to gameserver via proxy list: %s" % (proxies))
  74. else:
  75. self.notify.info("Connecting to gameserver directly (no proxy).");
  76. if self.connectMethod == 'http':
  77. self.connectHttp = 1
  78. elif self.connectMethod == 'nspr':
  79. self.connectHttp = 0
  80. else:
  81. self.connectHttp = (hasProxy or serverList[0].isSsl())
  82. self.bootedIndex = None
  83. self.bootedText = None
  84. if self.connectHttp:
  85. # In the HTTP case, we can't just iterate through the list
  86. # of servers, because each server attempt requires
  87. # spawning a request and then coming back later to check
  88. # the success or failure. Instead, we start the ball
  89. # rolling by calling the connect callback, which will call
  90. # itself repeatedly until we establish a connection (or
  91. # run out of servers).
  92. ch = self.http.makeChannel(0)
  93. self.httpConnectCallback(ch, serverList, 0,
  94. successCallback, successArgs,
  95. failureCallback, failureArgs)
  96. else:
  97. if self.qcm == None:
  98. self.qcm = QueuedConnectionManager()
  99. if self.cw == None:
  100. self.cw = ConnectionWriter(self.qcm, 0)
  101. self.qcr = QueuedConnectionReader(self.qcm, 0)
  102. minLag = self.config.GetFloat('min-lag', 0.)
  103. maxLag = self.config.GetFloat('max-lag', 0.)
  104. if minLag or maxLag:
  105. self.qcr.startDelay(minLag, maxLag)
  106. # A big old 20 second timeout.
  107. gameServerTimeoutMs = self.config.GetInt("game-server-timeout-ms",
  108. 20000)
  109. # Try each of the servers in turn.
  110. for url in serverList:
  111. self.notify.info("Connecting to %s via NSPR interface." % (url.cStr()))
  112. self.tcpConn = self.qcm.openTCPClientConnection(
  113. url.getServer(), url.getPort(),
  114. gameServerTimeoutMs)
  115. if self.tcpConn:
  116. self.tcpConn.setNoDelay(1)
  117. self.qcr.addConnection(self.tcpConn)
  118. self.startReaderPollTask()
  119. if successCallback:
  120. successCallback(*successArgs)
  121. return
  122. # Failed to connect.
  123. if failureCallback:
  124. failureCallback(0, '', *failureArgs)
  125. def disconnect(self):
  126. """Closes the previously-established connection.
  127. """
  128. self.notify.info("Closing connection to server.")
  129. if self.tcpConn != None:
  130. if self.connectHttp:
  131. self.tcpConn.close()
  132. else:
  133. self.qcm.closeConnection(self.tcpConn)
  134. self.tcpConn = None
  135. self.stopReaderPollTask()
  136. def httpConnectCallback(self, ch, serverList, serverIndex,
  137. successCallback, successArgs,
  138. failureCallback, failureArgs):
  139. if ch.isConnectionReady():
  140. self.tcpConn = ch.getConnection()
  141. self.tcpConn.userManagesMemory = 1
  142. if self.recorder:
  143. # If we have a recorder, we wrap the connect inside a
  144. # SocketStreamRecorder, which will trap incoming data
  145. # when the recorder is set to record mode. (It will
  146. # also play back data when the recorder is in playback
  147. # mode, but in that case we never get this far in the
  148. # code, since we just create an empty
  149. # SocketStreamRecorder without actually connecting to
  150. # the gameserver.)
  151. stream = SocketStreamRecorder(self.tcpConn, 1)
  152. self.recorder.addRecorder('gameserver', stream)
  153. # In this case, we pass ownership of the original
  154. # connection to the SocketStreamRecorder object.
  155. self.tcpConn.userManagesMemory = 0
  156. self.tcpConn = stream
  157. self.startReaderPollTask()
  158. if successCallback:
  159. successCallback(*successArgs)
  160. elif serverIndex < len(serverList):
  161. # No connection yet, but keep trying.
  162. url = serverList[serverIndex]
  163. self.notify.info("Connecting to %s via HTTP interface." % (url.cStr()))
  164. ch.preserveStatus()
  165. ch.beginConnectTo(DocumentSpec(url))
  166. ch.spawnTask(name = 'connect-to-server',
  167. callback = self.httpConnectCallback,
  168. extraArgs = [ch, serverList, serverIndex + 1,
  169. successCallback, successArgs,
  170. failureCallback, failureArgs])
  171. else:
  172. # No more servers to try; we have to give up now.
  173. if failureCallback:
  174. failureCallback(ch.getStatusCode(), ch.getStatusString(),
  175. *failureArgs)
  176. def checkHttp(self):
  177. # Creates an HTTPClient, if possible, if we don't have one
  178. # already. This might fail if the OpenSSL library isn't
  179. # available. Returns the HTTPClient (also self.http), or None
  180. # if not set.
  181. if self.http == None:
  182. try:
  183. self.http = HTTPClient()
  184. except:
  185. pass
  186. return self.http
  187. def startReaderPollTask(self):
  188. # Stop any tasks we are running now
  189. self.stopReaderPollTask()
  190. taskMgr.add(self.readerPollUntilEmpty, "readerPollTask",
  191. priority = self.taskPriority)
  192. def stopReaderPollTask(self):
  193. taskMgr.remove("readerPollTask")
  194. def readerPollUntilEmpty(self, task):
  195. while self.readerPollOnce():
  196. pass
  197. return Task.cont
  198. def readerPollOnce(self):
  199. # we simulate the network plug being pulled by setting tcpConn
  200. # to None; enforce that condition
  201. if not self.tcpConn:
  202. return 0
  203. # Make sure any recently-sent datagrams are flushed when the
  204. # time expires, if we're in collect-tcp mode.
  205. self.tcpConn.considerFlush()
  206. if self.rsDoReport:
  207. self.reportReaderStatistics()
  208. if self.connectHttp:
  209. datagram = Datagram()
  210. if self.tcpConn.receiveDatagram(datagram):
  211. if self.rsDoReport:
  212. self.rsDatagramCount += 1
  213. self.handleDatagram(datagram)
  214. return 1
  215. # Unable to receive a datagram: did we lose the connection?
  216. if self.tcpConn.isClosed():
  217. self.tcpConn = None
  218. self.stopReaderPollTask()
  219. self.lostConnection()
  220. return 0
  221. else:
  222. self.ensureValidConnection()
  223. if self.qcr.dataAvailable():
  224. datagram = NetDatagram()
  225. if self.qcr.getData(datagram):
  226. if self.rsDoReport:
  227. self.rsDatagramCount += 1
  228. self.handleDatagram(datagram)
  229. return 1
  230. return 0
  231. def flush(self):
  232. # Ensure the latest has been sent to the server.
  233. if self.tcpConn:
  234. self.tcpConn.flush()
  235. def ensureValidConnection(self):
  236. # Was the connection reset?
  237. if self.connectHttp:
  238. pass
  239. else:
  240. if self.qcm.resetConnectionAvailable():
  241. resetConnectionPointer = PointerToConnection()
  242. if self.qcm.getResetConnection(resetConnectionPointer):
  243. resetConn = resetConnectionPointer.p()
  244. self.qcm.closeConnection(resetConn)
  245. # if we've simulated a network plug pull, restore the
  246. # simulated plug
  247. self.restoreNetworkPlug()
  248. if self.tcpConn.this == resetConn.this:
  249. self.tcpConn = None
  250. self.stopReaderPollTask()
  251. self.lostConnection()
  252. else:
  253. self.notify.warning("Lost unknown connection.")
  254. def lostConnection(self):
  255. # This should be overrided by a derived class to handle an
  256. # unexpectedly lost connection to the gameserver.
  257. self.notify.warning("Lost connection to gameserver.")
  258. def handleDatagram(self, datagram):
  259. # This class is meant to be pure virtual, and any classes that
  260. # inherit from it need to make their own handleDatagram method
  261. pass
  262. def reportReaderStatistics(self):
  263. now = globalClock.getRealTime()
  264. if now - self.rsLastUpdate < self.rsUpdateInterval:
  265. return
  266. self.rsLastUpdate = now
  267. self.notify.info("Received %s datagrams" % (self.rsDatagramCount))
  268. if self.rsUpdateObjs:
  269. self.notify.info("Updates: %s" % (self.rsUpdateObjs))
  270. self.rsDatagramCount = 0
  271. self.rsUpdateObjs = {}
  272. def send(self, datagram):
  273. #if self.notify.getDebug():
  274. # print "ConnectionRepository sending datagram:"
  275. # datagram.dumpHex(ostream)
  276. if not self.tcpConn:
  277. self.notify.warning("Unable to send message after connection is closed.")
  278. return
  279. if self.connectHttp:
  280. if not self.tcpConn.sendDatagram(datagram):
  281. self.notify.warning("Could not send datagram.")
  282. else:
  283. self.cw.send(datagram, self.tcpConn)
  284. # debugging funcs for simulating a network-plug-pull
  285. def pullNetworkPlug(self):
  286. self.restoreNetworkPlug()
  287. self.notify.warning('*** SIMULATING A NETWORK-PLUG-PULL ***')
  288. self.hijackedTcpConn = self.tcpConn
  289. self.tcpConn = None
  290. def networkPlugPulled(self):
  291. return hasattr(self, 'hijackedTcpConn')
  292. def restoreNetworkPlug(self):
  293. if self.networkPlugPulled():
  294. self.notify.info('*** RESTORING SIMULATED PULLED-NETWORK-PLUG ***')
  295. self.tcpConn = self.hijackedTcpConn
  296. del self.hijackedTcpConn
  297. def doFind(self, str):
  298. """ returns list of distributed objects with matching str in value """
  299. for value in self.doId2do.values():
  300. if `value`.find(str) >= 0:
  301. return value
  302. def doFindAll(self, str):
  303. """ returns list of distributed objects with matching str in value """
  304. matches = []
  305. for value in self.doId2do.values():
  306. if `value`.find(str) >= 0:
  307. matches.append(value)
  308. return matches