11.batch.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. /*
  2. * PROGRAM: Object oriented API samples.
  3. * MODULE: 11.batch.cpp
  4. * DESCRIPTION: A trivial sample of using Batch interface.
  5. *
  6. * Example for the following interfaces:
  7. * IBatch - interface to work with FB batches
  8. * IBatchCompletionState - contains result of batch execution
  9. *
  10. * c++ 11.batch.cpp -lfbclient
  11. *
  12. * The contents of this file are subject to the Initial
  13. * Developer's Public License Version 1.0 (the "License");
  14. * you may not use this file except in compliance with the
  15. * License. You may obtain a copy of the License at
  16. * http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl.
  17. *
  18. * Software distributed under the License is distributed AS IS,
  19. * WITHOUT WARRANTY OF ANY KIND, either express or implied.
  20. * See the License for the specific language governing rights
  21. * and limitations under the License.
  22. *
  23. * The Original Code was created by Alexander Peshkoff
  24. * for the Firebird Open Source RDBMS project.
  25. *
  26. * Copyright (c) 2017 Alexander Peshkoff <[email protected]>
  27. * and all contributors signed below.
  28. *
  29. * All Rights Reserved.
  30. * Contributor(s): ______________________________________.
  31. */
  32. #include "ifaceExamples.h"
  33. #include <firebird/Message.h>
  34. static IMaster* master = fb_get_master_interface();
  35. // output error message to user
  36. static void errPrint(IStatus* status)
  37. {
  38. char buf[256];
  39. master->getUtilInterface()->formatStatus(buf, sizeof(buf), status);
  40. fprintf(stderr, "%s\n", buf);
  41. }
  // Round target (an integer or a pointer) up to the next multiple of alignment.
  // The bit-mask arithmetic gives a multiple only when alignment is a power of two.
  template <typename T>
  static inline T align(T target, uintptr_t alignment)
  {
      const uintptr_t mask = alignment - 1;
      const uintptr_t value = (uintptr_t) target;

      return (T) ((value + mask) & ~mask);
  }
  48. // append given message to buffer ptr
  49. static void putMsg(unsigned char*& ptr, const void* from, unsigned size, unsigned alignment)
  50. {
  51. memcpy(ptr, from, size);
  52. ptr += align(size, alignment);
  53. }
  54. // append blob header with BPB to buffer ptr
  55. // return pointer to blob size field - prefilled with BPB size
  56. static unsigned* putBlobHdr(unsigned char*& ptr, unsigned alignment, ISC_QUAD* id, unsigned bpbSize, const unsigned char* bpb)
  57. {
  58. ptr = align(ptr, alignment);
  59. memcpy(ptr, id, sizeof(ISC_QUAD));
  60. ptr += sizeof(ISC_QUAD);
  61. unsigned* rc = reinterpret_cast<unsigned*>(ptr);
  62. memcpy(ptr, &bpbSize, sizeof(unsigned));
  63. ptr += sizeof(unsigned);
  64. memcpy(ptr, &bpbSize, sizeof(unsigned));
  65. ptr += sizeof(unsigned);
  66. memcpy(ptr, bpb, bpbSize);
  67. ptr += bpbSize;
  68. return rc;
  69. }
  70. // append given blob to buffer ptr
  71. static void putBlob(unsigned char*& ptr, const void* from, unsigned size, unsigned alignment, ISC_QUAD* id)
  72. {
  73. unsigned* sizePtr = putBlobHdr(ptr, alignment, id, 0, NULL);
  74. memcpy(ptr, from, size);
  75. *sizePtr += size;
  76. ptr += size;
  77. ptr = align(ptr, alignment);
  78. }
  79. // append given segment to buffer ptr
  80. unsigned putSegment(unsigned char*& ptr, const char* testData)
  81. {
  82. ptr = align(ptr, IBatch::BLOB_SEGHDR_ALIGN);
  83. unsigned short l = strlen(testData);
  84. memcpy(ptr, &l, sizeof l);
  85. ptr += sizeof l;
  86. memcpy(ptr, testData, l);
  87. ptr += l;
  88. return align(l + sizeof l, IBatch::BLOB_SEGHDR_ALIGN);
  89. }
  90. // BatchCompletionState printer - prints all what we know about completed batch
  91. static void print_cs(ThrowStatusWrapper& status, IBatchCompletionState* cs, IUtil* utl)
  92. {
  93. unsigned p = 0;
  94. IStatus* s2 = NULL;
  95. bool pr1 = false, pr2 = false;
  96. // 1. Print per-message state info
  97. unsigned upcount = cs->getSize(&status);
  98. unsigned unk = 0, succ = 0;
  99. for (p = 0; p < upcount; ++p)
  100. {
  101. int s = cs->getState(&status, p);
  102. switch (s)
  103. {
  104. case IBatchCompletionState::EXECUTE_FAILED:
  105. if (!pr1)
  106. {
  107. printf("Message Status\n", p);
  108. pr1 = true;
  109. }
  110. printf("%5u Execute failed\n", p);
  111. break;
  112. case IBatchCompletionState::SUCCESS_NO_INFO:
  113. ++unk;
  114. break;
  115. default:
  116. if (!pr1)
  117. {
  118. printf("Message Status\n", p);
  119. pr1 = true;
  120. }
  121. printf("%5u Updated %d record(s)\n", p, s);
  122. ++succ;
  123. break;
  124. }
  125. }
  126. printf("Summary: total=%u success=%u success(but no update info)=%u\n", upcount, succ, unk);
  127. // 2. Print detailed errors (if exist) for messages
  128. s2 = master->getStatus();
  129. for(p = 0; (p = cs->findError(&status, p)) != IBatchCompletionState::NO_MORE_ERRORS; ++p)
  130. {
  131. try
  132. {
  133. cs->getStatus(&status, s2, p);
  134. char text[1024];
  135. utl->formatStatus(text, sizeof(text) - 1, s2);
  136. text[sizeof(text) - 1] = 0;
  137. if (!pr2)
  138. {
  139. printf("\nDetailed errors status:\n", p);
  140. pr2 = true;
  141. }
  142. printf("Message %u: %s\n", p, text);
  143. }
  144. catch (const FbException& error)
  145. {
  146. // handle error
  147. fprintf(stderr, "\nError describing message %u\n", p);
  148. errPrint(error.getStatus());
  149. fprintf(stderr, "\n");
  150. }
  151. }
  152. if (s2)
  153. s2->dispose();
  154. }
  155. int main()
  156. {
  157. int rc = 0;
  158. // set default password if none specified in environment
  159. setenv("ISC_USER", "sysdba", 0);
  160. setenv("ISC_PASSWORD", "masterkey", 0);
  161. // With ThrowStatusWrapper passed as status interface FbException will be thrown on error
  162. ThrowStatusWrapper status(master->getStatus());
  163. // Declare pointers to required interfaces
  164. IProvider* prov = master->getDispatcher();
  165. IUtil* utl = master->getUtilInterface();
  166. IAttachment* att = NULL;
  167. ITransaction* tra = NULL;
  168. IBatch* batch = NULL;
  169. IBatchCompletionState* cs = NULL;
  170. IXpbBuilder* pb = NULL;
  171. unsigned char streamBuf[10240]; // big enough for demo
  172. unsigned char* stream = NULL;
  173. try
  174. {
  175. // attach employee db
  176. att = prov->attachDatabase(&status, "employee", 0, NULL);
  177. tra = att->startTransaction(&status, 0, NULL);
  178. // cleanup
  179. att->execute(&status, tra, 0, "delete from project where proj_id like 'BAT%'", SAMPLES_DIALECT,
  180. NULL, NULL, NULL, NULL);
  181. //
  182. printf("\nPart 1. Simple messages. Adding one by one or by groups of messages.\n");
  183. //
  184. // Message to store in a table
  185. FB_MESSAGE(Msg1, ThrowStatusWrapper,
  186. (FB_VARCHAR(5), id)
  187. (FB_VARCHAR(10), name)
  188. ) project1(&status, master);
  189. project1.clear();
  190. IMessageMetadata* meta = project1.getMetadata();
  191. // sizes & alignments
  192. unsigned mesAlign = meta->getAlignment(&status);
  193. unsigned mesLength = meta->getMessageLength(&status);
  194. unsigned char* streamStart = align(streamBuf, mesAlign);
  195. // set batch parameters
  196. pb = utl->getXpbBuilder(&status, IXpbBuilder::BATCH, NULL, 0);
  197. // collect per-message statistics
  198. pb->insertInt(&status, IBatch::TAG_RECORD_COUNTS, 1);
  199. // create batch
  200. const char* sqlStmt1 = "insert into project(proj_id, proj_name) values(?, ?)";
  201. batch = att->createBatch(&status, tra, 0, sqlStmt1, SAMPLES_DIALECT, meta,
  202. pb->getBufferLength(&status), pb->getBuffer(&status));
  203. // fill batch with data record by record
  204. project1->id.set("BAT11");
  205. project1->name.set("SNGL_REC");
  206. batch->add(&status, 1, project1.getData());
  207. project1->id.set("BAT12");
  208. project1->name.set("SNGL_REC2");
  209. batch->add(&status, 1, project1.getData());
  210. // execute it
  211. cs = batch->execute(&status, tra);
  212. print_cs(status, cs, utl);
  213. // fill batch with data using many records at once
  214. stream = streamStart;
  215. project1->id.set("BAT13");
  216. project1->name.set("STRM_REC_A");
  217. putMsg(stream, project1.getData(), mesLength, mesAlign);
  218. project1->id.set("BAT14");
  219. project1->name.set("STRM_REC_B");
  220. putMsg(stream, project1.getData(), mesLength, mesAlign);
  221. project1->id.set("BAT15");
  222. project1->name.set("STRM_REC_C");
  223. putMsg(stream, project1.getData(), mesLength, mesAlign);
  224. batch->add(&status, 3, streamStart);
  225. stream = streamStart;
  226. project1->id.set("BAT15"); // constraint violation
  227. project1->name.set("STRM_REC_D");
  228. putMsg(stream, project1.getData(), mesLength, mesAlign);
  229. project1->id.set("BAT16");
  230. project1->name.set("STRM_REC_E");
  231. putMsg(stream, project1.getData(), mesLength, mesAlign);
  232. batch->add(&status, 1, streamStart);
  233. // execute it
  234. cs = batch->execute(&status, tra);
  235. print_cs(status, cs, utl);
  236. // close batch
  237. batch->close(&status);
  238. batch = NULL;
  239. //
  240. printf("\nPart 2. Simple BLOBs. Multiple errors return.\n");
  241. //
  242. // Message to store in a table
  243. FB_MESSAGE(Msg2, ThrowStatusWrapper,
  244. (FB_VARCHAR(5), id)
  245. (FB_VARCHAR(10), name)
  246. (FB_BLOB, desc)
  247. ) project2(&status, master);
  248. project2.clear();
  249. meta = project2.getMetadata();
  250. mesAlign = meta->getAlignment(&status);
  251. mesLength = meta->getMessageLength(&status);
  252. streamStart = align(streamBuf, mesAlign);
  253. // set batch parameters
  254. pb->clear(&status);
  255. // continue batch processing in case of errors in some messages
  256. pb->insertInt(&status, IBatch::TAG_MULTIERROR, 1);
  257. // enable blobs processing - IDs generated by firebird engine
  258. pb->insertInt(&status, IBatch::TAG_BLOB_POLICY, IBatch::BLOB_ID_ENGINE);
  259. // create batch
  260. const char* sqlStmt2 = "insert into project(proj_id, proj_name, proj_desc) values(?, ?, ?)";
  261. batch = att->createBatch(&status, tra, 0, sqlStmt2, SAMPLES_DIALECT, meta,
  262. pb->getBufferLength(&status), pb->getBuffer(&status));
  263. // fill batch with data
  264. project2->id.set("BAT21");
  265. project2->name.set("SNGL_BLOB");
  266. batch->addBlob(&status, strlen(sqlStmt2), sqlStmt2, &project2->desc, 0, NULL);
  267. batch->appendBlobData(&status, 1, "\n");
  268. batch->appendBlobData(&status, strlen(sqlStmt1), sqlStmt1);
  269. batch->add(&status, 1, project2.getData());
  270. // execute it
  271. cs = batch->execute(&status, tra);
  272. print_cs(status, cs, utl);
  273. // fill batch with data
  274. project2->id.set("BAT22");
  275. project2->name.set("SNGL_REC1");
  276. batch->addBlob(&status, strlen(sqlStmt2), sqlStmt2, &project2->desc, 0, NULL);
  277. batch->add(&status, 1, project2.getData());
  278. project2->id.set("BAT22");
  279. project2->name.set("SNGL_REC2"); // constraint violation
  280. batch->addBlob(&status, 2, "r2", &project2->desc, 0, NULL);
  281. batch->add(&status, 1, project2.getData());
  282. project2->id.set("BAT23");
  283. project2->name.set("SNGL_REC3");
  284. batch->addBlob(&status, 2, "r3", &project2->desc, 0, NULL);
  285. batch->add(&status, 1, project2.getData());
  286. project2->id.set("BAT23"); // constraint violation
  287. project2->name.set("SNGL_REC4");
  288. batch->addBlob(&status, 2, "r4", &project2->desc, 0, NULL);
  289. batch->add(&status, 1, project2.getData());
  290. // execute it
  291. cs = batch->execute(&status, tra);
  292. print_cs(status, cs, utl);
  293. // close batch
  294. batch->close(&status);
  295. batch = NULL;
  296. //
  297. printf("\nPart 3. BLOB stream, including segmented BLOB.\n");
  298. //
  299. // use Msg2/project2/sqlStmt2 to store in a table
  300. // set batch parameters
  301. pb->clear(&status);
  302. // enable blobs processing - blobs are placed in a stream
  303. pb->insertInt(&status, IBatch::TAG_BLOB_POLICY, IBatch::BLOB_STREAM);
  304. // create batch
  305. batch = att->createBatch(&status, tra, 0, sqlStmt2, SAMPLES_DIALECT, meta,
  306. pb->getBufferLength(&status), pb->getBuffer(&status));
  307. unsigned blobAlign = batch->getBlobAlignment(&status);
  308. // prepare blob IDs
  309. ISC_QUAD v1={0,1}, v2={0,2}, v3={0,3};
  310. // send messages to batch
  311. project2->id.set("BAT31");
  312. project2->name.set("STRM_BLB_A");
  313. project2->desc = v1;
  314. batch->add(&status, 1, project2.getData());
  315. project2->id.set("BAT32");
  316. project2->name.set("STRM_BLB_B");
  317. project2->desc = v2;
  318. batch->add(&status, 1, project2.getData());
  319. project2->id.set("BAT33");
  320. project2->name.set("STRM_BLB_C");
  321. project2->desc = v3;
  322. batch->add(&status, 1, project2.getData());
  323. // prepare blobs in the stream buffer
  324. const char* d1 = "1111111111111111111";
  325. const char* d2 = "22222222222222222222";
  326. const char* d3 = "33333333333333333333333333333333333333333333333333333";
  327. stream = streamStart;
  328. putBlob(stream, d1, strlen(d1), blobAlign, &v1);
  329. putBlob(stream, d2, strlen(d2), blobAlign, &v2);
  330. putBlob(stream, d3, strlen(d3), blobAlign, &v3);
  331. batch->addBlobStream(&status, stream - streamStart, streamStart);
  332. // Continue last blob
  333. stream = streamStart;
  334. ISC_QUAD nullId = {0,0};
  335. unsigned* size = putBlobHdr(stream, blobAlign, &nullId, 0, NULL);
  336. const char* d4 = " 444444444444444444444444";
  337. unsigned ld4 = strlen(d4);
  338. memcpy(stream, d4, ld4);
  339. *size += ld4;
  340. stream += ld4;
  341. stream = align(stream, blobAlign);
  342. stream = align(stream, blobAlign);
  343. batch->addBlobStream(&status, stream - streamStart, streamStart);
  344. // Put segmented Blob in the stream
  345. // add message
  346. ISC_QUAD vSeg={0,10};
  347. project2->id.set("BAT35");
  348. project2->name.set("STRM_B_SEG");
  349. project2->desc = vSeg;
  350. batch->add(&status, 1, project2.getData());
  351. // build BPB
  352. pb->dispose();
  353. pb = NULL;
  354. pb = utl->getXpbBuilder(&status, IXpbBuilder::BPB, NULL, 0);
  355. pb->insertInt(&status, isc_bpb_type, isc_bpb_type_segmented);
  356. // make stream
  357. stream = streamStart;
  358. size = putBlobHdr(stream, blobAlign, &vSeg, pb->getBufferLength(&status), pb->getBuffer(&status));
  359. *size += putSegment(stream, d1);
  360. *size += putSegment(stream, "\n");
  361. *size += putSegment(stream, d2);
  362. *size += putSegment(stream, "\n");
  363. *size += putSegment(stream, d3);
  364. // add stream to the batch
  365. stream = align(stream, blobAlign);
  366. batch->addBlobStream(&status, stream - streamStart, streamStart);
  367. // execute batch
  368. cs = batch->execute(&status, tra);
  369. print_cs(status, cs, utl);
  370. //
  371. printf("\nPart 4. BLOB created using IBlob interface.\n");
  372. //
  373. // use Msg2/project2/sqlStmt2 to store in a table
  374. // registerBlob() may be called in BLOB_STREAM batch, ID should be generated by user in this case
  375. // also demonstrates execution of same batch multiple times
  376. // create blob
  377. ISC_QUAD realId;
  378. IBlob* blob = att->createBlob(&status, tra, &realId, 0, NULL);
  379. const char* text = "Blob created using traditional API";
  380. blob->putSegment(&status, strlen(text), text);
  381. blob->close(&status);
  382. // add message
  383. project2->id.set("BAT38");
  384. project2->name.set("FRGN_BLB");
  385. project2->desc = v1; // after execute may reuse IDs
  386. batch->registerBlob(&status, &realId, &project2->desc);
  387. batch->add(&status, 1, project2.getData());
  388. // execute it
  389. cs = batch->execute(&status, tra);
  390. print_cs(status, cs, utl);
  391. // cleanup
  392. batch->close(&status);
  393. batch = NULL;
  394. tra->commit(&status);
  395. tra = NULL;
  396. att->detach(&status);
  397. att = NULL;
  398. }
  399. catch (const FbException& error)
  400. {
  401. // handle error
  402. rc = 1;
  403. errPrint(error.getStatus());
  404. }
  405. // release interfaces after error caught
  406. if (cs)
  407. cs->dispose();
  408. if (batch)
  409. batch->release();
  410. if (tra)
  411. tra->release();
  412. if (att)
  413. att->release();
  414. // cleanup
  415. if (pb)
  416. pb->dispose();
  417. status.dispose();
  418. prov->release();
  419. return rc;
  420. }