|
@@ -316,12 +316,19 @@ void MessageQueue::flush() {
|
|
|
|
|
|
while (read_pos < buffer_end) {
|
|
|
|
|
|
- _THREAD_SAFE_UNLOCK_
|
|
|
-
|
|
|
//lock on each interation, so a call can re-add itself to the message queue
|
|
|
|
|
|
Message *message = (Message *)&buffer[read_pos];
|
|
|
|
|
|
+ uint32_t advance = sizeof(Message);
|
|
|
+ if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION)
|
|
|
+ advance += sizeof(Variant) * message->args;
|
|
|
+
|
|
|
+ //pre-advance so this function is reentrant
|
|
|
+ read_pos += advance;
|
|
|
+
|
|
|
+ _THREAD_SAFE_UNLOCK_
|
|
|
+
|
|
|
Object *target = ObjectDB::get_instance(message->instance_ID);
|
|
|
|
|
|
if (target != NULL) {
|
|
@@ -357,13 +364,9 @@ void MessageQueue::flush() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- uint32_t advance = sizeof(Message);
|
|
|
- if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION)
|
|
|
- advance += sizeof(Variant) * message->args;
|
|
|
message->~Message();
|
|
|
|
|
|
_THREAD_SAFE_LOCK_
|
|
|
- read_pos += advance;
|
|
|
}
|
|
|
|
|
|
buffer_end = 0; // reset buffer
|