|
@@ -33,11 +33,14 @@
|
|
|
#include "global_data.h"
|
|
|
#include "list.h"
|
|
|
|
|
|
-#define EXPECT_SYNC 1
|
|
|
-#define IGNORE_RESULT 2
|
|
|
-#define IS_RESETTING 4
|
|
|
#define MS_IN_S 1000
|
|
|
|
|
|
+// Database connection state
|
|
|
+#define EXPECT_SYNC 1
|
|
|
+#define IDLE 2
|
|
|
+#define IGNORE_RESULT 4
|
|
|
+#define RESET 8
|
|
|
+
|
|
|
typedef struct {
|
|
|
list_t l;
|
|
|
PGconn *conn;
|
|
@@ -48,7 +51,7 @@ typedef struct {
|
|
|
size_t query_num;
|
|
|
uint_fast32_t flags;
|
|
|
int sd;
|
|
|
- h2o_timeout_entry_t timeout;
|
|
|
+ h2o_timer_t timer;
|
|
|
} db_conn_t;
|
|
|
|
|
|
typedef struct {
|
|
@@ -62,11 +65,11 @@ static int do_execute_query(db_conn_t *conn, db_query_param_t *param);
|
|
|
static void error_notification(db_conn_pool_t *pool, bool timeout, const char *error_string);
|
|
|
static void on_database_connect_error(db_conn_t *conn, bool timeout, const char *error_string);
|
|
|
static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err);
|
|
|
-static void on_database_connect_timeout(h2o_timeout_entry_t *entry);
|
|
|
+static void on_database_connect_timeout(h2o_timer_t *timer);
|
|
|
static void on_database_connect_write_ready(h2o_socket_t *sock, const char *err);
|
|
|
static void on_database_error(db_conn_t *conn, const char *error_string);
|
|
|
static void on_database_read_ready(h2o_socket_t *sock, const char *err);
|
|
|
-static void on_database_timeout(h2o_timeout_entry_t *timeout);
|
|
|
+static void on_database_timeout(h2o_timer_t *timer);
|
|
|
static void on_database_write_ready(h2o_socket_t *sock, const char *err);
|
|
|
static void poll_database_connection(h2o_socket_t *sock, const char *err);
|
|
|
static void prepare_statements(db_conn_t *conn);
|
|
@@ -144,16 +147,16 @@ static int do_execute_query(db_conn_t *conn, db_query_param_t *param)
|
|
|
h2o_socket_notify_write(conn->sock, on_database_write_ready);
|
|
|
|
|
|
if (!conn->queries.head && !(conn->flags & (EXPECT_SYNC | IGNORE_RESULT))) {
|
|
|
- assert(!h2o_timeout_is_linked(&conn->timeout));
|
|
|
- conn->timeout.cb = on_database_timeout;
|
|
|
- h2o_timeout_link(conn->pool->loop, &conn->pool->timeout, &conn->timeout);
|
|
|
- h2o_socket_read_start(conn->sock, on_database_read_ready);
|
|
|
+ assert(!h2o_timer_is_linked(&conn->timer));
|
|
|
+ conn->timer.cb = on_database_timeout;
|
|
|
+ h2o_timer_link(conn->pool->loop, conn->pool->config->db_timeout * MS_IN_S, &conn->timer);
|
|
|
}
|
|
|
|
|
|
param->l.next = NULL;
|
|
|
*conn->queries.tail = ¶m->l;
|
|
|
conn->queries.tail = ¶m->l.next;
|
|
|
conn->query_num--;
|
|
|
+ conn->flags &= ~IDLE;
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -188,7 +191,7 @@ static void on_database_connect_error(db_conn_t *conn, bool timeout, const char
|
|
|
{
|
|
|
db_conn_pool_t * const pool = conn->pool;
|
|
|
|
|
|
- h2o_timeout_unlink(&conn->timeout);
|
|
|
+ h2o_timer_unlink(&conn->timer);
|
|
|
h2o_socket_read_stop(conn->sock);
|
|
|
h2o_socket_close(conn->sock);
|
|
|
PQfinish(conn->conn);
|
|
@@ -233,8 +236,9 @@ static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
break;
|
|
|
case PGRES_PIPELINE_SYNC:
|
|
|
PQclear(result);
|
|
|
- h2o_timeout_unlink(&conn->timeout);
|
|
|
+ h2o_timer_unlink(&conn->timer);
|
|
|
h2o_socket_read_stop(conn->sock);
|
|
|
+ h2o_socket_read_start(conn->sock, on_database_read_ready);
|
|
|
process_queries(conn);
|
|
|
return;
|
|
|
default:
|
|
@@ -249,9 +253,9 @@ static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-static void on_database_connect_timeout(h2o_timeout_entry_t *entry)
|
|
|
+static void on_database_connect_timeout(h2o_timer_t *timer)
|
|
|
{
|
|
|
- db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, timeout, entry);
|
|
|
+ db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, timer, timer);
|
|
|
|
|
|
ERROR(DB_TIMEOUT_ERROR);
|
|
|
on_database_connect_error(conn, true, DB_TIMEOUT_ERROR);
|
|
@@ -352,9 +356,11 @@ static void on_database_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
|
|
|
if (param->on_result(param, result) == DONE) {
|
|
|
conn->query_num++;
|
|
|
- h2o_timeout_unlink(&conn->timeout);
|
|
|
- conn->timeout.cb = on_database_timeout;
|
|
|
- h2o_timeout_link(conn->pool->loop, &conn->pool->timeout, &conn->timeout);
|
|
|
+ h2o_timer_unlink(&conn->timer);
|
|
|
+ conn->timer.cb = on_database_timeout;
|
|
|
+ h2o_timer_link(conn->pool->loop,
|
|
|
+ conn->pool->config->db_timeout * MS_IN_S,
|
|
|
+ &conn->timer);
|
|
|
conn->flags |= EXPECT_SYNC;
|
|
|
conn->queries.head = next;
|
|
|
|
|
@@ -369,18 +375,20 @@ static void on_database_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
}
|
|
|
else {
|
|
|
assert(!result);
|
|
|
- h2o_timeout_unlink(&conn->timeout);
|
|
|
- h2o_socket_read_stop(conn->sock);
|
|
|
+ h2o_timer_unlink(&conn->timer);
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ for (PGnotify *notify = PQnotifies(conn->conn); notify; notify = PQnotifies(conn->conn))
|
|
|
+ PQfreemem(notify);
|
|
|
+
|
|
|
process_queries(conn);
|
|
|
}
|
|
|
|
|
|
-static void on_database_timeout(h2o_timeout_entry_t *timeout)
|
|
|
+static void on_database_timeout(h2o_timer_t *timer)
|
|
|
{
|
|
|
- db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, timeout, timeout);
|
|
|
+ db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, timer, timer);
|
|
|
|
|
|
ERROR(DB_TIMEOUT_ERROR);
|
|
|
|
|
@@ -425,7 +433,7 @@ static void poll_database_connection(h2o_socket_t *sock, const char *err)
|
|
|
if (err)
|
|
|
ERROR(err);
|
|
|
else {
|
|
|
- const PostgresPollingStatusType status = conn->flags & IS_RESETTING ?
|
|
|
+ const PostgresPollingStatusType status = conn->flags & RESET ?
|
|
|
PQresetPoll(conn->conn) :
|
|
|
PQconnectPoll(conn->conn);
|
|
|
const int sd = PQsocket(conn->conn);
|
|
@@ -450,7 +458,7 @@ static void poll_database_connection(h2o_socket_t *sock, const char *err)
|
|
|
|
|
|
return;
|
|
|
case PGRES_POLLING_OK:
|
|
|
- h2o_timeout_unlink(&conn->timeout);
|
|
|
+ h2o_timer_unlink(&conn->timer);
|
|
|
h2o_socket_read_stop(conn->sock);
|
|
|
|
|
|
if (PQsetnonblocking(conn->conn, 1)) {
|
|
@@ -474,6 +482,7 @@ static void poll_database_connection(h2o_socket_t *sock, const char *err)
|
|
|
conn->sock = sock;
|
|
|
}
|
|
|
|
|
|
+ conn->flags &= ~RESET;
|
|
|
prepare_statements(conn);
|
|
|
return;
|
|
|
case PGRES_POLLING_READING:
|
|
@@ -525,13 +534,15 @@ static void prepare_statements(db_conn_t *conn)
|
|
|
}
|
|
|
|
|
|
conn->prepared_statement = NULL;
|
|
|
- conn->timeout.cb = on_database_connect_timeout;
|
|
|
- h2o_timeout_link(conn->pool->loop, &conn->pool->timeout, &conn->timeout);
|
|
|
+ conn->timer.cb = on_database_connect_timeout;
|
|
|
+ h2o_timer_link(conn->pool->loop, conn->pool->config->db_timeout * MS_IN_S, &conn->timer);
|
|
|
h2o_socket_read_start(conn->sock, on_database_connect_read_ready);
|
|
|
on_database_connect_write_ready(conn->sock, NULL);
|
|
|
}
|
|
|
- else
|
|
|
+ else {
|
|
|
+ h2o_socket_read_start(conn->sock, on_database_read_ready);
|
|
|
process_queries(conn);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static void process_queries(db_conn_t *conn)
|
|
@@ -558,9 +569,10 @@ static void process_queries(db_conn_t *conn)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (!conn->queries.head && !(conn->flags & (EXPECT_SYNC | IGNORE_RESULT))) {
|
|
|
+ if (!conn->queries.head && !(conn->flags & (EXPECT_SYNC | IDLE | IGNORE_RESULT))) {
|
|
|
conn->l.next = conn->pool->conn;
|
|
|
conn->pool->conn = &conn->l;
|
|
|
+ conn->flags |= IDLE;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -569,7 +581,7 @@ static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn)
|
|
|
if (conn) {
|
|
|
PGconn * const c = conn->conn;
|
|
|
|
|
|
- h2o_timeout_unlink(&conn->timeout);
|
|
|
+ h2o_timer_unlink(&conn->timer);
|
|
|
h2o_socket_read_stop(conn->sock);
|
|
|
h2o_socket_close(conn->sock);
|
|
|
|
|
@@ -580,7 +592,7 @@ static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn)
|
|
|
|
|
|
memset(conn, 0, sizeof(*conn));
|
|
|
conn->conn = c;
|
|
|
- conn->flags = IS_RESETTING;
|
|
|
+ conn->flags = RESET;
|
|
|
}
|
|
|
else {
|
|
|
assert(pool->conn_num);
|
|
@@ -610,8 +622,8 @@ static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn)
|
|
|
conn->prepared_statement = pool->prepared_statements;
|
|
|
conn->queries.tail = &conn->queries.head;
|
|
|
conn->query_num = pool->config->max_pipeline_query_num;
|
|
|
- conn->timeout.cb = on_database_connect_timeout;
|
|
|
- h2o_timeout_link(pool->loop, &pool->timeout, &conn->timeout);
|
|
|
+ conn->timer.cb = on_database_connect_timeout;
|
|
|
+ h2o_timer_link(pool->loop, pool->config->db_timeout * MS_IN_S, &conn->timer);
|
|
|
h2o_socket_notify_write(conn->sock, poll_database_connection);
|
|
|
return;
|
|
|
}
|
|
@@ -642,6 +654,7 @@ int execute_database_query(db_conn_pool_t *pool, db_query_param_t *param)
|
|
|
db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, pool->conn);
|
|
|
|
|
|
assert(!conn->queries.head);
|
|
|
+ assert(conn->flags & IDLE);
|
|
|
assert(!(conn->flags & (EXPECT_SYNC | IGNORE_RESULT)));
|
|
|
pool->conn = conn->l.next;
|
|
|
ret = do_execute_query(conn, param);
|
|
@@ -684,8 +697,9 @@ void free_database_connection_pool(db_conn_pool_t *pool)
|
|
|
|
|
|
assert(!conn->queries.head);
|
|
|
assert(conn->query_num == pool->config->max_pipeline_query_num);
|
|
|
- assert(!(conn->flags & (EXPECT_SYNC | IGNORE_RESULT)));
|
|
|
- assert(!h2o_timeout_is_linked(&conn->timeout));
|
|
|
+ assert(conn->flags & IDLE);
|
|
|
+ assert(!(conn->flags & (EXPECT_SYNC | IGNORE_RESULT | RESET)));
|
|
|
+ assert(!h2o_timer_is_linked(&conn->timer));
|
|
|
h2o_socket_read_stop(conn->sock);
|
|
|
h2o_socket_close(conn->sock);
|
|
|
PQfinish(conn->conn);
|
|
@@ -695,7 +709,6 @@ void free_database_connection_pool(db_conn_pool_t *pool)
|
|
|
} while (pool->conn);
|
|
|
|
|
|
assert(num + pool->conn_num == pool->config->max_db_conn_num);
|
|
|
- h2o_timeout_dispose(pool->loop, &pool->timeout);
|
|
|
}
|
|
|
|
|
|
void initialize_database_connection_pool(const char *conninfo,
|
|
@@ -712,7 +725,6 @@ void initialize_database_connection_pool(const char *conninfo,
|
|
|
pool->queries.tail = &pool->queries.head;
|
|
|
pool->conn_num = config->max_db_conn_num;
|
|
|
pool->query_num = config->max_query_num;
|
|
|
- h2o_timeout_init(loop, &pool->timeout, config->db_timeout * MS_IN_S);
|
|
|
}
|
|
|
|
|
|
void remove_prepared_statements(list_t *prepared_statements)
|