| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- """ClusterServer module: contains the ClusterServer class"""
- from PandaModules import *
- from ClusterMsgs import *
- from MsgTypes import *
- import DirectNotifyGlobal
- import DirectObject
- import Task
- # NOTE: This assumes the following variables are set via bootstrap command line
- # arguments on server startup:
- # clusterServerPort
- # clusterSyncFlag
- # clusterDaemonClient
- # clusterDaemonPort
- # Also, I'm not sure multiple camera-group configurations are working for the
- # cluster system.
class ClusterServer(DirectObject.DirectObject):
    """Server side of the cluster protocol.

    Opens a TCP rendezvous socket, accepts a client connection, reads
    cluster datagrams from it and applies them to the local camera rig
    (camera jig, lens offset and frustum).  When the global
    ``clusterSyncFlag`` is true the server additionally blocks each frame
    until a camera-movement packet arrives and coordinates buffer swaps
    with the client.

    The following names are looked up as globals (module namespace first,
    then builtins) and fall back to defaults when missing:
    ``clusterServerPort``, ``clusterSyncFlag``, ``clusterDaemonClient``
    and ``clusterDaemonPort``.
    """

    notify = DirectNotifyGlobal.directNotify.newCategory("ClusterServer")
    # First argument handed to ClusterMsgHandler
    # NOTE(review): presumably the starting packet number -- confirm
    # against ClusterMsgs.
    MSG_NUM = 2000000

    def __init__(self, cameraJig, camera):
        """Create the network objects, start the poll tasks and notify
        the daemon that this server is ready.

        cameraJig -- node repositioned by camera-movement packets
        camera    -- camera whose lens is adjusted by offset/frustum packets
        """
        global clusterServerPort, clusterSyncFlag
        global clusterDaemonClient, clusterDaemonPort

        # Store information about the cluster's camera
        self.cameraJig = cameraJig
        self.camera = camera
        self.lens = camera.node().getLens()
        self.lastConnection = None
        # Set to 1 by handleCamMovement, cleared by swapCoordinatorTask
        self.fPosReceived = 0

        # Create network layer objects
        self.qcm = QueuedConnectionManager()
        self.qcl = QueuedConnectionListener(self.qcm, 0)
        self.qcr = QueuedConnectionReader(self.qcm, 0)
        self.cw = ConnectionWriter(self.qcm, 0)
        try:
            port = clusterServerPort
        except NameError:
            port = CLUSTER_SERVER_PORT
        self.tcpRendezvous = self.qcm.openTCPServerRendezvous(port, 1)
        self.qcl.addConnection(self.tcpRendezvous)
        self.msgHandler = ClusterMsgHandler(ClusterServer.MSG_NUM, self.notify)

        # Resolve the sync flag BEFORE starting any task:
        # startReaderPollTask() reads the global clusterSyncFlag, so the
        # default must already be in place or a NameError is raised when
        # the flag was not supplied at startup.
        try:
            clusterSyncFlag
        except NameError:
            clusterSyncFlag = 0

        # Start cluster tasks
        self.startListenerPollTask()
        self.startReaderPollTask()
        # If synchronized server, start swap coordinator too
        if clusterSyncFlag:
            self.startSwapCoordinator()
            base.win.setSync(1)

        # Send verification of startup to client
        self.daemon = DirectD()
        # These must be passed in as bootstrap arguments and stored in
        # the __builtin__ namespace
        try:
            clusterDaemonClient
        except NameError:
            clusterDaemonClient = 'localhost'
        try:
            clusterDaemonPort
        except NameError:
            clusterDaemonPort = CLUSTER_DAEMON_PORT
        self.daemon.serverReady(clusterDaemonClient, clusterDaemonPort)

    def startListenerPollTask(self):
        """ Register the task that accepts new client connections """
        # Run this task near the start of frame, sometime after the dataloop
        taskMgr.add(self.listenerPollTask, "serverListenerPollTask", -40)

    def listenerPollTask(self, task):
        """ Task to listen for a new connection from the client """
        # Run this task after the dataloop
        if self.qcl.newConnectionAvailable():
            self.notify.info("New connection is available")
            rendezvous = PointerToConnection()
            netAddress = NetAddress()
            newConnection = PointerToConnection()
            if self.qcl.getNewConnection(rendezvous, netAddress,
                                         newConnection):
                # Unwrap the PointerToConnection before handing it on
                newConnection = newConnection.p()
                self.qcr.addConnection(newConnection)
                # Remember the most recent connection; replies
                # (see sendSwapReady) are written to it.
                self.lastConnection = newConnection
                self.notify.info("Got a connection!")
            else:
                self.notify.warning("getNewConnection returned false")
        return Task.cont

    def startReaderPollTask(self):
        """ Register the task that handles datagrams from the client """
        # Run this task just after the listener poll task
        if clusterSyncFlag:
            # Sync version
            taskMgr.add(self._syncReaderPollTask, "serverReaderPollTask", -39)
        else:
            # Asynchronous version
            taskMgr.add(self._readerPollTask, "serverReaderPollTask", -39)

    def _readerPollTask(self, state):
        """ Non blocking task to read all available datagrams """
        while 1:
            (datagram, dgi, msgType) = self.msgHandler.nonBlockingRead(
                self.qcr)
            # Queue is empty, done for now
            if msgType == CLUSTER_NONE:
                break
            # Got a datagram, handle it
            self.handleDatagram(dgi, msgType)
        return Task.cont

    def _syncReaderPollTask(self, task):
        """ Blocking task: process datagrams until a position update """
        if self.lastConnection is None:
            # No client has connected yet; nothing to wait for
            pass
        elif self.qcr.isConnectionOk(self.lastConnection):
            # Process datagrams till you get a position update
            msgType = CLUSTER_NONE
            while msgType != CLUSTER_CAM_MOVEMENT:
                # Block until you get a new datagram
                (datagram, dgi, msgType) = self.msgHandler.blockingRead(
                    self.qcr)
                # Process datagram
                self.handleDatagram(dgi, msgType)
        return Task.cont

    def startSwapCoordinator(self):
        """ Register the task that synchronizes buffer swaps """
        # NOTE(review): 51 presumably places this after rendering -- confirm
        taskMgr.add(self.swapCoordinatorTask, "serverSwapCoordinator", 51)

    def swapCoordinatorTask(self, task):
        """ After a position update, report ready and wait for the swap """
        if self.fPosReceived:
            self.fPosReceived = 0
            # Alert client that this server is ready to swap
            self.sendSwapReady()
            # Wait for swap command (processing any intermediate datagrams)
            while 1:
                (datagram, dgi, msgType) = self.msgHandler.blockingRead(
                    self.qcr)
                self.handleDatagram(dgi, msgType)
                if msgType == CLUSTER_SWAP_NOW:
                    break
        return Task.cont

    def sendSwapReady(self):
        """ Send a swap-ready datagram over the most recent connection """
        self.notify.debug(
            'send swap ready packet %d' % self.msgHandler.packetNumber)
        datagram = self.msgHandler.makeSwapReadyDatagram()
        self.cw.send(datagram, self.lastConnection)

    def handleDatagram(self, dgi, type):
        """ Process a datagram depending upon type flag

        dgi  -- datagram iterator handed to the specific handler
        type -- CLUSTER_* message type constant
        Returns the type that was passed in.
        """
        if (type == CLUSTER_NONE):
            pass
        elif (type == CLUSTER_EXIT):
            import sys
            sys.exit()
        elif (type == CLUSTER_CAM_OFFSET):
            self.handleCamOffset(dgi)
        elif (type == CLUSTER_CAM_FRUSTUM):
            self.handleCamFrustum(dgi)
        elif (type == CLUSTER_CAM_MOVEMENT):
            self.handleCamMovement(dgi)
        elif (type == CLUSTER_SELECTED_MOVEMENT):
            self.handleSelectedMovement(dgi)
        elif (type == CLUSTER_COMMAND_STRING):
            self.handleCommandString(dgi)
        elif (type == CLUSTER_SWAP_READY):
            # Sent by servers, not handled by them
            pass
        elif (type == CLUSTER_SWAP_NOW):
            self.notify.debug('swapping')
            base.win.swap()
        else:
            # The format string needs a conversion specifier; without one
            # the % operator raises TypeError instead of logging.
            self.notify.warning(
                "Received unknown packet type: %s" % (type,))
        return type

    # Server specific tasks
    def handleCamOffset(self, dgi):
        """ Set offset of camera from cameraJig """
        # Only x and the hpr values are applied; y and z are parsed
        # but unused.
        (x, y, z, h, p, r) = self.msgHandler.parseCamOffsetDatagram(dgi)
        self.lens.setIodOffset(x)
        self.lens.setViewHpr(h, p, r)

    def handleCamFrustum(self, dgi):
        """ Adjust camera frustum based on parameters sent by client """
        (fl, fs, fo) = self.msgHandler.parseCamFrustumDatagram(dgi)
        self.lens.setFocalLength(fl)
        self.lens.setFilmSize(fs[0], fs[1])
        self.lens.setFilmOffset(fo[0], fo[1])

    def handleCamMovement(self, dgi):
        """ Update cameraJig position to reflect latest position """
        (x, y, z, h, p, r) = self.msgHandler.parseCamMovementDatagram(dgi)
        self.cameraJig.setPosHpr(render, x, y, z, h, p, r)
        # Flag the swap coordinator that a new position has arrived
        self.fPosReceived = 1

    def handleSelectedMovement(self, dgi):
        """ Apply position, orientation and scale to the object `last` """
        (x, y, z, h, p, r, sx, sy, sz) = (
            self.msgHandler.parseSelectedMovementDatagram(dgi))
        # NOTE(review): `last` is not defined in this module; presumably it
        # is injected into the builtin namespace elsewhere -- confirm.
        if last:
            last.setPosHprScale(x, y, z, h, p, r, sx, sy, sz)

    def handleCommandString(self, dgi):
        """ Handle arbitrary command string from client

        Execution is best effort: a failing command is logged and
        otherwise ignored so that it cannot take down the server task.
        """
        command = self.msgHandler.parseCommandStringDatagram(dgi)
        # SECURITY: this executes code received over the network in this
        # module's global namespace.  Only run the cluster on a trusted
        # network.
        try:
            exec(command, globals())
        except Exception:
            # Previously a bare `except: pass`; report the failure instead
            # of hiding it.  SystemExit/KeyboardInterrupt now propagate.
            import sys
            self.notify.warning(
                "Command string failed: %r (%s)" % (
                    command, sys.exc_info()[1]))
-
|