Преглед на файлове

evapi: added support for tagging connections and select for sending based on tag

- new cfg function evapi_set_tag(tname)
Daniel-Constantin Mierla преди 9 години
родител
ревизия
28b809e41b
променени са 3 файла, в които са добавени 118 реда и са изтрити 35 реда
  1. 98 35
      modules/evapi/evapi_dispatch.c
  2. 2 0
      modules/evapi/evapi_dispatch.h
  3. 18 0
      modules/evapi/evapi_mod.c

+ 98 - 35
modules/evapi/evapi_dispatch.c

@@ -45,6 +45,7 @@ static int _evapi_notify_sockets[2];
 static int _evapi_netstring_format = 1;
 
 #define EVAPI_IPADDR_SIZE	64
+#define EVAPI_TAG_SIZE	64
 #define CLIENT_BUFFER_SIZE	32768
 typedef struct _evapi_client {
 	int connected;
@@ -52,6 +53,8 @@ typedef struct _evapi_client {
 	unsigned short af;
 	unsigned short src_port;
 	char src_addr[EVAPI_IPADDR_SIZE];
+	char tag[EVAPI_IPADDR_SIZE];
+	str  stag;
 	char rbuffer[CLIENT_BUFFER_SIZE];
 	unsigned int rpos;
 } evapi_client_t;
@@ -62,6 +65,11 @@ typedef struct _evapi_env {
 	str msg;
 } evapi_env_t;
 
+typedef struct _evapi_msg {
+	str data;
+	str tag;
+} evapi_msg_t;
+
 #define EVAPI_MAX_CLIENTS	8
 /* last one used for error handling, not a real connected client */
 static evapi_client_t _evapi_clients[EVAPI_MAX_CLIENTS+1];
@@ -162,13 +170,45 @@ int evapi_cfg_close(sip_msg_t *msg)
 	if(msg==NULL)
 		return -1;
 
-	evenv = (evapi_env_t*)msg->date;
+	evenv = evapi_get_msg_env(msg);
 
 	if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
 		return -1;
 	return evapi_close_connection(evenv->conidx);
 }
 
+/**
+ *
+ */
+int evapi_set_tag(sip_msg_t* msg, str* stag)
+{
+	evapi_env_t *evenv;
+
+	if(msg==NULL || stag==NULL)
+		return -1;
+
+	evenv = evapi_get_msg_env(msg);
+
+	if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
+		return -1;
+
+	if(!(_evapi_clients[evenv->conidx].connected==1
+			&& _evapi_clients[evenv->conidx].sock > 0)) {
+		LM_ERR("connection not established\n");
+		return -1;
+	}
+
+	if(stag->len>=EVAPI_TAG_SIZE) {
+		LM_ERR("tag size too big: %d / %d\n", stag->len, EVAPI_TAG_SIZE);
+		return -1;
+	}
+	_evapi_clients[evenv->conidx].stag.s = _evapi_clients[evenv->conidx].tag;
+	strncpy(_evapi_clients[evenv->conidx].stag.s, stag->s, stag->len);
+	_evapi_clients[evenv->conidx].stag.s[stag->len] = '\0';
+	_evapi_clients[evenv->conidx].stag.len = stag->len;
+	return 1;
+}
+
 /**
  *
  */
@@ -203,7 +243,7 @@ void evapi_close_notify_sockets_parent(void)
 /**
  *
  */
-int evapi_dispatch_notify(char *obuf, int olen)
+int evapi_dispatch_notify(evapi_msg_t *emsg)
 {
 	int i;
 	int n;
@@ -212,12 +252,18 @@ int evapi_dispatch_notify(char *obuf, int olen)
 	n = 0;
 	for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
 		if(_evapi_clients[i].connected==1 && _evapi_clients[i].sock>0) {
-			wlen = write(_evapi_clients[i].sock, obuf, olen);
-			if(wlen!=olen) {
-				LM_DBG("failed to write all packet (%d out of %d) on socket %d index [%d]\n",
-						wlen, olen, _evapi_clients[i].sock, i);
+			if(emsg->tag.s==NULL || (emsg->tag.len = _evapi_clients[i].stag.len
+						&& strncmp(_evapi_clients[i].stag.s,
+									emsg->tag.s, emsg->tag.len)==0)) {
+				wlen = write(_evapi_clients[i].sock, emsg->data.s,
+						emsg->data.len);
+				if(wlen!=emsg->data.len) {
+					LM_DBG("failed to write all packet (%d out of %d) on socket"
+							" %d index [%d]\n",
+							wlen, emsg->data.len, _evapi_clients[i].sock, i);
+				}
+				n++;
 			}
-			n++;
 		}
 	}
 
@@ -390,7 +436,7 @@ void evapi_accept_client(struct ev_loop *loop, struct ev_io *watcher, int revent
 	struct ev_io *evapi_client;
 	int i;
 	evapi_env_t evenv;
-	
+
 	evapi_client = (struct ev_io*) malloc (sizeof(struct ev_io));
 	if(evapi_client==NULL) {
 		LM_ERR("no more memory\n");
@@ -472,7 +518,7 @@ void evapi_accept_client(struct ev_loop *loop, struct ev_io *watcher, int revent
  */
 void evapi_recv_notify(struct ev_loop *loop, struct ev_io *watcher, int revents)
 {
-	str *sbuf;
+	evapi_msg_t *emsg = NULL;
 	int rlen;
 
 	if(EV_ERROR & revents) {
@@ -483,16 +529,17 @@ void evapi_recv_notify(struct ev_loop *loop, struct ev_io *watcher, int revents)
 	cfg_update();
 
 	/* read message from client */
-	rlen = read(watcher->fd, &sbuf, sizeof(str*));
+	rlen = read(watcher->fd, &emsg, sizeof(evapi_msg_t*));
 
-	if(rlen != sizeof(str*)) {
+	if(rlen != sizeof(evapi_msg_t*) || emsg==NULL) {
 		LM_ERR("cannot read the sip worker message\n");
 		return;
 	}
 
-	LM_DBG("received [%p] [%.*s] (%d)\n", sbuf, sbuf->len, sbuf->s, sbuf->len);
-	evapi_dispatch_notify(sbuf->s, sbuf->len);
-	shm_free(sbuf);
+	LM_DBG("received [%p] [%.*s] (%d)\n", emsg,
+			emsg->data.len, emsg->data.s, emsg->data.len);
+	evapi_dispatch_notify(emsg);
+	shm_free(emsg);
 }
 
 /**
@@ -519,13 +566,13 @@ int evapi_run_dispatcher(char *laddr, int lport)
 		return -1;
 	}
 
-    h = gethostbyname(laddr);
-    if (h == NULL || (h->h_addrtype != AF_INET && h->h_addrtype != AF_INET6)) {
-    	LM_ERR("cannot resolve local server address [%s]\n", laddr);
-        return -1;
-    }
-    if(h->h_addrtype == AF_INET) {
-    	evapi_srv_sock = socket(PF_INET, SOCK_STREAM, 0);
+	h = gethostbyname(laddr);
+	if (h == NULL || (h->h_addrtype != AF_INET && h->h_addrtype != AF_INET6)) {
+		LM_ERR("cannot resolve local server address [%s]\n", laddr);
+		return -1;
+	}
+	if(h->h_addrtype == AF_INET) {
+		evapi_srv_sock = socket(PF_INET, SOCK_STREAM, 0);
 	} else {
 		evapi_srv_sock = socket(PF_INET6, SOCK_STREAM, 0);
 	}
@@ -543,11 +590,11 @@ int evapi_run_dispatcher(char *laddr, int lport)
 	evapi_srv_addr.sin_addr  = *(struct in_addr*)h->h_addr;
 
 	/* Set SO_REUSEADDR option on listening socket so that we don't
-	 * have to wait for connections in TIME_WAIT to go away before 
+	 * have to wait for connections in TIME_WAIT to go away before
 	 * re-binding.
 	 */
 
-	if(setsockopt(evapi_srv_sock, SOL_SOCKET, SO_REUSEADDR, 
+	if(setsockopt(evapi_srv_sock, SOL_SOCKET, SO_REUSEADDR,
 		&yes_true, sizeof(int)) < 0) {
 		LM_ERR("cannot set SO_REUSEADDR option on descriptor\n");
 		close(evapi_srv_sock);
@@ -591,42 +638,50 @@ int evapi_run_worker(int prank)
 /**
  *
  */
-int evapi_relay(str *evdata)
+int evapi_relay_multicast(str *evdata, str *ctag)
 {
 #define EVAPI_RELAY_FORMAT "%d:%.*s,"
 
 	int len;
 	int sbsize;
-	str *sbuf;
+	evapi_msg_t *emsg;
 
 	LM_DBG("relaying event data [%.*s] (%d)\n",
 			evdata->len, evdata->s, evdata->len);
 
 	sbsize = evdata->len;
-	sbuf = (str*)shm_malloc(sizeof(str) + ((sbsize+32) * sizeof(char)));
-	if(sbuf==NULL) {
+	len = sizeof(evapi_msg_t)
+		+ ((sbsize + 32 + ((ctag && ctag->len>0)?(ctag->len+2):0)) * sizeof(char));
+	emsg = (evapi_msg_t*)shm_malloc(len);
+	if(emsg==NULL) {
 		LM_ERR("no more shared memory\n");
 		return -1;
 	}
-	sbuf->s = (char*)sbuf + sizeof(str);
+	memset(emsg, 0, len);
+	emsg->data.s = (char*)emsg + sizeof(evapi_msg_t);
 	if(_evapi_netstring_format) {
 		/* netstring encapsulation */
-		sbuf->len = snprintf(sbuf->s, sbsize+32,
+		emsg->data.len = snprintf(emsg->data.s, sbsize+32,
 				EVAPI_RELAY_FORMAT,
 				sbsize, evdata->len, evdata->s);
 	} else {
-		sbuf->len = snprintf(sbuf->s, sbsize+32,
+		emsg->data.len = snprintf(emsg->data.s, sbsize+32,
 				"%.*s",
 				evdata->len, evdata->s);
 	}
-	if(sbuf->len<=0 || sbuf->len>sbsize+32) {
-		shm_free(sbuf);
+	if(emsg->data.len<=0 || emsg->data.len>sbsize+32) {
+		shm_free(emsg);
 		LM_ERR("cannot serialize event\n");
 		return -1;
 	}
+	if(ctag && ctag->len>0) {
+		emsg->tag.s = emsg->data.s + sbsize + 32;
+		strncpy(emsg->tag.s, ctag->s, ctag->len);
+		emsg->tag.len = ctag->len;
+	}
 
-	LM_DBG("sending [%p] [%.*s] (%d)\n", sbuf, sbuf->len, sbuf->s, sbuf->len);
-	len = write(_evapi_notify_sockets[1], &sbuf, sizeof(str*));
+	LM_DBG("sending [%p] [%.*s] (%d)\n", emsg, emsg->data.len, emsg->data.s, emsg->data.len);
+	len = write(_evapi_notify_sockets[1], &emsg, sizeof(evapi_msg_t*));
 	if(len<=0) {
 		LM_ERR("failed to pass the pointer to evapi dispatcher\n");
 		return -1;
@@ -634,6 +689,14 @@ int evapi_relay(str *evdata)
 	return 0;
 }
 
+/**
+ *
+ */
+int evapi_relay(str *evdata)
+{
+	return evapi_relay_multicast(evdata, NULL);
+}
+
 #if 0
 /**
  *
@@ -725,7 +788,7 @@ int pv_get_evapi(sip_msg_t *msg, pv_param_t *param, pv_value_t *res)
 	if(param==NULL || res==NULL)
 		return -1;
 
-	evenv = (evapi_env_t*)msg->date;
+	evenv = evapi_get_msg_env(msg);
 
 	if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
 		return pv_get_null(msg, param, res);

+ 2 - 0
modules/evapi/evapi_dispatch.h

@@ -36,6 +36,7 @@ int evapi_run_dispatcher(char *laddr, int lport);
 int evapi_run_worker(int prank);
 
 int evapi_relay(str *evdata);
+int evapi_relay_multicast(str *evdata, str *ctag);
 
 void evapi_init_environment(int dformat);
 
@@ -49,5 +50,6 @@ int pv_set_evapi(sip_msg_t *msg, pv_param_t *param, int op,
 #define evapi_get_msg_env(_msg) ((evapi_env_t*)_msg->date)
 
 int evapi_cfg_close(sip_msg_t *msg);
+int evapi_set_tag(sip_msg_t* msg, str* stag);
 
 #endif

+ 18 - 0
modules/evapi/evapi_mod.c

@@ -59,6 +59,7 @@ static void mod_destroy(void);
 static int w_evapi_relay(sip_msg_t* msg, char* evdata, char* p2);
 static int w_evapi_async_relay(sip_msg_t* msg, char* evdata, char* p2);
 static int w_evapi_close(sip_msg_t* msg, char* p1, char* p2);
+static int w_evapi_set_tag(sip_msg_t* msg, char* ptag, char* p2);
 static int fixup_evapi_relay(void** param, int param_no);
 
 static cmd_export_t cmds[]={
@@ -70,6 +71,8 @@ static cmd_export_t cmds[]={
 		0, ANY_ROUTE},
 	{"evapi_close",       (cmd_function)w_evapi_close,       1, NULL,
 		0, ANY_ROUTE},
+	{"evapi_set_tag",       (cmd_function)w_evapi_set_tag,   1, fixup_spve_null,
+		0, ANY_ROUTE},
 	{0, 0, 0, 0, 0, 0}
 };
 
@@ -317,3 +320,18 @@ static int w_evapi_close(sip_msg_t* msg, char* p1, char* p2)
 		return ret+1;
 	return ret;
 }
+
+/**
+ *
+ */
+static int w_evapi_set_tag(sip_msg_t* msg, char* ptag, char* p2)
+{
+	str stag;
+	if(fixup_get_svalue(msg, (gparam_t*)ptag, &stag)!=0) {
+		LM_ERR("no tag name\n");
+		return -1;
+	}
+	if(evapi_set_tag(msg, &stag)<0)
+		return -1;
+	return 1;
+}