cursor.cc 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. #include "cursor.h"
  2. #include "cursor_p.h"
  3. #include "exceptions.h"
  4. namespace RethinkDB {
  5. // for type completion, in order to forward declare with unique_ptr
  6. Cursor::Cursor(Cursor&&) = default;
  7. Cursor& Cursor::operator=(Cursor&&) = default;
  8. CursorPrivate::CursorPrivate(uint64_t token_, Connection *conn_)
  9. : single(false), no_more(false), index(0),
  10. token(token_), conn(conn_)
  11. { }
  12. CursorPrivate::CursorPrivate(uint64_t token_, Connection *conn_, Datum&& datum)
  13. : single(true), no_more(true), index(0), buffer(Array{std::move(datum)}),
  14. token(token_), conn(conn_)
  15. { }
  16. Cursor::Cursor(CursorPrivate *dd) : d(dd) {}
  17. Cursor::~Cursor() {
  18. try {
  19. if (d && d->conn) {
  20. close();
  21. }
  22. } catch ( ... ) {}
  23. }
  24. Datum& Cursor::next(double wait) const {
  25. if (!has_next(wait)) {
  26. throw Error("next: No more data");
  27. }
  28. return d->buffer[d->index++];
  29. }
  30. Datum& Cursor::peek(double wait) const {
  31. if (!has_next(wait)) {
  32. throw Error("next: No more data");
  33. }
  34. return d->buffer[d->index];
  35. }
  36. void Cursor::each(std::function<void(Datum&&)> f, double wait) const {
  37. while (has_next(wait)) {
  38. f(std::move(d->buffer[d->index++]));
  39. }
  40. }
  41. void CursorPrivate::convert_single() const {
  42. if (index != 0) {
  43. throw Error("Cursor: already consumed");
  44. }
  45. if (buffer.size() != 1) {
  46. throw Error("Cursor: invalid response from server");
  47. }
  48. if (!buffer[0].is_array()) {
  49. throw Error("Cursor: not an array");
  50. }
  51. buffer.swap(buffer[0].extract_array());
  52. single = false;
  53. }
  54. void CursorPrivate::clear_and_read_all() const {
  55. if (single) {
  56. convert_single();
  57. }
  58. if (index != 0) {
  59. buffer.erase(buffer.begin(), buffer.begin() + index);
  60. index = 0;
  61. }
  62. while (!no_more) {
  63. add_response(conn->d->wait_for_response(token, FOREVER));
  64. }
  65. }
  66. Array&& Cursor::to_array() && {
  67. d->clear_and_read_all();
  68. return std::move(d->buffer);
  69. }
  70. Array Cursor::to_array() const & {
  71. d->clear_and_read_all();
  72. return d->buffer;
  73. }
  74. Datum Cursor::to_datum() const & {
  75. if (d->single) {
  76. if (d->index != 0) {
  77. throw Error("to_datum: already consumed");
  78. }
  79. return d->buffer[0];
  80. }
  81. d->clear_and_read_all();
  82. return d->buffer;
  83. }
  84. Datum Cursor::to_datum() && {
  85. Datum ret((Nil()));
  86. if (d->single) {
  87. if (d->index != 0) {
  88. throw Error("to_datum: already consumed");
  89. }
  90. ret = std::move(d->buffer[0]);
  91. } else {
  92. d->clear_and_read_all();
  93. ret = std::move(d->buffer);
  94. }
  95. return ret;
  96. }
  97. void Cursor::close() const {
  98. d->conn->stop_query(d->token);
  99. d->no_more = true;
  100. }
  101. bool Cursor::has_next(double wait) const {
  102. if (d->single) {
  103. d->convert_single();
  104. }
  105. while (true) {
  106. if (d->index >= d->buffer.size()) {
  107. if (d->no_more) {
  108. return false;
  109. }
  110. d->add_response(d->conn->d->wait_for_response(d->token, wait));
  111. } else {
  112. return true;
  113. }
  114. }
  115. }
  116. bool Cursor::is_single() const {
  117. return d->single;
  118. }
  119. void CursorPrivate::add_results(Array&& results) const {
  120. if (index >= buffer.size()) {
  121. buffer = std::move(results);
  122. index = 0;
  123. } else {
  124. for (auto& it : results) {
  125. buffer.emplace_back(std::move(it));
  126. }
  127. }
  128. }
  129. void CursorPrivate::add_response(Response&& response) const {
  130. using RT = Protocol::Response::ResponseType;
  131. switch (response.type) {
  132. case RT::SUCCESS_SEQUENCE:
  133. add_results(std::move(response.result));
  134. no_more = true;
  135. break;
  136. case RT::SUCCESS_PARTIAL:
  137. conn->continue_query(token);
  138. add_results(std::move(response.result));
  139. break;
  140. case RT::SUCCESS_ATOM:
  141. add_results(std::move(response.result));
  142. single = true;
  143. no_more = true;
  144. break;
  145. case RT::SERVER_INFO:
  146. add_results(std::move(response.result));
  147. single = true;
  148. no_more = true;
  149. break;
  150. case RT::WAIT_COMPLETE:
  151. case RT::CLIENT_ERROR:
  152. case RT::COMPILE_ERROR:
  153. case RT::RUNTIME_ERROR:
  154. no_more = true;
  155. throw response.as_error();
  156. }
  157. }
  158. Cursor::iterator Cursor::begin() {
  159. return iterator(this);
  160. }
  161. Cursor::iterator Cursor::end() {
  162. return iterator(nullptr);
  163. }
  164. Cursor::iterator::iterator(Cursor* cursor_) : cursor(cursor_) {}
  165. Cursor::iterator& Cursor::iterator::operator++ () {
  166. if (cursor == nullptr) {
  167. throw Error("incrementing an exhausted Cursor iterator");
  168. }
  169. cursor->next();
  170. return *this;
  171. }
  172. Datum& Cursor::iterator::operator* () {
  173. if (cursor == nullptr) {
  174. throw Error("reading from empty Cursor iterator");
  175. }
  176. return cursor->peek();
  177. }
  178. bool Cursor::iterator::operator!= (const Cursor::iterator& other) const {
  179. if (cursor == other.cursor) {
  180. return false;
  181. }
  182. return !((cursor == nullptr && !other.cursor->has_next()) ||
  183. (other.cursor == nullptr && !cursor->has_next()));
  184. }
  185. }