dbcl_api.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510
  1. /*
  2. * $Id$
  3. *
  4. * DB CLuster core functions
  5. *
  6. * Copyright (C) 2012 Daniel-Constantin Mierla (asipto.com)
  7. *
  8. * This file is part of Kamailio, a free SIP server.
  9. *
  10. * Kamailio is free software; you can redistribute it and/or modify
  11. * it under the terms of the GNU General Public License as published by
  12. * the Free Software Foundation; either version 2 of the License, or
  13. * (at your option) any later version
  14. *
  15. * Kamailio is distributed in the hope that it will be useful,
  16. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18. * GNU General Public License for more details.
  19. *
  20. * You should have received a copy of the GNU General Public License
  21. * along with this program; if not, write to the Free Software
  22. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  23. */
  24. /*! \file
  25. * \brief DB_CLUSTER :: Core
  26. * \ingroup db_cluster
  27. * Module: \ref db_cluster
  28. */
  29. #include "../../dprint.h"
  30. #include "../../hashes.h"
  31. #include "../../trim.h"
  32. #include "../../globals.h"
  33. #include "../../lib/srdb1/db.h"
  34. #include "../../timer.h"
  35. #include "dbcl_data.h"
  36. #include "dbcl_api.h"
  37. extern int dbcl_max_query_length;
  38. #define DBCL_READ(qfunc, command) \
  39. do {\
  40. int ret;\
  41. int i;\
  42. int j;\
  43. int k;\
  44. unsigned int sec = 0;\
  45. db1_con_t *dbh=NULL;\
  46. dbcl_cls_t *cls=NULL;\
  47. cls = (dbcl_cls_t*)_h->tail;\
  48. ret = -1;\
  49. for(i=DBCL_PRIO_SIZE-1; i>0; i--)\
  50. {\
  51. if(cls->rlist[i].clen<=0) continue; \
  52. switch(cls->rlist[i].mode) {\
  53. case 's':\
  54. case 'S':\
  55. for(j=0; j<cls->rlist[i].clen; j++)\
  56. {\
  57. if(dbcl_valid_con(cls->rlist[i].clist[j])==0)\
  58. {\
  59. LM_DBG("serial operation - cluster [%.*s] (%d/%d)\n",\
  60. cls->name.len, cls->name.s, i, j);\
  61. sec = get_ticks();\
  62. dbh = cls->rlist[i].clist[j]->dbh;\
  63. if(cls->rlist[i].clist[j]->dbf.qfunc==NULL) {\
  64. LM_ERR("unsupported command by db connector\n");\
  65. return -1;\
  66. }\
  67. ret = cls->rlist[i].clist[j]->dbf.command;\
  68. if (ret==0) {\
  69. cls->usedcon = cls->rlist[i].clist[j];\
  70. return 0;\
  71. } else {\
  72. LM_DBG("serial operation - failre on cluster"\
  73. " [%.*s] (%d/%d)\n",\
  74. cls->name.len, cls->name.s, i, j);\
  75. sec = get_ticks() - sec;\
  76. if(sec >= dbcl_max_query_length){\
  77. dbcl_inactive_con(cls->rlist[i].clist[j]);\
  78. }\
  79. }\
  80. }\
  81. }\
  82. break;\
  83. case 'r':\
  84. case 'R':\
  85. for(k=0; k<cls->rlist[i].clen; k++)\
  86. {\
  87. j = (process_no + k + cls->rlist[i].crt) % cls->rlist[i].clen;\
  88. if(dbcl_valid_con(cls->rlist[i].clist[j])==0)\
  89. {\
  90. LM_DBG("round robin operation - cluster [%.*s] (%d/%d)\n",\
  91. cls->name.len, cls->name.s, i, j);\
  92. sec = get_ticks();\
  93. dbh = cls->rlist[i].clist[j]->dbh;\
  94. if(cls->rlist[i].clist[j]->dbf.qfunc==NULL) {\
  95. LM_ERR("unsupported command by db connector\n");\
  96. return -1;\
  97. }\
  98. ret = cls->rlist[i].clist[j]->dbf.command;\
  99. if (ret==0)\
  100. {\
  101. cls->usedcon = cls->rlist[i].clist[j];\
  102. cls->rlist[i].crt = (j+1) % cls->rlist[i].clen;\
  103. return 0;\
  104. } else {\
  105. LM_DBG("round robin operation - failre on cluster"\
  106. " [%.*s] (%d/%d)\n",\
  107. cls->name.len, cls->name.s, i, j);\
  108. sec = get_ticks() - sec;\
  109. if(sec >= dbcl_max_query_length){\
  110. dbcl_inactive_con(cls->rlist[i].clist[j]);\
  111. }\
  112. }\
  113. }\
  114. }\
  115. break;\
  116. default:\
  117. LM_ERR("invalid mode %c (%d)\n", cls->rlist[i].mode,\
  118. cls->rlist[i].mode);\
  119. return -1;\
  120. }\
  121. }\
  122. LM_DBG("no successful read on cluster [%.*s]\n",\
  123. cls->name.len, cls->name.s);\
  124. return ret;\
  125. } while(0)
  126. #define DBCL_WRITE(qfunc, command) \
  127. do {\
  128. int ret;\
  129. int rc;\
  130. int rok;\
  131. int i;\
  132. int j;\
  133. int k;\
  134. unsigned int sec = 0;\
  135. db1_con_t *dbh=NULL;\
  136. dbcl_cls_t *cls=NULL;\
  137. cls = (dbcl_cls_t*)_h->tail;\
  138. ret = -1;\
  139. rok = 0;\
  140. rc = 0;\
  141. for(i=DBCL_PRIO_SIZE-1; i>0; i--)\
  142. {\
  143. if(cls->wlist[i].clen<=0) continue; \
  144. switch(cls->wlist[i].mode) {\
  145. case 's':\
  146. case 'S':\
  147. for(j=0; j<cls->wlist[i].clen; j++)\
  148. {\
  149. if(dbcl_valid_con(cls->wlist[i].clist[j])==0)\
  150. {\
  151. LM_DBG("serial operation - cluster [%.*s] (%d/%d)\n",\
  152. cls->name.len, cls->name.s, i, j);\
  153. sec = get_ticks();\
  154. dbh = cls->wlist[i].clist[j]->dbh;\
  155. if(cls->rlist[i].clist[j]->dbf.qfunc==NULL) {\
  156. LM_ERR("unsupported command by db connector\n");\
  157. return -1;\
  158. }\
  159. ret = cls->wlist[i].clist[j]->dbf.command;\
  160. if (ret==0) {\
  161. cls->usedcon = cls->wlist[i].clist[j];\
  162. return 0;\
  163. } else {\
  164. LM_DBG("serial operation - failure on cluster"\
  165. " [%.*s] (%d/%d)\n",\
  166. cls->name.len, cls->name.s, i, j);\
  167. sec = get_ticks() - sec;\
  168. if(sec >= dbcl_max_query_length){\
  169. dbcl_inactive_con(cls->wlist[i].clist[j]);\
  170. }\
  171. }\
  172. }\
  173. }\
  174. break;\
  175. case 'r':\
  176. case 'R':\
  177. for(k=0; k<cls->wlist[i].clen; k++)\
  178. {\
  179. j = (process_no + k + cls->wlist[i].crt) % cls->wlist[i].clen;\
  180. if(dbcl_valid_con(cls->wlist[i].clist[j])==0)\
  181. {\
  182. LM_DBG("round robin operation - cluster [%.*s] (%d/%d)\n",\
  183. cls->name.len, cls->name.s, i, j);\
  184. sec = get_ticks();\
  185. dbh = cls->wlist[i].clist[j]->dbh;\
  186. if(cls->rlist[i].clist[j]->dbf.qfunc==NULL) {\
  187. LM_ERR("unsupported command by db connector\n");\
  188. return -1;\
  189. }\
  190. ret = cls->wlist[i].clist[j]->dbf.command;\
  191. if (ret==0)\
  192. {\
  193. cls->usedcon = cls->wlist[i].clist[j];\
  194. cls->wlist[i].crt = (j+1) % cls->wlist[i].clen;\
  195. return 0;\
  196. } else {\
  197. LM_DBG("round robin operation - failure on cluster"\
  198. " [%.*s] (%d/%d)\n",\
  199. cls->name.len, cls->name.s, i, j);\
  200. sec = get_ticks() - sec;\
  201. if(sec >= dbcl_max_query_length){\
  202. dbcl_inactive_con(cls->wlist[i].clist[j]);\
  203. }\
  204. }\
  205. }\
  206. }\
  207. break;\
  208. case 'p':\
  209. case 'P':\
  210. for(j=0; j<cls->wlist[i].clen; j++)\
  211. {\
  212. if(dbcl_valid_con(cls->wlist[i].clist[j])==0)\
  213. {\
  214. LM_DBG("parallel operation - cluster [%.*s] (%d/%d)\n",\
  215. cls->name.len, cls->name.s, i, j);\
  216. sec = get_ticks();\
  217. dbh = cls->wlist[i].clist[j]->dbh;\
  218. if(cls->rlist[i].clist[j]->dbf.qfunc==NULL) {\
  219. LM_ERR("unsupported command by db connector\n");\
  220. return -1;\
  221. }\
  222. rc = cls->wlist[i].clist[j]->dbf.command;\
  223. if(rc==0) {\
  224. cls->usedcon = cls->wlist[i].clist[j];\
  225. rok = 1;\
  226. } else {\
  227. LM_DBG("parallel operation - failure on cluster"\
  228. " [%.*s] (%d/%d)\n",\
  229. cls->name.len, cls->name.s, i, j);\
  230. sec = get_ticks() - sec;\
  231. if(sec >= dbcl_max_query_length){\
  232. dbcl_inactive_con(cls->wlist[i].clist[j]);\
  233. }\
  234. }\
  235. ret |= rc;\
  236. }\
  237. }\
  238. if (rok==1 && cls->wlist[i].clen>0)\
  239. return 0;\
  240. break;\
  241. default:\
  242. LM_ERR("invalid mode %c (%d)\n", cls->rlist[i].mode,\
  243. cls->rlist[i].mode);\
  244. return -1;\
  245. }\
  246. }\
  247. LM_DBG("no successful write on cluster [%.*s]\n",\
  248. cls->name.len, cls->name.s);\
  249. return ret;\
  250. } while(0)
  251. /*! \brief
  252. * Initialize database connection
  253. */
  254. db1_con_t* db_cluster_init(const str* _dburl)
  255. {
  256. db1_con_t *h=NULL;
  257. dbcl_cls_t *cls=NULL;
  258. str name;
  259. LM_DBG("initializing with cluster [%.*s]\n", _dburl->len, _dburl->s);
  260. if(_dburl->len<10 || strncmp(_dburl->s, "cluster://", 10)!=0)
  261. {
  262. LM_ERR("invlaid url for cluster module [%.*s]\n",
  263. _dburl->len, _dburl->s);
  264. return NULL;
  265. }
  266. name.s = _dburl->s + 10;
  267. name.len = _dburl->len - 10;
  268. trim(&name);
  269. cls = dbcl_get_cluster(&name);
  270. if(cls==NULL)
  271. {
  272. LM_ERR("cluster not found [%.*s]\n",
  273. _dburl->len, _dburl->s);
  274. return NULL;
  275. }
  276. if(dbcl_init_dbf(cls)<0)
  277. {
  278. LM_ERR("cluster [%.*s] - unable to bind to DB engines\n",
  279. _dburl->len, _dburl->s);
  280. return NULL;
  281. }
  282. dbcl_init_connections(cls);
  283. cls->ref++;
  284. h = (db1_con_t*)pkg_malloc(sizeof(db1_con_t));
  285. if (h==NULL) {
  286. LM_ERR("out of pkg\n");
  287. return NULL;
  288. }
  289. memset(h, 0, sizeof(db1_con_t));
  290. h->tail = (unsigned long)cls;
  291. return h;
  292. }
  293. /*! \brief
  294. * Close a database connection
  295. */
  296. void db_cluster_close(db1_con_t* _h)
  297. {
  298. dbcl_cls_t *cls=NULL;
  299. LM_DBG("executing db cluster close command\n");
  300. cls = (dbcl_cls_t*)_h->tail;
  301. cls->ref--;
  302. if(cls->ref > 0)
  303. return;
  304. /* close connections */
  305. dbcl_close_connections(cls);
  306. return;
  307. }
  308. /*! \brief
  309. * Free all memory allocated by get_result
  310. */
  311. int db_cluster_free_result(db1_con_t* _h, db1_res_t* _r)
  312. {
  313. dbcl_cls_t *cls=NULL;
  314. LM_DBG("executing db cluster free-result command\n");
  315. cls = (dbcl_cls_t*)_h->tail;
  316. if(cls->usedcon==NULL || cls->usedcon->dbh==NULL)
  317. return -1;
  318. return cls->usedcon->dbf.free_result(cls->usedcon->dbh, _r);
  319. }
  320. /*! \brief
  321. * Do a query
  322. */
  323. int db_cluster_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
  324. const db_val_t* _v, const db_key_t* _c, const int _n, const int _nc,
  325. const db_key_t _o, db1_res_t** _r)
  326. {
  327. LM_DBG("executing db cluster query command\n");
  328. DBCL_READ(query, query(dbh, _k, _op, _v, _c, _n, _nc, _o, _r));
  329. }
  330. /*! \brief
  331. * fetch rows from a result
  332. */
  333. int db_cluster_fetch_result(const db1_con_t* _h, db1_res_t** _r, const int nrows)
  334. {
  335. dbcl_cls_t *cls=NULL;
  336. LM_DBG("executing db cluster fetch-result command\n");
  337. cls = (dbcl_cls_t*)_h->tail;
  338. if(cls->usedcon==NULL || cls->usedcon->dbh==NULL
  339. || cls->usedcon->dbf.fetch_result==NULL)
  340. return -1;
  341. return cls->usedcon->dbf.fetch_result(cls->usedcon->dbh, _r, nrows);
  342. }
  343. /*! \brief
  344. * Raw SQL query
  345. */
  346. int db_cluster_raw_query(const db1_con_t* _h, const str* _s, db1_res_t** _r)
  347. {
  348. LM_DBG("executing db cluster raw query command\n");
  349. DBCL_READ(raw_query, raw_query(dbh, _s, _r));
  350. }
  351. /*! \brief
  352. * Insert a row into table
  353. */
  354. int db_cluster_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, const int _n)
  355. {
  356. LM_DBG("executing db cluster insert command\n");
  357. DBCL_WRITE(insert, insert(dbh, _k, _v, _n));
  358. }
  359. /*! \brief
  360. * Delete a row from table
  361. */
  362. int db_cluster_delete(const db1_con_t* _h, const db_key_t* _k, const
  363. db_op_t* _o, const db_val_t* _v, const int _n)
  364. {
  365. LM_DBG("executing db cluster delete command\n");
  366. DBCL_WRITE(delete, delete(dbh, _k, _o, _v, _n));
  367. }
  368. /*! \brief
  369. * Update a row in table
  370. */
  371. int db_cluster_update(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o,
  372. const db_val_t* _v, const db_key_t* _uk, const db_val_t* _uv, const int _n,
  373. const int _un)
  374. {
  375. LM_DBG("executing db cluster update command\n");
  376. DBCL_WRITE(update, update(dbh, _k, _o, _v, _uk, _uv, _n, _un));
  377. }
  378. /*! \brief
  379. * Just like insert, but replace the row if it exists
  380. */
  381. int db_cluster_replace(const db1_con_t* _h, const db_key_t* _k,
  382. const db_val_t* _v, const int _n, const int _un, const int _m)
  383. {
  384. LM_DBG("executing db cluster replace command\n");
  385. DBCL_WRITE(replace, replace(dbh, _k, _v, _n, _un, _m));
  386. }
  387. /*! \brief
  388. * Returns the last inserted ID
  389. */
  390. int db_cluster_last_inserted_id(const db1_con_t* _h)
  391. {
  392. dbcl_cls_t *cls=NULL;
  393. LM_DBG("executing db cluster last inserted id command\n");
  394. cls = (dbcl_cls_t*)_h->tail;
  395. if(cls->usedcon==NULL || cls->usedcon->dbh==NULL
  396. || cls->usedcon->dbf.last_inserted_id==NULL)
  397. return -1;
  398. return cls->usedcon->dbf.last_inserted_id(cls->usedcon->dbh);
  399. }
  400. /*! \brief
  401. * Returns number of affected rows for last query
  402. */
  403. int db_cluster_affected_rows(const db1_con_t* _h)
  404. {
  405. dbcl_cls_t *cls=NULL;
  406. LM_DBG("executing db cluster affected-rows command\n");
  407. cls = (dbcl_cls_t*)_h->tail;
  408. if(cls->usedcon==NULL || cls->usedcon->dbh==NULL
  409. || cls->usedcon->dbf.affected_rows==NULL)
  410. return -1;
  411. return cls->usedcon->dbf.affected_rows(cls->usedcon->dbh);
  412. }
  413. /*! \brief
  414. * Insert a row into table, update on duplicate key
  415. */
  416. int db_cluster_insert_update(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
  417. const int _n)
  418. {
  419. LM_DBG("executing db cluster insert-update command\n");
  420. DBCL_WRITE(insert_update, insert_update(dbh, _k, _v, _n));
  421. }
  422. /*! \brief
  423. * Insert a row into table
  424. */
  425. int db_cluster_insert_delayed(const db1_con_t* _h, const db_key_t* _k,
  426. const db_val_t* _v, const int _n)
  427. {
  428. LM_DBG("executing db cluster insert delayed command\n");
  429. DBCL_WRITE(insert_delayed, insert_delayed(dbh, _k, _v, _n));
  430. }
  431. /*! \brief
  432. * Store name of table that will be used by
  433. * subsequent database functions
  434. */
  435. int db_cluster_use_table(db1_con_t* _h, const str* _t)
  436. {
  437. int i;
  438. int j;
  439. int ret;
  440. dbcl_cls_t *cls=NULL;
  441. cls = (dbcl_cls_t*)_h->tail;
  442. ret = 0;
  443. LM_DBG("use table (%.*s) - cluster [%.*s]\n",
  444. _t->len, _t->s, cls->name.len, cls->name.s);
  445. for(i=DBCL_PRIO_SIZE-1; i>0; i--)
  446. {
  447. for(j=0; j<cls->rlist[i].clen; j++)
  448. {
  449. if(cls->rlist[i].clist[j] != NULL && cls->rlist[i].clist[j]->flags!=0
  450. && cls->rlist[i].clist[j]->dbh != NULL)
  451. {
  452. LM_DBG("set read table (%.*s) - cluster [%.*s] (%d/%d)\n",
  453. _t->len, _t->s, cls->name.len, cls->name.s, i, j);
  454. ret |= cls->rlist[i].clist[j]->dbf.use_table(cls->rlist[i].clist[j]->dbh, _t);
  455. }
  456. }
  457. for(j=0; j<cls->wlist[i].clen; j++)
  458. {
  459. if(cls->wlist[i].clist[j] != NULL && cls->wlist[i].clist[j]->flags!=0
  460. && cls->wlist[i].clist[j]->dbh != NULL)
  461. {
  462. LM_DBG("set write table (%.*s) - cluster [%.*s] (%d/%d)\n",
  463. _t->len, _t->s, cls->name.len, cls->name.s, i, j);
  464. ret |= cls->wlist[i].clist[j]->dbf.use_table(cls->wlist[i].clist[j]->dbh, _t);
  465. }
  466. }
  467. }
  468. return ret;
  469. }