kqueue_reactor.ipp

//
// detail/impl/kqueue_reactor.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
// Copyright (c) 2005 Stefan Arentz (stefan at soze dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
#define ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#include "asio/detail/config.hpp"

#if defined(ASIO_HAS_KQUEUE)

#include "asio/detail/kqueue_reactor.hpp"
#include "asio/detail/throw_error.hpp"
#include "asio/error.hpp"

#include "asio/detail/push_options.hpp"
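
// On NetBSD the udata field of struct kevent is declared as intptr_t rather
// than void*, so the user-data pointer has to be round-tripped through an
// integer cast. The other kqueue platforms take the pointer directly.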
#if defined(__NetBSD__)
# define ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
    EV_SET(ev, ident, filt, flags, fflags, data, \
      reinterpret_cast<intptr_t>(static_cast<void*>(udata)))
#else
# define ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
    EV_SET(ev, ident, filt, flags, fflags, data, udata)
#endif
namespace asio {
namespace detail {

kqueue_reactor::kqueue_reactor(asio::io_service& io_service)
  : asio::detail::service_base<kqueue_reactor>(io_service),
    io_service_(use_service<io_service_impl>(io_service)),
    mutex_(),
    kqueue_fd_(do_kqueue_create()),
    interrupter_(),
    shutdown_(false)
{
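  // Register the interrupter's read descriptor so that interrupt() can wake
  // up a thread that is blocked in kevent().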
  struct kevent events[1];
  ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
      EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
  if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
  {
    asio::error_code error(errno,
        asio::error::get_system_category());
    asio::detail::throw_error(error);
  }
}

kqueue_reactor::~kqueue_reactor()
{
  close(kqueue_fd_);
}

void kqueue_reactor::shutdown_service()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;
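  // Collect every outstanding operation from the registered descriptors and
  // the timer queues so that the io_service can abandon them, i.e. destroy
  // them without invoking their handlers.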
  while (descriptor_state* state = registered_descriptors_.first())
  {
    for (int i = 0; i < max_ops; ++i)
      ops.push(state->op_queue_[i]);
    state->shutdown_ = true;
    registered_descriptors_.free(state);
  }

  timer_queues_.get_all_timers(ops);

  io_service_.abandon_operations(ops);
}

void kqueue_reactor::fork_service(asio::io_service::fork_event fork_ev)
{
  if (fork_ev == asio::io_service::fork_child)
  {
    // The kqueue descriptor is automatically closed in the child.
    kqueue_fd_ = -1;
    kqueue_fd_ = do_kqueue_create();

    interrupter_.recreate();
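
    // Registrations made against the parent's kqueue do not carry over to
    // the child, so the interrupter must be added to the new kqueue as well.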
    struct kevent events[2];
    ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
        EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
    if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
    {
      asio::error_code ec(errno,
          asio::error::get_system_category());
      asio::detail::throw_error(ec, "kqueue interrupter registration");
    }

    // Re-register all descriptors with kqueue.
    mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
    for (descriptor_state* state = registered_descriptors_.first();
        state != 0; state = state->next_)
    {
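      // num_kevents_ records how many filters are active for this descriptor:
      // 1 means EVFILT_READ only, 2 means EVFILT_READ plus EVFILT_WRITE. It
      // is passed as the changelist count below so that only the filters that
      // were previously active are re-added.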
      if (state->num_kevents_ > 0)
      {
        ASIO_KQUEUE_EV_SET(&events[0], state->descriptor_,
            EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state);
        ASIO_KQUEUE_EV_SET(&events[1], state->descriptor_,
            EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state);
        if (::kevent(kqueue_fd_, events, state->num_kevents_, 0, 0, 0) == -1)
        {
          asio::error_code ec(errno,
              asio::error::get_system_category());
          asio::detail::throw_error(ec, "kqueue re-registration");
        }
      }
    }
  }
}

void kqueue_reactor::init_task()
{
  io_service_.init_task();
}

int kqueue_reactor::register_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  descriptor_data = allocate_descriptor_state();

  mutex::scoped_lock lock(descriptor_data->mutex_);

  descriptor_data->descriptor_ = descriptor;
  descriptor_data->num_kevents_ = 0;
  descriptor_data->shutdown_ = false;

  return 0;
}

int kqueue_reactor::register_internal_descriptor(
    int op_type, socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
{
  descriptor_data = allocate_descriptor_state();

  mutex::scoped_lock lock(descriptor_data->mutex_);

  descriptor_data->descriptor_ = descriptor;
  descriptor_data->num_kevents_ = 1;
  descriptor_data->shutdown_ = false;
  descriptor_data->op_queue_[op_type].push(op);

  struct kevent events[1];
  ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
      EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
  if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
    return errno;

  return 0;
}

void kqueue_reactor::move_descriptor(socket_type,
    kqueue_reactor::per_descriptor_data& target_descriptor_data,
    kqueue_reactor::per_descriptor_data& source_descriptor_data)
{
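  // Only ownership of the descriptor_state pointer changes hands. The kqueue
  // registration stores that same pointer as its udata, so no kernel-side
  // update should be needed here.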
  target_descriptor_data = source_descriptor_data;
  source_descriptor_data = 0;
}

void kqueue_reactor::start_op(int op_type, socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  if (!descriptor_data)
  {
    op->ec_ = asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty())
  {
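    // Filters needed per operation type: read_op and except_op require only
    // EVFILT_READ, while write_op also requires EVFILT_WRITE (hence 2). A
    // speculative attempt runs the operation immediately; if it completes,
    // the kqueue registration is never touched.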
    static const int num_kevents[max_ops] = { 1, 2, 1 };

    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
      if (op->perform())
      {
        descriptor_lock.unlock();
        io_service_.post_immediate_completion(op, is_continuation);
        return;
      }

      if (descriptor_data->num_kevents_ < num_kevents[op_type])
      {
        struct kevent events[2];
        ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
            EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
        ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
            EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
        if (::kevent(kqueue_fd_, events, num_kevents[op_type], 0, 0, 0) != -1)
        {
          descriptor_data->num_kevents_ = num_kevents[op_type];
        }
        else
        {
          op->ec_ = asio::error_code(errno,
              asio::error::get_system_category());
          io_service_.post_immediate_completion(op, is_continuation);
          return;
        }
      }
    }
    else
    {
      if (descriptor_data->num_kevents_ < num_kevents[op_type])
        descriptor_data->num_kevents_ = num_kevents[op_type];

      struct kevent events[2];
      ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
          EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
      ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
          EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
      ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  io_service_.work_started();
}

void kqueue_reactor::cancel_ops(socket_type,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  op_queue<operation> ops;
  for (int i = 0; i < max_ops; ++i)
  {
    while (reactor_op* op = descriptor_data->op_queue_[i].front())
    {
      op->ec_ = asio::error::operation_aborted;
      descriptor_data->op_queue_[i].pop();
      ops.push(op);
    }
  }

  descriptor_lock.unlock();

  io_service_.post_deferred_completions(ops);
}

void kqueue_reactor::deregister_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, bool closing)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    if (closing)
    {
      // The descriptor will be automatically removed from the kqueue when it
      // is closed.
    }
    else
    {
      struct kevent events[2];
      ASIO_KQUEUE_EV_SET(&events[0], descriptor,
          EVFILT_READ, EV_DELETE, 0, 0, 0);
      ASIO_KQUEUE_EV_SET(&events[1], descriptor,
          EVFILT_WRITE, EV_DELETE, 0, 0, 0);
      ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
    }

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
    {
      while (reactor_op* op = descriptor_data->op_queue_[i].front())
      {
        op->ec_ = asio::error::operation_aborted;
        descriptor_data->op_queue_[i].pop();
        ops.push(op);
      }
    }

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    free_descriptor_state(descriptor_data);
    descriptor_data = 0;

    io_service_.post_deferred_completions(ops);
  }
}

void kqueue_reactor::deregister_internal_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    struct kevent events[2];
    ASIO_KQUEUE_EV_SET(&events[0], descriptor,
        EVFILT_READ, EV_DELETE, 0, 0, 0);
    ASIO_KQUEUE_EV_SET(&events[1], descriptor,
        EVFILT_WRITE, EV_DELETE, 0, 0, 0);
    ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
      ops.push(descriptor_data->op_queue_[i]);

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    free_descriptor_state(descriptor_data);
    descriptor_data = 0;
  }
}

void kqueue_reactor::run(bool block, op_queue<operation>& ops)
{
  mutex::scoped_lock lock(mutex_);

  // Determine how long to block while waiting for events.
  timespec timeout_buf = { 0, 0 };
  timespec* timeout = block ? get_timeout(timeout_buf) : &timeout_buf;

  lock.unlock();

  // Block on the kqueue descriptor.
  struct kevent events[128];
  int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
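    // The udata value was stored via ASIO_KQUEUE_EV_SET; on NetBSD it
    // round-trips through intptr_t, hence the reinterpret_cast back to void*.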
    void* ptr = reinterpret_cast<void*>(events[i].udata);
    if (ptr == &interrupter_)
    {
      interrupter_.reset();
    }
    else
    {
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

      if (events[i].filter == EVFILT_WRITE
          && descriptor_data->num_kevents_ == 2
          && descriptor_data->op_queue_[write_op].empty())
      {
        // Some descriptor types, like serial ports, don't seem to support
        // EV_CLEAR with EVFILT_WRITE. Since we have no pending write
        // operations we'll remove the EVFILT_WRITE registration here so that
        // we don't end up in a tight spin.
        struct kevent delete_events[1];
        ASIO_KQUEUE_EV_SET(&delete_events[0],
            descriptor_data->descriptor_, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
        ::kevent(kqueue_fd_, delete_events, 1, 0, 0, 0);
        descriptor_data->num_kevents_ = 1;
      }

      // Exception operations must be processed first to ensure that any
      // out-of-band data is read before normal data.
#if defined(__NetBSD__)
      static const unsigned int filter[max_ops] =
#else
      static const int filter[max_ops] =
#endif
        { EVFILT_READ, EVFILT_WRITE, EVFILT_READ };
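
      // filter[j] maps the operation index (read_op, write_op, except_op) to
      // its kqueue filter. except_op shares EVFILT_READ and is additionally
      // gated on the EV_OOBAND flag below.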
      for (int j = max_ops - 1; j >= 0; --j)
      {
        if (events[i].filter == filter[j])
        {
          if (j != except_op || events[i].flags & EV_OOBAND)
          {
            while (reactor_op* op = descriptor_data->op_queue_[j].front())
            {
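              // When EV_ERROR is set, the data field carries the errno-style
              // error code for the failed registration.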
              if (events[i].flags & EV_ERROR)
              {
                op->ec_ = asio::error_code(
                    static_cast<int>(events[i].data),
                    asio::error::get_system_category());
                descriptor_data->op_queue_[j].pop();
                ops.push(op);
              }
              if (op->perform())
              {
                descriptor_data->op_queue_[j].pop();
                ops.push(op);
              }
              else
                break;
            }
          }
        }
      }
    }
  }

  lock.lock();
  timer_queues_.get_ready_timers(ops);
}

void kqueue_reactor::interrupt()
{
  interrupter_.interrupt();
}

int kqueue_reactor::do_kqueue_create()
{
  int fd = ::kqueue();
  if (fd == -1)
  {
    asio::error_code ec(errno,
        asio::error::get_system_category());
    asio::detail::throw_error(ec, "kqueue");
  }
  return fd;
}

kqueue_reactor::descriptor_state* kqueue_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc();
}

void kqueue_reactor::free_descriptor_state(kqueue_reactor::descriptor_state* s)
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  registered_descriptors_.free(s);
}

void kqueue_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void kqueue_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

timespec* kqueue_reactor::get_timeout(timespec& ts)
{
  // By default we will wait no longer than 5 minutes. This will ensure that
  // any changes to the system clock are detected after no longer than this.
  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  ts.tv_sec = usec / 1000000;
  ts.tv_nsec = (usec % 1000000) * 1000;
  return &ts;
}

} // namespace detail
} // namespace asio

#undef ASIO_KQUEUE_EV_SET

#include "asio/detail/pop_options.hpp"

#endif // defined(ASIO_HAS_KQUEUE)

#endif // ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP