ClusterClient.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. """ClusterClient: Master for mutli-piping or PC clusters. """
  2. from pandac.PandaModules import *
  3. from ClusterMsgs import *
  4. from ClusterConfig import *
  5. from direct.directnotify import DirectNotifyGlobal
  6. from direct.showbase import DirectObject
  7. from direct.task import Task
  8. import os
  9. class ClusterClient(DirectObject.DirectObject):
  10. notify = DirectNotifyGlobal.directNotify.newCategory("ClusterClient")
  11. MGR_NUM = 1000000
  12. def __init__(self, configList, clusterSyncFlag):
  13. # Set name so cluster __call__ function can be used in Intervals
  14. self.__name__ = 'cluster'
  15. # First start up servers using direct daemon
  16. # What is the name of the client machine?
  17. clusterDaemonClient = base.config.GetString(
  18. 'cluster-daemon-client', 'None')
  19. if clusterDaemonClient == 'None':
  20. clusterDaemonClient = os.popen('uname -n').read()
  21. clusterDaemonClient = clusterDaemonClient.replace('\n', '')
  22. # What daemon port are we using to communicate between client/servers
  23. clusterDaemonPort = base.config.GetInt(
  24. 'cluster-daemon-port', CLUSTER_DAEMON_PORT)
  25. # Create a daemon
  26. self.daemon = DirectD()
  27. # Start listening for the response
  28. self.daemon.listenTo(clusterDaemonPort)
  29. # Contact server daemons and start up remote server application
  30. for serverConfig in configList:
  31. # First kill existing application
  32. self.daemon.tellServer(serverConfig.serverName,
  33. clusterDaemonPort,
  34. 'ka')
  35. # Now start up new application
  36. serverCommand = (SERVER_STARTUP_STRING %
  37. (serverConfig.serverPort,
  38. clusterSyncFlag,
  39. clusterDaemonClient,
  40. clusterDaemonPort))
  41. self.daemon.tellServer(serverConfig.serverName,
  42. clusterDaemonPort,
  43. serverCommand)
  44. print 'Begin waitForServers'
  45. if not self.daemon.waitForServers(len(configList)):
  46. print 'Cluster Client, no response from servers'
  47. print 'End waitForServers'
  48. self.qcm=QueuedConnectionManager()
  49. self.serverList = []
  50. self.msgHandler = ClusterMsgHandler(ClusterClient.MGR_NUM, self.notify)
  51. for serverConfig in configList:
  52. server = DisplayConnection(self.qcm,serverConfig.serverName,
  53. serverConfig.serverPort,self.msgHandler)
  54. if server == None:
  55. self.notify.error('Could not open %s on %s port %d' %
  56. (serverConfig.serverConfigName,
  57. serverConfig.serverName,
  58. serverConfig.serverPort))
  59. else:
  60. self.notify.debug('send cam pos')
  61. #server.sendMoveCam(Point3(0), Vec3(0))
  62. self.notify.debug('send cam offset')
  63. server.sendCamOffset(serverConfig.xyz,serverConfig.hpr)
  64. if serverConfig.fFrustum:
  65. self.notify.debug('send cam frustum')
  66. server.sendCamFrustum(serverConfig.focalLength,
  67. serverConfig.filmSize,
  68. serverConfig.filmOffset)
  69. self.serverList.append(server)
  70. self.notify.debug('pre startTimeTask')
  71. self.startSynchronizeTimeTask()
  72. self.notify.debug('pre startMoveCam')
  73. self.startMoveCamTask()
  74. self.notify.debug('post startMoveCam')
  75. self.startMoveSelectedTask()
  76. def startSynchronizeTimeTask(self):
  77. self.notify.debug('broadcasting frame time')
  78. taskMgr.add(self.synchronizeTimeTask, "synchronizeTimeTask", -40)
  79. def synchronizeTimeTask(self,task):
  80. frameTime = globalClock.getFrameTime()
  81. dt = globalClock.getDt()
  82. for server in self.serverList:
  83. server.sendTimeData(frameTime, dt)
  84. return Task.cont
  85. def startMoveCamTask(self):
  86. self.notify.debug('adding move cam')
  87. taskMgr.add(self.moveCameraTask, "moveCamTask", 49)
  88. def moveCameraTask(self,task):
  89. self.moveCamera(
  90. base.camera.getPos(render),
  91. base.camera.getHpr(render))
  92. return Task.cont
  93. def moveCamera(self, xyz, hpr):
  94. self.notify.debug('moving unsynced camera')
  95. for server in self.serverList:
  96. server.sendMoveCam(xyz,hpr)
  97. def startMoveSelectedTask(self):
  98. taskMgr.add(self.moveSelectedTask, "moveSelectedTask", 48)
  99. def moveSelectedTask(self, state):
  100. # Update cluster if current display is a cluster client
  101. if (last is not None):
  102. self.notify.debug('moving selected node path')
  103. xyz = Point3(0)
  104. hpr = VBase3(0)
  105. scale = VBase3(1)
  106. decomposeMatrix(last.getMat(), scale, hpr, xyz)
  107. for server in self.serverList:
  108. server.sendMoveSelected(xyz,hpr,scale)
  109. return Task.cont
  110. def getNodePathFindCmd(self, nodePath):
  111. import string
  112. pathString = `nodePath`
  113. index = string.find(pathString, '/')
  114. if index != -1:
  115. rootName = pathString[:index]
  116. searchString = pathString[index+1:]
  117. return rootName + ('.find("%s")' % searchString)
  118. else:
  119. return rootName
  120. def selectNodePath(self, nodePath):
  121. self(self.getNodePathFindCmd(nodePath) + '.select()', 0)
  122. def deselectNodePath(self, nodePath):
  123. self(self.getNodePathFindCmd(nodePath) + '.deselect()', 0)
  124. def sendCamFrustum(self, focalLength, filmSize, filmOffset, indexList=[]):
  125. if indexList:
  126. serverList = map(lambda i: self.serverList[i], indexList)
  127. else:
  128. serverList = self.serverList
  129. for server in serverList:
  130. self.notify.debug('updating camera frustum')
  131. server.sendCamFrustum(focalLength, filmSize, filmOffset)
  132. def loadModel(self, nodePath):
  133. pass
  134. def __call__(self, commandString, fLocally = 1, serverList = []):
  135. # Execute remotely
  136. if serverList:
  137. # Passed in list of servers
  138. for serverNum in serverList:
  139. self.serverList[serverNum].sendCommandString(commandString)
  140. else:
  141. # All servers
  142. for server in self.serverList:
  143. server.sendCommandString(commandString)
  144. if fLocally:
  145. # Execute locally
  146. exec( commandString, __builtins__ )
  147. def exit(self):
  148. # Execute remotely
  149. for server in self.serverList:
  150. server.sendExit()
  151. # Execute locally
  152. import sys
  153. sys.exit()
  154. class ClusterClientSync(ClusterClient):
  155. def __init__(self, configList, clusterSyncFlag):
  156. ClusterClient.__init__(self, configList, clusterSyncFlag)
  157. #I probably don't need this
  158. self.waitForSwap = 0
  159. self.ready = 0
  160. self.startSwapCoordinatorTask()
  161. def startSwapCoordinatorTask(self):
  162. taskMgr.add(self.swapCoordinator, "clientSwapCoordinator", 51)
  163. return None
  164. def swapCoordinator(self,task):
  165. self.ready = 1
  166. if self.waitForSwap:
  167. self.waitForSwap=0
  168. self.notify.debug(
  169. "START get swaps----------------------------------")
  170. for server in self.serverList:
  171. server.getSwapReady()
  172. self.notify.debug(
  173. "----------------START swap now--------------------")
  174. for server in self.serverList:
  175. server.sendSwapNow()
  176. self.notify.debug(
  177. "------------------------------START swap----------")
  178. base.graphicsEngine.flipFrame()
  179. self.notify.debug(
  180. "------------------------------------------END swap")
  181. return Task.cont
  182. def moveCamera(self,xyz,hpr):
  183. if self.ready:
  184. self.notify.debug('moving synced camera')
  185. ClusterClient.moveCamera(self,xyz,hpr)
  186. self.waitForSwap=1
  187. class DisplayConnection:
  188. def __init__(self,qcm,serverName,port,msgHandler):
  189. self.msgHandler = msgHandler
  190. gameServerTimeoutMs = base.config.GetInt(
  191. "cluster-server-timeout-ms", 300000)
  192. # A giant 300 second timeout.
  193. self.tcpConn = qcm.openTCPClientConnection(
  194. serverName, port, gameServerTimeoutMs)
  195. # Test for bad connection
  196. if self.tcpConn == None:
  197. return None
  198. else:
  199. self.tcpConn.setNoDelay(1)
  200. self.qcr=QueuedConnectionReader(qcm, 0)
  201. self.qcr.addConnection(self.tcpConn)
  202. self.cw=ConnectionWriter(qcm, 0)
  203. def sendCamOffset(self,xyz,hpr):
  204. ClusterClient.notify.debug("send cam offset...")
  205. ClusterClient.notify.debug( ("packet %d xyz,hpr=%f %f %f %f %f %f" %
  206. (self.msgHandler.packetNumber,xyz[0],xyz[1],xyz[2],
  207. hpr[0],hpr[1],hpr[2])) )
  208. datagram = self.msgHandler.makeCamOffsetDatagram(xyz, hpr)
  209. self.cw.send(datagram, self.tcpConn)
  210. def sendCamFrustum(self,focalLength, filmSize, filmOffset):
  211. ClusterClient.notify.info("send cam frustum...")
  212. ClusterClient.notify.info(
  213. (("packet %d" % self.msgHandler.packetNumber) +
  214. (" fl, fs, fo=%0.3f, (%0.3f, %0.3f), (%0.3f, %0.3f)" %
  215. (focalLength, filmSize[0], filmSize[1],
  216. filmOffset[0], filmOffset[1])))
  217. )
  218. datagram = self.msgHandler.makeCamFrustumDatagram(
  219. focalLength, filmSize, filmOffset)
  220. self.cw.send(datagram, self.tcpConn)
  221. def sendMoveCam(self,xyz,hpr):
  222. ClusterClient.notify.debug("send cam move...")
  223. ClusterClient.notify.debug( ("packet %d xyz,hpr=%f %f %f %f %f %f" %
  224. (self.msgHandler.packetNumber,xyz[0],xyz[1],xyz[2],
  225. hpr[0],hpr[1],hpr[2])) )
  226. datagram = self.msgHandler.makeCamMovementDatagram(xyz, hpr)
  227. self.cw.send(datagram, self.tcpConn)
  228. def sendMoveSelected(self,xyz,hpr,scale):
  229. ClusterClient.notify.debug("send move selected...")
  230. ClusterClient.notify.debug(
  231. "packet %d xyz,hpr=%f %f %f %f %f %f %f %f %f" %
  232. (self.msgHandler.packetNumber,
  233. xyz[0],xyz[1],xyz[2],
  234. hpr[0],hpr[1],hpr[2],
  235. scale[0],scale[1],scale[2]))
  236. datagram = self.msgHandler.makeSelectedMovementDatagram(xyz, hpr,scale)
  237. self.cw.send(datagram, self.tcpConn)
  238. # the following should only be called by a synchronized cluster manger
  239. def getSwapReady(self):
  240. while 1:
  241. (datagram, dgi, type) = self.msgHandler.blockingRead(self.qcr)
  242. if type == CLUSTER_SWAP_READY:
  243. break
  244. else:
  245. self.notify.warning('was expecting SWAP_READY, got %d' % type)
  246. # the following should only be called by a synchronized cluster manger
  247. def sendSwapNow(self):
  248. ClusterClient.notify.debug(
  249. "display connect send swap now, packet %d" %
  250. self.msgHandler.packetNumber)
  251. datagram = self.msgHandler.makeSwapNowDatagram()
  252. self.cw.send(datagram, self.tcpConn)
  253. def sendCommandString(self, commandString):
  254. ClusterClient.notify.debug("send command string: %s" % commandString)
  255. datagram = self.msgHandler.makeCommandStringDatagram(commandString)
  256. self.cw.send(datagram, self.tcpConn)
  257. def sendExit(self):
  258. ClusterClient.notify.debug(
  259. "display connect send exit, packet %d" %
  260. self.msgHandler.packetNumber)
  261. datagram = self.msgHandler.makeExitDatagram()
  262. self.cw.send(datagram, self.tcpConn)
  263. def sendTimeData(self,frameTime, dt):
  264. ClusterClient.notify.debug("send time data...")
  265. datagram = self.msgHandler.makeTimeDataDatagram(frameTime, dt)
  266. self.cw.send(datagram, self.tcpConn)
  267. class ClusterConfigItem:
  268. def __init__(self, serverConfigName, serverName, serverPort):
  269. self.serverConfigName = serverConfigName
  270. self.serverName = serverName
  271. self.serverPort = serverPort
  272. # Camera Offset
  273. self.xyz = Vec3(0)
  274. self.hpr = Vec3(0)
  275. # Camera Frustum Data
  276. self.fFrustum = 0
  277. self.focalLength = None
  278. self.filmSize = None
  279. self.filmOffset = None
  280. def setCamOffset(self, xyz, hpr):
  281. self.xyz = xyz
  282. self.hpr = hpr
  283. def setCamFrustum(self, focalLength, filmSize, filmOffset):
  284. self.fFrustum = 1
  285. self.focalLength = focalLength
  286. self.filmSize = filmSize
  287. self.filmOffset = filmOffset
  288. def createClusterClient():
  289. # setup camera offsets based on cluster-config
  290. clusterConfig = base.config.GetString('cluster-config', 'single-server')
  291. # No cluster config specified!
  292. if not ClientConfigs.has_key(clusterConfig):
  293. base.notify.warning(
  294. 'createClusterClient: %s cluster-config is undefined.' %
  295. clusterConfig)
  296. return None
  297. # Get display config for each server in the cluster
  298. displayConfigs = []
  299. configList = ClientConfigs[clusterConfig]
  300. numConfigs = len(configList)
  301. for i in range(numConfigs):
  302. configData = configList[i]
  303. displayName = configData.get('display name', ('display%d' % i))
  304. displayMode = configData.get('display mode', 'server')
  305. # Init Cam Offset
  306. pos = configData.get('pos', Vec3(0))
  307. hpr = configData.get('hpr', Vec3(0))
  308. # Init Frustum if specified
  309. fl = configData.get('focal length', None)
  310. fs = configData.get('film size', None)
  311. fo = configData.get('film offset', None)
  312. if displayMode == 'client':
  313. lens = base.cam.node().getLens()
  314. lens.setViewHpr(hpr)
  315. lens.setIodOffset(pos[0])
  316. if fl is not None:
  317. lens.setFocalLength(fl)
  318. if fs is not None:
  319. lens.setFilmSize(fs[0], fs[1])
  320. if fo is not None:
  321. lens.setFilmOffset(fo[0], fo[1])
  322. else:
  323. serverConfigName = 'cluster-server-%s' % displayName
  324. serverName = base.config.GetString(serverConfigName, '')
  325. if serverName == '':
  326. base.notify.warning(
  327. '%s undefined in Configrc: expected by %s display client.'%
  328. (serverConfigName,clusterConfig))
  329. base.notify.warning('%s will not be used.' % serverConfigName)
  330. else:
  331. # Server port
  332. serverPortConfigName = 'cluster-server-port-%s' % displayName
  333. serverPort = base.config.GetInt(serverPortConfigName,
  334. CLUSTER_SERVER_PORT)
  335. cci = ClusterConfigItem(
  336. serverConfigName,
  337. serverName,
  338. serverPort)
  339. # Init cam offset
  340. cci.setCamOffset(pos, hpr)
  341. # Init frustum if specified
  342. if fl and fs and fo:
  343. cci.setCamFrustum(fl, fs, fo)
  344. displayConfigs.append(cci)
  345. # Create Cluster Managers (opening connections to servers)
  346. # Are the servers going to be synced?
  347. if base.clusterSyncFlag:
  348. base.notify.warning('autoflip')
  349. base.graphicsEngine.setAutoFlip(0)
  350. base.notify.warning('ClusterClientSync')
  351. return ClusterClientSync(displayConfigs, base.clusterSyncFlag)
  352. else:
  353. return ClusterClient(displayConfigs, base.clusterSyncFlag)
  354. class DummyClusterClient(DirectObject.DirectObject):
  355. """ Dummy class to handle command strings when not in cluster mode """
  356. notify = DirectNotifyGlobal.directNotify.newCategory("DummyClusterClient")
  357. def __init__(self):
  358. pass
  359. def __call__(self, commandString, fLocally = 1, serverList = None):
  360. if fLocally:
  361. # Execute locally
  362. exec( commandString, __builtins__ )