ThreadMessages.cpp 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. #include "ThreadMessages.h"
  2. #include "log.h"
  3. namespace oxygine
  4. {
  5. #if 0
  6. #define LOGDN(...) log::messageln(__VA_ARGS__)
  7. #define LOGD(...) log::message(__VA_ARGS__)
  8. #else
  9. #define LOGDN(...) ((void)0)
  10. #define LOGD(...) ((void)0)
  11. #endif
  12. ThreadMessages::ThreadMessages():_id(0), _waitReplyID(0)
  13. {
  14. pthread_cond_init(&_cond, 0);
  15. pthread_mutex_init(&_mutex, 0);
  16. }
  17. ThreadMessages::~ThreadMessages()
  18. {
  19. pthread_mutex_destroy(&_mutex);
  20. pthread_cond_destroy(&_cond);
  21. }
  22. void ThreadMessages::wait()
  23. {
  24. MutexPthreadLock lock(_mutex);
  25. _replyLast(0);
  26. while (_events.empty())
  27. pthread_cond_wait(&_cond, &_mutex);
  28. }
  29. void mywait(pthread_cond_t *cond, pthread_mutex_t *mutex)
  30. {
  31. #ifdef __S3E__
  32. timespec ts;
  33. clock_gettime(CLOCK_REALTIME, &ts);
  34. ts.tv_sec += 1;
  35. //ts.tv_nsec += 5000;
  36. pthread_cond_timedwait(cond, mutex, &ts);
  37. #else
  38. pthread_cond_wait(cond, mutex);
  39. #endif
  40. }
  41. void ThreadMessages::get(message &ev)
  42. {
  43. MutexPthreadLock lock(_mutex);
  44. LOGDN("ThreadMessages::get");
  45. _replyLast(0);
  46. while (_events.empty())
  47. {
  48. LOGDN("ThreadMessages::get pthread_cond_wait");
  49. mywait(&_cond, &_mutex);
  50. }
  51. ev = _events.front();
  52. _events.erase(_events.begin());
  53. _last = ev;
  54. LOGDN("ThreadMessages::get received msgid=%d id=%d", _last.msgid, _last._id);
  55. }
  56. /*
  57. ThreadMessages::messages &ThreadMessages::pause(pthread_mutex_t &mutex)
  58. {
  59. mutex = _mutex;
  60. pthread_mutex_lock(&_mutex);
  61. return _events;
  62. }
  63. void ThreadMessages::resume()
  64. {
  65. pthread_mutex_unlock(&_mutex);
  66. }
  67. */
  68. bool ThreadMessages::empty()
  69. {
  70. MutexPthreadLock lock(_mutex);
  71. return _events.empty();
  72. }
  73. void ThreadMessages::clear()
  74. {
  75. MutexPthreadLock lock(_mutex);
  76. _events.resize(0);
  77. }
  78. bool ThreadMessages::peek(message &ev, bool del)
  79. {
  80. bool has = false;
  81. MutexPthreadLock lock(_mutex);
  82. _replyLast(0);
  83. if (!_events.empty())
  84. {
  85. ev = _events.front();
  86. if (del)
  87. _events.erase(_events.begin());
  88. has = true;
  89. _last = ev;
  90. }
  91. return has;
  92. }
  93. void ThreadMessages::_replyLast(void *val)
  94. {
  95. LOGDN("ThreadMessages::_replyLast");
  96. if (!_last._replied)
  97. {
  98. LOGDN("ThreadMessages::_replyLast not replied yet");
  99. _last._replied = true;
  100. _last._result = val;
  101. if (_last.cb)
  102. _last.cb(_last);
  103. LOGDN("ThreadMessages::_replyLast pre _waitReplyID = %d, _last._id = %d, _last.msgid=%d", _waitReplyID, _last._id, _last.msgid);
  104. if (_waitReplyID && _last._id == _waitReplyID)
  105. {
  106. LOGDN("ThreadMessages::_replyLast pthread_cond_signal _waitReplyID = %d, _last._id = %d, _last.msgid=%d", _waitReplyID, _last._id, _last.msgid);
  107. pthread_cond_signal(&_cond);
  108. }
  109. else
  110. {
  111. LOGDN("ThreadMessages::_replyLast else _waitReplyID = %d, _last._id = %d, _last.msgid=%d", _waitReplyID, _last._id, _last.msgid);
  112. }
  113. }
  114. }
  115. void ThreadMessages::reply(void *val)
  116. {
  117. MutexPthreadLock lock(_mutex);
  118. _replyLast(val);
  119. }
  120. void *ThreadMessages::send(int msgid, void *arg1, void *arg2)
  121. {
  122. message ev;
  123. ev.msgid = msgid;
  124. ev.arg1 = arg1;
  125. ev.arg2 = arg2;
  126. MutexPthreadLock lock(_mutex);
  127. ev._id = ++_id;
  128. _waitReplyID = ev._id;
  129. _events.push_back(ev);
  130. LOGDN("ThreadMessages::send msgid=%d pthread_cond_signal.. _waitReplyID=%d", msgid, _waitReplyID);
  131. pthread_cond_signal(&_cond);
  132. if (_last._replied)
  133. {
  134. LOGDN("ThreadMessages::send msgid=%d already replied", msgid);
  135. }
  136. while (!_last._replied)
  137. {
  138. LOGDN("ThreadMessages::send msgid=%d waiting reply...", msgid);
  139. mywait(&_cond, &_mutex);
  140. }
  141. LOGDN("ThreadMessages::send msgid=%d done", msgid);
  142. _waitReplyID = 0;
  143. _last._replied = false;
  144. return _last._result;
  145. }
  146. void ThreadMessages::sendCallback(int msgid, void *arg1, void *arg2, callback cb, void *cbData)
  147. {
  148. message ev;
  149. ev.msgid = msgid;
  150. ev.arg1 = arg1;
  151. ev.arg2 = arg2;
  152. ev.cb = cb;
  153. ev.cbData = cbData;
  154. MutexPthreadLock lock(_mutex);
  155. ev._id = ++_id;
  156. _events.push_back(ev);
  157. pthread_cond_signal(&_cond);
  158. }
  159. void ThreadMessages::post(int msgid, void *arg1, void *arg2)
  160. {
  161. message ev;
  162. ev.msgid = msgid;
  163. ev.arg1 = arg1;
  164. ev.arg2 = arg2;
  165. MutexPthreadLock lock(_mutex);
  166. ev._id = ++_id;
  167. _events.push_back(ev);
  168. pthread_cond_signal(&_cond);
  169. }
  170. }