|
@@ -33,11 +33,14 @@
|
|
|
#include "global_data.h"
|
|
|
#include "list.h"
|
|
|
|
|
|
-#define EXPECT_SYNC 1
|
|
|
-#define IGNORE_RESULT 2
|
|
|
-#define IS_RESETTING 4
|
|
|
#define MS_IN_S 1000
|
|
|
|
|
|
+// Database connection state
|
|
|
+#define EXPECT_SYNC 1
|
|
|
+#define IDLE 2
|
|
|
+#define IGNORE_RESULT 4
|
|
|
+#define RESET 8
|
|
|
+
|
|
|
typedef struct {
|
|
|
list_t l;
|
|
|
PGconn *conn;
|
|
@@ -147,13 +150,13 @@ static int do_execute_query(db_conn_t *conn, db_query_param_t *param)
|
|
|
assert(!h2o_timeout_is_linked(&conn->timeout));
|
|
|
conn->timeout.cb = on_database_timeout;
|
|
|
h2o_timeout_link(conn->pool->loop, &conn->pool->timeout, &conn->timeout);
|
|
|
- h2o_socket_read_start(conn->sock, on_database_read_ready);
|
|
|
}
|
|
|
|
|
|
param->l.next = NULL;
|
|
|
*conn->queries.tail = ¶m->l;
|
|
|
conn->queries.tail = ¶m->l.next;
|
|
|
conn->query_num--;
|
|
|
+ conn->flags &= ~IDLE;
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -235,6 +238,7 @@ static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
PQclear(result);
|
|
|
h2o_timeout_unlink(&conn->timeout);
|
|
|
h2o_socket_read_stop(conn->sock);
|
|
|
+ h2o_socket_read_start(conn->sock, on_database_read_ready);
|
|
|
process_queries(conn);
|
|
|
return;
|
|
|
default:
|
|
@@ -370,11 +374,13 @@ static void on_database_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
else {
|
|
|
assert(!result);
|
|
|
h2o_timeout_unlink(&conn->timeout);
|
|
|
- h2o_socket_read_stop(conn->sock);
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ for (PGnotify *notify = PQnotifies(conn->conn); notify; notify = PQnotifies(conn->conn))
|
|
|
+ PQfreemem(notify);
|
|
|
+
|
|
|
process_queries(conn);
|
|
|
}
|
|
|
|
|
@@ -425,7 +431,7 @@ static void poll_database_connection(h2o_socket_t *sock, const char *err)
|
|
|
if (err)
|
|
|
ERROR(err);
|
|
|
else {
|
|
|
- const PostgresPollingStatusType status = conn->flags & IS_RESETTING ?
|
|
|
+ const PostgresPollingStatusType status = conn->flags & RESET ?
|
|
|
PQresetPoll(conn->conn) :
|
|
|
PQconnectPoll(conn->conn);
|
|
|
const int sd = PQsocket(conn->conn);
|
|
@@ -474,6 +480,7 @@ static void poll_database_connection(h2o_socket_t *sock, const char *err)
|
|
|
conn->sock = sock;
|
|
|
}
|
|
|
|
|
|
+ conn->flags &= ~RESET;
|
|
|
prepare_statements(conn);
|
|
|
return;
|
|
|
case PGRES_POLLING_READING:
|
|
@@ -530,8 +537,10 @@ static void prepare_statements(db_conn_t *conn)
|
|
|
h2o_socket_read_start(conn->sock, on_database_connect_read_ready);
|
|
|
on_database_connect_write_ready(conn->sock, NULL);
|
|
|
}
|
|
|
- else
|
|
|
+ else {
|
|
|
+ h2o_socket_read_start(conn->sock, on_database_read_ready);
|
|
|
process_queries(conn);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static void process_queries(db_conn_t *conn)
|
|
@@ -558,9 +567,10 @@ static void process_queries(db_conn_t *conn)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (!conn->queries.head && !(conn->flags & (EXPECT_SYNC | IGNORE_RESULT))) {
|
|
|
+ if (!conn->queries.head && !(conn->flags & (EXPECT_SYNC | IDLE | IGNORE_RESULT))) {
|
|
|
conn->l.next = conn->pool->conn;
|
|
|
conn->pool->conn = &conn->l;
|
|
|
+ conn->flags |= IDLE;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -580,7 +590,7 @@ static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn)
|
|
|
|
|
|
memset(conn, 0, sizeof(*conn));
|
|
|
conn->conn = c;
|
|
|
- conn->flags = IS_RESETTING;
|
|
|
+ conn->flags = RESET;
|
|
|
}
|
|
|
else {
|
|
|
assert(pool->conn_num);
|
|
@@ -642,6 +652,7 @@ int execute_database_query(db_conn_pool_t *pool, db_query_param_t *param)
|
|
|
db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, pool->conn);
|
|
|
|
|
|
assert(!conn->queries.head);
|
|
|
+ assert(conn->flags & IDLE);
|
|
|
assert(!(conn->flags & (EXPECT_SYNC | IGNORE_RESULT)));
|
|
|
pool->conn = conn->l.next;
|
|
|
ret = do_execute_query(conn, param);
|
|
@@ -684,7 +695,8 @@ void free_database_connection_pool(db_conn_pool_t *pool)
|
|
|
|
|
|
assert(!conn->queries.head);
|
|
|
assert(conn->query_num == pool->config->max_pipeline_query_num);
|
|
|
- assert(!(conn->flags & (EXPECT_SYNC | IGNORE_RESULT)));
|
|
|
+ assert(conn->flags & IDLE);
|
|
|
+ assert(!(conn->flags & (EXPECT_SYNC | IGNORE_RESULT | RESET)));
|
|
|
assert(!h2o_timeout_is_linked(&conn->timeout));
|
|
|
h2o_socket_read_stop(conn->sock);
|
|
|
h2o_socket_close(conn->sock);
|