2
0
Эх сурвалжийг харах

Centralize the logic for accumulating and converting
non-aligned ByteBuffers into messages and messages into
ByteBuffers.


git-svn-id: https://jmonkeyengine.googlecode.com/svn/trunk@7013 75d07b2b-3a1a-0410-a2c5-0572b91ccdca

PSp..om 14 жил өмнө
parent
commit
5adecf2aa3

+ 10 - 49
engine/src/networking/com/jme3/network/base/ConnectorAdapter.java

@@ -79,63 +79,24 @@ public class ConnectorAdapter extends Thread
         connector.close();
     }
  
-    protected void createAndDispatch( ByteBuffer buffer )
+    protected void dispatch( Message m )
     {
-        try {
-            Object obj = Serializer.readClassAndObject( buffer );
-            Message m = (Message)obj;
-            dispatcher.messageReceived( null, m );                        
-        } catch( IOException e ) {
-            throw new RuntimeException( "Error deserializing object", e );   
-        }         
+        dispatcher.messageReceived( null, m );                        
     }
  
     public void run()
     {
-        ByteBuffer current = null;
-        int size = 0;
-    
+        MessageProtocol protocol = new MessageProtocol();
+        
         while( go.get() ) {
             ByteBuffer buffer = connector.read();
             
-            // push the data from the buffer into as
-            // many messages as we can
-            while( buffer.remaining() > 0 ) {
-                
-                if( current == null ) {
-                    // We are not currently reading an object so
-                    // grab the size.
-                    // Note: this is somewhat limiting... int would
-                    // be better.
-                    size = buffer.getShort();
-                    current = ByteBuffer.allocate(size);
-                }
-
-                if( current.remaining() <= buffer.remaining() ) {
-                    // We have at least one complete object so
-                    // copy what we can into current, create a message,
-                    // and then continue pulling from buffer.
-                    
-                    // Artificially set the limit so we don't overflow
-                    int extra = buffer.remaining() - current.remaining();
-                    buffer.limit( buffer.position() + current.remaining() );
- 
-                    // Now copy the data                   
-                    current.put( buffer );
-                    current.flip();
-                    
-                    // Now set the limit back to a good value
-                    buffer.limit( buffer.position() + extra );
- 
-                    createAndDispatch( current );
- 
-                    current = null;                    
-                } else {
-                
-                    // Not yet a complete object so just copy what we have
-                    current.put( buffer ); 
-                }            
-            }            
+            protocol.addBuffer( buffer );
+            
+            Message m = null;
+            while( (m = protocol.getMessage()) != null ) {
+                dispatch( m );
+            }
         }
     }
         

+ 1 - 19
engine/src/networking/com/jme3/network/base/DefaultClient.java

@@ -131,24 +131,6 @@ public class DefaultClient implements Client
         return id;
     }     
  
-    protected ByteBuffer messageToBuffer( Message message )
-    {
-        ByteBuffer buffer = ByteBuffer.allocate( 32767 + 2 );
-        
-        try {
-            buffer.position( 2 );
-            Serializer.writeClassAndObject( buffer, message );
-            buffer.flip();
-            short dataLength = (short)(buffer.remaining() - 2);
-            buffer.putShort( dataLength );
-            buffer.position( 0 );
-            
-            return buffer;
-        } catch( IOException e ) {
-            throw new RuntimeException( "Error serializing message", e );
-        }
-    }
- 
     public void send( Message message )
     {
         checkRunning();
@@ -158,7 +140,7 @@ public class DefaultClient implements Client
         // be called from multiple threads.  If writing
         // is queued into its own thread then that could
         // be shared.
-        ByteBuffer buffer = messageToBuffer(message);
+        ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
         if( message.isReliable() || fast == null ) {
             if( reliable == null )
                 throw new RuntimeException( "No reliable connector configured" );

+ 2 - 20
engine/src/networking/com/jme3/network/base/DefaultServer.java

@@ -133,24 +133,6 @@ public class DefaultServer implements Server
         }                               
     }
  
-    protected ByteBuffer messageToBuffer( Message message )
-    {
-        ByteBuffer buffer = ByteBuffer.allocate( 32767 + 2 );
-        
-        try {
-            buffer.position( 2 );
-            Serializer.writeClassAndObject( buffer, message );
-            buffer.flip();
-            short dataLength = (short)(buffer.remaining() - 2);
-            buffer.putShort( dataLength );
-            buffer.position( 0 );
-            
-            return buffer;
-        } catch( IOException e ) {
-            throw new RuntimeException( "Error serializing message", e );
-        }
-    }
-
     public void broadcast( Message message )
     {
         broadcast( null, message );
@@ -158,7 +140,7 @@ public class DefaultServer implements Server
 
     public void broadcast( Object filter, Message message )
     {
-        ByteBuffer buffer = messageToBuffer(message);
+        ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
         
         // Ignore the filter for the moment
         if( message.isReliable() || fast == null ) {
@@ -359,7 +341,7 @@ public class DefaultServer implements Server
         
         public void send( Message message )
         {
-            ByteBuffer buffer = messageToBuffer(message);
+            ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
             if( message.isReliable() || fast == null ) {
                 reliable.send( buffer );
             } else {

+ 19 - 59
engine/src/networking/com/jme3/network/base/KernelAdapter.java

@@ -93,74 +93,34 @@ public class KernelAdapter extends Thread
         server.connectionClosed(p);
     }
  
-    protected void createAndDispatch( Endpoint p, ByteBuffer buffer )
+    protected void dispatch( Endpoint p, Message m )
     {
-        try {
-            Object obj = Serializer.readClassAndObject( buffer );
-            Message m = (Message)obj;
- 
-            // Because this class is the only one with the information
-            // to do it... we need to pull of the registration message
-            // here.
-            if( m instanceof ClientRegistrationMessage ) {
-                server.registerClient( this, p, (ClientRegistrationMessage)m );
-                return;           
-            }                
+        // Because this class is the only one with the information
+        // to do it... we need to pull of the registration message
+        // here.
+        if( m instanceof ClientRegistrationMessage ) {
+            server.registerClient( this, p, (ClientRegistrationMessage)m );
+            return;           
+        }                
             
-            HostedConnection source = getConnection(p);
-            messageDispatcher.messageReceived( source, m );
-        } catch( IOException e ) {
-            throw new RuntimeException( "Error deserializing object", e );   
-        }         
+        HostedConnection source = getConnection(p);
+        messageDispatcher.messageReceived( source, m );
     }
 
     protected void createAndDispatch( Envelope env )
     {
+        MessageProtocol protocol = new MessageProtocol();
+    
         byte[] data = env.getData();
         ByteBuffer buffer = ByteBuffer.wrap(data);
+
+        protocol.addBuffer( buffer );
         
-        ByteBuffer current = null;
-        int size = 0;
-        
-        // push the data from the buffer into as
-        // many messages as we can
-        while( buffer.remaining() > 0 ) {
-                
-            if( current == null ) {
-                // We are not currently reading an object so
-                // grab the size.
-                // Note: this is somewhat limiting... int would
-                // be better.
-                size = buffer.getShort();
-                current = ByteBuffer.allocate(size);
-            }
-                
-            if( current.remaining() <= buffer.remaining() ) {
-                // We have at least one complete object so
-                // copy what we can into current, create a message,
-                // and then continue pulling from buffer.
-                    
-                // Artificially set the limit so we don't overflow
-                int extra = buffer.remaining() - current.remaining();
-                buffer.limit( buffer.position() + current.remaining() );
- 
-                // Now copy the data                   
-                current.put( buffer );
-                current.flip();                
-                    
-                // Now set the limit back to a good value
-                buffer.limit( buffer.position() + extra );
- 
-                createAndDispatch( env.getSource(), current );
- 
-                current = null;                    
-            } else {
-                
-                // Not yet a complete object so just copy what we have
-                current.put( buffer ); 
-            }            
-        }            
-            
+        // Should be complete... and maybe we should check but we don't
+        Message m = null;
+        while( (m = protocol.getMessage()) != null ) {
+            dispatch( env.getSource(), m );
+        }
     } 
 
     protected void createAndDispatch( EndpointEvent event )

+ 167 - 0
engine/src/networking/com/jme3/network/base/MessageProtocol.java

@@ -0,0 +1,167 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ *   notice, this list of conditions and the following disclaimer in the
+ *   documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ *   may be used to endorse or promote products derived from this software
+ *   without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.base;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.jme3.network.Message;
+import com.jme3.network.serializing.Serializer;
+
+/**
+ *  Consolidates the conversion of messages to/from byte buffers
+ *  and provides a rolling message buffer.  ByteBuffers can be
+ *  pushed in and messages will be extracted, accumulated, and 
+ *  available for retrieval.  This is not thread safe and is meant
+ *  to be used within a single message processing thread.
+ *
+ *  <p>The protocol is based on a simple length + data format
+ *  where two bytes represent the (short) length of the data
+ *  and the rest is the raw data for the Serializers class.</p>
+ *
+ *  @version   $Revision$
+ *  @author    Paul Speed
+ */
+public class MessageProtocol
+{
+    private LinkedList<Message> messages = new LinkedList<Message>();
+    private ByteBuffer current;
+    private int size;
+ 
+    /**
+     *  Converts a message to a ByteBuffer using the Serializer
+     *  and the (short length) + data protocol.  If target is null
+     *  then a 32k byte buffer will be created and filled.
+     */
+    public static ByteBuffer messageToBuffer( Message message, ByteBuffer target )
+    {
+        // Could let the caller pass their own in       
+        ByteBuffer buffer = target == null ? ByteBuffer.allocate( 32767 + 2 ) : target;
+        
+        try {
+            buffer.position( 2 );
+            Serializer.writeClassAndObject( buffer, message );
+            buffer.flip();
+            short dataLength = (short)(buffer.remaining() - 2);
+            buffer.putShort( dataLength );
+            buffer.position( 0 );
+            
+            return buffer;
+        } catch( IOException e ) {
+            throw new RuntimeException( "Error serializing message", e );
+        }
+    }
+ 
+    /**
+     *  Retrieves and removes an extracted message from the accumulated buffer
+     *  or returns null if there are no more messages.
+     */
+    public Message getMessage()
+    {
+        if( messages.isEmpty() ) {
+            return null;
+        }
+        
+        return messages.removeFirst();
+    }     
+   
+    /**
+     *  Adds the specified buffer, extracting the contained messages 
+     *  and making them available to getMessage().  The left over
+     *  data is buffered to be combined with future data.
+     &
+     *  @return The total number of queued messages after this call.       
+     */
+    public int addBuffer( ByteBuffer buffer )
+    {
+        // push the data from the buffer into as
+        // many messages as we can
+        while( buffer.remaining() > 0 ) {
+                
+            if( current == null ) {
+                // We are not currently reading an object so
+                // grab the size.
+                // Note: this is somewhat limiting... int would
+                // be better.
+                size = buffer.getShort();
+                current = ByteBuffer.allocate(size);
+            }
+
+            if( current.remaining() <= buffer.remaining() ) {
+                // We have at least one complete object so
+                // copy what we can into current, create a message,
+                // and then continue pulling from buffer.
+                    
+                // Artificially set the limit so we don't overflow
+                int extra = buffer.remaining() - current.remaining();
+                buffer.limit( buffer.position() + current.remaining() );
+ 
+                // Now copy the data                   
+                current.put( buffer );
+                current.flip();
+                    
+                // Now set the limit back to a good value
+                buffer.limit( buffer.position() + extra );
+ 
+                createMessage( current );
+ 
+                current = null;                    
+            } else {
+                
+                // Not yet a complete object so just copy what we have
+                current.put( buffer ); 
+            }            
+        }            
+        
+        return messages.size();        
+    }
+ 
+    /**
+     *  Creates a message from the properly sized byte buffer
+     *  and adds it to the messages queue.
+     */   
+    protected void createMessage( ByteBuffer buffer )
+    {
+        try {
+            Object obj = Serializer.readClassAndObject( buffer );
+            Message m = (Message)obj;
+            messages.add(m);
+        } catch( IOException e ) {
+            throw new RuntimeException( "Error deserializing object", e );   
+        }         
+    }
+}
+
+
+