11.batch.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. /*
  2. * PROGRAM: Object oriented API samples.
  3. * MODULE: 11.batch.cpp
  4. * DESCRIPTION: A trivial sample of using Batch interface.
  5. *
  6. * Example for the following interfaces:
  7. * IBatch - interface to work with FB batches
  8. * IBatchCompletionState - contains result of batch execution
  9. *
  10. * c++ 11.batch.cpp -lfbclient
  11. *
  12. * The contents of this file are subject to the Initial
  13. * Developer's Public License Version 1.0 (the "License");
  14. * you may not use this file except in compliance with the
  15. * License. You may obtain a copy of the License at
  16. * http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl.
  17. *
  18. * Software distributed under the License is distributed AS IS,
  19. * WITHOUT WARRANTY OF ANY KIND, either express or implied.
  20. * See the License for the specific language governing rights
  21. * and limitations under the License.
  22. *
  23. * The Original Code was created by Alexander Peshkoff
  24. * for the Firebird Open Source RDBMS project.
  25. *
  26. * Copyright (c) 2017 Alexander Peshkoff <[email protected]>
  27. * and all contributors signed below.
  28. *
  29. * All Rights Reserved.
  30. * Contributor(s): ______________________________________.
  31. */
  32. #include "ifaceExamples.h"
  33. #include <firebird/Message.h>
  34. static IMaster* master = fb_get_master_interface();
  35. // output error message to user
  36. static void errPrint(IStatus* status)
  37. {
  38. char buf[256];
  39. master->getUtilInterface()->formatStatus(buf, sizeof(buf), status);
  40. fprintf(stderr, "%s\n", buf);
  41. }
  // Round target up to the next multiple of alignment.
  // T may be an integer or a pointer type: the value is converted to
  // uintptr_t, rounded and converted back. The mask arithmetic only rounds
  // correctly when alignment is a power of two.
  template <typename T>
  static inline T align(T target, uintptr_t alignment)
  {
      const uintptr_t mask = alignment - 1;
      const uintptr_t raised = ((uintptr_t) target) + mask;

      return (T) (raised & ~mask);
  }
  48. // append given message to buffer ptr
  49. static void putMsg(unsigned char*& ptr, const void* from, unsigned size, unsigned alignment)
  50. {
  51. memcpy(ptr, from, size);
  52. ptr += align(size, alignment);
  53. }
  54. // append blob header with BPB to buffer ptr
  55. // return pointer to blob size field - prefilled with BPB size
  56. static unsigned* putBlobHdr(unsigned char*& ptr, unsigned alignment, ISC_QUAD* id, unsigned bpbSize, const unsigned char* bpb)
  57. {
  58. ptr = align(ptr, alignment);
  59. memcpy(ptr, id, sizeof(ISC_QUAD));
  60. ptr += sizeof(ISC_QUAD);
  61. unsigned* rc = reinterpret_cast<unsigned*>(ptr);
  62. memcpy(ptr, &bpbSize, sizeof(unsigned));
  63. ptr += sizeof(unsigned);
  64. memcpy(ptr, &bpbSize, sizeof(unsigned));
  65. ptr += sizeof(unsigned);
  66. memcpy(ptr, bpb, bpbSize);
  67. ptr += bpbSize;
  68. return rc;
  69. }
  70. // append given blob to buffer ptr
  71. static void putBlob(unsigned char*& ptr, const void* from, unsigned size, unsigned alignment, ISC_QUAD* id)
  72. {
  73. unsigned* sizePtr = putBlobHdr(ptr, alignment, id, 0, NULL);
  74. memcpy(ptr, from, size);
  75. *sizePtr += size;
  76. ptr += size;
  77. ptr = align(ptr, alignment);
  78. }
  79. // append given segment to buffer ptr
  80. unsigned putSegment(unsigned char*& ptr, const char* testData)
  81. {
  82. ptr = align(ptr, IBatch::BLOB_SEGHDR_ALIGN);
  83. unsigned short l = strlen(testData);
  84. memcpy(ptr, &l, sizeof l);
  85. ptr += sizeof l;
  86. memcpy(ptr, testData, l);
  87. ptr += l;
  88. return align(l + sizeof l, IBatch::BLOB_SEGHDR_ALIGN);
  89. }
  90. // batch info printer - prints what we know about batch
  91. static void printInfo(ThrowStatusWrapper& status, const char* hdr, IBatch* b, IUtil* utl)
  92. {
  93. printf("\n%s\n", hdr);
  94. const unsigned char items[] = {IBatch::INF_BLOB_ALIGNMENT, IBatch::INF_BUFFER_BYTES_SIZE,
  95. IBatch::INF_DATA_BYTES_SIZE, IBatch::INF_BLOBS_BYTES_SIZE};
  96. unsigned char buffer[29];
  97. b->getInfo(&status, sizeof items, items, sizeof buffer, buffer);
  98. IXpbBuilder* pb = utl->getXpbBuilder(&status, IXpbBuilder::INFO_RESPONSE, buffer, sizeof buffer);
  99. for (pb->rewind(&status); !pb->isEof(&status); pb->moveNext(&status))
  100. {
  101. int val = pb->getInt(&status);
  102. const char* text = "Unknown tag";
  103. switch (pb->getTag(&status))
  104. {
  105. case IBatch::INF_BLOB_ALIGNMENT:
  106. text = "Blob alignment";
  107. break;
  108. case IBatch::INF_BUFFER_BYTES_SIZE:
  109. text = "Buffer size";
  110. break;
  111. case IBatch::INF_DATA_BYTES_SIZE:
  112. text = "Messages size";
  113. break;
  114. case IBatch::INF_BLOBS_BYTES_SIZE:
  115. text = "Blobs size";
  116. break;
  117. case isc_info_truncated:
  118. printf(" truncated\n");
  119. // fall down...
  120. case isc_info_end:
  121. pb->dispose();
  122. return;
  123. default:
  124. printf("Unexpected item %d\n", pb->getTag(&status));
  125. pb->dispose();
  126. return;
  127. }
  128. printf("%s = %d\n", text, val);
  129. }
  130. pb->dispose();
  131. }
  132. // BatchCompletionState printer - prints all what we know about completed batch
  133. static void print_cs(ThrowStatusWrapper& status, IBatchCompletionState* cs, IUtil* utl)
  134. {
  135. unsigned p = 0;
  136. IStatus* s2 = NULL;
  137. bool pr1 = false, pr2 = false;
  138. // 1. Print per-message state info
  139. unsigned upcount = cs->getSize(&status);
  140. unsigned unk = 0, succ = 0;
  141. for (p = 0; p < upcount; ++p)
  142. {
  143. int s = cs->getState(&status, p);
  144. switch (s)
  145. {
  146. case IBatchCompletionState::EXECUTE_FAILED:
  147. if (!pr1)
  148. {
  149. printf("Message Status\n");
  150. pr1 = true;
  151. }
  152. printf("%5u Execute failed\n", p);
  153. break;
  154. case IBatchCompletionState::SUCCESS_NO_INFO:
  155. ++unk;
  156. break;
  157. default:
  158. if (!pr1)
  159. {
  160. printf("Message Status\n");
  161. pr1 = true;
  162. }
  163. printf("%5u Updated %d record(s)\n", p, s);
  164. ++succ;
  165. break;
  166. }
  167. }
  168. printf("Summary: total=%u success=%u success(but no update info)=%u\n", upcount, succ, unk);
  169. // 2. Print detailed errors (if exist) for messages
  170. s2 = master->getStatus();
  171. for(p = 0; (p = cs->findError(&status, p)) != IBatchCompletionState::NO_MORE_ERRORS; ++p)
  172. {
  173. try
  174. {
  175. cs->getStatus(&status, s2, p);
  176. char text[1024];
  177. utl->formatStatus(text, sizeof(text) - 1, s2);
  178. text[sizeof(text) - 1] = 0;
  179. if (!pr2)
  180. {
  181. printf("\nDetailed errors status:\n");
  182. pr2 = true;
  183. }
  184. printf("Message %u: %s\n", p, text);
  185. }
  186. catch (const FbException& error)
  187. {
  188. // handle error
  189. fprintf(stderr, "\nError describing message %u\n", p);
  190. errPrint(error.getStatus());
  191. fprintf(stderr, "\n");
  192. }
  193. }
  194. if (s2)
  195. s2->dispose();
  196. }
  197. int main()
  198. {
  199. int rc = 0;
  200. // set default password if none specified in environment
  201. setenv("ISC_USER", "sysdba", 0);
  202. setenv("ISC_PASSWORD", "masterkey", 0);
  203. // With ThrowStatusWrapper passed as status interface FbException will be thrown on error
  204. ThrowStatusWrapper status(master->getStatus());
  205. // Declare pointers to required interfaces
  206. IProvider* prov = master->getDispatcher();
  207. IUtil* utl = master->getUtilInterface();
  208. IAttachment* att = NULL;
  209. ITransaction* tra = NULL;
  210. IBatch* batch = NULL;
  211. IBatchCompletionState* cs = NULL;
  212. IXpbBuilder* pb = NULL;
  213. unsigned char streamBuf[10240]; // big enough for demo
  214. unsigned char* stream = NULL;
  215. try
  216. {
  217. // attach employee db
  218. att = prov->attachDatabase(&status, "employee", 0, NULL);
  219. tra = att->startTransaction(&status, 0, NULL);
  220. // cleanup
  221. att->execute(&status, tra, 0, "delete from project where proj_id like 'BAT%'", SAMPLES_DIALECT,
  222. NULL, NULL, NULL, NULL);
  223. //
  224. printf("\nPart 1. Simple messages. Adding one by one or by groups of messages, cancel batch.\n");
  225. //
  226. // Message to store in a table
  227. FB_MESSAGE(Msg1, ThrowStatusWrapper,
  228. (FB_VARCHAR(5), id)
  229. (FB_VARCHAR(10), name)
  230. ) project1(&status, master);
  231. project1.clear();
  232. IMessageMetadata* meta = project1.getMetadata();
  233. // sizes & alignments
  234. unsigned mesAlign = meta->getAlignment(&status);
  235. unsigned mesLength = meta->getMessageLength(&status);
  236. unsigned char* streamStart = align(streamBuf, mesAlign);
  237. // set batch parameters
  238. pb = utl->getXpbBuilder(&status, IXpbBuilder::BATCH, NULL, 0);
  239. // collect per-message statistics
  240. pb->insertInt(&status, IBatch::TAG_RECORD_COUNTS, 1);
  241. // create batch
  242. const char* sqlStmt1 = "insert into project(proj_id, proj_name) values(?, ?)";
  243. batch = att->createBatch(&status, tra, 0, sqlStmt1, SAMPLES_DIALECT, meta,
  244. pb->getBufferLength(&status), pb->getBuffer(&status));
  245. // fill batch with data record by record
  246. project1->id.set("BAT11");
  247. project1->name.set("SNGL_REC1");
  248. batch->add(&status, 1, project1.getData());
  249. project1->id.set("BAT12");
  250. project1->name.set("SNGL_REC2");
  251. batch->add(&status, 1, project1.getData());
  252. // execute it
  253. cs = batch->execute(&status, tra);
  254. print_cs(status, cs, utl);
  255. // add a big set of same records ...
  256. for (int i = 0; i < 100000; ++i)
  257. {
  258. project1->id.set("BAT11");
  259. project1->name.set("SNGL_REC");
  260. batch->add(&status, 1, project1.getData());
  261. }
  262. // check batch state
  263. printInfo(status, "Info when added many records", batch, utl);
  264. // ... and cancel that records
  265. batch->cancel(&status);
  266. // fill batch with data using many records at once
  267. stream = streamStart;
  268. project1->id.set("BAT13");
  269. project1->name.set("STRM_REC_A");
  270. putMsg(stream, project1.getData(), mesLength, mesAlign);
  271. project1->id.set("BAT14");
  272. project1->name.set("STRM_REC_B");
  273. putMsg(stream, project1.getData(), mesLength, mesAlign);
  274. project1->id.set("BAT15");
  275. project1->name.set("STRM_REC_C");
  276. putMsg(stream, project1.getData(), mesLength, mesAlign);
  277. batch->add(&status, 3, streamStart);
  278. stream = streamStart;
  279. project1->id.set("BAT15"); // constraint violation
  280. project1->name.set("STRM_REC_D");
  281. putMsg(stream, project1.getData(), mesLength, mesAlign);
  282. project1->id.set("BAT16"); // will not be processed due to return on single error
  283. project1->name.set("STRM_REC_E");
  284. putMsg(stream, project1.getData(), mesLength, mesAlign);
  285. batch->add(&status, 2, streamStart);
  286. // execute it
  287. cs = batch->execute(&status, tra);
  288. print_cs(status, cs, utl);
  289. // close batch
  290. batch->close(&status);
  291. batch = NULL;
  292. //
  293. printf("\nPart 2. Simple BLOBs. Multiple errors return.\n");
  294. //
  295. // Message to store in a table
  296. FB_MESSAGE(Msg2, ThrowStatusWrapper,
  297. (FB_VARCHAR(5), id)
  298. (FB_VARCHAR(10), name)
  299. (FB_BLOB, desc)
  300. ) project2(&status, master);
  301. project2.clear();
  302. meta = project2.getMetadata();
  303. mesAlign = meta->getAlignment(&status);
  304. mesLength = meta->getMessageLength(&status);
  305. streamStart = align(streamBuf, mesAlign);
  306. // set batch parameters
  307. pb->clear(&status);
  308. // continue batch processing in case of errors in some messages
  309. pb->insertInt(&status, IBatch::TAG_MULTIERROR, 1);
  310. // enable blobs processing - IDs generated by firebird engine
  311. pb->insertInt(&status, IBatch::TAG_BLOB_POLICY, IBatch::BLOB_ID_ENGINE);
  312. // create batch
  313. const char* sqlStmt2 = "insert into project(proj_id, proj_name, proj_desc) values(?, ?, ?)";
  314. batch = att->createBatch(&status, tra, 0, sqlStmt2, SAMPLES_DIALECT, meta,
  315. pb->getBufferLength(&status), pb->getBuffer(&status));
  316. // fill batch with data
  317. project2->id.set("BAT21");
  318. project2->name.set("SNGL_BLOB");
  319. batch->addBlob(&status, strlen(sqlStmt2), sqlStmt2, &project2->desc, 0, NULL);
  320. batch->appendBlobData(&status, 1, "\n");
  321. batch->appendBlobData(&status, strlen(sqlStmt1), sqlStmt1);
  322. batch->add(&status, 1, project2.getData());
  323. printInfo(status, "Info with blob", batch, utl);
  324. // execute it
  325. cs = batch->execute(&status, tra);
  326. print_cs(status, cs, utl);
  327. // fill batch with data
  328. project2->id.set("BAT22");
  329. project2->name.set("SNGL_REC1");
  330. batch->addBlob(&status, strlen(sqlStmt2), sqlStmt2, &project2->desc, 0, NULL);
  331. batch->add(&status, 1, project2.getData());
  332. project2->id.set("BAT22");
  333. project2->name.set("SNGL_REC2"); // constraint violation
  334. batch->addBlob(&status, 2, "r2", &project2->desc, 0, NULL);
  335. batch->add(&status, 1, project2.getData());
  336. project2->id.set("BAT23");
  337. project2->name.set("SNGL_REC3");
  338. batch->addBlob(&status, 2, "r3", &project2->desc, 0, NULL);
  339. batch->add(&status, 1, project2.getData());
  340. project2->id.set("BAT23"); // constraint violation
  341. project2->name.set("SNGL_REC4");
  342. batch->addBlob(&status, 2, "r4", &project2->desc, 0, NULL);
  343. batch->add(&status, 1, project2.getData());
  344. // execute it
  345. cs = batch->execute(&status, tra);
  346. print_cs(status, cs, utl);
  347. // close batch
  348. batch->close(&status);
  349. batch = NULL;
  350. //
  351. printf("\nPart 3. BLOB stream, including segmented BLOB.\n");
  352. //
  353. // use Msg2/project2/sqlStmt2 to store in a table
  354. // set batch parameters
  355. pb->clear(&status);
  356. // enable blobs processing - blobs are placed in a stream
  357. pb->insertInt(&status, IBatch::TAG_BLOB_POLICY, IBatch::BLOB_STREAM);
  358. // create batch
  359. batch = att->createBatch(&status, tra, 0, sqlStmt2, SAMPLES_DIALECT, meta,
  360. pb->getBufferLength(&status), pb->getBuffer(&status));
  361. unsigned blobAlign = batch->getBlobAlignment(&status);
  362. // prepare blob IDs
  363. ISC_QUAD v1={0,1}, v2={0,2}, v3={0,3};
  364. // send messages to batch
  365. project2->id.set("BAT31");
  366. project2->name.set("STRM_BLB_A");
  367. project2->desc = v1;
  368. batch->add(&status, 1, project2.getData());
  369. project2->id.set("BAT32");
  370. project2->name.set("STRM_BLB_B");
  371. project2->desc = v2;
  372. batch->add(&status, 1, project2.getData());
  373. project2->id.set("BAT33");
  374. project2->name.set("STRM_BLB_C");
  375. project2->desc = v3;
  376. batch->add(&status, 1, project2.getData());
  377. // prepare blobs in the stream buffer
  378. const char* d1 = "1111111111111111111";
  379. const char* d2 = "22222222222222222222";
  380. const char* d3 = "33333333333333333333333333333333333333333333333333333";
  381. stream = streamStart;
  382. putBlob(stream, d1, strlen(d1), blobAlign, &v1);
  383. putBlob(stream, d2, strlen(d2), blobAlign, &v2);
  384. putBlob(stream, d3, strlen(d3), blobAlign, &v3);
  385. batch->addBlobStream(&status, stream - streamStart, streamStart);
  386. // Continue last blob
  387. stream = streamStart;
  388. ISC_QUAD nullId = {0,0};
  389. unsigned* size = putBlobHdr(stream, blobAlign, &nullId, 0, NULL);
  390. const char* d4 = " 444444444444444444444444";
  391. unsigned ld4 = strlen(d4);
  392. memcpy(stream, d4, ld4);
  393. *size += ld4;
  394. stream += ld4;
  395. stream = align(stream, blobAlign);
  396. stream = align(stream, blobAlign);
  397. batch->addBlobStream(&status, stream - streamStart, streamStart);
  398. // Put segmented Blob in the stream
  399. // add message
  400. ISC_QUAD vSeg={0,10};
  401. project2->id.set("BAT35");
  402. project2->name.set("STRM_B_SEG");
  403. project2->desc = vSeg;
  404. batch->add(&status, 1, project2.getData());
  405. // build BPB
  406. pb->dispose();
  407. pb = NULL;
  408. pb = utl->getXpbBuilder(&status, IXpbBuilder::BPB, NULL, 0);
  409. pb->insertInt(&status, isc_bpb_type, isc_bpb_type_segmented);
  410. // make stream
  411. stream = streamStart;
  412. size = putBlobHdr(stream, blobAlign, &vSeg, pb->getBufferLength(&status), pb->getBuffer(&status));
  413. *size += putSegment(stream, d1);
  414. *size += putSegment(stream, "\n");
  415. *size += putSegment(stream, d2);
  416. *size += putSegment(stream, "\n");
  417. *size += putSegment(stream, d3);
  418. // add stream to the batch
  419. stream = align(stream, blobAlign);
  420. batch->addBlobStream(&status, stream - streamStart, streamStart);
  421. // execute batch
  422. cs = batch->execute(&status, tra);
  423. print_cs(status, cs, utl);
  424. //
  425. printf("\nPart 4. BLOB created using IBlob interface.\n");
  426. //
  427. // use Msg2/project2/sqlStmt2 to store in a table
  428. // registerBlob() may be called in BLOB_STREAM batch, ID should be generated by user in this case
  429. // also demonstrates execution of same batch multiple times
  430. // create blob
  431. ISC_QUAD realId;
  432. IBlob* blob = att->createBlob(&status, tra, &realId, 0, NULL);
  433. const char* text = "Blob created using traditional API";
  434. blob->putSegment(&status, strlen(text), text);
  435. blob->close(&status);
  436. // add message
  437. project2->id.set("BAT38");
  438. project2->name.set("FRGN_BLB");
  439. project2->desc = v1; // after execute may reuse IDs
  440. batch->registerBlob(&status, &realId, &project2->desc);
  441. batch->add(&status, 1, project2.getData());
  442. // execute it
  443. cs = batch->execute(&status, tra);
  444. print_cs(status, cs, utl);
  445. // cleanup
  446. batch->close(&status);
  447. batch = NULL;
  448. tra->commit(&status);
  449. tra = NULL;
  450. att->detach(&status);
  451. att = NULL;
  452. }
  453. catch (const FbException& error)
  454. {
  455. // handle error
  456. rc = 1;
  457. errPrint(error.getStatus());
  458. }
  459. // release interfaces after error caught
  460. if (cs)
  461. cs->dispose();
  462. if (batch)
  463. batch->release();
  464. if (tra)
  465. tra->release();
  466. if (att)
  467. att->release();
  468. // cleanup
  469. if (pb)
  470. pb->dispose();
  471. status.dispose();
  472. prov->release();
  473. return rc;
  474. }