Răsfoiți Sursa

*** empty log message ***

Mark Mine 24 ani în urmă
părinte
comite
0a482df28a

+ 13 - 9
direct/src/cluster/ClusterClient.py

@@ -92,6 +92,11 @@ class DisplayConnection:
         datagram = self.msgHandler.makeSwapNowDatagram()
         self.cw.send(datagram, self.tcpConn)
         
+    def sendCommandString(self, commandString):
+        ClusterManager.notify.debug("send command string: %s" % commandString)
+        datagram = self.msgHandler.makeCommandStringDatagram(commandString)
+        self.cw.send(datagram, self.tcpConn)
+
 class ClusterManager(DirectObject.DirectObject):
     notify = DirectNotifyGlobal.directNotify.newCategory("ClusterClient")
     MGR_NUM = 1000000
@@ -128,10 +133,16 @@ class ClusterManager(DirectObject.DirectObject):
 
     def moveCameraTask(self,task):
         self.moveCamera(
-            base.camera.getPos(render),
-            base.camera.getHpr(render))
+            direct.camera.getPos(render),
+            direct.camera.getHpr(render))
         return Task.cont
         
+    def clusterCommand(self, commandString):
+        # Execute remotely
+        for server in self.serverList:
+            server.sendCommandString(commandString)
+        # Execute locally
+        exec( commandString, globals() )
 
 class ClusterManagerSync(ClusterManager):
 
@@ -153,24 +164,17 @@ class ClusterManagerSync(ClusterManager):
             self.waitForSwap=0
             self.notify.debug(
                 "START get swaps----------------------------------")
-            localClock = ClockObject()
-            t1 = localClock.getRealTime()
             for server in self.serverList:
                 server.getSwapReady()
             self.notify.debug(
                 "----------------START swap now--------------------")
-            t2 = localClock.getRealTime()
             for server in self.serverList:
                 server.sendSwapNow()
             self.notify.debug(
                 "------------------------------START swap----------")
-            t3 = localClock.getRealTime()
             base.win.swap()
-            t4 = localClock.getRealTime()
             self.notify.debug(
                 "------------------------------------------END swap")
-            self.notify.debug( ("times=%f %f %f %f" % (t1,t2,t3,t4)) )
-            self.notify.debug( ("deltas=%f %f %f" % (t2-t1,t3-t2,t4-t3)) )
         return Task.cont
 
     def moveCamera(self,xyz,hpr):

+ 15 - 12
direct/src/cluster/ClusterMsgs.py

@@ -14,6 +14,7 @@ CLUSTER_CAM_FRUSTUM = 2
 CLUSTER_POS_UPDATE = 3
 CLUSTER_SWAP_READY = 4
 CLUSTER_SWAP_NOW   = 5
+CLUSTER_COMMAND_STRING = 6
 
 #Port number for cluster rendering
 CLUSTER_PORT = 1970
@@ -49,7 +50,6 @@ class MsgHandler:
         else:
             type = CLUSTER_NOTHING
             dgi = None
-
         return (type,dgi)
 
     def readHeader(self,datagram):
@@ -60,21 +60,16 @@ class MsgHandler:
         return (type,dgi)        
 
     def blockingRead(self,qcr):
-        availGetVal = 0
-        while not availGetVal:
-            availGetVal = qcr.dataAvailable()
-            if not availGetVal:
-                # The following may not be necessary.
-                # I just wanted some
-                # time given to the operating system while
-                # busy waiting.
-                time.sleep(0.002)
-                type = CLUSTER_NOTHING
+        while not qcr.dataAvailable():
+            # The following may not be necessary.
+            # I just wanted some
+            # time given to the operating system while
+            # busy waiting.
+            time.sleep(0.002)
         datagram = NetDatagram()
         readRetVal = qcr.getData(datagram)
         if not readRetVal:
             self.notify.warning("getData returned false")
-                
         return datagram
 
     def makeCamOffsetDatagram(self,xyz,hpr):
@@ -90,6 +85,14 @@ class MsgHandler:
         datagram.addFloat32(hpr[2])
         return datagram
 
+    def makeCommandStringDatagram(self, commandString):
+        datagram = Datagram.Datagram()
+        datagram.addUint32(self.packetNumber)
+        self.packetNumber = self.packetNumber + 1
+        datagram.addUint8(CLUSTER_COMMAND_STRING)
+        datagram.addString(commandString)
+        return datagram
+
     def makeCamFrustumDatagram(self,focalLength, filmSize, filmOffset):
         datagram = Datagram.Datagram()
         datagram.addUint32(self.packetNumber)

+ 60 - 100
direct/src/cluster/ClusterServer.py

@@ -26,30 +26,33 @@ class ClusterServer(DirectObject.DirectObject):
     MSG_NUM = 2000000
 
     def __init__(self,cameraGroup,camera):
+        # Store information about the cluster's camera
+        self.cameraGroup = cameraGroup
+        self.camera = camera
+        self.lens = camera.node().getLens()
+        # Initialize camera offsets
+        self.posOffset = Vec3(0,0,0)
+        self.hprOffset = Vec3(0,0,0)
+        # Create network layer objects
+        self.lastConnection = None
         self.qcm = QueuedConnectionManager()
         self.qcl = QueuedConnectionListener(self.qcm, 0)
         self.qcr = QueuedConnectionReader(self.qcm, 0)
         self.cw = ConnectionWriter(self.qcm,0)
         port = base.config.GetInt("cluster-server-port",CLUSTER_PORT)
         self.tcpRendezvous = self.qcm.openTCPServerRendezvous(port, 1)
-        print self.tcpRendezvous
-        self.cameraGroup = cameraGroup
-        self.camera = camera
-        self.lens = camera.node().getLens()
         self.qcl.addConnection(self.tcpRendezvous)
         self.msgHandler = MsgHandler(ClusterServer.MSG_NUM,self.notify)
+        # Start cluster tasks
         self.startListenerPollTask()
         self.startReaderPollTask()
-        self.posOffset = Vec3(0,0,0)
-        self.hprOffset = Vec3(0,0,0)
-        return None
 
     def startListenerPollTask(self):
-        task = Task.Task(self.listenerPoll)
-        taskMgr.add(task, "serverListenerPollTask")
-        return None
+        taskMgr.add(self.listenerPollTask, "serverListenerPollTask",-40)
 
-    def listenerPoll(self, task):
+    def listenerPollTask(self, task):
+        """ Task to listen for a new connection from the client """
+        # Run this task after the dataloop
         if self.qcl.newConnectionAvailable():
             print "New connection is available"
             rendezvous = PointerToConnection()
@@ -69,26 +72,38 @@ class ClusterServer(DirectObject.DirectObject):
         return Task.cont
 
     def startReaderPollTask(self):
-        task = Task.Task(self.readerPollUntilEmpty,-10)
-        taskMgr.add(task, "serverReaderPollTask")
-        return None
-
-    def readerPollUntilEmpty(self, task):
-        while self.readerPollOnce():
-            pass
-        return Task.cont
+        """ Task to handle datagrams from client """
+        # Run this task just after the listener poll task and dataloop
+        taskMgr.add(self.readerPollTask, "serverReaderPollTask", -39)
 
-    def readerPollOnce(self):
-        availGetVal = self.qcr.dataAvailable()
-        if availGetVal:
+    def readerPollTask(self):
+        while self.qcr.dataAvailable():
             datagram = NetDatagram()
             readRetVal = self.qcr.getData(datagram)
             if readRetVal:
                 self.handleDatagram(datagram)
             else:
                 self.notify.warning("getData returned false")
-        return availGetVal
+        return Task.cont
 
+    def handleDatagram(self, datagram):
+        (type, dgi) = self.msgHandler.nonBlockingRead(self.qcr)
+        if type==CLUSTER_CAM_OFFSET:
+            self.handleCamOffset(dgi)
+        elif type==CLUSTER_CAM_FRUSTUM:
+            self.handleCamFrustum(dgi)
+        elif type==CLUSTER_POS_UPDATE:
+            self.handleCamMovement(dgi)
+        elif type==CLUSTER_SWAP_READY:
+            pass
+        elif type==CLUSTER_SWAP_NOW:
+            pass
+        elif type==CLUSTER_COMMAND_STRING:
+            self.handleCommandString(dgi)
+        else:
+            self.notify.warning("recieved unknown packet")
+        return type
+    
     def handleCamOffset(self,dgi):
         x=dgi.getFloat32()
         y=dgi.getFloat32()
@@ -129,83 +144,40 @@ class ClusterServer(DirectObject.DirectObject):
         finalR = r + self.hprOffset[2]
         self.cameraGroup.setPosHpr(render,finalX,finalY,finalZ,
                                    finalH,finalP,finalR)
+
+    def handleCommandString(self, dgi):
+        command = dgi.getString()
+        exec( command, globals() )
         
-    def handleDatagram(self, datagram):
-        (type, dgi) = self.msgHandler.nonBlockingRead(self.qcr)
-        if type==CLUSTER_CAM_OFFSET:
-            self.handleCamOffset(dgi)
-        elif type==CLUSTER_CAM_FRUSTUM:
-            self.handleCamFrustum(dgi)
-        elif type==CLUSTER_POS_UPDATE:
-            self.handleCamMovement(dgi)
-        elif type==CLUSTER_SWAP_READY:
-            pass
-        elif type==CLUSTER_SWAP_NOW:
-            pass
-        else:
-            self.notify.warning("recieved unknown packet")
-        return type
-    
 class ClusterServerSync(ClusterServer):
 
     def __init__(self,cameraGroup,camera):
         self.notify.info('starting ClusterServerSync')
-        self.startReading = 0
         self.posRecieved = 0
         ClusterServer.__init__(self,cameraGroup,camera)
         self.startSwapCoordinator()
         return None
 
-    def startListenerPollTask(self):
-        task = Task.Task(self.listenerPoll,-2)
-        taskMgr.add(task, "serverListenerPollTask")
-        return None
-
-    def listenerPoll(self, task):
-        if self.qcl.newConnectionAvailable():
-            print "New connection is available"
-            rendezvous = PointerToConnection()
-            netAddress = NetAddress()
-            newConnection = PointerToConnection()
-            retVal = self.qcl.getNewConnection(rendezvous, netAddress,
-                                               newConnection)
-            if retVal:
-                # Crazy dereferencing
-                newConnection=newConnection.p()
-                self.qcr.addConnection(newConnection)
-                print "Got a connection!"
-                self.lastConnection = newConnection
+    def readerPollTask(self, task):
+        if self.lastConnection is None:
+            pass
+        elif self.qcr.isConnectionOk(self.lastConnection):
+            # Process datagrams till you get a postion update
+            type = CLUSTER_NOTHING
+            while type != CLUSTER_POS_UPDATE:
                 datagram = self.msgHandler.blockingRead(self.qcr)
                 (type,dgi) = self.msgHandler.readHeader(datagram)
-                if type==CLUSTER_CAM_OFFSET:
+                if type == CLUSTER_POS_UPDATE:
+                    # Move camera
+                    self.handleCamMovement(dgi)
+                    # Set flag for swap coordinator
+                    self.posRecieved = 1
+                elif type == CLUSTER_CAM_OFFSET:
+                    # Update camera offset                    
                     self.handleCamOffset(dgi)
-                else:
-                    self.notify.warning("Wanted cam offset, got something else")
-                self.startReading = 1
-                # now that we have the offset read, can start reading
-            else:
-                self.notify.warning("getNewConnection returned false")
-        return Task.cont
-
-    def startReaderPollTask(self):
-        task = Task.Task(self.readPos,-1)
-        taskMgr.add(task, "serverReadPosTask")
-        return None
-
-    def readPos(self, task):
-        if self.startReading and self.qcr.isConnectionOk(self.lastConnection):
-            datagram = self.msgHandler.blockingRead(self.qcr)
-            (type,dgi) = self.msgHandler.readHeader(datagram)
-            if type == CLUSTER_POS_UPDATE:
-                self.posRecieved = 1
-                self.handleCamMovement(dgi)
-            elif type == CLUSTER_CAM_OFFSET:
-                self.handleCamOffset(dgi)
-            else:
-                self.notify.warning('expected pos or orientation, instead got %d' % type)
-        else:
-            self.startReading = 0 # keep this 0 as long as connection not ok
-
+                elif type == CLUSTER_COMMAND_STRING:
+                    # Got a command, execute it
+                    self.handleCommandString(dgi)
         return Task.cont
 
     def sendSwapReady(self):
@@ -215,30 +187,18 @@ class ClusterServerSync(ClusterServer):
         self.cw.send(datagram, self.lastConnection)
 
     def startSwapCoordinator(self):
-        task = Task.Task(self.swapCoordinatorTask, 51)
-        taskMgr.add(task, "serverSwapCoordinator")
+        taskMgr.add(self.swapCoordinatorTask, "serverSwapCoordinator", 51)
         return None
 
     def swapCoordinatorTask(self, task):
         if self.posRecieved:
             self.posRecieved = 0
-            localClock = ClockObject()
-#            print "START send-------------------------------"
-            t1 = localClock.getRealTime()
             self.sendSwapReady()
-#            print "-----------START read--------------------"
-            t2 = localClock.getRealTime()
             datagram = self.msgHandler.blockingRead(self.qcr)
             (type,dgi) = self.msgHandler.readHeader(datagram)
             if type == CLUSTER_SWAP_NOW:
                 self.notify.debug('swapping')
-#                print "---------------------START SWAP----------"
-                t3 = localClock.getRealTime()
                 base.win.swap()
-                t4 = localClock.getRealTime()
-#                print "---------------------------------END SWAP"
-#                print "times=",t1,t2,t3,t4
-#                print "deltas=",t2-t1,t3-t2,t4-t3
             else:
                 self.notify.warning("did not get expected swap now")
         return Task.cont