Browse Source

AsyncFuture improvements, incl. support for gathering futures

rdb 8 years ago
parent
commit
ed5e5386b9

+ 2 - 0
direct/src/task/Task.py

@@ -86,6 +86,8 @@ Task.DtoolClassDict['exit'] = exit
 pause = AsyncTaskPause
 Task.DtoolClassDict['pause'] = staticmethod(pause)
 
+gather = Task.gather
+
 def sequence(*taskList):
     seq = AsyncTaskSequence('sequence')
     for task in taskList:

+ 95 - 4
panda/src/event/asyncFuture.I

@@ -17,7 +17,6 @@
 INLINE AsyncFuture::
 AsyncFuture() :
   _manager(nullptr),
-  _cvar(nullptr),
   _future_state(FS_pending),
   _result(nullptr) {
 }
@@ -28,7 +27,7 @@ AsyncFuture() :
  */
 INLINE bool AsyncFuture::
 done() const {
-  return (FutureState)AtomicAdjust::get(_future_state) != FS_pending;
+  return (FutureState)AtomicAdjust::get(_future_state) >= FS_finished;
 }
 
 /**
@@ -46,6 +45,7 @@ cancelled() const {
  */
 INLINE void AsyncFuture::
 set_done_event(const string &done_event) {
+  nassertv(!done());
   _done_event = done_event;
 }
 
@@ -104,6 +104,97 @@ set_result(TypedReferenceCount *result) {
 }
 
 INLINE void AsyncFuture::
-set_result(TypedWritableReferenceCount *result) {
-  set_result(result, result);
+set_result(const EventParameter &result) {
+  set_result(result.get_ptr(), result.get_ptr());
+}
+
+/**
+ * Creates a new future that returns `done()` when all of the contained
+ * futures are done.
+ *
+ * Calling `cancel()` on the returned future will result in all contained
+ * futures that have not yet finished to be cancelled.
+ */
+INLINE AsyncFuture *AsyncFuture::
+gather(Futures futures) {
+  if (futures.empty()) {
+    AsyncFuture *fut = new AsyncFuture;
+    fut->_future_state = (AtomicAdjust::Integer)FS_finished;
+    return fut;
+  } else if (futures.size() == 1) {
+    return futures[0].p();
+  } else {
+    return (AsyncFuture *)new AsyncGatheringFuture(move(futures));
+  }
+}
+
+/**
+ * Tries to atomically lock the future, assuming it is pending.  Returns false
+ * if it is not in the pending state, implying it's either done or about to be
+ * cancelled.
+ */
+INLINE bool AsyncFuture::
+try_lock_pending() {
+  return set_future_state(FS_locked_pending);
+}
+
+/**
+ * Should be called after try_lock_pending() returns true.
+ */
+INLINE void AsyncFuture::
+unlock(FutureState new_state) {
+  nassertv(new_state != FS_locked_pending);
+  FutureState orig_state = (FutureState)AtomicAdjust::set(_future_state, (AtomicAdjust::Integer)new_state);
+  nassertv(orig_state == FS_locked_pending);
+}
+
+/**
+ * Atomically changes the future state from pending to another state.  Returns
+ * true if successful, false if the future was already done.
+ * Note that once a future is in a "done" state (ie. cancelled or finished) it
+ * can never change state again.
+ */
+INLINE bool AsyncFuture::
+set_future_state(FutureState state) {
+  FutureState orig_state = (FutureState)
+    AtomicAdjust::compare_and_exchange(
+      _future_state,
+      (AtomicAdjust::Integer)FS_pending,
+      (AtomicAdjust::Integer)state);
+
+  while (orig_state == FS_locked_pending) {
+    Thread::force_yield();
+    orig_state = (FutureState)AtomicAdjust::compare_and_exchange(
+      _future_state,
+      (AtomicAdjust::Integer)FS_pending,
+      (AtomicAdjust::Integer)state);
+  }
+
+  return orig_state == FS_pending;
+}
+
+/**
+ * Returns the number of futures that were passed to the constructor.
+ */
+INLINE size_t AsyncGatheringFuture::
+get_num_futures() const {
+  return _futures.size();
+}
+
+/**
+ * Returns the nth future that was passed into the constructor.
+ */
+INLINE AsyncFuture *AsyncGatheringFuture::
+get_future(size_t i) const {
+  nassertr(i < _futures.size(), nullptr);
+  return _futures[i].p();
+}
+
+/**
+ * Returns the result of the nth future that was passed into the constructor.
+ */
+INLINE TypedObject *AsyncGatheringFuture::
+get_result(size_t i) const {
+  nassertr(i < _futures.size(), nullptr);
+  return _futures[i]->get_result();
 }

+ 265 - 84
panda/src/event/asyncFuture.cxx

@@ -20,31 +20,33 @@
 #include "throw_event.h"
 
 TypeHandle AsyncFuture::_type_handle;
+TypeHandle AsyncGatheringFuture::_type_handle;
 
 /**
  * Destroys the future.  Assumes notify_done() has already been called.
  */
 AsyncFuture::
 ~AsyncFuture() {
-  delete _cvar;
-  nassertv(_waiting_tasks.empty());
+  // If this triggers, the future destroyed before it was cancelled, which is
+  // not valid.  Unless we should simply call cancel() here?
+  nassertv(_waiting.empty());
 }
 
 /**
  * Cancels the future.  Returns true if it was cancelled, or false if the
- * future was already done.
+ * future was already done.  Either way, done() will return true after this
+ * call returns.
+ *
  * In the case of a task, this is equivalent to remove().
  */
 bool AsyncFuture::
 cancel() {
-  if (!done()) {
-    if (_manager == nullptr) {
-      _manager = AsyncTaskManager::get_global_ptr();
-    }
-    MutexHolder holder(_manager->_lock);
+  if (set_future_state(FS_cancelled)) {
+    // The compare-swap operation succeeded, so schedule the callbacks.
     notify_done(false);
     return true;
   } else {
+    // It's already done.
     return false;
   }
 }
@@ -55,6 +57,22 @@ cancel() {
 void AsyncFuture::
 output(ostream &out) const {
   out << get_type();
+  FutureState state = (FutureState)AtomicAdjust::get(_future_state);
+  switch (state) {
+  case FS_pending:
+  case FS_locked_pending:
+    out << " (pending)";
+    break;
+  case FS_finished:
+    out << " (finished)";
+    break;
+  case FS_cancelled:
+    out << " (cancelled)";
+    break;
+  default:
+    out << " (**INVALID**)";
+    break;
+  }
 }
 
 /**
@@ -66,25 +84,18 @@ wait() {
     return;
   }
 
-  // If we don't have a manager, use the global one.
-  if (_manager == nullptr) {
-    _manager = AsyncTaskManager::get_global_ptr();
+  PStatTimer timer(AsyncTaskChain::_wait_pcollector);
+  if (task_cat.is_debug()) {
+    task_cat.debug()
+      << "Waiting for future " << *this << "\n";
   }
 
-  MutexHolder holder(_manager->_lock);
-  if (!done()) {
-    if (_cvar == nullptr) {
-      _cvar = new ConditionVarFull(_manager->_lock);
-    }
-    if (task_cat.is_debug()) {
-      task_cat.debug()
-        << "Waiting for future " << *this << "\n";
-    }
-    PStatTimer timer(AsyncTaskChain::_wait_pcollector);
-    while (!done()) {
-      _cvar->wait();
-    }
-  }
+  // Continue to yield while the future isn't done.  It may be more efficient
+  // to use a condition variable, but let's not add the extra complexity
+  // unless we're sure that we need it.
+  do {
+    Thread::force_yield();
+  } while (!done());
 }
 
 /**
@@ -96,46 +107,52 @@ wait(double timeout) {
     return;
   }
 
-  // If we don't have a manager, use the global one.
-  if (_manager == nullptr) {
-    _manager = AsyncTaskManager::get_global_ptr();
+  PStatTimer timer(AsyncTaskChain::_wait_pcollector);
+  if (task_cat.is_debug()) {
+    task_cat.debug()
+      << "Waiting up to " << timeout << " seconds for future " << *this << "\n";
   }
 
-  MutexHolder holder(_manager->_lock);
-  if (!done()) {
-    if (_cvar == nullptr) {
-      _cvar = new ConditionVarFull(_manager->_lock);
-    }
-    if (task_cat.is_debug()) {
-      task_cat.debug()
-        << "Waiting up to " << timeout << " seconds for future " << *this << "\n";
-    }
-    PStatTimer timer(AsyncTaskChain::_wait_pcollector);
-    _cvar->wait(timeout);
-  }
+  // Continue to yield while the future isn't done.  It may be more efficient
+  // to use a condition variable, but let's not add the extra complexity
+  // unless we're sure that we need it.
+  ClockObject *clock = ClockObject::get_global_clock();
+  double end = clock->get_real_time() + timeout;
+  do {
+    Thread::force_yield();
+  } while (!done() && clock->get_real_time() < end);
 }
 
 /**
- * Schedules the done callbacks.  Assumes the manager's lock is held, and that
- * the future is currently in the 'pending' state.
+ * Schedules the done callbacks.  Called after the future has just entered the
+ * 'done' state.
  * @param clean_exit true if finished successfully, false if cancelled.
  */
 void AsyncFuture::
 notify_done(bool clean_exit) {
-  nassertv(_manager != nullptr);
-  nassertv(_manager->_lock.debug_is_locked());
-  nassertv(_future_state == FS_pending);
-
-  pvector<AsyncTask *>::iterator it;
-  for (it = _waiting_tasks.begin(); it != _waiting_tasks.end(); ++it) {
-    AsyncTask *task = *it;
-    nassertd(task->_manager == _manager) continue;
-    task->_state = AsyncTask::S_active;
-    task->_chain->_active.push_back(task);
-    --task->_chain->_num_awaiting_tasks;
-    unref_delete(task);
+  nassertv(done());
+
+  // This will only be called by the thread that managed to set the
+  // _future_state away from the "pending" state, so this is thread safe.
+
+  Futures::iterator it;
+  for (it = _waiting.begin(); it != _waiting.end(); ++it) {
+    AsyncFuture *fut = *it;
+    if (fut->is_task()) {
+      // It's a task.  Make it active again.
+      wake_task((AsyncTask *)fut);
+    } else {
+      // It's a gathering future.  Decrease the pending count on it, and if
+      // we're the last one, call notify_done() on it.
+      AsyncGatheringFuture *gather = (AsyncGatheringFuture *)fut;
+      if (!AtomicAdjust::dec(gather->_num_pending)) {
+        if (gather->set_future_state(FS_finished)) {
+          gather->notify_done(true);
+        }
+      }
+    }
   }
-  _waiting_tasks.clear();
+  _waiting.clear();
 
   // For historical reasons, we don't send the "done event" if the future was
   // cancelled.
@@ -144,17 +161,6 @@ notify_done(bool clean_exit) {
     event->add_parameter(EventParameter(this));
     throw_event(move(event));
   }
-
-  nassertv_always(FS_pending ==
-    (FutureState)AtomicAdjust::compare_and_exchange(
-      _future_state,
-      (AtomicAdjust::Integer)FS_pending,
-      (AtomicAdjust::Integer)(clean_exit ? FS_finished : FS_cancelled)));
-
-  // Finally, notify any threads that may be waiting.
-  if (_cvar != nullptr) {
-    _cvar->notify_all();
-  }
 }
 
 /**
@@ -168,28 +174,203 @@ notify_done(bool clean_exit) {
  */
 void AsyncFuture::
 set_result(TypedObject *ptr, ReferenceCount *ref_ptr) {
-  nassertv(!done());
-  // If we don't have a manager, use the global one.
-  if (_manager == nullptr) {
-    _manager = AsyncTaskManager::get_global_ptr();
-  }
-  MutexHolder holder(_manager->_lock);
-  _result = ptr;
-  _result_ref = ref_ptr;
-  notify_done(true);
+  // We don't strictly need to lock the future since only one thread is
+  // allowed to call set_result(), but we might as well.
+  FutureState orig_state = (FutureState)AtomicAdjust::
+    compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
+                                        (AtomicAdjust::Integer)FS_locked_pending);
+
+  while (orig_state == FS_locked_pending) {
+    Thread::force_yield();
+    orig_state = (FutureState)AtomicAdjust::
+      compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
+                                          (AtomicAdjust::Integer)FS_locked_pending);
+  }
+
+  if (orig_state == FS_pending) {
+    _result = ptr;
+    _result_ref = ref_ptr;
+    unlock(FS_finished);
+
+    // OK, now our thread owns the _waiting vector et al.
+    notify_done(true);
+
+  } else if (orig_state == FS_cancelled) {
+    // This was originally illegal, but there is a chance that the future was
+    // cancelled while another thread was setting the result.  So, we drop
+    // this, but we can issue a warning.
+    task_cat.warning()
+      << "Ignoring set_result() called on cancelled " << *this << "\n";
+
+  } else {
+    task_cat.error()
+      << "set_result() was called on finished " << *this << "\n";
+  }
 }
 
 /**
  * Indicates that the given task is waiting for this future to complete.  When
- * the future is done, it will reactivate the given task.
- * Assumes the manager's lock is held.
+ * the future is done, it will reactivate the given task.  If this is called
+ * while the future is already done, schedules the task immediately.
+ * Assumes the manager's lock is not held.
+ * @returns true if the future was pending, false if it was already done.
  */
-void AsyncFuture::
+bool AsyncFuture::
 add_waiting_task(AsyncTask *task) {
-  nassertv(!done());
-  nassertv(_manager != nullptr);
-  nassertv(_manager->_lock.debug_is_locked());
-  task->ref();
-  _waiting_tasks.push_back(task);
-  nassertv(task->_manager == _manager);
+  nassertr(task->is_runnable(), false);
+
+  // We have to make sure we're not going to change state while we're in the
+  // process of adding the task.
+  if (try_lock_pending()) {
+    if (_manager == nullptr) {
+      _manager = task->_manager;
+    }
+
+    _waiting.push_back(task);
+
+    // Unlock the state.
+    unlock();
+    nassertr(task->_manager == nullptr || task->_manager == _manager, true);
+    return true;
+  } else {
+    // It's already done.  Wake the task immediately.
+    wake_task(task);
+    return false;
+  }
+}
+
+/**
+ * Reactivates the given task.  Assumes the manager lock is not held.
+ */
+void AsyncFuture::
+wake_task(AsyncTask *task) {
+  cerr << "waking task\n";
+  AsyncTaskManager *manager = task->_manager;
+  if (manager == nullptr) {
+    // If it's an unscheduled task, schedule it on the same manager as the
+    // rest of the waiting tasks.
+    manager = _manager;
+    if (manager == nullptr) {
+      manager = AsyncTaskManager::get_global_ptr();
+    }
+  }
+
+  MutexHolder holder(manager->_lock);
+  switch (task->_state) {
+  case AsyncTask::S_servicing_removed:
+    nassertv(task->_manager == _manager);
+    // Re-adding a self-removed task; this just means clearing the removed
+    // flag.
+    task->_state = AsyncTask::S_servicing;
+    return;
+
+  case AsyncTask::S_inactive:
+    // Schedule it immediately.
+    nassertv(task->_manager == nullptr);
+
+    if (task_cat.is_debug()) {
+      task_cat.debug()
+        << "Adding " << *task << " (woken by future " << *this << ")\n";
+    }
+
+    {
+      manager->_lock.release();
+      task->upon_birth(manager);
+      manager->_lock.acquire();
+      nassertv(task->_manager == nullptr &&
+               task->_state == AsyncTask::S_inactive);
+
+      AsyncTaskChain *chain = manager->do_find_task_chain(task->_chain_name);
+      if (chain == nullptr) {
+        task_cat.warning()
+          << "Creating implicit AsyncTaskChain " << task->_chain_name
+          << " for " << manager->get_type() << " " << manager->get_name() << "\n";
+        chain = manager->do_make_task_chain(task->_chain_name);
+      }
+      chain->do_add(task);
+    }
+    return;
+
+  case AsyncTask::S_awaiting:
+    nassertv(task->_manager == _manager);
+    task->_state = AsyncTask::S_active;
+    task->_chain->_active.push_back(task);
+    --task->_chain->_num_awaiting_tasks;
+    return;
+
+  default:
+    nassertv(false);
+    return;
+  }
+}
+
+/**
+ * @see AsyncFuture::gather
+ */
+AsyncGatheringFuture::
+AsyncGatheringFuture(AsyncFuture::Futures futures) :
+  _futures(move(futures)),
+  _num_pending(0) {
+
+  bool any_pending = false;
+
+  AsyncFuture::Futures::const_iterator it;
+  for (it = _futures.begin(); it != _futures.end(); ++it) {
+    AsyncFuture *fut = *it;
+    // If this returns true, the future is not yet done and we need to
+    // register ourselves with it.  This creates a circular reference, but it
+    // is resolved when the future is completed or cancelled.
+    if (fut->try_lock_pending()) {
+      if (_manager == nullptr) {
+        _manager = fut->_manager;
+      }
+      fut->_waiting.push_back((AsyncFuture *)this);
+      AtomicAdjust::inc(_num_pending);
+      fut->unlock();
+      any_pending = true;
+    }
+  }
+  if (!any_pending) {
+    // Start in the done state if all the futures we were passed are done.
+    // Note that it is only safe to set this member in this manner if indeed
+    // no other future holds a reference to us.
+    _future_state = (AtomicAdjust::Integer)FS_finished;
+  }
+}
+
+/**
+ * Cancels all the futures.  Returns true if any futures were cancelled.
+ * Makes sure that all the futures finish before this one is marked done, in
+ * order to maintain the guarantee that calling result() is safe when done()
+ * returns true.
+ */
+bool AsyncGatheringFuture::
+cancel() {
+  if (!done()) {
+    // Temporarily increase the pending count so that the notify_done()
+    // callbacks won't end up causing it to be set to "finished".
+    AtomicAdjust::inc(_num_pending);
+
+    bool any_cancelled = false;
+    AsyncFuture::Futures::const_iterator it;
+    for (it = _futures.begin(); it != _futures.end(); ++it) {
+      AsyncFuture *fut = *it;
+      if (fut->cancel()) {
+        any_cancelled = true;
+      }
+    }
+
+    // Now change state to "cancelled" and call the notify_done() callbacks.
+    // Don't call notify_done() if another thread has beaten us to it.
+    if (set_future_state(FS_cancelled)) {
+      notify_done(false);
+    }
+
+    // Decreasing the pending count is kind of pointless now, so we do it only
+    // in a debug build.
+    nassertr(!AtomicAdjust::dec(_num_pending), any_cancelled);
+    return any_cancelled;
+  } else {
+    return false;
+  }
 }

+ 85 - 21
panda/src/event/asyncFuture.h

@@ -17,7 +17,7 @@
 #include "pandabase.h"
 #include "typedReferenceCount.h"
 #include "typedWritableReferenceCount.h"
-#include "conditionVar.h"
+#include "eventParameter.h"
 #include "atomicAdjust.h"
 
 class AsyncTaskManager;
@@ -30,21 +30,29 @@ class ConditionVarFull;
  * as well as register callbacks for this future's completion.
  *
  * An AsyncFuture can be awaited from within a coroutine or task.  It keeps
- * track of a list of tasks waiting for this future so that they are
- * automatically reactivated upon this future's completion.
+ * track of tasks waiting for this future and automatically reactivates them
+ * upon this future's completion.
  *
  * A task itself is also a subclass of AsyncFuture.  Other subclasses are
- * possible, although subclassing is not necessary for most purposes.
+ * not generally necessary, except to override the function of `cancel()`.
  *
- * The `done()` method is used to check whether the future has completed and
- * a result is available (whether it is cancelled or finished).
+ * Until the future is done, it is "owned" by the resolver thread, though it's
+ * still legal for other threads to query its state.  When the resolver thread
+ * resolves this future using `set_result()`, or any thread calls `cancel()`,
+ * it instantly enters the "done" state, after which the result becomes a
+ * read-only field that all threads can access.
  *
- * To get the result of the future in C++, use `wait()` and `get_result()`.
- * In Python, the functionality of both of those calls is wrapped into the
- * `result()` method, which waits for the future to complete before either
- * returning the result or throwing an exception if the future was cancelled.
+ * When the future returns true for done(), a thread can use cancelled() to
+ * determine whether the future was cancelled or get_result() to access the
+ * result of the operation.  Not all operations define a meaningful result
+ * value, so some will always return nullptr.
+ *
+ * In Python, the `cancelled()`, `wait()` and `get_result()` methods are
+ * wrapped up into a single `result()` method which waits for the future to
+ * complete before either returning the result or throwing an exception if the
+ * future was cancelled.
  * However, it is preferable to use the `await` keyword when running from a
- * coroutine.
+ * coroutine, which only suspends the current task and not the entire thread.
  *
  * This API aims to mirror and be compatible with Python's Future class.
  */
@@ -58,7 +66,7 @@ PUBLISHED:
 
   INLINE bool done() const;
   INLINE bool cancelled() const;
-  EXTENSION(PyObject *result(double timeout = -1.0) const);
+  EXTENSION(PyObject *result(PyObject *timeout = Py_None) const);
 
   virtual bool cancel();
 
@@ -66,45 +74,63 @@ PUBLISHED:
   INLINE const string &get_done_event() const;
   MAKE_PROPERTY(done_event, get_done_event, set_done_event);
 
+  EXTENSION(PyObject *add_done_callback(PyObject *self, PyObject *fn));
+
+  EXTENSION(static PyObject *gather(PyObject *args));
+
   virtual void output(ostream &out) const;
 
-  void wait();
-  void wait(double timeout);
+  BLOCKING void wait();
+  BLOCKING void wait(double timeout);
 
   INLINE void set_result(nullptr_t);
   INLINE void set_result(TypedObject *result);
   INLINE void set_result(TypedReferenceCount *result);
-  INLINE void set_result(TypedWritableReferenceCount *result);
+  INLINE void set_result(const EventParameter &result);
 
 public:
+  void set_result(TypedObject *ptr, ReferenceCount *ref_ptr);
+
   INLINE TypedObject *get_result() const;
   INLINE void get_result(TypedObject *&ptr, ReferenceCount *&ref_ptr) const;
 
+  typedef pvector<PT(AsyncFuture)> Futures;
+  INLINE static AsyncFuture *gather(Futures futures);
+
+  virtual bool is_task() const {return false;}
+
   void notify_done(bool clean_exit);
+  bool add_waiting_task(AsyncTask *task);
 
 private:
-  void set_result(TypedObject *ptr, ReferenceCount *ref_ptr);
-  void add_waiting_task(AsyncTask *task);
+  void wake_task(AsyncTask *task);
 
 protected:
   enum FutureState {
+    // Pending states
     FS_pending,
+    FS_locked_pending,
+
+    // Done states
     FS_finished,
     FS_cancelled,
   };
+  INLINE bool try_lock_pending();
+  INLINE void unlock(FutureState new_state = FS_pending);
+  INLINE bool set_future_state(FutureState state);
 
   AsyncTaskManager *_manager;
-  ConditionVarFull *_cvar;
   TypedObject *_result;
   PT(ReferenceCount) _result_ref;
   AtomicAdjust::Integer _future_state;
 
   string _done_event;
 
-  // Tasks waiting for this one to complete.  These are reference counted, but
-  // we can't store them in a PointerTo for circular dependency reasons.
-  pvector<AsyncTask *> _waiting_tasks;
+  // Tasks and gathering futures waiting for this one to complete.
+  Futures _waiting;
 
+  friend class AsyncGatheringFuture;
+  friend class AsyncTaskChain;
   friend class PythonTask;
 
 public:
@@ -130,6 +156,44 @@ INLINE ostream &operator << (ostream &out, const AsyncFuture &fut) {
   return out;
 };
 
+/**
+ * Specific future that collects the results of several futures.
+ */
+class EXPCL_PANDA_EVENT AsyncGatheringFuture FINAL : public AsyncFuture {
+private:
+  AsyncGatheringFuture(AsyncFuture::Futures futures);
+
+public:
+  virtual bool cancel() override;
+
+  INLINE size_t get_num_futures() const;
+  INLINE AsyncFuture *get_future(size_t i) const;
+  INLINE TypedObject *get_result(size_t i) const;
+
+private:
+  const Futures _futures;
+  AtomicAdjust::Integer _num_pending;
+
+  friend class AsyncFuture;
+
+public:
+  static TypeHandle get_class_type() {
+    return _type_handle;
+  }
+  static void init_type() {
+    AsyncFuture::init_type();
+    register_type(_type_handle, "AsyncGatheringFuture",
+                  AsyncFuture::get_class_type());
+  }
+  virtual TypeHandle get_type() const {
+    return get_class_type();
+  }
+  virtual TypeHandle force_init_type() {init_type(); return get_class_type();}
+
+private:
+  static TypeHandle _type_handle;
+};
+
 #include "asyncFuture.I"
 
 #endif

+ 152 - 9
panda/src/event/asyncFuture_ext.cxx

@@ -12,12 +12,16 @@
  */
 
 #include "asyncFuture_ext.h"
+#include "asyncTaskSequence.h"
+#include "eventParameter.h"
+#include "paramValue.h"
 #include "pythonTask.h"
 
 #ifdef HAVE_PYTHON
 
 #ifndef CPPPARSER
 extern struct Dtool_PyTypedObject Dtool_AsyncFuture;
+extern struct Dtool_PyTypedObject Dtool_ParamValueBase;
 extern struct Dtool_PyTypedObject Dtool_TypedObject;
 #endif
 
@@ -32,7 +36,45 @@ static PyObject *get_done_result(const AsyncFuture *future) {
       // any PyObject value or raise an exception.
       const PythonTask *task = (const PythonTask *)future;
       return task->get_result();
+
+    } else if (future->is_of_type(AsyncTaskSequence::get_class_type())) {
+      // If it's an AsyncTaskSequence, get the result for each task.
+      const AsyncTaskSequence *task = (const AsyncTaskSequence *)future;
+      Py_ssize_t num_tasks = (Py_ssize_t)task->get_num_tasks();
+      PyObject *results = PyTuple_New(num_tasks);
+
+      for (Py_ssize_t i = 0; i < num_tasks; ++i) {
+        PyObject *result = get_done_result(task->get_task(i));
+        if (result != nullptr) {
+          // This steals a reference.
+          PyTuple_SET_ITEM(results, i, result);
+        } else {
+          Py_DECREF(results);
+          return nullptr;
+        }
+      }
+      return results;
+
+    } else if (future->is_of_type(AsyncGatheringFuture::get_class_type())) {
+      // If it's an AsyncGatheringFuture, get the result for each future.
+      const AsyncGatheringFuture *gather = (const AsyncGatheringFuture *)future;
+      Py_ssize_t num_futures = (Py_ssize_t)gather->get_num_futures();
+      PyObject *results = PyTuple_New(num_futures);
+
+      for (Py_ssize_t i = 0; i < num_futures; ++i) {
+        PyObject *result = get_done_result(gather->get_future((size_t)i));
+        if (result != nullptr) {
+          // This steals a reference.
+          PyTuple_SET_ITEM(results, i, result);
+        } else {
+          Py_DECREF(results);
+          return nullptr;
+        }
+      }
+      return results;
+
     } else {
+      // It's any other future.
       ReferenceCount *ref_ptr;
       TypedObject *ptr;
       future->get_result(ptr, ref_ptr);
@@ -42,13 +84,36 @@ static PyObject *get_done_result(const AsyncFuture *future) {
         return Py_None;
       }
 
+      TypeHandle type = ptr->get_type();
+      if (type.is_derived_from(ParamValueBase::get_class_type())) {
+        // If this is a ParamValueBase, return the 'value' property.
+        // EventStoreInt and Double are not exposed to Python for some reason.
+        if (type == EventStoreInt::get_class_type()) {
+          return Dtool_WrapValue(((EventStoreInt *)ptr)->get_value());
+        } else if (type == EventStoreDouble::get_class_type()) {
+          return Dtool_WrapValue(((EventStoreDouble *)ptr)->get_value());
+        }
+
+        ParamValueBase *value = (ParamValueBase *)ptr;
+        PyObject *wrap = DTool_CreatePyInstanceTyped
+          ((void *)value, Dtool_ParamValueBase, false, false, type.get_index());
+        if (wrap != nullptr) {
+          PyObject *value = PyObject_GetAttrString(wrap, "value");
+          if (value != nullptr) {
+            return value;
+          }
+          PyErr_Restore(nullptr, nullptr, nullptr);
+          Py_DECREF(wrap);
+        }
+      }
+
       if (ref_ptr != nullptr) {
         ref_ptr->ref();
       }
 
       return DTool_CreatePyInstanceTyped
         ((void *)ptr, Dtool_TypedObject, (ref_ptr != nullptr), false,
-         ptr->get_type_index());
+         type.get_index());
     }
   } else {
     // If the future was cancelled, we should raise an exception.
@@ -62,8 +127,8 @@ static PyObject *get_done_result(const AsyncFuture *future) {
       }
       // If we can't get that, we should pretend and make our own.
       if (exc_type == nullptr) {
-        exc_type = PyErr_NewExceptionWithDoc("concurrent.futures._base.CancelledError",
-                                             "The Future was cancelled.",
+        exc_type = PyErr_NewExceptionWithDoc((char*)"concurrent.futures._base.CancelledError",
+                                             (char*)"The Future was cancelled.",
                                              nullptr, nullptr);
       }
     }
@@ -121,7 +186,7 @@ __await__(PyObject *self) {
  * raises TimeoutError.
  */
 PyObject *Extension<AsyncFuture>::
-result(double timeout) const {
+result(PyObject *timeout) const {
   if (!_this->done()) {
     // Not yet done?  Wait until it is done, or until a timeout occurs.  But
     // first check to make sure we're not trying to deadlock the thread.
@@ -136,11 +201,15 @@ result(double timeout) const {
     PyThreadState *_save;
     Py_UNBLOCK_THREADS
 #endif
-    //TODO: check why gcc and clang don't like infinity timeout.
-    if (cinf(timeout) || timeout < 0.0) {
+    if (timeout == Py_None) {
       _this->wait();
     } else {
-      _this->wait(timeout);
+      PyObject *num = PyNumber_Float(timeout);
+      if (num != nullptr) {
+        _this->wait(PyFloat_AS_DOUBLE(num));
+      } else {
+        return Dtool_Raise_ArgTypeError(timeout, 0, "result", "float");
+      }
     }
 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
     Py_BLOCK_THREADS
@@ -158,8 +227,8 @@ result(double timeout) const {
         }
         // If we can't get that, we should pretend and make our own.
         if (exc_type == nullptr) {
-          exc_type = PyErr_NewExceptionWithDoc("concurrent.futures._base.TimeoutError",
-                                               "The operation exceeded the given deadline.",
+          exc_type = PyErr_NewExceptionWithDoc((char*)"concurrent.futures._base.TimeoutError",
+                                               (char*)"The operation exceeded the given deadline.",
                                                nullptr, nullptr);
         }
       }
@@ -172,4 +241,78 @@ result(double timeout) const {
   return get_done_result(_this);
 }
 
+/**
+ * Schedules the given function to be run as soon as the future is complete.
+ * This is also called if the future is cancelled.
+ * If the future is already done, the callback is scheduled right away.
+ */
+PyObject *Extension<AsyncFuture>::
+add_done_callback(PyObject *self, PyObject *fn) {
+  if (!PyCallable_Check(fn)) {
+    return Dtool_Raise_ArgTypeError(fn, 0, "add_done_callback", "callable");
+  }
+
+  PythonTask *task = new PythonTask(fn);
+  Py_DECREF(task->_args);
+  task->_args = PyTuple_Pack(1, self);
+  task->_append_task = false;
+  task->_ignore_return = true;
+
+  // If this is an AsyncTask, make sure it is scheduled on the same chain.
+  if (_this->is_task()) {
+    AsyncTask *this_task = (AsyncTask *)_this;
+    task->set_task_chain(this_task->get_task_chain());
+  }
+
+  _this->add_waiting_task(task);
+
+  Py_INCREF(Py_None);
+  return Py_None;
+}
+
+/**
+ * Creates a new future that returns `done()` when all of the contained
+ * futures are done.
+ *
+ * Calling `cancel()` on the returned future will result in all contained
+ * futures that have not yet finished to be cancelled.
+ */
+PyObject *Extension<AsyncFuture>::
+gather(PyObject *args) {
+  if (!PyTuple_Check(args)) {
+    return Dtool_Raise_TypeError("args is not a tuple");
+  }
+
+  Py_ssize_t size = Py_SIZE(args);
+  AsyncFuture::Futures futures;
+  futures.reserve(size);
+
+  for (Py_ssize_t i = 0; i < size; ++i) {
+    PyObject *item = PyTuple_GET_ITEM(args, i);
+    if (DtoolInstance_Check(item)) {
+      AsyncFuture *fut = (AsyncFuture *)DtoolInstance_UPCAST(item, Dtool_AsyncFuture);
+      if (fut != nullptr) {
+        futures.push_back(fut);
+        continue;
+      }
+#if PY_VERSION_HEX >= 0x03050000
+    } else if (PyCoro_CheckExact(item)) {
+      // We allow passing in a coroutine instead of a future.  This causes it
+      // to be scheduled as a task.
+      futures.push_back(new PythonTask(item));
+      continue;
+#endif
+    }
+    return Dtool_Raise_ArgTypeError(item, i, "gather", "coroutine, task or future");
+  }
+
+  AsyncFuture *future = AsyncFuture::gather(move(futures));
+  if (future != nullptr) {
+    future->ref();
+    return DTool_CreatePyInstanceTyped((void *)future, Dtool_AsyncFuture, true, false, future->get_type_index());
+  } else {
+    return PyErr_NoMemory();
+  }
+}
+
 #endif

+ 5 - 1
panda/src/event/asyncFuture_ext.h

@@ -29,7 +29,11 @@ public:
   static PyObject *__await__(PyObject *self);
   static PyObject *__iter__(PyObject *self) { return __await__(self); }
 
-  PyObject *result(double timeout = -1.0) const;
+  PyObject *result(PyObject *timeout = Py_None) const;
+
+  PyObject *add_done_callback(PyObject *self, PyObject *fn);
+
+  static PyObject *gather(PyObject *args);
 };
 
 #endif  // HAVE_PYTHON

+ 1 - 0
panda/src/event/asyncTask.h

@@ -104,6 +104,7 @@ protected:
   DoneStatus unlock_and_do_task();
 
   virtual bool cancel() FINAL;
+  virtual bool is_task() const FINAL {return true;}
 
   virtual bool is_runnable();
   virtual DoneStatus do_task();

+ 2 - 1
panda/src/event/asyncTaskChain.cxx

@@ -778,7 +778,8 @@ cleanup_task(AsyncTask *task, bool upon_death, bool clean_exit) {
 
   _manager->remove_task_by_name(task);
 
-  if (upon_death && !task->done()) {
+  if (upon_death && task->set_future_state(clean_exit ? AsyncFuture::FS_finished
+                                                      : AsyncFuture::FS_cancelled)) {
     task->notify_done(clean_exit);
   }
 

+ 1 - 0
panda/src/event/config_event.cxx

@@ -33,6 +33,7 @@ NotifyCategoryDef(task, "");
 
 ConfigureFn(config_event) {
   AsyncFuture::init_type();
+  AsyncGatheringFuture::init_type();
   AsyncTask::init_type();
   AsyncTaskChain::init_type();
   AsyncTaskManager::init_type();

+ 17 - 22
panda/src/event/pythonTask.cxx

@@ -45,6 +45,7 @@ PythonTask(PyObject *func_or_coro, const string &name) :
   _exc_traceback(nullptr),
   _generator(nullptr),
   _future_done(nullptr),
+  _ignore_return(false),
   _retrieved_exception(false) {
 
   nassertv(func_or_coro != nullptr);
@@ -169,9 +170,7 @@ get_args() {
     }
 
     this->ref();
-    PyObject *self =
-      DTool_CreatePyInstanceTyped(this, Dtool_TypedReferenceCount,
-                                  true, false, get_type_index());
+    PyObject *self = DTool_CreatePyInstance(this, Dtool_PythonTask, true, false);
     PyTuple_SET_ITEM(with_task, num_args, self);
     return with_task;
 
@@ -588,20 +587,21 @@ do_python_task() {
       AsyncFuture *fut = (AsyncFuture *)DtoolInstance_UPCAST(result, Dtool_AsyncFuture);
       if (fut != nullptr) {
         // Suspend execution of this task until this other task has completed.
-        AsyncTaskManager *manager = fut->_manager;
-        if (manager == nullptr) {
-          manager = _manager;
-          fut->_manager = manager;
-        }
-        nassertr(manager == _manager, DS_interrupt);
-        MutexHolder holder(manager->_lock);
-        if (fut != (AsyncFuture *)this) {
-          if (!fut->done()) {
+        if (fut != (AsyncFuture *)this && !fut->done()) {
+          if (fut->is_task()) {
+            // This is actually a task, do we need to schedule it with the
+            // manager?  This allows doing something like
+            //   await Task.pause(1.0)
+            // directly instead of having to do:
+            //   await taskMgr.add(Task.pause(1.0))
+            AsyncTask *task = (AsyncTask *)fut;
+            _manager->add(task);
+          }
+          if (fut->add_waiting_task(this)) {
             if (task_cat.is_debug()) {
               task_cat.debug()
                 << *this << " is now awaiting <" << *fut << ">.\n";
             }
-            fut->add_waiting_task(this);
           } else {
             // The task is already done.  Continue at next opportunity.
             if (task_cat.is_debug()) {
@@ -664,7 +664,7 @@ do_python_task() {
     return DS_interrupt;
   }
 
-  if (result == Py_None) {
+  if (result == Py_None || _ignore_return) {
     Py_DECREF(result);
     return DS_done;
   }
@@ -861,15 +861,10 @@ void PythonTask::
 call_function(PyObject *function) {
   if (function != Py_None) {
     this->ref();
-    PyObject *self =
-      DTool_CreatePyInstanceTyped(this, Dtool_TypedReferenceCount,
-                                  true, false, get_type_index());
-    PyObject *args = Py_BuildValue("(O)", self);
-    Py_DECREF(self);
-
-    PyObject *result = PyObject_CallObject(function, args);
+    PyObject *self = DTool_CreatePyInstance(this, Dtool_PythonTask, true, false);
+    PyObject *result = PyObject_CallFunctionObjArgs(function, self, nullptr);
     Py_XDECREF(result);
-    Py_DECREF(args);
+    Py_DECREF(self);
   }
 }
 

+ 2 - 1
panda/src/event/pythonTask.h

@@ -26,7 +26,7 @@
  * This class exists to allow association of a Python function or coroutine
  * with the AsyncTaskManager.
  */
-class PythonTask : public AsyncTask {
+class PythonTask FINAL : public AsyncTask {
 PUBLISHED:
   PythonTask(PyObject *function = Py_None, const string &name = string());
   virtual ~PythonTask();
@@ -123,6 +123,7 @@ private:
   PyObject *_future_done;
 
   bool _append_task;
+  bool _ignore_return;
   bool _registered_to_owner;
   mutable bool _retrieved_exception;
 

+ 99 - 0
tests/event/test_futures.py

@@ -159,6 +159,105 @@ def test_coro_exception():
         task.result()
 
 
+def test_future_gather():
+    fut1 = core.AsyncFuture()
+    fut2 = core.AsyncFuture()
+
+    # 0 and 1 arguments are special
+    assert core.AsyncFuture.gather().done()
+    assert core.AsyncFuture.gather(fut1) == fut1
+
+    # Gathering not-done futures
+    gather = core.AsyncFuture.gather(fut1, fut2)
+    assert not gather.done()
+
+    # One future done
+    fut1.set_result(1)
+    assert not gather.done()
+
+    # Two futures done
+    fut2.set_result(2)
+    assert gather.done()
+
+    assert not gather.cancelled()
+    assert tuple(gather.result()) == (1, 2)
+
+
+def test_future_gather_cancel_inner():
+    fut1 = core.AsyncFuture()
+    fut2 = core.AsyncFuture()
+
+    # Gathering not-done futures
+    gather = core.AsyncFuture.gather(fut1, fut2)
+    assert not gather.done()
+
+    # One future cancelled
+    fut1.cancel()
+    assert not gather.done()
+
+    # Two futures cancelled
+    fut2.set_result(2)
+    assert gather.done()
+
+    assert not gather.cancelled()
+    with pytest.raises(CancelledError):
+        assert gather.result()
+
+
+def test_future_gather_cancel_outer():
+    fut1 = core.AsyncFuture()
+    fut2 = core.AsyncFuture()
+
+    # Gathering not-done futures
+    gather = core.AsyncFuture.gather(fut1, fut2)
+    assert not gather.done()
+
+    assert gather.cancel()
+    assert gather.done()
+    assert gather.cancelled()
+
+    with pytest.raises(CancelledError):
+        assert gather.result()
+
+
+def test_future_done_callback():
+    fut = core.AsyncFuture()
+
+    # Use the list hack since Python 2 doesn't have the "nonlocal" keyword.
+    called = [False]
+    def on_done(arg):
+        assert arg == fut
+        called[0] = True
+
+    fut.add_done_callback(on_done)
+    fut.cancel()
+    assert fut.done()
+
+    task_mgr = core.AsyncTaskManager.get_global_ptr()
+    task_mgr.poll()
+    assert called[0]
+
+
+def test_future_done_callback_already_done():
+    # Same as above, but with the future already done when add_done_callback
+    # is called.
+    fut = core.AsyncFuture()
+    fut.cancel()
+    assert fut.done()
+
+    # Use the list hack since Python 2 doesn't have the "nonlocal" keyword.
+    called = [False]
+    def on_done(arg):
+        assert arg == fut
+        called[0] = True
+
+    fut.add_done_callback(on_done)
+
+    task_mgr = core.AsyncTaskManager.get_global_ptr()
+    task_mgr.poll()
+    assert called[0]
+
+
 def test_event_future():
     queue = core.EventQueue()
     handler = core.EventHandler(queue)