KernelAdapter.java 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. /*
  2. * Copyright (c) 2011 jMonkeyEngine
  3. * All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are
  7. * met:
  8. *
  9. * * Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. *
  12. * * Redistributions in binary form must reproduce the above copyright
  13. * notice, this list of conditions and the following disclaimer in the
  14. * documentation and/or other materials provided with the distribution.
  15. *
  16. * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
  17. * may be used to endorse or promote products derived from this software
  18. * without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
  22. * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
  23. * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  24. * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
  25. * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
  26. * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  27. * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  28. * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  29. * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  30. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. */
  32. package com.jme3.network.base;
  33. import java.io.IOException;
  34. import java.nio.ByteBuffer;
  35. import java.util.concurrent.atomic.AtomicBoolean;
  36. import com.jme3.network.*;
  37. import com.jme3.network.kernel.Endpoint;
  38. import com.jme3.network.kernel.EndpointEvent;
  39. import com.jme3.network.kernel.Envelope;
  40. import com.jme3.network.kernel.Kernel;
  41. import com.jme3.network.message.ClientRegistrationMessage; //hopefully temporary
  42. import com.jme3.network.serializing.Serializer;
  43. /**
  44. * Wraps a single Kernel and forwards new messages
  45. * to the supplied message dispatcher and new endpoint
  46. * events to the connection dispatcher. This is used
  47. * by DefaultServer to manage its kernel objects.
  48. *
  49. * <p>This adapter assumes a simple protocol where two
  50. * bytes define a (short) object size with the object data
  51. * to follow. Note: this limits the size of serialized
  52. * objects to 32676 bytes... even though, for example,
  53. * datagram packets can hold twice that. :P</p>
  54. *
  55. * @version $Revision$
  56. * @author Paul Speed
  57. */
  58. public class KernelAdapter extends Thread
  59. {
  60. private DefaultServer server; // this is unfortunate
  61. private Kernel kernel;
  62. private MessageListener messageDispatcher;
  63. private AtomicBoolean go = new AtomicBoolean(true);
  64. public KernelAdapter( DefaultServer server, Kernel kernel, MessageListener messageDispatcher )
  65. {
  66. super( String.valueOf(kernel) );
  67. this.server = server;
  68. this.kernel = kernel;
  69. this.messageDispatcher = messageDispatcher;
  70. setDaemon(true);
  71. }
  72. public void close() throws InterruptedException
  73. {
  74. go.set(false);
  75. // Kill the kernel
  76. kernel.terminate();
  77. }
  78. protected HostedConnection getConnection( Endpoint p )
  79. {
  80. return server.getConnection(p);
  81. }
  82. protected void connectionClosed( Endpoint p )
  83. {
  84. server.connectionClosed(p);
  85. }
  86. /**
  87. * Note on threading for those writing their own server
  88. * or adapter implementations. The rule that a single connection be
  89. * processed by only one thread at a time is more about ensuring that
  90. * the messages are delivered in the order that they are received
  91. * than for any user-code safety. 99% of the time the user code should
  92. * be writing for multithreaded access anyway.
  93. *
  94. * <p>The issue with the messages is that if a an implementation is
  95. * using a general thread pool then it would be possible for a
  96. * naive implementation to have one thread grab an Envelope from
  97. * connection 1's and another grab the next Envelope. Since an Envelope
  98. * may contain several messages, delivering the second thread's messages
  99. * before or during the first's would be really confusing and hard
  100. * to code for in user code.</p>
  101. *
  102. * <p>And that's why this note is here. DefaultServer does a rudimentary
  103. * per-connection locking but it couldn't possibly guard against
  104. * out of order Envelope processing.</p>
  105. */
  106. protected void dispatch( Endpoint p, Message m )
  107. {
  108. // Because this class is the only one with the information
  109. // to do it... we need to pull of the registration message
  110. // here.
  111. if( m instanceof ClientRegistrationMessage ) {
  112. server.registerClient( this, p, (ClientRegistrationMessage)m );
  113. return;
  114. }
  115. HostedConnection source = getConnection(p);
  116. messageDispatcher.messageReceived( source, m );
  117. }
  118. protected void createAndDispatch( Envelope env )
  119. {
  120. MessageProtocol protocol = new MessageProtocol();
  121. byte[] data = env.getData();
  122. ByteBuffer buffer = ByteBuffer.wrap(data);
  123. protocol.addBuffer( buffer );
  124. // Should be complete... and maybe we should check but we don't
  125. Message m = null;
  126. while( (m = protocol.getMessage()) != null ) {
  127. dispatch( env.getSource(), m );
  128. }
  129. }
  130. protected void createAndDispatch( EndpointEvent event )
  131. {
  132. // Only need to tell the server about disconnects
  133. if( event.getType() == EndpointEvent.Type.REMOVE ) {
  134. connectionClosed( event.getEndpoint() );
  135. }
  136. }
  137. protected void flushEvents()
  138. {
  139. EndpointEvent event;
  140. while( (event = kernel.nextEvent()) != null )
  141. {
  142. createAndDispatch( event );
  143. }
  144. }
  145. public void run()
  146. {
  147. while( go.get() ) {
  148. try {
  149. // Check for pending events
  150. flushEvents();
  151. // Grab the next envelope
  152. Envelope e = kernel.read();
  153. // Check for pending events that might have
  154. // come in while we were blocking. This is usually
  155. // when the connection add events come through
  156. flushEvents();
  157. createAndDispatch( e );
  158. } catch( InterruptedException ex ) {
  159. if( !go.get() )
  160. return;
  161. throw new RuntimeException( "Unexpected interruption", ex );
  162. }
  163. }
  164. }
  165. }