Browse Source

core: tcp - added core parameter tcp_wait_data

- specify how long to wait (in milliseconds) to wait for data on tcp
connections in certain cases
- default is 5000ms (5secs)
- now applies when reading on tcp connection for haproxy protocol
Daniel-Constantin Mierla 3 years ago
parent
commit
7a5ae41ec7
5 changed files with 57 additions and 1 deletions
  1. 3 0
      src/core/cfg.lex
  2. 9 0
      src/core/cfg.y
  3. 41 1
      src/core/tcp_main.c
  4. 3 0
      src/core/tcp_options.c
  5. 1 0
      src/core/tcp_options.h

+ 3 - 0
src/core/cfg.lex

@@ -422,6 +422,7 @@ TCP_OPT_ACCEPT_HEP3	"tcp_accept_hep3"
 TCP_OPT_ACCEPT_HAPROXY	"tcp_accept_haproxy"
 TCP_CLONE_RCVBUF	"tcp_clone_rcvbuf"
 TCP_REUSE_PORT		"tcp_reuse_port"
+TCP_WAIT_DATA	"tcp_wait_data"
 DISABLE_TLS		"disable_tls"|"tls_disable"
 ENABLE_TLS		"enable_tls"|"tls_enable"
 TLSLOG			"tlslog"|"tls_log"
@@ -916,6 +917,8 @@ IMPORTFILE      "import_file"
 <INITIAL>{TCP_CLONE_RCVBUF}		{ count(); yylval.strval=yytext;
 									return TCP_CLONE_RCVBUF; }
 <INITIAL>{TCP_REUSE_PORT}	{ count(); yylval.strval=yytext; return TCP_REUSE_PORT; }
+<INITIAL>{TCP_WAIT_DATA}	{ count(); yylval.strval=yytext;
+									return TCP_WAIT_DATA; }
 <INITIAL>{DISABLE_TLS}	{ count(); yylval.strval=yytext; return DISABLE_TLS; }
 <INITIAL>{ENABLE_TLS}	{ count(); yylval.strval=yytext; return ENABLE_TLS; }
 <INITIAL>{TLSLOG}		{ count(); yylval.strval=yytext; return TLS_PORT_NO; }

+ 9 - 0
src/core/cfg.y

@@ -451,6 +451,7 @@ extern char *default_routename;
 %token TCP_OPT_ACCEPT_HAPROXY
 %token TCP_CLONE_RCVBUF
 %token TCP_REUSE_PORT
+%token TCP_WAIT_DATA
 %token DISABLE_TLS
 %token ENABLE_TLS
 %token TLSLOG
@@ -1319,6 +1320,14 @@ assign_stm:
 		#endif
 	}
 	| TCP_REUSE_PORT EQUAL error { yyerror("boolean value expected"); }
+	| TCP_WAIT_DATA EQUAL intno {
+		#ifdef USE_TCP
+			tcp_default_cfg.wait_data_ms=$3;
+		#else
+			warn("tcp support not compiled in");
+		#endif
+	}
+	| TCP_WAIT_DATA EQUAL error { yyerror("number expected"); }
 	| DISABLE_TLS EQUAL NUMBER {
 		#ifdef USE_TLS
 			tls_disable=$3;

+ 41 - 1
src/core/tcp_main.c

@@ -908,6 +908,8 @@ int tcpconn_read_haproxy(struct tcp_connection *c) {
 	uint32_t size, port;
 	char *p, *end;
 	struct ip_addr *src_ip, *dst_ip;
+	int twaitms = 0;
+	int tsleepus = 0;
 
 	const char v2sig[12] = "\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A";
 
@@ -949,10 +951,35 @@ int tcpconn_read_haproxy(struct tcp_connection *c) {
 
 	} hdr;
 
+	if(cfg_get(tcp, tcp_cfg, wait_data_ms) > 10000) {
+		tsleepus = 100000;
+	} else if (cfg_get(tcp, tcp_cfg, wait_data_ms) < 1000) {
+		tsleepus = 50000;
+	} else {
+		tsleepus = 10 * cfg_get(tcp, tcp_cfg, wait_data_ms);
+	}
+
+	twaitms = 0;
 	do {
 		bytes = recv(c->s, &hdr, sizeof(hdr), MSG_PEEK);
-	} while (bytes == -1 && (errno == EINTR || errno == EAGAIN));
+		if(bytes==-1 && (errno == EINTR || errno == EAGAIN)) {
+			if(twaitms <= cfg_get(tcp, tcp_cfg, wait_data_ms)) {
+				/* LM_DBG("bytes: %d - errno: %d (%d/%d) - twait: %dms\n", bytes,
+						errno, EINTR, EAGAIN, twaitms); */
+				sleep_us(tsleepus);
+				twaitms += tsleepus/1000;
+			} else {
+				break;
+			}
+		} else {
+			break;
+		}
+	} while (1);
 
+	if(bytes == -1) {
+		/* no data received during tcp_wait_data */
+		return -1;
+	}
 	/* copy original tunnel address details */
 	memcpy(&c->cinfo.src_ip, &c->rcv.src_ip, sizeof(ip_addr_t));
 	memcpy(&c->cinfo.dst_ip, &c->rcv.dst_ip, sizeof(ip_addr_t));
@@ -1115,8 +1142,21 @@ int tcpconn_read_haproxy(struct tcp_connection *c) {
 
 done:
 	/* we need to consume the appropriate amount of data from the socket */
+	twaitms = 0;
 	do {
 		bytes = recv(c->s, &hdr, size, 0);
+		if(bytes==-1 && errno == EINTR)) {
+			if(twaitms <= cfg_get(tcp, tcp_cfg, wait_data_ms)) {
+				/* LM_DBG("bytes: %d - errno: %d (%d/%d) - twait: %dms\n", bytes,
+						errno, EINTR, EAGAIN, twaitms); */
+				sleep_us(tsleepus);
+				twaitms += tsleepus/1000;
+			} else {
+				break;
+			}
+		} else {
+			break;
+		}
 	} while (bytes == -1 && errno == EINTR);
 
 	return (bytes >= 0) ? retval : -1;

+ 3 - 0
src/core/tcp_options.c

@@ -108,6 +108,8 @@ static cfg_def_t tcp_cfg_def[] = {
 		"accept TCP messages without Content-Length "},
 	{ "reuse_port",   CFG_VAR_INT | CFG_ATOMIC,   0,        1,  0,         0,
 		"reuse TCP ports "},
+	{ "wait_data_ms",  CFG_VAR_INT | CFG_ATOMIC,      0, 7200000, 0,        0,
+		"wait for data on new tcp connetions (milliseconds)"},
 	/* internal and/or "fixed" versions of some vars
 	   (not supposed to be writeable, read will provide only debugging value*/
 	{ "rd_buf_size", CFG_VAR_INT | CFG_ATOMIC,    512,    16777216,  0,         0,
@@ -164,6 +166,7 @@ void init_tcp_options()
 	tcp_default_cfg.rd_buf_size=DEFAULT_TCP_BUF_SIZE;
 	tcp_default_cfg.wq_blk_size=DEFAULT_TCP_WBUF_SIZE;
 	tcp_default_cfg.reuse_port=0;
+	tcp_default_cfg.wait_data_ms=5000;
 }
 
 

+ 1 - 0
src/core/tcp_options.h

@@ -138,6 +138,7 @@ struct cfg_group_tcp{
 	int new_conn_alias_flags;
 	int accept_no_cl;  /* on/off - accept messages without content-length */
 	int reuse_port;  /* enable SO_REUSEPORT */
+	int wait_data_ms;  /* wait for data in milliseconds */
 
 	/* internal, "fixed" vars */
 	unsigned int rd_buf_size; /* read buffer size (should be > max. datagram)*/