|
|
@@ -19,28 +19,52 @@
|
|
|
#include "py_panda.h"
|
|
|
|
|
|
#include "pythonThread.h"
|
|
|
+#include "asyncTaskManager.h"
|
|
|
|
|
|
TypeHandle PythonTask::_type_handle;
|
|
|
|
|
|
#ifndef CPPPARSER
|
|
|
extern struct Dtool_PyTypedObject Dtool_TypedReferenceCount;
|
|
|
+extern struct Dtool_PyTypedObject Dtool_AsyncTask;
|
|
|
+extern struct Dtool_PyTypedObject Dtool_PythonTask;
|
|
|
#endif
|
|
|
|
|
|
/**
|
|
|
*
|
|
|
*/
|
|
|
PythonTask::
|
|
|
-PythonTask(PyObject *function, const string &name) :
|
|
|
- AsyncTask(name)
|
|
|
-{
|
|
|
- _function = NULL;
|
|
|
- _args = NULL;
|
|
|
- _upon_death = NULL;
|
|
|
- _owner = NULL;
|
|
|
- _registered_to_owner = false;
|
|
|
- _generator = NULL;
|
|
|
+PythonTask(PyObject *func_or_coro, const string &name) :
|
|
|
+ AsyncTask(name),
|
|
|
+ _function(nullptr),
|
|
|
+ _args(nullptr),
|
|
|
+ _upon_death(nullptr),
|
|
|
+ _owner(nullptr),
|
|
|
+ _registered_to_owner(false),
|
|
|
+ _exception(nullptr),
|
|
|
+ _exc_value(nullptr),
|
|
|
+ _exc_traceback(nullptr),
|
|
|
+ _generator(nullptr),
|
|
|
+ _future_done(nullptr),
|
|
|
+ _retrieved_exception(false) {
|
|
|
+
|
|
|
+ nassertv(func_or_coro != nullptr);
|
|
|
+ if (func_or_coro == Py_None || PyCallable_Check(func_or_coro)) {
|
|
|
+ _function = func_or_coro;
|
|
|
+ Py_INCREF(_function);
|
|
|
+#if PY_VERSION_HEX >= 0x03050000
|
|
|
+ } else if (PyCoro_CheckExact(func_or_coro)) {
|
|
|
+ // We also allow passing in a coroutine, because why not.
|
|
|
+ _generator = func_or_coro;
|
|
|
+ Py_INCREF(_generator);
|
|
|
+#endif
|
|
|
+ } else if (PyGen_CheckExact(func_or_coro)) {
|
|
|
+ // Something emulating a coroutine.
|
|
|
+ _generator = func_or_coro;
|
|
|
+ Py_INCREF(_generator);
|
|
|
+ } else {
|
|
|
+ nassert_raise("Invalid function passed to PythonTask");
|
|
|
+ }
|
|
|
|
|
|
- set_function(function);
|
|
|
set_args(Py_None, true);
|
|
|
set_upon_death(Py_None);
|
|
|
set_owner(Py_None);
|
|
|
@@ -60,9 +84,24 @@ PythonTask(PyObject *function, const string &name) :
|
|
|
*/
|
|
|
PythonTask::
|
|
|
~PythonTask() {
|
|
|
- Py_DECREF(_function);
|
|
|
+#ifndef NDEBUG
|
|
|
+ // If the coroutine threw an exception, and there was no opportunity to
|
|
|
+ // handle it, let the user know.
|
|
|
+ if (_exception != nullptr && !_retrieved_exception) {
|
|
|
+ task_cat.error()
|
|
|
+ << *this << " exception was never retrieved:\n";
|
|
|
+ PyErr_Restore(_exception, _exc_value, _exc_traceback);
|
|
|
+ PyErr_Print();
|
|
|
+ PyErr_Restore(nullptr, nullptr, nullptr);
|
|
|
+ }
|
|
|
+#endif
|
|
|
+
|
|
|
+ Py_XDECREF(_function);
|
|
|
Py_DECREF(_args);
|
|
|
Py_DECREF(__dict__);
|
|
|
+ Py_XDECREF(_exception);
|
|
|
+ Py_XDECREF(_exc_value);
|
|
|
+ Py_XDECREF(_exc_traceback);
|
|
|
Py_XDECREF(_generator);
|
|
|
Py_XDECREF(_owner);
|
|
|
Py_XDECREF(_upon_death);
|
|
|
@@ -83,15 +122,6 @@ set_function(PyObject *function) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/**
|
|
|
- * Returns the function that is called when the task runs.
|
|
|
- */
|
|
|
-PyObject *PythonTask::
|
|
|
-get_function() {
|
|
|
- Py_INCREF(_function);
|
|
|
- return _function;
|
|
|
-}
|
|
|
-
|
|
|
/**
|
|
|
* Replaces the argument list that is passed to the task function. The
|
|
|
* parameter should be a tuple or list of arguments, or None to indicate the
|
|
|
@@ -166,15 +196,6 @@ set_upon_death(PyObject *upon_death) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/**
|
|
|
- * Returns the function that is called when the task finishes.
|
|
|
- */
|
|
|
-PyObject *PythonTask::
|
|
|
-get_upon_death() {
|
|
|
- Py_INCREF(_upon_death);
|
|
|
- return _upon_death;
|
|
|
-}
|
|
|
-
|
|
|
/**
|
|
|
* Specifies a Python object that serves as the "owner" for the task. This
|
|
|
* owner object must have two methods: _addTask() and _clearTask(), which will
|
|
|
@@ -212,12 +233,87 @@ set_owner(PyObject *owner) {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns the "owner" object. See set_owner().
|
|
|
+ * Returns the result of this task's execution, as set by set_result() within
|
|
|
+ * the task or returned from a coroutine added to the task manager. If an
|
|
|
+ * exception occurred within this task, it is raised instead.
|
|
|
*/
|
|
|
PyObject *PythonTask::
|
|
|
-get_owner() {
|
|
|
- Py_INCREF(_owner);
|
|
|
- return _owner;
|
|
|
+result() const {
|
|
|
+ nassertr(!is_alive(), nullptr);
|
|
|
+
|
|
|
+ if (_exception == nullptr) {
|
|
|
+ // The result of the call is stored in _exc_value.
|
|
|
+ Py_XINCREF(_exc_value);
|
|
|
+ return _exc_value;
|
|
|
+ } else {
|
|
|
+ _retrieved_exception = true;
|
|
|
+ Py_INCREF(_exception);
|
|
|
+ Py_XINCREF(_exc_value);
|
|
|
+ Py_XINCREF(_exc_traceback);
|
|
|
+ PyErr_Restore(_exception, _exc_value, _exc_traceback);
|
|
|
+ return nullptr;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * If an exception occurred during execution of this task, returns it. This
|
|
|
+ * is only set if this task returned a coroutine or generator.
|
|
|
+ */
|
|
|
+/*PyObject *PythonTask::
|
|
|
+exception() const {
|
|
|
+ if (_exception == nullptr) {
|
|
|
+ Py_INCREF(Py_None);
|
|
|
+ return Py_None;
|
|
|
+ } else if (_exc_value == nullptr || _exc_value == Py_None) {
|
|
|
+ return _PyObject_CallNoArg(_exception);
|
|
|
+ } else if (PyTuple_Check(_exc_value)) {
|
|
|
+ return PyObject_Call(_exception, _exc_value, nullptr);
|
|
|
+ } else {
|
|
|
+ return PyObject_CallFunctionObjArgs(_exception, _exc_value, nullptr);
|
|
|
+ }
|
|
|
+}*/
|
|
|
+
|
|
|
+/**
|
|
|
+ * Returns an iterator that continuously yields an awaitable until the task
|
|
|
+ * has finished.
|
|
|
+ */
|
|
|
+PyObject *PythonTask::
|
|
|
+__await__(PyObject *self) {
|
|
|
+ Dtool_GeneratorWrapper *gen;
|
|
|
+ gen = (Dtool_GeneratorWrapper *)PyType_GenericAlloc(&Dtool_GeneratorWrapper_Type, 0);
|
|
|
+ if (gen != nullptr) {
|
|
|
+ Py_INCREF(self);
|
|
|
+ gen->_base._self = self;
|
|
|
+ gen->_iternext_func = &gen_next;
|
|
|
+ }
|
|
|
+ return (PyObject *)gen;
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Yields continuously until a task has finished.
|
|
|
+ */
|
|
|
+PyObject *PythonTask::
|
|
|
+gen_next(PyObject *self) {
|
|
|
+ const PythonTask *task = nullptr;
|
|
|
+ if (!Dtool_Call_ExtractThisPointer(self, Dtool_PythonTask, (void **)&task)) {
|
|
|
+ return nullptr;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (task->is_alive()) {
|
|
|
+ Py_INCREF(self);
|
|
|
+ return self;
|
|
|
+ } else if (task->_exception != nullptr) {
|
|
|
+ task->_retrieved_exception = true;
|
|
|
+ Py_INCREF(task->_exception);
|
|
|
+ Py_INCREF(task->_exc_value);
|
|
|
+ Py_INCREF(task->_exc_traceback);
|
|
|
+ PyErr_Restore(task->_exception, task->_exc_value, task->_exc_traceback);
|
|
|
+ return nullptr;
|
|
|
+ } else {
|
|
|
+ // The result of the call is stored in _exc_value.
|
|
|
+ PyErr_SetObject(PyExc_StopIteration, task->_exc_value);
|
|
|
+ return nullptr;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -396,16 +492,30 @@ do_task() {
|
|
|
*/
|
|
|
AsyncTask::DoneStatus PythonTask::
|
|
|
do_python_task() {
|
|
|
- PyObject *result = NULL;
|
|
|
+ PyObject *result = nullptr;
|
|
|
+
|
|
|
+ // Are we waiting for a future to finish?
|
|
|
+ if (_future_done != nullptr) {
|
|
|
+ PyObject *is_done = PyObject_CallObject(_future_done, nullptr);
|
|
|
+ if (!PyObject_IsTrue(is_done)) {
|
|
|
+ // Nope, ask again next frame.
|
|
|
+ Py_DECREF(is_done);
|
|
|
+ return DS_cont;
|
|
|
+ }
|
|
|
+ Py_DECREF(is_done);
|
|
|
+ Py_DECREF(_future_done);
|
|
|
+ _future_done = nullptr;
|
|
|
+ }
|
|
|
|
|
|
- if (_generator == (PyObject *)NULL) {
|
|
|
+ if (_generator == nullptr) {
|
|
|
// We are calling the function directly.
|
|
|
+ nassertr(_function != nullptr, DS_interrupt);
|
|
|
+
|
|
|
PyObject *args = get_args();
|
|
|
result = PythonThread::call_python_func(_function, args);
|
|
|
Py_DECREF(args);
|
|
|
|
|
|
-#ifdef PyGen_Check
|
|
|
- if (result != (PyObject *)NULL && PyGen_Check(result)) {
|
|
|
+ if (result != nullptr && PyGen_Check(result)) {
|
|
|
// The function has yielded a generator. We will call into that
|
|
|
// henceforth, instead of calling the function from the top again.
|
|
|
if (task_cat.is_debug()) {
|
|
|
@@ -423,30 +533,166 @@ do_python_task() {
|
|
|
Py_DECREF(str);
|
|
|
}
|
|
|
_generator = result;
|
|
|
- result = NULL;
|
|
|
- }
|
|
|
+ result = nullptr;
|
|
|
+
|
|
|
+#if PY_VERSION_HEX >= 0x03050000
|
|
|
+ } else if (result != nullptr && Py_TYPE(result)->tp_as_async != nullptr) {
|
|
|
+ // The function yielded a coroutine, or something of the sort.
|
|
|
+ if (task_cat.is_debug()) {
|
|
|
+ PyObject *str = PyObject_ASCII(_function);
|
|
|
+ PyObject *str2 = PyObject_ASCII(result);
|
|
|
+ task_cat.debug()
|
|
|
+ << PyUnicode_AsUTF8(str) << " in " << *this
|
|
|
+ << " yielded an awaitable: " << PyUnicode_AsUTF8(str2) << "\n";
|
|
|
+ Py_DECREF(str);
|
|
|
+ Py_DECREF(str2);
|
|
|
+ }
|
|
|
+ if (PyCoro_CheckExact(result)) {
|
|
|
+ // If a coroutine, am_await is possible but senseless, since we can
|
|
|
+ // just call send(None) on the coroutine itself.
|
|
|
+ _generator = result;
|
|
|
+ } else {
|
|
|
+ unaryfunc await = Py_TYPE(result)->tp_as_async->am_await;
|
|
|
+ _generator = await(result);
|
|
|
+ Py_DECREF(result);
|
|
|
+ }
|
|
|
+ result = nullptr;
|
|
|
#endif
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- if (_generator != (PyObject *)NULL) {
|
|
|
- // We are calling a generator.
|
|
|
- PyObject *func = PyObject_GetAttrString(_generator, "next");
|
|
|
- nassertr(func != (PyObject *)NULL, DS_interrupt);
|
|
|
-
|
|
|
- result = PyObject_CallObject(func, NULL);
|
|
|
+ if (_generator != nullptr) {
|
|
|
+ // We are calling a generator. Use "send" rather than PyIter_Next since
|
|
|
+ // we need to be able to read the value from a StopIteration exception.
|
|
|
+ PyObject *func = PyObject_GetAttrString(_generator, "send");
|
|
|
+ nassertr(func != nullptr, DS_interrupt);
|
|
|
+ result = PyObject_CallFunctionObjArgs(func, Py_None, nullptr);
|
|
|
Py_DECREF(func);
|
|
|
|
|
|
- if (result == (PyObject *)NULL && PyErr_Occurred() &&
|
|
|
- PyErr_ExceptionMatches(PyExc_StopIteration)) {
|
|
|
- // "Catch" StopIteration and treat it like DS_done.
|
|
|
- PyErr_Clear();
|
|
|
+ if (result == nullptr) {
|
|
|
+ // An error happened. If StopIteration, that indicates the task has
|
|
|
+ // returned. Otherwise, we need to save it so that it can be re-raised
|
|
|
+ // in the function that awaited this task.
|
|
|
Py_DECREF(_generator);
|
|
|
- _generator = NULL;
|
|
|
- return DS_done;
|
|
|
+ _generator = nullptr;
|
|
|
+
|
|
|
+#if PY_VERSION_HEX >= 0x03030000
|
|
|
+ if (_PyGen_FetchStopIterationValue(&result) == 0) {
|
|
|
+#else
|
|
|
+ if (PyErr_ExceptionMatches(PyExc_StopIteration)) {
|
|
|
+ result = Py_None;
|
|
|
+ Py_INCREF(result);
|
|
|
+#endif
|
|
|
+ PyErr_Restore(nullptr, nullptr, nullptr);
|
|
|
+
|
|
|
+ // If we passed a coroutine into the task, eg. something like:
|
|
|
+ // taskMgr.add(my_async_function())
|
|
|
+ // then we cannot rerun the task, so the return value is always
|
|
|
+ // assumed to be DS_done. Instead, we pass the return value to the
|
|
|
+ // result of the `await` expression.
|
|
|
+ if (_function == nullptr) {
|
|
|
+ if (task_cat.is_debug()) {
|
|
|
+ task_cat.debug()
|
|
|
+ << *this << " received StopIteration from coroutine.\n";
|
|
|
+ }
|
|
|
+ // Store the result in _exc_value because that's not used anyway.
|
|
|
+ Py_XDECREF(_exc_value);
|
|
|
+ _exc_value = result;
|
|
|
+ return DS_done;
|
|
|
+ }
|
|
|
+ } else if (_function == nullptr) {
|
|
|
+ // We got an exception. If this is a scheduled coroutine, we will
|
|
|
+ // keep it and instead throw it into whatever 'awaits' this task.
|
|
|
+ // Otherwise, fall through and handle it the regular way.
|
|
|
+ Py_XDECREF(_exception);
|
|
|
+ Py_XDECREF(_exc_value);
|
|
|
+ Py_XDECREF(_exc_traceback);
|
|
|
+ PyErr_Fetch(&_exception, &_exc_value, &_exc_traceback);
|
|
|
+ _retrieved_exception = false;
|
|
|
+
|
|
|
+ if (task_cat.is_debug()) {
|
|
|
+ if (_exception != nullptr && Py_TYPE(_exception) == &PyType_Type) {
|
|
|
+ task_cat.debug()
|
|
|
+ << *this << " received " << ((PyTypeObject *)_exception)->tp_name << " from coroutine.\n";
|
|
|
+ } else {
|
|
|
+ task_cat.debug()
|
|
|
+ << *this << " received exception from coroutine.\n";
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Tell the task chain we want to kill ourselves. It doesn't really
|
|
|
+ // matter what we return if we set S_servicing_removed. If we don't
|
|
|
+ // set it, however, it will think this was a clean exit.
|
|
|
+ _manager->_lock.acquire();
|
|
|
+ _state = S_servicing_removed;
|
|
|
+ _manager->_lock.release();
|
|
|
+ return DS_interrupt;
|
|
|
+ }
|
|
|
+
|
|
|
+ } else if (DtoolCanThisBeAPandaInstance(result)) {
|
|
|
+ // We are waiting for a task to finish.
|
|
|
+ void *ptr = ((Dtool_PyInstDef *)result)->_My_Type->_Dtool_UpcastInterface(result, &Dtool_AsyncTask);
|
|
|
+ if (ptr != nullptr) {
|
|
|
+ // Suspend execution of this task until this other task has completed.
|
|
|
+ AsyncTask *task = (AsyncTask *)ptr;
|
|
|
+ AsyncTaskManager *manager = task->_manager;
|
|
|
+ nassertr(manager != nullptr, DS_interrupt);
|
|
|
+ nassertr(manager == _manager, DS_interrupt);
|
|
|
+ manager->_lock.acquire();
|
|
|
+ if (task != (AsyncTask *)this) {
|
|
|
+ if (task->is_alive()) {
|
|
|
+ if (task_cat.is_debug()) {
|
|
|
+ task_cat.debug()
|
|
|
+ << *this << " is now awaiting <" << *task << ">.\n";
|
|
|
+ }
|
|
|
+ task->_waiting_tasks.push_back(this);
|
|
|
+ } else {
|
|
|
+ // The task is already done. Continue at next opportunity.
|
|
|
+ Py_DECREF(result);
|
|
|
+ manager->_lock.release();
|
|
|
+ return DS_cont;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // This is an error. If we wanted to be fancier we could also
|
|
|
+ // detect deeper circular dependencies.
|
|
|
+ task_cat.error()
|
|
|
+ << *this << " cannot await itself\n";
|
|
|
+ }
|
|
|
+ task->_manager->_lock.release();
|
|
|
+ Py_DECREF(result);
|
|
|
+ return DS_await;
|
|
|
+ }
|
|
|
+
|
|
|
+ } else {
|
|
|
+ // We are waiting for a future to finish. We currently implement this
|
|
|
+ // by simply checking every frame whether the future is done.
|
|
|
+ PyObject *check = PyObject_GetAttrString(result, "_asyncio_future_blocking");
|
|
|
+ if (check != nullptr && check != Py_None) {
|
|
|
+ Py_DECREF(check);
|
|
|
+ // Next frame, check whether this future is done.
|
|
|
+ _future_done = PyObject_GetAttrString(result, "done");
|
|
|
+ if (_future_done == nullptr || !PyCallable_Check(_future_done)) {
|
|
|
+ task_cat.error()
|
|
|
+ << "future.done is not callable\n";
|
|
|
+ return DS_interrupt;
|
|
|
+ }
|
|
|
+#if PY_MAJOR_VERSION >= 3
|
|
|
+ if (task_cat.is_debug()) {
|
|
|
+ PyObject *str = PyObject_ASCII(result);
|
|
|
+ task_cat.debug()
|
|
|
+ << *this << " is now awaiting " << PyUnicode_AsUTF8(str) << ".\n";
|
|
|
+ Py_DECREF(str);
|
|
|
+ }
|
|
|
+#endif
|
|
|
+ Py_DECREF(result);
|
|
|
+ return DS_cont;
|
|
|
+ }
|
|
|
+ PyErr_Clear();
|
|
|
+ Py_XDECREF(check);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (result == (PyObject *)NULL) {
|
|
|
+ if (result == nullptr) {
|
|
|
if (PyErr_Occurred() && PyErr_ExceptionMatches(PyExc_SystemExit)) {
|
|
|
// Don't print an error message for SystemExit. Or rather, make it a
|
|
|
// debug message.
|