DefaultServer.java 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586
  1. /*
  2. * Copyright (c) 2011 jMonkeyEngine
  3. * All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are
  7. * met:
  8. *
  9. * * Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. *
  12. * * Redistributions in binary form must reproduce the above copyright
  13. * notice, this list of conditions and the following disclaimer in the
  14. * documentation and/or other materials provided with the distribution.
  15. *
  16. * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
  17. * may be used to endorse or promote products derived from this software
  18. * without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
  22. * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
  23. * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
  24. * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
  25. * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
  26. * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  27. * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  28. * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  29. * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  30. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. */
  32. package com.jme3.network.base;
  33. import com.jme3.network.*;
  34. import com.jme3.network.kernel.Endpoint;
  35. import com.jme3.network.kernel.Kernel;
  36. import com.jme3.network.message.ChannelInfoMessage;
  37. import com.jme3.network.message.ClientRegistrationMessage;
  38. import com.jme3.network.message.DisconnectMessage;
  39. import java.io.IOException;
  40. import java.nio.ByteBuffer;
  41. import java.util.*;
  42. import java.util.concurrent.ConcurrentHashMap;
  43. import java.util.concurrent.CopyOnWriteArrayList;
  44. import java.util.concurrent.atomic.AtomicInteger;
  45. import java.util.logging.Level;
  46. import java.util.logging.Logger;
  47. /**
  48. * A default implementation of the Server interface that delegates
  49. * its network connectivity to kernel.Kernel.
  50. *
  51. * @version $Revision$
  52. * @author Paul Speed
  53. */
  54. public class DefaultServer implements Server
  55. {
  56. static Logger log = Logger.getLogger(DefaultServer.class.getName());
  57. // First two channels are reserved for reliable and
  58. // unreliable
  59. private static final int CH_RELIABLE = 0;
  60. private static final int CH_UNRELIABLE = 1;
  61. private static final int CH_FIRST = 2;
  62. private boolean isRunning = false;
  63. private AtomicInteger nextId = new AtomicInteger(0);
  64. private String gameName;
  65. private int version;
  66. private KernelFactory kernelFactory = KernelFactory.DEFAULT;
  67. private KernelAdapter reliableAdapter;
  68. private KernelAdapter fastAdapter;
  69. private List<KernelAdapter> channels = new ArrayList<KernelAdapter>();
  70. private List<Integer> alternatePorts = new ArrayList<Integer>();
  71. private Redispatch dispatcher = new Redispatch();
  72. private Map<Integer,HostedConnection> connections = new ConcurrentHashMap<Integer,HostedConnection>();
  73. private Map<Endpoint,HostedConnection> endpointConnections
  74. = new ConcurrentHashMap<Endpoint,HostedConnection>();
  75. // Keeps track of clients for whom we've only received the UDP
  76. // registration message
  77. private Map<Long,Connection> connecting = new ConcurrentHashMap<Long,Connection>();
  78. private MessageListenerRegistry<HostedConnection> messageListeners
  79. = new MessageListenerRegistry<HostedConnection>();
  80. private List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();
  81. public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast )
  82. {
  83. if( reliable == null )
  84. throw new IllegalArgumentException( "Default server reqiures a reliable kernel instance." );
  85. this.gameName = gameName;
  86. this.version = version;
  87. reliableAdapter = new KernelAdapter( this, reliable, dispatcher, true );
  88. if( fast != null ) {
  89. fastAdapter = new KernelAdapter( this, fast, dispatcher, false );
  90. }
  91. channels.add( reliableAdapter );
  92. channels.add( fastAdapter );
  93. }
  94. public String getGameName()
  95. {
  96. return gameName;
  97. }
  98. public int getVersion()
  99. {
  100. return version;
  101. }
  102. public int addChannel( int port )
  103. {
  104. if( isRunning )
  105. throw new IllegalStateException( "Channels cannot be added once server is started." );
  106. // Note: it does bug me that channels aren't 100% universal and
  107. // setup externally but it requires a more invasive set of changes
  108. // for "connection types" and some kind of registry of kernel and
  109. // connector factories. This really would be the best approach and
  110. // would allow all kinds of channel customization maybe... but for
  111. // now, we hard-code the standard connections and treat the +2 extras
  112. // differently.
  113. // Check for consistency with the channels list
  114. if( channels.size() - CH_FIRST != alternatePorts.size() )
  115. throw new IllegalStateException( "Channel and port lists do not match." );
  116. try {
  117. int result = alternatePorts.size();
  118. alternatePorts.add(port);
  119. Kernel kernel = kernelFactory.createKernel(result, port);
  120. channels.add( new KernelAdapter(this, kernel, dispatcher, true) );
  121. return result;
  122. } catch( IOException e ) {
  123. throw new RuntimeException( "Error adding channel for port:" + port, e );
  124. }
  125. }
  126. protected void checkChannel( int channel )
  127. {
  128. if( channel < 0 || channel >= alternatePorts.size() )
  129. throw new IllegalArgumentException( "Channel is undefined:" + channel );
  130. }
  131. public void start()
  132. {
  133. if( isRunning )
  134. throw new IllegalStateException( "Server is already started." );
  135. // Initialize the kernels
  136. for( KernelAdapter ka : channels ) {
  137. ka.initialize();
  138. }
  139. // Start em up
  140. for( KernelAdapter ka : channels ) {
  141. ka.start();
  142. }
  143. isRunning = true;
  144. }
  145. public boolean isRunning()
  146. {
  147. return isRunning;
  148. }
  149. public void close()
  150. {
  151. if( !isRunning )
  152. throw new IllegalStateException( "Server is not started." );
  153. try {
  154. // Kill the adpaters, they will kill the kernels
  155. for( KernelAdapter ka : channels ) {
  156. ka.close();
  157. }
  158. isRunning = false;
  159. } catch( InterruptedException e ) {
  160. throw new RuntimeException( "Interrupted while closing", e );
  161. }
  162. }
  163. public void broadcast( Message message )
  164. {
  165. broadcast( null, message );
  166. }
  167. public void broadcast( Filter<? super HostedConnection> filter, Message message )
  168. {
  169. if( connections.isEmpty() )
  170. return;
  171. ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
  172. FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
  173. if( message.isReliable() || fastAdapter == null ) {
  174. // Don't need to copy the data because message protocol is already
  175. // giving us a fresh buffer
  176. reliableAdapter.broadcast( adapter, buffer, true, false );
  177. } else {
  178. fastAdapter.broadcast( adapter, buffer, false, false );
  179. }
  180. }
  181. public void broadcast( int channel, Filter<? super HostedConnection> filter, Message message )
  182. {
  183. if( connections.isEmpty() )
  184. return;
  185. checkChannel(channel);
  186. ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
  187. FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
  188. channels.get(channel+CH_FIRST).broadcast( adapter, buffer, true, false );
  189. }
  190. public HostedConnection getConnection( int id )
  191. {
  192. return connections.get(id);
  193. }
  194. public boolean hasConnections()
  195. {
  196. return !connections.isEmpty();
  197. }
  198. public Collection<HostedConnection> getConnections()
  199. {
  200. return Collections.unmodifiableCollection((Collection<HostedConnection>)connections.values());
  201. }
  202. public void addConnectionListener( ConnectionListener listener )
  203. {
  204. connectionListeners.add(listener);
  205. }
  206. public void removeConnectionListener( ConnectionListener listener )
  207. {
  208. connectionListeners.remove(listener);
  209. }
  210. public void addMessageListener( MessageListener<? super HostedConnection> listener )
  211. {
  212. messageListeners.addMessageListener( listener );
  213. }
  214. public void addMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
  215. {
  216. messageListeners.addMessageListener( listener, classes );
  217. }
  218. public void removeMessageListener( MessageListener<? super HostedConnection> listener )
  219. {
  220. messageListeners.removeMessageListener( listener );
  221. }
  222. public void removeMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
  223. {
  224. messageListeners.removeMessageListener( listener, classes );
  225. }
  226. protected void dispatch( HostedConnection source, Message m )
  227. {
  228. if( source == null ) {
  229. messageListeners.messageReceived( source, m );
  230. } else {
  231. // A semi-heavy handed way to make sure the listener
  232. // doesn't get called at the same time from two different
  233. // threads for the same hosted connection.
  234. synchronized( source ) {
  235. messageListeners.messageReceived( source, m );
  236. }
  237. }
  238. }
  239. protected void fireConnectionAdded( HostedConnection conn )
  240. {
  241. for( ConnectionListener l : connectionListeners ) {
  242. l.connectionAdded( this, conn );
  243. }
  244. }
  245. protected void fireConnectionRemoved( HostedConnection conn )
  246. {
  247. for( ConnectionListener l : connectionListeners ) {
  248. l.connectionRemoved( this, conn );
  249. }
  250. }
  251. protected int getChannel( KernelAdapter ka )
  252. {
  253. return channels.indexOf(ka);
  254. }
  255. protected void registerClient( KernelAdapter ka, Endpoint p, ClientRegistrationMessage m )
  256. {
  257. Connection addedConnection = null;
  258. // generally this will only be called by one thread but it's
  259. // important enough I won't take chances
  260. synchronized( this ) {
  261. // Grab the random ID that the client created when creating
  262. // its two registration messages
  263. long tempId = m.getId();
  264. // See if we already have one
  265. Connection c = connecting.remove(tempId);
  266. if( c == null ) {
  267. c = new Connection(channels.size());
  268. log.log( Level.FINE, "Registering client for endpoint, pass 1:{0}.", p );
  269. } else {
  270. log.log( Level.FINE, "Refining client registration for endpoint:{0}.", p );
  271. }
  272. // Fill in what we now know
  273. int channel = getChannel(ka);
  274. c.setChannel(channel, p);
  275. log.log( Level.FINE, "Setting up channel:{0}", channel );
  276. // If it's channel 0 then this is the initial connection
  277. // and we will send the connection information
  278. if( channel == CH_RELIABLE ) {
  279. // Validate the name and version which is only sent
  280. // over the reliable connection at this point.
  281. if( !getGameName().equals(m.getGameName())
  282. || getVersion() != m.getVersion() ) {
  283. log.log( Level.INFO, "Kicking client due to name/version mismatch:{0}.", c );
  284. // Need to kick them off... I may regret doing this from within
  285. // the sync block but the alternative is more code
  286. c.close( "Server client mismatch, server:" + getGameName() + " v" + getVersion()
  287. + " client:" + m.getGameName() + " v" + m.getVersion() );
  288. return;
  289. }
  290. // Else send the extra channel information to the client
  291. if( !alternatePorts.isEmpty() ) {
  292. ChannelInfoMessage cim = new ChannelInfoMessage( m.getId(), alternatePorts );
  293. c.send(cim);
  294. }
  295. }
  296. if( c.isComplete() ) {
  297. // Then we are fully connected
  298. if( connections.put( c.getId(), c ) == null ) {
  299. for( Endpoint cp : c.channels ) {
  300. if( cp == null )
  301. continue;
  302. endpointConnections.put( cp, c );
  303. }
  304. addedConnection = c;
  305. }
  306. } else {
  307. // Need to keep getting channels so we'll keep it in
  308. // the map
  309. connecting.put(tempId, c);
  310. }
  311. }
  312. // Best to do this outside of the synch block to avoid
  313. // over synchronizing which is the path to deadlocks
  314. if( addedConnection != null ) {
  315. log.log( Level.INFO, "Client registered:{0}.", addedConnection );
  316. // Send the ID back to the client letting it know it's
  317. // fully connected.
  318. m = new ClientRegistrationMessage();
  319. m.setId( addedConnection.getId() );
  320. m.setReliable(true);
  321. addedConnection.send(m);
  322. // Now we can notify the listeners about the
  323. // new connection.
  324. fireConnectionAdded( addedConnection );
  325. }
  326. }
  327. protected HostedConnection getConnection( Endpoint endpoint )
  328. {
  329. return endpointConnections.get(endpoint);
  330. }
  331. protected void connectionClosed( Endpoint p )
  332. {
  333. log.log( Level.INFO, "Connection closed:{0}.", p );
  334. // Try to find the endpoint in all ways that it might
  335. // exist. Note: by this point the raw network channel is
  336. // closed already.
  337. // Also note: this method will be called multiple times per
  338. // HostedConnection if it has multiple endpoints.
  339. Connection removed = null;
  340. synchronized( this ) {
  341. // Just in case the endpoint was still connecting
  342. connecting.values().remove(p);
  343. // And the regular management
  344. removed = (Connection)endpointConnections.remove(p);
  345. if( removed != null ) {
  346. connections.remove( removed.getId() );
  347. }
  348. log.log( Level.FINE, "Connections size:{0}", connections.size() );
  349. log.log( Level.FINE, "Endpoint mappings size:{0}", endpointConnections.size() );
  350. }
  351. // Better not to fire events while we hold a lock
  352. // so always do this outside the synch block.
  353. if( removed != null ) {
  354. log.log( Level.INFO, "Client closed:{0}.", removed );
  355. removed.closeConnection();
  356. }
  357. }
  358. protected class Connection implements HostedConnection
  359. {
  360. private int id;
  361. private boolean closed;
  362. private Endpoint[] channels;
  363. private int setChannelCount = 0;
  364. private Map<String,Object> sessionData = new ConcurrentHashMap<String,Object>();
  365. public Connection( int channelCount )
  366. {
  367. id = nextId.getAndIncrement();
  368. channels = new Endpoint[channelCount];
  369. }
  370. void setChannel( int channel, Endpoint p )
  371. {
  372. if( channels[channel] != null && channels[channel] != p ) {
  373. throw new RuntimeException( "Channel has already been set:" + channel
  374. + " = " + channels[channel] + ", cannot be set to:" + p );
  375. }
  376. channels[channel] = p;
  377. if( p != null )
  378. setChannelCount++;
  379. }
  380. boolean isComplete()
  381. {
  382. return setChannelCount == channels.length;
  383. }
  384. public Server getServer()
  385. {
  386. return DefaultServer.this;
  387. }
  388. public int getId()
  389. {
  390. return id;
  391. }
  392. public String getAddress()
  393. {
  394. return channels[CH_RELIABLE] == null ? null : channels[CH_RELIABLE].getAddress();
  395. }
  396. public void send( Message message )
  397. {
  398. ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
  399. if( message.isReliable() || channels[CH_UNRELIABLE] == null ) {
  400. channels[CH_RELIABLE].send( buffer );
  401. } else {
  402. channels[CH_UNRELIABLE].send( buffer );
  403. }
  404. }
  405. public void send( int channel, Message message )
  406. {
  407. checkChannel(channel);
  408. ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
  409. channels[channel+CH_FIRST].send(buffer);
  410. }
  411. protected void closeConnection()
  412. {
  413. if( closed )
  414. return;
  415. closed = true;
  416. // Make sure all endpoints are closed. Note: reliable
  417. // should always already be closed through all paths that I
  418. // can conceive... but it doesn't hurt to be sure.
  419. for( Endpoint p : channels ) {
  420. if( p == null )
  421. continue;
  422. p.close();
  423. }
  424. fireConnectionRemoved( this );
  425. }
  426. public void close( String reason )
  427. {
  428. // Send a reason
  429. DisconnectMessage m = new DisconnectMessage();
  430. m.setType( DisconnectMessage.KICK );
  431. m.setReason( reason );
  432. m.setReliable( true );
  433. send( m );
  434. // Just close the reliable endpoint
  435. // fast will be cleaned up as a side-effect
  436. // when closeConnection() is called by the
  437. // connectionClosed() endpoint callback.
  438. if( channels[CH_RELIABLE] != null ) {
  439. // Close with flush so we make sure our
  440. // message gets out
  441. channels[CH_RELIABLE].close(true);
  442. }
  443. }
  444. public Object setAttribute( String name, Object value )
  445. {
  446. if( value == null )
  447. return sessionData.remove(name);
  448. return sessionData.put(name, value);
  449. }
  450. @SuppressWarnings("unchecked")
  451. public <T> T getAttribute( String name )
  452. {
  453. return (T)sessionData.get(name);
  454. }
  455. public Set<String> attributeNames()
  456. {
  457. return Collections.unmodifiableSet(sessionData.keySet());
  458. }
  459. public String toString()
  460. {
  461. return "Connection[ id=" + id + ", reliable=" + channels[CH_RELIABLE]
  462. + ", fast=" + channels[CH_UNRELIABLE] + " ]";
  463. }
  464. }
  465. protected class Redispatch implements MessageListener<HostedConnection>
  466. {
  467. public void messageReceived( HostedConnection source, Message m )
  468. {
  469. dispatch( source, m );
  470. }
  471. }
  472. protected class FilterAdapter implements Filter<Endpoint>
  473. {
  474. private Filter<? super HostedConnection> delegate;
  475. public FilterAdapter( Filter<? super HostedConnection> delegate )
  476. {
  477. this.delegate = delegate;
  478. }
  479. public boolean apply( Endpoint input )
  480. {
  481. HostedConnection conn = getConnection( input );
  482. if( conn == null )
  483. return false;
  484. return delegate.apply(conn);
  485. }
  486. }
  487. }