fbSampleReplicator.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572
  1. /*
  2. *
  3. * Sample replication plugin
  4. *
  5. */
  6. #include <algorithm>
  7. #include <atomic>
  8. #define __USE_MINGW_ANSI_STDIO 1
  9. #include <stdio.h>
  10. #include <time.h>
  11. #include "firebird/Interface.h"
  12. using namespace Firebird;
  13. #define WriteLog(file, ...) fprintf(file, __VA_ARGS__), fflush(file)
  14. class ReplPlugin : public IReplicatedSessionImpl<ReplPlugin, CheckStatusWrapper>
  15. {
  16. public:
  17. ReplPlugin(IPluginConfig* config);
  18. virtual ~ReplPlugin();
  19. // IReferenceCounted implementation
  20. void addRef() override;
  21. int release() override;
  22. // IPluginBase implementation
  23. void setOwner(IReferenceCounted* r) override;
  24. IReferenceCounted* getOwner() override;
  25. // IReplicatedSession implementation
  26. IStatus* getStatus() override;
  27. void setAttachment(IAttachment* attachment) override;
  28. IReplicatedTransaction* startTransaction(ITransaction* transaction, ISC_INT64 number) override;
  29. FB_BOOLEAN cleanupTransaction(ISC_INT64 number) override;
  30. FB_BOOLEAN setSequence(const char* name, ISC_INT64 value) override;
  31. private:
  32. friend class ReplTransaction;
  33. IAttachment* att = nullptr;
  34. FILE* log = nullptr;
  35. IStatus* status = nullptr;
  36. std::atomic_int refCounter;
  37. IReferenceCounted* owner;
  38. void dumpInfo(const unsigned char* buffer, size_t length);
  39. };
  40. class ReplTransaction: public IReplicatedTransactionImpl<ReplTransaction, CheckStatusWrapper>
  41. {
  42. public:
  43. ReplTransaction(ReplPlugin* session, ITransaction* transaction, ISC_INT64 number);
  44. ~ReplTransaction();
  45. // IDisposable implementation
  46. void dispose() override;
  47. // IReplicatedTransaction implementation
  48. FB_BOOLEAN prepare() override;
  49. FB_BOOLEAN commit() override;
  50. FB_BOOLEAN rollback() override;
  51. FB_BOOLEAN startSavepoint() override;
  52. FB_BOOLEAN releaseSavepoint() override;
  53. FB_BOOLEAN rollbackSavepoint() override;
  54. FB_BOOLEAN insertRecord(const char* name, IReplicatedRecord* record) override;
  55. FB_BOOLEAN updateRecord(const char* name, IReplicatedRecord* orgRecord, IReplicatedRecord* newRecord) override;
  56. FB_BOOLEAN deleteRecord(const char* name, IReplicatedRecord* record) override;
  57. FB_BOOLEAN executeSql(const char* sql) override;
  58. FB_BOOLEAN executeSqlIntl(unsigned charset, const char* sql) override;
  59. private:
  60. ReplPlugin* parent;
  61. ITransaction* trans;
  62. bool dumpData(IReplicatedRecord* record);
  63. };
  64. IMaster* master = nullptr;
  65. class PluginModule : public IPluginModuleImpl<PluginModule, CheckStatusWrapper>
  66. {
  67. public:
  68. void doClean() override {}
  69. void threadDetach() override {}
  70. } module;
  71. class Factory : public IPluginFactoryImpl<Factory, CheckStatusWrapper>
  72. {
  73. public:
  74. IPluginBase* createPlugin(CheckStatusWrapper* status, IPluginConfig* factoryParameter)
  75. {
  76. IPluginBase* p = new ReplPlugin(factoryParameter);
  77. p->addRef();
  78. return p;
  79. }
  80. } factory;
  81. extern "C"
  82. {
  83. #if defined(__WIN32__)
  84. void __declspec(dllexport) FB_PLUGIN_ENTRY_POINT(IMaster* m);
  85. #else
  86. void FB_PLUGIN_ENTRY_POINT(IMaster* m)
  87. __attribute__((visibility("default")));
  88. #endif // __WIN32__
  89. void FB_PLUGIN_ENTRY_POINT(IMaster* m)
  90. {
  91. master = m;
  92. IPluginManager* pm = m->getPluginManager();
  93. pm->registerModule(&module);
  94. pm->registerPluginFactory(IPluginManager::TYPE_REPLICATOR, "fbSampleReplicator", &factory);
  95. }
  96. }
  97. static std::atomic_int logCounter;
  98. static const ISC_STATUS err[] = { isc_arg_gds, isc_random, isc_arg_string, (ISC_STATUS)"Intolerable integer value", isc_arg_end };
  99. static const ISC_STATUS wrn[] = { isc_arg_gds, isc_random, isc_arg_string, (ISC_STATUS)"Just a warning", isc_arg_end };
  100. ReplPlugin::ReplPlugin(IPluginConfig* conf)
  101. {
  102. char fn[100];
  103. sprintf(fn, "session_%08x_%d.log", (unsigned)time(nullptr), logCounter++);
  104. log = fopen(fn, "w");
  105. WriteLog(log, "%p\tReplicatedSession constructed\n", this);
  106. status = master->getStatus();
  107. }
  108. ReplPlugin::~ReplPlugin()
  109. {
  110. if (log != nullptr)
  111. {
  112. WriteLog(log, "%p\tReplicatedSession destructed\n", this);
  113. fclose(log);
  114. }
  115. if (att != nullptr)
  116. att->release();
  117. if (status != nullptr)
  118. status->dispose();
  119. }
  120. void ReplPlugin::addRef()
  121. {
  122. WriteLog(log, "%p\taddRef() to %d\n", this, ++refCounter);
  123. }
  124. int ReplPlugin::release()
  125. {
  126. WriteLog(log, "%p\trelease at %d\n", this, refCounter.load());
  127. if (--refCounter == 0)
  128. {
  129. delete this;
  130. return 0;
  131. }
  132. return 1;
  133. }
  134. void ReplPlugin::setOwner(IReferenceCounted* r)
  135. {
  136. WriteLog(log, "%p\tsetOwner(%p)\n", this, r);
  137. owner = r;
  138. }
  139. IReferenceCounted* ReplPlugin::getOwner()
  140. {
  141. WriteLog(log, "%p\tgetOwner()\n", this);
  142. return owner;
  143. }
  144. IStatus* ReplPlugin::getStatus()
  145. {
  146. WriteLog(log, "%p\tgetStatus()\n", this);
  147. return status;
  148. }
  149. void ReplPlugin::dumpInfo(const unsigned char* buffer, size_t length)
  150. {
  151. const unsigned char* p = buffer;
  152. while (p < buffer + length)
  153. {
  154. unsigned char item = *p++;
  155. // Handle terminating items fist
  156. if (item == isc_info_end)
  157. {
  158. return;
  159. }
  160. if (item == isc_info_truncated)
  161. {
  162. WriteLog(log, "\t\tDatabase info truncated\n");
  163. return;
  164. }
  165. // Now data items
  166. const unsigned len = p[0] | p[1] << 8;
  167. p += 2;
  168. switch (item)
  169. {
  170. case fb_info_db_guid:
  171. {
  172. WriteLog(log, "\t\tDatabase GUID = %.*s\n", len, p);
  173. break;
  174. }
  175. case isc_info_error:
  176. {
  177. unsigned err = p[1];
  178. for (unsigned i = 1; i < std::min(len, 4U); i++)
  179. {
  180. err |= p[i + 1] << (8 * i);
  181. }
  182. WriteLog(log, "\t\tDatabase info error %u for item %d\n", err, p[0]);
  183. return;
  184. }
  185. default:
  186. WriteLog(log, "\t\tUnexpected info item %d\n", item);
  187. break;
  188. }
  189. p += len;
  190. }
  191. WriteLog(log, "\t\tSuspicious exit from info parse loop\n");
  192. }
  193. void ReplPlugin::setAttachment(IAttachment* attachment)
  194. {
  195. WriteLog(log, "%p\tAssigned attachment %p\n", this, attachment);
  196. att = attachment;
  197. att->addRef();
  198. CheckStatusWrapper ExtStatus(status);
  199. const unsigned char items[] = { fb_info_db_guid };
  200. unsigned char response[80];
  201. att->getInfo(&ExtStatus, sizeof(items), items, sizeof(response), response);
  202. if (status->getState() == 0)
  203. {
  204. dumpInfo(response, sizeof(response));
  205. }
  206. }
  207. IReplicatedTransaction* ReplPlugin::startTransaction(ITransaction* transaction, ISC_INT64 number)
  208. {
  209. WriteLog(log, "%p\tstartTransaction(%p, %lld)\n", this, transaction, number);
  210. return new ReplTransaction(this, transaction, number);
  211. }
  212. FB_BOOLEAN ReplPlugin::cleanupTransaction(ISC_INT64 number)
  213. {
  214. WriteLog(log, "%p\tcleanupTransaction(%lld)\n", this, number);
  215. return FB_TRUE;
  216. }
  217. FB_BOOLEAN ReplPlugin::setSequence(const char* name, ISC_INT64 value)
  218. {
  219. WriteLog(log, "%p\tsetSequence(%s, %lld)\n", this, name, value);
  220. return FB_TRUE;
  221. }
  222. ReplTransaction::ReplTransaction(ReplPlugin* session, ITransaction* transaction, ISC_INT64 number):
  223. parent(session), trans(transaction)
  224. {
  225. parent->addRef(); // Lock parent from disappearing
  226. trans->addRef();
  227. WriteLog(parent->log, "%p\tTransaction started\n", this);
  228. }
  229. ReplTransaction::~ReplTransaction()
  230. {
  231. WriteLog(parent->log, "%p\tTransaction destructed\n", this);
  232. trans->release();
  233. parent->release();
  234. }
  235. void ReplTransaction::dispose()
  236. {
  237. WriteLog(parent->log, "%p\tdispose()\n", this);
  238. delete this;
  239. }
  240. FB_BOOLEAN ReplTransaction::prepare()
  241. {
  242. WriteLog(parent->log, "%p\tprepare()\n", this);
  243. return FB_TRUE;
  244. }
  245. FB_BOOLEAN ReplTransaction::commit()
  246. {
  247. WriteLog(parent->log, "%p\tcommit()\n", this);
  248. return FB_TRUE;
  249. }
  250. FB_BOOLEAN ReplTransaction::rollback()
  251. {
  252. WriteLog(parent->log, "%p\trollback()\n", this);
  253. parent->status->setWarnings(wrn);
  254. return FB_FALSE;
  255. }
  256. FB_BOOLEAN ReplTransaction::startSavepoint()
  257. {
  258. WriteLog(parent->log, "%p\tstartSavepoint()\n", this);
  259. return FB_TRUE;
  260. }
  261. FB_BOOLEAN ReplTransaction::releaseSavepoint()
  262. {
  263. WriteLog(parent->log, "%p\treleaseSavepoint()\n", this);
  264. return FB_TRUE;
  265. }
  266. FB_BOOLEAN ReplTransaction::rollbackSavepoint()
  267. {
  268. WriteLog(parent->log, "%p\trollbackSavepoint()\n", this);
  269. return FB_TRUE;
  270. }
  271. bool ReplTransaction::dumpData(IReplicatedRecord* record)
  272. {
  273. for (unsigned i = 0; i < record->getCount(); i++)
  274. {
  275. IReplicatedField* field = record->getField(i);
  276. if (field == nullptr)
  277. {
  278. WriteLog(parent->log, "\t\tNO FIELD %u FOUND\n", i);
  279. continue;
  280. }
  281. unsigned fieldType = field->getType();
  282. WriteLog(parent->log, "\tfield %u (%s), type %u:\n", i, field->getName(), fieldType);
  283. const void* fieldData = field->getData();
  284. if (fieldData == nullptr)
  285. {
  286. WriteLog(parent->log, "\t\tNULL\n");
  287. }
  288. else
  289. {
  290. switch (fieldType)
  291. {
  292. case SQL_TEXT:
  293. {
  294. unsigned length = field->getLength();
  295. unsigned charSet = field->getCharSet();
  296. if (charSet == 1) // OCTETS
  297. {
  298. WriteLog(parent->log, "\t\tBINARY data length %u: ", length);
  299. const unsigned char* data = reinterpret_cast<const unsigned char*>(fieldData);
  300. for (unsigned j = 0; j < length; j++)
  301. WriteLog(parent->log, "%02u", *data++);
  302. WriteLog(parent->log, "\n");
  303. }
  304. else
  305. WriteLog(parent->log, "\t\tTEXT with charset %u, length %u: \"%.*s\"\n", charSet, length, length, reinterpret_cast<const char*>(fieldData));
  306. break;
  307. }
  308. case SQL_VARYING:
  309. {
  310. unsigned charSet = field->getCharSet();
  311. const paramvary* data = static_cast<const paramvary*>(fieldData);
  312. if (charSet == 1) // OCTETS
  313. {
  314. fprintf(parent->log, "\t\tVARBINARY data length %u: ", data->vary_length);
  315. for (unsigned j = 0; j < data->vary_length; j++)
  316. fprintf(parent->log, "%02u", data->vary_string[j]);
  317. WriteLog(parent->log, "\n");
  318. }
  319. else
  320. WriteLog(parent->log, "\t\tVARCHAR with charset %u, length %u: \"%.*s\"\n", charSet, data->vary_length, data->vary_length, data->vary_string);
  321. break;
  322. }
  323. case SQL_SHORT:
  324. {
  325. WriteLog(parent->log, "\t\tSMALLINT with scale %u: %d\n", field->getScale(), *reinterpret_cast<const int16_t*>(fieldData));
  326. break;
  327. }
  328. case SQL_LONG:
  329. {
  330. int value = *reinterpret_cast<const int32_t*>(fieldData);
  331. WriteLog(parent->log, "\t\tINTEGER with scale %u: %d\n", field->getScale(), value);
  332. if (value == 666)
  333. throw value;
  334. break;
  335. }
  336. case SQL_ARRAY:
  337. {
  338. WriteLog(parent->log, "\t\tARRAY\n");
  339. break;
  340. }
  341. case SQL_BLOB:
  342. {
  343. unsigned subType = field->getSubType();
  344. WriteLog(parent->log, "\t\tBLOB subtype %u:\n", subType);
  345. unsigned charSet = 1;
  346. if (subType == 1)
  347. {
  348. charSet = field->getCharSet();
  349. }
  350. CheckStatusWrapper ExtStatus(parent->status);
  351. ISC_QUAD blobId = *reinterpret_cast<const ISC_QUAD*>(fieldData);
  352. IBlob* blob(parent->att->openBlob(&ExtStatus, trans, &blobId, 0, nullptr));
  353. if (ExtStatus.getState() & IStatus::STATE_ERRORS)
  354. return false;
  355. char buffer[USHRT_MAX];
  356. do
  357. {
  358. unsigned length;
  359. int ret = blob->getSegment(&ExtStatus, sizeof(buffer), buffer, &length);
  360. if (ret == IStatus::RESULT_ERROR)
  361. {
  362. blob->release();
  363. return false;
  364. }
  365. if (ret == IStatus::RESULT_NO_DATA)
  366. break;
  367. if (length > 0)
  368. {
  369. fprintf(parent->log, "\t\t - segment of length %u: ", length);
  370. if (subType != 1 || charSet == 1)
  371. {
  372. for (unsigned j = 0; j < length; j++)
  373. fprintf(parent->log, "%02u", buffer[j]);
  374. WriteLog(parent->log, "\n");
  375. }
  376. else
  377. WriteLog(parent->log, "(charset %u) \"%.*s\"\n", charSet, length, buffer);
  378. }
  379. } while (true);
  380. blob->close(&ExtStatus);
  381. blob->release();
  382. if (ExtStatus.getState() & IStatus::STATE_ERRORS)
  383. return false;
  384. break;
  385. }
  386. case SQL_FLOAT:
  387. {
  388. WriteLog(parent->log, "\t\tFLOAT: %f\n", *reinterpret_cast<const float*>(fieldData));
  389. break;
  390. }
  391. case SQL_DOUBLE:
  392. {
  393. WriteLog(parent->log, "\t\tDOUBLE: %f\n", *reinterpret_cast<const double*>(fieldData));
  394. break;
  395. }
  396. case SQL_TYPE_DATE:
  397. {
  398. ISC_DATE value = *reinterpret_cast<const ISC_DATE*>(fieldData);
  399. IUtil* utl = master->getUtilInterface();
  400. unsigned year, month, day;
  401. utl->decodeDate(value, &year, &month, &day);
  402. WriteLog(parent->log, "\t\tDATE: %04u-%02u-%02u\n", year, month, day);
  403. break;
  404. }
  405. case SQL_TYPE_TIME:
  406. {
  407. ISC_TIME value = *reinterpret_cast<const ISC_TIME*>(fieldData);
  408. IUtil* utl = master->getUtilInterface();
  409. unsigned hours, minutes, seconds, fractions;
  410. utl->decodeTime(value, &hours, &minutes, &seconds, &fractions);
  411. WriteLog(parent->log, "\t\tTIME: %02u:%02u:%02u.%04u\n", hours, minutes, seconds, fractions);
  412. break;
  413. }
  414. case SQL_TIMESTAMP:
  415. {
  416. ISC_TIMESTAMP value = *reinterpret_cast<const ISC_TIMESTAMP*>(fieldData);
  417. IUtil* utl = master->getUtilInterface();
  418. unsigned year, month, day, hours, minutes, seconds, fractions;
  419. utl->decodeDate(value.timestamp_date, &year, &month, &day);
  420. utl->decodeTime(value.timestamp_time, &hours, &minutes, &seconds, &fractions);
  421. WriteLog(parent->log, "\t\tTIMESTAMP: %04u-%02u-%02u %02u:%02u:%02u.%04u\n", year, month, day, hours, minutes, seconds, fractions);
  422. break;
  423. }
  424. case SQL_INT64:
  425. {
  426. WriteLog(parent->log, "\t\tBIGINT with scale %u: %lld\n", field->getScale(), *reinterpret_cast<const int64_t*>(fieldData));
  427. break;
  428. }
  429. case SQL_BOOLEAN:
  430. {
  431. WriteLog(parent->log, "\t\tBOOLEAN: %s\n",
  432. *reinterpret_cast<const FB_BOOLEAN*>(fieldData) == FB_TRUE ?
  433. "true" : "false");
  434. break;
  435. }
  436. case SQL_INT128:
  437. {
  438. char buffer[50];
  439. unsigned scale = field->getScale();
  440. CheckStatusWrapper ExtStatus(parent->status);
  441. master->getUtilInterface()->getInt128(&ExtStatus)->toString(&ExtStatus,
  442. reinterpret_cast<const FB_I128*>(fieldData), scale,
  443. sizeof(buffer), buffer);
  444. WriteLog(parent->log, "\t\tINT128 with scale %u: %s\n", scale, buffer);
  445. break;
  446. }
  447. default:
  448. {
  449. WriteLog(parent->log, "\t\twhatever\n");
  450. }
  451. }
  452. }
  453. }
  454. return true;
  455. }
  456. FB_BOOLEAN ReplTransaction::insertRecord(const char* name, IReplicatedRecord* record)
  457. {
  458. WriteLog(parent->log, "%p\tInsert record into %s\n", this, name);
  459. try
  460. {
  461. return dumpData(record) ? FB_TRUE : FB_FALSE;
  462. }
  463. catch (const int)
  464. {
  465. parent->status->setErrors(err);
  466. return FB_FALSE;
  467. }
  468. }
  469. FB_BOOLEAN ReplTransaction::updateRecord(const char* name, IReplicatedRecord* orgRecord, IReplicatedRecord* newRecord)
  470. {
  471. WriteLog(parent->log, "%p\tUpdate %s\nOldData:\n", this, name);
  472. try
  473. {
  474. if (!dumpData(orgRecord))
  475. return FB_FALSE;
  476. WriteLog(parent->log, "NewData:\n");
  477. return dumpData(newRecord) ? FB_TRUE : FB_FALSE;
  478. }
  479. catch (const int)
  480. {
  481. parent->status->setErrors(err);
  482. return FB_FALSE;
  483. }
  484. }
  485. FB_BOOLEAN ReplTransaction::deleteRecord(const char* name, IReplicatedRecord* record)
  486. {
  487. WriteLog(parent->log, "%p\tDelete from %s\n", this, name);
  488. try
  489. {
  490. return dumpData(record) ? FB_TRUE : FB_FALSE;
  491. }
  492. catch (const int)
  493. {
  494. parent->status->setErrors(err);
  495. return FB_FALSE;
  496. }
  497. }
  498. FB_BOOLEAN ReplTransaction::executeSql(const char* sql)
  499. {
  500. WriteLog(parent->log, "%p\tExecuteSql(%s)\n", this, sql);
  501. return FB_TRUE;
  502. }
  503. FB_BOOLEAN ReplTransaction::executeSqlIntl(unsigned charset, const char* sql)
  504. {
  505. WriteLog(parent->log, "%p\tExecuteSqlIntl(%u, %s)\n", this, charset, sql);
  506. return FB_TRUE;
  507. }