redis_client.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. /**
  2. * $Id$
  3. *
  4. * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com)
  5. *
  6. * This file is part of Kamailio, a free SIP server.
  7. *
  8. * Kamailio is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU General Public License as published by
  10. * the Free Software Foundation; either version 2 of the License, or
  11. * (at your option) any later version
  12. *
  13. * Kamailio is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License
  19. * along with this program; if not, write to the Free Software
  20. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  21. *
  22. */
  23. #include <stdio.h>
  24. #include <unistd.h>
  25. #include <stdlib.h>
  26. #include <string.h>
  27. #include <sys/time.h>
  28. #include <stdarg.h>
  29. #include "../../mem/mem.h"
  30. #include "../../dprint.h"
  31. #include "../../hashes.h"
  32. #include "../../ut.h"
  33. #include "redis_client.h"
  34. #define redisCommandNR(a...) (int)({ void *__tmp; __tmp = redisCommand(a); if (__tmp) freeReplyObject(__tmp); __tmp ? 0 : -1;})
  35. static redisc_server_t *_redisc_srv_list=NULL;
  36. static redisc_reply_t *_redisc_rpl_list=NULL;
  37. /**
  38. *
  39. */
  40. int redisc_init(void)
  41. {
  42. char *addr, *unix_sock_path = NULL;
  43. unsigned int port, db;
  44. redisc_server_t *rsrv=NULL;
  45. param_t *pit = NULL;
  46. struct timeval tv;
  47. tv.tv_sec = 1;
  48. tv.tv_usec = 0;
  49. if(_redisc_srv_list==NULL)
  50. {
  51. LM_ERR("no redis servers defined\n");
  52. return -1;
  53. }
  54. for(rsrv=_redisc_srv_list; rsrv; rsrv=rsrv->next)
  55. {
  56. addr = "127.0.0.1";
  57. port = 6379;
  58. db = 0;
  59. for (pit = rsrv->attrs; pit; pit=pit->next)
  60. {
  61. if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) {
  62. unix_sock_path = pit->body.s;
  63. unix_sock_path[pit->body.len] = '\0';
  64. } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) {
  65. addr = pit->body.s;
  66. addr[pit->body.len] = '\0';
  67. } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) {
  68. if(str2int(&pit->body, &port) < 0)
  69. port = 6379;
  70. } else if(pit->name.len==2 && strncmp(pit->name.s, "db", 2)==0) {
  71. if(str2int(&pit->body, &db) < 0)
  72. db = 0;
  73. }
  74. }
  75. if(unix_sock_path != NULL) {
  76. LM_DBG("Connecting to unix socket: %s\n", unix_sock_path);
  77. rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path, tv);
  78. } else {
  79. rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv);
  80. }
  81. if(!rsrv->ctxRedis)
  82. goto err;
  83. if (rsrv->ctxRedis->err)
  84. goto err2;
  85. if (redisCommandNR(rsrv->ctxRedis, "PING"))
  86. goto err2;
  87. if (redisCommandNR(rsrv->ctxRedis, "SELECT %i", db))
  88. goto err2;
  89. }
  90. return 0;
  91. err2:
  92. if (unix_sock_path != NULL) {
  93. LM_ERR("error communicating with redis server [%.*s] (unix:%s db:%d): %s\n",
  94. rsrv->sname->len, rsrv->sname->s, unix_sock_path, db, rsrv->ctxRedis->errstr);
  95. } else {
  96. LM_ERR("error communicating with redis server [%.*s] (%s:%d/%d): %s\n",
  97. rsrv->sname->len, rsrv->sname->s, addr, port, db, rsrv->ctxRedis->errstr);
  98. }
  99. return -1;
  100. err:
  101. if (unix_sock_path != NULL) {
  102. LM_ERR("failed to connect to redis server [%.*s] (unix:%s db:%d)\n",
  103. rsrv->sname->len, rsrv->sname->s, unix_sock_path, db);
  104. } else {
  105. LM_ERR("failed to connect to redis server [%.*s] (%s:%d/%d)\n",
  106. rsrv->sname->len, rsrv->sname->s, addr, port, db);
  107. }
  108. return -1;
  109. }
  110. /**
  111. *
  112. */
  113. int redisc_destroy(void)
  114. {
  115. redisc_reply_t *rpl, *next_rpl;
  116. redisc_server_t *rsrv=NULL;
  117. redisc_server_t *rsrv1=NULL;
  118. rpl = _redisc_rpl_list;
  119. while(rpl != NULL)
  120. {
  121. next_rpl = rpl->next;
  122. if(rpl->rplRedis)
  123. freeReplyObject(rpl->rplRedis);
  124. if(rpl->rname.s != NULL)
  125. pkg_free(rpl->rname.s);
  126. pkg_free(rpl);
  127. rpl = next_rpl;
  128. }
  129. _redisc_rpl_list = NULL;
  130. if(_redisc_srv_list==NULL)
  131. return -1;
  132. rsrv=_redisc_srv_list;
  133. while(rsrv!=NULL)
  134. {
  135. rsrv1 = rsrv;
  136. rsrv=rsrv->next;
  137. if(rsrv1->ctxRedis!=NULL)
  138. redisFree(rsrv1->ctxRedis);
  139. free_params(rsrv1->attrs);
  140. pkg_free(rsrv1);
  141. }
  142. _redisc_srv_list = NULL;
  143. return 0;
  144. }
  145. /**
  146. *
  147. */
  148. int redisc_add_server(char *spec)
  149. {
  150. param_t *pit=NULL;
  151. param_hooks_t phooks;
  152. redisc_server_t *rsrv=NULL;
  153. str s;
  154. s.s = spec;
  155. s.len = strlen(spec);
  156. if(s.s[s.len-1]==';')
  157. s.len--;
  158. if (parse_params(&s, CLASS_ANY, &phooks, &pit)<0)
  159. {
  160. LM_ERR("failed parsing params value\n");
  161. goto error;
  162. }
  163. rsrv = (redisc_server_t*)pkg_malloc(sizeof(redisc_server_t));
  164. if(rsrv==NULL)
  165. {
  166. LM_ERR("no more pkg\n");
  167. goto error;
  168. }
  169. memset(rsrv, 0, sizeof(redisc_server_t));
  170. rsrv->attrs = pit;
  171. for (pit = rsrv->attrs; pit; pit=pit->next)
  172. {
  173. if(pit->name.len==4 && strncmp(pit->name.s, "name", 4)==0) {
  174. rsrv->sname = &pit->body;
  175. rsrv->hname = get_hash1_raw(rsrv->sname->s, rsrv->sname->len);
  176. break;
  177. }
  178. }
  179. if(rsrv->sname==NULL)
  180. {
  181. LM_ERR("no server name\n");
  182. goto error;
  183. }
  184. rsrv->next = _redisc_srv_list;
  185. _redisc_srv_list = rsrv;
  186. return 0;
  187. error:
  188. if(pit!=NULL)
  189. free_params(pit);
  190. if(rsrv!=NULL)
  191. pkg_free(rsrv);
  192. return -1;
  193. }
  194. /**
  195. *
  196. */
  197. redisc_server_t *redisc_get_server(str *name)
  198. {
  199. redisc_server_t *rsrv=NULL;
  200. unsigned int hname;
  201. hname = get_hash1_raw(name->s, name->len);
  202. rsrv=_redisc_srv_list;
  203. while(rsrv!=NULL)
  204. {
  205. if(rsrv->hname==hname && rsrv->sname->len==name->len
  206. && strncmp(rsrv->sname->s, name->s, name->len)==0)
  207. return rsrv;
  208. rsrv=rsrv->next;
  209. }
  210. return NULL;
  211. }
  212. /**
  213. *
  214. */
  215. int redisc_reconnect_server(redisc_server_t *rsrv)
  216. {
  217. char *addr, *unix_sock_path = NULL;
  218. unsigned int port, db;
  219. param_t *pit = NULL;
  220. struct timeval tv;
  221. tv.tv_sec = 1;
  222. tv.tv_usec = 0;
  223. addr = "127.0.0.1";
  224. port = 6379;
  225. db = 0;
  226. for (pit = rsrv->attrs; pit; pit=pit->next)
  227. {
  228. if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) {
  229. unix_sock_path = pit->body.s;
  230. unix_sock_path[pit->body.len] = '\0';
  231. } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) {
  232. addr = pit->body.s;
  233. addr[pit->body.len] = '\0';
  234. } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) {
  235. if(str2int(&pit->body, &port) < 0)
  236. port = 6379;
  237. } else if(pit->name.len==2 && strncmp(pit->name.s, "db", 2)==0) {
  238. if(str2int(&pit->body, &db) < 0)
  239. db = 0;
  240. }
  241. }
  242. if(rsrv->ctxRedis!=NULL) {
  243. redisFree(rsrv->ctxRedis);
  244. rsrv->ctxRedis = NULL;
  245. }
  246. if(unix_sock_path != NULL) {
  247. rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path, tv);
  248. } else {
  249. rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv);
  250. }
  251. if(!rsrv->ctxRedis)
  252. goto err;
  253. if (rsrv->ctxRedis->err)
  254. goto err2;
  255. if (redisCommandNR(rsrv->ctxRedis, "PING"))
  256. goto err2;
  257. if (redisCommandNR(rsrv->ctxRedis, "SELECT %i", db))
  258. goto err2;
  259. return 0;
  260. err2:
  261. if (unix_sock_path != NULL) {
  262. LM_ERR("error communicating with redis server [%.*s] (unix:%s db:%d): %s\n",
  263. rsrv->sname->len, rsrv->sname->s, unix_sock_path, db, rsrv->ctxRedis->errstr);
  264. } else {
  265. LM_ERR("error communicating with redis server [%.*s] (%s:%d/%d): %s\n",
  266. rsrv->sname->len, rsrv->sname->s, addr, port, db, rsrv->ctxRedis->errstr);
  267. }
  268. err:
  269. if (unix_sock_path != NULL) {
  270. LM_ERR("failed to connect to redis server [%.*s] (unix:%s db:%d)\n",
  271. rsrv->sname->len, rsrv->sname->s, unix_sock_path, db);
  272. } else {
  273. LM_ERR("failed to connect to redis server [%.*s] (%s:%d/%d)\n",
  274. rsrv->sname->len, rsrv->sname->s, addr, port, db);
  275. }
  276. return -1;
  277. }
  278. /**
  279. *
  280. */
  281. int redisc_exec(str *srv, str *res, str *cmd, ...)
  282. {
  283. redisc_server_t *rsrv=NULL;
  284. redisc_reply_t *rpl;
  285. char c;
  286. va_list ap, ap2;
  287. va_start(ap, cmd);
  288. va_copy(ap2, ap);
  289. rsrv = redisc_get_server(srv);
  290. if(srv==NULL || cmd==NULL || res==NULL)
  291. {
  292. LM_ERR("invalid parameters");
  293. goto error_exec;
  294. }
  295. if(srv->len==0 || res->len==0 || cmd->len==0)
  296. {
  297. LM_ERR("invalid parameters");
  298. goto error_exec;
  299. }
  300. if(rsrv==NULL)
  301. {
  302. LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
  303. goto error_exec;
  304. }
  305. if(rsrv->ctxRedis==NULL)
  306. {
  307. LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
  308. goto error_exec;
  309. }
  310. rpl = redisc_get_reply(res);
  311. if(rpl==NULL)
  312. {
  313. LM_ERR("no redis reply id found: %.*s\n", res->len, res->s);
  314. goto error_exec;
  315. }
  316. if(rpl->rplRedis!=NULL)
  317. {
  318. /* clean up previous redis reply */
  319. freeReplyObject(rpl->rplRedis);
  320. rpl->rplRedis = NULL;
  321. }
  322. c = cmd->s[cmd->len];
  323. cmd->s[cmd->len] = '\0';
  324. rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap );
  325. if(rpl->rplRedis == NULL)
  326. {
  327. /* null reply, reconnect and try again */
  328. if(rsrv->ctxRedis->err)
  329. {
  330. LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
  331. }
  332. if(redisc_reconnect_server(rsrv)==0)
  333. {
  334. rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap2);
  335. } else {
  336. LM_ERR("unable to reconnect to redis server: %.*s\n", srv->len, srv->s);
  337. cmd->s[cmd->len] = c;
  338. goto error_exec;
  339. }
  340. }
  341. cmd->s[cmd->len] = c;
  342. va_end(ap);
  343. va_end(ap2);
  344. return 0;
  345. error_exec:
  346. va_end(ap);
  347. va_end(ap2);
  348. return -1;
  349. }
  350. /**
  351. * Executes a redis command.
  352. * Command is coded using a vector of strings, and a vector of lenghts.
  353. *
  354. * @param rsrv Pointer to a redis_server_t structure.
  355. * @param argc number of elements in the command vector.
  356. * @param argv vector of zero terminated strings forming the command.
  357. * @param argvlen vector of command string lenghts or NULL.
  358. * @return redisReply structure or NULL if there was an error.
  359. */
  360. void * redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv, const size_t *argvlen)
  361. {
  362. redisReply *res=NULL;
  363. if(rsrv==NULL || rsrv->ctxRedis==NULL)
  364. {
  365. LM_ERR("no redis context found for server %.*s\n",
  366. rsrv->sname->len, rsrv->sname->s);
  367. return NULL;
  368. }
  369. if(argc<=0)
  370. {
  371. LM_ERR("invalid parameters\n");
  372. return NULL;
  373. }
  374. if(argv==NULL || *argv==NULL)
  375. {
  376. LM_ERR("invalid parameters\n");
  377. return NULL;
  378. }
  379. res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);
  380. if(res)
  381. {
  382. return res;
  383. }
  384. /* null reply, reconnect and try again */
  385. if(rsrv->ctxRedis->err)
  386. {
  387. LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
  388. }
  389. if(redisc_reconnect_server(rsrv)==0)
  390. {
  391. res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);
  392. }
  393. else
  394. {
  395. LM_ERR("Unable to reconnect to server: %.*s\n",
  396. rsrv->sname->len, rsrv->sname->s);
  397. return NULL;
  398. }
  399. return res;
  400. }
  401. /**
  402. *
  403. */
  404. redisc_reply_t *redisc_get_reply(str *name)
  405. {
  406. redisc_reply_t *rpl;
  407. unsigned int hid;
  408. hid = get_hash1_raw(name->s, name->len);
  409. for(rpl=_redisc_rpl_list; rpl; rpl=rpl->next) {
  410. if(rpl->hname==hid && rpl->rname.len==name->len
  411. && strncmp(rpl->rname.s, name->s, name->len)==0)
  412. return rpl;
  413. }
  414. /* not found - add a new one */
  415. rpl = (redisc_reply_t*)pkg_malloc(sizeof(redisc_reply_t));
  416. if(rpl==NULL)
  417. {
  418. LM_ERR("no more pkg\n");
  419. return NULL;
  420. }
  421. memset(rpl, 0, sizeof(redisc_reply_t));
  422. rpl->hname = hid;
  423. rpl->rname.s = (char*)pkg_malloc(name->len+1);
  424. if(rpl->rname.s==NULL)
  425. {
  426. LM_ERR("no more pkg.\n");
  427. pkg_free(rpl);
  428. return NULL;
  429. }
  430. strncpy(rpl->rname.s, name->s, name->len);
  431. rpl->rname.len = name->len;
  432. rpl->rname.s[name->len] = '\0';
  433. rpl->next = _redisc_rpl_list;
  434. _redisc_rpl_list = rpl;
  435. return rpl;
  436. }
  437. /**
  438. *
  439. */
  440. int redisc_free_reply(str *name)
  441. {
  442. redisc_reply_t *rpl;
  443. unsigned int hid;
  444. if(name==NULL || name->len==0) {
  445. LM_ERR("invalid parameters");
  446. return -1;
  447. }
  448. hid = get_hash1_raw(name->s, name->len);
  449. rpl = _redisc_rpl_list;
  450. while(rpl) {
  451. if(rpl->hname==hid && rpl->rname.len==name->len
  452. && strncmp(rpl->rname.s, name->s, name->len)==0) {
  453. if(rpl->rplRedis) {
  454. freeReplyObject(rpl->rplRedis);
  455. rpl->rplRedis = NULL;
  456. }
  457. return 0;
  458. }
  459. rpl = rpl->next;
  460. }
  461. /* reply entry not found. */
  462. return -1;
  463. }