123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- #include "cursor.h"
- #include "cursor_p.h"
- #include "exceptions.h"
- namespace RethinkDB {
- // for type completion, in order to forward declare with unique_ptr
- Cursor::Cursor(Cursor&&) = default;
- Cursor& Cursor::operator=(Cursor&&) = default;
- CursorPrivate::CursorPrivate(uint64_t token_, Connection *conn_)
- : single(false), no_more(false), index(0),
- token(token_), conn(conn_)
- { }
- CursorPrivate::CursorPrivate(uint64_t token_, Connection *conn_, Datum&& datum)
- : single(true), no_more(true), index(0), buffer(Array{std::move(datum)}),
- token(token_), conn(conn_)
- { }
- Cursor::Cursor(CursorPrivate *dd) : d(dd) {}
- Cursor::~Cursor() {
- try {
- if (d && d->conn) {
- close();
- }
- } catch ( ... ) {}
- }
- Datum& Cursor::next(double wait) const {
- if (!has_next(wait)) {
- throw Error("next: No more data");
- }
- return d->buffer[d->index++];
- }
- Datum& Cursor::peek(double wait) const {
- if (!has_next(wait)) {
- throw Error("next: No more data");
- }
- return d->buffer[d->index];
- }
- void Cursor::each(std::function<void(Datum&&)> f, double wait) const {
- while (has_next(wait)) {
- f(std::move(d->buffer[d->index++]));
- }
- }
- void CursorPrivate::convert_single() const {
- if (index != 0) {
- throw Error("Cursor: already consumed");
- }
- if (buffer.size() != 1) {
- throw Error("Cursor: invalid response from server");
- }
- if (!buffer[0].is_array()) {
- throw Error("Cursor: not an array");
- }
- buffer.swap(buffer[0].extract_array());
- single = false;
- }
- void CursorPrivate::clear_and_read_all() const {
- if (single) {
- convert_single();
- }
- if (index != 0) {
- buffer.erase(buffer.begin(), buffer.begin() + index);
- index = 0;
- }
- while (!no_more) {
- add_response(conn->d->wait_for_response(token, FOREVER));
- }
- }
- Array&& Cursor::to_array() && {
- d->clear_and_read_all();
- return std::move(d->buffer);
- }
- Array Cursor::to_array() const & {
- d->clear_and_read_all();
- return d->buffer;
- }
- Datum Cursor::to_datum() const & {
- if (d->single) {
- if (d->index != 0) {
- throw Error("to_datum: already consumed");
- }
- return d->buffer[0];
- }
- d->clear_and_read_all();
- return d->buffer;
- }
- Datum Cursor::to_datum() && {
- Datum ret((Nil()));
- if (d->single) {
- if (d->index != 0) {
- throw Error("to_datum: already consumed");
- }
- ret = std::move(d->buffer[0]);
- } else {
- d->clear_and_read_all();
- ret = std::move(d->buffer);
- }
- return ret;
- }
- void Cursor::close() const {
- d->conn->stop_query(d->token);
- d->no_more = true;
- }
- bool Cursor::has_next(double wait) const {
- if (d->single) {
- d->convert_single();
- }
- while (true) {
- if (d->index >= d->buffer.size()) {
- if (d->no_more) {
- return false;
- }
- d->add_response(d->conn->d->wait_for_response(d->token, wait));
- } else {
- return true;
- }
- }
- }
- bool Cursor::is_single() const {
- return d->single;
- }
- void CursorPrivate::add_results(Array&& results) const {
- if (index >= buffer.size()) {
- buffer = std::move(results);
- index = 0;
- } else {
- for (auto& it : results) {
- buffer.emplace_back(std::move(it));
- }
- }
- }
- void CursorPrivate::add_response(Response&& response) const {
- using RT = Protocol::Response::ResponseType;
- switch (response.type) {
- case RT::SUCCESS_SEQUENCE:
- add_results(std::move(response.result));
- no_more = true;
- break;
- case RT::SUCCESS_PARTIAL:
- conn->continue_query(token);
- add_results(std::move(response.result));
- break;
- case RT::SUCCESS_ATOM:
- add_results(std::move(response.result));
- single = true;
- no_more = true;
- break;
- case RT::SERVER_INFO:
- add_results(std::move(response.result));
- single = true;
- no_more = true;
- break;
- case RT::WAIT_COMPLETE:
- case RT::CLIENT_ERROR:
- case RT::COMPILE_ERROR:
- case RT::RUNTIME_ERROR:
- no_more = true;
- throw response.as_error();
- }
- }
- Cursor::iterator Cursor::begin() {
- return iterator(this);
- }
- Cursor::iterator Cursor::end() {
- return iterator(nullptr);
- }
- Cursor::iterator::iterator(Cursor* cursor_) : cursor(cursor_) {}
- Cursor::iterator& Cursor::iterator::operator++ () {
- if (cursor == nullptr) {
- throw Error("incrementing an exhausted Cursor iterator");
- }
- cursor->next();
- return *this;
- }
- Datum& Cursor::iterator::operator* () {
- if (cursor == nullptr) {
- throw Error("reading from empty Cursor iterator");
- }
- return cursor->peek();
- }
- bool Cursor::iterator::operator!= (const Cursor::iterator& other) const {
- if (cursor == other.cursor) {
- return false;
- }
- return !((cursor == nullptr && !other.cursor->has_next()) ||
- (other.cursor == nullptr && !cursor->has_next()));
- }
- }
|