ClusterServer.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. """ClusterServer module: contains the ClusterServer class"""
  2. from PandaModules import *
  3. from ClusterMsgs import *
  4. from MsgTypes import *
  5. import DirectNotifyGlobal
  6. import DirectObject
  7. import Task
  8. # NOTE: This assumes the following variables are set via bootstrap command line
  9. # arguments on server startup:
  10. # clusterServerPort
  11. # clusterSyncFlag
  12. # clusterDaemonClient
  13. # clusterDaemonPort
  14. # Also, I'm not sure multiple camera-group configurations are working for the
  15. # cluster system.
  16. class ClusterServer(DirectObject.DirectObject):
  17. notify = DirectNotifyGlobal.directNotify.newCategory("ClusterServer")
  18. MSG_NUM = 2000000
  19. def __init__(self,cameraJig,camera):
  20. global clusterServerPort, clusterSyncFlag
  21. global clusterDaemonClient, clusterDaemonPort
  22. # Store information about the cluster's camera
  23. self.cameraJig = cameraJig
  24. self.camera = camera
  25. self.lens = camera.node().getLens()
  26. self.lastConnection = None
  27. self.fPosReceived = 0
  28. # Create network layer objects
  29. self.qcm = QueuedConnectionManager()
  30. self.qcl = QueuedConnectionListener(self.qcm, 0)
  31. self.qcr = QueuedConnectionReader(self.qcm, 0)
  32. self.cw = ConnectionWriter(self.qcm,0)
  33. try:
  34. port = clusterServerPort
  35. except NameError:
  36. port = CLUSTER_SERVER_PORT
  37. self.tcpRendezvous = self.qcm.openTCPServerRendezvous(port, 1)
  38. self.qcl.addConnection(self.tcpRendezvous)
  39. self.msgHandler = ClusterMsgHandler(ClusterServer.MSG_NUM, self.notify)
  40. # Start cluster tasks
  41. self.startListenerPollTask()
  42. self.startReaderPollTask()
  43. # If synchronized server, start swap coordinator too
  44. try:
  45. clusterSyncFlag
  46. except NameError:
  47. clusterSyncFlag = 0
  48. if clusterSyncFlag:
  49. self.startSwapCoordinator()
  50. base.win.setSync(1)
  51. # Send verification of startup to client
  52. self.daemon = DirectD()
  53. # These must be passed in as bootstrap arguments and stored in
  54. # the __builtin__ namespace
  55. try:
  56. clusterDaemonClient
  57. except NameError:
  58. clusterDaemonClient = 'localhost'
  59. try:
  60. clusterDaemonPort
  61. except NameError:
  62. clusterDaemonPort = CLUSTER_DAEMON_PORT
  63. self.daemon.serverReady(clusterDaemonClient, clusterDaemonPort)
  64. def startListenerPollTask(self):
  65. # Run this task near the start of frame, sometime after the dataloop
  66. taskMgr.add(self.listenerPollTask, "serverListenerPollTask",-40)
  67. def listenerPollTask(self, task):
  68. """ Task to listen for a new connection from the client """
  69. # Run this task after the dataloop
  70. if self.qcl.newConnectionAvailable():
  71. self.notify.info("New connection is available")
  72. rendezvous = PointerToConnection()
  73. netAddress = NetAddress()
  74. newConnection = PointerToConnection()
  75. if self.qcl.getNewConnection(rendezvous,netAddress,newConnection):
  76. # Crazy dereferencing
  77. newConnection=newConnection.p()
  78. self.qcr.addConnection(newConnection)
  79. self.lastConnection = newConnection
  80. self.notify.info("Got a connection!")
  81. else:
  82. self.notify.warning("getNewConnection returned false")
  83. return Task.cont
  84. def startReaderPollTask(self):
  85. """ Task to handle datagrams from client """
  86. # Run this task just after the listener poll task
  87. if clusterSyncFlag:
  88. # Sync version
  89. taskMgr.add(self._syncReaderPollTask, "serverReaderPollTask", -39)
  90. else:
  91. # Asynchronous version
  92. taskMgr.add(self._readerPollTask, "serverReaderPollTask", -39)
  93. def _readerPollTask(self, state):
  94. """ Non blocking task to read all available datagrams """
  95. while 1:
  96. (datagram, dgi,type) = self.msgHandler.nonBlockingRead(self.qcr)
  97. # Queue is empty, done for now
  98. if type is CLUSTER_NONE:
  99. break
  100. else:
  101. # Got a datagram, handle it
  102. self.handleDatagram(dgi, type)
  103. return Task.cont
  104. def _syncReaderPollTask(self, task):
  105. if self.lastConnection is None:
  106. pass
  107. elif self.qcr.isConnectionOk(self.lastConnection):
  108. # Process datagrams till you get a postion update
  109. type = CLUSTER_NONE
  110. while type != CLUSTER_CAM_MOVEMENT:
  111. # Block until you get a new datagram
  112. (datagram,dgi,type) = self.msgHandler.blockingRead(self.qcr)
  113. # Process datagram
  114. self.handleDatagram(dgi,type)
  115. return Task.cont
  116. def startSwapCoordinator(self):
  117. taskMgr.add(self.swapCoordinatorTask, "serverSwapCoordinator", 51)
  118. def swapCoordinatorTask(self, task):
  119. if self.fPosReceived:
  120. self.fPosReceived = 0
  121. # Alert client that this server is ready to swap
  122. self.sendSwapReady()
  123. # Wait for swap command (processing any intermediate datagrams)
  124. while 1:
  125. (datagram,dgi,type) = self.msgHandler.blockingRead(self.qcr)
  126. self.handleDatagram(dgi,type)
  127. if type == CLUSTER_SWAP_NOW:
  128. break
  129. return Task.cont
  130. def sendSwapReady(self):
  131. self.notify.debug(
  132. 'send swap ready packet %d' % self.msgHandler.packetNumber)
  133. datagram = self.msgHandler.makeSwapReadyDatagram()
  134. self.cw.send(datagram, self.lastConnection)
  135. def handleDatagram(self, dgi, type):
  136. """ Process a datagram depending upon type flag """
  137. if (type == CLUSTER_NONE):
  138. pass
  139. elif (type == CLUSTER_EXIT):
  140. import sys
  141. sys.exit()
  142. elif (type == CLUSTER_CAM_OFFSET):
  143. self.handleCamOffset(dgi)
  144. elif (type == CLUSTER_CAM_FRUSTUM):
  145. self.handleCamFrustum(dgi)
  146. elif (type == CLUSTER_CAM_MOVEMENT):
  147. self.handleCamMovement(dgi)
  148. elif (type == CLUSTER_SELECTED_MOVEMENT):
  149. self.handleSelectedMovement(dgi)
  150. elif (type == CLUSTER_COMMAND_STRING):
  151. self.handleCommandString(dgi)
  152. elif (type == CLUSTER_SWAP_READY):
  153. pass
  154. elif (type == CLUSTER_SWAP_NOW):
  155. self.notify.debug('swapping')
  156. base.win.swap()
  157. else:
  158. self.notify.warning("Received unknown packet type:" % type)
  159. return type
  160. # Server specific tasks
  161. def handleCamOffset(self,dgi):
  162. """ Set offset of camera from cameraJig """
  163. (x,y,z,h,p,r) = self.msgHandler.parseCamOffsetDatagram(dgi)
  164. self.lens.setIodOffset(x)
  165. self.lens.setViewHpr(h,p,r)
  166. def handleCamFrustum(self,dgi):
  167. """ Adjust camera frustum based on parameters sent by client """
  168. (fl,fs,fo) = self.msgHandler.parseCamFrustumDatagram(dgi)
  169. self.lens.setFocalLength(fl)
  170. self.lens.setFilmSize(fs[0], fs[1])
  171. self.lens.setFilmOffset(fo[0], fo[1])
  172. def handleCamMovement(self,dgi):
  173. """ Update cameraJig position to reflect latest position """
  174. (x,y,z,h,p,r) = self.msgHandler.parseCamMovementDatagram(dgi)
  175. self.cameraJig.setPosHpr(render,x,y,z,h,p,r)
  176. self.fPosReceived = 1
  177. def handleSelectedMovement(self,dgi):
  178. """ Update cameraJig position to reflect latest position """
  179. (x,y,z,h,p,r,sx,sy,sz) = self.msgHandler.parseSelectedMovementDatagram(
  180. dgi)
  181. if last:
  182. last.setPosHprScale(x,y,z,h,p,r,sx,sy,sz)
  183. def handleCommandString(self, dgi):
  184. """ Handle arbitrary command string from client """
  185. command = self.msgHandler.parseCommandStringDatagram(dgi)
  186. try:
  187. exec( command, globals() )
  188. except:
  189. pass