|
@@ -0,0 +1,415 @@
|
|
|
|
|
+// Filename: asyncTaskManager.cxx
|
|
|
|
|
+// Created by: drose (23Aug06)
|
|
|
|
|
+//
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+//
|
|
|
|
|
+// PANDA 3D SOFTWARE
|
|
|
|
|
+// Copyright (c) 2001 - 2004, Disney Enterprises, Inc. All rights reserved
|
|
|
|
|
+//
|
|
|
|
|
+// All use of this software is subject to the terms of the Panda 3d
|
|
|
|
|
+// Software license. You should have received a copy of this license
|
|
|
|
|
+// along with this source code; you will also find a current copy of
|
|
|
|
|
+// the license at http://etc.cmu.edu/panda3d/docs/license/ .
|
|
|
|
|
+//
|
|
|
|
|
+// To contact the maintainers of this program write to
|
|
|
|
|
+// [email protected] .
|
|
|
|
|
+//
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+
|
|
|
|
|
+#include "asyncTaskManager.h"
|
|
|
|
|
+#include "event.h"
|
|
|
|
|
+#include "pt_Event.h"
|
|
|
|
|
+#include "throw_event.h"
|
|
|
|
|
+#include "eventParameter.h"
|
|
|
|
|
+#include "mutexHolder.h"
|
|
|
|
|
+#include "indent.h"
|
|
|
|
|
+
|
|
|
|
|
+TypeHandle AsyncTaskManager::_type_handle;
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::Constructor
|
|
|
|
|
+// Access: Published
|
|
|
|
|
+// Description:
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+AsyncTaskManager::
|
|
|
|
|
+AsyncTaskManager(const string &name, int num_threads) :
|
|
|
|
|
+ Namable(name),
|
|
|
|
|
+ _num_threads(num_threads),
|
|
|
|
|
+ _cvar(_lock),
|
|
|
|
|
+ _state(S_initial),
|
|
|
|
|
+ _num_tasks(0)
|
|
|
|
|
+{
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::Destructor
|
|
|
|
|
+// Access: Published, Virtual
|
|
|
|
|
+// Description:
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+AsyncTaskManager::
|
|
|
|
|
+~AsyncTaskManager() {
|
|
|
|
|
+ if (_state == S_started) {
|
|
|
|
|
+ // Clean up all of the threads.
|
|
|
|
|
+ MutexHolder holder(_lock);
|
|
|
|
|
+ _state = S_shutdown;
|
|
|
|
|
+ _cvar.signal_all();
|
|
|
|
|
+
|
|
|
|
|
+ Threads::iterator ti;
|
|
|
|
|
+ for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
|
|
|
|
|
+ (*ti)->join();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::add
|
|
|
|
|
+// Access: Published
|
|
|
|
|
+// Description: Adds the indicated task to the active queue. It is
|
|
|
|
|
+// an error if the task is already added to this or any
|
|
|
|
|
+// other active queue.
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+void AsyncTaskManager::
|
|
|
|
|
+add(AsyncTask *task) {
|
|
|
|
|
+ MutexHolder holder(_lock);
|
|
|
|
|
+
|
|
|
|
|
+ nassertv(task->_manager == NULL &&
|
|
|
|
|
+ task->_state == AsyncTask::S_inactive);
|
|
|
|
|
+ nassertv(find_task(task) == -1);
|
|
|
|
|
+
|
|
|
|
|
+ // Attempt to start the threads, if we haven't already.
|
|
|
|
|
+ if (_state == S_initial) {
|
|
|
|
|
+ _state = S_started;
|
|
|
|
|
+ if (Thread::is_threading_supported()) {
|
|
|
|
|
+ _threads.reserve(_num_threads);
|
|
|
|
|
+ for (int i = 0; i < _num_threads; ++i) {
|
|
|
|
|
+ PT(AsyncTaskManagerThread) thread = new AsyncTaskManagerThread(this);
|
|
|
|
|
+ if (thread->start(TP_low, true, true)) {
|
|
|
|
|
+ _threads.push_back(thread);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ task->_manager = this;
|
|
|
|
|
+ task->_state = AsyncTask::S_active;
|
|
|
|
|
+
|
|
|
|
|
+ _active.push_back(task);
|
|
|
|
|
+ ++_num_tasks;
|
|
|
|
|
+ _cvar.signal_all();
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::add_and_do
|
|
|
|
|
+// Access: Published
|
|
|
|
|
+// Description: Adds the indicated task to the active queue, and
|
|
|
|
|
+// executes it immediately if this is a non-threaded
|
|
|
|
|
+// task manager.
|
|
|
|
|
+//
|
|
|
|
|
+// The only difference between this method and add() is
|
|
|
|
|
+// in the case of a non-threaded task manager: in this
|
|
|
|
|
+// case, the method will execute the task inline, at
|
|
|
|
|
+// least one frame, before returning. If the task
|
|
|
|
|
+// completes in one frame, this means it will completely
|
|
|
|
|
+// execute the task before returning in the non-threaded
|
|
|
|
|
+// case. In the threaded case, this method behaves
|
|
|
|
|
+// exactly the same as add().
|
|
|
|
|
+
|
|
|
|
|
+// The return value is true if the task has been added
|
|
|
|
|
+// and is still pending, false if it has completed.
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+bool AsyncTaskManager::
|
|
|
|
|
+add_and_do(AsyncTask *task) {
|
|
|
|
|
+ MutexHolder holder(_lock);
|
|
|
|
|
+
|
|
|
|
|
+ nassertr(task->_manager == NULL &&
|
|
|
|
|
+ task->_state == AsyncTask::S_inactive, false);
|
|
|
|
|
+ nassertr(find_task(task) == -1, false);
|
|
|
|
|
+
|
|
|
|
|
+ // Attempt to start the threads, if we haven't already.
|
|
|
|
|
+ if (_state == S_initial) {
|
|
|
|
|
+ _state = S_started;
|
|
|
|
|
+ if (Thread::is_threading_supported()) {
|
|
|
|
|
+ _threads.reserve(_num_threads);
|
|
|
|
|
+ for (int i = 0; i < _num_threads; ++i) {
|
|
|
|
|
+ PT(AsyncTaskManagerThread) thread = new AsyncTaskManagerThread(this);
|
|
|
|
|
+ if (thread->start(TP_low, true, true)) {
|
|
|
|
|
+ _threads.push_back(thread);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ task->_manager = this;
|
|
|
|
|
+ task->_state = AsyncTask::S_active;
|
|
|
|
|
+
|
|
|
|
|
+ if (_threads.empty()) {
|
|
|
|
|
+ // Got no threads. Try to execute the task immediately.
|
|
|
|
|
+ if (!task->do_task()) {
|
|
|
|
|
+ // The task has finished in one frame. Don't add it to the
|
|
|
|
|
+ // queue.
|
|
|
|
|
+ task_done(task);
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ _active.push_back(task);
|
|
|
|
|
+ ++_num_tasks;
|
|
|
|
|
+ _cvar.signal_all();
|
|
|
|
|
+
|
|
|
|
|
+ return true;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::remove
|
|
|
|
|
+// Access: Published
|
|
|
|
|
+// Description: Removes the indicated task from the active queue.
|
|
|
|
|
+// Returns true if the task is successfully removed, or
|
|
|
|
|
+// false if it wasn't there.
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+bool AsyncTaskManager::
|
|
|
|
|
+remove(AsyncTask *task) {
|
|
|
|
|
+ MutexHolder holder(_lock);
|
|
|
|
|
+
|
|
|
|
|
+ // It's just possible this particular task is currently being
|
|
|
|
|
+ // serviced. Wait for it to finish.
|
|
|
|
|
+ while (task->_manager == this &&
|
|
|
|
|
+ task->_state == AsyncTask::S_servicing) {
|
|
|
|
|
+ _cvar.wait();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (task->_manager != this) {
|
|
|
|
|
+ nassertr(find_task(task) == -1, false);
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ nassertr(task->_state == AsyncTask::S_active, false);
|
|
|
|
|
+
|
|
|
|
|
+ int index = find_task(task);
|
|
|
|
|
+ nassertr(index != -1, false);
|
|
|
|
|
+ _active.erase(_active.begin() + index);
|
|
|
|
|
+ --_num_tasks;
|
|
|
|
|
+
|
|
|
|
|
+ task->_state = AsyncTask::S_inactive;
|
|
|
|
|
+ task->_manager = NULL;
|
|
|
|
|
+
|
|
|
|
|
+ return true;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::has_task
|
|
|
|
|
+// Access: Published
|
|
|
|
|
+// Description: Returns true if the indicated task is currently in
|
|
|
|
|
+// this manager's active queue, or false otherwise.
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+bool AsyncTaskManager::
|
|
|
|
|
+has_task(AsyncTask *task) const {
|
|
|
|
|
+ MutexHolder holder(_lock);
|
|
|
|
|
+
|
|
|
|
|
+ if (task->_manager != this) {
|
|
|
|
|
+ nassertr(find_task(task) == -1, false);
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // The task might not actually be in the active queue, since it
|
|
|
|
|
+ // might be being serviced right now. That's OK.
|
|
|
|
|
+ return true;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::poll
|
|
|
|
|
+// Access: Published
|
|
|
|
|
+// Description: Runs through all the tasks in the task list, once, if
|
|
|
|
|
+// the task manager is running in single-threaded mode
|
|
|
|
|
+// (no threads available). This method does nothing in
|
|
|
|
|
+// threaded mode, so it may safely be called in either
|
|
|
|
|
+// case.
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+void AsyncTaskManager::
|
|
|
|
|
+poll() {
|
|
|
|
|
+ MutexHolder holder(_lock);
|
|
|
|
|
+ if (_threads.empty()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ Tasks new_active;
|
|
|
|
|
+ int new_num_tasks = 0;
|
|
|
|
|
+ Tasks::iterator ti;
|
|
|
|
|
+ for (ti = _active.begin(); ti != _active.end(); ++ti) {
|
|
|
|
|
+ AsyncTask *task = (*ti);
|
|
|
|
|
+ nassertv(task->_state == AsyncTask::S_active);
|
|
|
|
|
+ task->_state = AsyncTask::S_servicing;
|
|
|
|
|
+
|
|
|
|
|
+ // Here we keep the manager lock held while we are servicing each
|
|
|
|
|
+ // task. This is the single-threaded implementation, after all,
|
|
|
|
|
+ // so what difference should it make?
|
|
|
|
|
+ if (task->do_task()) {
|
|
|
|
|
+ new_active.push_back(task);
|
|
|
|
|
+ ++new_num_tasks;
|
|
|
|
|
+ task->_state = AsyncTask::S_active;
|
|
|
|
|
+
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // The task has finished.
|
|
|
|
|
+ task_done(task);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ _active.swap(new_active);
|
|
|
|
|
+ _num_tasks = new_num_tasks;
|
|
|
|
|
+ _cvar.signal_all();
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::output
|
|
|
|
|
+// Access: Published, Virtual
|
|
|
|
|
+// Description:
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+void AsyncTaskManager::
|
|
|
|
|
+output(ostream &out) const {
|
|
|
|
|
+ MutexHolder holder(_lock);
|
|
|
|
|
+
|
|
|
|
|
+ out << get_type() << " " << get_name()
|
|
|
|
|
+ << "; " << get_num_tasks() << " tasks";
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::write
|
|
|
|
|
+// Access: Published, Virtual
|
|
|
|
|
+// Description:
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+void AsyncTaskManager::
|
|
|
|
|
+write(ostream &out, int indent_level) const {
|
|
|
|
|
+ MutexHolder holder(_lock);
|
|
|
|
|
+ indent(out, indent_level)
|
|
|
|
|
+ << get_type() << " " << get_name() << "\n";
|
|
|
|
|
+
|
|
|
|
|
+ Threads::const_iterator thi;
|
|
|
|
|
+ for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
|
|
|
|
|
+ AsyncTask *task = (*thi)->_servicing;
|
|
|
|
|
+ if (task != (AsyncTask *)NULL) {
|
|
|
|
|
+ indent(out, indent_level + 1)
|
|
|
|
|
+ << "*" << *task << "\n";
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ Tasks::const_iterator ti;
|
|
|
|
|
+ for (ti = _active.begin(); ti != _active.end(); ++ti) {
|
|
|
|
|
+ AsyncTask *task = (*ti);
|
|
|
|
|
+ indent(out, indent_level + 2)
|
|
|
|
|
+ << *task << "\n";
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::find_task
|
|
|
|
|
+// Access: Protected
|
|
|
|
|
+// Description: Returns the index number within the given request
|
|
|
|
|
+// queue of the indicated task, or -1 if the task is not
|
|
|
|
|
+// found in the active queue (this may mean that it is
|
|
|
|
|
+// currently being serviced). Assumes that the lock is
|
|
|
|
|
+// currently held.
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+int AsyncTaskManager::
|
|
|
|
|
+find_task(AsyncTask *task) const {
|
|
|
|
|
+ for (int i = 0; i < (int)_active.size(); ++i) {
|
|
|
|
|
+ if (_active[i] == task) {
|
|
|
|
|
+ return i;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return -1;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::service_one_task
|
|
|
|
|
+// Access: Protected
|
|
|
|
|
+// Description: Pops a single task off the active queue, services it,
|
|
|
|
|
+// and restores it to the end of the queue. This is
|
|
|
|
|
+// called internally only within one of the task
|
|
|
|
|
+// threads. Assumes the lock is already held.
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+void AsyncTaskManager::
|
|
|
|
|
+service_one_task(AsyncTaskManager::AsyncTaskManagerThread *thread) {
|
|
|
|
|
+ if (!_active.empty()) {
|
|
|
|
|
+ AsyncTask *task = _active.front();
|
|
|
|
|
+ _active.pop_front();
|
|
|
|
|
+ thread->_servicing = task;
|
|
|
|
|
+
|
|
|
|
|
+ nassertv(task->_state == AsyncTask::S_active);
|
|
|
|
|
+ task->_state = AsyncTask::S_servicing;
|
|
|
|
|
+
|
|
|
|
|
+ // Now release the manager lock while we actually service the
|
|
|
|
|
+ // task.
|
|
|
|
|
+ _lock.release();
|
|
|
|
|
+ bool keep = task->do_task();
|
|
|
|
|
+
|
|
|
|
|
+ // Now we have to re-acquire the manager lock, so we can put the
|
|
|
|
|
+ // task back on the queue (and so we can return with the lock
|
|
|
|
|
+ // still held).
|
|
|
|
|
+ _lock.lock();
|
|
|
|
|
+
|
|
|
|
|
+ thread->_servicing = NULL;
|
|
|
|
|
+ if (task->_manager == this) {
|
|
|
|
|
+ if (keep) {
|
|
|
|
|
+ // The task is still alive; put it on the end of the active
|
|
|
|
|
+ // queue.
|
|
|
|
|
+ _active.push_back(task);
|
|
|
|
|
+ task->_state = AsyncTask::S_active;
|
|
|
|
|
+
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // The task has finished.
|
|
|
|
|
+ task_done(task);
|
|
|
|
|
+ }
|
|
|
|
|
+ _cvar.signal_all();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::task_done
|
|
|
|
|
+// Access: Protected
|
|
|
|
|
+// Description: Called internally when a task has completed and is
|
|
|
|
|
+// about to be removed from the active queue. Assumes
|
|
|
|
|
+// the lock is held.
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+void AsyncTaskManager::
|
|
|
|
|
+task_done(AsyncTask *task) {
|
|
|
|
|
+ task->_state = AsyncTask::S_inactive;
|
|
|
|
|
+ task->_manager = NULL;
|
|
|
|
|
+
|
|
|
|
|
+ if (!task->_done_event.empty()) {
|
|
|
|
|
+ PT_Event event = new Event(task->_done_event);
|
|
|
|
|
+ event->add_parameter(EventParameter(task));
|
|
|
|
|
+ throw_event(event);
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::AsyncTaskManagerThread::Constructor
|
|
|
|
|
+// Access: Public
|
|
|
|
|
+// Description:
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+AsyncTaskManager::AsyncTaskManagerThread::
|
|
|
|
|
+AsyncTaskManagerThread(AsyncTaskManager *manager) :
|
|
|
|
|
+ Thread(manager->get_name(), manager->get_name()),
|
|
|
|
|
+ _manager(manager),
|
|
|
|
|
+ _servicing(NULL)
|
|
|
|
|
+{
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+// Function: AsyncTaskManager::AsyncTaskManagerThread::thread_main
|
|
|
|
|
+// Access: Public, Virtual
|
|
|
|
|
+// Description:
|
|
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
|
|
+void AsyncTaskManager::AsyncTaskManagerThread::
|
|
|
|
|
+thread_main() {
|
|
|
|
|
+ MutexHolder holder(_manager->_lock);
|
|
|
|
|
+ while (_manager->_state != AsyncTaskManager::S_shutdown) {
|
|
|
|
|
+ _manager->service_one_task(this);
|
|
|
|
|
+ if (_manager->_active.empty() &&
|
|
|
|
|
+ _manager->_state != AsyncTaskManager::S_shutdown) {
|
|
|
|
|
+ _manager->_cvar.wait();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|