瀏覽代碼

acc_json: adding CDR as JSON functionality

Lucian Balaceanu 5 年之前
父節點
當前提交
09f1c59bfe
共有 3 個文件被更改,包括 374 次插入37 次删除
  1. 202 24
      src/modules/acc_json/acc_json_mod.c
  2. 8 2
      src/modules/acc_json/acc_json_mod.h
  3. 164 11
      src/modules/acc_json/doc/acc_json_admin.xml

+ 202 - 24
src/modules/acc_json/acc_json_mod.c

@@ -43,22 +43,36 @@ static int child_init(int rank);
 
 int acc_json_init(acc_init_info_t *inf);
 int acc_json_send_request(struct sip_msg *req, acc_info_t *inf);
+int cdr_json_init(void);
+int cdr_json_write(struct dlg_cell *dlg, struct sip_msg *req, cdr_info_t *inf);
 
 // acc API
 acc_api_t accb;
 acc_engine_t _acc_json_engine;
+cdr_engine_t _cdr_json_engine;
 // mqueue API
 mq_api_t mq_api;
+// dlg API
+struct dlg_binds dlgb;
 
 int acc_flag = -1;
 int acc_missed_flag = -1;
 int acc_time_mode = 0;
 static char *acc_extra_str = 0;
 acc_extra_t *acc_extra = 0;
-int output_syslog = -1;
-char *output_mqueue_str = 0; /* see mqueue module queue name */
-str q_name = {0, 0};
-static char *log_facility_str = 0;
+int acc_output_syslog = -1;
+char *acc_output_mqueue_str = 0; /* see mqueue module queue name */
+str acc_q_name = {0, 0};
+static char *acc_log_facility_str = 0;
+
+int cdr_enable  = 0;
+static char *cdr_extra_str = 0;
+acc_extra_t *cdr_extra = 0;
+int cdr_expired_dlg_enable = 0;
+int cdr_output_syslog = -1;
+char *cdr_output_mqueue_str = 0; /* see mqueue module queue name */
+str cdr_q_name = {0, 0};
+static char *cdr_log_facility_str = 0;
 
 static cmd_export_t cmds[] = {{0, 0, 0, 0, 0, 0}};
 
@@ -68,10 +82,17 @@ static param_export_t params[] = {{"acc_flag", INT_PARAM, &acc_flag},
 		{"acc_extra", PARAM_STRING, &acc_extra_str},
 		{"acc_time_mode", INT_PARAM, &acc_time_mode},
 		{"acc_time_format", PARAM_STRING, &acc_time_format},
-		{"log_level", INT_PARAM, &log_level},
-		{"log_facility", PARAM_STRING, &log_facility_str},
-		{"output_mqueue", PARAM_STRING, &output_mqueue_str},
-		{"output_syslog", INT_PARAM, &output_syslog}, {0, 0, 0}};
+		{"acc_log_level", INT_PARAM, &acc_log_level},
+		{"acc_log_facility", PARAM_STRING, &acc_log_facility_str},
+		{"acc_output_mqueue", PARAM_STRING, &acc_output_mqueue_str},
+		{"acc_output_syslog", INT_PARAM, &acc_output_syslog},
+		{"cdr_extra", PARAM_STRING, &cdr_extra_str},
+		{"cdr_enable", INT_PARAM, &cdr_enable},
+		{"cdr_expired_dlg_enable", INT_PARAM, &cdr_expired_dlg_enable},
+		{"cdr_log_level", INT_PARAM, &cdr_log_level},
+		{"cdr_log_facility", PARAM_STRING, &cdr_log_facility_str},
+		{"cdr_output_mqueue", PARAM_STRING, &cdr_output_mqueue_str},
+		{"cdr_output_syslog", INT_PARAM, &cdr_output_syslog}, {0, 0, 0}};
 
 
 struct module_exports exports = {
@@ -96,15 +117,34 @@ static int mod_init(void)
 		return -1;
 	}
 
+	if( cdr_enable < 0 || cdr_enable > 1) {
+		LM_ERR("cdr_enable is out of range\n");
+		return -1;
+	}
+	if( cdr_expired_dlg_enable < 0 || cdr_expired_dlg_enable > 1) {
+		LM_ERR("cdr_expired_dlg_enable is out of range\n");
+		return -1;
+	}
+
 	LM_INFO("janson version : %s\n", JANSSON_VERSION);
 #if JANSSON_VERSION_HEX >= 0x010300
 /* Code specific to version 1.3 and above */
 #endif
 
-	if(log_facility_str) {
-		int tmp = str2facility(log_facility_str);
+	if(acc_log_facility_str) {
+		int tmp = str2facility(acc_log_facility_str);
 		if(tmp != -1)
-			log_facility = tmp;
+			acc_log_facility = tmp;
+		else {
+			LM_ERR("invalid log facility configured");
+			return -1;
+		}
+	}
+
+	if (cdr_log_facility_str) {
+		int tmp = str2facility(cdr_log_facility_str);
+		if (tmp != -1)
+			cdr_log_facility = tmp;
 		else {
 			LM_ERR("invalid log facility configured");
 			return -1;
@@ -112,15 +152,18 @@ static int mod_init(void)
 	}
 
 	/* load the MQUEUE API */
-	if(output_mqueue_str && (load_mq_api(&mq_api) != 0)) {
+	if((acc_output_mqueue_str || cdr_output_mqueue_str) && (load_mq_api(&mq_api) != 0)) {
 		LM_ERR("can't load mqueue module API, disabling json acc to mqueue\n");
-		output_mqueue_str = NULL;
+		acc_output_mqueue_str = NULL;
 	}
-	if(output_mqueue_str) {
-		q_name.s = output_mqueue_str;
-		q_name.len = strlen(output_mqueue_str);
+	if(acc_output_mqueue_str) {
+		acc_q_name.s = acc_output_mqueue_str;
+		acc_q_name.len = strlen(acc_output_mqueue_str);
+	}
+	if(cdr_output_mqueue_str) {
+		cdr_q_name.s = cdr_output_mqueue_str;
+		cdr_q_name.len = strlen(cdr_output_mqueue_str);
 	}
-
 	/* parse the extra string, if any */
 	if(acc_extra_str && (acc_extra = accb.parse_extra(acc_extra_str)) == 0) {
 		LM_ERR("failed to parse acc_extra param\n");
@@ -141,6 +184,29 @@ static int mod_init(void)
 		return -1;
 	}
 
+	if (cdr_enable) {
+		if(load_dlg_api( &dlgb) != 0) {
+			LM_ERR("can't load dialog API\n");
+			return -1;
+		}
+		/* parse the extra string, if any */
+		if(cdr_extra_str && (cdr_extra = accb.parse_extra(cdr_extra_str)) == 0) {
+			LM_ERR("failed to parse cdr_extra param\n");
+			return -1;
+		}
+		memset(&_cdr_json_engine, 0, sizeof(cdr_engine_t));
+
+		_cdr_json_engine.cdr_write = cdr_json_write;
+		_cdr_json_engine.cdr_init = cdr_json_init;
+		memcpy(_cdr_json_engine.name, "json", 4);
+
+		if (!accb.register_cdr_engine
+				|| (accb.register_cdr_engine
+						&& (accb.register_cdr_engine(&_cdr_json_engine) < 0))) {
+			LM_ERR("cannot register ACC CDR JSON engine\n");
+			return -1;
+		}
+	}
 	return 0;
 }
 
@@ -166,11 +232,20 @@ int acc_json_init(acc_init_info_t *inf)
 }
 
 
-void syslog_write(const char *acc)
+void acc_syslog_write(const char *acc)
 {
 	//setlogmask(LOG_UPTO (LOG_NOTICE));
-	openlog("json_acc", LOG_CONS | LOG_PID | LOG_NDELAY, log_facility);
-	syslog(log_level, "%s", acc);
+	openlog("json_acc", LOG_CONS | LOG_PID | LOG_NDELAY, acc_log_facility);
+	syslog(acc_log_level, "%s", acc);
+	closelog();
+}
+
+
+void cdr_syslog_write(const char *cdr)
+{
+	//setlogmask(LOG_UPTO (LOG_NOTICE));
+	openlog("json_acc", LOG_CONS | LOG_PID | LOG_NDELAY, cdr_log_facility);
+	syslog(cdr_log_level, "%s", cdr);
 	closelog();
 }
 
@@ -279,9 +354,9 @@ int acc_json_send_request(struct sip_msg *req, acc_info_t *inf)
 		str acc_str = {json_string, strlen(json_string)};
 
 		// json acc output to mqueue
-		if(output_mqueue_str) {
+		if(acc_output_mqueue_str) {
 			str key = str_init("acc");
-			if(mq_api.add(&q_name, &key, &acc_str)) {
+			if(mq_api.add(&acc_q_name, &key, &acc_str)) {
 				LM_DBG("ACC queued [%d][%s]\n", acc_str.len, acc_str.s);
 			} else {
 				LM_DBG("ACC mqueue add error [%d][%s]\n", acc_str.len,
@@ -289,8 +364,8 @@ int acc_json_send_request(struct sip_msg *req, acc_info_t *inf)
 			}
 		}
 		// json acc output to syslog
-		if(output_syslog)
-			syslog_write(json_string);
+		if(acc_output_syslog)
+			acc_syslog_write(json_string);
 		free(json_string);
 		json_object_clear(object);
 		json_decref(object);
@@ -299,3 +374,106 @@ int acc_json_send_request(struct sip_msg *req, acc_info_t *inf)
 	free_strar_mem(&(inf->tarr[m - o]), &(inf->varr[m - o]), o, m);
 	return 1;
 }
+
+
+int cdr_json_init(void)
+{
+	LM_DBG(" init ...\n");
+	return 0;
+}
+
+
+int cdr_json_write(struct dlg_cell *dlg, struct sip_msg *req, cdr_info_t *inf)
+{
+	int attr_cnt = 0;
+	int i;
+	int extra_cnt = 0;
+	int core_cnt = 0;
+
+	json_t *object = json_object();
+
+	/* get default values */
+	core_cnt = accb.get_core_cdr_attrs( dlg, inf->varr, inf->iarr, inf->tarr);
+	attr_cnt += core_cnt;
+
+	for(i = 0; i < attr_cnt; i++) {
+		LM_DBG("[%d][%.*s]\n", i, inf->varr[i].len, inf->varr[i].s);
+		char *tmp = strndup(inf->varr[i].s, inf->varr[i].len);
+		json_t *value = json_string(tmp);
+		if(!value)
+			value = json_string("NON-UTF8");
+		if(i == 0) {
+			json_object_set_new(object, cdr_start_str.s, value);
+		} else if(i == 1) {
+			json_object_set_new(object, cdr_end_str.s, value);
+		} else if(i == 2) {
+			json_object_set_new(object, cdr_duration_str.s, value);
+		}
+		free(tmp);
+	}
+
+	/* get extra values */
+	if (req)
+	{
+		/* free memory allocated by get_extra_attrs */
+		extra_cnt += accb.get_extra_attrs( cdr_extra,
+				req,
+				inf->varr + attr_cnt,
+				inf->iarr + attr_cnt,
+				inf->tarr + attr_cnt);
+		attr_cnt += extra_cnt;
+	} else if (cdr_expired_dlg_enable){
+		int dlg_index = 0;
+		dlg_index += accb.get_extra_dlg_attrs( cdr_extra,
+				dlg,
+				inf->varr + attr_cnt,
+				inf->iarr + attr_cnt,
+				inf->tarr + attr_cnt,
+				&dlgb);
+		attr_cnt += dlg_index;
+	}
+
+	struct acc_extra *extra = cdr_extra;
+	for( ; i < attr_cnt; i++)
+	{
+		LM_DBG("[%d][%s][%.*s]\n", i, extra->name.s, inf->varr[i].len,
+						inf->varr[i].s);
+		char *tmp = strndup(inf->varr[i].s, inf->varr[i].len);
+		json_t *value = json_string(tmp);
+		if(!value)
+			value = json_string("NON-UTF8");
+		json_object_set_new(object, extra->name.s, value);
+		free(tmp);
+		extra = extra->next;
+	}
+
+	if(object) {
+		if(json_object_size(object) == 0) {
+			LM_ERR("json object empty\n");
+			json_decref(object);
+			return 0;
+		}
+		char *json_string = json_dumps(object, JSON_ENSURE_ASCII);
+		str cdr_str = {json_string, strlen(json_string)};
+
+		// json acc output to mqueue
+		if (cdr_output_mqueue_str) {
+			str key = str_init("cdr");
+			if (mq_api.add(&cdr_q_name, &key, &cdr_str)) {
+				LM_DBG("CDR queued [%d][%s]\n", cdr_str.len, cdr_str.s);
+			} else {
+				LM_DBG("CDR mqueue add error [%d][%s]\n", cdr_str.len,
+						cdr_str.s);
+			}
+		}
+		// json acc output to syslog
+		if(cdr_output_syslog)
+			cdr_syslog_write(json_string);
+		free(json_string);
+		json_object_clear(object);
+		json_decref(object);
+	}
+	/* free memory allocated by get_extra_attrs */
+	free_strar_mem(&(inf->tarr[core_cnt]), &(inf->varr[core_cnt]), extra_cnt, attr_cnt);
+	return 1;
+}

+ 8 - 2
src/modules/acc_json/acc_json_mod.h

@@ -32,11 +32,17 @@ str acc_sipcode_key = str_init("sip_code");
 str acc_sipreason_key = str_init("sip_reason");
 str acc_time_key = str_init("time");
 
+str cdr_start_str = str_init("start_time");
+str cdr_end_str = str_init("end_time");
+str cdr_duration_str = str_init("duration");
+
 #define ACC_TIME_FORMAT_SIZE 128
 static char acc_time_format_buf[ACC_TIME_FORMAT_SIZE];
 char *acc_time_format = "%Y-%m-%d %H:%M:%S";
 
-int log_level = L_NOTICE;
-int log_facility = LOG_DAEMON;
+int acc_log_level = L_NOTICE;
+int acc_log_facility = LOG_DAEMON;
+int cdr_log_level = L_NOTICE;
+int cdr_log_facility = LOG_DAEMON;
 
 #endif

+ 164 - 11
src/modules/acc_json/doc/acc_json_admin.xml

@@ -232,7 +232,8 @@ route[RUN_ACC_PUBLISH] {
 ...
 </programlisting>
 		</example>
-	</section>
+	</section>	
+	
 	<section id="acc_json.p.output_syslog">
 		<title><varname>output_syslog</varname> (integer)</title>
 		<para>
@@ -249,15 +250,15 @@ route[RUN_ACC_PUBLISH] {
 		<programlisting format="linespecific">
 ...
 modparam("acc_json", "output_syslog", 1)
-modparam("acc_json", "log_level", 2)
-modparam("acc_json", "log_facility", "LOG_DAEMON")
+modparam("acc_json", "acc_log_level", 2)
+modparam("acc_json", "acc_log_facility", "LOG_DAEMON")
 ...
 </programlisting>
 		</example>
 	</section>
 
-	<section id="acc_json.p.log_facility">
-		<title><varname>log_facility</varname> (integer)</title>
+	<section id="acc_json.p.acc_log_facility">
+		<title><varname>acc_log_facility</varname> (integer)</title>
 		<para>
 		Log facility to which accounting messages are issued to syslog.
                 This allows to easily separate the accounting specific logging
@@ -267,10 +268,10 @@ modparam("acc_json", "log_facility", "LOG_DAEMON")
 		Default value is LOG_DAEMON.
 		</para>
 		<example>
-		<title>log_facility example</title>
+		<title>acc_log_facility example</title>
 		<programlisting format="linespecific">
 ...
-modparam("acc_json", "log_facility", "LOG_LOCAL0")
+modparam("acc_json", "acc_log_facility", "LOG_LOCAL0")
 
 # modify you syslog/rsyslog config
 # /etc/rsyslog.d/default.conf
@@ -283,8 +284,160 @@ modparam("acc_json", "log_facility", "LOG_LOCAL0")
 		</example>
 	</section>
 
-	<section id="acc_json.p.log_level">
-		<title><varname>log_level</varname> (integer)</title>
+	<section id="acc_json.p.acc_log_level">
+		<title><varname>acc_log_level</varname> (integer)</title>
+		<para>
+		Log level at which accounting messages are issued to syslog.
+		</para>
+		<para>
+		Default value is 1 (L_NOTICE).
+		</para>
+		<example>
+			<title>acc_log_level example</title>
+		<programlisting format="linespecific">
+...
+modparam("acc_json", "acc_log_level", 2) # Set acc_log_level to 2 (L_INFO)
+...
+</programlisting>
+		</example>
+	</section>
+	
+	<section id="acc.p.cdr_enable">
+		<title><varname>cdr_enable</varname> (str)</title>
+		<para>
+		Enable Call Data Record generation.
+		</para>
+		<para>
+		Default value is 0 (disabled).
+		</para>
+		<example>
+		<title>cdr_enable example</title>
+		<programlisting format="linespecific">
+...
+modparam("acc_json", "cdr_enable", 1)
+...
+</programlisting>
+		</example>
+	</section>	
+	
+	<section id="acc.p.cdr_extra">
+		<title><varname>cdr_extra</varname> (str)</title>
+		<para>
+		Set of pseudo-variables defining custom CDR fields. 
+		</para>
+		<para>
+		Default value is NULL.
+		</para>
+		<example>
+		<title>cdr_extra example</title>
+		<programlisting format="linespecific">
+...
+modparam("acc_json", "cdr_extra", "ci=$dlg_var(call_id);ft=$dlg_var(from_tag)")
+...
+</programlisting>
+		</example>
+	</section>
+
+	
+	<section id="acc.p.cdr_expired_dlg_enable">
+		<title><varname>cdr_expired_dlg_enable</varname> (str)</title>
+		<para>
+		Should CDR-based logging be enabled in case of expired dialogs? 
+		</para>
+		<para>
+		0 - off (default). 1 - on. 
+		</para>
+		<example>
+		<title>cdr_expired_dlg_enable example</title>
+		<programlisting format="linespecific">
+...
+modparam("acc_json", "cdr_expired_dlg_enable", 1)
+...
+</programlisting>
+		</example>
+	</section>
+	
+	<section id="acc_json.p.cdr_output_mqueue">
+		<title><varname>cdr_output_mqueue</varname> (integer)</title>
+		<para>
+                Requires the mqueue module.
+                The acc module will queue json cdr events in the specified mqueue.
+                Using a rtimer module exec you can access the queue and process them.
+		</para>
+		<para>
+                You can also fetch the cdr events using mqueue.fetch over JSON-RPC.
+		</para>
+		<para>
+		Default value is not-set mqueue will not be required.
+		</para>
+		<example>
+		<title>cdr_output_mqueue usage example</title>
+		<programlisting format="linespecific">
+...
+# example using json_mqueue/http_client to publish to NSQD
+modparam("mqueue", "mqueue", "name=cdr_events;size=100000")
+modparam("acc_json", "cdr_enable", 1)
+modparam("acc_json", "cdr_output_mqueue", "cdr_events")
+modparam("acc_json", "cdr_extra", "ci=$dlg_var(call_id)")
+modparam("rtimer", "timer", "name=nsqt;interval=1;mode=1;")
+modparam("rtimer", "exec", "timer=nsqt;route=RUN_CDR_PUBLISH")
+modparam("http_client", "keep_connections", 1)
+modparam("http_client", "httpcon", "nsqd=>http://localhost:4151/pub?topic=acc")
+
+route[RUN_CDR_PUBLISH] {
+   $var(count) = 0;
+   while (mq_fetch("cdr_events")) {
+      $var(q_size) = mq_size("cdr_events");
+      $var(count) = $var(count) + 1;
+      xinfo("[RUN_CDR_PUBLISH][$var(q_size)][$var(count)][$mqk(cdr_events)][$mqv(cdr_events)]\n");
+      $var(res) = http_connect_raw("nsqd", "", "application/json", $mqv(cdr_events), "$var(nsq_res)");
+      if ($var(res) &lt; 0) {
+         xerr("[RUN_CDR_PUBLISH][$var(res)] http_connect_raw: timeout or error !\n");
+         mq_add("cdr_events", "cdr_key", "$mqv(cdr_events)");
+      } else if ($var(res) &lt; 200 || $var(res) &gt; 299) {
+         xerr("[RUN_CDR_PUBLISH][$var(res)] http unexpected response code !\n");
+         mq_add("cdr_dead_letter_queue", "cdr_key", "$mqv(cdr_events)");
+         return;
+      }
+   }
+   if ($var(count) &gt; 0 ) {
+      xinfo("[RUN_CDR_PUBLISH]done count[$var(count)]\n");
+   }
+}
+...
+</programlisting>
+		</example>
+	</section>
+	
+<section id="acc_json.p.cdr_log_facility">
+		<title><varname>cdr_log_facility</varname> (integer)</title>
+		<para>
+		Log facility to which accounting messages are issued to syslog.
+                This allows to easily separate the accounting specific logging
+                from the other log messages.
+		</para>
+		<para>
+		Default value is LOG_DAEMON.
+		</para>
+		<example>
+		<title>cdr_log_facility example</title>
+		<programlisting format="linespecific">
+...
+modparam("acc_json", "cdr_log_facility", "LOG_LOCAL0")
+
+# modify you syslog/rsyslog config
+# /etc/rsyslog.d/default.conf
+# remove local0 from default log file
+# *.*;local0,auth,authpriv.none /var/log/syslog
+# add local0 to another log file
+# local0.*                      /var/log/json_cdr.log
+...
+</programlisting>
+		</example>
+	</section>
+
+	<section id="acc_json.p.cdr_log_level">
+		<title><varname>cdr_log_level</varname> (integer)</title>
 		<para>
 		Log level at which accounting messages are issued to syslog.
 		</para>
@@ -292,10 +445,10 @@ modparam("acc_json", "log_facility", "LOG_LOCAL0")
 		Default value is 1 (L_NOTICE).
 		</para>
 		<example>
-			<title>log_level example</title>
+			<title>cdr_log_level example</title>
 		<programlisting format="linespecific">
 ...
-modparam("acc_json", "log_level", 2) # Set log_level to 2 (L_INFO)
+modparam("acc_json", "cdr_log_level", 2) # Set cdr_log_level to 2 (L_INFO)
 ...
 </programlisting>
 		</example>