|
@@ -112,7 +112,7 @@ public class SelectorKernel extends AbstractKernel
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
|
|
|
+ public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
|
|
|
boolean copy )
|
|
|
{
|
|
|
if( !reliable )
|
|
@@ -133,7 +133,7 @@ public class SelectorKernel extends AbstractKernel
|
|
|
continue;
|
|
|
|
|
|
// Give it the data... but let each endpoint track their
|
|
|
- // own completion over the shared array of bytes by
|
|
|
+ // own completion over the shared array of bytes by
|
|
|
// duplicating it
|
|
|
p.send( data.duplicate(), false, false );
|
|
|
}
|
|
@@ -164,7 +164,7 @@ public class SelectorKernel extends AbstractKernel
|
|
|
|
|
|
// Enqueue an endpoint event for the listeners
|
|
|
addEvent( EndpointEvent.createRemove( this, p ) );
|
|
|
-
|
|
|
+
|
|
|
// If there are no pending messages then add one so that the
|
|
|
// kernel-user knows to wake up if it is only listening for
|
|
|
// envelopes.
|
|
@@ -174,7 +174,7 @@ public class SelectorKernel extends AbstractKernel
|
|
|
// to check again.
|
|
|
addEnvelope( EVENTS_PENDING );
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Called by the endpoints when they need to be closed.
|
|
@@ -285,10 +285,10 @@ public class SelectorKernel extends AbstractKernel
|
|
|
// efficiently done as change requests... or simply
|
|
|
// keeping a thread-safe set of endpoints with pending
|
|
|
// writes. For most cases, it shouldn't matter.
|
|
|
- for( Map.Entry<NioEndpoint,SelectionKey> e : endpointKeys.entrySet() ) {
|
|
|
+ for( Map.Entry<NioEndpoint,SelectionKey> e : endpointKeys.entrySet() ) {
|
|
|
if( e.getKey().hasPending() ) {
|
|
|
e.getValue().interestOps(SelectionKey.OP_WRITE);
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -318,7 +318,13 @@ public class SelectorKernel extends AbstractKernel
|
|
|
|
|
|
protected void cancel( NioEndpoint p ) throws IOException
|
|
|
{
|
|
|
+ log.log( Level.INFO, "Closing endpoint:{0}.", p );
|
|
|
SelectionKey key = endpointKeys.remove(p);
|
|
|
+ if( key == null ) {
|
|
|
+ log.log( Level.INFO, "Endpoint already closed:{0}.", p );
|
|
|
+ return; // already closed it
|
|
|
+ }
|
|
|
+
|
|
|
SocketChannel c = (SocketChannel)key.channel();
|
|
|
|
|
|
// Note: key.cancel() is specifically thread safe. One of
|
|
@@ -332,6 +338,7 @@ public class SelectorKernel extends AbstractKernel
|
|
|
protected void cancel( SelectionKey key, SocketChannel c ) throws IOException
|
|
|
{
|
|
|
NioEndpoint p = (NioEndpoint)key.attachment();
|
|
|
+ log.log( Level.INFO, "Closing channel endpoint:{0}.", p );
|
|
|
endpointKeys.remove(p);
|
|
|
|
|
|
key.cancel();
|
|
@@ -375,13 +382,13 @@ public class SelectorKernel extends AbstractKernel
|
|
|
if( current == NioEndpoint.CLOSE_MARKER ) {
|
|
|
// This connection wants to be closed now
|
|
|
closeEndpoint(p);
|
|
|
-
|
|
|
+
|
|
|
// Nothing more to do
|
|
|
- return;
|
|
|
+ return;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
c.write( current );
|
|
|
-
|
|
|
+
|
|
|
// If we wrote all of that packet then we need to remove it
|
|
|
if( current.remaining() == 0 ) {
|
|
|
p.removePending();
|