cnxcc_redis.c 15 KB


  1. /*
  2. * Copyright (C) 2014 Carlos Ruiz Díaz (caruizdiaz.com),
  3. * ConexionGroup (www.conexiongroup.com)
  4. *
  5. * This file is part of Kamailio, a free SIP server.
  6. *
  7. * Kamailio is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version
  11. *
  12. * Kamailio is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. */
  22. #include <stdlib.h>
  23. #include "cnxcc_redis.h"
  24. #include "cnxcc_mod.h"
  25. #define DEFAULT_EXPIRE_SECS 70
  26. extern data_t _data;
  27. static int __redis_select_db(redisContext *ctxt, int db);
  28. static int __redis_exec(
  29. credit_data_t *credit_data, const char *cmd, redisReply **rpl);
  30. static struct redis *__redis_connect(struct redis *redis);
  31. static void __async_connect_cb(const redisAsyncContext *c, int status);
  32. static void __async_disconnect_cb(const redisAsyncContext *c, int status);
  33. static void __subscription_cb(redisAsyncContext *c, void *r, void *privdata);
  34. static struct redis *__redis_connect_async(struct redis *redis);
  35. static struct redis *__alloc_redis(char *ip, int port, int db);
  36. static void __redis_subscribe_to_kill_list(struct redis *redis);
  37. static inline const char *__get_table_name(credit_type_t type)
  38. {
  39. switch(type) {
  40. case CREDIT_MONEY:
  41. return "money";
  42. break;
  43. case CREDIT_TIME:
  44. return "time";
  45. break;
  46. case CREDIT_CHANNEL:
  47. return "channel";
  48. break;
  49. default:
  50. LM_ERR("BUG: Something went terribly wrong: invalid credit type\n");
  51. return NULL;
  52. }
  53. }
  54. int redis_get_or_create_credit_data(credit_data_t *credit_data)
  55. {
  56. int exists = 0;
  57. // concurrent_calls is just a dummy key. It can be any of the valid keys
  58. if(redis_get_int(credit_data, "HEXISTS", "concurrent_calls", &exists) < 0)
  59. goto error;
  60. if(!exists) { // doesn't exist
  61. LM_DBG("credit_data with ID=[%s] DOES NOT exist in the cluster, "
  62. "creating it...\n",
  63. credit_data->str_id);
  64. return redis_insert_credit_data(credit_data);
  65. }
  66. LM_DBG("credit_data with ID=[%s] DOES exist in the cluster, retrieving "
  67. "it...\n",
  68. credit_data->str_id);
  69. if(redis_get_double(credit_data, "HGET", "consumed_amount",
  70. &credit_data->consumed_amount)
  71. < 0)
  72. goto error;
  73. if(redis_get_double(credit_data, "HGET", "ended_calls_consumed_amount",
  74. &credit_data->ended_calls_consumed_amount)
  75. < 0)
  76. goto error;
  77. if(redis_get_double(
  78. credit_data, "HGET", "max_amount", &credit_data->max_amount)
  79. < 0)
  80. goto error;
  81. if(redis_get_int(credit_data, "HGET", "type", (int *)&credit_data->type)
  82. < 0)
  83. goto error;
  84. return 1;
  85. error:
  86. return -1;
  87. }
  88. int redis_insert_credit_data(credit_data_t *credit_data)
  89. {
  90. LM_DBG("Inserting credit_data_t using ID [%s]\n", credit_data->str_id);
  91. if(redis_insert_int_value(
  92. credit_data, "concurrent_calls", credit_data->concurrent_calls)
  93. < 0)
  94. goto error;
  95. if(redis_insert_double_value(
  96. credit_data, "consumed_amount", credit_data->consumed_amount)
  97. < 0)
  98. goto error;
  99. if(redis_insert_double_value(credit_data, "ended_calls_consumed_amount",
  100. credit_data->ended_calls_consumed_amount)
  101. < 0)
  102. goto error;
  103. if(redis_insert_double_value(
  104. credit_data, "max_amount", credit_data->max_amount)
  105. < 0)
  106. goto error;
  107. if(redis_insert_int_value(
  108. credit_data, "number_of_calls", credit_data->number_of_calls)
  109. < 0)
  110. goto error;
  111. if(redis_insert_int_value(credit_data, "type", credit_data->type) < 0)
  112. goto error;
  113. // make sure when don't have any leftover member on the kill list for this new entry
  114. if(redis_remove_kill_list_member(credit_data) < 0)
  115. goto error;
  116. return 1;
  117. error:
  118. return -1;
  119. }
  120. static struct redis *__alloc_redis(char *ip, int port, int db)
  121. {
  122. struct redis *redis = pkg_malloc(sizeof(struct redis));
  123. if(!redis) {
  124. PKG_MEM_ERROR;
  125. return NULL;
  126. }
  127. int len = strlen(ip);
  128. redis->ip = pkg_malloc(len + 1);
  129. if(!redis->ip) {
  130. PKG_MEM_ERROR;
  131. pkg_free(redis);
  132. return NULL;
  133. }
  134. strcpy(redis->ip, ip);
  135. redis->port = port;
  136. redis->db = db;
  137. redis->ctxt = NULL;
  138. return redis;
  139. }
  140. struct redis *redis_connect_all(char *ip, int port, int db)
  141. {
  142. return __redis_connect_async(__redis_connect(__alloc_redis(ip, port, db)));
  143. }
  144. struct redis *redis_connect(char *ip, int port, int db)
  145. {
  146. return __redis_connect(__alloc_redis(ip, port, db));
  147. }
  148. struct redis *redis_connect_async(char *ip, int port, int db)
  149. {
  150. return __redis_connect_async(__alloc_redis(ip, port, db));
  151. }
  152. static struct redis *__redis_connect_async(struct redis *redis)
  153. {
  154. redis->eb = event_base_new();
  155. LM_INFO("Connecting (ASYNC) to Redis at %s:%d\n", redis->ip, redis->port);
  156. redis->async_ctxt = redisAsyncConnect(redis->ip, redis->port);
  157. if(redis->async_ctxt->err) {
  158. LM_ERR("%s\n", redis->async_ctxt->errstr);
  159. return NULL;
  160. }
  161. redisLibeventAttach(redis->async_ctxt, redis->eb);
  162. redisAsyncSetConnectCallback(redis->async_ctxt, __async_connect_cb);
  163. redisAsyncSetDisconnectCallback(redis->async_ctxt, __async_disconnect_cb);
  164. redisAsyncCommand(redis->async_ctxt, NULL, NULL, "SELECT %d", redis->db);
  165. __redis_subscribe_to_kill_list(redis);
  166. event_base_dispatch(redis->eb);
  167. return redis;
  168. }
  169. static struct redis *__redis_connect(struct redis *redis)
  170. {
  171. struct timeval timeout = {1, 500000}; // 1.5 seconds
  172. LM_INFO("Connecting to Redis at %s:%d\n", redis->ip, redis->port);
  173. if(redis->ctxt)
  174. redisFree(redis->ctxt);
  175. redis->ctxt = redisConnectWithTimeout(redis->ip, redis->port, timeout);
  176. if(redis->ctxt == NULL || redis->ctxt->err) {
  177. if(!redis->ctxt)
  178. LM_ERR("Connection error: can't allocate Redis context\n");
  179. else {
  180. LM_ERR("Connection error: %s\n", redis->ctxt->errstr);
  181. redisFree(redis->ctxt);
  182. }
  183. return NULL;
  184. }
  185. if(!__redis_select_db(redis->ctxt, redis->db))
  186. return NULL;
  187. return redis;
  188. }
  189. static int __redis_select_db(redisContext *ctxt, int db)
  190. {
  191. redisReply *rpl = redisCommand(ctxt, "SELECT %d", db);
  192. if(!rpl || rpl->type == REDIS_REPLY_ERROR) {
  193. if(!rpl)
  194. LM_ERR("%s\n", ctxt->errstr);
  195. else {
  196. LM_ERR("%.*s\n", (int)rpl->len, rpl->str);
  197. freeReplyObject(rpl);
  198. }
  199. return -1;
  200. }
  201. return 1;
  202. }
  203. static int __redis_exec(
  204. credit_data_t *credit_data, const char *cmd, redisReply **rpl)
  205. {
  206. redisReply *rpl_aux = NULL;
  207. char cmd_buffer[1024];
  208. *rpl = redisCommand(_data.redis->ctxt, cmd);
  209. if(!(*rpl) || (*rpl)->type == REDIS_REPLY_ERROR) {
  210. if(!*rpl)
  211. LM_ERR("%s\n", _data.redis->ctxt->errstr);
  212. else {
  213. LM_ERR("%.*s\n", (int)(*rpl)->len, (*rpl)->str);
  214. freeReplyObject(*rpl);
  215. }
  216. // reconnect on error
  217. __redis_connect(_data.redis);
  218. return -1;
  219. }
  220. if(credit_data == NULL) {
  221. freeReplyObject(*rpl);
  222. return 1;
  223. }
  224. // this will update the TTL of the key to DEFAULT_EXPIRE_SECS for every r/w.
  225. // It will guarantee us that if a server crashes, the key will automatically disappear
  226. // from Redis if no other client is updating the key, leaving us with some level of
  227. // consistency
  228. snprintf(cmd_buffer, sizeof(cmd_buffer), "EXPIRE cnxcc:%s:%s %d",
  229. __get_table_name(credit_data->type), credit_data->str_id,
  230. DEFAULT_EXPIRE_SECS);
  231. return __redis_exec(NULL, cmd_buffer, &rpl_aux);
  232. }
  233. int redis_incr_by_double(
  234. credit_data_t *credit_data, const char *key, double value)
  235. {
  236. redisReply *rpl = NULL;
  237. int ret = -1;
  238. char cmd_buffer[1024];
  239. snprintf(cmd_buffer, sizeof(cmd_buffer), "HINCRBYFLOAT cnxcc:%s:%s %s %f",
  240. __get_table_name(credit_data->type), credit_data->str_id, key,
  241. value);
  242. ret = __redis_exec(credit_data, cmd_buffer, &rpl);
  243. if(ret > 0)
  244. freeReplyObject(rpl);
  245. return ret;
  246. }
  247. int redis_get_double(credit_data_t *credit_data, const char *instruction,
  248. const char *key, double *value)
  249. {
  250. str str_double = STR_NULL;
  251. char buffer[128];
  252. if(redis_get_str(credit_data, instruction, key, &str_double) < 0)
  253. return -1;
  254. snprintf(buffer, sizeof(buffer), "%.*s", str_double.len, str_double.s);
  255. *value = atof(buffer);
  256. LM_DBG("Got DOUBLE value: %s=%f\n", key, *value);
  257. pkg_free(str_double.s);
  258. return 1;
  259. }
  260. int redis_incr_by_int(credit_data_t *credit_data, const char *key, int value)
  261. {
  262. redisReply *rpl = NULL;
  263. int ret = -1;
  264. char cmd_buffer[1024];
  265. snprintf(cmd_buffer, sizeof(cmd_buffer), "HINCRBY cnxcc:%s:%s %s %d",
  266. __get_table_name(credit_data->type), credit_data->str_id, key,
  267. value);
  268. ret = __redis_exec(credit_data, cmd_buffer, &rpl);
  269. if(ret > 0)
  270. freeReplyObject(rpl);
  271. return ret;
  272. }
  273. int redis_get_int(credit_data_t *credit_data, const char *instruction,
  274. const char *key, int *value)
  275. {
  276. redisReply *rpl = NULL;
  277. char cmd_buffer[1024];
  278. snprintf(cmd_buffer, sizeof(cmd_buffer), "%s cnxcc:%s:%s %s", instruction,
  279. __get_table_name(credit_data->type), credit_data->str_id, key);
  280. if(__redis_exec(credit_data, cmd_buffer, &rpl) < 0)
  281. return -1;
  282. if(rpl->type == REDIS_REPLY_INTEGER)
  283. *value = rpl->integer;
  284. else if(rpl->type == REDIS_REPLY_NIL)
  285. *value = 0;
  286. else {
  287. *value = atoi(rpl->str);
  288. }
  289. freeReplyObject(rpl);
  290. LM_DBG("Got INT value: %s=%di\n", key, *value);
  291. return 1;
  292. }
  293. int redis_get_str(credit_data_t *credit_data, const char *instruction,
  294. const char *key, str *value)
  295. {
  296. redisReply *rpl = NULL;
  297. char cmd_buffer[1024];
  298. snprintf(cmd_buffer, sizeof(cmd_buffer), "%s cnxcc:%s:%s %s", instruction,
  299. __get_table_name(credit_data->type), credit_data->str_id, key);
  300. value->s = NULL;
  301. value->len = 0;
  302. if(__redis_exec(credit_data, cmd_buffer, &rpl) < 0)
  303. return -1;
  304. if(rpl->type != REDIS_REPLY_STRING && rpl->type != REDIS_REPLY_NIL) {
  305. LM_ERR("Redis reply to [%s] is not a string/nil: type[%d]\n",
  306. cmd_buffer, rpl->type);
  307. freeReplyObject(rpl);
  308. return -1;
  309. }
  310. if(rpl->type == REDIS_REPLY_NIL) {
  311. LM_DBG("Value of %s is (nil)\n", key);
  312. goto done;
  313. }
  314. if(rpl->len <= 0) {
  315. LM_ERR("RPL len is equal to %d\n", (int)rpl->len);
  316. goto done;
  317. }
  318. value->s = pkg_malloc(rpl->len);
  319. if(!value->s) {
  320. PKG_MEM_ERROR;
  321. freeReplyObject(rpl);
  322. return -1;
  323. }
  324. value->len = rpl->len;
  325. memcpy(value->s, rpl->str, rpl->len);
  326. done:
  327. freeReplyObject(rpl);
  328. LM_DBG("Got STRING value: %s=[%.*s]\n", key, value->len, value->s);
  329. return 1;
  330. }
  331. int redis_remove_credit_data(credit_data_t *credit_data)
  332. {
  333. redisReply *rpl = NULL;
  334. char cmd_buffer[1024];
  335. int ret;
  336. snprintf(cmd_buffer, sizeof(cmd_buffer), "DEL cnxcc:%s:%s",
  337. __get_table_name(credit_data->type), credit_data->str_id);
  338. ret = __redis_exec(NULL, cmd_buffer, &rpl);
  339. // if (ret > 0)
  340. // freeReplyObject(rpl);
  341. return ret;
  342. }
  343. int redis_append_kill_list_member(credit_data_t *credit_data)
  344. {
  345. redisReply *rpl = NULL;
  346. char cmd_buffer[1024];
  347. int ret;
  348. snprintf(cmd_buffer, sizeof(cmd_buffer), "SADD cnxcc:kill_list:%s \"%s\"",
  349. __get_table_name(credit_data->type), credit_data->str_id);
  350. ret = __redis_exec(credit_data, cmd_buffer, &rpl);
  351. if(ret > 0)
  352. freeReplyObject(rpl);
  353. return ret;
  354. }
  355. int redis_remove_kill_list_member(credit_data_t *credit_data)
  356. {
  357. redisReply *rpl = NULL;
  358. char cmd_buffer[1024];
  359. int ret;
  360. snprintf(cmd_buffer, sizeof(cmd_buffer), "SREM cnxcc:kill_list:%s \"%s\"",
  361. __get_table_name(credit_data->type), credit_data->str_id);
  362. ret = __redis_exec(credit_data, cmd_buffer, &rpl);
  363. if(ret > 0)
  364. freeReplyObject(rpl);
  365. return ret;
  366. }
  367. int redis_insert_str_value(
  368. credit_data_t *credit_data, const char *key, str *value)
  369. {
  370. redisReply *rpl = NULL;
  371. int ret = -1;
  372. char cmd_buffer[2048];
  373. if(value == NULL) {
  374. LM_ERR("str value is null\n");
  375. return -1;
  376. }
  377. if(value->len == 0) {
  378. LM_WARN("[%s] value is empty\n", key);
  379. return 1;
  380. }
  381. snprintf(cmd_buffer, sizeof(cmd_buffer), "HSET cnxcc:%s:%s %s \"%.*s\"",
  382. __get_table_name(credit_data->type), credit_data->str_id, key,
  383. value->len, value->s);
  384. ret = __redis_exec(credit_data, cmd_buffer, &rpl);
  385. if(ret > 0)
  386. freeReplyObject(rpl);
  387. return ret;
  388. }
  389. int redis_insert_int_value(
  390. credit_data_t *credit_data, const char *key, int value)
  391. {
  392. redisReply *rpl = NULL;
  393. int ret = -1;
  394. char cmd_buffer[1024];
  395. snprintf(cmd_buffer, sizeof(cmd_buffer), "HSET cnxcc:%s:%s %s %d",
  396. __get_table_name(credit_data->type), credit_data->str_id, key,
  397. value);
  398. ret = __redis_exec(credit_data, cmd_buffer, &rpl);
  399. if(ret > 0)
  400. freeReplyObject(rpl);
  401. return ret;
  402. }
  403. int redis_insert_double_value(
  404. credit_data_t *credit_data, const char *key, double value)
  405. {
  406. redisReply *rpl = NULL;
  407. int ret = -1;
  408. char cmd_buffer[1024];
  409. snprintf(cmd_buffer, sizeof(cmd_buffer), "HSET cnxcc:%s:%s %s %f",
  410. __get_table_name(credit_data->type), credit_data->str_id, key,
  411. value);
  412. ret = __redis_exec(credit_data, cmd_buffer, &rpl);
  413. if(ret > 0)
  414. freeReplyObject(rpl);
  415. return ret;
  416. }
  417. int redis_kill_list_member_exists(credit_data_t *credit_data)
  418. {
  419. redisReply *rpl = NULL;
  420. int exists = 0;
  421. char cmd_buffer[1024];
  422. snprintf(cmd_buffer, sizeof(cmd_buffer),
  423. "SISMEMBER cnxcc:kill_list:%s \"%s\"",
  424. __get_table_name(credit_data->type), credit_data->str_id);
  425. if(__redis_exec(credit_data, cmd_buffer, &rpl) < 0)
  426. return -1;
  427. exists = rpl->integer;
  428. freeReplyObject(rpl);
  429. return exists;
  430. }
  431. int redis_clean_up_if_last(credit_data_t *credit_data)
  432. {
  433. int counter = 0;
  434. if(redis_get_int(credit_data, "HGET", "number_of_calls", &counter) < 0)
  435. return -1;
  436. return counter > 0 ? 1 : redis_remove_credit_data(credit_data);
  437. }
  438. static void __redis_subscribe_to_kill_list(struct redis *redis)
  439. {
  440. redisAsyncCommand(redis->async_ctxt, __subscription_cb, NULL,
  441. "SUBSCRIBE cnxcc:kill_list");
  442. }
  443. int redis_publish_to_kill_list(credit_data_t *credit_data)
  444. {
  445. redisReply *rpl = NULL;
  446. char cmd_buffer[1024];
  447. snprintf(cmd_buffer, sizeof(cmd_buffer), "PUBLISH cnxcc:kill_list %s",
  448. credit_data->str_id);
  449. return __redis_exec(NULL, cmd_buffer, &rpl) < 0;
  450. }
  451. static void __async_connect_cb(const redisAsyncContext *c, int status)
  452. {
  453. if(status != REDIS_OK) {
  454. LM_ERR("error connecting to Redis db in async mode\n");
  455. abort();
  456. }
  457. LM_INFO("connected to Redis in async mode\n");
  458. }
  459. static void __async_disconnect_cb(const redisAsyncContext *c, int status)
  460. {
  461. LM_ERR("async DB connection was lost\n");
  462. }
  463. static void __subscription_cb(redisAsyncContext *c, void *r, void *privdata)
  464. {
  465. redisReply *reply = r;
  466. str key = STR_NULL;
  467. credit_data_t *credit_data = NULL;
  468. if(reply == NULL) {
  469. LM_ERR("reply is NULL\n");
  470. return;
  471. }
  472. if(reply->type != REDIS_REPLY_ARRAY || reply->elements != 3)
  473. return;
  474. if(strcmp(reply->element[1]->str, "cnxcc:kill_list") != 0)
  475. return;
  476. if(!reply->element[2]->str)
  477. return;
  478. key.len = strlen(reply->element[2]->str);
  479. if(key.len <= 0) {
  480. LM_ERR("Invalid credit_data key\n");
  481. return;
  482. }
  483. key.s = reply->element[2]->str;
  484. if(try_get_credit_data_entry(&key, &credit_data) < 0)
  485. return;
  486. cnxcc_lock(credit_data->lock);
  487. if(credit_data->deallocating)
  488. goto done; // no need to terminate the calls. They are already being terminated
  489. LM_ALERT("Got kill list entry for key [%.*s]\n", key.len, key.s);
  490. terminate_all_calls(credit_data);
  491. done:
  492. cnxcc_unlock(credit_data->lock);
  493. }