2
0

ClusterClient.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. """ClusterClient: Master for multi-piping or PC clusters. """
  2. from PandaModules import *
  3. from ClusterMsgs import *
  4. from ClusterConfig import *
  5. import DirectNotifyGlobal
  6. import DirectObject
  7. import Task
  8. class ClusterClient(DirectObject.DirectObject):
  9. notify = DirectNotifyGlobal.directNotify.newCategory("ClusterClient")
  10. MGR_NUM = 1000000
  11. def __init__(self, configList, clusterSyncFlag):
  12. # First start up servers using direct daemon
  13. clusterDaemonClient = base.config.GetString(
  14. 'cluster-daemon-client', 'localhost')
  15. clusterDaemonPort = base.config.GetInt(
  16. 'cluster-daemon-port', CLUSTER_DAEMON_PORT)
  17. self.daemon = DirectD()
  18. print 'LISTEN'
  19. self.daemon.listenTo(clusterDaemonPort)
  20. for serverConfig in configList:
  21. serverCommand = (SERVER_STARTUP_STRING %
  22. (serverConfig.serverPort,
  23. clusterSyncFlag,
  24. clusterDaemonClient,
  25. clusterDaemonPort))
  26. print 'BOOTSTRAP', serverCommand
  27. self.daemon.clientReady(serverConfig.serverName,
  28. clusterDaemonPort,
  29. serverCommand)
  30. print 'WAITING'
  31. if not self.daemon.waitForServers(len(configList)):
  32. print 'ERROR'
  33. print 'DONE'
  34. self.qcm=QueuedConnectionManager()
  35. self.serverList = []
  36. self.msgHandler = ClusterMsgHandler(ClusterClient.MGR_NUM, self.notify)
  37. for serverConfig in configList:
  38. server = DisplayConnection(self.qcm,serverConfig.serverName,
  39. serverConfig.serverPort,self.msgHandler)
  40. if server == None:
  41. self.notify.error('Could not open %s on %s port %d' %
  42. (serverConfig.serverConfigName,
  43. serverConfig.serverName,
  44. serverConfig.serverPort))
  45. else:
  46. server.sendCamOffset(serverConfig.xyz,serverConfig.hpr)
  47. if serverConfig.fFrustum:
  48. server.sendCamFrustum(serverConfig.focalLength,
  49. serverConfig.filmSize,
  50. serverConfig.filmOffset)
  51. self.serverList.append(server)
  52. self.startMoveCamTask()
  53. self.startMoveSelectedTask()
  54. def startMoveCamTask(self):
  55. taskMgr.add(self.moveCameraTask, "moveCamTask", 49)
  56. def moveCameraTask(self,task):
  57. self.moveCamera(
  58. base.camera.getPos(render),
  59. base.camera.getHpr(render))
  60. return Task.cont
  61. def moveCamera(self, xyz, hpr):
  62. self.notify.debug('moving unsynced camera')
  63. for server in self.serverList:
  64. server.sendMoveCam(xyz,hpr)
  65. def startMoveSelectedTask(self):
  66. taskMgr.add(self.moveSelectedTask, "moveSelectedTask", 48)
  67. def moveSelectedTask(self, state):
  68. # Update cluster if current display is a cluster client
  69. if (last is not None):
  70. self.notify.debug('moving selected node path')
  71. xyz = Point3(0)
  72. hpr = VBase3(0)
  73. scale = VBase3(1)
  74. decomposeMatrix(last.getMat(), scale, hpr, xyz)
  75. for server in self.serverList:
  76. server.sendMoveSelected(xyz,hpr)
  77. return Task.cont
  78. def getNodePathFindCmd(self, nodePath):
  79. import string
  80. pathString = `nodePath`
  81. index = string.find(pathString, '/')
  82. if index != -1:
  83. rootName = pathString[:index]
  84. searchString = pathString[index+1:]
  85. return rootName + ('.find("%s")' % searchString)
  86. else:
  87. return rootName
  88. def selectNodePath(self, nodePath):
  89. self.cmd(self.getNodePathFindCmd(nodePath) + '.select()', 0)
  90. def deselectNodePath(self, nodePath):
  91. self.cmd(self.getNodePathFindCmd(nodePath) + '.deselect()', 0)
  92. def loadModel(self, nodePath):
  93. pass
  94. def cmd(self, commandString, fLocally = 1):
  95. # Execute remotely
  96. for server in self.serverList:
  97. server.sendCommandString(commandString)
  98. if fLocally:
  99. # Execute locally
  100. exec( commandString, globals() )
  101. def exit(self):
  102. # Execute remotely
  103. for server in self.serverList:
  104. server.sendExit()
  105. # Execute locally
  106. import sys
  107. sys.exit()
  108. class ClusterClientSync(ClusterClient):
  109. def __init__(self, configList, clusterSyncFlag):
  110. ClusterClient.__init__(self, configList, clusterSyncFlag)
  111. #I probably don't need this
  112. self.waitForSwap = 0
  113. self.ready = 0
  114. self.startSwapCoordinatorTask()
  115. def startSwapCoordinatorTask(self):
  116. taskMgr.add(self.swapCoordinator, "clientSwapCoordinator", 51)
  117. return None
  118. def swapCoordinator(self,task):
  119. self.ready = 1
  120. if self.waitForSwap:
  121. self.waitForSwap=0
  122. self.notify.debug(
  123. "START get swaps----------------------------------")
  124. for server in self.serverList:
  125. server.getSwapReady()
  126. self.notify.debug(
  127. "----------------START swap now--------------------")
  128. for server in self.serverList:
  129. server.sendSwapNow()
  130. self.notify.debug(
  131. "------------------------------START swap----------")
  132. base.win.swap()
  133. self.notify.debug(
  134. "------------------------------------------END swap")
  135. return Task.cont
  136. def moveCamera(self,xyz,hpr):
  137. if self.ready:
  138. self.notify.debug('moving synced camera')
  139. ClusterClient.moveCamera(self,xyz,hpr)
  140. self.waitForSwap=1
  141. class DisplayConnection:
  142. def __init__(self,qcm,serverName,port,msgHandler):
  143. self.msgHandler = msgHandler
  144. gameServerTimeoutMs = base.config.GetInt(
  145. "game-server-timeout-ms", 20000)
  146. # A big old 20 second timeout.
  147. self.tcpConn = qcm.openTCPClientConnection(
  148. serverName, port, gameServerTimeoutMs)
  149. # Test for bad connection
  150. if self.tcpConn == None:
  151. return None
  152. else:
  153. self.tcpConn.setNoDelay(1)
  154. self.qcr=QueuedConnectionReader(qcm, 0)
  155. self.qcr.addConnection(self.tcpConn)
  156. self.cw=ConnectionWriter(qcm, 0)
  157. def sendCamOffset(self,xyz,hpr):
  158. ClusterClient.notify.debug("send cam offset...")
  159. ClusterClient.notify.debug( ("packet %d xyz,hpr=%f %f %f %f %f %f" %
  160. (self.msgHandler.packetNumber,xyz[0],xyz[1],xyz[2],
  161. hpr[0],hpr[1],hpr[2])) )
  162. datagram = self.msgHandler.makeCamOffsetDatagram(xyz, hpr)
  163. self.cw.send(datagram, self.tcpConn)
  164. def sendCamFrustum(self,focalLength, filmSize, filmOffset):
  165. ClusterClient.notify.info("send cam frustum...")
  166. ClusterClient.notify.info(
  167. (("packet %d" % self.msgHandler.packetNumber) +
  168. (" fl, fs, fo=%0.3f, (%0.3f, %0.3f), (%0.3f, %0.3f)" %
  169. (focalLength, filmSize[0], filmSize[1],
  170. filmOffset[0], filmOffset[1])))
  171. )
  172. datagram = self.msgHandler.makeCamFrustumDatagram(
  173. focalLength, filmSize, filmOffset)
  174. self.cw.send(datagram, self.tcpConn)
  175. def sendMoveCam(self,xyz,hpr):
  176. ClusterClient.notify.debug("send cam move...")
  177. ClusterClient.notify.debug( ("packet %d xyz,hpr=%f %f %f %f %f %f" %
  178. (self.msgHandler.packetNumber,xyz[0],xyz[1],xyz[2],
  179. hpr[0],hpr[1],hpr[2])) )
  180. datagram = self.msgHandler.makeCamMovementDatagram(xyz, hpr)
  181. self.cw.send(datagram, self.tcpConn)
  182. def sendMoveSelected(self,xyz,hpr):
  183. ClusterClient.notify.debug("send move selected...")
  184. ClusterClient.notify.debug("packet %d xyz,hpr=%f %f %f %f %f %f" %
  185. (self.msgHandler.packetNumber,
  186. xyz[0],xyz[1],xyz[2],
  187. hpr[0],hpr[1],hpr[2]))
  188. datagram = self.msgHandler.makeSelectedMovementDatagram(xyz, hpr)
  189. self.cw.send(datagram, self.tcpConn)
  190. # the following should only be called by a synchronized cluster manger
  191. def getSwapReady(self):
  192. while 1:
  193. (datagram, dgi, type) = self.msgHandler.blockingRead(self.qcr)
  194. if type == CLUSTER_SWAP_READY:
  195. break
  196. else:
  197. self.notify.warning('was expecting SWAP_READY, got %d' % type)
  198. # the following should only be called by a synchronized cluster manger
  199. def sendSwapNow(self):
  200. ClusterClient.notify.debug(
  201. "display connect send swap now, packet %d" %
  202. self.msgHandler.packetNumber)
  203. datagram = self.msgHandler.makeSwapNowDatagram()
  204. self.cw.send(datagram, self.tcpConn)
  205. def sendCommandString(self, commandString):
  206. ClusterClient.notify.debug("send command string: %s" % commandString)
  207. datagram = self.msgHandler.makeCommandStringDatagram(commandString)
  208. self.cw.send(datagram, self.tcpConn)
  209. def sendExit(self):
  210. ClusterClient.notify.debug(
  211. "display connect send exit, packet %d" %
  212. self.msgHandler.packetNumber)
  213. datagram = self.msgHandler.makeExitDatagram()
  214. self.cw.send(datagram, self.tcpConn)
  215. class ClusterConfigItem:
  216. def __init__(self, serverConfigName, serverName, serverPort):
  217. self.serverConfigName = serverConfigName
  218. self.serverName = serverName
  219. self.serverPort = serverPort
  220. # Camera Offset
  221. self.xyz = Vec3(0)
  222. self.hpr = Vec3(0)
  223. # Camera Frustum Data
  224. self.fFrustum = 0
  225. self.focalLength = None
  226. self.filmSize = None
  227. self.filmOffset = None
  228. def setCamOffset(self, xyz, hpr):
  229. self.xyz = xyz
  230. self.hpr = hpr
  231. def setCamFrustum(self, focalLength, filmSize, filmOffset):
  232. self.fFrustum = 1
  233. self.focalLength = focalLength
  234. self.filmSize = filmSize
  235. self.filmOffset = filmOffset
  236. def createClusterClient():
  237. # setup camera offsets based on cluster-config
  238. clusterConfig = base.config.GetString('cluster-config', 'single-server')
  239. # No cluster config specified!
  240. if not ClientConfigs.has_key(clusterConfig):
  241. base.notify.warning(
  242. 'createClusterClient: %s cluster-config is undefined.' %
  243. clusterConfig)
  244. return None
  245. # Get display config for each server in the cluster
  246. displayConfigs = []
  247. configList = ClientConfigs[clusterConfig]
  248. numConfigs = len(configList)
  249. for i in range(numConfigs):
  250. configData = configList[i]
  251. displayName = configData.get('display name', ('display%d' % i))
  252. displayMode = configData.get('display mode', 'server')
  253. # Init Cam Offset
  254. pos = configData.get('pos', Vec3(0))
  255. hpr = configData.get('hpr', Vec3(0))
  256. # Init Frustum if specified
  257. fl = configData.get('focal length', None)
  258. fs = configData.get('film size', None)
  259. fo = configData.get('film offset', None)
  260. if displayMode == 'client':
  261. lens = base.cam.node().getLens()
  262. lens.setViewHpr(hpr)
  263. lens.setIodOffset(pos[0])
  264. if fl is not None:
  265. lens.setFocalLength(fl)
  266. if fs is not None:
  267. lens.setFilmSize(fs[0], fs[1])
  268. if fo is not None:
  269. lens.setFilmOffset(fo[0], fo[1])
  270. else:
  271. serverConfigName = 'cluster-server-%s' % displayName
  272. serverName = base.config.GetString(serverConfigName, '')
  273. if serverName == '':
  274. base.notify.warning(
  275. '%s undefined in Configrc: expected by %s display client.'%
  276. (serverConfigName,clusterConfig))
  277. base.notify.warning('%s will not be used.' % serverConfigName)
  278. else:
  279. # Server port
  280. serverPortConfigName = 'cluster-server-port-%s' % displayName
  281. serverPort = base.config.GetInt(serverPortConfigName,
  282. CLUSTER_SERVER_PORT)
  283. cci = ClusterConfigItem(
  284. serverConfigName,
  285. serverName,
  286. serverPort)
  287. # Init cam offset
  288. cci.setCamOffset(pos, hpr)
  289. # Init frustum if specified
  290. if fl and fs and fo:
  291. cci.setCamFrustum(fl, fs, fo)
  292. displayConfigs.append(cci)
  293. # Create Cluster Managers (opening connections to servers)
  294. # Are the servers going to be synced?
  295. clusterSyncFlag = base.config.GetBool('cluster-sync', 0)
  296. if clusterSyncFlag:
  297. base.win.setSync(1)
  298. return ClusterClientSync(displayConfigs, clusterSyncFlag)
  299. else:
  300. return ClusterClient(displayConfigs, clusterSyncFlag)
  301. class DummyClusterClient(DirectObject.DirectObject):
  302. """ Dummy class to handle command strings when not in cluster mode """
  303. notify = DirectNotifyGlobal.directNotify.newCategory("ClusterClient")
  304. def __init__(self):
  305. pass
  306. def cmd(self, commandString, fLocally = 1):
  307. if fLocally:
  308. # Execute locally
  309. exec( commandString, globals() )