fbSampleReplicator.cpp 17 KB


  1. /*
  2. *
  3. * Sample replication plugin
  4. *
  5. */
  #include <algorithm>
  #include <atomic>
  #define __USE_MINGW_ANSI_STDIO 1
  #include <stdarg.h>
  #include <stdio.h>
  #include <time.h>
  #include "firebird/Interface.h"
  12. using namespace Firebird;
  // Printf-style logging helper behind the WriteLog() macro.
  //
  // Writes one formatted record to `file` and flushes the stream right away.
  // A null `file` (for example when fopen() failed) is tolerated: nothing is
  // written and EOF is returned instead of handing a null stream to the C
  // library. Because this is a real function call, every argument expression
  // is evaluated exactly once even when `file` is null, so call sites that
  // pass expressions with side effects keep working.
  //
  // Returns the result of fflush(), or EOF when `file` is null.
  #if defined(__GNUC__) && !defined(__MINGW32__)
  __attribute__((format(printf, 2, 3)))
  #endif
  static inline int writeLogChecked(FILE* file, const char* format, ...)
  {
      if (file == nullptr)
          return EOF;

      va_list args;
      va_start(args, format);
      vfprintf(file, format, args);
      va_end(args);

      return fflush(file);
  }

  // WriteLog(file, format, ...) - log one record and flush; usable as a single
  // expression or statement anywhere (the expansion is one function call).
  #define WriteLog(file, ...) writeLogChecked((file), __VA_ARGS__)
  14. class ReplPlugin : public IReplicatedSessionImpl<ReplPlugin, CheckStatusWrapper>
  15. {
  16. public:
  17. ReplPlugin(IPluginConfig* config);
  18. virtual ~ReplPlugin();
  19. // IReferenceCounted implementation
  20. void addRef() override;
  21. int release() override;
  22. // IPluginBase implementation
  23. void setOwner(IReferenceCounted* r) override;
  24. IReferenceCounted* getOwner() override;
  25. // IReplicatedSession implementation
  26. IStatus* getStatus() override;
  27. void setAttachment(IAttachment* attachment) override;
  28. IReplicatedTransaction* startTransaction(ITransaction* transaction, ISC_INT64 number) override;
  29. FB_BOOLEAN cleanupTransaction(ISC_INT64 number) override;
  30. FB_BOOLEAN deprecatedSetSequence(const char* name, ISC_INT64 value) override;
  31. FB_BOOLEAN setSequence2(const char* schemaName, const char* genName, ISC_INT64 value) override;
  32. private:
  33. friend class ReplTransaction;
  34. IAttachment* att = nullptr;
  35. FILE* log = nullptr;
  36. IStatus* status = nullptr;
  37. std::atomic_int refCounter;
  38. IReferenceCounted* owner;
  39. void dumpInfo(const unsigned char* buffer, size_t length);
  40. };
  41. class ReplTransaction: public IReplicatedTransactionImpl<ReplTransaction, CheckStatusWrapper>
  42. {
  43. public:
  44. ReplTransaction(ReplPlugin* session, ITransaction* transaction, ISC_INT64 number);
  45. ~ReplTransaction();
  46. // IDisposable implementation
  47. void dispose() override;
  48. // IReplicatedTransaction implementation
  49. FB_BOOLEAN prepare() override;
  50. FB_BOOLEAN commit() override;
  51. FB_BOOLEAN rollback() override;
  52. FB_BOOLEAN startSavepoint() override;
  53. FB_BOOLEAN releaseSavepoint() override;
  54. FB_BOOLEAN rollbackSavepoint() override;
  55. FB_BOOLEAN deprecatedInsertRecord(const char* name, IReplicatedRecord* record) override;
  56. FB_BOOLEAN insertRecord2(const char* schemaName, const char* tableName, IReplicatedRecord* record) override;
  57. FB_BOOLEAN deprecatedUpdateRecord(const char* name,
  58. IReplicatedRecord* orgRecord, IReplicatedRecord* newRecord) override;
  59. FB_BOOLEAN updateRecord2(const char* schemaName, const char* tableName,
  60. IReplicatedRecord* orgRecord, IReplicatedRecord* newRecord) override;
  61. FB_BOOLEAN deprecatedDeleteRecord(const char* name, IReplicatedRecord* record) override;
  62. FB_BOOLEAN deleteRecord2(const char* schemaName, const char* tableName, IReplicatedRecord* record) override;
  63. FB_BOOLEAN deprecatedExecuteSql(const char* sql) override;
  64. FB_BOOLEAN deprecatedExecuteSqlIntl(unsigned charset, const char* sql) override;
  65. FB_BOOLEAN executeSqlIntl2(unsigned charset, const char* schemaSearchPath, const char* sql) override;
  66. private:
  67. ReplPlugin* parent;
  68. ITransaction* trans;
  69. bool dumpData(IReplicatedRecord* record);
  70. };
  71. IMaster* master = nullptr;
  72. class PluginModule : public IPluginModuleImpl<PluginModule, CheckStatusWrapper>
  73. {
  74. public:
  75. void doClean() override {}
  76. void threadDetach() override {}
  77. } module;
  78. class Factory : public IPluginFactoryImpl<Factory, CheckStatusWrapper>
  79. {
  80. public:
  81. IPluginBase* createPlugin(CheckStatusWrapper* status, IPluginConfig* factoryParameter)
  82. {
  83. IPluginBase* p = new ReplPlugin(factoryParameter);
  84. p->addRef();
  85. return p;
  86. }
  87. } factory;
  88. extern "C"
  89. {
  90. FB_DLL_EXPORT void FB_PLUGIN_ENTRY_POINT(IMaster* m)
  91. {
  92. master = m;
  93. IPluginManager* pm = m->getPluginManager();
  94. pm->registerModule(&module);
  95. pm->registerPluginFactory(IPluginManager::TYPE_REPLICATOR, "fbSampleReplicator", &factory);
  96. }
  97. }
  98. static std::atomic_int logCounter;
  99. static const ISC_STATUS err[] = { isc_arg_gds, isc_random, isc_arg_string, (ISC_STATUS)"Intolerable integer value", isc_arg_end };
  100. static const ISC_STATUS wrn[] = { isc_arg_gds, isc_random, isc_arg_string, (ISC_STATUS)"Just a warning", isc_arg_end };
  101. ReplPlugin::ReplPlugin(IPluginConfig* conf)
  102. {
  103. char fn[100];
  104. snprintf(fn, sizeof(fn), "session_%08x_%d.log", (unsigned)time(nullptr), logCounter++);
  105. log = fopen(fn, "w");
  106. WriteLog(log, "%p\tReplicatedSession constructed\n", this);
  107. status = master->getStatus();
  108. }
  109. ReplPlugin::~ReplPlugin()
  110. {
  111. if (log != nullptr)
  112. {
  113. WriteLog(log, "%p\tReplicatedSession destructed\n", this);
  114. fclose(log);
  115. }
  116. if (att != nullptr)
  117. att->release();
  118. if (status != nullptr)
  119. status->dispose();
  120. }
  121. void ReplPlugin::addRef()
  122. {
  123. WriteLog(log, "%p\taddRef() to %d\n", this, ++refCounter);
  124. }
  125. int ReplPlugin::release()
  126. {
  127. WriteLog(log, "%p\trelease at %d\n", this, refCounter.load());
  128. if (--refCounter == 0)
  129. {
  130. delete this;
  131. return 0;
  132. }
  133. return 1;
  134. }
  135. void ReplPlugin::setOwner(IReferenceCounted* r)
  136. {
  137. WriteLog(log, "%p\tsetOwner(%p)\n", this, r);
  138. owner = r;
  139. }
  140. IReferenceCounted* ReplPlugin::getOwner()
  141. {
  142. WriteLog(log, "%p\tgetOwner()\n", this);
  143. return owner;
  144. }
  145. IStatus* ReplPlugin::getStatus()
  146. {
  147. WriteLog(log, "%p\tgetStatus()\n", this);
  148. return status;
  149. }
  150. void ReplPlugin::dumpInfo(const unsigned char* buffer, size_t length)
  151. {
  152. const unsigned char* p = buffer;
  153. while (p < buffer + length)
  154. {
  155. unsigned char item = *p++;
  156. // Handle terminating items fist
  157. if (item == isc_info_end)
  158. {
  159. return;
  160. }
  161. if (item == isc_info_truncated)
  162. {
  163. WriteLog(log, "\t\tDatabase info truncated\n");
  164. return;
  165. }
  166. // Now data items
  167. const unsigned len = p[0] | p[1] << 8;
  168. p += 2;
  169. switch (item)
  170. {
  171. case fb_info_db_guid:
  172. {
  173. WriteLog(log, "\t\tDatabase GUID = %.*s\n", len, p);
  174. break;
  175. }
  176. case isc_info_error:
  177. {
  178. unsigned err = p[1];
  179. for (unsigned i = 1; i < std::min(len, 4U); i++)
  180. {
  181. err |= p[i + 1] << (8 * i);
  182. }
  183. WriteLog(log, "\t\tDatabase info error %u for item %d\n", err, p[0]);
  184. return;
  185. }
  186. default:
  187. WriteLog(log, "\t\tUnexpected info item %d\n", item);
  188. break;
  189. }
  190. p += len;
  191. }
  192. WriteLog(log, "\t\tSuspicious exit from info parse loop\n");
  193. }
  194. void ReplPlugin::setAttachment(IAttachment* attachment)
  195. {
  196. WriteLog(log, "%p\tAssigned attachment %p\n", this, attachment);
  197. att = attachment;
  198. att->addRef();
  199. CheckStatusWrapper ExtStatus(status);
  200. const unsigned char items[] = { fb_info_db_guid };
  201. unsigned char response[80];
  202. att->getInfo(&ExtStatus, sizeof(items), items, sizeof(response), response);
  203. if (status->getState() == 0)
  204. {
  205. dumpInfo(response, sizeof(response));
  206. }
  207. }
  208. IReplicatedTransaction* ReplPlugin::startTransaction(ITransaction* transaction, ISC_INT64 number)
  209. {
  210. WriteLog(log, "%p\tstartTransaction(%p, %lld)\n", this, transaction, number);
  211. return new ReplTransaction(this, transaction, number);
  212. }
  213. FB_BOOLEAN ReplPlugin::cleanupTransaction(ISC_INT64 number)
  214. {
  215. WriteLog(log, "%p\tcleanupTransaction(%lld)\n", this, number);
  216. return FB_TRUE;
  217. }
  218. FB_BOOLEAN ReplPlugin::deprecatedSetSequence(const char* name, ISC_INT64 value)
  219. {
  220. WriteLog(log, "%p\tdeprecatedSetSequence(%s, %lld)\n", this, name, value);
  221. return FB_TRUE;
  222. }
  223. FB_BOOLEAN ReplPlugin::setSequence2(const char* schemaName, const char* genName, ISC_INT64 value)
  224. {
  225. WriteLog(log, "%p\tsetSequence2(%s.%s, %lld)\n", this, schemaName, genName, value);
  226. return FB_TRUE;
  227. }
  228. ReplTransaction::ReplTransaction(ReplPlugin* session, ITransaction* transaction, ISC_INT64 number):
  229. parent(session), trans(transaction)
  230. {
  231. parent->addRef(); // Lock parent from disappearing
  232. trans->addRef();
  233. WriteLog(parent->log, "%p\tTransaction started\n", this);
  234. }
  235. ReplTransaction::~ReplTransaction()
  236. {
  237. WriteLog(parent->log, "%p\tTransaction destructed\n", this);
  238. trans->release();
  239. parent->release();
  240. }
  241. void ReplTransaction::dispose()
  242. {
  243. WriteLog(parent->log, "%p\tdispose()\n", this);
  244. delete this;
  245. }
  246. FB_BOOLEAN ReplTransaction::prepare()
  247. {
  248. WriteLog(parent->log, "%p\tprepare()\n", this);
  249. return FB_TRUE;
  250. }
  251. FB_BOOLEAN ReplTransaction::commit()
  252. {
  253. WriteLog(parent->log, "%p\tcommit()\n", this);
  254. return FB_TRUE;
  255. }
  256. FB_BOOLEAN ReplTransaction::rollback()
  257. {
  258. WriteLog(parent->log, "%p\trollback()\n", this);
  259. parent->status->setWarnings(wrn);
  260. return FB_FALSE;
  261. }
  262. FB_BOOLEAN ReplTransaction::startSavepoint()
  263. {
  264. WriteLog(parent->log, "%p\tstartSavepoint()\n", this);
  265. return FB_TRUE;
  266. }
  267. FB_BOOLEAN ReplTransaction::releaseSavepoint()
  268. {
  269. WriteLog(parent->log, "%p\treleaseSavepoint()\n", this);
  270. return FB_TRUE;
  271. }
  272. FB_BOOLEAN ReplTransaction::rollbackSavepoint()
  273. {
  274. WriteLog(parent->log, "%p\trollbackSavepoint()\n", this);
  275. return FB_TRUE;
  276. }
  277. bool ReplTransaction::dumpData(IReplicatedRecord* record)
  278. {
  279. for (unsigned i = 0; i < record->getCount(); i++)
  280. {
  281. IReplicatedField* field = record->getField(i);
  282. if (field == nullptr)
  283. {
  284. WriteLog(parent->log, "\t\tNO FIELD %u FOUND\n", i);
  285. continue;
  286. }
  287. unsigned fieldType = field->getType();
  288. WriteLog(parent->log, "\tfield %u (%s), type %u:\n", i, field->getName(), fieldType);
  289. const void* fieldData = field->getData();
  290. if (fieldData == nullptr)
  291. {
  292. WriteLog(parent->log, "\t\tNULL\n");
  293. }
  294. else
  295. {
  296. switch (fieldType)
  297. {
  298. case SQL_TEXT:
  299. {
  300. unsigned length = field->getLength();
  301. unsigned charSet = field->getCharSet();
  302. if (charSet == 1) // OCTETS
  303. {
  304. WriteLog(parent->log, "\t\tBINARY data length %u: ", length);
  305. const unsigned char* data = reinterpret_cast<const unsigned char*>(fieldData);
  306. for (unsigned j = 0; j < length; j++)
  307. WriteLog(parent->log, "%02u", *data++);
  308. WriteLog(parent->log, "\n");
  309. }
  310. else
  311. WriteLog(parent->log, "\t\tTEXT with charset %u, length %u: \"%.*s\"\n", charSet, length, length, reinterpret_cast<const char*>(fieldData));
  312. break;
  313. }
  314. case SQL_VARYING:
  315. {
  316. unsigned charSet = field->getCharSet();
  317. const paramvary* data = static_cast<const paramvary*>(fieldData);
  318. if (charSet == 1) // OCTETS
  319. {
  320. fprintf(parent->log, "\t\tVARBINARY data length %u: ", data->vary_length);
  321. for (unsigned j = 0; j < data->vary_length; j++)
  322. fprintf(parent->log, "%02u", data->vary_string[j]);
  323. WriteLog(parent->log, "\n");
  324. }
  325. else
  326. WriteLog(parent->log, "\t\tVARCHAR with charset %u, length %u: \"%.*s\"\n", charSet, data->vary_length, data->vary_length, data->vary_string);
  327. break;
  328. }
  329. case SQL_SHORT:
  330. {
  331. WriteLog(parent->log, "\t\tSMALLINT with scale %u: %d\n", field->getScale(), *reinterpret_cast<const int16_t*>(fieldData));
  332. break;
  333. }
  334. case SQL_LONG:
  335. {
  336. int value = *reinterpret_cast<const int32_t*>(fieldData);
  337. WriteLog(parent->log, "\t\tINTEGER with scale %u: %d\n", field->getScale(), value);
  338. if (value == 666)
  339. throw value;
  340. break;
  341. }
  342. case SQL_ARRAY:
  343. {
  344. WriteLog(parent->log, "\t\tARRAY\n");
  345. break;
  346. }
  347. case SQL_BLOB:
  348. {
  349. unsigned subType = field->getSubType();
  350. WriteLog(parent->log, "\t\tBLOB subtype %u:\n", subType);
  351. unsigned charSet = 1;
  352. if (subType == 1)
  353. {
  354. charSet = field->getCharSet();
  355. }
  356. CheckStatusWrapper ExtStatus(parent->status);
  357. ISC_QUAD blobId = *reinterpret_cast<const ISC_QUAD*>(fieldData);
  358. IBlob* blob(parent->att->openBlob(&ExtStatus, trans, &blobId, 0, nullptr));
  359. if (ExtStatus.getState() & IStatus::STATE_ERRORS)
  360. return false;
  361. char buffer[USHRT_MAX];
  362. do
  363. {
  364. unsigned length;
  365. int ret = blob->getSegment(&ExtStatus, sizeof(buffer), buffer, &length);
  366. if (ret == IStatus::RESULT_ERROR)
  367. {
  368. blob->release();
  369. return false;
  370. }
  371. if (ret == IStatus::RESULT_NO_DATA)
  372. break;
  373. if (length > 0)
  374. {
  375. fprintf(parent->log, "\t\t - segment of length %u: ", length);
  376. if (subType != 1 || charSet == 1)
  377. {
  378. for (unsigned j = 0; j < length; j++)
  379. fprintf(parent->log, "%02u", buffer[j]);
  380. WriteLog(parent->log, "\n");
  381. }
  382. else
  383. WriteLog(parent->log, "(charset %u) \"%.*s\"\n", charSet, length, buffer);
  384. }
  385. } while (true);
  386. blob->close(&ExtStatus);
  387. blob->release();
  388. if (ExtStatus.getState() & IStatus::STATE_ERRORS)
  389. return false;
  390. break;
  391. }
  392. case SQL_FLOAT:
  393. {
  394. WriteLog(parent->log, "\t\tFLOAT: %f\n", *reinterpret_cast<const float*>(fieldData));
  395. break;
  396. }
  397. case SQL_DOUBLE:
  398. {
  399. WriteLog(parent->log, "\t\tDOUBLE: %f\n", *reinterpret_cast<const double*>(fieldData));
  400. break;
  401. }
  402. case SQL_TYPE_DATE:
  403. {
  404. ISC_DATE value = *reinterpret_cast<const ISC_DATE*>(fieldData);
  405. IUtil* utl = master->getUtilInterface();
  406. unsigned year, month, day;
  407. utl->decodeDate(value, &year, &month, &day);
  408. WriteLog(parent->log, "\t\tDATE: %04u-%02u-%02u\n", year, month, day);
  409. break;
  410. }
  411. case SQL_TYPE_TIME:
  412. {
  413. ISC_TIME value = *reinterpret_cast<const ISC_TIME*>(fieldData);
  414. IUtil* utl = master->getUtilInterface();
  415. unsigned hours, minutes, seconds, fractions;
  416. utl->decodeTime(value, &hours, &minutes, &seconds, &fractions);
  417. WriteLog(parent->log, "\t\tTIME: %02u:%02u:%02u.%04u\n", hours, minutes, seconds, fractions);
  418. break;
  419. }
  420. case SQL_TIMESTAMP:
  421. {
  422. ISC_TIMESTAMP value = *reinterpret_cast<const ISC_TIMESTAMP*>(fieldData);
  423. IUtil* utl = master->getUtilInterface();
  424. unsigned year, month, day, hours, minutes, seconds, fractions;
  425. utl->decodeDate(value.timestamp_date, &year, &month, &day);
  426. utl->decodeTime(value.timestamp_time, &hours, &minutes, &seconds, &fractions);
  427. WriteLog(parent->log, "\t\tTIMESTAMP: %04u-%02u-%02u %02u:%02u:%02u.%04u\n", year, month, day, hours, minutes, seconds, fractions);
  428. break;
  429. }
  430. case SQL_INT64:
  431. {
  432. WriteLog(parent->log, "\t\tBIGINT with scale %u: %lld\n", field->getScale(), *reinterpret_cast<const int64_t*>(fieldData));
  433. break;
  434. }
  435. case SQL_BOOLEAN:
  436. {
  437. WriteLog(parent->log, "\t\tBOOLEAN: %s\n",
  438. *reinterpret_cast<const FB_BOOLEAN*>(fieldData) == FB_TRUE ?
  439. "true" : "false");
  440. break;
  441. }
  442. case SQL_INT128:
  443. {
  444. char buffer[50];
  445. unsigned scale = field->getScale();
  446. CheckStatusWrapper ExtStatus(parent->status);
  447. master->getUtilInterface()->getInt128(&ExtStatus)->toString(&ExtStatus,
  448. reinterpret_cast<const FB_I128*>(fieldData), scale,
  449. sizeof(buffer), buffer);
  450. WriteLog(parent->log, "\t\tINT128 with scale %u: %s\n", scale, buffer);
  451. break;
  452. }
  453. default:
  454. {
  455. WriteLog(parent->log, "\t\twhatever\n");
  456. }
  457. }
  458. }
  459. }
  460. return true;
  461. }
  462. FB_BOOLEAN ReplTransaction::deprecatedInsertRecord(const char* name, IReplicatedRecord* record)
  463. {
  464. WriteLog(parent->log, "%p\tdeprecated Insert record into %s\n", this, name);
  465. try
  466. {
  467. return dumpData(record) ? FB_TRUE : FB_FALSE;
  468. }
  469. catch (const int)
  470. {
  471. parent->status->setErrors(err);
  472. return FB_FALSE;
  473. }
  474. }
  475. FB_BOOLEAN ReplTransaction::insertRecord2(const char* schemaName, const char* tableName, IReplicatedRecord* record)
  476. {
  477. WriteLog(parent->log, "%p\tInsert record into %s.%s\n", this, schemaName, tableName);
  478. try
  479. {
  480. return dumpData(record) ? FB_TRUE : FB_FALSE;
  481. }
  482. catch (const int)
  483. {
  484. parent->status->setErrors(err);
  485. return FB_FALSE;
  486. }
  487. }
  488. FB_BOOLEAN ReplTransaction::deprecatedUpdateRecord(const char* name,
  489. IReplicatedRecord* orgRecord, IReplicatedRecord* newRecord)
  490. {
  491. WriteLog(parent->log, "%p\tdeprecated Update %s\nOldData:\n", this, name);
  492. try
  493. {
  494. if (!dumpData(orgRecord))
  495. return FB_FALSE;
  496. WriteLog(parent->log, "NewData:\n");
  497. return dumpData(newRecord) ? FB_TRUE : FB_FALSE;
  498. }
  499. catch (const int)
  500. {
  501. parent->status->setErrors(err);
  502. return FB_FALSE;
  503. }
  504. }
  505. FB_BOOLEAN ReplTransaction::updateRecord2(const char* schemaName, const char* tableName,
  506. IReplicatedRecord* orgRecord, IReplicatedRecord* newRecord)
  507. {
  508. WriteLog(parent->log, "%p\tUpdate %s.%s\nOldData:\n", this, schemaName, tableName);
  509. try
  510. {
  511. if (!dumpData(orgRecord))
  512. return FB_FALSE;
  513. WriteLog(parent->log, "NewData:\n");
  514. return dumpData(newRecord) ? FB_TRUE : FB_FALSE;
  515. }
  516. catch (const int)
  517. {
  518. parent->status->setErrors(err);
  519. return FB_FALSE;
  520. }
  521. }
  522. FB_BOOLEAN ReplTransaction::deprecatedDeleteRecord(const char* name, IReplicatedRecord* record)
  523. {
  524. WriteLog(parent->log, "%p\tdeprecated Delete from %s\n", this, name);
  525. try
  526. {
  527. return dumpData(record) ? FB_TRUE : FB_FALSE;
  528. }
  529. catch (const int)
  530. {
  531. parent->status->setErrors(err);
  532. return FB_FALSE;
  533. }
  534. }
  535. FB_BOOLEAN ReplTransaction::deleteRecord2(const char* schemaName, const char* tableName, IReplicatedRecord* record)
  536. {
  537. WriteLog(parent->log, "%p\tDelete from %s.%s\n", this, schemaName, tableName);
  538. try
  539. {
  540. return dumpData(record) ? FB_TRUE : FB_FALSE;
  541. }
  542. catch (const int)
  543. {
  544. parent->status->setErrors(err);
  545. return FB_FALSE;
  546. }
  547. }
  548. FB_BOOLEAN ReplTransaction::deprecatedExecuteSql(const char* sql)
  549. {
  550. WriteLog(parent->log, "%p\tdeprecatedExecuteSql(%s)\n", this, sql);
  551. return FB_TRUE;
  552. }
  553. FB_BOOLEAN ReplTransaction::deprecatedExecuteSqlIntl(unsigned charset, const char* sql)
  554. {
  555. WriteLog(parent->log, "%p\tdeprecatedExecuteSqlIntl(%u, %s)\n", this, charset, sql);
  556. return FB_TRUE;
  557. }
  558. FB_BOOLEAN ReplTransaction::executeSqlIntl2(unsigned charset, const char* schemaSearchPath, const char* sql)
  559. {
  560. WriteLog(parent->log, "%p\tExecuteSqlIntl2(%u, %s, %s)\n", this, charset, schemaSearchPath, sql);
  561. return FB_TRUE;
  562. }