test_pipeline.cxx 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. #include <chrono>
  2. #include <pqxx/pipeline>
  3. #include <pqxx/transaction>
  4. #include "../test_helpers.hxx"
  5. namespace
  6. {
  7. void test_pipeline()
  8. {
  9. pqxx::connection conn;
  10. pqxx::work tx{conn};
  11. // A pipeline grabs transaction focus, blocking regular queries and such.
  12. pqxx::pipeline pipe(tx, "test_pipeline_detach");
  13. PQXX_CHECK_THROWS(
  14. tx.exec("SELECT 1"), std::logic_error,
  15. "Pipeline does not block regular queries");
  16. // Flushing a pipeline relinquishes transaction focus.
  17. pipe.flush();
  18. auto r{tx.exec("SELECT 2")};
  19. PQXX_CHECK_EQUAL(
  20. std::size(r), 1, "Wrong query result after flushing pipeline.");
  21. PQXX_CHECK_EQUAL(
  22. r[0][0].as<int>(), 2, "Query returns wrong data after flushing pipeline.");
  23. // Inserting a query makes the pipeline grab transaction focus back.
  24. auto q{pipe.insert("SELECT 2")};
  25. PQXX_CHECK_THROWS(
  26. tx.exec("SELECT 3"), std::logic_error,
  27. "Pipeline does not block regular queries");
  28. // Invoking complete() also detaches the pipeline from the transaction.
  29. pipe.complete();
  30. r = tx.exec("SELECT 4");
  31. PQXX_CHECK_EQUAL(std::size(r), 1, "Wrong query result after complete().");
  32. PQXX_CHECK_EQUAL(
  33. r[0][0].as<int>(), 4, "Query returns wrong data after complete().");
  34. // The complete() also received any pending query results from the backend.
  35. r = pipe.retrieve(q);
  36. PQXX_CHECK_EQUAL(std::size(r), 1, "Wrong result from pipeline.");
  37. PQXX_CHECK_EQUAL(r[0][0].as<int>(), 2, "Pipeline returned wrong data.");
  38. // We can cancel while the pipe is empty, and things will still work.
  39. pipe.cancel();
  40. // Issue a query and cancel it. Measure time to see that we don't really
  41. // wait.
  42. using clock = std::chrono::steady_clock;
  43. auto const start{clock::now()};
  44. pipe.retain(0);
  45. pipe.insert("pg_sleep(10)");
  46. pipe.cancel();
  47. auto const finish{clock::now()};
  48. auto const seconds{
  49. std::chrono::duration_cast<std::chrono::seconds>(finish - start).count()};
  50. PQXX_CHECK_LESS(seconds, 5, "Canceling a sleep took suspiciously long.");
  51. }
  52. } // namespace
  53. PQXX_REGISTER_TEST(test_pipeline);