| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359 |
- /* Copyright The kNet Project.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. */
- /** @file NetworkWorkerThread.cpp
- @brief */
- // Modified by Lasse Oorni for Urho3D
- #include <utility>
- #ifdef KNET_USE_BOOST
- #include <boost/thread/thread.hpp>
- #endif
- #include "kNet/DebugMemoryLeakCheck.h"
- #include "kNet/UDPMessageConnection.h"
- #include "kNet/NetworkWorkerThread.h"
- #include "kNet/NetworkLogging.h"
- #include "kNet/Event.h"
- #include "kNet/EventArray.h"
- #include "kNet/Clock.h"
- #include "kNet/Thread.h"
- using namespace std;
- namespace kNet
- {
- NetworkWorkerThread::NetworkWorkerThread()
- {
- }
- void NetworkWorkerThread::AddConnection(MessageConnection *connection)
- {
- workThread.Hold();
- Lockable<std::vector<MessageConnection *> >::LockType lock = connections.Acquire();
- lock->push_back(connection);
- LOG(LogVerbose, "Added connection %p to NetworkWorkerThread.", connection);
- workThread.Resume();
- }
- void NetworkWorkerThread::RemoveConnection(MessageConnection *connection)
- {
- workThread.Hold();
- Lockable<std::vector<MessageConnection *> >::LockType lock = connections.Acquire();
- for(size_t i = 0; i < lock->size(); ++i)
- if ((*lock)[i] == connection)
- {
- lock->erase(lock->begin() + i);
- LOG(LogVerbose, "NetworkWorkerThread::RemoveConnection: Connection %p removed.", connection);
- workThread.Resume();
- return;
- }
- LOG(LogError, "NetworkWorkerThread::RemoveConnection called for a nonexisting connection %p!", connection);
- workThread.Resume();
- }
- void NetworkWorkerThread::AddServer(NetworkServer *server)
- {
- workThread.Hold();
- Lockable<std::vector<NetworkServer *> >::LockType lock = servers.Acquire();
- lock->push_back(server);
- LOG(LogVerbose, "Added server %p to NetworkWorkerThread.", server);
- workThread.Resume();
- }
- void NetworkWorkerThread::RemoveServer(NetworkServer *server)
- {
- workThread.Hold();
- PolledTimer timer;
- Lockable<std::vector<NetworkServer *> >::LockType lock = servers.Acquire();
- float lockWait = timer.MSecsElapsed();
- LOG(LogWaits, "NetworkWorkerThread::RemoveServer: Waited %f msecs to lock servers list.",
- lockWait);
- for(size_t i = 0; i < lock->size(); ++i)
- if ((*lock)[i] == server)
- {
- lock->erase(lock->begin() + i);
- LOG(LogVerbose, "NetworkWorkerThread::RemoveServer: Server %p removed.", server);
- workThread.Resume();
- return;
- }
- LOG(LogError, "NetworkWorkerThread::RemoveServer called for a nonexisting server %p!", server);
- workThread.Resume();
- }
- void NetworkWorkerThread::StartThread()
- {
- workThread.Run(this, &NetworkWorkerThread::MainLoop);
- }
- void NetworkWorkerThread::StopThread()
- {
- workThread.Stop();
- {
- Lockable<std::vector<NetworkServer *> >::LockType lock = servers.Acquire();
- for(size_t i = 0; i < lock->size(); ++i)
- {
- LOG(LogError, "NetworkWorkerThread::StopThread: Warning: NetworkServer %p was not detached from workerThread %p prior to stopping the thread!.", (*lock)[i], this);
- (*lock)[i]->SetWorkerThread(0);
- }
- }
- {
- Lockable<std::vector<MessageConnection *> >::LockType lock = connections.Acquire();
- for(size_t i = 0; i < lock->size(); ++i)
- {
- LOG(LogError, "NetworkWorkerThread::StopThread: Warning: MessageConnection %p was not detached from workerThread %p prior to stopping the thread!.", (*lock)[i], this);
- (*lock)[i]->SetWorkerThread(0);
- }
- }
- }
- int NetworkWorkerThread::NumConnections() const
- {
- return connections.Acquire()->size();
- }
- int NetworkWorkerThread::NumServers() const
- {
- return servers.Acquire()->size();
- }
- void NetworkWorkerThread::MainLoop()
- {
- std::vector<MessageConnection*> writeWaitConnections;
- EventArray waitEvents;
- // This is an event that is always false and will never be set.
- Event falseEvent = CreateNewEvent(EventWaitDummy);
- assert(!falseEvent.IsNull());
- assert(falseEvent.Test() == false);
- LOG(LogInfo, "NetworkWorkerThread starting main loop.");
- std::vector<MessageConnection*> connectionList;
- std::vector<NetworkServer*> serverList;
- while(!workThread.ShouldQuit())
- {
- workThread.CheckHold();
- if (workThread.ShouldQuit())
- break;
-
- {
- Lockable<std::vector<MessageConnection *> >::LockType lock = connections.Acquire();
- connectionList = *lock;
- }
- {
- Lockable<std::vector<NetworkServer *> >::LockType serverLock = servers.Acquire();
- serverList = *serverLock;
- }
- // Inconveniency: Cannot wait for long time periods, since this will call select() or WSAWaitForMultipleObjects,
- // which does not support aborting from the wait if the thread is signalled to interrupt and quit/join. To fix
- // this, should add a custom "interrupt Event" into the WaitArray to wake the thread up when it is supposed to be killed.
- // For now, just sleep only small periods of time at once to make this issue not a problem at application exit time.
- // Urho3D: changed smaller for potentially less network latency
- const int maxWaitTime = 10; // msecs. ///\todo Make this a lot larger, like, 2000msecs, once the thread interrupts are handled in Sleep and EventArray::Wait.
- int waitTime = maxWaitTime;
- waitEvents.Clear();
- writeWaitConnections.clear();
- // Next, build the event array that is used for waiting on the sockets.
- // At odd indices we will have socket read events, and at even indices the socket write events.
- // After the events for each connection, we will have the UDP listen sockets for each UDP server connection.
- for(size_t i = 0; i < connectionList.size(); ++i)
- {
- MessageConnection &connection = *connectionList[i];
- try
- {
- connection.UpdateConnection();
- } catch(const NetException &e)
- {
- LOG(LogError, (std::string("kNet::NetException thrown when processing UpdateConnection() for client connection: ") + e.what()).c_str());
- if (connection.GetSocket())
- connection.GetSocket()->Close();
- }
- if (connection.GetConnectionState() == ConnectionClosed || !connection.GetSocket() || !connection.GetSocket()->Connected()) // This does not need to be checked each iteration.
- {
- connectionList.erase(connectionList.begin()+i);
- --i;
- continue;
- }
- // The event that is triggered when data is received on the socket.
- Event readEvent = connection.GetSocket()->GetOverlappedReceiveEvent();
- if (readEvent.IsNull() || connection.GetSocket()->IsUDPSlaveSocket())
- readEvent = falseEvent; // If this socket is not readable, add a false event to skip this event slot.
- waitEvents.AddEvent(readEvent);
- // Determine which event to listen to for sending out data. There are three factors:
- // 1) Is the socket ready for sending (data buffer -wise)?
- // 2) Are there new messages to send?
- // 3) Does the send throttle timer allow us to send data to the socket? (UDP only)
- // If true, this socket is ready to receive new data to be sent.
- bool socketSendReady = connection.GetSocket()->IsOverlappedSendReady() || connection.GetSocket()->GetOverlappedSendEvent().Test();
- // If true, this MessageConnection has new unsent data that needs to be sent out.
- bool socketMessagesAvailable = connection.NumOutboundMessagesPending() > 0 || connection.NewOutboundMessagesEvent().Test();
- if (socketSendReady && socketMessagesAvailable)
- {
- if (connection.GetSocket()->TransportLayer() == SocketOverUDP)
- {
- int msecsLeftUntilWrite = (int)connection.TimeUntilCanSendPacket();
- waitTime = min(waitTime, msecsLeftUntilWrite);
- writeWaitConnections.push_back(&connection);
- waitEvents.AddEvent(falseEvent);
- assert(falseEvent.Test() == false && !falseEvent.IsNull());
- }
- else // TCP socket
- waitEvents.AddEvent(connection.NewOutboundMessagesEvent());
- }
- else if (socketMessagesAvailable) // Here, socketSendReady == false
- {
- Event sendEvent = connection.GetSocket()->GetOverlappedSendEvent();
- if (sendEvent.IsNull())
- waitEvents.AddEvent(falseEvent);
- else
- waitEvents.AddEvent(sendEvent);
- }
- else // Here, socketMessagesAvailable == false
- {
- waitEvents.AddEvent(connection.NewOutboundMessagesEvent());
- }
- }
- // Add all the UDP server listen sockets to the wait event list.
- // For UDP servers, only a single socket is used for receiving data from all clients.
- // In this case, the NetworkServer object handlesg all data reads, but data sends
- // are still managed by the individual MessageConnection objects.
- // For TCP servers, this step is not needed, since each connection has its own independent socket.
- for(size_t i = 0; i < serverList.size(); ++i)
- {
- NetworkServer &server = *serverList[i];
- std::vector<Socket *> &listenSockets = server.ListenSockets();
- for(size_t j = 0; j < listenSockets.size(); ++j)
- if (listenSockets[j]->TransportLayer() == SocketOverUDP)
- {
- Event listenEvent = listenSockets[j]->GetOverlappedReceiveEvent();
- if (listenEvent.IsNull())
- waitEvents.AddEvent(falseEvent);
- else
- waitEvents.AddEvent(listenEvent);
- }
- }
- // If we did not end up adding any wait events to the queue above, the worker thread does not have
- // any connections to manage. Sleep for a moment, until we get some connections to handle.
- if (waitEvents.Size() == 0)
- {
- Thread::Sleep(maxWaitTime);
- continue;
- }
- // Wait until an event occurs either from the application end or in the socket.
- // When the application wants to send out a message, it is signaled by an event here.
- // Also, when the socket is ready for reading, writing or if it has been closed, it is signaled here.
- int index = waitEvents.Wait(max<int>(1, waitTime));
- if (index >= 0 && index < waitEvents.Size()) // An event was triggered?
- {
- if ((index >> 1) < (int)connectionList.size())
- {
- MessageConnection *connection = connectionList[index>>1];
- assert(connection);
- try
- {
- // A socket event was raised. We can either read or write.
- if ((index & 1) == 0)
- {
- connection->ReadSocket();
- connection->SendOutPackets();
- }
- else // new outbound messages were received from the application.
- connection->SendOutPackets();
- } catch(const NetException &e)
- {
- LOG(LogError, (std::string("kNet::NetException thrown when processing client connection: ") + e.what()).c_str());
- if (connection->GetSocket())
- connection->GetSocket()->Close();
- }
- }
- else // A UDP server received a message.
- {
- int socketIndex = index - connectionList.size() * 2;
- if (serverList.size() > 0)
- {
- NetworkServer &server = *serverList[0]; ///\bug In case of multiple servers, this is not correct!
- std::vector<Socket *> &listenSockets = server.ListenSockets();
- if (socketIndex < (int)listenSockets.size())
- {
- try
- {
- server.ReadUDPSocketData(listenSockets[socketIndex]);
- } catch(const NetException &e)
- {
- LOG(LogError, (std::string("kNet::NetException thrown when reading server socket: ") + e.what()).c_str());
- ///\todo Could Close(0) the connection here.
- }
- }
- else
- {
- LOG(LogError, "NetworkWorkerThread::MainLoop: Warning: Cannot find server socket to read from: EventArray::Wait returned index %d (socketIndex %d), but "
- "serverList.size()=%d, connectionList.size()=%d!", index, socketIndex, (int)serverList.size(), (int)connectionList.size());
- }
- }
- else
- {
- LOG(LogError, "NetworkWorkerThread::MainLoop: Warning: EventArray::Wait returned index %d (socketIndex %d), but "
- "serverList.size()=%d, connectionList.size()=%d!", index, socketIndex, (int)serverList.size(), (int)connectionList.size());
- }
- }
- }
- // The UDP send throttle timers are not read through events. The writeWaitConnections list
- // contains a list of UDP connections which are now, or will very soon (in less than 1msec) be ready for writing.
- // Poll each and try to send a message.
- for(size_t i = 0; i < writeWaitConnections.size(); ++i)
- {
- try
- {
- writeWaitConnections[i]->SendOutPackets();
- } catch(const NetException &e)
- {
- LOG(LogError, (std::string("kNet::NetException thrown when sending out a network message: ") + e.what()).c_str());
- }
- }
- }
- falseEvent.Close();
- LOG(LogInfo, "NetworkWorkerThread quit.");
- }
- } // ~kNet
|