ClusterServer.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. from PandaModules import *
  2. from ClusterMsgs import *
  3. from MsgTypes import *
  4. import DirectNotifyGlobal
  5. import DirectObject
  6. import Task
# NOTE: This assumes the following variables are set via bootstrap
# command-line arguments on server startup:
#     clusterServerPort
#     clusterSyncFlag
#     clusterDaemonClient
#     clusterDaemonPort
# Also, I'm not sure multiple camera-group configurations are working for the
# cluster system.
  15. class ClusterServer(DirectObject.DirectObject):
  16. notify = DirectNotifyGlobal.directNotify.newCategory("ClusterServer")
  17. MSG_NUM = 2000000
  18. def __init__(self,cameraJig,camera):
  19. global clusterServerPort, clusterSyncFlag
  20. global clusterDaemonClient, clusterDaemonPort
  21. # Store information about the cluster's camera
  22. self.cameraJig = cameraJig
  23. self.camera = camera
  24. self.lens = camera.node().getLens()
  25. self.lastConnection = None
  26. self.fPosReceived = 0
  27. # Create network layer objects
  28. self.qcm = QueuedConnectionManager()
  29. self.qcl = QueuedConnectionListener(self.qcm, 0)
  30. self.qcr = QueuedConnectionReader(self.qcm, 0)
  31. self.cw = ConnectionWriter(self.qcm,0)
  32. try:
  33. port = clusterServerPort
  34. except NameError:
  35. port = CLUSTER_SERVER_PORT
  36. self.tcpRendezvous = self.qcm.openTCPServerRendezvous(port, 1)
  37. self.qcl.addConnection(self.tcpRendezvous)
  38. self.msgHandler = ClusterMsgHandler(ClusterServer.MSG_NUM, self.notify)
  39. # Start cluster tasks
  40. self.startListenerPollTask()
  41. self.startReaderPollTask()
  42. # If synchronized server, start swap coordinator too
  43. try:
  44. clusterSyncFlag
  45. except NameError:
  46. clusterSyncFlag = 0
  47. if clusterSyncFlag:
  48. self.startSwapCoordinator()
  49. base.graphicsEngine.setAutoFlip(0)
  50. # Send verification of startup to client
  51. self.daemon = DirectD()
  52. # These must be passed in as bootstrap arguments and stored in
  53. # the __builtins__ namespace
  54. try:
  55. clusterDaemonClient
  56. except NameError:
  57. clusterDaemonClient = 'localhost'
  58. try:
  59. clusterDaemonPort
  60. except NameError:
  61. clusterDaemonPort = CLUSTER_DAEMON_PORT
  62. self.daemon.serverReady(clusterDaemonClient, clusterDaemonPort)
  63. def startListenerPollTask(self):
  64. # Run this task near the start of frame, sometime after the dataloop
  65. taskMgr.add(self.listenerPollTask, "serverListenerPollTask",-40)
  66. def listenerPollTask(self, task):
  67. """ Task to listen for a new connection from the client """
  68. # Run this task after the dataloop
  69. if self.qcl.newConnectionAvailable():
  70. self.notify.info("New connection is available")
  71. rendezvous = PointerToConnection()
  72. netAddress = NetAddress()
  73. newConnection = PointerToConnection()
  74. if self.qcl.getNewConnection(rendezvous,netAddress,newConnection):
  75. # Crazy dereferencing
  76. newConnection=newConnection.p()
  77. self.qcr.addConnection(newConnection)
  78. self.lastConnection = newConnection
  79. self.notify.info("Got a connection!")
  80. else:
  81. self.notify.warning("getNewConnection returned false")
  82. return Task.cont
  83. def startReaderPollTask(self):
  84. """ Task to handle datagrams from client """
  85. # Run this task just after the listener poll task
  86. if clusterSyncFlag:
  87. # Sync version
  88. taskMgr.add(self._syncReaderPollTask, "serverReaderPollTask", -39)
  89. else:
  90. # Asynchronous version
  91. taskMgr.add(self._readerPollTask, "serverReaderPollTask", -39)
  92. def _readerPollTask(self, state):
  93. """ Non blocking task to read all available datagrams """
  94. while 1:
  95. (datagram, dgi,type) = self.msgHandler.nonBlockingRead(self.qcr)
  96. # Queue is empty, done for now
  97. if type is CLUSTER_NONE:
  98. break
  99. else:
  100. # Got a datagram, handle it
  101. self.handleDatagram(dgi, type)
  102. return Task.cont
  103. def _syncReaderPollTask(self, task):
  104. if self.lastConnection is None:
  105. pass
  106. elif self.qcr.isConnectionOk(self.lastConnection):
  107. # Process datagrams till you get a postion update
  108. type = CLUSTER_NONE
  109. while type != CLUSTER_CAM_MOVEMENT:
  110. # Block until you get a new datagram
  111. (datagram,dgi,type) = self.msgHandler.blockingRead(self.qcr)
  112. # Process datagram
  113. self.handleDatagram(dgi,type)
  114. return Task.cont
  115. def startSwapCoordinator(self):
  116. taskMgr.add(self.swapCoordinatorTask, "serverSwapCoordinator", 51)
  117. def swapCoordinatorTask(self, task):
  118. if self.fPosReceived:
  119. self.fPosReceived = 0
  120. # Alert client that this server is ready to swap
  121. self.sendSwapReady()
  122. # Wait for swap command (processing any intermediate datagrams)
  123. while 1:
  124. (datagram,dgi,type) = self.msgHandler.blockingRead(self.qcr)
  125. self.handleDatagram(dgi,type)
  126. if type == CLUSTER_SWAP_NOW:
  127. break
  128. return Task.cont
  129. def sendSwapReady(self):
  130. self.notify.debug(
  131. 'send swap ready packet %d' % self.msgHandler.packetNumber)
  132. datagram = self.msgHandler.makeSwapReadyDatagram()
  133. self.cw.send(datagram, self.lastConnection)
  134. def handleDatagram(self, dgi, type):
  135. """ Process a datagram depending upon type flag """
  136. if (type == CLUSTER_NONE):
  137. pass
  138. elif (type == CLUSTER_EXIT):
  139. import sys
  140. sys.exit()
  141. elif (type == CLUSTER_CAM_OFFSET):
  142. self.handleCamOffset(dgi)
  143. elif (type == CLUSTER_CAM_FRUSTUM):
  144. self.handleCamFrustum(dgi)
  145. elif (type == CLUSTER_CAM_MOVEMENT):
  146. self.handleCamMovement(dgi)
  147. elif (type == CLUSTER_SELECTED_MOVEMENT):
  148. self.handleSelectedMovement(dgi)
  149. elif (type == CLUSTER_COMMAND_STRING):
  150. self.handleCommandString(dgi)
  151. elif (type == CLUSTER_SWAP_READY):
  152. pass
  153. elif (type == CLUSTER_SWAP_NOW):
  154. self.notify.debug('swapping')
  155. base.graphicsEngine.flipFrame()
  156. else:
  157. self.notify.warning("Received unknown packet type:" % type)
  158. return type
  159. # Server specific tasks
  160. def handleCamOffset(self,dgi):
  161. """ Set offset of camera from cameraJig """
  162. (x,y,z,h,p,r) = self.msgHandler.parseCamOffsetDatagram(dgi)
  163. self.lens.setIodOffset(x)
  164. self.lens.setViewHpr(h,p,r)
  165. def handleCamFrustum(self,dgi):
  166. """ Adjust camera frustum based on parameters sent by client """
  167. (fl,fs,fo) = self.msgHandler.parseCamFrustumDatagram(dgi)
  168. self.lens.setFocalLength(fl)
  169. self.lens.setFilmSize(fs[0], fs[1])
  170. self.lens.setFilmOffset(fo[0], fo[1])
  171. def handleCamMovement(self,dgi):
  172. """ Update cameraJig position to reflect latest position """
  173. (x,y,z,h,p,r) = self.msgHandler.parseCamMovementDatagram(dgi)
  174. self.cameraJig.setPosHpr(render,x,y,z,h,p,r)
  175. self.fPosReceived = 1
  176. def handleSelectedMovement(self,dgi):
  177. """ Update cameraJig position to reflect latest position """
  178. (x,y,z,h,p,r,sx,sy,sz) = self.msgHandler.parseSelectedMovementDatagram(
  179. dgi)
  180. if last:
  181. last.setPosHprScale(x,y,z,h,p,r,sx,sy,sz)
  182. def handleCommandString(self, dgi):
  183. """ Handle arbitrary command string from client """
  184. command = self.msgHandler.parseCommandStringDatagram(dgi)
  185. try:
  186. exec( command, __builtins__ )
  187. except:
  188. pass