11.batch.cpp 14 KB


  1. /*
  2. * PROGRAM: Object oriented API samples.
  3. * MODULE: 11.batch.cpp
  4. * DESCRIPTION: A trivial sample of using Batch interface.
  5. *
  6. * Example for the following interfaces:
  7. * IBatch - interface to work with FB pipes
  8. *
  9. * c++ 11.batch.cpp -lfbclient
  10. *
  11. * The contents of this file are subject to the Initial
  12. * Developer's Public License Version 1.0 (the "License");
  13. * you may not use this file except in compliance with the
  14. * License. You may obtain a copy of the License at
  15. * http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl.
  16. *
  17. * Software distributed under the License is distributed AS IS,
  18. * WITHOUT WARRANTY OF ANY KIND, either express or implied.
  19. * See the License for the specific language governing rights
  20. * and limitations under the License.
  21. *
  22. * The Original Code was created by Alexander Peshkoff
  23. * for the Firebird Open Source RDBMS project.
  24. *
  25. * Copyright (c) 2017 Alexander Peshkoff <[email protected]>
  26. * and all contributors signed below.
  27. *
  28. * All Rights Reserved.
  29. * Contributor(s): ______________________________________.
  30. */
  31. #include "ifaceExamples.h"
  32. #include <firebird/Message.h>
  33. static IMaster* master = fb_get_master_interface();
  34. // output error message to user
  35. static void errPrint(IStatus* status)
  36. {
  37. char buf[256];
  38. master->getUtilInterface()->formatStatus(buf, sizeof(buf), status);
  39. fprintf(stderr, "%s\n", buf);
  40. }
  // Round target up to the next alignment boundary.
  // Accepts both integers and pointers; the mask arithmetic used here only
  // yields a multiple of alignment when alignment is a power of two.
  template <typename T>
  static inline T align(T target, uintptr_t alignment)
  {
      const uintptr_t mask = alignment - 1;
      const uintptr_t raised = (uintptr_t) target + mask;
      return (T) (raised & ~mask);
  }
  47. // append given message to buffer ptr
  48. static void putMsg(unsigned char*& ptr, const void* from, unsigned size, unsigned alignment)
  49. {
  50. memcpy(ptr, from, size);
  51. ptr += align(size, alignment);
  52. }
  53. // append blob header with BPB to buffer ptr
  54. // return pointer to blob size field - prefilled with BPB size
  55. static unsigned* putBlobHdr(unsigned char*& ptr, unsigned alignment, ISC_QUAD* id, unsigned bpbSize, const unsigned char* bpb)
  56. {
  57. ptr = align(ptr, alignment);
  58. memcpy(ptr, id, sizeof(ISC_QUAD));
  59. ptr += sizeof(ISC_QUAD);
  60. unsigned* rc = reinterpret_cast<unsigned*>(ptr);
  61. memcpy(ptr, &bpbSize, sizeof(unsigned));
  62. ptr += sizeof(unsigned);
  63. memcpy(ptr, &bpbSize, sizeof(unsigned));
  64. ptr += sizeof(unsigned);
  65. memcpy(ptr, bpb, bpbSize);
  66. ptr += bpbSize;
  67. return rc;
  68. }
  69. // append given blob to buffer ptr
  70. static void putBlob(unsigned char*& ptr, const void* from, unsigned size, unsigned alignment, ISC_QUAD* id)
  71. {
  72. unsigned* sizePtr = putBlobHdr(ptr, alignment, id, 0, NULL);
  73. memcpy(ptr, from, size);
  74. *sizePtr += size;
  75. ptr += size;
  76. ptr = align(ptr, alignment);
  77. }
  78. // append given segment to buffer ptr
  79. unsigned putSegment(unsigned char*& ptr, const char* testData)
  80. {
  81. ptr = align(ptr, IBatch::BLOB_SEGHDR_ALIGN);
  82. unsigned short l = strlen(testData);
  83. memcpy(ptr, &l, sizeof l);
  84. ptr += sizeof l;
  85. memcpy(ptr, testData, l);
  86. ptr += l;
  87. return align(l + sizeof l, IBatch::BLOB_SEGHDR_ALIGN);
  88. }
  89. // BatchCompletionState printer - prints all what we know about completed batch
  90. static void print_cs(ThrowStatusWrapper& status, IBatchCompletionState* cs, IUtil* utl)
  91. {
  92. unsigned p = 0;
  93. IStatus* s2 = NULL;
  94. bool pr1 = false, pr2 = false;
  95. // 1. Print per-message state info
  96. unsigned upcount = cs->getSize(&status);
  97. unsigned unk = 0, succ = 0;
  98. for (p = 0; p < upcount; ++p)
  99. {
  100. int s = cs->getState(&status, p);
  101. switch (s)
  102. {
  103. case IBatchCompletionState::EXECUTE_FAILED:
  104. if (!pr1)
  105. {
  106. printf("Message Status\n", p);
  107. pr1 = true;
  108. }
  109. printf("%5u Execute failed\n", p);
  110. break;
  111. case IBatchCompletionState::SUCCESS_NO_INFO:
  112. ++unk;
  113. break;
  114. default:
  115. if (!pr1)
  116. {
  117. printf("Message Status\n", p);
  118. pr1 = true;
  119. }
  120. printf("%5u Updated %d record(s)\n", p, s);
  121. ++succ;
  122. break;
  123. }
  124. }
  125. printf("Summary: total=%u success=%u success(but no update info)=%u\n", upcount, succ, unk);
  126. // 2. Print detailed errors (if exist) for messages
  127. s2 = master->getStatus();
  128. for(p = 0; (p = cs->findError(&status, p)) != IBatchCompletionState::NO_MORE_ERRORS; ++p)
  129. {
  130. try
  131. {
  132. cs->getStatus(&status, s2, p);
  133. char text[1024];
  134. utl->formatStatus(text, sizeof(text) - 1, s2);
  135. text[sizeof(text) - 1] = 0;
  136. if (!pr2)
  137. {
  138. printf("\nDetailed errors status:\n", p);
  139. pr2 = true;
  140. }
  141. printf("Message %u: %s\n", p, text);
  142. }
  143. catch (const FbException& error)
  144. {
  145. // handle error
  146. fprintf(stderr, "\nError describing message %u\n", p);
  147. errPrint(error.getStatus());
  148. fprintf(stderr, "\n");
  149. }
  150. }
  151. if (s2)
  152. s2->dispose();
  153. }
  154. int main()
  155. {
  156. int rc = 0;
  157. // set default password if none specified in environment
  158. setenv("ISC_USER", "sysdba", 0);
  159. setenv("ISC_PASSWORD", "masterkey", 0);
  160. // With ThrowStatusWrapper passed as status interface FbException will be thrown on error
  161. ThrowStatusWrapper status(master->getStatus());
  162. // Declare pointers to required interfaces
  163. IProvider* prov = master->getDispatcher();
  164. IUtil* utl = master->getUtilInterface();
  165. IAttachment* att = NULL;
  166. ITransaction* tra = NULL;
  167. IBatch* batch = NULL;
  168. IBatchCompletionState* cs = NULL;
  169. IXpbBuilder* pb = NULL;
  170. unsigned char streamBuf[10240]; // big enough for demo
  171. unsigned char* stream = NULL;
  172. try
  173. {
  174. // attach employee db
  175. att = prov->attachDatabase(&status, "employee", 0, NULL);
  176. tra = att->startTransaction(&status, 0, NULL);
  177. // cleanup
  178. att->execute(&status, tra, 0, "delete from project where proj_id like 'BAT%'", SAMPLES_DIALECT,
  179. NULL, NULL, NULL, NULL);
  180. //
  181. printf("\nPart 1. Simple messages. Adding one by one or by groups of messages.\n");
  182. //
  183. // Message to store in a table
  184. FB_MESSAGE(Msg1, ThrowStatusWrapper,
  185. (FB_VARCHAR(5), id)
  186. (FB_VARCHAR(10), name)
  187. ) project1(&status, master);
  188. project1.clear();
  189. IMessageMetadata* meta = project1.getMetadata();
  190. // sizes & alignments
  191. unsigned mesAlign = meta->getAlignment(&status);
  192. unsigned mesLength = meta->getMessageLength(&status);
  193. unsigned char* streamStart = align(streamBuf, mesAlign);
  194. // set batch parameters
  195. pb = utl->getXpbBuilder(&status, IXpbBuilder::BATCH, NULL, 0);
  196. // collect per-message statistics
  197. pb->insertInt(&status, IBatch::TAG_RECORD_COUNTS, 1);
  198. // create batch
  199. const char* sqlStmt1 = "insert into project(proj_id, proj_name) values(?, ?)";
  200. batch = att->createBatch(&status, tra, 0, sqlStmt1, SAMPLES_DIALECT, meta,
  201. pb->getBufferLength(&status), pb->getBuffer(&status));
  202. // fill batch with data record by record
  203. project1->id.set("BAT11");
  204. project1->name.set("SNGL_REC");
  205. batch->add(&status, 1, project1.getData());
  206. project1->id.set("BAT12");
  207. project1->name.set("SNGL_REC2");
  208. batch->add(&status, 1, project1.getData());
  209. // execute it
  210. cs = batch->execute(&status, tra);
  211. print_cs(status, cs, utl);
  212. // fill batch with data using many records at once
  213. stream = streamStart;
  214. project1->id.set("BAT13");
  215. project1->name.set("STRM_REC_A");
  216. putMsg(stream, project1.getData(), mesLength, mesAlign);
  217. project1->id.set("BAT14");
  218. project1->name.set("STRM_REC_B");
  219. putMsg(stream, project1.getData(), mesLength, mesAlign);
  220. project1->id.set("BAT15");
  221. project1->name.set("STRM_REC_C");
  222. putMsg(stream, project1.getData(), mesLength, mesAlign);
  223. batch->add(&status, 3, streamStart);
  224. stream = streamStart;
  225. project1->id.set("BAT15"); // constraint violation
  226. project1->name.set("STRM_REC_D");
  227. putMsg(stream, project1.getData(), mesLength, mesAlign);
  228. project1->id.set("BAT16");
  229. project1->name.set("STRM_REC_E");
  230. putMsg(stream, project1.getData(), mesLength, mesAlign);
  231. batch->add(&status, 1, streamStart);
  232. // execute it
  233. cs = batch->execute(&status, tra);
  234. print_cs(status, cs, utl);
  235. // close batch
  236. batch->release();
  237. batch = NULL;
  238. //
  239. printf("\nPart 2. Simple BLOBs. Multiple errors return.\n");
  240. //
  241. // Message to store in a table
  242. FB_MESSAGE(Msg2, ThrowStatusWrapper,
  243. (FB_VARCHAR(5), id)
  244. (FB_VARCHAR(10), name)
  245. (FB_BLOB, desc)
  246. ) project2(&status, master);
  247. project2.clear();
  248. meta = project2.getMetadata();
  249. mesAlign = meta->getAlignment(&status);
  250. mesLength = meta->getMessageLength(&status);
  251. streamStart = align(streamBuf, mesAlign);
  252. // set batch parameters
  253. pb->clear(&status);
  254. // continue batch processing in case of errors in some messages
  255. pb->insertInt(&status, IBatch::TAG_MULTIERROR, 1);
  256. // enable blobs processing - IDs generated by firebird engine
  257. pb->insertInt(&status, IBatch::TAG_BLOB_POLICY, IBatch::BLOB_ID_ENGINE);
  258. // create batch
  259. const char* sqlStmt2 = "insert into project(proj_id, proj_name, proj_desc) values(?, ?, ?)";
  260. batch = att->createBatch(&status, tra, 0, sqlStmt2, SAMPLES_DIALECT, meta,
  261. pb->getBufferLength(&status), pb->getBuffer(&status));
  262. // fill batch with data
  263. project2->id.set("BAT21");
  264. project2->name.set("SNGL_BLOB");
  265. batch->addBlob(&status, strlen(sqlStmt2), sqlStmt2, &project2->desc, 0, NULL);
  266. batch->appendBlobData(&status, 1, "\n");
  267. batch->appendBlobData(&status, strlen(sqlStmt1), sqlStmt1);
  268. batch->add(&status, 1, project2.getData());
  269. // execute it
  270. cs = batch->execute(&status, tra);
  271. print_cs(status, cs, utl);
  272. // fill batch with data
  273. project2->id.set("BAT22");
  274. project2->name.set("SNGL_REC1");
  275. batch->addBlob(&status, strlen(sqlStmt2), sqlStmt2, &project2->desc, 0, NULL);
  276. batch->add(&status, 1, project2.getData());
  277. project2->id.set("BAT22");
  278. project2->name.set("SNGL_REC2"); // constraint violation
  279. batch->addBlob(&status, 2, "r2", &project2->desc, 0, NULL);
  280. batch->add(&status, 1, project2.getData());
  281. project2->id.set("BAT23");
  282. project2->name.set("SNGL_REC3");
  283. batch->addBlob(&status, 2, "r3", &project2->desc, 0, NULL);
  284. batch->add(&status, 1, project2.getData());
  285. project2->id.set("BAT23"); // constraint violation
  286. project2->name.set("SNGL_REC4");
  287. batch->addBlob(&status, 2, "r4", &project2->desc, 0, NULL);
  288. batch->add(&status, 1, project2.getData());
  289. // execute it
  290. cs = batch->execute(&status, tra);
  291. print_cs(status, cs, utl);
  292. // close batch
  293. batch->release();
  294. batch = NULL;
  295. //
  296. printf("\nPart 3. BLOB stream, including segmented BLOB.\n");
  297. //
  298. // use Msg2/project2/sqlStmt2 to store in a table
  299. // set batch parameters
  300. pb->clear(&status);
  301. // enable blobs processing - blobs are placed in a stream
  302. pb->insertInt(&status, IBatch::TAG_BLOB_POLICY, IBatch::BLOB_STREAM);
  303. // create batch
  304. batch = att->createBatch(&status, tra, 0, sqlStmt2, SAMPLES_DIALECT, meta,
  305. pb->getBufferLength(&status), pb->getBuffer(&status));
  306. unsigned blobAlign = batch->getBlobAlignment(&status);
  307. // prepare blob IDs
  308. ISC_QUAD v1={0,1}, v2={0,2}, v3={0,3};
  309. // send messages to batch
  310. project2->id.set("BAT31");
  311. project2->name.set("STRM_BLB_A");
  312. project2->desc = v1;
  313. batch->add(&status, 1, project2.getData());
  314. project2->id.set("BAT32");
  315. project2->name.set("STRM_BLB_B");
  316. project2->desc = v2;
  317. batch->add(&status, 1, project2.getData());
  318. project2->id.set("BAT33");
  319. project2->name.set("STRM_BLB_C");
  320. project2->desc = v3;
  321. batch->add(&status, 1, project2.getData());
  322. // prepare blobs in the stream buffer
  323. const char* d1 = "1111111111111111111";
  324. const char* d2 = "22222222222222222222";
  325. const char* d3 = "33333333333333333333333333333333333333333333333333333";
  326. stream = streamStart;
  327. putBlob(stream, d1, strlen(d1), blobAlign, &v1);
  328. putBlob(stream, d2, strlen(d2), blobAlign, &v2);
  329. putBlob(stream, d3, strlen(d3), blobAlign, &v3);
  330. batch->addBlobStream(&status, stream - streamStart, streamStart);
  331. // Put segmented Blob in the stream
  332. // add message
  333. ISC_QUAD vSeg={0,10};
  334. project2->id.set("BAT35");
  335. project2->name.set("STRM_B_SEG");
  336. project2->desc = vSeg;
  337. batch->add(&status, 1, project2.getData());
  338. // build BPB
  339. pb->dispose();
  340. pb = NULL;
  341. pb = utl->getXpbBuilder(&status, IXpbBuilder::BPB, NULL, 0);
  342. pb->insertInt(&status, isc_bpb_type, isc_bpb_type_segmented);
  343. // make stream
  344. stream = streamStart;
  345. unsigned* size = putBlobHdr(stream, blobAlign, &vSeg, pb->getBufferLength(&status), pb->getBuffer(&status));
  346. *size += putSegment(stream, d1);
  347. *size += putSegment(stream, "\n");
  348. *size += putSegment(stream, d2);
  349. *size += putSegment(stream, "\n");
  350. *size += putSegment(stream, d3);
  351. // add stream to the batch
  352. stream = align(stream, blobAlign);
  353. batch->addBlobStream(&status, stream - streamStart, streamStart);
  354. // execute batch
  355. cs = batch->execute(&status, tra);
  356. print_cs(status, cs, utl);
  357. //
  358. printf("\nPart 4. BLOB created using IBlob interface.\n");
  359. //
  360. // use Msg2/project2/sqlStmt2 to store in a table
  361. // registerBlob() may be called in BLOB_STREAM batch, ID should be generated by user in this case
  362. // also demonstrates execution of same batch multiple times
  363. // create blob
  364. ISC_QUAD realId;
  365. IBlob* blob = att->createBlob(&status, tra, &realId, 0, NULL);
  366. const char* text = "Blob created using traditional API";
  367. blob->putSegment(&status, strlen(text), text);
  368. blob->close(&status);
  369. // add message
  370. project2->id.set("BAT38");
  371. project2->name.set("FRGN_BLB");
  372. project2->desc = v1; // after execute may reuse IDs
  373. batch->registerBlob(&status, &realId, &project2->desc);
  374. batch->add(&status, 1, project2.getData());
  375. // execute it
  376. cs = batch->execute(&status, tra);
  377. print_cs(status, cs, utl);
  378. // cleanup
  379. batch->release();
  380. batch = NULL;
  381. tra->commit(&status);
  382. tra = NULL;
  383. att->detach(&status);
  384. att = NULL;
  385. }
  386. catch (const FbException& error)
  387. {
  388. // handle error
  389. rc = 1;
  390. errPrint(error.getStatus());
  391. }
  392. // release interfaces after error caught
  393. if (cs)
  394. cs->dispose();
  395. if (batch)
  396. batch->release();
  397. if (tra)
  398. tra->release();
  399. if (att)
  400. att->release();
  401. // cleanup
  402. if (pb)
  403. pb->dispose();
  404. status.dispose();
  405. prov->release();
  406. return rc;
  407. }