2
0

ClusterServer.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. from panda3d.core import *
  2. from .ClusterMsgs import *
  3. from direct.distributed.MsgTypes import *
  4. from direct.directnotify import DirectNotifyGlobal
  5. from direct.showbase import DirectObject
  6. from direct.task import Task
  7. from direct.task.TaskManagerGlobal import taskMgr
  8. # NOTE: This assumes the following variables are set via bootstrap command line
  9. # arguments on server startup:
  10. # clusterServerPort
  11. # clusterSyncFlag
  12. # clusterDaemonClient
  13. # clusterDaemonPort
  14. # Also, I'm not sure multiple camera-group configurations are working for the
  15. # cluster system.
  16. class ClusterServer(DirectObject.DirectObject):
  17. notify = DirectNotifyGlobal.directNotify.newCategory("ClusterServer")
  18. MSG_NUM = 2000000
  19. def __init__(self, cameraJig, camera):
  20. global clusterServerPort, clusterSyncFlag
  21. global clusterDaemonClient, clusterDaemonPort
  22. # Store information about the cluster's camera
  23. self.cameraJig = cameraJig
  24. self.camera = camera
  25. self.lens = camera.node().getLens()
  26. self.lastConnection = None
  27. self.fPosReceived = 0
  28. # Create network layer objects
  29. self.qcm = QueuedConnectionManager()
  30. self.qcl = QueuedConnectionListener(self.qcm, 0)
  31. self.qcr = QueuedConnectionReader(self.qcm, 0)
  32. self.cw = ConnectionWriter(self.qcm, 0)
  33. try:
  34. port = clusterServerPort
  35. except NameError:
  36. port = CLUSTER_SERVER_PORT
  37. self.tcpRendezvous = self.qcm.openTCPServerRendezvous(port, 1)
  38. self.qcl.addConnection(self.tcpRendezvous)
  39. self.msgHandler = ClusterMsgHandler(ClusterServer.MSG_NUM, self.notify)
  40. # Start cluster tasks
  41. self.startListenerPollTask()
  42. self.startReaderPollTask()
  43. # If synchronized server, start swap coordinator too
  44. try:
  45. clusterSyncFlag
  46. except NameError:
  47. clusterSyncFlag = 0
  48. if clusterSyncFlag:
  49. self.startSwapCoordinator()
  50. base.graphicsEngine.setAutoFlip(0)
  51. # Set global clock mode to slave mode
  52. globalClock.setMode(ClockObject.MSlave)
  53. # Send verification of startup to client
  54. self.daemon = DirectD()
  55. self.objectMappings = {}
  56. self.objectHasColor = {}
  57. self.controlMappings = {}
  58. self.controlPriorities = {}
  59. self.controlOffsets = {}
  60. self.messageQueue = []
  61. self.sortedControlMappings = []
  62. # These must be passed in as bootstrap arguments and stored in
  63. # the __builtins__ namespace
  64. try:
  65. clusterDaemonClient
  66. except NameError:
  67. clusterDaemonClient = 'localhost'
  68. try:
  69. clusterDaemonPort
  70. except NameError:
  71. clusterDaemonPort = CLUSTER_DAEMON_PORT
  72. self.daemon.serverReady(clusterDaemonClient, clusterDaemonPort)
  73. def startListenerPollTask(self):
  74. # Run this task near the start of frame, sometime after the dataLoop
  75. taskMgr.add(self.listenerPollTask, "serverListenerPollTask", -40)
  76. def listenerPollTask(self, task):
  77. """ Task to listen for a new connection from the client """
  78. # Run this task after the dataLoop
  79. if self.qcl.newConnectionAvailable():
  80. self.notify.info("New connection is available")
  81. rendezvous = PointerToConnection()
  82. netAddress = NetAddress()
  83. newConnection = PointerToConnection()
  84. if self.qcl.getNewConnection(rendezvous, netAddress, newConnection):
  85. # Crazy dereferencing
  86. newConnection=newConnection.p()
  87. self.qcr.addConnection(newConnection)
  88. self.lastConnection = newConnection
  89. self.notify.info("Got a connection!")
  90. else:
  91. self.notify.warning("getNewConnection returned false")
  92. return Task.cont
  93. def addNamedObjectMapping(self, object, name, hasColor = True,
  94. priority = 0):
  95. if name not in self.objectMappings:
  96. self.objectMappings[name] = object
  97. self.objectHasColor[name] = hasColor
  98. else:
  99. self.notify.debug('attempt to add duplicate named object: '+name)
  100. def removeObjectMapping(self, name):
  101. if name in self.objectMappings:
  102. self.objectMappings.pop(name)
  103. def redoSortedPriorities(self):
  104. self.sortedControlMappings = []
  105. for key in self.objectMappings:
  106. self.sortedControlMappings.append([self.controlPriorities[key],
  107. key])
  108. self.sortedControlMappings.sort()
  109. def addControlMapping(self, objectName, controlledName, offset = None,
  110. priority = 0):
  111. if objectName not in self.controlMappings:
  112. self.controlMappings[objectName] = controlledName
  113. if offset is None:
  114. offset = Vec3(0,0,0)
  115. self.controlOffsets[objectName] = offset
  116. self.controlPriorities[objectName] = priority
  117. self.redoSortedPriorities()
  118. else:
  119. self.notify.debug('attempt to add duplicate controlled object: '+name)
  120. def setControlMappingOffset(self, objectName, offset):
  121. if objectName in self.controlMappings:
  122. self.controlOffsets[objectName] = offset
  123. def removeControlMapping(self, name):
  124. if name in self.controlMappings:
  125. self.controlMappings.pop(name)
  126. self.controlPriorities.pop(name)
  127. self.redoSortedPriorities()
  128. def startControlObjectTask(self):
  129. self.notify.debug("moving control objects")
  130. taskMgr.add(self.controlObjectTask,"controlObjectTask",50)
  131. def controlObjectTask(self, task):
  132. #print "running control object task"
  133. for pair in self.sortedControlPriorities:
  134. object = pair[1]
  135. name = self.controlMappings[object]
  136. if object in self.objectMappings:
  137. self.moveObject(self.objectMappings[object],name,self.controlOffsets[object],
  138. self.objectHasColor[object])
  139. self.sendNamedMovementDone()
  140. return Task.cont
  141. def sendNamedMovementDone(self):
  142. self.notify.debug("named movement done")
  143. datagram = self.msgHandler.makeNamedMovementDone()
  144. self.cw.send(datagram,self.lastConnection)
  145. def moveObject(self, nodePath, object, offset, hasColor):
  146. self.notify.debug('moving object '+object)
  147. #print "moving object",object
  148. xyz = nodePath.getPos(render) + offset
  149. hpr = nodePath.getHpr(render)
  150. scale = nodePath.getScale(render)
  151. if hasColor:
  152. color = nodePath.getColor()
  153. else:
  154. color = [1,1,1,1]
  155. hidden = nodePath.isHidden()
  156. datagram = self.msgHandler.makeNamedObjectMovementDatagram(xyz,hpr,scale,color,hidden,object)
  157. self.cw.send(datagram, self.lastConnection)
  158. def startReaderPollTask(self):
  159. """ Task to handle datagrams from client """
  160. # Run this task just after the listener poll task
  161. if clusterSyncFlag:
  162. # Sync version
  163. taskMgr.add(self._syncReaderPollTask, "serverReaderPollTask", -39)
  164. else:
  165. # Asynchronous version
  166. taskMgr.add(self._readerPollTask, "serverReaderPollTask", -39)
  167. def _readerPollTask(self, state):
  168. """ Non blocking task to read all available datagrams """
  169. while 1:
  170. (datagram, dgi, type) = self.msgHandler.nonBlockingRead(self.qcr)
  171. # Queue is empty, done for now
  172. if type is CLUSTER_NONE:
  173. break
  174. else:
  175. # Got a datagram, handle it
  176. self.handleDatagram(dgi, type)
  177. return Task.cont
  178. def _syncReaderPollTask(self, task):
  179. if self.lastConnection is None:
  180. pass
  181. elif self.qcr.isConnectionOk(self.lastConnection):
  182. # Process datagrams till you get a postion update
  183. type = CLUSTER_NONE
  184. while type != CLUSTER_CAM_MOVEMENT:
  185. # Block until you get a new datagram
  186. (datagram, dgi, type) = self.msgHandler.blockingRead(self.qcr)
  187. # Process datagram
  188. self.handleDatagram(dgi, type)
  189. return Task.cont
  190. def startSwapCoordinator(self):
  191. taskMgr.add(self.swapCoordinatorTask, "serverSwapCoordinator", 51)
  192. def swapCoordinatorTask(self, task):
  193. if self.fPosReceived:
  194. self.fPosReceived = 0
  195. # Alert client that this server is ready to swap
  196. self.sendSwapReady()
  197. # Wait for swap command (processing any intermediate datagrams)
  198. while 1:
  199. (datagram, dgi, type) = self.msgHandler.blockingRead(self.qcr)
  200. self.handleDatagram(dgi, type)
  201. if type == CLUSTER_SWAP_NOW:
  202. break
  203. return Task.cont
  204. def sendSwapReady(self):
  205. self.notify.debug(
  206. 'send swap ready packet %d' % self.msgHandler.packetNumber)
  207. datagram = self.msgHandler.makeSwapReadyDatagram()
  208. self.cw.send(datagram, self.lastConnection)
  209. def handleDatagram(self, dgi, type):
  210. """ Process a datagram depending upon type flag """
  211. if type == CLUSTER_NONE:
  212. pass
  213. elif type == CLUSTER_EXIT:
  214. print('GOT EXIT')
  215. import sys
  216. sys.exit()
  217. elif type == CLUSTER_CAM_OFFSET:
  218. self.handleCamOffset(dgi)
  219. elif type == CLUSTER_CAM_FRUSTUM:
  220. self.handleCamFrustum(dgi)
  221. elif type == CLUSTER_CAM_MOVEMENT:
  222. self.handleCamMovement(dgi)
  223. elif type == CLUSTER_SELECTED_MOVEMENT:
  224. self.handleSelectedMovement(dgi)
  225. elif type == CLUSTER_COMMAND_STRING:
  226. self.handleCommandString(dgi)
  227. elif type == CLUSTER_SWAP_READY:
  228. pass
  229. elif type == CLUSTER_SWAP_NOW:
  230. self.notify.debug('swapping')
  231. base.graphicsEngine.flipFrame()
  232. elif type == CLUSTER_TIME_DATA:
  233. self.notify.debug('time data')
  234. self.handleTimeData(dgi)
  235. elif type == CLUSTER_NAMED_OBJECT_MOVEMENT:
  236. self.messageQueue.append(self.msgHandler.parseNamedMovementDatagram(dgi))
  237. #self.handleNamedMovement(dgi)
  238. elif type == CLUSTER_NAMED_MOVEMENT_DONE:
  239. #print "got done",self.messageQueue
  240. #if (len(self.messageQueue) > 0):
  241. # print self.messageQueue[0]
  242. # print dir(self.messageQueue)
  243. self.handleMessageQueue()
  244. else:
  245. self.notify.warning("Received unknown packet type:" % type)
  246. return type
  247. # Server specific tasks
  248. def handleCamOffset(self, dgi):
  249. """ Set offset of camera from cameraJig """
  250. (x, y, z, h, p, r) = self.msgHandler.parseCamOffsetDatagram(dgi)
  251. self.camera.setPos(x,y,z)
  252. self.lens.setViewHpr(h, p, r)
  253. def handleCamFrustum(self, dgi):
  254. """ Adjust camera frustum based on parameters sent by client """
  255. (fl, fs, fo) = self.msgHandler.parseCamFrustumDatagram(dgi)
  256. self.lens.setFocalLength(fl)
  257. self.lens.setFilmSize(fs[0], fs[1])
  258. self.lens.setFilmOffset(fo[0], fo[1])
  259. def handleNamedMovement(self, data):
  260. """ Update cameraJig position to reflect latest position """
  261. (name,x, y, z, h, p, r,sx,sy,sz, red, g, b, a, hidden) = data
  262. if name in self.objectMappings:
  263. self.objectMappings[name].setPosHpr(render, x, y, z, h, p, r)
  264. self.objectMappings[name].setScale(render,sx,sy,sz)
  265. self.objectMappings[name].setColor(red,g,b,a)
  266. if hidden:
  267. self.objectMappings[name].hide()
  268. else:
  269. self.objectMappings[name].show()
  270. else:
  271. self.notify.debug("recieved unknown named object command: "+name)
  272. def handleMessageQueue(self):
  273. #print self.messageQueue
  274. for data in self.messageQueue:
  275. #print "in queue",dgi
  276. self.handleNamedMovement(data)
  277. self.messageQueue = []
  278. def handleCamMovement(self, dgi):
  279. """ Update cameraJig position to reflect latest position """
  280. (x, y, z, h, p, r) = self.msgHandler.parseCamMovementDatagram(dgi)
  281. self.cameraJig.setPosHpr(render, x, y, z, h, p, r)
  282. self.fPosReceived = 1
  283. def handleSelectedMovement(self, dgi):
  284. """ Update cameraJig position to reflect latest position """
  285. (x, y, z, h, p, r, sx, sy, sz) = self.msgHandler.parseSelectedMovementDatagram(
  286. dgi)
  287. if last:
  288. last.setPosHprScale(x, y, z, h, p, r, sx, sy, sz)
  289. def handleTimeData(self, dgi):
  290. """ Update cameraJig position to reflect latest position """
  291. (frameCount, frameTime, dt) = self.msgHandler.parseTimeDataDatagram(dgi)
  292. # Use frame time from client for both real and frame time
  293. globalClock.setFrameCount(frameCount)
  294. globalClock.setFrameTime(frameTime)
  295. globalClock.setDt(dt)
  296. def handleCommandString(self, dgi):
  297. """ Handle arbitrary command string from client """
  298. command = self.msgHandler.parseCommandStringDatagram(dgi)
  299. try:
  300. exec(command, __builtins__)
  301. except:
  302. pass