ClusterServer.py 11 KB


  1. from pandac.PandaModules import *
  2. from ClusterMsgs import *
  3. from direct.distributed.MsgTypes import *
  4. from direct.directnotify import DirectNotifyGlobal
  5. from direct.showbase import DirectObject
  6. from direct.task import Task
  7. # NOTE: This assumes the following variables are set via bootstrap command line
  8. # arguments on server startup:
  9. # clusterServerPort
  10. # clusterSyncFlag
  11. # clusterDaemonClient
  12. # clusterDaemonPort
  13. # Also, I'm not sure multiple camera-group configurations are working for the
  14. # cluster system.
  15. class ClusterServer(DirectObject.DirectObject):
  16. notify = DirectNotifyGlobal.directNotify.newCategory("ClusterServer")
  17. MSG_NUM = 2000000
  18. def __init__(self, cameraJig, camera):
  19. global clusterServerPort, clusterSyncFlag
  20. global clusterDaemonClient, clusterDaemonPort
  21. # Store information about the cluster's camera
  22. self.cameraJig = cameraJig
  23. self.camera = camera
  24. self.lens = camera.node().getLens()
  25. self.lastConnection = None
  26. self.fPosReceived = 0
  27. # Create network layer objects
  28. self.qcm = QueuedConnectionManager()
  29. self.qcl = QueuedConnectionListener(self.qcm, 0)
  30. self.qcr = QueuedConnectionReader(self.qcm, 0)
  31. self.cw = ConnectionWriter(self.qcm, 0)
  32. try:
  33. port = clusterServerPort
  34. except NameError:
  35. port = CLUSTER_SERVER_PORT
  36. self.tcpRendezvous = self.qcm.openTCPServerRendezvous(port, 1)
  37. self.qcl.addConnection(self.tcpRendezvous)
  38. self.msgHandler = ClusterMsgHandler(ClusterServer.MSG_NUM, self.notify)
  39. # Start cluster tasks
  40. self.startListenerPollTask()
  41. self.startReaderPollTask()
  42. # If synchronized server, start swap coordinator too
  43. try:
  44. clusterSyncFlag
  45. except NameError:
  46. clusterSyncFlag = 0
  47. if clusterSyncFlag:
  48. self.startSwapCoordinator()
  49. base.graphicsEngine.setAutoFlip(0)
  50. # Set global clock mode to slave mode
  51. globalClock.setMode(ClockObject.MSlave)
  52. # Send verification of startup to client
  53. self.daemon = DirectD()
  54. self.objectMappings = {}
  55. self.controlMappings = {}
  56. self.controlOffsets = {}
  57. # These must be passed in as bootstrap arguments and stored in
  58. # the __builtins__ namespace
  59. try:
  60. clusterDaemonClient
  61. except NameError:
  62. clusterDaemonClient = 'localhost'
  63. try:
  64. clusterDaemonPort
  65. except NameError:
  66. clusterDaemonPort = CLUSTER_DAEMON_PORT
  67. self.daemon.serverReady(clusterDaemonClient, clusterDaemonPort)
  68. def startListenerPollTask(self):
  69. # Run this task near the start of frame, sometime after the dataLoop
  70. taskMgr.add(self.listenerPollTask, "serverListenerPollTask", -40)
  71. def listenerPollTask(self, task):
  72. """ Task to listen for a new connection from the client """
  73. # Run this task after the dataLoop
  74. if self.qcl.newConnectionAvailable():
  75. self.notify.info("New connection is available")
  76. rendezvous = PointerToConnection()
  77. netAddress = NetAddress()
  78. newConnection = PointerToConnection()
  79. if self.qcl.getNewConnection(rendezvous, netAddress, newConnection):
  80. # Crazy dereferencing
  81. newConnection=newConnection.p()
  82. self.qcr.addConnection(newConnection)
  83. self.lastConnection = newConnection
  84. self.notify.info("Got a connection!")
  85. else:
  86. self.notify.warning("getNewConnection returned false")
  87. return Task.cont
  88. def addNamedObjectMapping(self,object,name):
  89. if (not self.objectMappings.has_key(name)):
  90. self.objectMappings[name] = object
  91. else:
  92. self.notify.debug('attempt to add duplicate named object: '+name)
  93. def removeObjectMapping(self,name):
  94. if (self.objectMappings.has_key(name)):
  95. self.objectMappings.pop(name)
  96. def addControlMapping(self,objectName,controlledName, offset = None):
  97. if (not self.controlMappings.has_key(objectName)):
  98. self.controlMappings[objectName] = controlledName
  99. if (offset == None):
  100. offset = Vec3(0,0,0)
  101. self.controlOffsets[objectName] = offset
  102. else:
  103. self.notify.debug('attempt to add duplicate controlled object: '+name)
  104. def setControlMappingOffset(self,objectName,offset):
  105. if (self.controlMappings.has_key(objectName)):
  106. self.controlOffsets[objectName] = offset
  107. def removeControlMapping(self,name):
  108. if (self.controlMappings.has_key(name)):
  109. self.controlMappings.pop(name)
  110. def startControlObjectTask(self):
  111. self.notify.debug("moving control objects")
  112. taskMgr.add(self.controlObjectTask,"controlObjectTask")
  113. def controlObjectTask(self, task):
  114. for object in self.controlMappings:
  115. name = self.controlMappings[object]
  116. if (self.objectMappings.has_key(object)):
  117. self.moveObject(self.objectMappings[object],name,self.controlOffsets[object])
  118. return Task.cont
  119. def moveObject(self, nodePath, object, offset):
  120. self.notify.debug('moving object '+object)
  121. xyz = nodePath.getPos(render) + offset
  122. hpr = nodePath.getHpr(render)
  123. scale = nodePath.getScale(render)
  124. hidden = nodePath.isHidden()
  125. datagram = self.msgHandler.makeNamedObjectMovementDatagram(xyz,hpr,scale,hidden,object)
  126. self.cw.send(datagram, self.lastConnection)
  127. def startReaderPollTask(self):
  128. """ Task to handle datagrams from client """
  129. # Run this task just after the listener poll task
  130. if clusterSyncFlag:
  131. # Sync version
  132. taskMgr.add(self._syncReaderPollTask, "serverReaderPollTask", -39)
  133. else:
  134. # Asynchronous version
  135. taskMgr.add(self._readerPollTask, "serverReaderPollTask", -39)
  136. def _readerPollTask(self, state):
  137. """ Non blocking task to read all available datagrams """
  138. while 1:
  139. (datagram, dgi, type) = self.msgHandler.nonBlockingRead(self.qcr)
  140. # Queue is empty, done for now
  141. if type is CLUSTER_NONE:
  142. break
  143. else:
  144. # Got a datagram, handle it
  145. self.handleDatagram(dgi, type)
  146. return Task.cont
  147. def _syncReaderPollTask(self, task):
  148. if self.lastConnection is None:
  149. pass
  150. elif self.qcr.isConnectionOk(self.lastConnection):
  151. # Process datagrams till you get a postion update
  152. type = CLUSTER_NONE
  153. while type != CLUSTER_CAM_MOVEMENT:
  154. # Block until you get a new datagram
  155. (datagram, dgi, type) = self.msgHandler.blockingRead(self.qcr)
  156. # Process datagram
  157. self.handleDatagram(dgi, type)
  158. return Task.cont
  159. def startSwapCoordinator(self):
  160. taskMgr.add(self.swapCoordinatorTask, "serverSwapCoordinator", 51)
  161. def swapCoordinatorTask(self, task):
  162. if self.fPosReceived:
  163. self.fPosReceived = 0
  164. # Alert client that this server is ready to swap
  165. self.sendSwapReady()
  166. # Wait for swap command (processing any intermediate datagrams)
  167. while 1:
  168. (datagram, dgi, type) = self.msgHandler.blockingRead(self.qcr)
  169. self.handleDatagram(dgi, type)
  170. if type == CLUSTER_SWAP_NOW:
  171. break
  172. return Task.cont
  173. def sendSwapReady(self):
  174. self.notify.debug(
  175. 'send swap ready packet %d' % self.msgHandler.packetNumber)
  176. datagram = self.msgHandler.makeSwapReadyDatagram()
  177. self.cw.send(datagram, self.lastConnection)
  178. def handleDatagram(self, dgi, type):
  179. """ Process a datagram depending upon type flag """
  180. if (type == CLUSTER_NONE):
  181. pass
  182. elif (type == CLUSTER_EXIT):
  183. print 'GOT EXIT'
  184. import sys
  185. sys.exit()
  186. elif (type == CLUSTER_CAM_OFFSET):
  187. self.handleCamOffset(dgi)
  188. elif (type == CLUSTER_CAM_FRUSTUM):
  189. self.handleCamFrustum(dgi)
  190. elif (type == CLUSTER_CAM_MOVEMENT):
  191. self.handleCamMovement(dgi)
  192. elif (type == CLUSTER_SELECTED_MOVEMENT):
  193. self.handleSelectedMovement(dgi)
  194. elif (type == CLUSTER_COMMAND_STRING):
  195. self.handleCommandString(dgi)
  196. elif (type == CLUSTER_SWAP_READY):
  197. pass
  198. elif (type == CLUSTER_SWAP_NOW):
  199. self.notify.debug('swapping')
  200. base.graphicsEngine.flipFrame()
  201. elif (type == CLUSTER_TIME_DATA):
  202. self.notify.debug('time data')
  203. self.handleTimeData(dgi)
  204. elif (type == CLUSTER_NAMED_OBJECT_MOVEMENT):
  205. self.handleNamedMovement(dgi)
  206. else:
  207. self.notify.warning("Received unknown packet type:" % type)
  208. return type
  209. # Server specific tasks
  210. def handleCamOffset(self, dgi):
  211. """ Set offset of camera from cameraJig """
  212. (x, y, z, h, p, r) = self.msgHandler.parseCamOffsetDatagram(dgi)
  213. self.camera.setPos(x,y,z)
  214. self.lens.setViewHpr(h, p, r)
  215. def handleCamFrustum(self, dgi):
  216. """ Adjust camera frustum based on parameters sent by client """
  217. (fl, fs, fo) = self.msgHandler.parseCamFrustumDatagram(dgi)
  218. self.lens.setFocalLength(fl)
  219. self.lens.setFilmSize(fs[0], fs[1])
  220. self.lens.setFilmOffset(fo[0], fo[1])
  221. def handleNamedMovement(self, dgi):
  222. """ Update cameraJig position to reflect latest position """
  223. (name,x, y, z, h, p, r,sx,sy,sz, hidden) = self.msgHandler.parseNamedMovementDatagram(
  224. dgi)
  225. if (self.objectMappings.has_key(name)):
  226. self.objectMappings[name].setPosHpr(render, x, y, z, h, p, r)
  227. self.objectMappings[name].setScale(render,sx,sy,sz)
  228. if (hidden):
  229. self.objectMappings[name].hide()
  230. else:
  231. self.objectMappings[name].show()
  232. else:
  233. self.notify.debug("recieved unknown named object command: "+name)
  234. def handleCamMovement(self, dgi):
  235. """ Update cameraJig position to reflect latest position """
  236. (x, y, z, h, p, r) = self.msgHandler.parseCamMovementDatagram(dgi)
  237. self.cameraJig.setPosHpr(render, x, y, z, h, p, r)
  238. self.fPosReceived = 1
  239. def handleSelectedMovement(self, dgi):
  240. """ Update cameraJig position to reflect latest position """
  241. (x, y, z, h, p, r, sx, sy, sz) = self.msgHandler.parseSelectedMovementDatagram(
  242. dgi)
  243. if last:
  244. last.setPosHprScale(x, y, z, h, p, r, sx, sy, sz)
  245. def handleTimeData(self, dgi):
  246. """ Update cameraJig position to reflect latest position """
  247. (frameCount, frameTime, dt) = self.msgHandler.parseTimeDataDatagram(dgi)
  248. # Use frame time from client for both real and frame time
  249. globalClock.setFrameCount(frameCount)
  250. globalClock.setFrameTime(frameTime)
  251. globalClock.setDt(dt)
  252. def handleCommandString(self, dgi):
  253. """ Handle arbitrary command string from client """
  254. command = self.msgHandler.parseCommandStringDatagram(dgi)
  255. try:
  256. exec(command, __builtins__)
  257. except:
  258. pass