|
|
@@ -21,20 +21,17 @@
|
|
|
#include "netDatagram.h"
|
|
|
#include "datagramTCPHeader.h"
|
|
|
#include "datagramUDPHeader.h"
|
|
|
-#include "pprerror.h"
|
|
|
#include "config_net.h"
|
|
|
#include "trueClock.h"
|
|
|
-
|
|
|
+#include "socket_udp.h"
|
|
|
+#include "socket_tcp.h"
|
|
|
+#include "mutexHolder.h"
|
|
|
#include "pnotify.h"
|
|
|
-#include <prerror.h>
|
|
|
-#include <pratom.h>
|
|
|
-#include <algorithm>
|
|
|
+#include "atomicAdjust.h"
|
|
|
|
|
|
static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
|
|
|
|
|
|
-// We have to impose a maximum timeout on the PR_Poll() call because
|
|
|
-// PR_Poll() doesn't seem to be interruptible! (!)
|
|
|
-static const PRUint32 max_timeout_ms = 100;
|
|
|
+static const int max_timeout_ms = 100;
|
|
|
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
// Function: ConnectionReader::SocketInfo::Constructor
|
|
|
@@ -56,7 +53,7 @@ SocketInfo(const PT(Connection) &connection) :
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
bool ConnectionReader::SocketInfo::
|
|
|
is_udp() const {
|
|
|
- return (PR_GetDescType(_connection->get_socket()) == PR_DESC_SOCKET_UDP);
|
|
|
+ return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()));
|
|
|
}
|
|
|
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
@@ -64,11 +61,34 @@ is_udp() const {
|
|
|
// Access: Public
|
|
|
// Description:
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
-PRFileDesc *ConnectionReader::SocketInfo::
|
|
|
+Socket_IP *ConnectionReader::SocketInfo::
|
|
|
get_socket() const {
|
|
|
return _connection->get_socket();
|
|
|
}
|
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
+// Function: ConnectionReader::ReaderThread::Constructor
|
|
|
+// Access: Public
|
|
|
+// Description:
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
+ConnectionReader::ReaderThread::
|
|
|
+ReaderThread(ConnectionReader *reader, int thread_index) :
|
|
|
+ Thread("ReaderThread", "ReaderThread"),
|
|
|
+ _reader(reader),
|
|
|
+ _thread_index(thread_index)
|
|
|
+{
|
|
|
+}
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
+// Function: ConnectionReader::ReaderThread::thread_main
|
|
|
+// Access: Public, Virtual
|
|
|
+// Description:
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
+void ConnectionReader::ReaderThread::
|
|
|
+thread_main() {
|
|
|
+ _reader->thread_run(_thread_index);
|
|
|
+}
|
|
|
+
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
// Function: ConnectionReader::Constructor
|
|
|
// Access: Public
|
|
|
@@ -83,10 +103,6 @@ ConnectionReader(ConnectionManager *manager, int num_threads) :
|
|
|
_manager(manager)
|
|
|
{
|
|
|
#ifndef HAVE_THREADS
|
|
|
- // Although this code is written to use thread-locking primitives
|
|
|
- // regardless of the definition of HAVE_THREADS, it is not safe to
|
|
|
- // spawn multiple threads when HAVE_THREADS is not true, since we
|
|
|
- // might be using a non-thread-safe malloc scheme.
|
|
|
#ifndef NDEBUG
|
|
|
if (num_threads != 0) {
|
|
|
net_cat.error()
|
|
|
@@ -101,36 +117,20 @@ ConnectionReader(ConnectionManager *manager, int num_threads) :
|
|
|
_polling = (num_threads <= 0);
|
|
|
|
|
|
_shutdown = false;
|
|
|
- _startup_mutex = PR_NewLock();
|
|
|
|
|
|
_next_index = 0;
|
|
|
_num_results = 0;
|
|
|
- _select_mutex = PR_NewLock();
|
|
|
|
|
|
_currently_polling_thread = -1;
|
|
|
|
|
|
- _reexamine_sockets = false;
|
|
|
- _sockets_mutex = PR_NewLock();
|
|
|
-
|
|
|
- // Before we create all the threads, grab _startup_mutex. That will
|
|
|
- // prevent our new threads from trying to look themselves up in the
|
|
|
- // _threads array before we have filled it up.
|
|
|
- PR_Lock(_startup_mutex);
|
|
|
-
|
|
|
- for (int i = 0; i < num_threads; i++) {
|
|
|
- PRThread *thread =
|
|
|
- PR_CreateThread(PR_USER_THREAD,
|
|
|
- thread_start, (void *)this,
|
|
|
- PR_PRIORITY_NORMAL,
|
|
|
- PR_GLOBAL_THREAD, // Since thread will mostly do I/O.
|
|
|
- PR_JOINABLE_THREAD,
|
|
|
- 0); // Select a suitable stack size.
|
|
|
-
|
|
|
- nassertv(thread != (PRThread *)NULL);
|
|
|
+ int i;
|
|
|
+ for (i = 0; i < num_threads; i++) {
|
|
|
+ PT(ReaderThread) thread = new ReaderThread(this, i);
|
|
|
_threads.push_back(thread);
|
|
|
}
|
|
|
-
|
|
|
- PR_Unlock(_startup_mutex);
|
|
|
+ for (i = 0; i < num_threads; i++) {
|
|
|
+ _threads[i]->start(TP_normal, true, true);
|
|
|
+ }
|
|
|
|
|
|
_manager->add_reader(this);
|
|
|
}
|
|
|
@@ -166,10 +166,6 @@ ConnectionReader::
|
|
|
sinfo->_connection.clear();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- PR_DestroyLock(_startup_mutex);
|
|
|
- PR_DestroyLock(_select_mutex);
|
|
|
- PR_DestroyLock(_sockets_mutex);
|
|
|
}
|
|
|
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
@@ -190,24 +186,21 @@ ConnectionReader::
|
|
|
// will by any thread.
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
bool ConnectionReader::
|
|
|
-add_connection(const PT(Connection) &connection) {
|
|
|
+add_connection(Connection *connection) {
|
|
|
nassertr(connection != (Connection *)NULL, false);
|
|
|
|
|
|
- PR_Lock(_sockets_mutex);
|
|
|
+ MutexHolder holder(_sockets_mutex);
|
|
|
|
|
|
// Make sure it's not already on the _sockets list.
|
|
|
Sockets::const_iterator si;
|
|
|
for (si = _sockets.begin(); si != _sockets.end(); ++si) {
|
|
|
if ((*si)->_connection == connection) {
|
|
|
// Whoops, already there.
|
|
|
- PR_Unlock(_sockets_mutex);
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
_sockets.push_back(new SocketInfo(connection));
|
|
|
- _reexamine_sockets = true;
|
|
|
- PR_Unlock(_sockets_mutex);
|
|
|
|
|
|
return true;
|
|
|
}
|
|
|
@@ -224,8 +217,8 @@ add_connection(const PT(Connection) &connection) {
|
|
|
// at will by any thread.
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
bool ConnectionReader::
|
|
|
-remove_connection(const PT(Connection) &connection) {
|
|
|
- PR_Lock(_sockets_mutex);
|
|
|
+remove_connection(Connection *connection) {
|
|
|
+ MutexHolder holder(_sockets_mutex);
|
|
|
|
|
|
// Walk through the list of sockets to find the one we're removing.
|
|
|
Sockets::iterator si;
|
|
|
@@ -234,14 +227,11 @@ remove_connection(const PT(Connection) &connection) {
|
|
|
++si;
|
|
|
}
|
|
|
if (si == _sockets.end()) {
|
|
|
- PR_Unlock(_sockets_mutex);
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
_removed_sockets.push_back(*si);
|
|
|
_sockets.erase(si);
|
|
|
- _reexamine_sockets = true;
|
|
|
- PR_Unlock(_sockets_mutex);
|
|
|
|
|
|
return true;
|
|
|
}
|
|
|
@@ -258,8 +248,8 @@ remove_connection(const PT(Connection) &connection) {
|
|
|
// the connection.)
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
bool ConnectionReader::
|
|
|
-is_connection_ok(const PT(Connection) &connection) {
|
|
|
- PR_Lock(_sockets_mutex);
|
|
|
+is_connection_ok(Connection *connection) {
|
|
|
+ MutexHolder holder(_sockets_mutex);
|
|
|
|
|
|
// Walk through the list of sockets to find the one we're asking
|
|
|
// about.
|
|
|
@@ -270,13 +260,11 @@ is_connection_ok(const PT(Connection) &connection) {
|
|
|
}
|
|
|
if (si == _sockets.end()) {
|
|
|
// Don't know that connection.
|
|
|
- PR_Unlock(_sockets_mutex);
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
SocketInfo *sinfo = (*si);
|
|
|
bool is_ok = !sinfo->_error;
|
|
|
- PR_Unlock(_sockets_mutex);
|
|
|
|
|
|
return is_ok;
|
|
|
}
|
|
|
@@ -299,14 +287,14 @@ poll() {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- SocketInfo *sinfo = get_next_available_socket(PR_INTERVAL_NO_WAIT, -2);
|
|
|
+ SocketInfo *sinfo = get_next_available_socket(false, -2);
|
|
|
if (sinfo != (SocketInfo *)NULL) {
|
|
|
double max_poll_cycle = get_max_poll_cycle();
|
|
|
if (max_poll_cycle < 0.0) {
|
|
|
// Continue to read all data.
|
|
|
while (sinfo != (SocketInfo *)NULL) {
|
|
|
process_incoming_data(sinfo);
|
|
|
- sinfo = get_next_available_socket(PR_INTERVAL_NO_WAIT, -2);
|
|
|
+ sinfo = get_next_available_socket(false, -2);
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
@@ -319,7 +307,7 @@ poll() {
|
|
|
if (global_clock->get_short_time() >= stop) {
|
|
|
return;
|
|
|
}
|
|
|
- sinfo = get_next_available_socket(PR_INTERVAL_NO_WAIT, -2);
|
|
|
+ sinfo = get_next_available_socket(false, -2);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -429,16 +417,7 @@ shutdown() {
|
|
|
// Now wait for all of our threads to terminate.
|
|
|
Threads::iterator ti;
|
|
|
for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
|
|
|
- // Interrupt the thread so it can notice the shutdown.
|
|
|
- PRStatus result = PR_Interrupt(*ti);
|
|
|
- if (result != PR_SUCCESS) {
|
|
|
- pprerror("PR_Interrupt");
|
|
|
- }
|
|
|
-
|
|
|
- result = PR_JoinThread(*ti);
|
|
|
- if (result != PR_SUCCESS) {
|
|
|
- pprerror("PR_JoinThread");
|
|
|
- }
|
|
|
+ (*ti)->join();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -470,34 +449,14 @@ finish_socket(SocketInfo *sinfo) {
|
|
|
// By marking the SocketInfo nonbusy, we make it available for
|
|
|
// future polls.
|
|
|
sinfo->_busy = false;
|
|
|
- _reexamine_sockets = true;
|
|
|
-
|
|
|
- // However, someone might be already blocking on an
|
|
|
- // earlier-established PR_Poll() that doesn't involve this socket.
|
|
|
- // That complicates things. It means we'll have to wake that thread
|
|
|
- // up so it can rebuild the poll with the new socket.
|
|
|
-
|
|
|
- // Actually, don't bother, since it turns out that PR_Poll() isn't
|
|
|
- // interruptible anyway. Sigh. Maybe we'll revisit this later, but
|
|
|
- // in the meantime it means we have to rig up the PR_Poll() to
|
|
|
- // return every so often and check the _reexamine_sockets flag by
|
|
|
- // itself.
|
|
|
-
|
|
|
- /*
|
|
|
- int thread = _currently_polling_thread;
|
|
|
- if (thread != -1) {
|
|
|
- nassertv(thread >= 0 && thread < _threads.size());
|
|
|
- PR_Interrupt(_threads[thread]);
|
|
|
- }
|
|
|
- */
|
|
|
}
|
|
|
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
// Function: ConnectionReader::process_incoming_data
|
|
|
// Access: Protected, Virtual
|
|
|
// Description: This is run within a thread when the call to
|
|
|
-// PR_Poll() indicates there is data available
|
|
|
-// on a socket.
|
|
|
+// select() indicates there is data available on a
|
|
|
+// socket.
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
void ConnectionReader::
|
|
|
process_incoming_data(SocketInfo *sinfo) {
|
|
|
@@ -523,21 +482,17 @@ process_incoming_data(SocketInfo *sinfo) {
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
void ConnectionReader::
|
|
|
process_incoming_udp_data(SocketInfo *sinfo) {
|
|
|
- PRFileDesc *socket = sinfo->get_socket();
|
|
|
- PRNetAddr addr;
|
|
|
+ Socket_UDP *socket;
|
|
|
+ DCAST_INTO_V(socket, sinfo->get_socket());
|
|
|
+ Socket_Address addr;
|
|
|
|
|
|
// Read as many bytes as we can.
|
|
|
- PRInt8 buffer[read_buffer_size];
|
|
|
- PRInt32 bytes_read;
|
|
|
+ char buffer[read_buffer_size];
|
|
|
+ int bytes_read = read_buffer_size;
|
|
|
|
|
|
- bytes_read = PR_RecvFrom(socket, buffer, read_buffer_size, 0,
|
|
|
- &addr, PR_INTERVAL_NO_TIMEOUT);
|
|
|
+ bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
|
|
|
|
|
|
- if (bytes_read < 0) {
|
|
|
- PRErrorCode errcode = PR_GetError();
|
|
|
- if (errcode != PR_PENDING_INTERRUPT_ERROR) {
|
|
|
- pprerror("PR_RecvFrom");
|
|
|
- }
|
|
|
+ if (!okflag) {
|
|
|
finish_socket(sinfo);
|
|
|
return;
|
|
|
|
|
|
@@ -563,7 +518,7 @@ process_incoming_udp_data(SocketInfo *sinfo) {
|
|
|
|
|
|
DatagramUDPHeader header(buffer);
|
|
|
|
|
|
- PRInt8 *dp = buffer + datagram_udp_header_size;
|
|
|
+ char *dp = buffer + datagram_udp_header_size;
|
|
|
bytes_read -= datagram_udp_header_size;
|
|
|
|
|
|
NetDatagram datagram(dp, bytes_read);
|
|
|
@@ -594,44 +549,20 @@ process_incoming_udp_data(SocketInfo *sinfo) {
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
void ConnectionReader::
|
|
|
process_incoming_tcp_data(SocketInfo *sinfo) {
|
|
|
- PRFileDesc *socket = sinfo->get_socket();
|
|
|
- PRNetAddr addr;
|
|
|
+ Socket_TCP *socket;
|
|
|
+ DCAST_INTO_V(socket, sinfo->get_socket());
|
|
|
|
|
|
// Read only the header bytes to start with.
|
|
|
- PRInt8 buffer[read_buffer_size];
|
|
|
- PRInt32 header_bytes_read = 0;
|
|
|
-
|
|
|
- if (PR_GetSockName(socket, &addr) != PR_SUCCESS) {
|
|
|
- pprerror("PR_GetSockName");
|
|
|
- }
|
|
|
+ char buffer[read_buffer_size];
|
|
|
+ int header_bytes_read = 0;
|
|
|
|
|
|
// First, we have to read the first _tcp_header_size bytes.
|
|
|
while (header_bytes_read < _tcp_header_size) {
|
|
|
- PRInt32 bytes_read =
|
|
|
- PR_Recv(socket, buffer + header_bytes_read,
|
|
|
- _tcp_header_size - header_bytes_read, 0,
|
|
|
- PR_INTERVAL_NO_TIMEOUT);
|
|
|
-
|
|
|
- if (bytes_read < 0) {
|
|
|
- PRErrorCode errcode = PR_GetError();
|
|
|
- if (errcode == PR_CONNECT_RESET_ERROR
|
|
|
-#ifdef PR_SOCKET_SHUTDOWN_ERROR
|
|
|
- || errcode == PR_SOCKET_SHUTDOWN_ERROR
|
|
|
- || errcode == PR_CONNECT_ABORTED_ERROR
|
|
|
-#endif
|
|
|
- ) {
|
|
|
- // The socket was closed.
|
|
|
- if (_manager != (ConnectionManager *)NULL) {
|
|
|
- _manager->connection_reset(sinfo->_connection, errcode);
|
|
|
- }
|
|
|
-
|
|
|
- } else if (errcode != PR_PENDING_INTERRUPT_ERROR) {
|
|
|
- pprerror("PR_Recv");
|
|
|
- }
|
|
|
- finish_socket(sinfo);
|
|
|
- return;
|
|
|
+ int bytes_read =
|
|
|
+ socket->RecvData(buffer + header_bytes_read,
|
|
|
+ _tcp_header_size - header_bytes_read);
|
|
|
|
|
|
- } else if (bytes_read == 0) {
|
|
|
+ if (bytes_read <= 0) {
|
|
|
// The socket was closed. Report that and return.
|
|
|
if (_manager != (ConnectionManager *)NULL) {
|
|
|
_manager->connection_reset(sinfo->_connection, 0);
|
|
|
@@ -655,41 +586,20 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
|
|
|
}
|
|
|
|
|
|
DatagramTCPHeader header(buffer, _tcp_header_size);
|
|
|
- PRInt32 size = header.get_datagram_size(_tcp_header_size);
|
|
|
+ int size = header.get_datagram_size(_tcp_header_size);
|
|
|
|
|
|
// We have to loop until the entire datagram is read.
|
|
|
NetDatagram datagram;
|
|
|
|
|
|
while (!_shutdown && (int)datagram.get_length() < size) {
|
|
|
- PRInt32 bytes_read;
|
|
|
+ int bytes_read;
|
|
|
|
|
|
bytes_read =
|
|
|
- PR_Recv(socket, buffer,
|
|
|
- min((PRInt32)read_buffer_size,
|
|
|
- (PRInt32)(size - datagram.get_length())),
|
|
|
- 0, PR_INTERVAL_NO_TIMEOUT);
|
|
|
- PRInt8 *dp = buffer;
|
|
|
-
|
|
|
- if (bytes_read < 0) {
|
|
|
- PRErrorCode errcode = PR_GetError();
|
|
|
- if (errcode == PR_CONNECT_RESET_ERROR
|
|
|
-#ifdef PR_SOCKET_SHUTDOWN_ERROR
|
|
|
- || errcode == PR_SOCKET_SHUTDOWN_ERROR
|
|
|
- || errcode == PR_CONNECT_ABORTED_ERROR
|
|
|
-#endif
|
|
|
- ) {
|
|
|
- // The socket was closed.
|
|
|
- if (_manager != (ConnectionManager *)NULL) {
|
|
|
- _manager->connection_reset(sinfo->_connection, errcode);
|
|
|
- }
|
|
|
+ socket->RecvData(buffer, min(read_buffer_size,
|
|
|
+ (int)(size - datagram.get_length())));
|
|
|
+ char *dp = buffer;
|
|
|
|
|
|
- } else if (errcode != PR_PENDING_INTERRUPT_ERROR) {
|
|
|
- pprerror("PR_Recv");
|
|
|
- }
|
|
|
- finish_socket(sinfo);
|
|
|
- return;
|
|
|
-
|
|
|
- } else if (bytes_read == 0) {
|
|
|
+ if (bytes_read <= 0) {
|
|
|
// The socket was closed. Report that and return.
|
|
|
if (_manager != (ConnectionManager *)NULL) {
|
|
|
_manager->connection_reset(sinfo->_connection, 0);
|
|
|
@@ -698,8 +608,8 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- PRInt32 datagram_bytes =
|
|
|
- min(bytes_read, (PRInt32)(size - datagram.get_length()));
|
|
|
+ int datagram_bytes =
|
|
|
+ min(bytes_read, (int)(size - datagram.get_length()));
|
|
|
datagram.append_data(dp, datagram_bytes);
|
|
|
|
|
|
if (bytes_read > datagram_bytes) {
|
|
|
@@ -725,7 +635,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
|
|
|
<< "Ignoring invalid TCP datagram.\n";
|
|
|
} else {
|
|
|
datagram.set_connection(sinfo->_connection);
|
|
|
- datagram.set_address(NetAddress(addr));
|
|
|
+ datagram.set_address(NetAddress(socket->GetPeerName()));
|
|
|
receive_datagram(datagram);
|
|
|
}
|
|
|
}
|
|
|
@@ -737,21 +647,17 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
void ConnectionReader::
|
|
|
process_raw_incoming_udp_data(SocketInfo *sinfo) {
|
|
|
- PRFileDesc *socket = sinfo->get_socket();
|
|
|
- PRNetAddr addr;
|
|
|
+ Socket_UDP *socket;
|
|
|
+ DCAST_INTO_V(socket, sinfo->get_socket());
|
|
|
+ Socket_Address addr;
|
|
|
|
|
|
// Read as many bytes as we can.
|
|
|
- PRInt8 buffer[read_buffer_size];
|
|
|
- PRInt32 bytes_read;
|
|
|
+ char buffer[read_buffer_size];
|
|
|
+ int bytes_read = read_buffer_size;
|
|
|
|
|
|
- bytes_read = PR_RecvFrom(socket, buffer, read_buffer_size, 0,
|
|
|
- &addr, PR_INTERVAL_NO_TIMEOUT);
|
|
|
+ bool okflag = socket->GetPacket(buffer, &bytes_read, addr);
|
|
|
|
|
|
- if (bytes_read < 0) {
|
|
|
- PRErrorCode errcode = PR_GetError();
|
|
|
- if (errcode != PR_PENDING_INTERRUPT_ERROR) {
|
|
|
- pprerror("PR_RecvFrom");
|
|
|
- }
|
|
|
+ if (!okflag) {
|
|
|
finish_socket(sinfo);
|
|
|
return;
|
|
|
|
|
|
@@ -789,29 +695,14 @@ process_raw_incoming_udp_data(SocketInfo *sinfo) {
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
void ConnectionReader::
|
|
|
process_raw_incoming_tcp_data(SocketInfo *sinfo) {
|
|
|
- PRFileDesc *socket = sinfo->get_socket();
|
|
|
- PRNetAddr addr;
|
|
|
+ Socket_TCP *socket;
|
|
|
+ DCAST_INTO_V(socket, sinfo->get_socket());
|
|
|
|
|
|
// Read as many bytes as we can.
|
|
|
- PRInt8 buffer[read_buffer_size];
|
|
|
- PRInt32 bytes_read;
|
|
|
-
|
|
|
- if (PR_GetSockName(socket, &addr) != PR_SUCCESS) {
|
|
|
- pprerror("PR_GetSockName");
|
|
|
- }
|
|
|
+ char buffer[read_buffer_size];
|
|
|
+ int bytes_read = socket->RecvData(buffer, read_buffer_size);
|
|
|
|
|
|
- bytes_read = PR_Recv(socket, buffer, read_buffer_size, 0,
|
|
|
- PR_INTERVAL_NO_TIMEOUT);
|
|
|
-
|
|
|
- if (bytes_read < 0) {
|
|
|
- PRErrorCode errcode = PR_GetError();
|
|
|
- if (errcode != PR_PENDING_INTERRUPT_ERROR) {
|
|
|
- pprerror("PR_RecvFrom");
|
|
|
- }
|
|
|
- finish_socket(sinfo);
|
|
|
- return;
|
|
|
-
|
|
|
- } else if (bytes_read == 0) {
|
|
|
+ if (bytes_read <= 0) {
|
|
|
// The socket was closed. Report that and return.
|
|
|
if (_manager != (ConnectionManager *)NULL) {
|
|
|
_manager->connection_reset(sinfo->_connection, 0);
|
|
|
@@ -833,24 +724,10 @@ process_raw_incoming_tcp_data(SocketInfo *sinfo) {
|
|
|
}
|
|
|
|
|
|
datagram.set_connection(sinfo->_connection);
|
|
|
- datagram.set_address(NetAddress(addr));
|
|
|
+ datagram.set_address(NetAddress(socket->GetPeerName()));
|
|
|
receive_datagram(datagram);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-////////////////////////////////////////////////////////////////////
|
|
|
-// Function: ConnectionReader::thread_start
|
|
|
-// Access: Private, Static
|
|
|
-// Description: The static wrapper around the thread's executing
|
|
|
-// function. This must be a static member function
|
|
|
-// because it is passed through the C interface to
|
|
|
-// PR_CreateThread().
|
|
|
-////////////////////////////////////////////////////////////////////
|
|
|
-void ConnectionReader::
|
|
|
-thread_start(void *data) {
|
|
|
- ((ConnectionReader *)data)->thread_run();
|
|
|
-}
|
|
|
-
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
// Function: ConnectionReader::thread_run
|
|
|
// Access: Private
|
|
|
@@ -858,24 +735,13 @@ thread_start(void *data) {
|
|
|
// thread.
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
void ConnectionReader::
|
|
|
-thread_run() {
|
|
|
+thread_run(int thread_index) {
|
|
|
nassertv(!_polling);
|
|
|
-
|
|
|
- // First determine our own thread index.
|
|
|
- PR_Lock(_startup_mutex);
|
|
|
- Threads::const_iterator ti =
|
|
|
- find(_threads.begin(), _threads.end(), PR_GetCurrentThread());
|
|
|
-
|
|
|
- nassertv(ti != _threads.end());
|
|
|
- PRInt32 current_thread_index = (ti - _threads.begin());
|
|
|
-
|
|
|
- nassertv(_threads[current_thread_index] == PR_GetCurrentThread());
|
|
|
- PR_Unlock(_startup_mutex);
|
|
|
+ nassertv(_threads[thread_index] == Thread::get_current_thread());
|
|
|
|
|
|
while (!_shutdown) {
|
|
|
SocketInfo *sinfo =
|
|
|
- get_next_available_socket(PR_INTERVAL_NO_TIMEOUT,
|
|
|
- current_thread_index);
|
|
|
+ get_next_available_socket(false, thread_index);
|
|
|
if (sinfo != (SocketInfo *)NULL) {
|
|
|
process_incoming_data(sinfo);
|
|
|
}
|
|
|
@@ -892,59 +758,29 @@ thread_run() {
|
|
|
//
|
|
|
// This function may block indefinitely if it is being
|
|
|
// called by multiple threads; if there are no other
|
|
|
-// threads, it may block only if timeout !=
|
|
|
-// PR_INTERVAL_NO_WAIT.
|
|
|
+// threads, it may block only if allow_block is true.
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
ConnectionReader::SocketInfo *ConnectionReader::
|
|
|
-get_next_available_socket(PRIntervalTime timeout,
|
|
|
- PRInt32 current_thread_index) {
|
|
|
+get_next_available_socket(bool allow_block, int current_thread_index) {
|
|
|
// Go to sleep on the select() mutex. This guarantees that only one
|
|
|
// thread is in this function at a time.
|
|
|
- PR_Lock(_select_mutex);
|
|
|
-
|
|
|
- int num_sockets = _polled_sockets.size();
|
|
|
- nassertr(num_sockets == (int)_poll.size(), NULL);
|
|
|
+ MutexHolder holder(_select_mutex);
|
|
|
|
|
|
do {
|
|
|
- // First, check the result from the previous PR_Poll() call. If
|
|
|
+ // First, check the result from the previous select call. If
|
|
|
// there are any sockets remaining there, process them first.
|
|
|
while (!_shutdown && _num_results > 0) {
|
|
|
- nassertr(_next_index < num_sockets, NULL);
|
|
|
+ nassertr(_next_index < (int)_selecting_sockets.size(), NULL);
|
|
|
int i = _next_index;
|
|
|
_next_index++;
|
|
|
|
|
|
- if (_poll[i].out_flags != 0) {
|
|
|
+ if (_fdset.IsSetFor(*_selecting_sockets[i]->get_socket())) {
|
|
|
_num_results--;
|
|
|
- SocketInfo *sinfo = _polled_sockets[i];
|
|
|
-
|
|
|
- if ((_poll[i].out_flags & PR_POLL_READ) != 0) {
|
|
|
- // Some genuine noise on the port.
|
|
|
- sinfo->_busy = true;
|
|
|
- _reexamine_sockets = true;
|
|
|
- PR_Unlock(_select_mutex);
|
|
|
- PR_Sleep(PR_INTERVAL_NO_WAIT);
|
|
|
- return sinfo;
|
|
|
-
|
|
|
- } else if ((_poll[i].out_flags &
|
|
|
- (PR_POLL_ERR | PR_POLL_NVAL | PR_POLL_HUP)) != 0) {
|
|
|
- // Something bad happened to this socket. Tell the
|
|
|
- // ConnectionManager to drop it.
|
|
|
- if (_manager != (ConnectionManager *)NULL) {
|
|
|
- // Perform a recv to force an error code.
|
|
|
- char buffer[1];
|
|
|
- PRInt32 got_bytes =
|
|
|
- PR_Recv(sinfo->get_socket(), buffer, 1, 0, PR_INTERVAL_NO_WAIT);
|
|
|
- if (got_bytes > 0) {
|
|
|
- net_cat.error()
|
|
|
- << "poll returned error flags " << hex << _poll[i].out_flags
|
|
|
- << dec << " but read " << got_bytes << " from socket.\n";
|
|
|
- }
|
|
|
- PRErrorCode errcode = PR_GetError();
|
|
|
- _manager->connection_reset(sinfo->_connection, errcode);
|
|
|
- }
|
|
|
- sinfo->_error = true;
|
|
|
- _reexamine_sockets = true;
|
|
|
- }
|
|
|
+ SocketInfo *sinfo = _selecting_sockets[i];
|
|
|
+
|
|
|
+ // Some noise on this socket.
|
|
|
+ sinfo->_busy = true;
|
|
|
+ return sinfo;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -952,93 +788,70 @@ get_next_available_socket(PRIntervalTime timeout,
|
|
|
do {
|
|
|
interrupted = false;
|
|
|
|
|
|
- // Ok, no results from previous PR_Poll() calls. Prepare to set
|
|
|
- // up for a new poll.
|
|
|
+ // Ok, no results from previous select calls. Prepare to set up
|
|
|
+ // for a new select.
|
|
|
|
|
|
// First, report to anyone else who cares that we're the thread
|
|
|
// about to do the poll. That way, if any new sockets come
|
|
|
// available while we're polling, we can service them.
|
|
|
- PR_AtomicSet(&_currently_polling_thread, current_thread_index);
|
|
|
-
|
|
|
- if (_reexamine_sockets) {
|
|
|
- _reexamine_sockets = false;
|
|
|
- rebuild_poll_list();
|
|
|
- num_sockets = _polled_sockets.size();
|
|
|
- nassertr(num_sockets == (int)_poll.size(), NULL);
|
|
|
- }
|
|
|
+ AtomicAdjust::set(_currently_polling_thread, current_thread_index);
|
|
|
+
|
|
|
+ rebuild_select_list();
|
|
|
|
|
|
- // Now we can execute PR_Poll(). This basically maps to a Unix
|
|
|
- // select() call.
|
|
|
+ // Now we can execute the select.
|
|
|
_num_results = 0;
|
|
|
_next_index = 0;
|
|
|
|
|
|
if (!_shutdown) {
|
|
|
- PRIntervalTime poll_timeout =
|
|
|
- PR_MillisecondsToInterval(max_timeout_ms);
|
|
|
- if (timeout != PR_INTERVAL_NO_TIMEOUT) {
|
|
|
- poll_timeout = min(timeout, poll_timeout);
|
|
|
+ PN_uint32 timeout = max_timeout_ms;
|
|
|
+ if (!allow_block) {
|
|
|
+ timeout = 0;
|
|
|
}
|
|
|
|
|
|
- _num_results = PR_Poll(&_poll[0], num_sockets, poll_timeout);
|
|
|
+ _num_results = _fdset.WaitForRead(false, timeout);
|
|
|
}
|
|
|
|
|
|
- if (_num_results == 0 && timeout == PR_INTERVAL_NO_TIMEOUT) {
|
|
|
- // If we reached max_timeout_ms, but the caller didn't request
|
|
|
- // a timeout, consider that an interrupt: go back and
|
|
|
- // reconsider. (This is a kludge around the fact that
|
|
|
- // PR_Poll() appears to be non-interruptible.)
|
|
|
+ if (_num_results == 0 && allow_block) {
|
|
|
+ // If we reached max_timeout_ms, go back and reconsider. (We
|
|
|
+ // never timeout indefinitely, so we can check the shutdown
|
|
|
+ // flag every once in a while.)
|
|
|
interrupted = true;
|
|
|
|
|
|
} else if (_num_results < 0) {
|
|
|
- // If our poll was interrupted by another thread, rebuild the
|
|
|
- // list and poll again.
|
|
|
- PRErrorCode errcode = PR_GetError();
|
|
|
- if (errcode == PR_PENDING_INTERRUPT_ERROR) {
|
|
|
- interrupted = true;
|
|
|
- } else {
|
|
|
- pprerror("PR_Poll");
|
|
|
- }
|
|
|
+ // If we had an error, just return.
|
|
|
+ return (SocketInfo *)NULL;
|
|
|
}
|
|
|
} while (!_shutdown && interrupted);
|
|
|
|
|
|
- PR_AtomicSet(&_currently_polling_thread, -1);
|
|
|
- // Just in case someone interrupted us while we were polling and
|
|
|
- // we didn't catch it, clear it now--we don't care any more.
|
|
|
- PR_ClearInterrupt();
|
|
|
+ AtomicAdjust::set(_currently_polling_thread, current_thread_index);
|
|
|
|
|
|
// Repeat the above until we (a) find a socket with actual noise
|
|
|
// on it, or (b) return from PR_Poll() with no sockets available.
|
|
|
} while (!_shutdown && _num_results > 0);
|
|
|
|
|
|
- PR_Unlock(_select_mutex);
|
|
|
return (SocketInfo *)NULL;
|
|
|
}
|
|
|
|
|
|
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
-// Function: ConnectionReader::rebuild_poll_list
|
|
|
+// Function: ConnectionReader::rebuild_select_list
|
|
|
// Access: Private
|
|
|
-// Description: Rebuilds the _poll and _polled_sockets arrays based
|
|
|
-// on the sockets that are currently available for
|
|
|
-// polling.
|
|
|
+// Description: Rebuilds the _fdset and _selecting_sockets arrays
|
|
|
+// based on the sockets that are currently available for
|
|
|
+// selecting.
|
|
|
////////////////////////////////////////////////////////////////////
|
|
|
void ConnectionReader::
|
|
|
-rebuild_poll_list() {
|
|
|
- _poll.clear();
|
|
|
- _polled_sockets.clear();
|
|
|
+rebuild_select_list() {
|
|
|
+ _fdset.clear();
|
|
|
+ _selecting_sockets.clear();
|
|
|
|
|
|
- PR_Lock(_sockets_mutex);
|
|
|
+ MutexHolder holder(_sockets_mutex);
|
|
|
Sockets::const_iterator si;
|
|
|
for (si = _sockets.begin(); si != _sockets.end(); ++si) {
|
|
|
SocketInfo *sinfo = (*si);
|
|
|
if (!sinfo->_busy && !sinfo->_error) {
|
|
|
- PRPollDesc pd;
|
|
|
- pd.fd = sinfo->get_socket();
|
|
|
- pd.in_flags = PR_POLL_READ;
|
|
|
- pd.out_flags = 0;
|
|
|
-
|
|
|
- _poll.push_back(pd);
|
|
|
- _polled_sockets.push_back(sinfo);
|
|
|
+ _fdset.setForSocket(*sinfo->get_socket());
|
|
|
+ _selecting_sockets.push_back(sinfo);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1056,6 +869,4 @@ rebuild_poll_list() {
|
|
|
}
|
|
|
_removed_sockets.swap(still_busy_sockets);
|
|
|
}
|
|
|
-
|
|
|
- PR_Unlock(_sockets_mutex);
|
|
|
}
|