ClusterClient.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. """ClusterClient: Master for multi-piping or PC clusters. """
  2. from PandaModules import *
  3. from ClusterMsgs import *
  4. from ClusterConfig import *
  5. import DirectNotifyGlobal
  6. import DirectObject
  7. import Task
  8. import os
  class ClusterClient(DirectObject.DirectObject):
      """Master side of a multi-pipe / PC display cluster.

      On construction the client asks a DirectD daemon to (re)start the
      remote server application on every configured machine, opens a
      DisplayConnection to each of them and registers two tasks that forward
      the local camera transform and the transform of the selected node path
      to all connected servers.

      Calling the instance with a command string sends that string to the
      servers and, optionally, executes it locally as well.
      """
      notify = DirectNotifyGlobal.directNotify.newCategory("ClusterClient")
      # Passed as the first argument to ClusterMsgHandler in __init__.
      MGR_NUM = 1000000

      def __init__(self, configList, clusterSyncFlag):
          """Start the remote servers and connect to each of them.

          configList -- sequence of config items; each must provide
              serverName, serverPort, serverConfigName, xyz, hpr, fFrustum
              and (when fFrustum is set) focalLength, filmSize, filmOffset.
          clusterSyncFlag -- value substituted into SERVER_STARTUP_STRING
              when building the command that launches each server.
          """
          # First start up servers using direct daemon
          # What is the name of the client machine?
          pipe = os.popen('uname -n')
          try:
              clusterDaemonClient = pipe.read().replace('\n', '')
          finally:
              # Always release the pipe, even if the read fails.
              pipe.close()
          # What daemon port are we using to communicate between client/servers
          clusterDaemonPort = base.config.GetInt(
              'cluster-daemon-port', CLUSTER_DAEMON_PORT)
          # Create a daemon
          self.daemon = DirectD()
          # Start listening for the response
          self.daemon.listenTo(clusterDaemonPort)
          # Contact server daemons and start up remote server application
          for serverConfig in configList:
              # First kill existing application
              self.daemon.tellServer(serverConfig.serverName,
                                     clusterDaemonPort,
                                     'ka')
              # Now start up new application
              serverCommand = (SERVER_STARTUP_STRING %
                               (serverConfig.serverPort,
                                clusterSyncFlag,
                                clusterDaemonClient,
                                clusterDaemonPort))
              self.daemon.tellServer(serverConfig.serverName,
                                     clusterDaemonPort,
                                     serverCommand)
          print('Begin waitForServers')
          if not self.daemon.waitForServers(len(configList)):
              print('Cluster Client, no response from servers')
          print('End waitForServers')
          self.qcm = QueuedConnectionManager()
          self.serverList = []
          self.msgHandler = ClusterMsgHandler(ClusterClient.MGR_NUM,
                                              self.notify)
          for serverConfig in configList:
              server = DisplayConnection(self.qcm, serverConfig.serverName,
                                         serverConfig.serverPort,
                                         self.msgHandler)
              # A constructor call never yields None, so comparing the new
              # object against None could not detect a failure.  A failed
              # connection is recognisable by DisplayConnection.tcpConn
              # being None.
              if server.tcpConn is None:
                  self.notify.error('Could not open %s on %s port %d' %
                                    (serverConfig.serverConfigName,
                                     serverConfig.serverName,
                                     serverConfig.serverPort))
              else:
                  server.sendCamOffset(serverConfig.xyz, serverConfig.hpr)
                  if serverConfig.fFrustum:
                      server.sendCamFrustum(serverConfig.focalLength,
                                            serverConfig.filmSize,
                                            serverConfig.filmOffset)
                  self.serverList.append(server)
          self.startMoveCamTask()
          self.startMoveSelectedTask()

      def startMoveCamTask(self):
          """Register moveCameraTask with the task manager."""
          # NOTE(review): the third argument (49) is presumably the task
          # priority -- confirm against taskMgr.add.
          taskMgr.add(self.moveCameraTask, "moveCamTask", 49)

      def moveCameraTask(self, task):
          """Task callback: forward base.camera's pos/hpr relative to render."""
          self.moveCamera(
              base.camera.getPos(render),
              base.camera.getHpr(render))
          return Task.cont

      def moveCamera(self, xyz, hpr):
          """Send a camera move message with xyz/hpr to every server."""
          self.notify.debug('moving unsynced camera')
          for server in self.serverList:
              server.sendMoveCam(xyz, hpr)

      def startMoveSelectedTask(self):
          """Register moveSelectedTask with the task manager."""
          # NOTE(review): 48 is presumably the task priority -- confirm.
          taskMgr.add(self.moveSelectedTask, "moveSelectedTask", 48)

      def moveSelectedTask(self, state):
          """Task callback: forward the transform of the selected node path."""
          # Update cluster if current display is a cluster client
          # NOTE(review): `last` is not defined in this module; it is
          # presumably a global installed elsewhere (the last selected node
          # path) -- confirm.
          if last is not None:
              self.notify.debug('moving selected node path')
              xyz = Point3(0)
              hpr = VBase3(0)
              scale = VBase3(1)
              # Fills scale, hpr and xyz in place from the node's matrix.
              decomposeMatrix(last.getMat(), scale, hpr, xyz)
              for server in self.serverList:
                  server.sendMoveSelected(xyz, hpr, scale)
          return Task.cont

      def getNodePathFindCmd(self, nodePath):
          """Return a source string that locates nodePath by name.

          The repr of nodePath is split at its first '/': for "root/a/b" the
          result is 'root.find("a/b")'.  When the repr contains no '/', the
          whole repr is returned unchanged.
          """
          pathString = repr(nodePath)
          index = pathString.find('/')
          if index == -1:
              # No separator: the path consists of the root name only.
              return pathString
          rootName = pathString[:index]
          searchString = pathString[index + 1:]
          return rootName + ('.find("%s")' % searchString)

      def selectNodePath(self, nodePath):
          """Send a '.select()' command for nodePath to all servers only."""
          self(self.getNodePathFindCmd(nodePath) + '.select()', 0)

      def deselectNodePath(self, nodePath):
          """Send a '.deselect()' command for nodePath to all servers only."""
          self(self.getNodePathFindCmd(nodePath) + '.deselect()', 0)

      def loadModel(self, nodePath):
          """Placeholder; currently does nothing."""
          pass

      def __call__(self, commandString, fLocally = 1, serverList = None):
          """Send commandString to servers and optionally run it locally.

          commandString -- Python source text to execute.
          fLocally -- when true, also exec the string in this process.
          serverList -- indices into self.serverList to send to; when empty
              or None the command goes to every connected server.
          """
          # Execute remotely
          if serverList:
              # Passed in list of servers
              for serverNum in serverList:
                  self.serverList[serverNum].sendCommandString(commandString)
          else:
              # All servers
              for server in self.serverList:
                  server.sendCommandString(commandString)
          if fLocally:
              # Execute locally
              # SECURITY: this runs arbitrary code; commandString must come
              # from a trusted caller.
              exec(commandString, __builtins__)

      def exit(self):
          """Send an exit message to every server, then exit this process."""
          # Execute remotely
          for server in self.serverList:
              server.sendExit()
          # Execute locally
          import sys
          sys.exit()
  class ClusterClientSync(ClusterClient):
      """ClusterClient variant that coordinates buffer swaps with the servers.

      After a camera move has been sent, the swap coordinator task waits for
      a swap-ready message from every server, tells them all to swap, and
      then flips the local frame.
      """

      def __init__(self, configList, clusterSyncFlag):
          """Set up the base client, then register the swap coordinator."""
          ClusterClient.__init__(self, configList, clusterSyncFlag)
          # waitForSwap: set by moveCamera, consumed by swapCoordinator.
          # ready: becomes 1 the first time swapCoordinator runs.
          # (The original author noted these may not be needed.)
          self.waitForSwap, self.ready = 0, 0
          self.startSwapCoordinatorTask()

      def startSwapCoordinatorTask(self):
          """Register swapCoordinator with the task manager; returns None."""
          taskMgr.add(self.swapCoordinator, "clientSwapCoordinator", 51)
          return

      def swapCoordinator(self, task):
          """Task callback performing one synchronized swap when one is due."""
          self.ready = 1
          if not self.waitForSwap:
              # No camera move since the last swap: nothing to coordinate.
              return Task.cont

          self.waitForSwap = 0
          servers = self.serverList

          self.notify.debug(
              "START get swaps----------------------------------")
          # Block until every server has reported that it is ready to swap.
          for connection in servers:
              connection.getSwapReady()

          self.notify.debug(
              "----------------START swap now--------------------")
          for connection in servers:
              connection.sendSwapNow()

          self.notify.debug(
              "------------------------------START swap----------")
          base.graphicsEngine.flipFrame()
          self.notify.debug(
              "------------------------------------------END swap")
          return Task.cont

      def moveCamera(self, xyz, hpr):
          """Forward the camera move once ready, and request a swap."""
          if not self.ready:
              return
          self.notify.debug('moving synced camera')
          ClusterClient.moveCamera(self, xyz, hpr)
          self.waitForSwap = 1
  class DisplayConnection:
      """TCP connection from the cluster client to a single display server.

      Datagrams are built by the shared message handler and written through a
      ConnectionWriter.  After construction, tcpConn is None when the
      connection could not be opened; in that case qcr and cw are None too
      and none of the send/receive methods may be used.
      """

      def __init__(self, qcm, serverName, port, msgHandler):
          """Open the connection.

          qcm -- connection manager used to open the TCP client connection
              and to build the reader and writer.
          serverName, port -- address of the display server.
          msgHandler -- object that builds outgoing datagrams, exposes
              packetNumber and provides blockingRead.
          """
          self.msgHandler = msgHandler
          # Defined up front so a failed connection still leaves every
          # attribute present.
          self.qcr = None
          self.cw = None
          gameServerTimeoutMs = base.config.GetInt(
              "game-server-timeout-ms", 20000)
          # A big old 20 second timeout.
          self.tcpConn = qcm.openTCPClientConnection(
              serverName, port, gameServerTimeoutMs)
          # Test for bad connection
          if self.tcpConn is None:
              return
          self.tcpConn.setNoDelay(1)
          self.qcr = QueuedConnectionReader(qcm, 0)
          self.qcr.addConnection(self.tcpConn)
          self.cw = ConnectionWriter(qcm, 0)

      def _send(self, datagram):
          """Write one datagram to the server over the TCP connection."""
          self.cw.send(datagram, self.tcpConn)

      def sendCamOffset(self, xyz, hpr):
          """Send the camera offset (xyz, hpr) to the server."""
          ClusterClient.notify.debug("send cam offset...")
          ClusterClient.notify.debug(
              ("packet %d xyz,hpr=%f %f %f %f %f %f" %
               (self.msgHandler.packetNumber, xyz[0], xyz[1], xyz[2],
                hpr[0], hpr[1], hpr[2])))
          self._send(self.msgHandler.makeCamOffsetDatagram(xyz, hpr))

      def sendCamFrustum(self, focalLength, filmSize, filmOffset):
          """Send the camera frustum parameters to the server.

          filmSize and filmOffset are indexed as two-element sequences.
          """
          ClusterClient.notify.info("send cam frustum...")
          ClusterClient.notify.info(
              (("packet %d" % self.msgHandler.packetNumber) +
               (" fl, fs, fo=%0.3f, (%0.3f, %0.3f), (%0.3f, %0.3f)" %
                (focalLength, filmSize[0], filmSize[1],
                 filmOffset[0], filmOffset[1])))
              )
          self._send(self.msgHandler.makeCamFrustumDatagram(
              focalLength, filmSize, filmOffset))

      def sendMoveCam(self, xyz, hpr):
          """Send a camera movement (xyz, hpr) to the server."""
          ClusterClient.notify.debug("send cam move...")
          ClusterClient.notify.debug(
              ("packet %d xyz,hpr=%f %f %f %f %f %f" %
               (self.msgHandler.packetNumber, xyz[0], xyz[1], xyz[2],
                hpr[0], hpr[1], hpr[2])))
          self._send(self.msgHandler.makeCamMovementDatagram(xyz, hpr))

      def sendMoveSelected(self, xyz, hpr, scale):
          """Send the selected object's xyz, hpr and scale to the server."""
          ClusterClient.notify.debug("send move selected...")
          ClusterClient.notify.debug(
              "packet %d xyz,hpr=%f %f %f %f %f %f %f %f %f" %
              (self.msgHandler.packetNumber,
               xyz[0], xyz[1], xyz[2],
               hpr[0], hpr[1], hpr[2],
               scale[0], scale[1], scale[2]))
          self._send(
              self.msgHandler.makeSelectedMovementDatagram(xyz, hpr, scale))

      # the following should only be called by a synchronized cluster manager
      def getSwapReady(self):
          """Block until a CLUSTER_SWAP_READY message arrives.

          Any other message type read in the meantime is logged as a warning
          and discarded.
          """
          while True:
              (datagram, dgi, msgType) = self.msgHandler.blockingRead(self.qcr)
              if msgType == CLUSTER_SWAP_READY:
                  break
              # This class has no notify attribute of its own; log through
              # the ClusterClient category like every other method here.
              ClusterClient.notify.warning(
                  'was expecting SWAP_READY, got %d' % msgType)

      # the following should only be called by a synchronized cluster manager
      def sendSwapNow(self):
          """Tell the server to swap now."""
          ClusterClient.notify.debug(
              "display connect send swap now, packet %d" %
              self.msgHandler.packetNumber)
          self._send(self.msgHandler.makeSwapNowDatagram())

      def sendCommandString(self, commandString):
          """Send a command string to the server."""
          ClusterClient.notify.debug("send command string: %s" % commandString)
          self._send(self.msgHandler.makeCommandStringDatagram(commandString))

      def sendExit(self):
          """Send the exit message to the server."""
          ClusterClient.notify.debug(
              "display connect send exit, packet %d" %
              self.msgHandler.packetNumber)
          self._send(self.msgHandler.makeExitDatagram())
  class ClusterConfigItem:
      """Connection and camera settings for one display server."""

      def __init__(self, serverConfigName, serverName, serverPort):
          """Record the server identity; camera data starts at defaults.

          The camera offset starts as Vec3(0) for both xyz and hpr, and no
          frustum data is set (fFrustum == 0).
          """
          self.serverConfigName = serverConfigName
          self.serverName = serverName
          self.serverPort = serverPort
          # Camera Offset
          self.xyz = Vec3(0)
          self.hpr = Vec3(0)
          # Camera Frustum Data: fFrustum flags whether the three values
          # below have been provided through setCamFrustum.
          self.fFrustum = 0
          self.focalLength = self.filmSize = self.filmOffset = None

      def setCamOffset(self, xyz, hpr):
          """Store the camera offset position and orientation."""
          self.xyz, self.hpr = xyz, hpr

      def setCamFrustum(self, focalLength, filmSize, filmOffset):
          """Store the frustum parameters and mark them as present."""
          self.focalLength, self.filmSize, self.filmOffset = (
              focalLength, filmSize, filmOffset)
          self.fFrustum = 1
  def createClusterClient():
      """Build the cluster client described by the 'cluster-config' setting.

      Looks up the named entry in ClientConfigs.  An entry whose
      'display mode' is 'client' configures the local camera lens; every
      other entry becomes a ClusterConfigItem for a remote server, provided
      a host name is configured for it.

      Returns a ClusterClientSync when base.clusterSyncFlag is true,
      otherwise a ClusterClient; returns None when the named cluster config
      is not defined in ClientConfigs.
      """
      # setup camera offsets based on cluster-config
      clusterConfig = base.config.GetString('cluster-config', 'single-server')
      # No cluster config specified!
      if clusterConfig not in ClientConfigs:
          base.notify.warning(
              'createClusterClient: %s cluster-config is undefined.' %
              clusterConfig)
          return None
      # Get display config for each server in the cluster
      displayConfigs = []
      for i, configData in enumerate(ClientConfigs[clusterConfig]):
          displayName = configData.get('display name', ('display%d' % i))
          displayMode = configData.get('display mode', 'server')
          # Init Cam Offset
          pos = configData.get('pos', Vec3(0))
          hpr = configData.get('hpr', Vec3(0))
          # Init Frustum if specified
          fl = configData.get('focal length', None)
          fs = configData.get('film size', None)
          fo = configData.get('film offset', None)
          if displayMode == 'client':
              # This entry describes the local display: apply it directly
              # to the lens of the local camera.
              lens = base.cam.node().getLens()
              lens.setViewHpr(hpr)
              lens.setIodOffset(pos[0])
              if fl is not None:
                  lens.setFocalLength(fl)
              if fs is not None:
                  lens.setFilmSize(fs[0], fs[1])
              if fo is not None:
                  lens.setFilmOffset(fo[0], fo[1])
              continue
          # Remote display: look up which host serves it.
          serverConfigName = 'cluster-server-%s' % displayName
          serverName = base.config.GetString(serverConfigName, '')
          if serverName == '':
              base.notify.warning(
                  '%s undefined in Configrc: expected by %s display client.'%
                  (serverConfigName,clusterConfig))
              base.notify.warning('%s will not be used.' % serverConfigName)
              continue
          # Server port
          serverPortConfigName = 'cluster-server-port-%s' % displayName
          serverPort = base.config.GetInt(serverPortConfigName,
                                          CLUSTER_SERVER_PORT)
          cci = ClusterConfigItem(
              serverConfigName,
              serverName,
              serverPort)
          # Init cam offset
          cci.setCamOffset(pos, hpr)
          # Init frustum if specified
          # NOTE(review): this is a truthiness test, unlike the
          # `is not None` checks in the client branch, so a falsy value
          # (e.g. a focal length of 0) suppresses the frustum -- confirm
          # that this is intended.
          if fl and fs and fo:
              cci.setCamFrustum(fl, fs, fo)
          displayConfigs.append(cci)
      # Create Cluster Managers (opening connections to servers)
      # Are the servers going to be synced?
      if base.clusterSyncFlag:
          # The synced client flips frames itself from its swap coordinator.
          base.graphicsEngine.setAutoFlip(0)
          return ClusterClientSync(displayConfigs, base.clusterSyncFlag)
      else:
          return ClusterClient(displayConfigs, base.clusterSyncFlag)
  class DummyClusterClient(DirectObject.DirectObject):
      """ Dummy class to handle command strings when not in cluster mode """
      notify = DirectNotifyGlobal.directNotify.newCategory("DummyClusterClient")

      def __init__(self):
          """Nothing to set up: there are no servers to contact."""
          pass

      def __call__(self, commandString, fLocally = 1, serverList = None):
          """Run commandString in this process when fLocally is true.

          serverList is accepted for call compatibility and is not used.
          """
          if not fLocally:
              return
          # Execute locally
          exec(commandString, __builtins__)