|
@@ -652,7 +652,10 @@ inline static int _wbufq_add(struct tcp_connection* c, char* data,
|
|
q->first=wb;
|
|
q->first=wb;
|
|
q->last_used=0;
|
|
q->last_used=0;
|
|
q->offset=0;
|
|
q->offset=0;
|
|
- q->wr_timeout=get_ticks_raw()+cfg_get(tcp, tcp_cfg, tcp_wq_timeout);
|
|
|
|
|
|
+ q->wr_timeout=get_ticks_raw()+
|
|
|
|
+ ((c->state==S_CONN_CONNECT)?
|
|
|
|
+ S_TO_TICKS(cfg_get(tcp, tcp_cfg, connect_timeout_s)):
|
|
|
|
+ cfg_get(tcp, tcp_cfg, tcp_wq_timeout));
|
|
}else{
|
|
}else{
|
|
wb=q->last;
|
|
wb=q->last;
|
|
}
|
|
}
|
|
@@ -774,12 +777,10 @@ inline static int wbufq_run(int fd, struct tcp_connection* c, int* empty)
|
|
int n;
|
|
int n;
|
|
int ret;
|
|
int ret;
|
|
int block_size;
|
|
int block_size;
|
|
- ticks_t t;
|
|
|
|
char* buf;
|
|
char* buf;
|
|
|
|
|
|
*empty=0;
|
|
*empty=0;
|
|
ret=0;
|
|
ret=0;
|
|
- t=get_ticks_raw();
|
|
|
|
lock_get(&c->write_lock);
|
|
lock_get(&c->write_lock);
|
|
q=&c->wbuf_q;
|
|
q=&c->wbuf_q;
|
|
while(q->first){
|
|
while(q->first){
|
|
@@ -802,7 +803,6 @@ inline static int wbufq_run(int fd, struct tcp_connection* c, int* empty)
|
|
atomic_add_int((int*)tcp_total_wq, -n);
|
|
atomic_add_int((int*)tcp_total_wq, -n);
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
- q->wr_timeout=t+cfg_get(tcp, tcp_cfg, tcp_wq_timeout);
|
|
|
|
}else{
|
|
}else{
|
|
if (n<0){
|
|
if (n<0){
|
|
/* EINTR is handled inside _tcpconn_write_nb */
|
|
/* EINTR is handled inside _tcpconn_write_nb */
|
|
@@ -835,9 +835,12 @@ inline static int wbufq_run(int fd, struct tcp_connection* c, int* empty)
|
|
q->offset=0;
|
|
q->offset=0;
|
|
*empty=1;
|
|
*empty=1;
|
|
}
|
|
}
|
|
- if (unlikely(c->state==S_CONN_CONNECT && (ret>0)))
|
|
|
|
- c->state=S_CONN_OK;
|
|
|
|
lock_release(&c->write_lock);
|
|
lock_release(&c->write_lock);
|
|
|
|
+ if (likely(ret>0)){
|
|
|
|
+ q->wr_timeout=get_ticks_raw()+cfg_get(tcp, tcp_cfg, tcp_wq_timeout);
|
|
|
|
+ if (unlikely(c->state==S_CONN_CONNECT || c->state==S_CONN_ACCEPT))
|
|
|
|
+ c->state=S_CONN_OK;
|
|
|
|
+ }
|
|
return ret;
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1693,7 +1696,7 @@ no_id:
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
c=tcpconn_new(-1, &dst->to, from, 0, dst->proto,
|
|
c=tcpconn_new(-1, &dst->to, from, 0, dst->proto,
|
|
- S_CONN_PENDING);
|
|
|
|
|
|
+ S_CONN_CONNECT);
|
|
if (unlikely(c==0)){
|
|
if (unlikely(c==0)){
|
|
LOG(L_ERR, "ERROR: tcp_send %s: could not create new"
|
|
LOG(L_ERR, "ERROR: tcp_send %s: could not create new"
|
|
" connection\n",
|
|
" connection\n",
|
|
@@ -1738,6 +1741,9 @@ no_id:
|
|
DBG("tcp_send: pending write on new connection %p "
|
|
DBG("tcp_send: pending write on new connection %p "
|
|
" (%d/%d bytes written)\n", c, n, len);
|
|
" (%d/%d bytes written)\n", c, n, len);
|
|
if (n<0) n=0;
|
|
if (n<0) n=0;
|
|
|
|
+ else
|
|
|
|
+ c->state=S_CONN_OK; /* partial write => connect()
|
|
|
|
+ ended */
|
|
/* add to the write queue */
|
|
/* add to the write queue */
|
|
lock_get(&c->write_lock);
|
|
lock_get(&c->write_lock);
|
|
if (unlikely(_wbufq_insert(c, buf+n, len-n)<0)){
|
|
if (unlikely(_wbufq_insert(c, buf+n, len-n)<0)){
|
|
@@ -1785,6 +1791,7 @@ no_id:
|
|
goto conn_wait_error;
|
|
goto conn_wait_error;
|
|
}
|
|
}
|
|
LOG(L_INFO, "tcp_send: quick connect for %p\n", c);
|
|
LOG(L_INFO, "tcp_send: quick connect for %p\n", c);
|
|
|
|
+ c->state=S_CONN_OK;
|
|
/* send to tcp_main */
|
|
/* send to tcp_main */
|
|
response[0]=(long)c;
|
|
response[0]=(long)c;
|
|
response[1]=CONN_NEW_COMPLETE;
|
|
response[1]=CONN_NEW_COMPLETE;
|
|
@@ -1832,13 +1839,13 @@ get_fd:
|
|
if (unlikely(cfg_get(tcp, tcp_cfg, async) &&
|
|
if (unlikely(cfg_get(tcp, tcp_cfg, async) &&
|
|
(_wbufq_non_empty(c)
|
|
(_wbufq_non_empty(c)
|
|
#ifdef TCP_CONNECT_WAIT
|
|
#ifdef TCP_CONNECT_WAIT
|
|
- || (c->state==S_CONN_PENDING)
|
|
|
|
|
|
+ || (c->flags&F_CONN_PENDING)
|
|
#endif /* TCP_CONNECT_WAIT */
|
|
#endif /* TCP_CONNECT_WAIT */
|
|
) )){
|
|
) )){
|
|
lock_get(&c->write_lock);
|
|
lock_get(&c->write_lock);
|
|
if (likely(_wbufq_non_empty(c)
|
|
if (likely(_wbufq_non_empty(c)
|
|
#ifdef TCP_CONNECT_WAIT
|
|
#ifdef TCP_CONNECT_WAIT
|
|
- || (c->state==S_CONN_PENDING)
|
|
|
|
|
|
+ || (c->flags&F_CONN_PENDING)
|
|
#endif /* TCP_CONNECT_WAIT */
|
|
#endif /* TCP_CONNECT_WAIT */
|
|
|
|
|
|
)){
|
|
)){
|
|
@@ -1916,7 +1923,7 @@ send_it:
|
|
if (likely(cfg_get(tcp, tcp_cfg, async))){
|
|
if (likely(cfg_get(tcp, tcp_cfg, async))){
|
|
if (_wbufq_non_empty(c)
|
|
if (_wbufq_non_empty(c)
|
|
#ifdef TCP_CONNECT_WAIT
|
|
#ifdef TCP_CONNECT_WAIT
|
|
- || (c->state==S_CONN_PENDING)
|
|
|
|
|
|
+ || (c->flags&F_CONN_PENDING)
|
|
#endif /* TCP_CONNECT_WAIT */
|
|
#endif /* TCP_CONNECT_WAIT */
|
|
){
|
|
){
|
|
if (unlikely(_wbufq_add(c, buf, len)<0)){
|
|
if (unlikely(_wbufq_add(c, buf, len)<0)){
|
|
@@ -1953,6 +1960,9 @@ send_it:
|
|
((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK)){
|
|
((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK)){
|
|
enable_write_watch=_wbufq_empty(c);
|
|
enable_write_watch=_wbufq_empty(c);
|
|
if (n<0) n=0;
|
|
if (n<0) n=0;
|
|
|
|
+ else if (unlikely(c->state==S_CONN_CONNECT ||
|
|
|
|
+ c->state==S_CONN_ACCEPT))
|
|
|
|
+ c->state=S_CONN_OK; /* something was written */
|
|
if (unlikely(_wbufq_add(c, buf+n, len-n)<0)){
|
|
if (unlikely(_wbufq_add(c, buf+n, len-n)<0)){
|
|
lock_release(&c->write_lock);
|
|
lock_release(&c->write_lock);
|
|
n=-1;
|
|
n=-1;
|
|
@@ -2026,11 +2036,10 @@ error:
|
|
|
|
|
|
#ifdef TCP_ASYNC
|
|
#ifdef TCP_ASYNC
|
|
lock_release(&c->write_lock);
|
|
lock_release(&c->write_lock);
|
|
- if (likely(cfg_get(tcp, tcp_cfg, async))){
|
|
|
|
- if (unlikely(c->state==S_CONN_CONNECT))
|
|
|
|
- c->state=S_CONN_OK;
|
|
|
|
- }
|
|
|
|
#endif /* TCP_ASYNC */
|
|
#endif /* TCP_ASYNC */
|
|
|
|
+ /* in non-async mode here we're either in S_CONN_OK or S_CONN_ACCEPT*/
|
|
|
|
+ if (unlikely(c->state==S_CONN_CONNECT || c->state==S_CONN_ACCEPT))
|
|
|
|
+ c->state=S_CONN_OK;
|
|
end:
|
|
end:
|
|
#ifdef TCP_FD_CACHE
|
|
#ifdef TCP_FD_CACHE
|
|
if (unlikely((fd_cache_e==0) && use_fd_cache)){
|
|
if (unlikely((fd_cache_e==0) && use_fd_cache)){
|
|
@@ -2249,9 +2258,10 @@ inline static int tcpconn_chld_put(struct tcp_connection* tcpconn)
|
|
tcpconn->s, tcpconn->flags);
|
|
tcpconn->s, tcpconn->flags);
|
|
/* sanity checks */
|
|
/* sanity checks */
|
|
membar_read_atomic_op(); /* make sure we see the current flags */
|
|
membar_read_atomic_op(); /* make sure we see the current flags */
|
|
- if (unlikely(!(tcpconn->flags & F_CONN_FD_CLOSED) ||
|
|
|
|
- (tcpconn->flags &
|
|
|
|
- (F_CONN_HASHED|F_CONN_MAIN_TIMER|F_CONN_READ_W|F_CONN_WRITE_W)) )){
|
|
|
|
|
|
+ if (unlikely(!(tcpconn->flags & F_CONN_FD_CLOSED) ||
|
|
|
|
+ (tcpconn->flags &
|
|
|
|
+ (F_CONN_HASHED|F_CONN_MAIN_TIMER|
|
|
|
|
+ F_CONN_READ_W|F_CONN_WRITE_W)) )){
|
|
LOG(L_CRIT, "BUG: tcpconn_chld_put: %p bad flags = %0x\n",
|
|
LOG(L_CRIT, "BUG: tcpconn_chld_put: %p bad flags = %0x\n",
|
|
tcpconn, tcpconn->flags);
|
|
tcpconn, tcpconn->flags);
|
|
abort();
|
|
abort();
|
|
@@ -2978,8 +2988,8 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
|
|
case CONN_NEW_PENDING_WRITE:
|
|
case CONN_NEW_PENDING_WRITE:
|
|
/* received when a pending connect completes in the same
|
|
/* received when a pending connect completes in the same
|
|
* tcp_send() that initiated it
|
|
* tcp_send() that initiated it
|
|
- * the connection is already in the hash with S_CONN_PENDING
|
|
|
|
- * state (added by tcp_send()) and refcnt at least 1 (for the
|
|
|
|
|
|
+ * the connection is already in the hash with F_CONN_PENDING
|
|
|
|
+ * flag (added by tcp_send()) and refcnt at least 1 (for the
|
|
* hash)*/
|
|
* hash)*/
|
|
tcpconn->flags&=~(F_CONN_PENDING|F_CONN_FD_CLOSED);
|
|
tcpconn->flags&=~(F_CONN_PENDING|F_CONN_FD_CLOSED);
|
|
if (unlikely((tcpconn->state==S_CONN_BAD) || (fd==-1))){
|
|
if (unlikely((tcpconn->state==S_CONN_BAD) || (fd==-1))){
|
|
@@ -3003,7 +3013,6 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
|
|
tcpconn->timeout=t+con_lifetime;
|
|
tcpconn->timeout=t+con_lifetime;
|
|
nxt_timeout=con_lifetime;
|
|
nxt_timeout=con_lifetime;
|
|
if (unlikely(cmd==CONN_NEW_COMPLETE)){
|
|
if (unlikely(cmd==CONN_NEW_COMPLETE)){
|
|
- tcpconn->state=S_CONN_OK;
|
|
|
|
/* check if needs to be watched for write */
|
|
/* check if needs to be watched for write */
|
|
lock_get(&tcpconn->write_lock);
|
|
lock_get(&tcpconn->write_lock);
|
|
/* if queue non empty watch it for write */
|
|
/* if queue non empty watch it for write */
|
|
@@ -3023,10 +3032,6 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
|
|
F_CONN_WANTS_RD;
|
|
F_CONN_WANTS_RD;
|
|
}else{
|
|
}else{
|
|
/* CONN_NEW_PENDING_WRITE */
|
|
/* CONN_NEW_PENDING_WRITE */
|
|
- /* we don't know if we successfully sent anything, but
|
|
|
|
- for sure we haven't sent all what we wanted, so consider
|
|
|
|
- the connection in "connecting" state */
|
|
|
|
- tcpconn->state=S_CONN_CONNECT;
|
|
|
|
/* no need to check, we have something queued for write */
|
|
/* no need to check, we have something queued for write */
|
|
flags=POLLOUT;
|
|
flags=POLLOUT;
|
|
if (TICKS_LT(tcpconn->wbuf_q.wr_timeout, tcpconn->timeout)
|
|
if (TICKS_LT(tcpconn->wbuf_q.wr_timeout, tcpconn->timeout)
|