
Implement awaitable thread-safe future for async operations

This introduces AsyncFuture as a new base class of AsyncTask.  It's modelled after asyncio's Future class, except that it is thread-safe and you can use result() to block the current thread waiting for the future to finish (of course this is not necessary for use with coroutines).

AsyncFuture should be used for any operation that finishes at some point in the future, both to make the operation awaitable from within coroutines and to provide a standard interface for querying its status and result and for cancelling it.  As such, it's been adopted in various places, including texture.prepare() and win.trigger_copy().

Note that an AsyncFuture is intended to be used *once*; it cannot be reset and reused after it has completed.  As an example of how this works, tex.prepare() returns the same future for as long as the prepare is still pending, but once it has finished, subsequent calls to tex.prepare() will return a new future.
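
As a rough illustration of the intended usage from Python (assuming the usual ShowBase builtins `messenger` and `taskMgr`; the event name and the interval variable are placeholders, not part of this change):

    async def intro_sequence():
        # messenger.future() resolves (once) when the event next fires.
        await messenger.future('player-ready')

        # Interval.start() now returns the underlying task, which is a
        # future itself and can therefore be awaited until it finishes.
        await camera_interval.start()

    # Coroutines are scheduled on the task manager like regular tasks,
    # exactly as Messenger now does for coroutine event handlers.
    taskMgr.add(intro_sequence())
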
rdb · 8 years ago · commit 2e20a0f16e
70 changed files with 1405 additions and 604 deletions
  1. direct/src/interval/Interval.py (+3 -1)
  2. direct/src/showbase/Loader.py (+25 -11)
  3. direct/src/showbase/Messenger.py (+11 -1)
  4. direct/src/task/Task.py (+3 -1)
  5. makepanda/makepanda.py (+2 -2)
  6. panda/src/audio/audioLoadRequest.I (+7 -17)
  7. panda/src/audio/audioLoadRequest.cxx (+1 -2)
  8. panda/src/audio/audioLoadRequest.h (+0 -5)
  9. panda/src/display/graphicsOutput.I (+10 -4)
  10. panda/src/display/graphicsOutput.cxx (+4 -2)
  11. panda/src/display/graphicsOutput.h (+3 -2)
  12. panda/src/display/graphicsStateGuardian.cxx (+10 -9)
  13. panda/src/display/graphicsStateGuardian.h (+1 -1)
  14. panda/src/event/asyncFuture.I (+109 -0)
  15. panda/src/event/asyncFuture.cxx (+195 -0)
  16. panda/src/event/asyncFuture.h (+135 -0)
  17. panda/src/event/asyncFuture_ext.cxx (+175 -0)
  18. panda/src/event/asyncFuture_ext.h (+8 -6)
  19. panda/src/event/asyncTask.I (+0 -9)
  20. panda/src/event/asyncTask.cxx (+65 -16)
  21. panda/src/event/asyncTask.h (+9 -15)
  22. panda/src/event/asyncTaskChain.cxx (+16 -29)
  23. panda/src/event/asyncTaskChain.h (+2 -1)
  24. panda/src/event/asyncTaskCollection.cxx (+19 -19)
  25. panda/src/event/asyncTaskCollection.h (+5 -5)
  26. panda/src/event/asyncTaskManager.I (+1 -1)
  27. panda/src/event/asyncTaskManager.cxx (+6 -14)
  28. panda/src/event/asyncTaskManager.h (+4 -3)
  29. panda/src/event/asyncTask_ext.cxx (+0 -75)
  30. panda/src/event/config_event.cxx (+2 -0)
  31. panda/src/event/eventHandler.cxx (+32 -0)
  32. panda/src/event/eventHandler.h (+7 -1)
  33. panda/src/event/eventParameter.I (+0 -7)
  34. panda/src/event/eventParameter.h (+2 -1)
  35. panda/src/event/p3event_composite1.cxx (+1 -0)
  36. panda/src/event/pythonTask.I (+3 -8)
  37. panda/src/event/pythonTask.cxx (+45 -69)
  38. panda/src/event/pythonTask.h (+9 -6)
  39. panda/src/gobj/animateVerticesRequest.I (+4 -3)
  40. panda/src/gobj/animateVerticesRequest.cxx (+0 -1)
  41. panda/src/gobj/animateVerticesRequest.h (+1 -2)
  42. panda/src/gobj/config_gobj.cxx (+1 -0)
  43. panda/src/gobj/preparedGraphicsObjects.cxx (+147 -7)
  44. panda/src/gobj/preparedGraphicsObjects.h (+54 -2)
  45. panda/src/gobj/shader.cxx (+3 -2)
  46. panda/src/gobj/shader.h (+2 -1)
  47. panda/src/gobj/texture.cxx (+3 -2)
  48. panda/src/gobj/texture.h (+2 -1)
  49. panda/src/gobj/textureReloadRequest.I (+4 -3)
  50. panda/src/gobj/textureReloadRequest.cxx (+0 -1)
  51. panda/src/gobj/textureReloadRequest.h (+2 -1)
  52. panda/src/pgraph/geomNode.cxx (+5 -4)
  53. panda/src/pgraph/loader.I (+1 -0)
  54. panda/src/pgraph/modelFlattenRequest.I (+8 -20)
  55. panda/src/pgraph/modelFlattenRequest.cxx (+2 -2)
  56. panda/src/pgraph/modelFlattenRequest.h (+0 -5)
  57. panda/src/pgraph/modelLoadRequest.I (+11 -18)
  58. panda/src/pgraph/modelLoadRequest.cxx (+3 -4)
  59. panda/src/pgraph/modelLoadRequest.h (+0 -5)
  60. panda/src/pgraph/modelSaveRequest.I (+5 -13)
  61. panda/src/pgraph/modelSaveRequest.cxx (+0 -2)
  62. panda/src/pgraph/modelSaveRequest.h (+0 -4)
  63. panda/src/pipeline/asyncTaskBase.I (+0 -12)
  64. panda/src/pipeline/asyncTaskBase.cxx (+0 -77)
  65. panda/src/pipeline/asyncTaskBase.h (+0 -61)
  66. panda/src/pipeline/config_pipeline.cxx (+0 -2)
  67. panda/src/pipeline/p3pipeline_composite1.cxx (+0 -1)
  68. panda/src/pipeline/thread.I (+1 -1)
  69. panda/src/pipeline/thread.h (+4 -4)
  70. tests/event/test_futures.py (+212 -0)

+ 3 - 1
direct/src/interval/Interval.py

@@ -116,8 +116,9 @@ class Interval(DirectObject):
         return self.currT
 
     def start(self, startT = 0.0, endT = -1.0, playRate = 1.0):
+        """ Starts the interval.  Returns an awaitable. """
         self.setupPlay(startT, endT, playRate, 0)
-        self.__spawnTask()
+        return self.__spawnTask()
 
     def loop(self, startT = 0.0, endT = -1.0, playRate = 1.0):
         self.setupPlay(startT, endT, playRate, 1)
@@ -427,6 +428,7 @@ class Interval(DirectObject):
         task = Task(self.__playTask)
         task.interval = self
         taskMgr.add(task, taskName)
+        return task
 
     def __removeTask(self):
         # Kill old task(s), including those from a similarly-named but

+ 25 - 11
direct/src/showbase/Loader.py

@@ -66,7 +66,9 @@ class Loader(DirectObject):
             return not self.requests
 
         def result(self):
-            assert not self.requests, "Result is not ready."
+            "Returns the results, suspending the thread to wait if necessary."
+            for r in list(self.requests):
+                r.wait()
             if self.gotList:
                 return self.objects
             else:
@@ -132,7 +134,8 @@ class Loader(DirectObject):
     # model loading funcs
     def loadModel(self, modelPath, loaderOptions = None, noCache = None,
                   allowInstance = False, okMissing = None,
-                  callback = None, extraArgs = [], priority = None):
+                  callback = None, extraArgs = [], priority = None,
+                  blocking = None):
         """
         Attempts to load a model or models from one or more relative
         pathnames.  If the input modelPath is a string (a single model
@@ -169,10 +172,10 @@ class Loader(DirectObject):
         If callback is not None, then the model load will be performed
         asynchronously.  In this case, loadModel() will initiate a
         background load and return immediately.  The return value will
-        be an object that may later be passed to
-        loader.cancelRequest() to cancel the asynchronous request.  At
-        some later point, when the requested model(s) have finished
-        loading, the callback function will be invoked with the n
+        be an object that can be used to check the status, cancel the
+        request, or use it in an `await` expression.  Unless callback
+        is the special value True, when the requested model(s) have
+        finished loading, it will be invoked with the n
         loaded models passed as its parameter list.  It is possible
         that the callback will be invoked immediately, even before
         loadModel() returns.  If you use callback, you may also
@@ -224,7 +227,10 @@ class Loader(DirectObject):
             modelList = modelPath
             gotList = True
 
-        if callback is None:
+        if blocking is None:
+            blocking = callback is None
+
+        if blocking:
             # We got no callback, so it's a synchronous load.
 
             result = []
@@ -362,7 +368,8 @@ class Loader(DirectObject):
         ModelPool.releaseModel(modelNode)
 
     def saveModel(self, modelPath, node, loaderOptions = None,
-                  callback = None, extraArgs = [], priority = None):
+                  callback = None, extraArgs = [], priority = None,
+                  blocking = None):
         """ Saves the model (a NodePath or PandaNode) to the indicated
         filename path.  Returns true on success, false on failure.  If
         a callback is used, the model is saved asynchronously, and the
@@ -397,7 +404,10 @@ class Loader(DirectObject):
         # From here on, we deal with a list of (filename, node) pairs.
         modelList = list(zip(modelList, nodeList))
 
-        if callback is None:
+        if blocking is None:
+            blocking = callback is None
+
+        if blocking:
             # We got no callback, so it's a synchronous save.
 
             result = []
@@ -1059,7 +1069,7 @@ class Loader(DirectObject):
             return
 
         cb, i = self._requests[request]
-        if cb.cancelled():
+        if cb.cancelled() or request.cancelled():
             # Shouldn't be here.
             del self._requests[request]
             return
@@ -1068,7 +1078,11 @@ class Loader(DirectObject):
         if not cb.requests:
             del self._requests[request]
 
-        cb.gotObject(i, request.result() or None)
+        result = request.result()
+        if isinstance(result, PandaNode):
+            result = NodePath(result)
+
+        cb.gotObject(i, result)
 
     load_model = loadModel
     unload_model = unloadModel
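
A sketch of how the new blocking parameter is intended to be used (the model path is illustrative, and the exact combination of blocking/callback arguments is an assumption based on the docstring above):

    # Default behaviour is unchanged: with no callback given, blocking
    # defaults to True and the NodePath is returned directly.
    model = loader.loadModel("environment")

    # With blocking=False the call returns immediately with a handle;
    # result() suspends the calling thread until the model is available,
    # or the handle can be awaited from a coroutine instead.
    handle = loader.loadModel("environment", blocking=False)
    # ... do other work while the load proceeds in the background ...
    model = handle.result()
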

+ 11 - 1
direct/src/showbase/Messenger.py

@@ -109,6 +109,12 @@ class Messenger:
             if record[0] <= 0:
                 del self._id2object[id]
 
+    def future(self, event):
+        """ Returns a future that is triggered by the given event name.  This
+        will function only once. """
+
+        return eventMgr.eventHandler.get_future(event)
+
     def accept(self, event, object, method, extraArgs=[], persistent=1):
         """ accept(self, string, DirectObject, Function, List, Boolean)
 
@@ -409,10 +415,14 @@ class Messenger:
                 # Release the lock temporarily while we call the method.
                 self.lock.release()
                 try:
-                    method (*(extraArgs + sentArgs))
+                    result = method (*(extraArgs + sentArgs))
                 finally:
                     self.lock.acquire()
 
+                if hasattr(result, 'cr_await'):
+                    # It's a coroutine, so schedule it with the task manager.
+                    taskMgr.add(result)
+
     def clear(self):
         """
         Start fresh with a clear dict
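
To illustrate the change to send() above: an accepted handler may now be a coroutine function, and the coroutine it returns is handed to the task manager.  The event names and class below are illustrative:

    from direct.showbase.DirectObject import DirectObject

    class Door(DirectObject):
        def __init__(self):
            # When 'door-opened' fires, calling this handler produces a
            # coroutine object; send() sees its cr_await attribute and
            # schedules it with taskMgr.add().
            self.accept('door-opened', self.on_opened)

        async def on_opened(self):
            # Suspend until some other code fires this event.
            await messenger.future('door-anim-finished')
            print("door fully open")
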

+ 3 - 1
direct/src/task/Task.py

@@ -74,7 +74,9 @@ Task = PythonTask
 # Copy the module-level enums above into the class level.  This funny
 # syntax is necessary because it's a C++-wrapped extension type, not a
 # true Python class.
-Task.DtoolClassDict['done'] = done
+# We can't override 'done', which is already a known method.  We have a
+# special check in PythonTask for when the method is being returned.
+#Task.DtoolClassDict['done'] = done
 Task.DtoolClassDict['cont'] = cont
 Task.DtoolClassDict['again'] = again
 Task.DtoolClassDict['pickup'] = pickup

+ 2 - 2
makepanda/makepanda.py

@@ -3632,7 +3632,7 @@ if (not RUNTIME):
   TargetAdd('p3event_composite2.obj', opts=OPTS, input='p3event_composite2.cxx')
 
   OPTS=['DIR:panda/src/event', 'PYTHON']
-  TargetAdd('p3event_asyncTask_ext.obj', opts=OPTS, input='asyncTask_ext.cxx')
+  TargetAdd('p3event_asyncFuture_ext.obj', opts=OPTS, input='asyncFuture_ext.cxx')
   TargetAdd('p3event_pythonTask.obj', opts=OPTS, input='pythonTask.cxx')
   IGATEFILES=GetDirectoryContents('panda/src/event', ["*.h", "*_composite*.cxx"])
   TargetAdd('libp3event.in', opts=OPTS, input=IGATEFILES)
@@ -4226,7 +4226,7 @@ if (not RUNTIME):
   TargetAdd('core.pyd', input='p3pipeline_pythonThread.obj')
   TargetAdd('core.pyd', input='p3putil_ext_composite.obj')
   TargetAdd('core.pyd', input='p3pnmimage_pfmFile_ext.obj')
-  TargetAdd('core.pyd', input='p3event_asyncTask_ext.obj')
+  TargetAdd('core.pyd', input='p3event_asyncFuture_ext.obj')
   TargetAdd('core.pyd', input='p3event_pythonTask.obj')
   TargetAdd('core.pyd', input='p3gobj_ext_composite.obj')
   TargetAdd('core.pyd', input='p3pgraph_ext_composite.obj')

+ 7 - 17
panda/src/audio/audioLoadRequest.I

@@ -20,8 +20,7 @@ AudioLoadRequest(AudioManager *audio_manager, const string &filename,
                  bool positional) :
   _audio_manager(audio_manager),
   _filename(filename),
-  _positional(positional),
-  _is_ready(false)
+  _positional(positional)
 {
 }
 
@@ -55,31 +54,22 @@ get_positional() const {
  * Returns true if this request has completed, false if it is still pending.
  * When this returns true, you may retrieve the sound loaded by calling
  * get_sound().
+ * Equivalent to `req.done() and not req.cancelled()`.
+ * @see done()
  */
 INLINE bool AudioLoadRequest::
 is_ready() const {
-  return _is_ready;
+  return (FutureState)AtomicAdjust::get(_future_state) == FS_finished;
 }
 
 /**
  * Returns the sound that was loaded asynchronously, if any, or nullptr if
- * there was an error.  It is an error to call this unless is_ready() returns
+ * there was an error.  It is an error to call this unless done() returns
  * true.
  * @deprecated Use result() instead.
  */
 INLINE AudioSound *AudioLoadRequest::
 get_sound() const {
-  nassertr(_is_ready, NULL);
-  return _sound;
-}
-
-/**
- * Returns the sound that was loaded asynchronously, if any, or nullptr if
- * there was an error.  It is an error to call this unless is_ready() returns
- * true.
- */
-INLINE AudioSound *AudioLoadRequest::
-result() const {
-  nassertr(_is_ready, nullptr);
-  return _sound;
+  nassertr_always(done(), nullptr);
+  return (AudioSound *)_result;
 }

+ 1 - 2
panda/src/audio/audioLoadRequest.cxx

@@ -21,8 +21,7 @@ TypeHandle AudioLoadRequest::_type_handle;
  */
 AsyncTask::DoneStatus AudioLoadRequest::
 do_task() {
-  _sound = _audio_manager->get_sound(_filename, _positional);
-  _is_ready = true;
+  set_result(_audio_manager->get_sound(_filename, _positional));
 
   // Don't continue the task; we're done.
   return DS_done;

+ 0 - 5
panda/src/audio/audioLoadRequest.h

@@ -43,8 +43,6 @@ PUBLISHED:
   INLINE bool is_ready() const;
   INLINE AudioSound *get_sound() const;
 
-  INLINE AudioSound *result() const;
-
 protected:
   virtual DoneStatus do_task();
 
@@ -53,9 +51,6 @@ private:
   string _filename;
   bool _positional;
 
-  bool _is_ready;
-  PT(AudioSound) _sound;
-
 public:
   static TypeHandle get_class_type() {
     return _type_handle;

+ 10 - 4
panda/src/display/graphicsOutput.I

@@ -489,10 +489,16 @@ get_child_sort() const {
 /**
  * When the GraphicsOutput is in triggered copy mode, this function triggers
  * the copy (at the end of the next frame).
- */
-INLINE void GraphicsOutput::
-trigger_copy()  {
-  _trigger_copy = true;
+ * @returns a future that can be awaited.
+ */
+INLINE AsyncFuture *GraphicsOutput::
+trigger_copy() {
+  AsyncFuture *future = _trigger_copy;
+  if (future == nullptr) {
+    future = new AsyncFuture;
+    _trigger_copy = future;
+  }
+  return future;
 }
 
 /**
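
A sketch of awaiting the triggered copy from a coroutine (assumes a buffer already set up in triggered-copy render-to-texture mode; buf and tex are placeholders):

    async def grab_frame(buf, tex):
        # trigger_copy() now returns a future that is fulfilled at the
        # end of the next frame, once copy_to_textures() has run.
        await buf.trigger_copy()
        # At this point the copy into the bound texture has completed.
        return tex
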

+ 4 - 2
panda/src/display/graphicsOutput.cxx

@@ -115,7 +115,6 @@ GraphicsOutput(GraphicsEngine *engine, GraphicsPipe *pipe,
   _sbs_left_dimensions.set(0.0f, 1.0f, 0.0f, 1.0f);
   _sbs_right_dimensions.set(0.0f, 1.0f, 0.0f, 1.0f);
   _delete_flag = false;
-  _trigger_copy = false;
 
   if (_fb_properties.is_single_buffered()) {
     _draw_buffer_type = RenderBuffer::T_front;
@@ -1435,7 +1434,10 @@ copy_to_textures() {
       }
     }
   }
-  _trigger_copy = false;
+  if (_trigger_copy != nullptr) {
+    _trigger_copy->set_result(nullptr);
+    _trigger_copy = nullptr;
+  }
 
   return okflag;
 }

+ 3 - 2
panda/src/display/graphicsOutput.h

@@ -40,6 +40,7 @@
 #include "cycleDataWriter.h"
 #include "pipelineCycler.h"
 #include "updateSeq.h"
+#include "asyncFuture.h"
 
 class PNMImage;
 class GraphicsEngine;
@@ -197,7 +198,7 @@ PUBLISHED:
   INLINE int get_child_sort() const;
   MAKE_PROPERTY(child_sort, get_child_sort, set_child_sort);
 
-  INLINE void trigger_copy();
+  INLINE AsyncFuture *trigger_copy();
 
   INLINE DisplayRegion *make_display_region();
   INLINE DisplayRegion *make_display_region(PN_stdfloat l, PN_stdfloat r, PN_stdfloat b, PN_stdfloat t);
@@ -330,7 +331,7 @@ protected:
   int _target_tex_view;
   DisplayRegion *_prev_page_dr;
   PT(GeomNode) _texture_card;
-  bool _trigger_copy;
+  PT(AsyncFuture) _trigger_copy;
 
   class RenderTexture {
   public:

+ 10 - 9
panda/src/display/graphicsStateGuardian.cxx

@@ -3184,15 +3184,15 @@ get_untextured_state() {
  * Should be called when a texture is encountered that needs to have its RAM
  * image reloaded, and get_incomplete_render() is true.  This will fire off a
  * thread on the current Loader object that will request the texture to load
- * its image.  The image will be available at some point in the future (no
- * event will be generated).
+ * its image.  The image will be available at some point in the future.
+ * @returns a future object that can be used to check its status.
  */
-void GraphicsStateGuardian::
+AsyncFuture *GraphicsStateGuardian::
 async_reload_texture(TextureContext *tc) {
-  nassertv(_loader != (Loader *)NULL);
+  nassertr(_loader != nullptr, nullptr);
 
   int priority = 0;
-  if (_current_display_region != (DisplayRegion *)NULL) {
+  if (_current_display_region != nullptr) {
     priority = _current_display_region->get_texture_reload_priority();
   }
 
@@ -3201,15 +3201,15 @@ async_reload_texture(TextureContext *tc) {
 
   // See if we are already loading this task.
   AsyncTaskCollection orig_tasks = task_mgr->find_tasks(task_name);
-  int num_tasks = orig_tasks.get_num_tasks();
-  for (int ti = 0; ti < num_tasks; ++ti) {
+  size_t num_tasks = orig_tasks.get_num_tasks();
+  for (size_t ti = 0; ti < num_tasks; ++ti) {
     AsyncTask *task = orig_tasks.get_task(ti);
     if (task->is_exact_type(TextureReloadRequest::get_class_type()) &&
-        DCAST(TextureReloadRequest, task)->get_texture() == tc->get_texture()) {
+        ((TextureReloadRequest *)task)->get_texture() == tc->get_texture()) {
       // This texture is already queued to be reloaded.  Don't queue it again,
       // just make sure the priority is updated, and return.
       task->set_priority(max(task->get_priority(), priority));
-      return;
+      return (AsyncFuture *)task;
     }
   }
 
@@ -3220,6 +3220,7 @@ async_reload_texture(TextureContext *tc) {
                              _supports_compressed_texture);
   request->set_priority(priority);
   _loader->load_async(request);
+  return (AsyncFuture *)request.p();
 }
 
 /**

+ 1 - 1
panda/src/display/graphicsStateGuardian.h

@@ -460,7 +460,7 @@ protected:
   static CPT(RenderState) get_unclipped_state();
   static CPT(RenderState) get_untextured_state();
 
-  void async_reload_texture(TextureContext *tc);
+  AsyncFuture *async_reload_texture(TextureContext *tc);
 
 protected:
   PT(SceneSetup) _scene_null;

+ 109 - 0
panda/src/event/asyncFuture.I

@@ -0,0 +1,109 @@
+/**
+ * PANDA 3D SOFTWARE
+ * Copyright (c) Carnegie Mellon University.  All rights reserved.
+ *
+ * All use of this software is subject to the terms of the revised BSD
+ * license.  You should have received a copy of this license along
+ * with this source code in a file named "LICENSE."
+ *
+ * @file asyncFuture.I
+ * @author rdb
+ * @date 2017-11-28
+ */
+
+/**
+ * Initializes the future in the pending state.
+ */
+INLINE AsyncFuture::
+AsyncFuture() :
+  _manager(nullptr),
+  _cvar(nullptr),
+  _future_state(FS_pending),
+  _result(nullptr) {
+}
+
+/**
+ * Returns true if the future is done or has been cancelled.  It is always
+ * safe to call this.
+ */
+INLINE bool AsyncFuture::
+done() const {
+  return (FutureState)AtomicAdjust::get(_future_state) != FS_pending;
+}
+
+/**
+ * Returns true if the future was cancelled.  It is always safe to call this.
+ */
+INLINE bool AsyncFuture::
+cancelled() const {
+  return (FutureState)AtomicAdjust::get(_future_state) == FS_cancelled;
+}
+
+/**
+ * Sets the event name that will be triggered when the future finishes.  Will
+ * not be triggered if the future is cancelled, but it will be triggered for
+ * a coroutine task that exits with an exception.
+ */
+INLINE void AsyncFuture::
+set_done_event(const string &done_event) {
+  _done_event = done_event;
+}
+
+/**
+ * Returns the event name that will be triggered when the future finishes.
+ * See set_done_event().
+ */
+INLINE const string &AsyncFuture::
+get_done_event() const {
+  return _done_event;
+}
+
+/**
+ * Returns this future's result.  Can only be called if done() returns true.
+ */
+INLINE TypedObject *AsyncFuture::
+get_result() const {
+  // This is thread safe, since _result may no longer be modified after the
+  // state is changed to "done".
+  nassertr_always(done(), nullptr);
+  return _result;
+}
+
+/**
+ * Returns this future's result as a pair of TypedObject, ReferenceCount
+ * pointers.  Can only be called if done() returns true.
+ */
+INLINE void AsyncFuture::
+get_result(TypedObject *&ptr, ReferenceCount *&ref_ptr) const {
+  // This is thread safe, since _result may no longer be modified after the
+  // state is changed to "done".
+  nassertd(done()) {
+    ptr = nullptr;
+    ref_ptr = nullptr;
+  }
+  ptr = _result;
+  ref_ptr = _result_ref.p();
+}
+
+/**
+ * Sets this future's result.  Can only be called if done() returns false.
+ */
+INLINE void AsyncFuture::
+set_result(nullptr_t) {
+  set_result(nullptr, nullptr);
+}
+
+INLINE void AsyncFuture::
+set_result(TypedObject *result) {
+  set_result(result, nullptr);
+}
+
+INLINE void AsyncFuture::
+set_result(TypedReferenceCount *result) {
+  set_result(result, result);
+}
+
+INLINE void AsyncFuture::
+set_result(TypedWritableReferenceCount *result) {
+  set_result(result, result);
+}

+ 195 - 0
panda/src/event/asyncFuture.cxx

@@ -0,0 +1,195 @@
+/**
+ * PANDA 3D SOFTWARE
+ * Copyright (c) Carnegie Mellon University.  All rights reserved.
+ *
+ * All use of this software is subject to the terms of the revised BSD
+ * license.  You should have received a copy of this license along
+ * with this source code in a file named "LICENSE."
+ *
+ * @file asyncFuture.cxx
+ * @author rdb
+ * @date 2017-11-28
+ */
+
+#include "asyncFuture.h"
+#include "asyncTask.h"
+#include "asyncTaskManager.h"
+#include "conditionVarFull.h"
+#include "config_event.h"
+#include "pStatTimer.h"
+#include "throw_event.h"
+
+TypeHandle AsyncFuture::_type_handle;
+
+/**
+ * Destroys the future.  Assumes notify_done() has already been called.
+ */
+AsyncFuture::
+~AsyncFuture() {
+  delete _cvar;
+  nassertv(_waiting_tasks.empty());
+}
+
+/**
+ * Cancels the future.  Returns true if it was cancelled, or false if the
+ * future was already done.
+ * In the case of a task, this is equivalent to remove().
+ */
+bool AsyncFuture::
+cancel() {
+  if (!done()) {
+    if (_manager == nullptr) {
+      _manager = AsyncTaskManager::get_global_ptr();
+    }
+    MutexHolder holder(_manager->_lock);
+    notify_done(false);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+/**
+ *
+ */
+void AsyncFuture::
+output(ostream &out) const {
+  out << get_type();
+}
+
+/**
+ * Waits until the future is done.
+ */
+void AsyncFuture::
+wait() {
+  if (done()) {
+    return;
+  }
+
+  // If we don't have a manager, use the global one.
+  if (_manager == nullptr) {
+    _manager = AsyncTaskManager::get_global_ptr();
+  }
+
+  MutexHolder holder(_manager->_lock);
+  if (!done()) {
+    if (_cvar == nullptr) {
+      _cvar = new ConditionVarFull(_manager->_lock);
+    }
+    if (task_cat.is_debug()) {
+      task_cat.debug()
+        << "Waiting for future " << *this << "\n";
+    }
+    PStatTimer timer(AsyncTaskChain::_wait_pcollector);
+    while (!done()) {
+      _cvar->wait();
+    }
+  }
+}
+
+/**
+ * Waits until the future is done, or until the timeout is reached.
+ */
+void AsyncFuture::
+wait(double timeout) {
+  if (done()) {
+    return;
+  }
+
+  // If we don't have a manager, use the global one.
+  if (_manager == nullptr) {
+    _manager = AsyncTaskManager::get_global_ptr();
+  }
+
+  MutexHolder holder(_manager->_lock);
+  if (!done()) {
+    if (_cvar == nullptr) {
+      _cvar = new ConditionVarFull(_manager->_lock);
+    }
+    if (task_cat.is_debug()) {
+      task_cat.debug()
+        << "Waiting up to " << timeout << " seconds for future " << *this << "\n";
+    }
+    PStatTimer timer(AsyncTaskChain::_wait_pcollector);
+    _cvar->wait(timeout);
+  }
+}
+
+/**
+ * Schedules the done callbacks.  Assumes the manager's lock is held, and that
+ * the future is currently in the 'pending' state.
+ * @param clean_exit true if finished successfully, false if cancelled.
+ */
+void AsyncFuture::
+notify_done(bool clean_exit) {
+  nassertv(_manager != nullptr);
+  nassertv(_manager->_lock.debug_is_locked());
+  nassertv(_future_state == FS_pending);
+
+  pvector<AsyncTask *>::iterator it;
+  for (it = _waiting_tasks.begin(); it != _waiting_tasks.end(); ++it) {
+    AsyncTask *task = *it;
+    nassertd(task->_manager == _manager) continue;
+    task->_state = AsyncTask::S_active;
+    task->_chain->_active.push_back(task);
+    --task->_chain->_num_awaiting_tasks;
+    unref_delete(task);
+  }
+  _waiting_tasks.clear();
+
+  // For historical reasons, we don't send the "done event" if the future was
+  // cancelled.
+  if (clean_exit && !_done_event.empty()) {
+    PT_Event event = new Event(_done_event);
+    event->add_parameter(EventParameter(this));
+    throw_event(move(event));
+  }
+
+  nassertv_always(FS_pending ==
+    (FutureState)AtomicAdjust::compare_and_exchange(
+      _future_state,
+      (AtomicAdjust::Integer)FS_pending,
+      (AtomicAdjust::Integer)(clean_exit ? FS_finished : FS_cancelled)));
+
+  // Finally, notify any threads that may be waiting.
+  if (_cvar != nullptr) {
+    _cvar->notify_all();
+  }
+}
+
+/**
+ * Sets this future's result.  Can only be done while the future is not done.
+ * Calling this marks the future as done and schedules the done callbacks.
+ *
+ * This variant takes two pointers; the second one is only set if this object
+ * inherits from ReferenceCount, so that a reference can be held.
+ *
+ * Assumes the manager's lock is *not* held.
+ */
+void AsyncFuture::
+set_result(TypedObject *ptr, ReferenceCount *ref_ptr) {
+  nassertv(!done());
+  // If we don't have a manager, use the global one.
+  if (_manager == nullptr) {
+    _manager = AsyncTaskManager::get_global_ptr();
+  }
+  MutexHolder holder(_manager->_lock);
+  _result = ptr;
+  _result_ref = ref_ptr;
+  notify_done(true);
+}
+
+/**
+ * Indicates that the given task is waiting for this future to complete.  When
+ * the future is done, it will reactivate the given task.
+ * Assumes the manager's lock is held.
+ */
+void AsyncFuture::
+add_waiting_task(AsyncTask *task) {
+  nassertv(!done());
+  nassertv(_manager != nullptr);
+  nassertv(_manager->_lock.debug_is_locked());
+  task->ref();
+  _waiting_tasks.push_back(task);
+  nassertv(task->_manager == _manager);
+}

+ 135 - 0
panda/src/event/asyncFuture.h

@@ -0,0 +1,135 @@
+/**
+ * PANDA 3D SOFTWARE
+ * Copyright (c) Carnegie Mellon University.  All rights reserved.
+ *
+ * All use of this software is subject to the terms of the revised BSD
+ * license.  You should have received a copy of this license along
+ * with this source code in a file named "LICENSE."
+ *
+ * @file asyncFuture.h
+ * @author rdb
+ * @date 2017-11-28
+ */
+
+#ifndef ASYNCFUTURE_H
+#define ASYNCFUTURE_H
+
+#include "pandabase.h"
+#include "typedReferenceCount.h"
+#include "typedWritableReferenceCount.h"
+#include "conditionVar.h"
+#include "atomicAdjust.h"
+
+class AsyncTaskManager;
+class AsyncTask;
+class ConditionVarFull;
+
+/**
+ * This class represents a thread-safe handle to a promised future result of
+ * an asynchronous operation, providing methods to query its status and result
+ * as well as register callbacks for this future's completion.
+ *
+ * An AsyncFuture can be awaited from within a coroutine or task.  It keeps
+ * track of a list of tasks waiting for this future so that they are
+ * automatically reactivated upon this future's completion.
+ *
+ * A task itself is also a subclass of AsyncFuture.  Other subclasses are
+ * possible, although subclassing is not necessary for most purposes.
+ *
+ * The `done()` method is used to check whether the future has completed and
+ * a result is available (whether it is cancelled or finished).
+ *
+ * To get the result of the future in C++, use `wait()` and `get_result()`.
+ * In Python, the functionality of both of those calls is wrapped into the
+ * `result()` method, which waits for the future to complete before either
+ * returning the result or throwing an exception if the future was cancelled.
+ * However, it is preferable to use the `await` keyword when running from a
+ * coroutine.
+ *
+ * This API aims to mirror and be compatible with Python's Future class.
+ */
+class EXPCL_PANDA_EVENT AsyncFuture : public TypedReferenceCount {
+PUBLISHED:
+  INLINE AsyncFuture();
+  virtual ~AsyncFuture();
+
+  EXTENSION(static PyObject *__await__(PyObject *self));
+  EXTENSION(static PyObject *__iter__(PyObject *self));
+
+  INLINE bool done() const;
+  INLINE bool cancelled() const;
+  EXTENSION(PyObject *result(double timeout = -1.0) const);
+
+  virtual bool cancel();
+
+  INLINE void set_done_event(const string &done_event);
+  INLINE const string &get_done_event() const;
+  MAKE_PROPERTY(done_event, get_done_event, set_done_event);
+
+  virtual void output(ostream &out) const;
+
+  void wait();
+  void wait(double timeout);
+
+  INLINE void set_result(nullptr_t);
+  INLINE void set_result(TypedObject *result);
+  INLINE void set_result(TypedReferenceCount *result);
+  INLINE void set_result(TypedWritableReferenceCount *result);
+
+public:
+  INLINE TypedObject *get_result() const;
+  INLINE void get_result(TypedObject *&ptr, ReferenceCount *&ref_ptr) const;
+
+  void notify_done(bool clean_exit);
+
+private:
+  void set_result(TypedObject *ptr, ReferenceCount *ref_ptr);
+  void add_waiting_task(AsyncTask *task);
+
+protected:
+  enum FutureState {
+    FS_pending,
+    FS_finished,
+    FS_cancelled,
+  };
+
+  AsyncTaskManager *_manager;
+  ConditionVarFull *_cvar;
+  TypedObject *_result;
+  PT(ReferenceCount) _result_ref;
+  AtomicAdjust::Integer _future_state;
+
+  string _done_event;
+
+  // Tasks waiting for this one to complete.  These are reference counted, but
+  // we can't store them in a PointerTo for circular dependency reasons.
+  pvector<AsyncTask *> _waiting_tasks;
+
+  friend class PythonTask;
+
+public:
+  static TypeHandle get_class_type() {
+    return _type_handle;
+  }
+  static void init_type() {
+    TypedReferenceCount::init_type();
+    register_type(_type_handle, "AsyncFuture",
+                  TypedReferenceCount::get_class_type());
+  }
+  virtual TypeHandle get_type() const {
+    return get_class_type();
+  }
+  virtual TypeHandle force_init_type() {init_type(); return get_class_type();}
+
+private:
+  static TypeHandle _type_handle;
+};
+
+INLINE ostream &operator << (ostream &out, const AsyncFuture &fut) {
+  fut.output(out);
+  return out;
+};
+
+#include "asyncFuture.I"
+
+#endif
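
For reference, a small Python-level sketch of the published interface above (the producer/consumer split is illustrative; AsyncFuture is exposed through panda3d.core):

    from panda3d.core import AsyncFuture

    fut = AsyncFuture()
    assert not fut.done() and not fut.cancelled()

    # Producer side: fulfilling the future wakes any threads blocked in
    # wait()/result() and reactivates any coroutine tasks awaiting it.
    # Passing None marks it done without a result value.
    fut.setResult(None)

    # Consumer side: result() returns immediately once the future is
    # done; if it were still pending it would block, or raise
    # TimeoutError when called with a timeout that expires.
    assert fut.done()
    print(fut.result())
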

+ 175 - 0
panda/src/event/asyncFuture_ext.cxx

@@ -0,0 +1,175 @@
+/**
+ * PANDA 3D SOFTWARE
+ * Copyright (c) Carnegie Mellon University.  All rights reserved.
+ *
+ * All use of this software is subject to the terms of the revised BSD
+ * license.  You should have received a copy of this license along
+ * with this source code in a file named "LICENSE."
+ *
+ * @file asyncFuture_ext.h
+ * @author rdb
+ * @date 2017-10-29
+ */
+
+#include "asyncFuture_ext.h"
+#include "pythonTask.h"
+
+#ifdef HAVE_PYTHON
+
+#ifndef CPPPARSER
+extern struct Dtool_PyTypedObject Dtool_AsyncFuture;
+extern struct Dtool_PyTypedObject Dtool_TypedObject;
+#endif
+
+/**
+ * Get the result of a future, which may be a PythonTask.  Assumes that the
+ * future is already done.
+ */
+static PyObject *get_done_result(const AsyncFuture *future) {
+  if (!future->cancelled()) {
+    if (future->is_of_type(PythonTask::get_class_type())) {
+      // If it's a PythonTask, defer to its get_result(), since it may store
+      // any PyObject value or raise an exception.
+      const PythonTask *task = (const PythonTask *)future;
+      return task->get_result();
+    } else {
+      ReferenceCount *ref_ptr;
+      TypedObject *ptr;
+      future->get_result(ptr, ref_ptr);
+
+      if (ptr == nullptr) {
+        Py_INCREF(Py_None);
+        return Py_None;
+      }
+
+      if (ref_ptr != nullptr) {
+        ref_ptr->ref();
+      }
+
+      return DTool_CreatePyInstanceTyped
+        ((void *)ptr, Dtool_TypedObject, (ref_ptr != nullptr), false,
+         ptr->get_type_index());
+    }
+  } else {
+    // If the future was cancelled, we should raise an exception.
+    static PyObject *exc_type = nullptr;
+    if (exc_type == nullptr) {
+      // Get the CancelledError that asyncio uses, too.
+      PyObject *module = PyImport_ImportModule("concurrent.futures._base");
+      if (module != nullptr) {
+        exc_type = PyObject_GetAttrString(module, "CancelledError");
+        Py_DECREF(module);
+      }
+      // If we can't get that, we should pretend and make our own.
+      if (exc_type == nullptr) {
+        exc_type = PyErr_NewExceptionWithDoc("concurrent.futures._base.CancelledError",
+                                             "The Future was cancelled.",
+                                             nullptr, nullptr);
+      }
+    }
+    Py_INCREF(exc_type);
+    PyErr_Restore(exc_type, nullptr, nullptr);
+    return nullptr;
+  }
+}
+
+/**
+ * Yields continuously until the task has finished.
+ */
+static PyObject *gen_next(PyObject *self) {
+  const AsyncFuture *future = nullptr;
+  if (!Dtool_Call_ExtractThisPointer(self, Dtool_AsyncFuture, (void **)&future)) {
+    return nullptr;
+  }
+
+  if (!future->done()) {
+    // Continue awaiting the result.
+    Py_INCREF(self);
+    return self;
+  } else {
+    PyObject *result = get_done_result(future);
+    if (result != nullptr) {
+      Py_INCREF(PyExc_StopIteration);
+      PyErr_Restore(PyExc_StopIteration, result, nullptr);
+    }
+    return nullptr;
+  }
+}
+
+/**
+ * Returns a generator that continuously yields an awaitable until the task
+ * has finished.  This allows syntax like `model = await loader.load...` to be
+ * used in a Python coroutine.
+ */
+PyObject *Extension<AsyncFuture>::
+__await__(PyObject *self) {
+  Dtool_GeneratorWrapper *gen;
+  gen = (Dtool_GeneratorWrapper *)PyType_GenericAlloc(&Dtool_GeneratorWrapper_Type, 0);
+  if (gen != nullptr) {
+    Py_INCREF(self);
+    gen->_base._self = self;
+    gen->_iternext_func = &gen_next;
+  }
+  return (PyObject *)gen;
+}
+
+/**
+ * Returns the result of this future, unless it was cancelled, in which case
+ * it returns CancelledError.
+ * If the future is not yet done, waits until the result is available.  If a
+ * timeout is passed and the future is not done within the given timeout,
+ * raises TimeoutError.
+ */
+PyObject *Extension<AsyncFuture>::
+result(double timeout) const {
+  if (!_this->done()) {
+    // Not yet done?  Wait until it is done, or until a timeout occurs.  But
+    // first check to make sure we're not trying to deadlock the thread.
+    Thread *current_thread = Thread::get_current_thread();
+    if (_this == (const AsyncFuture *)current_thread->get_current_task()) {
+      PyErr_SetString(PyExc_RuntimeError, "cannot call task.result() from within the task");
+      return nullptr;
+    }
+
+    // Release the GIL for the duration.
+#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
+    PyThreadState *_save;
+    Py_UNBLOCK_THREADS
+#endif
+    //TODO: check why gcc and clang don't like infinity timeout.
+    if (cinf(timeout) || timeout < 0.0) {
+      _this->wait();
+    } else {
+      _this->wait(timeout);
+    }
+#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
+    Py_BLOCK_THREADS
+#endif
+
+    if (!_this->done()) {
+      // It timed out.  Raise an exception.
+      static PyObject *exc_type = nullptr;
+      if (exc_type == nullptr) {
+        // Get the TimeoutError that asyncio uses, too.
+        PyObject *module = PyImport_ImportModule("concurrent.futures._base");
+        if (module != nullptr) {
+          exc_type = PyObject_GetAttrString(module, "TimeoutError");
+          Py_DECREF(module);
+        }
+        // If we can't get that, we should pretend and make our own.
+        if (exc_type == nullptr) {
+          exc_type = PyErr_NewExceptionWithDoc("concurrent.futures._base.TimeoutError",
+                                               "The operation exceeded the given deadline.",
+                                               nullptr, nullptr);
+        }
+      }
+      Py_INCREF(exc_type);
+      PyErr_Restore(exc_type, nullptr, nullptr);
+      return nullptr;
+    }
+  }
+
+  return get_done_result(_this);
+}
+
+#endif

+ 8 - 6
panda/src/event/asyncTask_ext.h → panda/src/event/asyncFuture_ext.h

@@ -6,13 +6,13 @@
  * license.  You should have received a copy of this license along
  * with this source code in a file named "LICENSE."
  *
- * @file asyncTask_ext.h
+ * @file asyncFuture_ext.h
  * @author rdb
  * @date 2017-10-29
  */
 
-#ifndef ASYNCTASK_EXT_H
-#define ASYNCTASK_EXT_H
+#ifndef ASYNCFUTURE_EXT_H
+#define ASYNCFUTURE_EXT_H
 
 #include "extension.h"
 #include "py_panda.h"
@@ -21,15 +21,17 @@
 #ifdef HAVE_PYTHON
 
 /**
- * Extension class for AsyncTask
+ * Extension class for AsyncFuture
  */
 template<>
-class Extension<AsyncTask> : public ExtensionBase<AsyncTask> {
+class Extension<AsyncFuture> : public ExtensionBase<AsyncFuture> {
 public:
   static PyObject *__await__(PyObject *self);
   static PyObject *__iter__(PyObject *self) { return __await__(self); }
+
+  PyObject *result(double timeout = -1.0) const;
 };
 
 #endif  // HAVE_PYTHON
 
-#endif  // ASYNCTASK_EXT_H
+#endif  // ASYNCFUTURE_EXT_H

+ 0 - 9
panda/src/event/asyncTask.I

@@ -181,15 +181,6 @@ set_done_event(const string &done_event) {
   _done_event = done_event;
 }
 
-/**
- * Returns the event name that will be triggered when the task finishes.  See
- * set_done_event().
- */
-INLINE const string &AsyncTask::
-get_done_event() const {
-  return _done_event;
-}
-
 /**
  * Returns the amount of time elapsed during the task's previous run cycle, in
  * seconds.

+ 65 - 16
panda/src/event/asyncTask.cxx

@@ -35,7 +35,6 @@ AsyncTask(const string &name) :
   _priority(0),
   _state(S_inactive),
   _servicing_thread(NULL),
-  _manager(NULL),
   _chain(NULL),
   _start_time(0.0),
   _start_frame(0),
@@ -68,11 +67,27 @@ AsyncTask::
  * S_inactive (or possible S_servicing_removed).  This is a no-op if the state
  * is already S_inactive.
  */
-void AsyncTask::
+bool AsyncTask::
 remove() {
-  if (_manager != (AsyncTaskManager *)NULL) {
-    _manager->remove(this);
+  AsyncTaskManager *manager = _manager;
+  if (manager != nullptr) {
+    nassertr(_chain->_manager == manager, false);
+    if (task_cat.is_debug()) {
+      task_cat.debug()
+        << "Removing " << *this << "\n";
+    }
+    MutexHolder holder(manager->_lock);
+    if (_chain->do_remove(this, true)) {
+      return true;
+    } else {
+      if (task_cat.is_debug()) {
+        task_cat.debug()
+          << "  (unable to remove " << *this << ")\n";
+      }
+      return false;
+    }
   }
+  return false;
 }
 
 /**
@@ -379,11 +394,26 @@ jump_to_task_chain(AsyncTaskManager *manager) {
  */
 AsyncTask::DoneStatus AsyncTask::
 unlock_and_do_task() {
-  nassertr(_manager != (AsyncTaskManager *)NULL, DS_done);
+  nassertr(_manager != nullptr, DS_done);
   PT(ClockObject) clock = _manager->get_clock();
 
+  // Indicate that this task is now the current task running on the thread.
   Thread *current_thread = Thread::get_current_thread();
-  record_task(current_thread);
+  nassertr(current_thread->_current_task == nullptr, DS_interrupt);
+
+  void *ptr = AtomicAdjust::compare_and_exchange_ptr
+    ((void * TVOLATILE &)current_thread->_current_task,
+     (void *)nullptr, (void *)this);
+
+  // If the return value is other than nullptr, someone else must have
+  // assigned the task first, in another thread.  That shouldn't be possible.
+
+  // But different versions of gcc appear to have problems compiling these
+  // assertions correctly.
+#ifndef __GNUC__
+  nassertr(ptr == nullptr, DS_interrupt);
+  nassertr(current_thread->_current_task == this, DS_interrupt);
+#endif  // __GNUC__
 
   // It's important to release the lock while the task is being serviced.
   _manager->_lock.release();
@@ -403,11 +433,36 @@ unlock_and_do_task() {
 
   _chain->_time_in_frame += _dt;
 
-  clear_task(current_thread);
+  // Now indicate that this is no longer the current task.
+  nassertr(current_thread->_current_task == this, status);
+
+  ptr = AtomicAdjust::compare_and_exchange_ptr
+    ((void * TVOLATILE &)current_thread->_current_task,
+     (void *)this, (void *)nullptr);
+
+  // If the return value is other than this, someone else must have assigned
+  // the task first, in another thread.  That shouldn't be possible.
+
+  // But different versions of gcc appear to have problems compiling these
+  // assertions correctly.
+#ifndef __GNUC__
+  nassertr(ptr == this, DS_interrupt);
+  nassertr(current_thread->_current_task == nullptr, DS_interrupt);
+#endif  // __GNUC__
 
   return status;
 }
 
+/**
+ * Cancels this task.  This is equivalent to remove().
+ */
+bool AsyncTask::
+cancel() {
+  bool result = remove();
+  nassertr(done(), false);
+  return result;
+}
+
 /**
  * Override this function to return true if the task can be successfully
  * executed, false if it cannot.  Mainly intended as a sanity check when
@@ -477,22 +532,16 @@ upon_birth(AsyncTaskManager *manager) {
  * task has been removed because it exited normally (returning DS_done), or
  * false if it was removed for some other reason (e.g.
  * AsyncTaskManager::remove()).  By the time this method is called, _manager
- * has been cleared, so the parameter manager indicates the original
+ * may have been cleared, so the parameter manager indicates the original
  * AsyncTaskManager that owned this task.
  *
- * The normal behavior is to throw the done_event only if clean_exit is true.
- *
  * This function is called with the lock *not* held.
  */
 void AsyncTask::
 upon_death(AsyncTaskManager *manager, bool clean_exit) {
-  if (clean_exit && !_done_event.empty()) {
-    PT_Event event = new Event(_done_event);
-    event->add_parameter(EventParameter(this));
-    throw_event(event);
-  }
+  //NB. done_event is now being thrown in AsyncFuture::notify_done().
 
-  // Also throw a generic remove event for the manager.
+  // Throw a generic remove event for the manager.
   if (manager != (AsyncTaskManager *)NULL) {
     string remove_name = manager->get_name() + "-removeTask";
     PT_Event event = new Event(remove_name);
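
Since a task is now itself a future, cancelling it is the same as removing it; a quick sketch (the coroutine is a placeholder, taskMgr is the usual ShowBase builtin):

    task = taskMgr.add(some_coroutine())
    # ... later, before the coroutine finishes ...
    task.cancel()   # equivalent to task.remove()
    assert task.done() and task.cancelled()
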

+ 9 - 15
panda/src/event/asyncTask.h

@@ -15,8 +15,8 @@
 #define ASYNCTASK_H
 
 #include "pandabase.h"
-
-#include "asyncTaskBase.h"
+#include "asyncFuture.h"
+#include "namable.h"
 #include "pmutex.h"
 #include "conditionVar.h"
 #include "pStatCollector.h"
@@ -29,7 +29,7 @@ class AsyncTaskChain;
  * Normally, you would subclass from this class, and override do_task(), to
  * define the functionality you wish to have the task perform.
  */
-class EXPCL_PANDA_EVENT AsyncTask : public AsyncTaskBase {
+class EXPCL_PANDA_EVENT AsyncTask : public AsyncFuture, public Namable {
 public:
   AsyncTask(const string &name = string());
   ALLOC_DELETED_CHAIN(AsyncTask);
@@ -62,7 +62,7 @@ PUBLISHED:
   INLINE bool is_alive() const;
   INLINE AsyncTaskManager *get_manager() const;
 
-  void remove();
+  bool remove();
 
   INLINE void set_delay(double delay);
   INLINE void clear_delay();
@@ -92,7 +92,6 @@ PUBLISHED:
   INLINE int get_priority() const;
 
   INLINE void set_done_event(const string &done_event);
-  INLINE const string &get_done_event() const;
 
   INLINE double get_dt() const;
   INLINE double get_max_dt() const;
@@ -100,13 +99,12 @@ PUBLISHED:
 
   virtual void output(ostream &out) const;
 
-  EXTENSION(static PyObject *__await__(PyObject *self));
-  EXTENSION(static PyObject *__iter__(PyObject *self));
-
 protected:
   void jump_to_task_chain(AsyncTaskManager *manager);
   DoneStatus unlock_and_do_task();
 
+  virtual bool cancel() FINAL;
+
   virtual bool is_runnable();
   virtual DoneStatus do_task();
   virtual void upon_birth(AsyncTaskManager *manager);
@@ -120,11 +118,9 @@ protected:
   double _wake_time;
   int _sort;
   int _priority;
-  string _done_event;
 
   State _state;
   Thread *_servicing_thread;
-  AsyncTaskManager *_manager;
   AsyncTaskChain *_chain;
 
   double _start_time;
@@ -135,9 +131,6 @@ protected:
   double _total_dt;
   int _num_frames;
 
-  // Tasks waiting for this one to complete.
-  pvector<PT(AsyncTask)> _waiting_tasks;
-
   static AtomicAdjust::Integer _next_task_id;
 
   static PStatCollector _show_code_pcollector;
@@ -150,9 +143,9 @@ public:
     return _type_handle;
   }
   static void init_type() {
-    AsyncTaskBase::init_type();
+    AsyncFuture::init_type();
     register_type(_type_handle, "AsyncTask",
-                  AsyncTaskBase::get_class_type());
+                  AsyncFuture::get_class_type());
   }
   virtual TypeHandle get_type() const {
     return get_class_type();
@@ -162,6 +155,7 @@ public:
 private:
   static TypeHandle _type_handle;
 
+  friend class AsyncFuture;
   friend class AsyncTaskManager;
   friend class AsyncTaskChain;
   friend class AsyncTaskSequence;

+ 16 - 29
panda/src/event/asyncTaskChain.cxx

@@ -456,24 +456,21 @@ do_add(AsyncTask *task) {
 /**
  * Removes the indicated task from this chain.  Returns true if removed, false
  * otherwise.  Assumes the lock is already held.  The task->upon_death()
- * method is *not* called.
+ * method is called with clean_exit=false if upon_death is given.
  */
 bool AsyncTaskChain::
-do_remove(AsyncTask *task) {
-  bool removed = false;
-
+do_remove(AsyncTask *task, bool upon_death) {
   nassertr(task->_chain == this, false);
 
   switch (task->_state) {
   case AsyncTask::S_servicing:
-    // This task is being serviced.
+    // This task is being serviced.  upon_death will be called afterwards.
     task->_state = AsyncTask::S_servicing_removed;
-    removed = true;
-    break;
+    return true;
 
   case AsyncTask::S_servicing_removed:
-    // Being serviced, though it will be removed later.
-    break;
+    // Being serviced, though it is already marked to be removed afterwards.
+    return false;
 
   case AsyncTask::S_sleeping:
     // Sleeping, easy.
@@ -482,10 +479,9 @@ do_remove(AsyncTask *task) {
       nassertr(index != -1, false);
       _sleeping.erase(_sleeping.begin() + index);
       make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
-      removed = true;
-      cleanup_task(task, false, false);
+      cleanup_task(task, upon_death, false);
     }
-    break;
+    return true;
 
   case AsyncTask::S_active:
     {
@@ -503,15 +499,15 @@ do_remove(AsyncTask *task) {
           nassertr(index != -1, false);
         }
       }
-      removed = true;
-      cleanup_task(task, false, false);
+      cleanup_task(task, upon_death, false);
+      return true;
     }
 
   default:
     break;
   }
 
-  return removed;
+  return false;
 }
 
 /**
@@ -776,27 +772,18 @@ cleanup_task(AsyncTask *task, bool upon_death, bool clean_exit) {
   PT(AsyncTask) hold_task = task;
 
   task->_state = AsyncTask::S_inactive;
-  task->_chain = NULL;
-  task->_manager = NULL;
+  task->_chain = nullptr;
   --_num_tasks;
   --(_manager->_num_tasks);
 
   _manager->remove_task_by_name(task);
 
-  // Activate the tasks that were waiting for this one to finish.
-  if (upon_death) {
-    pvector<PT(AsyncTask)>::iterator it;
-    for (it = task->_waiting_tasks.begin(); it != task->_waiting_tasks.end(); ++it) {
-      AsyncTask *task = *it;
-      // Note that this task may not be on the same task chain.
-      nassertd(task->_manager == _manager) continue;
-      task->_state = AsyncTask::S_active;
-      task->_chain->_active.push_back(task);
-      --task->_chain->_num_awaiting_tasks;
-    }
-    task->_waiting_tasks.clear();
+  if (upon_death && !task->done()) {
+    task->notify_done(clean_exit);
   }
 
+  task->_manager = nullptr;
+
   if (upon_death) {
     _manager->_lock.release();
     task->upon_death(_manager, clean_exit);

+ 2 - 1
panda/src/event/asyncTaskChain.h

@@ -96,7 +96,7 @@ protected:
   typedef pvector< PT(AsyncTask) > TaskHeap;
 
   void do_add(AsyncTask *task);
-  bool do_remove(AsyncTask *task);
+  bool do_remove(AsyncTask *task, bool upon_death=false);
   void do_wait_for_tasks();
   void do_cleanup();
 
@@ -206,6 +206,7 @@ public:
 private:
   static TypeHandle _type_handle;
 
+  friend class AsyncFuture;
   friend class AsyncTaskChainThread;
   friend class AsyncTask;
   friend class AsyncTaskManager;

+ 19 - 19
panda/src/event/asyncTaskCollection.cxx

@@ -63,14 +63,15 @@ add_task(AsyncTask *task) {
  */
 bool AsyncTaskCollection::
 remove_task(AsyncTask *task) {
-  int task_index = -1;
-  for (int i = 0; task_index == -1 && i < (int)_tasks.size(); i++) {
+  size_t task_index = (size_t)-1;
+  for (size_t i = 0; i < _tasks.size(); ++i) {
     if (_tasks[i] == task) {
       task_index = i;
+      break;
     }
   }
 
-  if (task_index == -1) {
+  if (task_index == (size_t)-1) {
     // The indicated task was not a member of the collection.
     return false;
   }
@@ -129,12 +130,12 @@ void AsyncTaskCollection::
 remove_duplicate_tasks() {
   AsyncTasks new_tasks;
 
-  int num_tasks = get_num_tasks();
-  for (int i = 0; i < num_tasks; i++) {
+  size_t num_tasks = get_num_tasks();
+  for (size_t i = 0; i < num_tasks; i++) {
     PT(AsyncTask) task = get_task(i);
     bool duplicated = false;
 
-    for (int j = 0; j < i && !duplicated; j++) {
+    for (size_t j = 0; j < i && !duplicated; j++) {
       duplicated = (task == get_task(j));
     }
 
@@ -152,7 +153,7 @@ remove_duplicate_tasks() {
  */
 bool AsyncTaskCollection::
 has_task(AsyncTask *task) const {
-  for (int i = 0; i < get_num_tasks(); i++) {
+  for (size_t i = 0; i < get_num_tasks(); i++) {
     if (task == get_task(i)) {
       return true;
     }
@@ -174,8 +175,8 @@ clear() {
  */
 AsyncTask *AsyncTaskCollection::
 find_task(const string &name) const {
-  int num_tasks = get_num_tasks();
-  for (int i = 0; i < num_tasks; i++) {
+  size_t num_tasks = get_num_tasks();
+  for (size_t i = 0; i < num_tasks; ++i) {
     AsyncTask *task = get_task(i);
     if (task->get_name() == name) {
       return task;
@@ -187,7 +188,7 @@ find_task(const string &name) const {
 /**
  * Returns the number of AsyncTasks in the collection.
  */
-int AsyncTaskCollection::
+size_t AsyncTaskCollection::
 get_num_tasks() const {
   return _tasks.size();
 }
@@ -196,8 +197,8 @@ get_num_tasks() const {
  * Returns the nth AsyncTask in the collection.
  */
 AsyncTask *AsyncTaskCollection::
-get_task(int index) const {
-  nassertr(index >= 0 && index < (int)_tasks.size(), NULL);
+get_task(size_t index) const {
+  nassertr(index < _tasks.size(), nullptr);
 
   return _tasks[index];
 }
@@ -206,7 +207,7 @@ get_task(int index) const {
  * Removes the nth AsyncTask from the collection.
  */
 void AsyncTaskCollection::
-remove_task(int index) {
+remove_task(size_t index) {
   // If the pointer to our internal array is shared by any other
   // AsyncTaskCollections, we have to copy the array now so we won't
   // inadvertently modify any of our brethren AsyncTaskCollection objects.
@@ -217,7 +218,7 @@ remove_task(int index) {
     _tasks.v() = old_tasks.v();
   }
 
-  nassertv(index >= 0 && index < (int)_tasks.size());
+  nassertv(index < _tasks.size());
   _tasks.erase(_tasks.begin() + index);
 }
 
@@ -226,9 +227,8 @@ remove_task(int index) {
  * get_task(), but it may be a more convenient way to access it.
  */
 AsyncTask *AsyncTaskCollection::
-operator [] (int index) const {
-  nassertr(index >= 0 && index < (int)_tasks.size(), NULL);
-
+operator [] (size_t index) const {
+  nassertr(index < _tasks.size(), nullptr);
   return _tasks[index];
 }
 
@@ -236,7 +236,7 @@ operator [] (int index) const {
  * Returns the number of tasks in the collection.  This is the same thing as
  * get_num_tasks().
  */
-int AsyncTaskCollection::
+size_t AsyncTaskCollection::
 size() const {
   return _tasks.size();
 }
@@ -260,7 +260,7 @@ output(ostream &out) const {
  */
 void AsyncTaskCollection::
 write(ostream &out, int indent_level) const {
-  for (int i = 0; i < get_num_tasks(); i++) {
+  for (size_t i = 0; i < get_num_tasks(); i++) {
     indent(out, indent_level) << *get_task(i) << "\n";
   }
 }

+ 5 - 5
panda/src/event/asyncTaskCollection.h

@@ -41,12 +41,12 @@ PUBLISHED:
 
   AsyncTask *find_task(const string &name) const;
 
-  int get_num_tasks() const;
-  AsyncTask *get_task(int index) const;
+  size_t get_num_tasks() const;
+  AsyncTask *get_task(size_t index) const;
   MAKE_SEQ(get_tasks, get_num_tasks, get_task);
-  void remove_task(int index);
-  AsyncTask *operator [] (int index) const;
-  int size() const;
+  void remove_task(size_t index);
+  AsyncTask *operator [] (size_t index) const;
+  size_t size() const;
   INLINE void operator += (const AsyncTaskCollection &other);
   INLINE AsyncTaskCollection operator + (const AsyncTaskCollection &other) const;
 

+ 1 - 1
panda/src/event/asyncTaskManager.I

@@ -36,7 +36,7 @@ get_clock() {
  * Returns the number of tasks that are currently active or sleeping within
  * the task manager.
  */
-INLINE int AsyncTaskManager::
+INLINE size_t AsyncTaskManager::
 get_num_tasks() const {
   MutexHolder holder(_lock);
   return _num_tasks;

+ 6 - 14
panda/src/event/asyncTaskManager.cxx

@@ -307,25 +307,20 @@ find_tasks_matching(const GlobPattern &pattern) const {
  */
 bool AsyncTaskManager::
 remove(AsyncTask *task) {
-  // We pass this up to the multi-task remove() flavor.  Do we care about the
-  // tiny cost of creating an AsyncTaskCollection here?  Probably not.
-  AsyncTaskCollection tasks;
-  tasks.add_task(task);
-  return remove(tasks) != 0;
+  return task->remove();
 }
 
 /**
  * Removes all of the tasks in the AsyncTaskCollection.  Returns the number of
  * tasks removed.
  */
-int AsyncTaskManager::
+size_t AsyncTaskManager::
 remove(const AsyncTaskCollection &tasks) {
   MutexHolder holder(_lock);
-  int num_removed = 0;
+  size_t num_removed = 0;
 
-  int num_tasks = tasks.get_num_tasks();
-  int i;
-  for (i = 0; i < num_tasks; ++i) {
+  size_t num_tasks = tasks.get_num_tasks();
+  for (size_t i = 0; i < num_tasks; ++i) {
     PT(AsyncTask) task = tasks.get_task(i);
 
     if (task->_manager != this) {
@@ -337,10 +332,7 @@ remove(const AsyncTaskCollection &tasks) {
         task_cat.debug()
           << "Removing " << *task << "\n";
       }
-      if (task->_chain->do_remove(task)) {
-        _lock.release();
-        task->upon_death(this, false);
-        _lock.acquire();
+      if (task->_chain->do_remove(task, true)) {
         ++num_removed;
       } else {
         if (task_cat.is_debug()) {

+ 4 - 3
panda/src/event/asyncTaskManager.h

@@ -71,13 +71,13 @@ PUBLISHED:
   AsyncTaskCollection find_tasks_matching(const GlobPattern &pattern) const;
 
   bool remove(AsyncTask *task);
-  int remove(const AsyncTaskCollection &tasks);
+  size_t remove(const AsyncTaskCollection &tasks);
 
   BLOCKING void wait_for_tasks();
   BLOCKING void stop_threads();
   void start_threads();
 
-  INLINE int get_num_tasks() const;
+  INLINE size_t get_num_tasks() const;
 
   AsyncTaskCollection get_tasks() const;
   AsyncTaskCollection get_active_tasks() const;
@@ -126,7 +126,7 @@ protected:
   typedef ov_set<PT(AsyncTaskChain), IndirectCompareNames<AsyncTaskChain> > TaskChains;
   TaskChains _task_chains;
 
-  int _num_tasks;
+  size_t _num_tasks;
   TasksByName _tasks_by_name;
   PT(ClockObject) _clock;
 
@@ -151,6 +151,7 @@ public:
 private:
   static TypeHandle _type_handle;
 
+  friend class AsyncFuture;
   friend class AsyncTaskChain;
   friend class AsyncTaskChain::AsyncTaskChainThread;
   friend class AsyncTask;

+ 0 - 75
panda/src/event/asyncTask_ext.cxx

@@ -1,75 +0,0 @@
-/**
- * PANDA 3D SOFTWARE
- * Copyright (c) Carnegie Mellon University.  All rights reserved.
- *
- * All use of this software is subject to the terms of the revised BSD
- * license.  You should have received a copy of this license along
- * with this source code in a file named "LICENSE."
- *
- * @file asyncTask_ext.h
- * @author rdb
- * @date 2017-10-29
- */
-
-#include "asyncTask_ext.h"
-#include "nodePath.h"
-
-#ifdef HAVE_PYTHON
-
-#ifndef CPPPARSER
-extern struct Dtool_PyTypedObject Dtool_AsyncTask;
-#endif
-
-/**
- * Yields continuously until the task has finished.
- */
-static PyObject *gen_next(PyObject *self) {
-  const AsyncTask *request = nullptr;
-  if (!Dtool_Call_ExtractThisPointer(self, Dtool_AsyncTask, (void **)&request)) {
-    return nullptr;
-  }
-
-  if (request->is_alive()) {
-    // Continue awaiting the result.
-    Py_INCREF(self);
-    return self;
-  } else {
-    // It's done.  Do we have a method like result(), eg. in the case of a
-    // ModelLoadRequest?  In that case we pass that value into the exception.
-    PyObject *method = PyObject_GetAttrString(self, "result");
-    PyObject *result = nullptr;
-    if (method != nullptr) {
-      if (PyCallable_Check(method)) {
-        result = _PyObject_CallNoArg(method);
-        Py_DECREF(method);
-        if (result == nullptr) {
-          // An exception happened.  Pass it on.
-          return nullptr;
-        }
-      }
-      Py_DECREF(method);
-    }
-    Py_INCREF(PyExc_StopIteration);
-    PyErr_Restore(PyExc_StopIteration, result, nullptr);
-    return nullptr;
-  }
-}
-
-/**
- * Returns a generator that continuously yields an awaitable until the task
- * has finished.  This allows syntax like `model = await loader.load...` to be
- * used in a Python coroutine.
- */
-PyObject *Extension<AsyncTask>::
-__await__(PyObject *self) {
-  Dtool_GeneratorWrapper *gen;
-  gen = (Dtool_GeneratorWrapper *)PyType_GenericAlloc(&Dtool_GeneratorWrapper_Type, 0);
-  if (gen != nullptr) {
-    Py_INCREF(self);
-    gen->_base._self = self;
-    gen->_iternext_func = &gen_next;
-  }
-  return (PyObject *)gen;
-}
-
-#endif

+ 2 - 0
panda/src/event/config_event.cxx

@@ -12,6 +12,7 @@
  */
 
 #include "config_event.h"
+#include "asyncFuture.h"
 #include "asyncTask.h"
 #include "asyncTaskChain.h"
 #include "asyncTaskManager.h"
@@ -31,6 +32,7 @@ NotifyCategoryDef(event, "");
 NotifyCategoryDef(task, "");
 
 ConfigureFn(config_event) {
+  AsyncFuture::init_type();
   AsyncTask::init_type();
   AsyncTaskChain::init_type();
   AsyncTaskManager::init_type();

+ 32 - 0
panda/src/event/eventHandler.cxx

@@ -27,6 +27,26 @@ EventHandler::
 EventHandler(EventQueue *ev_queue) : _queue(*ev_queue) {
 }
 
+/**
+ * Returns a pending future that will be marked as done when the event is next
+ * fired.
+ */
+AsyncFuture *EventHandler::
+get_future(const string &event_name) {
+  Futures::iterator fi;
+  fi = _futures.find(event_name);
+
+  // If we already have a future, but someone cancelled it, we need to create
+  // a new future instead.
+  if (fi != _futures.end() && !fi->second->cancelled()) {
+    return fi->second;
+  } else {
+    AsyncFuture *fut = new AsyncFuture;
+    _futures[event_name] = fut;
+    return fut;
+  }
+}
+
 /**
  * The main processing loop of the EventHandler.  This function must be called
  * periodically to service events.  Walks through each pending event and calls
@@ -81,6 +101,18 @@ dispatch_event(const Event *event) {
       ((*cfi).first)(event, (*cfi).second);
     }
   }
+
+  // Finally, check for futures that need to be triggered.
+  Futures::const_iterator fi;
+  fi = _futures.find(event->get_name());
+
+  if (fi != _futures.end()) {
+    AsyncFuture *fut = (*fi).second;
+    if (!fut->done()) {
+      fut->set_result((TypedReferenceCount *)event);
+    }
+    _futures.erase(fi);
+  }
 }
 
 

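From Python, the new hook can be exercised as in the following minimal sketch, which mirrors the new test_futures.py tests further down; the standalone queue/handler pair and the event name are illustrative:

    from panda3d import core

    queue = core.EventQueue()
    handler = core.EventHandler(queue)

    # A pending future; it is marked done the next time "ready" is dispatched.
    fut = handler.get_future("ready")
    assert not fut.done()

    handler.dispatch_event(core.Event("ready"))
    assert fut.done() and not fut.cancelled()
    event = fut.result()    # the Event object that satisfied the future
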
+ 7 - 1
panda/src/event/eventHandler.h

@@ -18,6 +18,7 @@
 
 #include "event.h"
 #include "pt_Event.h"
+#include "asyncFuture.h"
 
 #include "pset.h"
 #include "pmap.h"
@@ -41,10 +42,13 @@ public:
 
 PUBLISHED:
   explicit EventHandler(EventQueue *ev_queue);
+  ~EventHandler() {}
+
+  AsyncFuture *get_future(const string &event_name);
 
   void process_events();
 
-  virtual void dispatch_event(const Event *);
+  virtual void dispatch_event(const Event *event);
 
   void write(ostream &out) const;
 
@@ -74,9 +78,11 @@ protected:
   typedef pair<EventCallbackFunction*, void*> CallbackFunction;
   typedef pset<CallbackFunction> CallbackFunctions;
   typedef pmap<string, CallbackFunctions> CallbackHooks;
+  typedef pmap<string, PT(AsyncFuture)> Futures;
 
   Hooks _hooks;
   CallbackHooks _cbhooks;
+  Futures _futures;
   EventQueue &_queue;
 
   static EventHandler *_global_event_handler;

+ 0 - 7
panda/src/event/eventParameter.I

@@ -11,13 +11,6 @@
  * @date 1999-02-08
  */
 
-/**
- * Defines an EventParameter that stores nothing: the "empty" parameter.
- */
-INLINE EventParameter::
-EventParameter() {
-}
-
 /**
  * Defines an EventParameter that stores a pointer to any kind of
  * TypedWritableReferenceCount object.  This is the most general constructor.

+ 2 - 1
panda/src/event/eventParameter.h

@@ -34,7 +34,8 @@
  */
 class EXPCL_PANDA_EVENT EventParameter {
 PUBLISHED:
-  INLINE EventParameter();
+  INLINE EventParameter() DEFAULT_CTOR;
+  INLINE EventParameter(nullptr_t) {}
   INLINE EventParameter(const TypedWritableReferenceCount *ptr);
   INLINE EventParameter(const TypedReferenceCount *ptr);
   INLINE EventParameter(int value);

+ 1 - 0
panda/src/event/p3event_composite1.cxx

@@ -1,3 +1,4 @@
+#include "asyncFuture.cxx"
 #include "asyncTask.cxx"
 #include "asyncTaskChain.cxx"
 #include "asyncTaskCollection.cxx"

+ 3 - 8
panda/src/event/pythonTask.I

@@ -45,17 +45,12 @@ get_owner() const {
  */
 INLINE void PythonTask::
 set_result(PyObject *result) {
+  // Note that we don't call notify_done() here; the future is marked done
+  // automatically when the task itself finishes.
   nassertv(is_alive());
+  nassertv(!done());
   nassertv(_exception == nullptr);
   Py_INCREF(result);
   Py_XDECREF(_exc_value);
   _exc_value = result;
 }
-
-/**
- * Same as __await__, for backward compatibility with the old coroutine way.
- */
-INLINE PyObject *PythonTask::
-__iter__(PyObject *self) {
-  return __await__(self);
-}

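As a usage sketch of the retained set_result() entry point (the names and the explicit poll() call are illustrative; in a running application the task manager is polled every frame): a task function can stash a value before returning done, and the value becomes available through the future interface once the task has finished.

    from panda3d import core

    def produce(task):
        task.set_result(42)           # retrievable once the task is done
        return core.PythonTask.done

    task_mgr = core.AsyncTaskManager.get_global_ptr()
    task = core.PythonTask(produce, "produce-42")
    task_mgr.add(task)
    task_mgr.poll()                   # service pending tasks once

    assert task.done() and task.result() == 42
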
+ 45 - 69
panda/src/event/pythonTask.cxx

@@ -25,7 +25,7 @@ TypeHandle PythonTask::_type_handle;
 
 #ifndef CPPPARSER
 extern struct Dtool_PyTypedObject Dtool_TypedReferenceCount;
-extern struct Dtool_PyTypedObject Dtool_AsyncTask;
+extern struct Dtool_PyTypedObject Dtool_AsyncFuture;
 extern struct Dtool_PyTypedObject Dtool_PythonTask;
 #endif
 
@@ -238,8 +238,8 @@ set_owner(PyObject *owner) {
  * exception occurred within this task, it is raised instead.
  */
 PyObject *PythonTask::
-result() const {
-  nassertr(!is_alive(), nullptr);
+get_result() const {
+  nassertr(done(), nullptr);
 
   if (_exception == nullptr) {
     // The result of the call is stored in _exc_value.
@@ -273,49 +273,6 @@ exception() const {
   }
 }*/
 
-/**
- * Returns an iterator that continuously yields an awaitable until the task
- * has finished.
- */
-PyObject *PythonTask::
-__await__(PyObject *self) {
-  Dtool_GeneratorWrapper *gen;
-  gen = (Dtool_GeneratorWrapper *)PyType_GenericAlloc(&Dtool_GeneratorWrapper_Type, 0);
-  if (gen != nullptr) {
-    Py_INCREF(self);
-    gen->_base._self = self;
-    gen->_iternext_func = &gen_next;
-  }
-  return (PyObject *)gen;
-}
-
-/**
- * Yields continuously until a task has finished.
- */
-PyObject *PythonTask::
-gen_next(PyObject *self) {
-  const PythonTask *task = nullptr;
-  if (!Dtool_Call_ExtractThisPointer(self, Dtool_PythonTask, (void **)&task)) {
-    return nullptr;
-  }
-
-  if (task->is_alive()) {
-    Py_INCREF(self);
-    return self;
-  } else if (task->_exception != nullptr) {
-    task->_retrieved_exception = true;
-    Py_INCREF(task->_exception);
-    Py_INCREF(task->_exc_value);
-    Py_INCREF(task->_exc_traceback);
-    PyErr_Restore(task->_exception, task->_exc_value, task->_exc_traceback);
-    return nullptr;
-  } else {
-    // The result of the call is stored in _exc_value.
-    PyErr_SetObject(PyExc_StopIteration, task->_exc_value);
-    return nullptr;
-  }
-}
-
 /**
  * Maps from an expression like "task.attr_name = v". This is customized here
  * so we can support some traditional task interfaces that supported directly
@@ -620,36 +577,39 @@ do_python_task() {
           }
         }
 
-        // Tell the task chain we want to kill ourselves.  It doesn't really
-        // matter what we return if we set S_servicing_removed.  If we don't
-        // set it, however, it will think this was a clean exit.
-        _manager->_lock.acquire();
-        _state = S_servicing_removed;
-        _manager->_lock.release();
-        return DS_interrupt;
+        // Tell the task chain we want to kill ourselves.  We indicate this is
+        // a "clean exit" because we still want to run the done callbacks on
+        // exception.
+        return DS_done;
       }
 
     } else if (DtoolCanThisBeAPandaInstance(result)) {
-      // We are waiting for a task to finish.
-      void *ptr = ((Dtool_PyInstDef *)result)->_My_Type->_Dtool_UpcastInterface(result, &Dtool_AsyncTask);
+      // We are waiting for an AsyncFuture (eg. other task) to finish.
+      void *ptr = ((Dtool_PyInstDef *)result)->_My_Type->_Dtool_UpcastInterface(result, &Dtool_AsyncFuture);
       if (ptr != nullptr) {
         // Suspend execution of this task until this other task has completed.
-        AsyncTask *task = (AsyncTask *)ptr;
-        AsyncTaskManager *manager = task->_manager;
-        nassertr(manager != nullptr, DS_interrupt);
+        AsyncFuture *fut = (AsyncFuture *)ptr;
+        AsyncTaskManager *manager = fut->_manager;
+        if (manager == nullptr) {
+          manager = _manager;
+          fut->_manager = manager;
+        }
         nassertr(manager == _manager, DS_interrupt);
-        manager->_lock.acquire();
-        if (task != (AsyncTask *)this) {
-          if (task->is_alive()) {
+        MutexHolder holder(manager->_lock);
+        if (fut != (AsyncFuture *)this) {
+          if (!fut->done()) {
             if (task_cat.is_debug()) {
               task_cat.debug()
-                << *this << " is now awaiting <" << *task << ">.\n";
+                << *this << " is now awaiting <" << *fut << ">.\n";
             }
-            task->_waiting_tasks.push_back(this);
+            fut->add_waiting_task(this);
           } else {
             // The task is already done.  Continue at next opportunity.
+            if (task_cat.is_debug()) {
+              task_cat.debug()
+                << *this << " would await <" << *fut << ">, were it not already done.\n";
+            }
             Py_DECREF(result);
-            manager->_lock.release();
             return DS_cont;
           }
         } else {
@@ -658,14 +618,12 @@ do_python_task() {
           task_cat.error()
             << *this << " cannot await itself\n";
         }
-        task->_manager->_lock.release();
         Py_DECREF(result);
         return DS_await;
       }
-
     } else {
-      // We are waiting for a future to finish.  We currently implement this
-      // by simply checking every frame whether the future is done.
+      // We are waiting for a non-Panda future to finish.  We currently
+      // implement this by checking every frame whether the future is done.
       PyObject *check = PyObject_GetAttrString(result, "_asyncio_future_blocking");
       if (check != nullptr && check != Py_None) {
         Py_DECREF(check);
@@ -680,7 +638,7 @@ do_python_task() {
         if (task_cat.is_debug()) {
           PyObject *str = PyObject_ASCII(result);
           task_cat.debug()
-            << *this << " is now awaiting " << PyUnicode_AsUTF8(str) << ".\n";
+            << *this << " is now polling " << PyUnicode_AsUTF8(str) << ".done()\n";
           Py_DECREF(str);
         }
 #endif
@@ -746,6 +704,24 @@ do_python_task() {
     }
   }
 
+  // This is unfortunate, but some existing task functions return task.done,
+  // which now conflicts with the AsyncFuture method of the same name.  Check
+  // whether that is what is being returned.
+  PyMethodDef *meth = nullptr;
+  if (PyCFunction_Check(result)) {
+    meth = ((PyCFunctionObject *)result)->m_ml;
+#if PY_MAJOR_VERSION >= 3
+  } else if (Py_TYPE(result) == &PyMethodDescr_Type) {
+#else
+  } else if (strcmp(Py_TYPE(result)->tp_name, "method_descriptor") == 0) {
+#endif
+    meth = ((PyMethodDescrObject *)result)->d_method;
+  }
+
+  if (meth != nullptr && strcmp(meth->ml_name, "done") == 0) {
+    Py_DECREF(result);
+    return DS_done;
+  }
+
   ostringstream strm;
 #if PY_MAJOR_VERSION >= 3
   PyObject *str = PyObject_ASCII(result);

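The practical upshot for coroutine tasks: awaiting any AsyncFuture, including another task, suspends the coroutine until that future is done.  A hedged sketch (names are illustrative, and it assumes a coroutine object can be handed to PythonTask just as a generator can):

    from panda3d import core

    task_mgr = core.AsyncTaskManager.get_global_ptr()

    def worker(task):
        task.set_result("payload")
        return core.PythonTask.done

    async def consumer():
        other = core.PythonTask(worker, "worker")
        task_mgr.add(other)
        # Awaiting the other task suspends this coroutine until it finishes;
        # the await expression should evaluate to the value set above.
        value = await other
        print("worker produced", value)

    # Assumed scheduling, analogous to the generator-based tests below:
    # task_mgr.add(core.PythonTask(consumer(), "consumer"))
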
+ 9 - 6
panda/src/event/pythonTask.h

@@ -20,6 +20,7 @@
 
 #ifdef HAVE_PYTHON
 #include "py_panda.h"
+#include "extension.h"
 
 /**
  * This class exists to allow association of a Python function or coroutine
@@ -44,12 +45,14 @@ PUBLISHED:
   INLINE PyObject *get_owner() const;
 
   INLINE void set_result(PyObject *result);
-  PyObject *result() const;
-  //PyObject *exception() const;
 
-  static PyObject *__await__(PyObject *self);
-  INLINE static PyObject *__iter__(PyObject *self);
+public:
+  // This is exposed only for the result() function in asyncFuture_ext.cxx
+  // to use, which is why it is not published.
+  PyObject *get_result() const;
+  //PyObject *exception() const;
 
+PUBLISHED:
   int __setattr__(PyObject *self, PyObject *attr, PyObject *v);
   int __delattr__(PyObject *self, PyObject *attr);
   PyObject *__getattr__(PyObject *attr) const;
@@ -101,8 +104,6 @@ protected:
   virtual void upon_death(AsyncTaskManager *manager, bool clean_exit);
 
 private:
-  static PyObject *gen_next(PyObject *self);
-
   void register_to_owner();
   void unregister_from_owner();
   void call_owner_method(const char *method_name);
@@ -125,6 +126,8 @@ private:
   bool _registered_to_owner;
   mutable bool _retrieved_exception;
 
+  friend class Extension<AsyncFuture>;
+
 public:
   static TypeHandle get_class_type() {
     return _type_handle;

+ 4 - 3
panda/src/gobj/animateVerticesRequest.I

@@ -16,15 +16,16 @@
  */
 INLINE AnimateVerticesRequest::
 AnimateVerticesRequest(GeomVertexData *geom_vertex_data) :
-  _geom_vertex_data(geom_vertex_data),
-  _is_ready(false)
+  _geom_vertex_data(geom_vertex_data)
 {
 }
 
 /**
  * Returns true if this request has completed, false if it is still pending.
+ * Equivalent to `req.done() and not req.cancelled()`.
+ * @see done()
  */
 INLINE bool AnimateVerticesRequest::
 is_ready() const {
-  return _is_ready;
+  return (FutureState)AtomicAdjust::get(_future_state) == FS_finished;
 }

+ 0 - 1
panda/src/gobj/animateVerticesRequest.cxx

@@ -26,7 +26,6 @@ do_task() {
   // There is no need to store or return a result.  The GeomVertexData caches
   // the result and it will be used later in the rendering process.
   _geom_vertex_data->animate_vertices(true, current_thread);
-  _is_ready = true;
 
   // Don't continue the task; we're done.
   return AsyncTask::DS_done;

+ 1 - 2
panda/src/gobj/animateVerticesRequest.h

@@ -40,11 +40,10 @@ PUBLISHED:
   INLINE bool is_ready() const;
 
 protected:
-    virtual AsyncTask::DoneStatus do_task();
+  virtual AsyncTask::DoneStatus do_task();
 
 private:
   PT(GeomVertexData) _geom_vertex_data;
-  bool _is_ready;
 
 public:
   static TypeHandle get_class_type() {

+ 1 - 0
panda/src/gobj/config_gobj.cxx

@@ -572,6 +572,7 @@ ConfigureFn(config_gobj) {
   ParamTextureImage::init_type();
   ParamTextureSampler::init_type();
   PerspectiveLens::init_type();
+  PreparedGraphicsObjects::EnqueuedObject::init_type();
   QueryContext::init_type();
   SamplerContext::init_type();
   SamplerState::init_type();

+ 147 - 7
panda/src/gobj/preparedGraphicsObjects.cxx

@@ -27,6 +27,8 @@
 #include "config_gobj.h"
 #include "throw_event.h"
 
+TypeHandle PreparedGraphicsObjects::EnqueuedObject::_type_handle;
+
 int PreparedGraphicsObjects::_name_index = 0;
 
 /**
@@ -191,7 +193,25 @@ void PreparedGraphicsObjects::
 enqueue_texture(Texture *tex) {
   ReMutexHolder holder(_lock);
 
-  _enqueued_textures.insert(tex);
+  _enqueued_textures.insert(EnqueuedTextures::value_type(tex, nullptr));
+}
+
+/**
+ * Like enqueue_texture, but returns an AsyncFuture that can be used to query
+ * the status of the texture's preparation.
+ */
+PT(PreparedGraphicsObjects::EnqueuedObject) PreparedGraphicsObjects::
+enqueue_texture_future(Texture *tex) {
+  ReMutexHolder holder(_lock);
+
+  pair<EnqueuedTextures::iterator, bool> result =
+    _enqueued_textures.insert(EnqueuedTextures::value_type(tex, nullptr));
+  if (result.first->second == nullptr) {
+    result.first->second = new EnqueuedObject(this, tex);
+  }
+  PT(EnqueuedObject) fut = result.first->second;
+  nassertr(!fut->cancelled(), fut);
+  return fut;
 }
 
 /**
@@ -220,6 +240,9 @@ dequeue_texture(Texture *tex) {
 
   EnqueuedTextures::iterator qi = _enqueued_textures.find(tex);
   if (qi != _enqueued_textures.end()) {
+    if (qi->second != nullptr) {
+      qi->second->notify_removed();
+    }
     _enqueued_textures.erase(qi);
     return true;
   }
@@ -291,6 +314,17 @@ release_all_textures() {
   }
 
   _prepared_textures.clear();
+
+  // Mark any futures as cancelled.
+  EnqueuedTextures::iterator qti;
+  for (qti = _enqueued_textures.begin();
+       qti != _enqueued_textures.end();
+       ++qti) {
+    if (qti->second != nullptr) {
+      qti->second->notify_removed();
+    }
+  }
+
   _enqueued_textures.clear();
 
   return num_textures;
@@ -665,10 +699,28 @@ prepare_geom_now(Geom *geom, GraphicsStateGuardianBase *gsg) {
  * when the GSG is next ready to do this (presumably at the next frame).
  */
 void PreparedGraphicsObjects::
-enqueue_shader(Shader *se) {
+enqueue_shader(Shader *shader) {
   ReMutexHolder holder(_lock);
 
-  _enqueued_shaders.insert(se);
+  _enqueued_shaders.insert(EnqueuedShaders::value_type(shader, nullptr));
+}
+
+/**
+ * Like enqueue_shader, but returns an AsyncFuture that can be used to query
+ * the status of the shader's preparation.
+ */
+PT(PreparedGraphicsObjects::EnqueuedObject) PreparedGraphicsObjects::
+enqueue_shader_future(Shader *shader) {
+  ReMutexHolder holder(_lock);
+
+  pair<EnqueuedShaders::iterator, bool> result =
+    _enqueued_shaders.insert(EnqueuedShaders::value_type(shader, nullptr));
+  if (result.first->second == nullptr) {
+    result.first->second = new EnqueuedObject(this, shader);
+  }
+  PT(EnqueuedObject) fut = result.first->second;
+  nassertr(!fut->cancelled(), fut);
+  return fut;
 }
 
 /**
@@ -697,6 +749,9 @@ dequeue_shader(Shader *se) {
 
   EnqueuedShaders::iterator qi = _enqueued_shaders.find(se);
   if (qi != _enqueued_shaders.end()) {
+    if (qi->second != nullptr) {
+      qi->second->notify_removed();
+    }
     _enqueued_shaders.erase(qi);
     return true;
   }
@@ -759,6 +814,17 @@ release_all_shaders() {
   }
 
   _prepared_shaders.clear();
+
+  // Mark any futures as cancelled.
+  EnqueuedShaders::iterator qsi;
+  for (qsi = _enqueued_shaders.begin();
+       qsi != _enqueued_shaders.end();
+       ++qsi) {
+    if (qsi->second != nullptr) {
+      qsi->second->notify_removed();
+    }
+  }
+
   _enqueued_shaders.clear();
 
   return num_shaders;
@@ -1358,6 +1424,73 @@ prepare_shader_buffer_now(ShaderBuffer *data, GraphicsStateGuardianBase *gsg) {
   return bc;
 }
 
+/**
+ * Creates a new future for the given object.
+ */
+PreparedGraphicsObjects::EnqueuedObject::
+EnqueuedObject(PreparedGraphicsObjects *pgo, TypedWritableReferenceCount *object) :
+  _pgo(pgo),
+  _object(object) {
+}
+
+/**
+ * Indicates that the preparation request is done.
+ */
+void PreparedGraphicsObjects::EnqueuedObject::
+set_result(SavedContext *context) {
+  nassertv(!done());
+  AsyncFuture::set_result(context);
+  _pgo = nullptr;
+}
+
+/**
+ * Called by PreparedGraphicsObjects to indicate that the preparation request
+ * has been cancelled.
+ */
+void PreparedGraphicsObjects::EnqueuedObject::
+notify_removed() {
+  _pgo = nullptr;
+  nassertv_always(AsyncFuture::cancel());
+}
+
+/**
+ * Cancels the pending preparation request.  Has no effect if the preparation
+ * is already complete or was already cancelled.
+ */
+bool PreparedGraphicsObjects::EnqueuedObject::
+cancel() {
+  PreparedGraphicsObjects *pgo = _pgo;
+  if (_object == nullptr || pgo == nullptr) {
+    nassertr(done(), false);
+    return false;
+  }
+
+  // We don't upcall here, because the dequeue function will end up calling
+  // notify_removed().
+  _result = nullptr;
+  _pgo = nullptr;
+
+  if (_object->is_of_type(Texture::get_class_type())) {
+    return pgo->dequeue_texture((Texture *)_object.p());
+
+  } else if (_object->is_of_type(Geom::get_class_type())) {
+    return pgo->dequeue_geom((Geom *)_object.p());
+
+  } else if (_object->is_of_type(Shader::get_class_type())) {
+    return pgo->dequeue_shader((Shader *)_object.p());
+
+  } else if (_object->is_of_type(GeomVertexArrayData::get_class_type())) {
+    return pgo->dequeue_vertex_buffer((GeomVertexArrayData *)_object.p());
+
+  } else if (_object->is_of_type(GeomPrimitive::get_class_type())) {
+    return pgo->dequeue_index_buffer((GeomPrimitive *)_object.p());
+
+  } else if (_object->is_of_type(ShaderBuffer::get_class_type())) {
+    return pgo->dequeue_shader_buffer((ShaderBuffer *)_object.p());
+  }
+  return false;
+}
+
 /**
  * This is called by the GraphicsStateGuardian to indicate that it is about to
  * begin processing of the frame.
@@ -1446,11 +1579,15 @@ begin_frame(GraphicsStateGuardianBase *gsg, Thread *current_thread) {
   for (qti = _enqueued_textures.begin();
        qti != _enqueued_textures.end();
        ++qti) {
-    Texture *tex = (*qti);
+    Texture *tex = qti->first;
+    TextureContext *first_tc = nullptr;
     for (int view = 0; view < tex->get_num_views(); ++view) {
       TextureContext *tc = tex->prepare_now(view, this, gsg);
-      if (tc != (TextureContext *)NULL) {
+      if (tc != nullptr) {
         gsg->update_texture(tc, true);
+        if (view == 0 && qti->second != nullptr) {
+          qti->second->set_result(tc);
+        }
       }
     }
   }
@@ -1481,8 +1618,11 @@ begin_frame(GraphicsStateGuardianBase *gsg, Thread *current_thread) {
   for (qsi = _enqueued_shaders.begin();
        qsi != _enqueued_shaders.end();
        ++qsi) {
-    Shader *shader = (*qsi);
-    shader->prepare_now(this, gsg);
+    Shader *shader = qsi->first;
+    ShaderContext *sc = shader->prepare_now(this, gsg);
+    if (qsi->second != nullptr) {
+      qsi->second->set_result(sc);
+    }
   }
 
   _enqueued_shaders.clear();

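From the scripting side the EnqueuedObject is reached through Texture.prepare() (changed below to return the future), so a pending preparation can now be queried or cancelled.  A rough sketch; the tex and pgo arguments are assumed to be a Texture and the PreparedGraphicsObjects instance owned by the window's GSG:

    def cancel_pending_prepare(tex, pgo):
        fut = tex.prepare(pgo)    # enqueues the texture, returns its future
        if not fut.done():
            fut.cancel()          # dequeues the texture again
        return fut.cancelled()
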
+ 54 - 2
panda/src/gobj/preparedGraphicsObjects.h

@@ -29,6 +29,7 @@
 #include "reMutex.h"
 #include "bufferResidencyTracker.h"
 #include "adaptiveLru.h"
+#include "asyncFuture.h"
 
 class TextureContext;
 class SamplerContext;
@@ -38,6 +39,7 @@ class VertexBufferContext;
 class IndexBufferContext;
 class BufferContext;
 class GraphicsStateGuardianBase;
+class SavedContext;
 
 /**
  * A table of objects that are saved within the graphics context for reference
@@ -158,6 +160,56 @@ PUBLISHED:
                             GraphicsStateGuardianBase *gsg);
 
 public:
+  /**
+   * This is a handle to an enqueued object, from which the result can be
+   * obtained upon completion.
+   */
+  class EXPCL_PANDA_GOBJ EnqueuedObject FINAL : public AsyncFuture {
+  public:
+    EnqueuedObject(PreparedGraphicsObjects *pgo, TypedWritableReferenceCount *object);
+
+    TypedWritableReferenceCount *get_object() { return _object.p(); }
+    SavedContext *get_result() { return (SavedContext *)AsyncFuture::get_result(); }
+    void set_result(SavedContext *result);
+
+    void notify_removed();
+    virtual bool cancel() FINAL;
+
+  PUBLISHED:
+    MAKE_PROPERTY(object, get_object);
+
+  private:
+    PreparedGraphicsObjects *_pgo;
+    PT(TypedWritableReferenceCount) const _object;
+
+  public:
+    static TypeHandle get_class_type() {
+      return _type_handle;
+    }
+    static void init_type() {
+      AsyncFuture::init_type();
+      register_type(_type_handle, "EnqueuedObject",
+                    AsyncFuture::get_class_type());
+    }
+    virtual TypeHandle get_type() const {
+      return get_class_type();
+    }
+    virtual TypeHandle force_init_type() {init_type(); return get_class_type();}
+
+  private:
+    static TypeHandle _type_handle;
+  };
+
+  // These are variations of enqueue_xxx that also return a future.  They are
+  // used to implement texture->prepare(), etc.  They are only marked public
+  // so we don't have to define a whole bunch of friend classes.
+  PT(EnqueuedObject) enqueue_texture_future(Texture *tex);
+  //PT(EnqueuedObject) enqueue_geom_future(Geom *geom);
+  PT(EnqueuedObject) enqueue_shader_future(Shader *shader);
+  //PT(EnqueuedObject) enqueue_vertex_buffer_future(GeomVertexArrayData *data);
+  //PT(EnqueuedObject) enqueue_index_buffer_future(GeomPrimitive *data);
+  //PT(EnqueuedObject) enqueue_shader_buffer_future(ShaderBuffer *data);
+
   void begin_frame(GraphicsStateGuardianBase *gsg,
                    Thread *current_thread);
   void end_frame(Thread *current_thread);
@@ -167,11 +219,11 @@ private:
 
 private:
   typedef phash_set<TextureContext *, pointer_hash> Textures;
-  typedef phash_set< PT(Texture) > EnqueuedTextures;
+  typedef phash_map< PT(Texture), PT(EnqueuedObject) > EnqueuedTextures;
   typedef phash_set<GeomContext *, pointer_hash> Geoms;
   typedef phash_set< PT(Geom) > EnqueuedGeoms;
   typedef phash_set<ShaderContext *, pointer_hash> Shaders;
-  typedef phash_set< PT(Shader) > EnqueuedShaders;
+  typedef phash_map< PT(Shader), PT(EnqueuedObject) > EnqueuedShaders;
   typedef phash_set<BufferContext *, pointer_hash> Buffers;
   typedef phash_set< PT(GeomVertexArrayData) > EnqueuedVertexBuffers;
   typedef phash_set< PT(GeomPrimitive) > EnqueuedIndexBuffers;

+ 3 - 2
panda/src/gobj/shader.cxx

@@ -3407,9 +3407,10 @@ parse_eof() {
  * Use this function instead of prepare_now() to preload textures from a user
  * interface standpoint.
  */
-void Shader::
+PT(AsyncFuture) Shader::
 prepare(PreparedGraphicsObjects *prepared_objects) {
-  prepared_objects->enqueue_shader(this);
+  PT(PreparedGraphicsObjects::EnqueuedObject) obj = prepared_objects->enqueue_shader_future(this);
+  return obj.p();
 }
 
 /**

+ 2 - 1
panda/src/gobj/shader.h

@@ -32,6 +32,7 @@
 #include "pta_LVecBase3.h"
 #include "pta_LVecBase2.h"
 #include "epvector.h"
+#include "asyncFuture.h"
 
 #ifdef HAVE_CG
 // I don't want to include the Cg header file into panda as a whole.  Instead,
@@ -109,7 +110,7 @@ PUBLISHED:
   INLINE bool get_cache_compiled_shader() const;
   INLINE void set_cache_compiled_shader(bool flag);
 
-  void prepare(PreparedGraphicsObjects *prepared_objects);
+  PT(AsyncFuture) prepare(PreparedGraphicsObjects *prepared_objects);
   bool is_prepared(PreparedGraphicsObjects *prepared_objects) const;
   bool release(PreparedGraphicsObjects *prepared_objects);
   int release_all();

+ 3 - 2
panda/src/gobj/texture.cxx

@@ -1417,9 +1417,10 @@ peek() {
  * Use this function instead of prepare_now() to preload textures from a user
  * interface standpoint.
  */
-void Texture::
+PT(AsyncFuture) Texture::
 prepare(PreparedGraphicsObjects *prepared_objects) {
-  prepared_objects->enqueue_texture(this);
+  PT(PreparedGraphicsObjects::EnqueuedObject) obj = prepared_objects->enqueue_texture_future(this);
+  return obj.p();
 }
 
 /**

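Since prepare() now hands back the future, the preparation itself can be awaited from a coroutine task.  A minimal sketch, assuming it runs inside a coroutine task and that pgo is a PreparedGraphicsObjects instance:

    async def preload_texture(tex, pgo):
        # Suspends until the texture is prepared at the start of a frame; per
        # the begin_frame() change above, the result is the TextureContext
        # created for view 0.
        context = await tex.prepare(pgo)
        return context
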
+ 2 - 1
panda/src/gobj/texture.h

@@ -43,6 +43,7 @@
 #include "colorSpace.h"
 #include "geomEnums.h"
 #include "bamCacheRecord.h"
+#include "asyncFuture.h"
 
 class PNMImage;
 class PfmFile;
@@ -522,7 +523,7 @@ PUBLISHED:
   MAKE_PROPERTY(auto_texture_scale, get_auto_texture_scale,
                                     set_auto_texture_scale);
 
-  void prepare(PreparedGraphicsObjects *prepared_objects);
+  PT(AsyncFuture) prepare(PreparedGraphicsObjects *prepared_objects);
   bool is_prepared(PreparedGraphicsObjects *prepared_objects) const;
   bool was_image_modified(PreparedGraphicsObjects *prepared_objects) const;
   size_t get_data_size_bytes(PreparedGraphicsObjects *prepared_objects) const;

+ 4 - 3
panda/src/gobj/textureReloadRequest.I

@@ -22,8 +22,7 @@ TextureReloadRequest(const string &name,
   AsyncTask(name),
   _pgo(pgo),
   _texture(texture),
-  _allow_compressed(allow_compressed),
-  _is_ready(false)
+  _allow_compressed(allow_compressed)
 {
   nassertv(_pgo != (PreparedGraphicsObjects *)NULL);
   nassertv(_texture != (Texture *)NULL);
@@ -58,8 +57,10 @@ get_allow_compressed() const {
 
 /**
  * Returns true if this request has completed, false if it is still pending.
+ * Equivalent to `req.done() and not req.cancelled()`.
+ * @see done()
  */
 INLINE bool TextureReloadRequest::
 is_ready() const {
-  return _is_ready;
+  return (FutureState)AtomicAdjust::get(_future_state) == FS_finished;
 }

+ 0 - 1
panda/src/gobj/textureReloadRequest.cxx

@@ -43,7 +43,6 @@ do_task() {
       _texture->prepare(_pgo);
     }
   }
-  _is_ready = true;
 
   // Don't continue the task; we're done.
   return DS_done;

+ 2 - 1
panda/src/gobj/textureReloadRequest.h

@@ -43,6 +43,8 @@ PUBLISHED:
   INLINE bool get_allow_compressed() const;
   INLINE bool is_ready() const;
 
+  MAKE_PROPERTY(texture, get_texture);
+
 protected:
   virtual DoneStatus do_task();
 
@@ -50,7 +52,6 @@ private:
   PT(PreparedGraphicsObjects) _pgo;
   PT(Texture) _texture;
   bool _allow_compressed;
-  bool _is_ready;
 
 public:
   static TypeHandle get_class_type() {

+ 5 - 4
panda/src/pgraph/geomNode.cxx

@@ -38,6 +38,7 @@
 #include "boundingBox.h"
 #include "boundingSphere.h"
 #include "config_mathutil.h"
+#include "preparedGraphicsObjects.h"
 
 
 bool allow_flatten_color = ConfigVariableBool
@@ -382,14 +383,14 @@ r_prepare_scene(GraphicsStateGuardianBase *gsg, const RenderState *node_state,
     int num_arrays = vdata_reader.get_num_arrays();
     for (int i = 0; i < num_arrays; ++i) {
       CPT(GeomVertexArrayData) array = vdata_reader.get_array(i);
-      ((GeomVertexArrayData *)array.p())->prepare(prepared_objects);
+      prepared_objects->enqueue_vertex_buffer((GeomVertexArrayData *)array.p());
     }
 
     // And also each of the index arrays.
     int num_primitives = geom->get_num_primitives();
     for (int i = 0; i < num_primitives; ++i) {
       CPT(GeomPrimitive) prim = geom->get_primitive(i);
-      ((GeomPrimitive *)prim.p())->prepare(prepared_objects);
+      prepared_objects->enqueue_index_buffer((GeomPrimitive *)prim.p());
     }
 
     if (munger->is_of_type(StateMunger::get_class_type())) {
@@ -405,7 +406,7 @@ r_prepare_scene(GraphicsStateGuardianBase *gsg, const RenderState *node_state,
         Texture *texture = ta->get_on_texture(ta->get_on_stage(i));
         // TODO: prepare the sampler states, if specified.
         if (texture != nullptr) {
-          texture->prepare(prepared_objects);
+          prepared_objects->enqueue_texture(texture);
         }
       }
     }
@@ -415,7 +416,7 @@ r_prepare_scene(GraphicsStateGuardianBase *gsg, const RenderState *node_state,
     if (geom_state->get_attrib(sa)) {
       Shader *shader = (Shader *)sa->get_shader();
       if (shader != nullptr) {
-        shader->prepare(prepared_objects);
+        prepared_objects->enqueue_shader(shader);
       }
       // TODO: prepare the shader inputs.
     }

+ 1 - 0
panda/src/pgraph/loader.I

@@ -135,6 +135,7 @@ stop_threads() {
 /**
  * Removes a pending asynchronous load request.  Returns true if successful,
  * false otherwise.
+ * @deprecated Use task.cancel() to cancel the request instead.
  */
 INLINE bool Loader::
 remove(AsyncTask *task) {

+ 8 - 20
panda/src/pgraph/modelFlattenRequest.I

@@ -18,8 +18,7 @@
 INLINE ModelFlattenRequest::
 ModelFlattenRequest(PandaNode *orig) :
   AsyncTask(orig->get_name()),
-  _orig(orig),
-  _is_ready(false)
+  _orig(orig)
 {
 }
 
@@ -35,32 +34,21 @@ get_orig() const {
  * Returns true if this request has completed, false if it is still pending.
  * When this returns true, you may retrieve the model loaded by calling
  * result().
+ * Equivalent to `req.done() and not req.cancelled()`.
+ * @see done()
  */
 INLINE bool ModelFlattenRequest::
 is_ready() const {
-  return _is_ready;
+  return (FutureState)AtomicAdjust::get(_future_state) == FS_finished;
 }
 
 /**
  * Returns the flattened copy of the model.  It is an error to call this
- * unless is_ready() returns true.
+ * unless done() returns true.
+ * @deprecated Use result() instead.
  */
 INLINE PandaNode *ModelFlattenRequest::
 get_model() const {
-  nassertr(_is_ready, nullptr);
-  return _model;
-}
-
-/**
- * Returns the flattened copy of the model wrapped in a NodePath.  It is an
- * error to call this unless is_ready() returns true.
- */
-INLINE NodePath ModelFlattenRequest::
-result() const {
-  nassertr(_is_ready, NodePath::fail());
-  if (_model != nullptr) {
-    return NodePath(_model);
-  } else {
-    return NodePath::fail();
-  }
+  nassertr_always(done(), nullptr);
+  return (PandaNode *)_result;
 }

+ 2 - 2
panda/src/pgraph/modelFlattenRequest.cxx

@@ -35,8 +35,8 @@ do_task() {
     np.attach_new_node(_orig);
   }
   np.flatten_strong();
-  _model = np.get_child(0).node();
-  _is_ready = true;
+
+  set_result(np.get_child(0).node());
 
   // Don't continue the task; we're done.
   return DS_done;

+ 0 - 5
panda/src/pgraph/modelFlattenRequest.h

@@ -39,18 +39,13 @@ PUBLISHED:
   INLINE bool is_ready() const;
   INLINE PandaNode *get_model() const;
 
-  INLINE NodePath result() const;
-
   MAKE_PROPERTY(orig, get_orig);
-  MAKE_PROPERTY(ready, is_ready);
 
 protected:
   virtual DoneStatus do_task();
 
 private:
   PT(PandaNode) _orig;
-  bool _is_ready;
-  PT(PandaNode) _model;
 
 public:
   static TypeHandle get_class_type() {

+ 11 - 18
panda/src/pgraph/modelLoadRequest.I

@@ -38,31 +38,24 @@ get_loader() const {
 }
 
 /**
- * Returns the model that was loaded asynchronously as a NodePath, if any, or
- * the empty NodePath if there was an error.
- */
-INLINE NodePath ModelLoadRequest::
-result() const {
-  nassertr_always(_is_ready, NodePath::fail());
-  return NodePath(_model);
-}
-
-/**
- * Returns true if this request has completed, false if it is still pending.
- * When this returns true, you may retrieve the model loaded by calling
- * get_model().
+ * Returns true if this request has completed, false if it is still pending or
+ * if it has been cancelled.  When this returns true, you may retrieve the
+ * model loaded by calling get_model().
+ * Equivalent to `req.done() and not req.cancelled()`.
+ * @see done()
  */
 INLINE bool ModelLoadRequest::
 is_ready() const {
-  return _is_ready;
+  return (FutureState)AtomicAdjust::get(_future_state) == FS_finished;
 }
 
 /**
- * Returns the model that was loaded asynchronously, if any, or NULL if there
- * was an error.  It is an error to call this unless is_ready() returns true.
+ * Returns the model that was loaded asynchronously, if any, or null if there
+ * was an error.  It is an error to call this unless done() returns true.
+ * @deprecated Use result() instead.
  */
 INLINE PandaNode *ModelLoadRequest::
 get_model() const {
-  nassertr(_is_ready, NULL);
-  return _model;
+  nassertr_always(done(), nullptr);
+  return (PandaNode *)_result;
 }

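With the request now storing its result through the future machinery, a ModelLoadRequest behaves like any other AsyncFuture.  A minimal sketch (the filename and the use of the global Loader and task manager are illustrative):

    from panda3d import core

    request = core.ModelLoadRequest("load-env",
                                    core.Filename("environment.egg"),
                                    core.LoaderOptions(),
                                    core.Loader.get_global_ptr())
    core.AsyncTaskManager.get_global_ptr().add(request)

    # Once request.done() is true, request.result() should yield the loaded
    # PandaNode (or None if the load failed); get_model() is now deprecated.
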
+ 3 - 4
panda/src/pgraph/modelLoadRequest.cxx

@@ -28,8 +28,7 @@ ModelLoadRequest(const string &name,
   AsyncTask(name),
   _filename(filename),
   _options(options),
-  _loader(loader),
-  _is_ready(false)
+  _loader(loader)
 {
 }
 
@@ -43,8 +42,8 @@ do_task() {
     Thread::sleep(delay);
   }
 
-  _model = _loader->load_sync(_filename, _options);
-  _is_ready = true;
+  PT(PandaNode) model = _loader->load_sync(_filename, _options);
+  set_result(model);
 
   // Don't continue the task; we're done.
   return DS_done;

+ 0 - 5
panda/src/pgraph/modelLoadRequest.h

@@ -43,15 +43,12 @@ PUBLISHED:
   INLINE const LoaderOptions &get_options() const;
   INLINE Loader *get_loader() const;
 
-  INLINE NodePath result() const;
-
   INLINE bool is_ready() const;
   INLINE PandaNode *get_model() const;
 
   MAKE_PROPERTY(filename, get_filename);
   MAKE_PROPERTY(options, get_options);
   MAKE_PROPERTY(loader, get_loader);
-  MAKE_PROPERTY(ready, is_ready);
 
 protected:
   virtual DoneStatus do_task();
@@ -60,8 +57,6 @@ private:
   Filename _filename;
   LoaderOptions _options;
   PT(Loader) _loader;
-  bool _is_ready;
-  PT(PandaNode) _model;
 
 public:
   static TypeHandle get_class_type() {

+ 5 - 13
panda/src/pgraph/modelSaveRequest.I

@@ -49,28 +49,20 @@ get_loader() const {
  * Returns true if this request has completed, false if it is still pending.
  * When this returns true, you may retrieve the success flag with
  * get_success().
+ * Equivalent to `req.done() and not req.cancelled()`.
+ * @see done()
  */
 INLINE bool ModelSaveRequest::
 is_ready() const {
-  return _is_ready;
+  return (FutureState)AtomicAdjust::get(_future_state) == FS_finished;
 }
 
 /**
  * Returns the true if the model was saved successfully, false otherwise.  It
- * is an error to call this unless is_ready() returns true.
+ * is an error to call this unless done() returns true.
  */
 INLINE bool ModelSaveRequest::
 get_success() const {
-  nassertr(_is_ready, false);
-  return _success;
-}
-
-/**
- * Returns a boolean indicating whether the model saved correctly.  It is an
- * error to call this unless is_ready() returns true.
- */
-INLINE bool ModelSaveRequest::
-result() const {
-  nassertr(_is_ready, false);
+  nassertr_always(done(), false);
   return _success;
 }

+ 0 - 2
panda/src/pgraph/modelSaveRequest.cxx

@@ -30,7 +30,6 @@ ModelSaveRequest(const string &name,
   _options(options),
   _node(node),
   _loader(loader),
-  _is_ready(false),
   _success(false)
 {
 }
@@ -46,7 +45,6 @@ do_task() {
   }
 
   _success = _loader->save_sync(_filename, _options, _node);
-  _is_ready = true;
 
   // Don't continue the task; we're done.
   return DS_done;

+ 0 - 4
panda/src/pgraph/modelSaveRequest.h

@@ -46,13 +46,10 @@ PUBLISHED:
   INLINE bool is_ready() const;
   INLINE bool get_success() const;
 
-  INLINE bool result() const;
-
   MAKE_PROPERTY(filename, get_filename);
   MAKE_PROPERTY(options, get_options);
   MAKE_PROPERTY(node, get_node);
   MAKE_PROPERTY(loader, get_loader);
-  MAKE_PROPERTY(ready, is_ready);
 
 protected:
   virtual DoneStatus do_task();
@@ -62,7 +59,6 @@ private:
   LoaderOptions _options;
   PT(PandaNode) _node;
   PT(Loader) _loader;
-  bool _is_ready;
   bool _success;
 
 public:

+ 0 - 12
panda/src/pipeline/asyncTaskBase.I

@@ -1,12 +0,0 @@
-/**
- * PANDA 3D SOFTWARE
- * Copyright (c) Carnegie Mellon University.  All rights reserved.
- *
- * All use of this software is subject to the terms of the revised BSD
- * license.  You should have received a copy of this license along
- * with this source code in a file named "LICENSE."
- *
- * @file asyncTaskBase.I
- * @author drose
- * @date 2010-02-09
- */

+ 0 - 77
panda/src/pipeline/asyncTaskBase.cxx

@@ -1,77 +0,0 @@
-/**
- * PANDA 3D SOFTWARE
- * Copyright (c) Carnegie Mellon University.  All rights reserved.
- *
- * All use of this software is subject to the terms of the revised BSD
- * license.  You should have received a copy of this license along
- * with this source code in a file named "LICENSE."
- *
- * @file asyncTaskBase.cxx
- * @author drose
- * @date 2010-02-09
- */
-
-#include "asyncTaskBase.h"
-#include "thread.h"
-#include "atomicAdjust.h"
-
-TypeHandle AsyncTaskBase::_type_handle;
-
-/**
- *
- */
-AsyncTaskBase::
-AsyncTaskBase() {
-}
-
-/**
- *
- */
-AsyncTaskBase::
-~AsyncTaskBase() {
-}
-
-/**
- * Indicates that this task is now the current task running on the indicated
- * thread, presumably the current thread.
- */
-void AsyncTaskBase::
-record_task(Thread *current_thread) {
-  nassertv(current_thread->_current_task == NULL);
-
-  void *result = AtomicAdjust::compare_and_exchange_ptr
-    ((void * TVOLATILE &)current_thread->_current_task,
-     (void *)NULL, (void *)this);
-
-  // If the return value is other than NULL, someone else must have assigned
-  // the task first, in another thread.  That shouldn't be possible.
-
-  // But different versions of gcc appear to have problems compiling these
-  // assertions correctly.
-#ifndef __GNUC__
-  nassertv(result == NULL);
-  nassertv(current_thread->_current_task == this);
-#endif  // __GNUC__
-}
-
-/**
- * Indicates that this task is no longer running on the indicated thread.
- */
-void AsyncTaskBase::
-clear_task(Thread *current_thread) {
-  nassertv(current_thread->_current_task == this);
-
-  void *result = AtomicAdjust::compare_and_exchange_ptr
-    ((void * TVOLATILE &)current_thread->_current_task,
-     (void *)this, (void *)NULL);
-
-  // If the return value is other than this, someone else must have assigned
-  // the task first, in another thread.  That shouldn't be possible.
-
-  // But different versions of gcc appear to have problems compiling these
-  // assertions correctly.
-#ifndef __GNUC__
-  nassertv(result == this);
-  nassertv(current_thread->_current_task == NULL);
-#endif  // __GNUC__
-}

+ 0 - 61
panda/src/pipeline/asyncTaskBase.h

@@ -1,61 +0,0 @@
-/**
- * PANDA 3D SOFTWARE
- * Copyright (c) Carnegie Mellon University.  All rights reserved.
- *
- * All use of this software is subject to the terms of the revised BSD
- * license.  You should have received a copy of this license along
- * with this source code in a file named "LICENSE."
- *
- * @file asyncTaskBase.h
- * @author drose
- * @date 2010-02-09
- */
-
-#ifndef ASYNCTASKBASE_H
-#define ASYNCTASKBASE_H
-
-#include "pandabase.h"
-
-#include "typedReferenceCount.h"
-#include "namable.h"
-
-class Thread;
-
-/**
- * The abstract base class for AsyncTask.  This is defined here only so we can
- * store a pointer to the current task on the Thread.
- */
-class EXPCL_PANDA_PIPELINE AsyncTaskBase : public TypedReferenceCount, public Namable {
-protected:
-  AsyncTaskBase();
-public:
-  ALLOC_DELETED_CHAIN(AsyncTaskBase);
-
-PUBLISHED:
-  virtual ~AsyncTaskBase();
-
-protected:
-  void record_task(Thread *current_thread);
-  void clear_task(Thread *current_thread);
-
-public:
-  static TypeHandle get_class_type() {
-    return _type_handle;
-  }
-  static void init_type() {
-    TypedReferenceCount::init_type();
-    register_type(_type_handle, "AsyncTaskBase",
-                  TypedReferenceCount::get_class_type());
-  }
-  virtual TypeHandle get_type() const {
-    return get_class_type();
-  }
-  virtual TypeHandle force_init_type() {init_type(); return get_class_type();}
-
-private:
-  static TypeHandle _type_handle;
-};
-
-#include "asyncTaskBase.I"
-
-#endif

+ 0 - 2
panda/src/pipeline/config_pipeline.cxx

@@ -12,7 +12,6 @@
  */
 
 #include "config_pipeline.h"
-#include "asyncTaskBase.h"
 #include "mainThread.h"
 #include "externalThread.h"
 #include "genericThread.h"
@@ -67,7 +66,6 @@ init_libpipeline() {
   }
   initialized = true;
 
-  AsyncTaskBase::init_type();
   MainThread::init_type();
   ExternalThread::init_type();
   GenericThread::init_type();

+ 0 - 1
panda/src/pipeline/p3pipeline_composite1.cxx

@@ -1,4 +1,3 @@
-#include "asyncTaskBase.cxx"
 #include "conditionVar.cxx"
 #include "conditionVarDebug.cxx"
 #include "conditionVarDirect.cxx"

+ 1 - 1
panda/src/pipeline/thread.I

@@ -277,7 +277,7 @@ preempt() {
  * AsyncTaskManager), if any, or NULL if the thread is not currently servicing
  * a task.
  */
-INLINE AsyncTaskBase *Thread::
+INLINE AsyncTask *Thread::
 get_current_task() const {
   return _current_task;
 }

+ 4 - 4
panda/src/pipeline/thread.h

@@ -28,7 +28,7 @@ class ReMutex;
 class MutexDebug;
 class ConditionVarDebug;
 class ConditionVarFullDebug;
-class AsyncTaskBase;
+class AsyncTask;
 
 /**
  * A thread; that is, a lightweight process.  This is an abstract base class;
@@ -89,7 +89,7 @@ PUBLISHED:
   BLOCKING INLINE void join();
   INLINE void preempt();
 
-  INLINE AsyncTaskBase *get_current_task() const;
+  INLINE AsyncTask *get_current_task() const;
 
   INLINE void set_python_index(int index);
 
@@ -142,7 +142,7 @@ private:
   int _pipeline_stage;
   PStatsCallback *_pstats_callback;
   bool _joinable;
-  AsyncTaskBase *_current_task;
+  AsyncTask *_current_task;
 
   int _python_index;
 
@@ -184,7 +184,7 @@ private:
   friend class ThreadPosixImpl;
   friend class ThreadSimpleImpl;
   friend class MainThread;
-  friend class AsyncTaskBase;
+  friend class AsyncTask;
 };
 
 INLINE ostream &operator << (ostream &out, const Thread &thread);

+ 212 - 0
tests/event/test_futures.py

@@ -0,0 +1,212 @@
+from panda3d import core
+import pytest
+import threading
+import time
+import sys
+
+if sys.version_info >= (3,):
+    from concurrent.futures._base import TimeoutError, CancelledError
+else:
+    TimeoutError = Exception
+    CancelledError = Exception
+
+
+def test_future_cancelled():
+    fut = core.AsyncFuture()
+
+    assert not fut.done()
+    assert not fut.cancelled()
+    fut.cancel()
+    assert fut.done()
+    assert fut.cancelled()
+
+    with pytest.raises(CancelledError):
+        fut.result()
+
+    # Works more than once
+    with pytest.raises(CancelledError):
+        fut.result()
+
+
+def test_future_timeout():
+    fut = core.AsyncFuture()
+
+    with pytest.raises(TimeoutError):
+        fut.result(0.001)
+
+    # Works more than once
+    with pytest.raises(TimeoutError):
+        fut.result(0.001)
+
+
+def test_future_wait():
+    fut = core.AsyncFuture()
+
+    # Launch a thread to set the result value.
+    def thread_main():
+        time.sleep(0.001)
+        fut.set_result(None)
+
+    thread = threading.Thread(target=thread_main)
+    thread.start()
+
+    # Make sure it didn't sneakily already run the thread
+    assert not fut.done()
+
+    assert fut.result() is None
+
+    assert fut.done()
+    assert not fut.cancelled()
+    assert fut.result() is None
+
+
+def test_future_wait_cancel():
+    fut = core.AsyncFuture()
+
+    # Launch a thread to cancel the future.
+    def thread_main():
+        time.sleep(0.001)
+        fut.cancel()
+
+    thread = threading.Thread(target=thread_main)
+    thread.start()
+
+    # Make sure it didn't sneakily already run the thread
+    assert not fut.done()
+
+    with pytest.raises(CancelledError):
+        fut.result()
+
+    assert fut.done()
+    assert fut.cancelled()
+    with pytest.raises(CancelledError):
+        fut.result()
+
+
+def test_task_cancel():
+    task_mgr = core.AsyncTaskManager.get_global_ptr()
+    task = core.PythonTask(lambda task: task.done)
+    task_mgr.add(task)
+
+    assert not task.done()
+    task_mgr.remove(task)
+    assert task.done()
+    assert task.cancelled()
+
+    with pytest.raises(CancelledError):
+        task.result()
+
+
+def test_task_cancel_during_run():
+    task_mgr = core.AsyncTaskManager.get_global_ptr()
+    task_chain = task_mgr.make_task_chain("test_task_cancel_during_run")
+
+    def task_main(task):
+        task.remove()
+
+        # It won't yet be marked done until after it returns.
+        assert not task.done()
+        return task.done
+
+    task = core.PythonTask(task_main)
+    task.set_task_chain(task_chain.name)
+    task_mgr.add(task)
+    task_chain.wait_for_tasks()
+
+    assert task.done()
+    assert task.cancelled()
+    with pytest.raises(CancelledError):
+        task.result()
+
+
+def test_task_result():
+    task_mgr = core.AsyncTaskManager.get_global_ptr()
+    task_chain = task_mgr.make_task_chain("test_task_result")
+
+    def task_main(task):
+        task.set_result(42)
+
+        # It won't yet be marked done until after it returns.
+        assert not task.done()
+        return core.PythonTask.done
+
+    task = core.PythonTask(task_main)
+    task.set_task_chain(task_chain.name)
+    task_mgr.add(task)
+    task_chain.wait_for_tasks()
+
+    assert task.done()
+    assert not task.cancelled()
+    assert task.result() == 42
+
+
+def test_coro_exception():
+    task_mgr = core.AsyncTaskManager.get_global_ptr()
+    task_chain = task_mgr.make_task_chain("test_coro_exception")
+
+    def coro_main():
+        raise RuntimeError
+        yield None
+
+    task = core.PythonTask(coro_main())
+    task.set_task_chain(task_chain.name)
+    task_mgr.add(task)
+    task_chain.wait_for_tasks()
+
+    assert task.done()
+    assert not task.cancelled()
+    with pytest.raises(RuntimeError):
+        task.result()
+
+
+def test_event_future():
+    queue = core.EventQueue()
+    handler = core.EventHandler(queue)
+
+    fut = handler.get_future("test")
+
+    # If we ask again, we should get the same one.
+    assert handler.get_future("test") == fut
+
+    event = core.Event("test")
+    handler.dispatch_event(event)
+
+    assert fut.done()
+    assert not fut.cancelled()
+    assert fut.result() == event
+
+
+def test_event_future_cancel():
+    # This is a very strange thing to do, but it's possible, so let's make
+    # sure it gives defined behavior.
+    queue = core.EventQueue()
+    handler = core.EventHandler(queue)
+
+    fut = handler.get_future("test")
+    fut.cancel()
+
+    assert fut.done()
+    assert fut.cancelled()
+
+    event = core.Event("test")
+    handler.dispatch_event(event)
+
+    assert fut.done()
+    assert fut.cancelled()
+
+
+def test_event_future_cancel2():
+    queue = core.EventQueue()
+    handler = core.EventHandler(queue)
+
+    # Make sure we get a new future if we cancelled the first one.
+    fut = handler.get_future("test")
+    fut.cancel()
+    fut2 = handler.get_future("test")
+
+    assert fut != fut2
+    assert fut.done()
+    assert fut.cancelled()
+    assert not fut2.done()
+    assert not fut2.cancelled()
+