ClusterServer.py 14 KB


  1. from panda3d.core import (
  2. ClockObject,
  3. Vec3,
  4. )
  5. from panda3d.net import (
  6. ConnectionWriter,
  7. NetAddress,
  8. PointerToConnection,
  9. QueuedConnectionListener,
  10. QueuedConnectionManager,
  11. QueuedConnectionReader,
  12. )
  13. from .ClusterMsgs import (
  14. CLUSTER_CAM_FRUSTUM,
  15. CLUSTER_CAM_MOVEMENT,
  16. CLUSTER_CAM_OFFSET,
  17. CLUSTER_COMMAND_STRING,
  18. CLUSTER_DAEMON_PORT,
  19. CLUSTER_EXIT,
  20. CLUSTER_NAMED_MOVEMENT_DONE,
  21. CLUSTER_NAMED_OBJECT_MOVEMENT,
  22. CLUSTER_NONE,
  23. CLUSTER_SELECTED_MOVEMENT,
  24. CLUSTER_SERVER_PORT,
  25. CLUSTER_SWAP_NOW,
  26. CLUSTER_SWAP_READY,
  27. CLUSTER_TIME_DATA,
  28. ClusterMsgHandler,
  29. )
  30. from direct.directnotify import DirectNotifyGlobal
  31. from direct.showbase import DirectObject
  32. from direct.task import Task
  33. from direct.task.TaskManagerGlobal import taskMgr
  34. import builtins
  35. # NOTE: This assumes the following variables are set via bootstrap command line
  36. # arguments on server startup:
  37. # clusterServerPort
  38. # clusterSyncFlag
  39. # clusterDaemonClient
  40. # clusterDaemonPort
  41. # Also, I'm not sure multiple camera-group configurations are working for the
  42. # cluster system.
  43. class ClusterServer(DirectObject.DirectObject):
  44. notify = DirectNotifyGlobal.directNotify.newCategory("ClusterServer")
  45. MSG_NUM = 2000000
  46. def __init__(self, cameraJig, camera):
  47. global clusterServerPort, clusterSyncFlag
  48. global clusterDaemonClient, clusterDaemonPort
  49. # Store information about the cluster's camera
  50. self.cameraJig = cameraJig
  51. self.camera = camera
  52. self.lens = camera.node().getLens()
  53. self.lastConnection = None
  54. self.fPosReceived = 0
  55. # Create network layer objects
  56. self.qcm = QueuedConnectionManager()
  57. self.qcl = QueuedConnectionListener(self.qcm, 0)
  58. self.qcr = QueuedConnectionReader(self.qcm, 0)
  59. self.cw = ConnectionWriter(self.qcm, 0)
  60. try:
  61. port = clusterServerPort
  62. except NameError:
  63. port = CLUSTER_SERVER_PORT
  64. self.tcpRendezvous = self.qcm.openTCPServerRendezvous(port, 1)
  65. self.qcl.addConnection(self.tcpRendezvous)
  66. self.msgHandler = ClusterMsgHandler(ClusterServer.MSG_NUM, self.notify)
  67. # Start cluster tasks
  68. self.startListenerPollTask()
  69. self.startReaderPollTask()
  70. # If synchronized server, start swap coordinator too
  71. try:
  72. clusterSyncFlag
  73. except NameError:
  74. clusterSyncFlag = 0
  75. if clusterSyncFlag:
  76. self.startSwapCoordinator()
  77. base.graphicsEngine.setAutoFlip(0)
  78. # Set global clock mode to slave mode
  79. ClockObject.getGlobalClock().setMode(ClockObject.MSlave)
  80. # Send verification of startup to client
  81. self.daemon = DirectD()
  82. self.objectMappings = {}
  83. self.objectHasColor = {}
  84. self.controlMappings = {}
  85. self.controlPriorities = {}
  86. self.controlOffsets = {}
  87. self.messageQueue = []
  88. self.sortedControlMappings = []
  89. # These must be passed in as bootstrap arguments and stored in
  90. # the __builtins__ namespace
  91. try:
  92. clusterDaemonClient
  93. except NameError:
  94. clusterDaemonClient = 'localhost'
  95. try:
  96. clusterDaemonPort
  97. except NameError:
  98. clusterDaemonPort = CLUSTER_DAEMON_PORT
  99. self.daemon.serverReady(clusterDaemonClient, clusterDaemonPort)
  100. def startListenerPollTask(self):
  101. # Run this task near the start of frame, sometime after the dataLoop
  102. taskMgr.add(self.listenerPollTask, "serverListenerPollTask", -40)
  103. def listenerPollTask(self, task):
  104. """ Task to listen for a new connection from the client """
  105. # Run this task after the dataLoop
  106. if self.qcl.newConnectionAvailable():
  107. self.notify.info("New connection is available")
  108. rendezvous = PointerToConnection()
  109. netAddress = NetAddress()
  110. newConnection = PointerToConnection()
  111. if self.qcl.getNewConnection(rendezvous, netAddress, newConnection):
  112. # Crazy dereferencing
  113. newConnection=newConnection.p()
  114. self.qcr.addConnection(newConnection)
  115. self.lastConnection = newConnection
  116. self.notify.info("Got a connection!")
  117. else:
  118. self.notify.warning("getNewConnection returned false")
  119. return Task.cont
  120. def addNamedObjectMapping(self, object, name, hasColor = True,
  121. priority = 0):
  122. if name not in self.objectMappings:
  123. self.objectMappings[name] = object
  124. self.objectHasColor[name] = hasColor
  125. else:
  126. self.notify.debug('attempt to add duplicate named object: '+name)
  127. def removeObjectMapping(self, name):
  128. if name in self.objectMappings:
  129. self.objectMappings.pop(name)
  130. def redoSortedPriorities(self):
  131. self.sortedControlMappings = sorted(
  132. [self.controlPriorities[key], key] for key in self.objectMappings
  133. )
  134. def addControlMapping(self, objectName, controlledName, offset = None,
  135. priority = 0):
  136. if objectName not in self.controlMappings:
  137. self.controlMappings[objectName] = controlledName
  138. if offset is None:
  139. offset = Vec3(0,0,0)
  140. self.controlOffsets[objectName] = offset
  141. self.controlPriorities[objectName] = priority
  142. self.redoSortedPriorities()
  143. else:
  144. self.notify.debug('attempt to add duplicate controlled object: ' + objectName)
  145. def setControlMappingOffset(self, objectName, offset):
  146. if objectName in self.controlMappings:
  147. self.controlOffsets[objectName] = offset
  148. def removeControlMapping(self, name):
  149. if name in self.controlMappings:
  150. self.controlMappings.pop(name)
  151. self.controlPriorities.pop(name)
  152. self.redoSortedPriorities()
  153. def startControlObjectTask(self):
  154. self.notify.debug("moving control objects")
  155. taskMgr.add(self.controlObjectTask,"controlObjectTask",50)
  156. def controlObjectTask(self, task):
  157. #print "running control object task"
  158. for pair in self.sortedControlPriorities:
  159. object = pair[1]
  160. name = self.controlMappings[object]
  161. if object in self.objectMappings:
  162. self.moveObject(self.objectMappings[object],name,self.controlOffsets[object],
  163. self.objectHasColor[object])
  164. self.sendNamedMovementDone()
  165. return Task.cont
  166. def sendNamedMovementDone(self):
  167. self.notify.debug("named movement done")
  168. datagram = self.msgHandler.makeNamedMovementDone()
  169. self.cw.send(datagram,self.lastConnection)
  170. def moveObject(self, nodePath, object, offset, hasColor):
  171. self.notify.debug('moving object '+object)
  172. #print "moving object",object
  173. xyz = nodePath.getPos(render) + offset
  174. hpr = nodePath.getHpr(render)
  175. scale = nodePath.getScale(render)
  176. if hasColor:
  177. color = nodePath.getColor()
  178. else:
  179. color = [1,1,1,1]
  180. hidden = nodePath.isHidden()
  181. datagram = self.msgHandler.makeNamedObjectMovementDatagram(xyz,hpr,scale,color,hidden,object)
  182. self.cw.send(datagram, self.lastConnection)
  183. def startReaderPollTask(self):
  184. """ Task to handle datagrams from client """
  185. # Run this task just after the listener poll task
  186. if clusterSyncFlag:
  187. # Sync version
  188. taskMgr.add(self._syncReaderPollTask, "serverReaderPollTask", -39)
  189. else:
  190. # Asynchronous version
  191. taskMgr.add(self._readerPollTask, "serverReaderPollTask", -39)
  192. def _readerPollTask(self, state):
  193. """ Non blocking task to read all available datagrams """
  194. while 1:
  195. (datagram, dgi, type) = self.msgHandler.nonBlockingRead(self.qcr)
  196. # Queue is empty, done for now
  197. if type is CLUSTER_NONE:
  198. break
  199. else:
  200. # Got a datagram, handle it
  201. self.handleDatagram(dgi, type)
  202. return Task.cont
  203. def _syncReaderPollTask(self, task):
  204. if self.lastConnection is None:
  205. pass
  206. elif self.qcr.isConnectionOk(self.lastConnection):
  207. # Process datagrams till you get a postion update
  208. type = CLUSTER_NONE
  209. while type != CLUSTER_CAM_MOVEMENT:
  210. # Block until you get a new datagram
  211. (datagram, dgi, type) = self.msgHandler.blockingRead(self.qcr)
  212. # Process datagram
  213. self.handleDatagram(dgi, type)
  214. return Task.cont
  215. def startSwapCoordinator(self):
  216. taskMgr.add(self.swapCoordinatorTask, "serverSwapCoordinator", 51)
  217. def swapCoordinatorTask(self, task):
  218. if self.fPosReceived:
  219. self.fPosReceived = 0
  220. # Alert client that this server is ready to swap
  221. self.sendSwapReady()
  222. # Wait for swap command (processing any intermediate datagrams)
  223. while 1:
  224. (datagram, dgi, type) = self.msgHandler.blockingRead(self.qcr)
  225. self.handleDatagram(dgi, type)
  226. if type == CLUSTER_SWAP_NOW:
  227. break
  228. return Task.cont
  229. def sendSwapReady(self):
  230. self.notify.debug(
  231. 'send swap ready packet %d' % self.msgHandler.packetNumber)
  232. datagram = self.msgHandler.makeSwapReadyDatagram()
  233. self.cw.send(datagram, self.lastConnection)
  234. def handleDatagram(self, dgi, type):
  235. """ Process a datagram depending upon type flag """
  236. if type == CLUSTER_NONE:
  237. pass
  238. elif type == CLUSTER_EXIT:
  239. print('GOT EXIT')
  240. import sys
  241. sys.exit()
  242. elif type == CLUSTER_CAM_OFFSET:
  243. self.handleCamOffset(dgi)
  244. elif type == CLUSTER_CAM_FRUSTUM:
  245. self.handleCamFrustum(dgi)
  246. elif type == CLUSTER_CAM_MOVEMENT:
  247. self.handleCamMovement(dgi)
  248. elif type == CLUSTER_SELECTED_MOVEMENT:
  249. self.handleSelectedMovement(dgi)
  250. elif type == CLUSTER_COMMAND_STRING:
  251. self.handleCommandString(dgi)
  252. elif type == CLUSTER_SWAP_READY:
  253. pass
  254. elif type == CLUSTER_SWAP_NOW:
  255. self.notify.debug('swapping')
  256. base.graphicsEngine.flipFrame()
  257. elif type == CLUSTER_TIME_DATA:
  258. self.notify.debug('time data')
  259. self.handleTimeData(dgi)
  260. elif type == CLUSTER_NAMED_OBJECT_MOVEMENT:
  261. self.messageQueue.append(self.msgHandler.parseNamedMovementDatagram(dgi))
  262. #self.handleNamedMovement(dgi)
  263. elif type == CLUSTER_NAMED_MOVEMENT_DONE:
  264. #print "got done",self.messageQueue
  265. #if (len(self.messageQueue) > 0):
  266. # print self.messageQueue[0]
  267. # print dir(self.messageQueue)
  268. self.handleMessageQueue()
  269. else:
  270. self.notify.warning("Received unknown packet type:" % type)
  271. return type
  272. # Server specific tasks
  273. def handleCamOffset(self, dgi):
  274. """ Set offset of camera from cameraJig """
  275. (x, y, z, h, p, r) = self.msgHandler.parseCamOffsetDatagram(dgi)
  276. self.camera.setPos(x,y,z)
  277. self.lens.setViewHpr(h, p, r)
  278. def handleCamFrustum(self, dgi):
  279. """ Adjust camera frustum based on parameters sent by client """
  280. (fl, fs, fo) = self.msgHandler.parseCamFrustumDatagram(dgi)
  281. self.lens.setFocalLength(fl)
  282. self.lens.setFilmSize(fs[0], fs[1])
  283. self.lens.setFilmOffset(fo[0], fo[1])
  284. def handleNamedMovement(self, data):
  285. """ Update cameraJig position to reflect latest position """
  286. (name,x, y, z, h, p, r,sx,sy,sz, red, g, b, a, hidden) = data
  287. if name in self.objectMappings:
  288. self.objectMappings[name].setPosHpr(render, x, y, z, h, p, r)
  289. self.objectMappings[name].setScale(render,sx,sy,sz)
  290. self.objectMappings[name].setColor(red,g,b,a)
  291. if hidden:
  292. self.objectMappings[name].hide()
  293. else:
  294. self.objectMappings[name].show()
  295. else:
  296. self.notify.debug("recieved unknown named object command: "+name)
  297. def handleMessageQueue(self):
  298. #print(self.messageQueue)
  299. for data in self.messageQueue:
  300. #print("in queue", dgi)
  301. self.handleNamedMovement(data)
  302. self.messageQueue = []
  303. def handleCamMovement(self, dgi):
  304. """ Update cameraJig position to reflect latest position """
  305. (x, y, z, h, p, r) = self.msgHandler.parseCamMovementDatagram(dgi)
  306. self.cameraJig.setPosHpr(render, x, y, z, h, p, r)
  307. self.fPosReceived = 1
  308. def handleSelectedMovement(self, dgi):
  309. """ Update cameraJig position to reflect latest position """
  310. (x, y, z, h, p, r, sx, sy, sz) = self.msgHandler.parseSelectedMovementDatagram(
  311. dgi)
  312. if getattr(builtins, 'last', None):
  313. builtins.last.setPosHprScale(x, y, z, h, p, r, sx, sy, sz)
  314. def handleTimeData(self, dgi):
  315. """ Update cameraJig position to reflect latest position """
  316. (frameCount, frameTime, dt) = self.msgHandler.parseTimeDataDatagram(dgi)
  317. # Use frame time from client for both real and frame time
  318. clock = ClockObject.getGlobalClock()
  319. clock.setFrameCount(frameCount)
  320. clock.setFrameTime(frameTime)
  321. clock.dt = dt
  322. def handleCommandString(self, dgi):
  323. """ Handle arbitrary command string from client """
  324. command = self.msgHandler.parseCommandStringDatagram(dgi)
  325. try:
  326. exec(command, __builtins__)
  327. except Exception:
  328. pass