threadpool.H 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. /*
  2. * threadpool.H
  3. *
  4. * Created on: May 17, 2013
  5. * Author: xaxaxa
  6. */
  7. #ifndef THREADPOOL_H_
  8. #define THREADPOOL_H_
  9. #include <delegate.H>
  10. #include <sys/types.h>
  11. #include <pthread.h>
  12. #include <vector>
  13. #include <unistd.h>
  14. #include <errno.h>
  15. #include "cpoll.H"
  16. using namespace std;
  17. namespace CP
  18. {
  19. class ThreadPool;
  20. struct IDLETHREAD
  21. {
  22. ThreadPool* tp;
  23. Delegate<void()> func;
  24. pthread_t thread;
  25. int32_t efd;
  26. enum
  27. {
  28. none = 0, invoke, kill
  29. } op;
  30. IDLETHREAD() :
  31. efd(eventfd(0, EFD_SEMAPHORE)), op(none) {
  32. }
  33. ~IDLETHREAD() {
  34. close(efd);
  35. }
  36. void signal() {
  37. eventfd_t i = 1;
  38. eventfd_write(efd, i);
  39. }
  40. };
  41. void* idleThread(void* v);
  42. class ThreadPool: public RGC::Object
  43. {
  44. public:
  45. Delegate<void()> afterStart, beforeExit;
  46. vector<IDLETHREAD*> threads;
  47. PThreadMutex mutex;
  48. int32_t max;
  49. pthread_attr_t _attr;
  50. ThreadPool(int32_t max = 8) :
  51. max(max) {
  52. pthread_attr_init(&_attr);
  53. pthread_attr_setdetachstate(&_attr, PTHREAD_CREATE_DETACHED);
  54. }
  55. ~ThreadPool() {
  56. for (size_t i = 0; i < threads.size(); i++) {
  57. threads[i]->op = IDLETHREAD::kill;
  58. threads[i]->signal();
  59. }
  60. pthread_attr_destroy(&_attr);
  61. }
  62. IDLETHREAD* create() {
  63. IDLETHREAD* tmp = new IDLETHREAD();
  64. tmp->tp = this;
  65. if (pthread_create(&tmp->thread, &_attr, idleThread, (void*) tmp) == 0) return tmp;
  66. else {
  67. throw runtime_error(strerror(errno));
  68. }
  69. }
  70. IDLETHREAD* get() {
  71. size_t threadcount;
  72. {
  73. ScopeLock l(mutex);
  74. threadcount = threads.size();
  75. if (threadcount > 0) {
  76. IDLETHREAD* t = threads[threads.size() - 1];
  77. threads.resize(threads.size() - 1);
  78. return t;
  79. }
  80. }
  81. return create();
  82. }
  83. void put(IDLETHREAD* t) {
  84. bool b;
  85. {
  86. ScopeLock l(mutex);
  87. if ((b = ((int32_t) threads.size() < max))) threads.push_back(t);
  88. }
  89. if (!b) {
  90. t->op = IDLETHREAD::kill;
  91. t->signal();
  92. }
  93. }
  94. void invoke(const Delegate<void()>& func) {
  95. IDLETHREAD* t = get();
  96. t->func = func;
  97. t->op = IDLETHREAD::invoke;
  98. t->signal();
  99. }
  100. };
  101. void* idleThread(void* v) {
  102. IDLETHREAD* thr = (IDLETHREAD*) v;
  103. if (thr->tp->afterStart != nullptr) thr->tp->afterStart();
  104. while (true) {
  105. {
  106. eventfd_t i;
  107. eventfd_read(thr->efd, &i);
  108. }
  109. switch (thr->op) {
  110. case IDLETHREAD::invoke:
  111. try {
  112. thr->func();
  113. } catch (...) {
  114. }
  115. thr->tp->put(thr);
  116. break;
  117. case IDLETHREAD::kill:
  118. if (thr->tp->beforeExit != nullptr) thr->tp->beforeExit();
  119. delete thr;
  120. return NULL;
  121. default:
  122. break;
  123. }
  124. }
  125. }
  126. }
  127. #endif /* THREADPOOL_H_ */