瀏覽代碼

evapi: implemented receive buffering per connected client

- incomplete data may be received on tcp, needing to wait to get more
  (only for netstring format)
Daniel-Constantin Mierla 10 年之前
父節點
當前提交
4eb7656726
共有 1 個文件被更改,包括 68 次插入27 次删除
  1. 68 27
      modules/evapi/evapi_dispatch.c

+ 68 - 27
modules/evapi/evapi_dispatch.c

@@ -45,12 +45,15 @@ static int _evapi_notify_sockets[2];
 static int _evapi_netstring_format = 1;
 
 #define EVAPI_IPADDR_SIZE	64
+#define CLIENT_BUFFER_SIZE	32768
 typedef struct _evapi_client {
 	int connected;
 	int sock;
 	unsigned short af;
 	unsigned short src_port;
 	char src_addr[EVAPI_IPADDR_SIZE];
+	char rbuffer[CLIENT_BUFFER_SIZE];
+	unsigned int rpos;
 } evapi_client_t;
 
 typedef struct _evapi_env {
@@ -60,7 +63,8 @@ typedef struct _evapi_env {
 } evapi_env_t;
 
 #define EVAPI_MAX_CLIENTS	8
-static evapi_client_t _evapi_clients[EVAPI_MAX_CLIENTS];
+/* last one used for error handling, not a real connected client */
+static evapi_client_t _evapi_clients[EVAPI_MAX_CLIENTS+1];
 
 typedef struct _evapi_evroutes {
 	int con_new;
@@ -225,26 +229,18 @@ int evapi_dispatch_notify(char *obuf, int olen)
  */
 void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
 {
-#define CLIENT_BUFFER_SIZE	32768
-	char rbuffer[CLIENT_BUFFER_SIZE];
 	ssize_t rlen;
 	int i, k;
 	evapi_env_t evenv;
 	str frame;
+	char *sfp;
+	char *efp;
 
 	if(EV_ERROR & revents) {
 		perror("received invalid event\n");
 		return;
 	}
 
-	/* read message from client */
-	rlen = recv(watcher->fd, rbuffer, CLIENT_BUFFER_SIZE-1, 0);
-
-	if(rlen < 0) {
-		LM_ERR("cannot read the client message\n");
-		return;
-	}
-
 	for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
 		if(_evapi_clients[i].connected==1 && _evapi_clients[i].sock==watcher->fd) {
 			break;
@@ -252,9 +248,22 @@ void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
 	}
 	if(i==EVAPI_MAX_CLIENTS) {
 		LM_ERR("cannot lookup client socket %d\n", watcher->fd);
+		/* try to empty the socket anyhow */
+		rlen = recv(watcher->fd, _evapi_clients[i].rbuffer, CLIENT_BUFFER_SIZE-1, 0);
 		return;
 	}
 
+	/* read message from client */
+	rlen = recv(watcher->fd, _evapi_clients[i].rbuffer + _evapi_clients[i].rpos,
+			CLIENT_BUFFER_SIZE - 1 - _evapi_clients[i].rpos, 0);
+
+	if(rlen < 0) {
+		LM_ERR("cannot read the client message\n");
+		_evapi_clients[i].rpos = 0;
+		return;
+	}
+
+
 	cfg_update();
 
 	evapi_env_reset(&evenv);
@@ -265,6 +274,7 @@ void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
 		evapi_run_cfg_route(&evenv, _evapi_rts.con_closed);
 		_evapi_clients[i].connected = 0;
 		_evapi_clients[i].sock = 0;
+		_evapi_clients[i].rpos = 0;
 		ev_io_stop(loop, watcher);
 		free(watcher);
 		LM_INFO("client closing connection - pos [%d] addr [%s:%d]\n",
@@ -272,41 +282,72 @@ void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
 		return;
 	}
 
-	rbuffer[rlen] = '\0';
+	_evapi_clients[i].rbuffer[_evapi_clients[i].rpos+rlen] = '\0';
 
 	LM_NOTICE("{%d} [%s:%d] - received [%.*s]\n",
 			i, _evapi_clients[i].src_addr, _evapi_clients[i].src_port,
-			(int)rlen, rbuffer);
+			(int)rlen, _evapi_clients[i].rbuffer+_evapi_clients[i].rpos);
 	evenv.conidx = i;
 	evenv.eset = 1;
 	if(_evapi_netstring_format) {
 		/* netstring decapsulation */
 		k = 0;
-		while(k<rlen) {
+		while(k<_evapi_clients[i].rpos+rlen) {
 			frame.len = 0;
-			while(k<rlen) {
-				if(rbuffer[k]==' ' || rbuffer[k]=='\t'
-						|| rbuffer[k]=='\r' || rbuffer[k]=='\n')
+			while(k<_evapi_clients[i].rpos+rlen) {
+				if(_evapi_clients[i].rbuffer[k]==' '
+						|| _evapi_clients[i].rbuffer[k]=='\t'
+						|| _evapi_clients[i].rbuffer[k]=='\r'
+						|| _evapi_clients[i].rbuffer[k]=='\n')
 					k++;
 				else break;
 			}
-			if(k==rlen) return;
-			while(k<rlen) {
-				if(rbuffer[k]>='0' && rbuffer[k]<='9') {
-					frame.len = frame.len*10 + rbuffer[k] - '0';
+			if(k==_evapi_clients[i].rpos+rlen) {
+				_evapi_clients[i].rpos = 0;
+				return;
+			}
+			/* pointer to start of whole frame */
+			sfp = _evapi_clients[i].rbuffer + k;
+			while(k<_evapi_clients[i].rpos+rlen) {
+				if(_evapi_clients[i].rbuffer[k]>='0' && _evapi_clients[i].rbuffer[k]<='9') {
+					frame.len = frame.len*10 + _evapi_clients[i].rbuffer[k] - '0';
 				} else {
-					if(rbuffer[k]==':')
+					if(_evapi_clients[i].rbuffer[k]==':')
 						break;
 					/* invalid character - discard the rest */
+					_evapi_clients[i].rpos = 0;
 					return;
 				}
 				k++;
 			}
-			if(k==rlen || frame.len<=0) return;
-			if(frame.len + k>=rlen) return;
+			if(k==_evapi_clients[i].rpos+rlen || frame.len<=0) {
+				_evapi_clients[i].rpos = 0;
+				return;
+			}
+			if(frame.len + k>=_evapi_clients[i].rpos + rlen) {
+				/* partial data - shift back in buffer and wait to read more */
+				efp = _evapi_clients[i].rbuffer + _evapi_clients[i].rpos + rlen;
+				if(efp<=sfp) {
+					_evapi_clients[i].rpos = 0;
+					return;
+				}
+				_evapi_clients[i].rpos = (unsigned int)(efp-sfp);
+				if(efp-sfp > sfp-_evapi_clients[i].rbuffer) {
+					memcpy(_evapi_clients[i].rbuffer, sfp, _evapi_clients[i].rpos);
+				} else {
+					for(k=0; k<_evapi_clients[i].rpos; k++) {
+						_evapi_clients[i].rbuffer[k] = sfp[k];
+					}
+				}
+				return;
+			}
 			k++;
-			frame.s = rbuffer + k;
-			if(frame.s[frame.len]!=',') return;
+			frame.s = _evapi_clients[i].rbuffer + k;
+			if(frame.s[frame.len]!=',') {
+				/* invalid data - discard and reset buffer */
+				_evapi_clients[i].rpos = 0 ;
+				return;
+			}
 			frame.s[frame.len] = '\0';
 			k += frame.len ;
 			evenv.msg.s = frame.s;
@@ -315,7 +356,7 @@ void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
 			k++;
 		}
 	} else {
-		evenv.msg.s = rbuffer;
+		evenv.msg.s = _evapi_clients[i].rbuffer;
 		evenv.msg.len = rlen;
 		evapi_run_cfg_route(&evenv, _evapi_rts.msg_received);
 	}