DefaultServer.java 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. /*
  2. * Copyright (c) 2011 jMonkeyEngine
  3. * All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are
  7. * met:
  8. *
  9. * * Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. *
  12. * * Redistributions in binary form must reproduce the above copyright
  13. * notice, this list of conditions and the following disclaimer in the
  14. * documentation and/or other materials provided with the distribution.
  15. *
  16. * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
  17. * may be used to endorse or promote products derived from this software
  18. * without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
  22. * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
  23. * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  24. * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
  25. * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
  26. * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  27. * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  28. * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  29. * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  30. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. */
  32. package com.jme3.network.base;
  33. import java.io.IOException;
  34. import java.nio.ByteBuffer;
  35. import java.util.*;
  36. import java.util.concurrent.*;
  37. import java.util.concurrent.atomic.AtomicInteger;
  38. import java.util.logging.Level;
  39. import java.util.logging.Logger;
  40. import com.jme3.network.*;
  41. import com.jme3.network.kernel.*;
  42. import com.jme3.network.message.ClientRegistrationMessage; //hopefully temporary
  43. import com.jme3.network.message.DisconnectMessage; //hopefully temporary
  44. import com.jme3.network.serializing.Serializer;
  45. /**
  46. * A default implementation of the Server interface that delegates
  47. * its network connectivity to kernel.Kernel.
  48. *
  49. * @version $Revision$
  50. * @author Paul Speed
  51. */
  52. public class DefaultServer implements Server
  53. {
  54. static Logger log = Logger.getLogger(DefaultServer.class.getName());
  55. private boolean isRunning = false;
  56. private AtomicInteger nextId = new AtomicInteger(0);
  57. private String gameName;
  58. private int version;
  59. private Kernel reliable;
  60. private KernelAdapter reliableAdapter;
  61. private Kernel fast;
  62. private KernelAdapter fastAdapter;
  63. private Redispatch dispatcher = new Redispatch();
  64. private Map<Integer,HostedConnection> connections = new ConcurrentHashMap<Integer,HostedConnection>();
  65. private Map<Endpoint,HostedConnection> endpointConnections
  66. = new ConcurrentHashMap<Endpoint,HostedConnection>();
  67. // Keeps track of clients for whom we've only received the UDP
  68. // registration message
  69. private Map<Long,Connection> connecting = new ConcurrentHashMap<Long,Connection>();
  70. private MessageListenerRegistry<HostedConnection> messageListeners
  71. = new MessageListenerRegistry<HostedConnection>();
  72. private List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();
  73. public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast )
  74. {
  75. if( reliable == null )
  76. throw new IllegalArgumentException( "Default server reqiures a reliable kernel instance." );
  77. this.gameName = gameName;
  78. this.version = version;
  79. this.reliable = reliable;
  80. this.fast = fast;
  81. reliableAdapter = new KernelAdapter( this, reliable, dispatcher );
  82. if( fast != null ) {
  83. fastAdapter = new KernelAdapter( this, fast, dispatcher );
  84. }
  85. }
  86. public String getGameName()
  87. {
  88. return gameName;
  89. }
  90. public int getVersion()
  91. {
  92. return version;
  93. }
  94. public void start()
  95. {
  96. if( isRunning )
  97. throw new IllegalStateException( "Server is already started." );
  98. // Initialize the kernels
  99. reliable.initialize();
  100. if( fast != null ) {
  101. fast.initialize();
  102. }
  103. // Start em up
  104. reliableAdapter.start();
  105. if( fastAdapter != null ) {
  106. fastAdapter.start();
  107. }
  108. isRunning = true;
  109. }
  110. public boolean isRunning()
  111. {
  112. return isRunning;
  113. }
  114. public void close()
  115. {
  116. if( !isRunning )
  117. throw new IllegalStateException( "Server is not started." );
  118. try {
  119. // Kill the adpaters, they will kill the kernels
  120. if( fastAdapter != null ) {
  121. fastAdapter.close();
  122. }
  123. reliableAdapter.close();
  124. isRunning = false;
  125. } catch( InterruptedException e ) {
  126. throw new RuntimeException( "Interrupted while closing", e );
  127. }
  128. }
  129. public void broadcast( Message message )
  130. {
  131. broadcast( null, message );
  132. }
  133. public void broadcast( Filter<? super HostedConnection> filter, Message message )
  134. {
  135. ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
  136. FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
  137. // Ignore the filter for the moment
  138. if( message.isReliable() || fast == null ) {
  139. reliable.broadcast( adapter, buffer, true );
  140. } else {
  141. fast.broadcast( adapter, buffer, false );
  142. }
  143. }
  144. public HostedConnection getConnection( long id )
  145. {
  146. return connections.get(id);
  147. }
  148. public Collection<HostedConnection> getConnections()
  149. {
  150. return Collections.unmodifiableCollection((Collection<HostedConnection>)connections.values());
  151. }
  152. public void addConnectionListener( ConnectionListener listener )
  153. {
  154. connectionListeners.add(listener);
  155. }
  156. public void removeConnectionListener( ConnectionListener listener )
  157. {
  158. connectionListeners.remove(listener);
  159. }
  160. public void addMessageListener( MessageListener<? super HostedConnection> listener )
  161. {
  162. messageListeners.addMessageListener( listener );
  163. }
  164. public void addMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
  165. {
  166. messageListeners.addMessageListener( listener, classes );
  167. }
  168. public void removeMessageListener( MessageListener<? super HostedConnection> listener )
  169. {
  170. messageListeners.removeMessageListener( listener );
  171. }
  172. public void removeMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
  173. {
  174. messageListeners.removeMessageListener( listener, classes );
  175. }
  176. protected void dispatch( HostedConnection source, Message m )
  177. {
  178. if( source == null ) {
  179. messageListeners.messageReceived( source, m );
  180. } else {
  181. // A semi-heavy handed way to make sure the listener
  182. // doesn't get called at the same time from two different
  183. // threads for the same hosted connection.
  184. synchronized( source ) {
  185. messageListeners.messageReceived( source, m );
  186. }
  187. }
  188. }
  189. protected void fireConnectionAdded( HostedConnection conn )
  190. {
  191. for( ConnectionListener l : connectionListeners ) {
  192. l.connectionAdded( this, conn );
  193. }
  194. }
  195. protected void fireConnectionRemoved( HostedConnection conn )
  196. {
  197. for( ConnectionListener l : connectionListeners ) {
  198. l.connectionRemoved( this, conn );
  199. }
  200. }
  201. protected void registerClient( KernelAdapter ka, Endpoint p, ClientRegistrationMessage m )
  202. {
  203. Connection addedConnection = null;
  204. Connection bootedConnection = null;
  205. // generally this will only be called by one thread but it's
  206. // important enough I won't take chances
  207. synchronized( this ) {
  208. // Grab the random ID that the client created when creating
  209. // its two registration messages
  210. long tempId = m.getId();
  211. // See if we already have one
  212. Connection c = connecting.remove(tempId);
  213. if( c == null ) {
  214. c = new Connection();
  215. log.log( Level.FINE, "Registering client for endpoint, pass 1:{0}.", p );
  216. } else {
  217. log.log( Level.FINE, "Refining client registration for endpoint:{0}.", p );
  218. }
  219. // Fill in what we now know
  220. if( ka == fastAdapter ) {
  221. c.fast = p;
  222. if( c.reliable == null ) {
  223. // Tuck it away for later
  224. connecting.put(tempId, c);
  225. }
  226. } else {
  227. // It must be the reliable one
  228. c.reliable = p;
  229. // Validate the name and version which is only sent
  230. // over the reliable connection at this point.
  231. if( !getGameName().equals(m.getGameName())
  232. || getVersion() != m.getVersion() ) {
  233. log.log( Level.INFO, "Kicking client due to name/version mismatch:{0}.", c );
  234. // Need to kick them off... I may regret doing this from within
  235. // the sync block but the alternative is more code
  236. c.close( "Server client mismatch, server:" + getGameName() + " v" + getVersion()
  237. + " client:" + m.getGameName() + " v" + m.getVersion() );
  238. return;
  239. }
  240. if( c.fast == null && fastAdapter != null ) {
  241. // Still waiting for the fast connection to
  242. // register
  243. connecting.put(tempId, c);
  244. }
  245. }
  246. if( !connecting.containsKey(tempId) ) {
  247. // Then we are fully connected
  248. if( connections.put( c.getId(), c ) == null ) {
  249. if( c.fast != null ) {
  250. endpointConnections.put( c.fast, c );
  251. }
  252. endpointConnections.put( c.reliable, c );
  253. addedConnection = c;
  254. }
  255. }
  256. }
  257. // Best to do this outside of the synch block to avoid
  258. // over synchronizing which is the path to deadlocks
  259. if( addedConnection != null ) {
  260. log.log( Level.INFO, "Client registered:{0}.", addedConnection );
  261. // Now we can notify the listeners about the
  262. // new connection.
  263. fireConnectionAdded( addedConnection );
  264. // Send the ID back to the client letting it know it's
  265. // fully connected.
  266. m = new ClientRegistrationMessage();
  267. m.setId( addedConnection.getId() );
  268. m.setReliable(true);
  269. addedConnection.send(m);
  270. }
  271. }
  272. protected HostedConnection getConnection( Endpoint endpoint )
  273. {
  274. return endpointConnections.get(endpoint);
  275. }
  276. protected void connectionClosed( Endpoint p )
  277. {
  278. // Try to find the endpoint in all ways that it might
  279. // exist. Note: by this point the channel is closed
  280. // already.
  281. // Also note: this method will be called twice per
  282. // HostedConnection if it has two endpoints.
  283. Connection removed = null;
  284. synchronized( this ) {
  285. // Just in case the endpoint was still connecting
  286. connecting.values().remove(p);
  287. // And the regular management
  288. removed = (Connection)endpointConnections.remove(p);
  289. if( removed != null ) {
  290. connections.remove( removed.getId() );
  291. }
  292. }
  293. // Better not to fire events while we hold a lock
  294. // so always do this outside the synch block.
  295. if( removed != null ) {
  296. // Make sure both endpoints are closed. Note: reliable
  297. // should always already be closed through all paths that I
  298. // can conceive... but it doesn't hurt to be sure.
  299. if( removed.reliable != null && removed.reliable.isConnected() ) {
  300. removed.reliable.close();
  301. }
  302. if( removed.fast != null && removed.fast.isConnected() ) {
  303. removed.fast.close();
  304. }
  305. fireConnectionRemoved( removed );
  306. }
  307. }
  308. protected class Connection implements HostedConnection
  309. {
  310. private int id;
  311. private Endpoint reliable;
  312. private Endpoint fast;
  313. private Map<String,Object> sessionData = new ConcurrentHashMap<String,Object>();
  314. public Connection()
  315. {
  316. id = nextId.getAndIncrement();
  317. }
  318. public int getId()
  319. {
  320. return id;
  321. }
  322. public String getAddress()
  323. {
  324. return reliable == null ? null : reliable.getAddress();
  325. }
  326. public void send( Message message )
  327. {
  328. ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
  329. if( message.isReliable() || fast == null ) {
  330. reliable.send( buffer );
  331. } else {
  332. fast.send( buffer );
  333. }
  334. }
  335. public void close( String reason )
  336. {
  337. // Send a reason
  338. DisconnectMessage m = new DisconnectMessage();
  339. m.setType( DisconnectMessage.KICK );
  340. m.setReason( reason );
  341. m.setReliable( true );
  342. send( m );
  343. // Note: without a way to flush the pending messages
  344. // during close, the above message may never
  345. // go out.
  346. // Just close the reliable endpoint
  347. // fast will be cleaned up as a side-effect
  348. if( reliable != null ) {
  349. reliable.close();
  350. }
  351. }
  352. public Object setAttribute( String name, Object value )
  353. {
  354. return sessionData.put(name, value);
  355. }
  356. public <T> T getAttribute( String name )
  357. {
  358. return (T)sessionData.get(name);
  359. }
  360. public Set<String> attributeNames()
  361. {
  362. return Collections.unmodifiableSet(sessionData.keySet());
  363. }
  364. public String toString()
  365. {
  366. return "Connection[ id=" + id + ", reliable=" + reliable + ", fast=" + fast + " ]";
  367. }
  368. }
  369. protected class Redispatch implements MessageListener<HostedConnection>
  370. {
  371. public void messageReceived( HostedConnection source, Message m )
  372. {
  373. dispatch( source, m );
  374. }
  375. }
  376. protected class FilterAdapter implements Filter<Endpoint>
  377. {
  378. private Filter<? super HostedConnection> delegate;
  379. public FilterAdapter( Filter<? super HostedConnection> delegate )
  380. {
  381. this.delegate = delegate;
  382. }
  383. public boolean apply( Endpoint input )
  384. {
  385. HostedConnection conn = getConnection( input );
  386. if( conn == null )
  387. return false;
  388. return delegate.apply(conn);
  389. }
  390. }
  391. }