| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622 |
- /*
- -----------------------------------------------------------------------------
- This source file is part of OGRE
- (Object-oriented Graphics Rendering Engine)
- For the latest info, see http://www.ogre3d.org/
- Copyright (c) 2000-2011 Torus Knot Software Ltd
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"), to deal
- in the Software without restriction, including without limitation the rights
- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
- all copies or substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- THE SOFTWARE.
- -----------------------------------------------------------------------------
- */
- #include "CmWorkQueue.h"
- #include "CmDebug.h"
- namespace CamelotEngine {
- WorkQueue::WorkQueue()
- : mNextChannel(0)
- , mWorkerThreadCount(1)
- , mIsRunning(false)
- , mWorkerFunc(0)
- , mRequestCount(0)
- , mPaused(false)
- , mAcceptRequests(true)
- {}
- //---------------------------------------------------------------------
- WorkQueue::~WorkQueue()
- {
- shutdown();
- for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i)
- {
- delete (*i);
- }
- mRequestQueue.clear();
- for (ResponseQueue::iterator i = mResponseQueue.begin(); i != mResponseQueue.end(); ++i)
- {
- delete (*i);
- }
- mResponseQueue.clear();
- }
- //---------------------------------------------------------------------
- void WorkQueue::startup(bool forceRestart)
- {
- if (mIsRunning)
- {
- if (forceRestart)
- shutdown();
- else
- return;
- }
- mShuttingDown = false;
- mWorkerFunc = new WorkerFunc(this);
- #if CM_THREAD_SUPPORT
- for (UINT8 i = 0; i < mWorkerThreadCount; ++i)
- {
- CM_THREAD_CREATE(t, *mWorkerFunc);
- mWorkers.push_back(t);
- }
- #endif
- mIsRunning = true;
- }
- //---------------------------------------------------------------------
- void WorkQueue::shutdown()
- {
- if( !mIsRunning )
- return;
- mShuttingDown = true;
- abortAllRequests();
- #if CM_THREAD_SUPPORT
- // wake all threads (they should check shutting down as first thing after wait)
- CM_THREAD_NOTIFY_ALL(mRequestCondition)
- // all our threads should have been woken now, so join
- for (WorkerThreadList::iterator i = mWorkers.begin(); i != mWorkers.end(); ++i)
- {
- (*i)->join();
- CM_THREAD_DESTROY(*i);
- }
- mWorkers.clear();
- #endif
- if (mWorkerFunc)
- {
- delete mWorkerFunc;
- mWorkerFunc = 0;
- }
- mIsRunning = false;
- }
- //---------------------------------------------------------------------
- void WorkQueue::addRequestHandler(UINT16 channel, RequestHandler* rh)
- {
- CM_LOCK_RW_MUTEX_WRITE(mRequestHandlerMutex);
- RequestHandlerListByChannel::iterator i = mRequestHandlers.find(channel);
- if (i == mRequestHandlers.end())
- i = mRequestHandlers.insert(RequestHandlerListByChannel::value_type(channel, RequestHandlerList())).first;
- RequestHandlerList& handlers = i->second;
- bool duplicate = false;
- for (RequestHandlerList::iterator j = handlers.begin(); j != handlers.end(); ++j)
- {
- if ((*j)->getHandler() == rh)
- {
- duplicate = true;
- break;
- }
- }
- if (!duplicate)
- handlers.push_back(RequestHandlerHolderPtr(new RequestHandlerHolder(rh)));
- }
- //---------------------------------------------------------------------
- void WorkQueue::removeRequestHandler(UINT16 channel, RequestHandler* rh)
- {
- CM_LOCK_RW_MUTEX_WRITE(mRequestHandlerMutex);
- RequestHandlerListByChannel::iterator i = mRequestHandlers.find(channel);
- if (i != mRequestHandlers.end())
- {
- RequestHandlerList& handlers = i->second;
- for (RequestHandlerList::iterator j = handlers.begin(); j != handlers.end(); ++j)
- {
- if ((*j)->getHandler() == rh)
- {
- // Disconnect - this will make it safe across copies of the list
- // this is threadsafe and will wait for existing processes to finish
- (*j)->disconnectHandler();
- handlers.erase(j);
- break;
- }
- }
- }
- }
- //---------------------------------------------------------------------
- void WorkQueue::addResponseHandler(UINT16 channel, ResponseHandler* rh)
- {
- ResponseHandlerListByChannel::iterator i = mResponseHandlers.find(channel);
- if (i == mResponseHandlers.end())
- i = mResponseHandlers.insert(ResponseHandlerListByChannel::value_type(channel, ResponseHandlerList())).first;
- ResponseHandlerList& handlers = i->second;
- if (std::find(handlers.begin(), handlers.end(), rh) == handlers.end())
- handlers.push_back(rh);
- }
- //---------------------------------------------------------------------
- void WorkQueue::removeResponseHandler(UINT16 channel, ResponseHandler* rh)
- {
- ResponseHandlerListByChannel::iterator i = mResponseHandlers.find(channel);
- if (i != mResponseHandlers.end())
- {
- ResponseHandlerList& handlers = i->second;
- ResponseHandlerList::iterator j = std::find(
- handlers.begin(), handlers.end(), rh);
- if (j != handlers.end())
- handlers.erase(j);
- }
- }
- //---------------------------------------------------------------------
- WorkQueue::RequestID WorkQueue::addRequest(UINT16 channel,
- const boost::any& rData, UINT8 retryCount, bool forceSynchronous)
- {
- Request* req = 0;
- RequestID rid = 0;
- {
- // lock to acquire rid and push request to the queue
- CM_LOCK_MUTEX(mRequestMutex)
- if (!mAcceptRequests || mShuttingDown)
- return 0;
- rid = ++mRequestCount;
- req = new Request(channel, rData, retryCount, rid);
- #if CM_THREAD_SUPPORT
- if (!forceSynchronous)
- {
- mRequestQueue.push_back(req);
- notifyWorkers();
- return rid;
- }
- #endif
- }
- processRequestResponse(req, true);
- return rid;
- }
- //---------------------------------------------------------------------
- void WorkQueue::abortRequest(RequestID id)
- {
- CM_LOCK_MUTEX(mProcessMutex)
- // NOTE: Pending requests are exist any of RequestQueue, ProcessQueue and
- // ResponseQueue when keeping ProcessMutex, so we check all of these queues.
- for (RequestQueue::iterator i = mProcessQueue.begin(); i != mProcessQueue.end(); ++i)
- {
- if ((*i)->getID() == id)
- {
- (*i)->abortRequest();
- break;
- }
- }
- {
- CM_LOCK_MUTEX(mRequestMutex)
- for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i)
- {
- if ((*i)->getID() == id)
- {
- (*i)->abortRequest();
- break;
- }
- }
- }
- {
- CM_LOCK_MUTEX(mResponseMutex)
- for (ResponseQueue::iterator i = mResponseQueue.begin(); i != mResponseQueue.end(); ++i)
- {
- if( (*i)->getRequest()->getID() == id )
- {
- (*i)->abortRequest();
- break;
- }
- }
- }
- }
- //---------------------------------------------------------------------
- void WorkQueue::abortRequestsByChannel(UINT16 channel)
- {
- CM_LOCK_MUTEX(mProcessMutex)
- for (RequestQueue::iterator i = mProcessQueue.begin(); i != mProcessQueue.end(); ++i)
- {
- if ((*i)->getChannel() == channel)
- {
- (*i)->abortRequest();
- }
- }
- {
- CM_LOCK_MUTEX(mRequestMutex)
- for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i)
- {
- if ((*i)->getChannel() == channel)
- {
- (*i)->abortRequest();
- }
- }
- }
- {
- CM_LOCK_MUTEX(mResponseMutex)
- for (ResponseQueue::iterator i = mResponseQueue.begin(); i != mResponseQueue.end(); ++i)
- {
- if( (*i)->getRequest()->getChannel() == channel )
- {
- (*i)->abortRequest();
- }
- }
- }
- }
- //---------------------------------------------------------------------
- void WorkQueue::abortAllRequests()
- {
- CM_LOCK_MUTEX(mProcessMutex)
- for (RequestQueue::iterator i = mProcessQueue.begin(); i != mProcessQueue.end(); ++i)
- {
- (*i)->abortRequest();
- }
- {
- CM_LOCK_MUTEX(mRequestMutex)
- for (RequestQueue::iterator i = mRequestQueue.begin(); i != mRequestQueue.end(); ++i)
- {
- (*i)->abortRequest();
- }
- }
- {
- CM_LOCK_MUTEX(mResponseMutex)
- for (ResponseQueue::iterator i = mResponseQueue.begin(); i != mResponseQueue.end(); ++i)
- {
- (*i)->abortRequest();
- }
- }
- }
- //---------------------------------------------------------------------
- void WorkQueue::setPaused(bool pause)
- {
- CM_LOCK_MUTEX(mRequestMutex)
- mPaused = pause;
- }
- //---------------------------------------------------------------------
- bool WorkQueue::isPaused() const
- {
- return mPaused;
- }
- //---------------------------------------------------------------------
- void WorkQueue::setRequestsAccepted(bool accept)
- {
- CM_LOCK_MUTEX(mRequestMutex)
- mAcceptRequests = accept;
- }
- //---------------------------------------------------------------------
- bool WorkQueue::getRequestsAccepted() const
- {
- return mAcceptRequests;
- }
- //---------------------------------------------------------------------
- size_t WorkQueue::getWorkerThreadCount() const
- {
- return mWorkerThreadCount;
- }
- //---------------------------------------------------------------------
- void WorkQueue::setWorkerThreadCount(size_t c)
- {
- mWorkerThreadCount = c;
- }
- //---------------------------------------------------------------------
- UINT16 WorkQueue::getChannel(const String& channelName)
- {
- CM_LOCK_MUTEX(mChannelMapMutex)
- ChannelMap::iterator i = mChannelMap.find(channelName);
- if (i == mChannelMap.end())
- {
- i = mChannelMap.insert(ChannelMap::value_type(channelName, mNextChannel++)).first;
- }
- return i->second;
- }
- //---------------------------------------------------------------------
- void WorkQueue::processResponses()
- {
- // TODO Low priority - Processing a lot of responses can cause a frame rate spike. Maybe limit the processing to Xms?
- // keep going until we run out of responses
- while(true)
- {
- Response* response = 0;
- {
- CM_LOCK_MUTEX(mResponseMutex)
- if (mResponseQueue.empty())
- break; // exit loop
- else
- {
- response = mResponseQueue.front();
- mResponseQueue.pop_front();
- }
- }
- if (response)
- {
- processResponse(response);
- delete response;
- }
- }
- }
- //---------------------------------------------------------------------
- void WorkQueue::processRequestResponse(Request* r, bool synchronous)
- {
- Response* response = processRequest(r);
- CM_LOCK_MUTEX(mProcessMutex)
- RequestQueue::iterator it;
- for( it = mProcessQueue.begin(); it != mProcessQueue.end(); ++it )
- {
- if( (*it) == r )
- {
- mProcessQueue.erase( it );
- break;
- }
- }
- if (response)
- {
- if (!response->succeeded())
- {
- // Failed, should we retry?
- const Request* req = response->getRequest();
- if (req->getRetryCount())
- {
- addRequestWithRID(req->getID(), req->getChannel(), req->getData(),
- req->getRetryCount() - 1);
- // discard response (this also deletes request)
- delete response;
- return;
- }
- }
- if (synchronous)
- {
- processResponse(response);
- delete response;
- }
- else
- {
- if( response->getRequest()->getAborted() )
- {
- // destroy response user data
- response->abortRequest();
- }
- // Queue response
- CM_LOCK_MUTEX(mResponseMutex)
- mResponseQueue.push_back(response);
- // no need to wake thread, this is processed by the main thread
- }
- }
- else
- {
- // no response, delete request
- gDebug().logWarning("warning: no handler processed request "
- + toString(r->getID()) + ", channel " + toString(r->getChannel()));
- delete r;
- }
- }
- WorkQueue::Response* WorkQueue::processRequest(Request* r)
- {
- RequestHandlerListByChannel handlerListCopy;
- {
- // lock the list only to make a copy of it, to maximise parallelism
- CM_LOCK_RW_MUTEX_READ(mRequestHandlerMutex);
- handlerListCopy = mRequestHandlers;
- }
- Response* response = 0;
- RequestHandlerListByChannel::iterator i = handlerListCopy.find(r->getChannel());
- if (i != handlerListCopy.end())
- {
- RequestHandlerList& handlers = i->second;
- for (RequestHandlerList::reverse_iterator j = handlers.rbegin(); j != handlers.rend(); ++j)
- {
- // threadsafe call which tests canHandleRequest and calls it if so
- response = (*j)->handleRequest(r, this);
- if (response)
- break;
- }
- }
- return response;
- }
- //---------------------------------------------------------------------
- void WorkQueue::processResponse(Response* r)
- {
- ResponseHandlerListByChannel::iterator i = mResponseHandlers.find(r->getRequest()->getChannel());
- if (i != mResponseHandlers.end())
- {
- ResponseHandlerList& handlers = i->second;
- for (ResponseHandlerList::reverse_iterator j = handlers.rbegin(); j != handlers.rend(); ++j)
- {
- if ((*j)->canHandleResponse(r, this))
- {
- (*j)->handleResponse(r, this);
- }
- }
- }
- }
- //---------------------------------------------------------------------
- void WorkQueue::addRequestWithRID(WorkQueue::RequestID rid, UINT16 channel,
- const boost::any& rData, UINT8 retryCount)
- {
- // lock to push request to the queue
- CM_LOCK_MUTEX(mRequestMutex)
- if (mShuttingDown)
- return;
- Request* req = new Request(channel, rData, retryCount, rid);
- #if CM_THREAD_SUPPORT
- mRequestQueue.push_back(req);
- notifyWorkers();
- #else
- processRequestResponse(req, true);
- #endif
- }
- //---------------------------------------------------------------------
- void WorkQueue::processNextRequest()
- {
- Request* request = 0;
- {
- // scoped to only lock while retrieving the next request
- CM_LOCK_MUTEX(mProcessMutex)
- {
- CM_LOCK_MUTEX(mRequestMutex)
- if (!mRequestQueue.empty())
- {
- request = mRequestQueue.front();
- mRequestQueue.pop_front();
- mProcessQueue.push_back( request );
- }
- }
- }
- if (request)
- {
- processRequestResponse(request, false);
- }
- }
- //---------------------------------------------------------------------
- void WorkQueue::waitForNextRequest()
- {
- #if CM_THREAD_SUPPORT
- // Lock; note that OGRE_THREAD_WAIT will free the lock
- CM_LOCK_MUTEX_NAMED(mRequestMutex, queueLock);
- if (mRequestQueue.empty())
- {
- // frees lock and suspends the thread
- CM_THREAD_WAIT(mRequestCondition, mRequestMutex, queueLock);
- }
- // When we get back here, it's because we've been notified
- // and thus the thread has been woken up. Lock has also been
- // re-acquired, but we won't use it. It's safe to try processing and fail
- // if another thread has got in first and grabbed the request
- #endif
- }
- //---------------------------------------------------------------------
- void WorkQueue::threadMain()
- {
- // default worker thread
- #if CM_THREAD_SUPPORT
- // Spin forever until we're told to shut down
- while (!isShuttingDown())
- {
- waitForNextRequest();
- processNextRequest();
- }
- #endif
- }
- //---------------------------------------------------------------------
- void WorkQueue::notifyWorkers()
- {
- // wake up waiting thread
- CM_THREAD_NOTIFY_ONE(mRequestCondition)
- }
- //---------------------------------------------------------------------
- WorkQueue::Request::Request(UINT16 channel, const boost::any& rData, UINT8 retry, RequestID rid)
- : mChannel(channel), mData(rData), mRetryCount(retry), mID(rid), mAborted(false)
- {
- }
- //---------------------------------------------------------------------
- WorkQueue::Request::~Request()
- {
- }
- //---------------------------------------------------------------------
- //---------------------------------------------------------------------
- WorkQueue::Response::Response(const Request* rq, bool success, const boost::any& data)
- : mRequest(rq), mSuccess(success), mData(data)
- {
- }
- //---------------------------------------------------------------------
- WorkQueue::Response::~Response()
- {
- delete mRequest;
- }
- //---------------------------------------------------------------------
- void WorkQueue::WorkerFunc::operator()()
- {
- mQueue->threadMain();
- }
- void WorkQueue::WorkerFunc::run()
- {
- mQueue->threadMain();
- }
- }
|