// task_io_service.ipp
  1. //
  2. // detail/impl/task_io_service.ipp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
  11. #define ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include "asio/detail/config.hpp"
  16. #if !defined(ASIO_HAS_IOCP)
  17. #include "asio/detail/event.hpp"
  18. #include "asio/detail/limits.hpp"
  19. #include "asio/detail/reactor.hpp"
  20. #include "asio/detail/task_io_service.hpp"
  21. #include "asio/detail/task_io_service_thread_info.hpp"
  22. #include "asio/detail/push_options.hpp"
  23. namespace asio {
  24. namespace detail {
// Scope guard used around running the reactor task: on destruction it
// publishes the thread's privately accumulated work count, moves the
// thread-private operations to the shared queue, and reinserts the task
// sentinel so another thread can run the task again.
struct task_io_service::task_cleanup
{
  ~task_cleanup()
  {
    // Fold privately counted work into the shared outstanding work counter.
    if (this_thread_->private_outstanding_work > 0)
    {
      asio::detail::increment(
          task_io_service_->outstanding_work_,
          this_thread_->private_outstanding_work);
    }
    this_thread_->private_outstanding_work = 0;

    // Enqueue the completed operations and reinsert the task at the end of
    // the operation queue. The lock is reacquired here because the task was
    // run with the lock released.
    lock_->lock();
    task_io_service_->task_interrupted_ = true;
    task_io_service_->op_queue_.push(this_thread_->private_op_queue);
    task_io_service_->op_queue_.push(&task_io_service_->task_operation_);
  }

  task_io_service* task_io_service_; // Owning service.
  mutex::scoped_lock* lock_;         // Lock to reacquire before touching op_queue_.
  thread_info* this_thread_;         // Thread-private queue and work count.
};
// Scope guard used around completing a handler: on destruction it reconciles
// the thread's private work count with the shared counter. Completing the
// handler itself consumed one unit of work, hence the -1 adjustment and the
// < 1 test below.
struct task_io_service::work_cleanup
{
  ~work_cleanup()
  {
    if (this_thread_->private_outstanding_work > 1)
    {
      // Net surplus of privately started work (beyond the one unit consumed
      // by the completed handler): publish it to the shared counter.
      asio::detail::increment(
          task_io_service_->outstanding_work_,
          this_thread_->private_outstanding_work - 1);
    }
    else if (this_thread_->private_outstanding_work < 1)
    {
      // Net decrease in work — notify the service (presumably this may stop
      // the service when the count reaches zero; work_finished() is defined
      // elsewhere).
      task_io_service_->work_finished();
    }
    this_thread_->private_outstanding_work = 0;

#if defined(ASIO_HAS_THREADS)
    // Make any privately queued operations visible to other threads.
    if (!this_thread_->private_op_queue.empty())
    {
      lock_->lock();
      task_io_service_->op_queue_.push(this_thread_->private_op_queue);
    }
#endif // defined(ASIO_HAS_THREADS)
  }

  task_io_service* task_io_service_; // Owning service.
  mutex::scoped_lock* lock_;         // Lock to reacquire before touching op_queue_.
  thread_info* this_thread_;         // Thread-private queue and work count.
};
// Construct the service. A concurrency hint of 1 enables the single-threaded
// optimisations guarded by one_thread_ throughout this file.
task_io_service::task_io_service(
    asio::io_service& io_service, std::size_t concurrency_hint)
  : asio::detail::service_base<task_io_service>(io_service),
    one_thread_(concurrency_hint == 1),
    mutex_(),
    task_(0), // Reactor task is created lazily by init_task().
    task_interrupted_(true),
    outstanding_work_(0),
    stopped_(false),
    shutdown_(false)
{
  ASIO_HANDLER_TRACKING_INIT;
}
  87. void task_io_service::shutdown_service()
  88. {
  89. mutex::scoped_lock lock(mutex_);
  90. shutdown_ = true;
  91. lock.unlock();
  92. // Destroy handler objects.
  93. while (!op_queue_.empty())
  94. {
  95. operation* o = op_queue_.front();
  96. op_queue_.pop();
  97. if (o != &task_operation_)
  98. o->destroy();
  99. }
  100. // Reset to initial state.
  101. task_ = 0;
  102. }
  103. void task_io_service::init_task()
  104. {
  105. mutex::scoped_lock lock(mutex_);
  106. if (!shutdown_ && !task_)
  107. {
  108. task_ = &use_service<reactor>(this->get_io_service());
  109. op_queue_.push(&task_operation_);
  110. wake_one_thread_and_unlock(lock);
  111. }
  112. }
  113. std::size_t task_io_service::run(asio::error_code& ec)
  114. {
  115. ec = asio::error_code();
  116. if (outstanding_work_ == 0)
  117. {
  118. stop();
  119. return 0;
  120. }
  121. thread_info this_thread;
  122. this_thread.private_outstanding_work = 0;
  123. thread_call_stack::context ctx(this, this_thread);
  124. mutex::scoped_lock lock(mutex_);
  125. std::size_t n = 0;
  126. for (; do_run_one(lock, this_thread, ec); lock.lock())
  127. if (n != (std::numeric_limits<std::size_t>::max)())
  128. ++n;
  129. return n;
  130. }
  131. std::size_t task_io_service::run_one(asio::error_code& ec)
  132. {
  133. ec = asio::error_code();
  134. if (outstanding_work_ == 0)
  135. {
  136. stop();
  137. return 0;
  138. }
  139. thread_info this_thread;
  140. this_thread.private_outstanding_work = 0;
  141. thread_call_stack::context ctx(this, this_thread);
  142. mutex::scoped_lock lock(mutex_);
  143. return do_run_one(lock, this_thread, ec);
  144. }
  145. std::size_t task_io_service::poll(asio::error_code& ec)
  146. {
  147. ec = asio::error_code();
  148. if (outstanding_work_ == 0)
  149. {
  150. stop();
  151. return 0;
  152. }
  153. thread_info this_thread;
  154. this_thread.private_outstanding_work = 0;
  155. thread_call_stack::context ctx(this, this_thread);
  156. mutex::scoped_lock lock(mutex_);
  157. #if defined(ASIO_HAS_THREADS)
  158. // We want to support nested calls to poll() and poll_one(), so any handlers
  159. // that are already on a thread-private queue need to be put on to the main
  160. // queue now.
  161. if (one_thread_)
  162. if (thread_info* outer_thread_info = ctx.next_by_key())
  163. op_queue_.push(outer_thread_info->private_op_queue);
  164. #endif // defined(ASIO_HAS_THREADS)
  165. std::size_t n = 0;
  166. for (; do_poll_one(lock, this_thread, ec); lock.lock())
  167. if (n != (std::numeric_limits<std::size_t>::max)())
  168. ++n;
  169. return n;
  170. }
  171. std::size_t task_io_service::poll_one(asio::error_code& ec)
  172. {
  173. ec = asio::error_code();
  174. if (outstanding_work_ == 0)
  175. {
  176. stop();
  177. return 0;
  178. }
  179. thread_info this_thread;
  180. this_thread.private_outstanding_work = 0;
  181. thread_call_stack::context ctx(this, this_thread);
  182. mutex::scoped_lock lock(mutex_);
  183. #if defined(ASIO_HAS_THREADS)
  184. // We want to support nested calls to poll() and poll_one(), so any handlers
  185. // that are already on a thread-private queue need to be put on to the main
  186. // queue now.
  187. if (one_thread_)
  188. if (thread_info* outer_thread_info = ctx.next_by_key())
  189. op_queue_.push(outer_thread_info->private_op_queue);
  190. #endif // defined(ASIO_HAS_THREADS)
  191. return do_poll_one(lock, this_thread, ec);
  192. }
  193. void task_io_service::stop()
  194. {
  195. mutex::scoped_lock lock(mutex_);
  196. stop_all_threads(lock);
  197. }
  198. bool task_io_service::stopped() const
  199. {
  200. mutex::scoped_lock lock(mutex_);
  201. return stopped_;
  202. }
  203. void task_io_service::reset()
  204. {
  205. mutex::scoped_lock lock(mutex_);
  206. stopped_ = false;
  207. }
// Request invocation of the given operation, counting it as newly started
// work. is_continuation indicates the operation is a continuation of the
// handler currently running on this thread.
void task_io_service::post_immediate_completion(
    task_io_service::operation* op, bool is_continuation)
{
#if defined(ASIO_HAS_THREADS)
  // Fast path: if this thread is already running inside this service, and
  // either the service is single-threaded or the operation continues the
  // current handler, queue it on the thread-private queue without locking.
  // The work unit is counted privately and published later by
  // task_cleanup/work_cleanup.
  if (one_thread_ || is_continuation)
  {
    if (thread_info* this_thread = thread_call_stack::contains(this))
    {
      ++this_thread->private_outstanding_work;
      this_thread->private_op_queue.push(op);
      return;
    }
  }
#else // defined(ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(ASIO_HAS_THREADS)
  // Slow path: count the work first, then publish the operation on the
  // shared queue and wake a thread to run it.
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
  229. void task_io_service::post_deferred_completion(task_io_service::operation* op)
  230. {
  231. #if defined(ASIO_HAS_THREADS)
  232. if (one_thread_)
  233. {
  234. if (thread_info* this_thread = thread_call_stack::contains(this))
  235. {
  236. this_thread->private_op_queue.push(op);
  237. return;
  238. }
  239. }
  240. #endif // defined(ASIO_HAS_THREADS)
  241. mutex::scoped_lock lock(mutex_);
  242. op_queue_.push(op);
  243. wake_one_thread_and_unlock(lock);
  244. }
  245. void task_io_service::post_deferred_completions(
  246. op_queue<task_io_service::operation>& ops)
  247. {
  248. if (!ops.empty())
  249. {
  250. #if defined(ASIO_HAS_THREADS)
  251. if (one_thread_)
  252. {
  253. if (thread_info* this_thread = thread_call_stack::contains(this))
  254. {
  255. this_thread->private_op_queue.push(ops);
  256. return;
  257. }
  258. }
  259. #endif // defined(ASIO_HAS_THREADS)
  260. mutex::scoped_lock lock(mutex_);
  261. op_queue_.push(ops);
  262. wake_one_thread_and_unlock(lock);
  263. }
  264. }
// Count the operation as newly started work, queue it on the shared queue,
// and wake a thread to run it. Unlike post_immediate_completion, no
// thread-private fast path is attempted here.
void task_io_service::do_dispatch(
    task_io_service::operation* op)
{
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
// Take ownership of operations that will never be executed. Moving them
// into a local queue transfers ownership out of the caller; presumably the
// op_queue destructor disposes of them (the op_queue definition is not
// visible in this file — confirm against op_queue.hpp).
void task_io_service::abandon_operations(
    op_queue<task_io_service::operation>& ops)
{
  op_queue<task_io_service::operation> ops2;
  ops2.push(ops);
}
// Execute one handler, or run the reactor task, blocking while the queue is
// empty. Entered with the lock held; returns with it released. Returns 1 if
// a handler was executed, 0 if the service was stopped.
std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
    task_io_service::thread_info& this_thread,
    const asio::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        // The sentinel was popped: this thread becomes the task runner.
        // If handlers remain, mark the task interrupted so it returns
        // promptly, and signal another thread to take the handlers.
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        // On scope exit (even via exception), requeue the sentinel and
        // publish any operations the task produced — see task_cleanup.
        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(!more_handlers, this_thread.private_op_queue);
      }
      else
      {
        // Capture the task result before the handler deletes itself.
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(*this, ec, task_result);

        return 1;
      }
    }
    else
    {
      // Queue empty: sleep until another thread queues work or stops us.
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }
  return 0;
}
// Execute at most one ready handler without blocking. Entered with the lock
// held; returns with it released. Returns 1 if a handler was executed,
// 0 otherwise.
std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
    task_io_service::thread_info& this_thread,
    const asio::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == &task_operation_)
  {
    op_queue_.pop();
    lock.unlock();

    {
      // On scope exit, requeue the sentinel and publish any operations
      // the task produced — see task_cleanup (it reacquires the lock).
      task_cleanup c = { this, &lock, &this_thread };
      (void)c;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(false, this_thread.private_op_queue);
    }

    // The lock is held again here. If the sentinel is back at the front,
    // the task produced no runnable handlers: let another thread have a
    // turn and report nothing done.
    o = op_queue_.front();
    if (o == &task_operation_)
    {
      wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  // An empty queue yields a null front pointer: nothing to do.
  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  // Capture the task result before the handler deletes itself.
  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(*this, ec, task_result);

  return 1;
}
// Mark the service stopped and wake every waiting thread. Must be called
// with the lock held (the caller's scoped_lock is passed in).
void task_io_service::stop_all_threads(
    mutex::scoped_lock& lock)
{
  stopped_ = true;
  wakeup_event_.signal_all(lock);

  // The thread running the reactor task will not see the wakeup event;
  // interrupt the task so that thread also observes the stop.
  if (!task_interrupted_ && task_)
  {
    task_interrupted_ = true;
    task_->interrupt();
  }
}
// Wake one thread to process newly queued work, releasing the lock in the
// process. If no thread is waiting on the event, fall back to interrupting
// the reactor task so the thread running it returns and picks up the work.
void task_io_service::wake_one_thread_and_unlock(
    mutex::scoped_lock& lock)
{
  // maybe_unlock_and_signal_one returns having unlocked only when it
  // signalled a waiter; otherwise we must interrupt and unlock ourselves.
  if (!wakeup_event_.maybe_unlock_and_signal_one(lock))
  {
    if (!task_interrupted_ && task_)
    {
      task_interrupted_ = true;
      task_->interrupt();
    }
    lock.unlock();
  }
}
  394. } // namespace detail
  395. } // namespace asio
  396. #include "asio/detail/pop_options.hpp"
  397. #endif // !defined(ASIO_HAS_IOCP)
  398. #endif // ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP