DefaultServer.java 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592
  1. /*
  2. * Copyright (c) 2011 jMonkeyEngine
  3. * All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are
  7. * met:
  8. *
  9. * * Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. *
  12. * * Redistributions in binary form must reproduce the above copyright
  13. * notice, this list of conditions and the following disclaimer in the
  14. * documentation and/or other materials provided with the distribution.
  15. *
  16. * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
  17. * may be used to endorse or promote products derived from this software
  18. * without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
  22. * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
  23. * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  24. * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
  25. * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
  26. * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  27. * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  28. * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  29. * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  30. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. */
  32. package com.jme3.network.base;
  33. import com.jme3.network.*;
  34. import com.jme3.network.kernel.Endpoint;
  35. import com.jme3.network.kernel.Kernel;
  36. import com.jme3.network.message.ChannelInfoMessage;
  37. import com.jme3.network.message.ClientRegistrationMessage;
  38. import com.jme3.network.message.DisconnectMessage;
  39. import java.io.IOException;
  40. import java.nio.ByteBuffer;
  41. import java.util.*;
  42. import java.util.concurrent.ConcurrentHashMap;
  43. import java.util.concurrent.CopyOnWriteArrayList;
  44. import java.util.concurrent.atomic.AtomicInteger;
  45. import java.util.logging.Level;
  46. import java.util.logging.Logger;
  47. /**
  48. * A default implementation of the Server interface that delegates
  49. * its network connectivity to kernel.Kernel.
  50. *
  51. * @version $Revision$
  52. * @author Paul Speed
  53. */
  54. public class DefaultServer implements Server
  55. {
  56. static Logger log = Logger.getLogger(DefaultServer.class.getName());
  57. // First two channels are reserved for reliable and
  58. // unreliable
  59. private static final int CH_RELIABLE = 0;
  60. private static final int CH_UNRELIABLE = 1;
  61. private static final int CH_FIRST = 2;
  62. private boolean isRunning = false;
  63. private AtomicInteger nextId = new AtomicInteger(0);
  64. private String gameName;
  65. private int version;
  66. private KernelFactory kernelFactory = KernelFactory.DEFAULT;
  67. private KernelAdapter reliableAdapter;
  68. private KernelAdapter fastAdapter;
  69. private List<KernelAdapter> channels = new ArrayList<KernelAdapter>();
  70. private List<Integer> alternatePorts = new ArrayList<Integer>();
  71. private Redispatch dispatcher = new Redispatch();
  72. private Map<Integer,HostedConnection> connections = new ConcurrentHashMap<Integer,HostedConnection>();
  73. private Map<Endpoint,HostedConnection> endpointConnections
  74. = new ConcurrentHashMap<Endpoint,HostedConnection>();
  75. // Keeps track of clients for whom we've only received the UDP
  76. // registration message
  77. private Map<Long,Connection> connecting = new ConcurrentHashMap<Long,Connection>();
  78. private MessageListenerRegistry<HostedConnection> messageListeners
  79. = new MessageListenerRegistry<HostedConnection>();
  80. private List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();
  81. public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast )
  82. {
  83. if( reliable == null )
  84. throw new IllegalArgumentException( "Default server reqiures a reliable kernel instance." );
  85. this.gameName = gameName;
  86. this.version = version;
  87. reliableAdapter = new KernelAdapter( this, reliable, dispatcher, true );
  88. if( fast != null ) {
  89. fastAdapter = new KernelAdapter( this, fast, dispatcher, false );
  90. }
  91. channels.add( reliableAdapter );
  92. channels.add( fastAdapter );
  93. }
  94. public String getGameName()
  95. {
  96. return gameName;
  97. }
  98. public int getVersion()
  99. {
  100. return version;
  101. }
  102. public int addChannel( int port )
  103. {
  104. if( isRunning )
  105. throw new IllegalStateException( "Channels cannot be added once server is started." );
  106. // Note: it does bug me that channels aren't 100% universal and
  107. // setup externally but it requires a more invasive set of changes
  108. // for "connection types" and some kind of registry of kernel and
  109. // connector factories. This really would be the best approach and
  110. // would allow all kinds of channel customization maybe... but for
  111. // now, we hard-code the standard connections and treat the +2 extras
  112. // differently.
  113. // Check for consistency with the channels list
  114. if( channels.size() - CH_FIRST != alternatePorts.size() )
  115. throw new IllegalStateException( "Channel and port lists do not match." );
  116. try {
  117. int result = alternatePorts.size();
  118. alternatePorts.add(port);
  119. Kernel kernel = kernelFactory.createKernel(result, port);
  120. channels.add( new KernelAdapter(this, kernel, dispatcher, true) );
  121. return result;
  122. } catch( IOException e ) {
  123. throw new RuntimeException( "Error adding channel for port:" + port, e );
  124. }
  125. }
  126. protected void checkChannel( int channel )
  127. {
  128. if( channel < 0 || channel >= alternatePorts.size() )
  129. throw new IllegalArgumentException( "Channel is undefined:" + channel );
  130. }
  131. public void start()
  132. {
  133. if( isRunning )
  134. throw new IllegalStateException( "Server is already started." );
  135. // Initialize the kernels
  136. for( KernelAdapter ka : channels ) {
  137. ka.initialize();
  138. }
  139. // Start em up
  140. for( KernelAdapter ka : channels ) {
  141. ka.start();
  142. }
  143. isRunning = true;
  144. }
  145. public boolean isRunning()
  146. {
  147. return isRunning;
  148. }
  149. public void close()
  150. {
  151. if( !isRunning )
  152. throw new IllegalStateException( "Server is not started." );
  153. try {
  154. // Kill the adpaters, they will kill the kernels
  155. for( KernelAdapter ka : channels ) {
  156. ka.close();
  157. }
  158. isRunning = false;
  159. } catch( InterruptedException e ) {
  160. throw new RuntimeException( "Interrupted while closing", e );
  161. }
  162. }
  163. public void broadcast( Message message )
  164. {
  165. broadcast( null, message );
  166. }
  167. public void broadcast( Filter<? super HostedConnection> filter, Message message )
  168. {
  169. if( connections.isEmpty() )
  170. return;
  171. ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
  172. FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
  173. if( message.isReliable() || fastAdapter == null ) {
  174. // Don't need to copy the data because message protocol is already
  175. // giving us a fresh buffer
  176. reliableAdapter.broadcast( adapter, buffer, true, false );
  177. } else {
  178. fastAdapter.broadcast( adapter, buffer, false, false );
  179. }
  180. }
  181. public void broadcast( int channel, Filter<? super HostedConnection> filter, Message message )
  182. {
  183. if( connections.isEmpty() )
  184. return;
  185. checkChannel(channel);
  186. ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
  187. FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
  188. channels.get(channel+CH_FIRST).broadcast( adapter, buffer, true, false );
  189. }
  190. public HostedConnection getConnection( int id )
  191. {
  192. return connections.get(id);
  193. }
  194. public boolean hasConnections()
  195. {
  196. return !connections.isEmpty();
  197. }
  198. public Collection<HostedConnection> getConnections()
  199. {
  200. return Collections.unmodifiableCollection((Collection<HostedConnection>)connections.values());
  201. }
  202. public void addConnectionListener( ConnectionListener listener )
  203. {
  204. connectionListeners.add(listener);
  205. }
  206. public void removeConnectionListener( ConnectionListener listener )
  207. {
  208. connectionListeners.remove(listener);
  209. }
  210. public void addMessageListener( MessageListener<? super HostedConnection> listener )
  211. {
  212. messageListeners.addMessageListener( listener );
  213. }
  214. public void addMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
  215. {
  216. messageListeners.addMessageListener( listener, classes );
  217. }
  218. public void removeMessageListener( MessageListener<? super HostedConnection> listener )
  219. {
  220. messageListeners.removeMessageListener( listener );
  221. }
  222. public void removeMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
  223. {
  224. messageListeners.removeMessageListener( listener, classes );
  225. }
  226. protected void dispatch( HostedConnection source, Message m )
  227. {
  228. if( source == null ) {
  229. messageListeners.messageReceived( source, m );
  230. } else {
  231. // A semi-heavy handed way to make sure the listener
  232. // doesn't get called at the same time from two different
  233. // threads for the same hosted connection.
  234. synchronized( source ) {
  235. messageListeners.messageReceived( source, m );
  236. }
  237. }
  238. }
  239. protected void fireConnectionAdded( HostedConnection conn )
  240. {
  241. for( ConnectionListener l : connectionListeners ) {
  242. l.connectionAdded( this, conn );
  243. }
  244. }
  245. protected void fireConnectionRemoved( HostedConnection conn )
  246. {
  247. for( ConnectionListener l : connectionListeners ) {
  248. l.connectionRemoved( this, conn );
  249. }
  250. }
  251. protected int getChannel( KernelAdapter ka )
  252. {
  253. return channels.indexOf(ka);
  254. }
  255. protected void registerClient( KernelAdapter ka, Endpoint p, ClientRegistrationMessage m )
  256. {
  257. Connection addedConnection = null;
  258. // generally this will only be called by one thread but it's
  259. // important enough I won't take chances
  260. synchronized( this ) {
  261. // Grab the random ID that the client created when creating
  262. // its two registration messages
  263. long tempId = m.getId();
  264. // See if we already have one
  265. Connection c = connecting.remove(tempId);
  266. if( c == null ) {
  267. c = new Connection(channels.size());
  268. log.log( Level.FINE, "Registering client for endpoint, pass 1:{0}.", p );
  269. } else {
  270. log.log( Level.FINE, "Refining client registration for endpoint:{0}.", p );
  271. }
  272. // Fill in what we now know
  273. int channel = getChannel(ka);
  274. c.setChannel(channel, p);
  275. log.log( Level.FINE, "Setting up channel:{0}", channel );
  276. // If it's channel 0 then this is the initial connection
  277. // and we will send the connection information
  278. if( channel == CH_RELIABLE ) {
  279. // Validate the name and version which is only sent
  280. // over the reliable connection at this point.
  281. if( !getGameName().equals(m.getGameName())
  282. || getVersion() != m.getVersion() ) {
  283. log.log( Level.INFO, "Kicking client due to name/version mismatch:{0}.", c );
  284. // Need to kick them off... I may regret doing this from within
  285. // the sync block but the alternative is more code
  286. c.close( "Server client mismatch, server:" + getGameName() + " v" + getVersion()
  287. + " client:" + m.getGameName() + " v" + m.getVersion() );
  288. return;
  289. }
  290. // Else send the extra channel information to the client
  291. if( !alternatePorts.isEmpty() ) {
  292. ChannelInfoMessage cim = new ChannelInfoMessage( m.getId(), alternatePorts );
  293. c.send(cim);
  294. }
  295. }
  296. if( c.isComplete() ) {
  297. // Then we are fully connected
  298. if( connections.put( c.getId(), c ) == null ) {
  299. for( Endpoint cp : c.channels ) {
  300. if( cp == null )
  301. continue;
  302. endpointConnections.put( cp, c );
  303. }
  304. addedConnection = c;
  305. }
  306. } else {
  307. // Need to keep getting channels so we'll keep it in
  308. // the map
  309. connecting.put(tempId, c);
  310. }
  311. }
  312. // Best to do this outside of the synch block to avoid
  313. // over synchronizing which is the path to deadlocks
  314. if( addedConnection != null ) {
  315. log.log( Level.INFO, "Client registered:{0}.", addedConnection );
  316. // Send the ID back to the client letting it know it's
  317. // fully connected.
  318. m = new ClientRegistrationMessage();
  319. m.setId( addedConnection.getId() );
  320. m.setReliable(true);
  321. addedConnection.send(m);
  322. // Now we can notify the listeners about the
  323. // new connection.
  324. fireConnectionAdded( addedConnection );
  325. }
  326. }
  327. protected HostedConnection getConnection( Endpoint endpoint )
  328. {
  329. return endpointConnections.get(endpoint);
  330. }
  331. protected void connectionClosed( Endpoint p )
  332. {
  333. if( p.isConnected() ) {
  334. log.log( Level.INFO, "Connection closed:{0}.", p );
  335. } else {
  336. log.log( Level.FINE, "Connection closed:{0}.", p );
  337. }
  338. // Try to find the endpoint in all ways that it might
  339. // exist. Note: by this point the raw network channel is
  340. // closed already.
  341. // Also note: this method will be called multiple times per
  342. // HostedConnection if it has multiple endpoints.
  343. Connection removed = null;
  344. synchronized( this ) {
  345. // Just in case the endpoint was still connecting
  346. connecting.values().remove(p);
  347. // And the regular management
  348. removed = (Connection)endpointConnections.remove(p);
  349. if( removed != null ) {
  350. connections.remove( removed.getId() );
  351. }
  352. log.log( Level.FINE, "Connections size:{0}", connections.size() );
  353. log.log( Level.FINE, "Endpoint mappings size:{0}", endpointConnections.size() );
  354. }
  355. // Better not to fire events while we hold a lock
  356. // so always do this outside the synch block.
  357. // Note: checking removed.closed just to avoid spurious log messages
  358. // since in general we are called back for every endpoint closing.
  359. if( removed != null && !removed.closed ) {
  360. log.log( Level.INFO, "Client closed:{0}.", removed );
  361. removed.closeConnection();
  362. }
  363. }
  364. protected class Connection implements HostedConnection
  365. {
  366. private int id;
  367. private boolean closed;
  368. private Endpoint[] channels;
  369. private int setChannelCount = 0;
  370. private Map<String,Object> sessionData = new ConcurrentHashMap<String,Object>();
  371. public Connection( int channelCount )
  372. {
  373. id = nextId.getAndIncrement();
  374. channels = new Endpoint[channelCount];
  375. }
  376. void setChannel( int channel, Endpoint p )
  377. {
  378. if( channels[channel] != null && channels[channel] != p ) {
  379. throw new RuntimeException( "Channel has already been set:" + channel
  380. + " = " + channels[channel] + ", cannot be set to:" + p );
  381. }
  382. channels[channel] = p;
  383. if( p != null )
  384. setChannelCount++;
  385. }
  386. boolean isComplete()
  387. {
  388. return setChannelCount == channels.length;
  389. }
  390. public Server getServer()
  391. {
  392. return DefaultServer.this;
  393. }
  394. public int getId()
  395. {
  396. return id;
  397. }
  398. public String getAddress()
  399. {
  400. return channels[CH_RELIABLE] == null ? null : channels[CH_RELIABLE].getAddress();
  401. }
  402. public void send( Message message )
  403. {
  404. ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
  405. if( message.isReliable() || channels[CH_UNRELIABLE] == null ) {
  406. channels[CH_RELIABLE].send( buffer );
  407. } else {
  408. channels[CH_UNRELIABLE].send( buffer );
  409. }
  410. }
  411. public void send( int channel, Message message )
  412. {
  413. checkChannel(channel);
  414. ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
  415. channels[channel+CH_FIRST].send(buffer);
  416. }
  417. protected void closeConnection()
  418. {
  419. if( closed )
  420. return;
  421. closed = true;
  422. // Make sure all endpoints are closed. Note: reliable
  423. // should always already be closed through all paths that I
  424. // can conceive... but it doesn't hurt to be sure.
  425. for( Endpoint p : channels ) {
  426. if( p == null )
  427. continue;
  428. p.close();
  429. }
  430. fireConnectionRemoved( this );
  431. }
  432. public void close( String reason )
  433. {
  434. // Send a reason
  435. DisconnectMessage m = new DisconnectMessage();
  436. m.setType( DisconnectMessage.KICK );
  437. m.setReason( reason );
  438. m.setReliable( true );
  439. send( m );
  440. // Just close the reliable endpoint
  441. // fast will be cleaned up as a side-effect
  442. // when closeConnection() is called by the
  443. // connectionClosed() endpoint callback.
  444. if( channels[CH_RELIABLE] != null ) {
  445. // Close with flush so we make sure our
  446. // message gets out
  447. channels[CH_RELIABLE].close(true);
  448. }
  449. }
  450. public Object setAttribute( String name, Object value )
  451. {
  452. if( value == null )
  453. return sessionData.remove(name);
  454. return sessionData.put(name, value);
  455. }
  456. @SuppressWarnings("unchecked")
  457. public <T> T getAttribute( String name )
  458. {
  459. return (T)sessionData.get(name);
  460. }
  461. public Set<String> attributeNames()
  462. {
  463. return Collections.unmodifiableSet(sessionData.keySet());
  464. }
  465. public String toString()
  466. {
  467. return "Connection[ id=" + id + ", reliable=" + channels[CH_RELIABLE]
  468. + ", fast=" + channels[CH_UNRELIABLE] + " ]";
  469. }
  470. }
  471. protected class Redispatch implements MessageListener<HostedConnection>
  472. {
  473. public void messageReceived( HostedConnection source, Message m )
  474. {
  475. dispatch( source, m );
  476. }
  477. }
  478. protected class FilterAdapter implements Filter<Endpoint>
  479. {
  480. private Filter<? super HostedConnection> delegate;
  481. public FilterAdapter( Filter<? super HostedConnection> delegate )
  482. {
  483. this.delegate = delegate;
  484. }
  485. public boolean apply( Endpoint input )
  486. {
  487. HostedConnection conn = getConnection( input );
  488. if( conn == null )
  489. return false;
  490. return delegate.apply(conn);
  491. }
  492. }
  493. }