|
@@ -75,7 +75,7 @@ static void on_database_write_ready(h2o_socket_t *sock, const char *err);
|
|
|
static void on_process_queries(void *arg);
|
|
static void on_process_queries(void *arg);
|
|
|
static void poll_database_connection(h2o_socket_t *sock, const char *err);
|
|
static void poll_database_connection(h2o_socket_t *sock, const char *err);
|
|
|
static void prepare_statements(db_conn_t *conn);
|
|
static void prepare_statements(db_conn_t *conn);
|
|
|
-static void process_queries(db_conn_t *conn);
|
|
|
|
|
|
|
+static void process_queries(db_conn_t *conn, bool removed);
|
|
|
static void remove_connection(db_conn_t *conn);
|
|
static void remove_connection(db_conn_t *conn);
|
|
|
static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn);
|
|
static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn);
|
|
|
|
|
|
|
@@ -237,7 +237,7 @@ static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
h2o_timer_unlink(&conn->timer);
|
|
h2o_timer_unlink(&conn->timer);
|
|
|
h2o_socket_read_stop(conn->sock);
|
|
h2o_socket_read_stop(conn->sock);
|
|
|
h2o_socket_read_start(conn->sock, on_database_read_ready);
|
|
h2o_socket_read_start(conn->sock, on_database_read_ready);
|
|
|
- process_queries(conn);
|
|
|
|
|
|
|
+ process_queries(conn, true);
|
|
|
return;
|
|
return;
|
|
|
default:
|
|
default:
|
|
|
LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
|
|
LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
|
|
@@ -273,6 +273,8 @@ static void on_database_connect_write_ready(h2o_socket_t *sock, const char *err)
|
|
|
|
|
|
|
|
static void on_database_error(db_conn_t *conn, const char *error_string)
|
|
static void on_database_error(db_conn_t *conn, const char *error_string)
|
|
|
{
|
|
{
|
|
|
|
|
+ remove_connection(conn);
|
|
|
|
|
+
|
|
|
if (conn->queries.head)
|
|
if (conn->queries.head)
|
|
|
do {
|
|
do {
|
|
|
db_query_param_t * const param = H2O_STRUCT_FROM_MEMBER(db_query_param_t,
|
|
db_query_param_t * const param = H2O_STRUCT_FROM_MEMBER(db_query_param_t,
|
|
@@ -291,9 +293,6 @@ static void on_database_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
{
|
|
{
|
|
|
db_conn_t * const conn = sock->data;
|
|
db_conn_t * const conn = sock->data;
|
|
|
|
|
|
|
|
- // This call should not be problematic, assuming a relatively low number of connections.
|
|
|
|
|
- remove_connection(conn);
|
|
|
|
|
-
|
|
|
|
|
if (err) {
|
|
if (err) {
|
|
|
ERROR(err);
|
|
ERROR(err);
|
|
|
on_database_error(conn, err);
|
|
on_database_error(conn, err);
|
|
@@ -311,6 +310,8 @@ static void on_database_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ const bool removed = !conn->query_num;
|
|
|
|
|
+
|
|
|
while (!PQisBusy(conn->conn)) {
|
|
while (!PQisBusy(conn->conn)) {
|
|
|
PGresult * const result = PQgetResult(conn->conn);
|
|
PGresult * const result = PQgetResult(conn->conn);
|
|
|
|
|
|
|
@@ -369,14 +370,13 @@ static void on_database_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
for (PGnotify *notify = PQnotifies(conn->conn); notify; notify = PQnotifies(conn->conn))
|
|
for (PGnotify *notify = PQnotifies(conn->conn); notify; notify = PQnotifies(conn->conn))
|
|
|
PQfreemem(notify);
|
|
PQfreemem(notify);
|
|
|
|
|
|
|
|
- process_queries(conn);
|
|
|
|
|
|
|
+ process_queries(conn, removed);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
static void on_database_timeout(h2o_timer_t *timer)
|
|
static void on_database_timeout(h2o_timer_t *timer)
|
|
|
{
|
|
{
|
|
|
db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, timer, timer);
|
|
db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, timer, timer);
|
|
|
|
|
|
|
|
- remove_connection(conn);
|
|
|
|
|
ERROR(DB_TIMEOUT_ERROR);
|
|
ERROR(DB_TIMEOUT_ERROR);
|
|
|
|
|
|
|
|
if (conn->queries.head) {
|
|
if (conn->queries.head) {
|
|
@@ -397,13 +397,10 @@ static void on_database_write_ready(h2o_socket_t *sock, const char *err)
|
|
|
|
|
|
|
|
if (err) {
|
|
if (err) {
|
|
|
ERROR(err);
|
|
ERROR(err);
|
|
|
- remove_connection(conn);
|
|
|
|
|
on_database_error(conn, err);
|
|
on_database_error(conn, err);
|
|
|
}
|
|
}
|
|
|
- else if (flush_connection(on_database_write_ready, conn)) {
|
|
|
|
|
- remove_connection(conn);
|
|
|
|
|
|
|
+ else if (flush_connection(on_database_write_ready, conn))
|
|
|
on_database_error(conn, DB_ERROR);
|
|
on_database_error(conn, DB_ERROR);
|
|
|
- }
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
static void on_process_queries(void *arg)
|
|
static void on_process_queries(void *arg)
|
|
@@ -415,7 +412,7 @@ static void on_process_queries(void *arg)
|
|
|
|
|
|
|
|
pool->conn = conn->l.next;
|
|
pool->conn = conn->l.next;
|
|
|
assert(conn->query_num);
|
|
assert(conn->query_num);
|
|
|
- process_queries(conn);
|
|
|
|
|
|
|
+ process_queries(conn, true);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (pool->queries.head && pool->conn_num)
|
|
if (pool->queries.head && pool->conn_num)
|
|
@@ -539,11 +536,11 @@ static void prepare_statements(db_conn_t *conn)
|
|
|
}
|
|
}
|
|
|
else {
|
|
else {
|
|
|
h2o_socket_read_start(conn->sock, on_database_read_ready);
|
|
h2o_socket_read_start(conn->sock, on_database_read_ready);
|
|
|
- process_queries(conn);
|
|
|
|
|
|
|
+ process_queries(conn, true);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-static void process_queries(db_conn_t *conn)
|
|
|
|
|
|
|
+static void process_queries(db_conn_t *conn, bool removed)
|
|
|
{
|
|
{
|
|
|
const bool flush = conn->query_num && conn->pool->queries.head;
|
|
const bool flush = conn->query_num && conn->pool->queries.head;
|
|
|
|
|
|
|
@@ -561,20 +558,20 @@ static void process_queries(db_conn_t *conn)
|
|
|
|
|
|
|
|
if (do_execute_query(conn, param)) {
|
|
if (do_execute_query(conn, param)) {
|
|
|
param->on_error(param, DB_ERROR);
|
|
param->on_error(param, DB_ERROR);
|
|
|
-
|
|
|
|
|
- if (PQstatus(conn->conn) != CONNECTION_OK) {
|
|
|
|
|
- on_database_error(conn, DB_ERROR);
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ on_database_error(conn, DB_ERROR);
|
|
|
|
|
+ return;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (flush && flush_connection(on_database_write_ready, conn))
|
|
if (flush && flush_connection(on_database_write_ready, conn))
|
|
|
on_database_error(conn, DB_ERROR);
|
|
on_database_error(conn, DB_ERROR);
|
|
|
- else if (conn->query_num) {
|
|
|
|
|
|
|
+ else if (conn->query_num && removed) {
|
|
|
conn->l.next = conn->pool->conn;
|
|
conn->l.next = conn->pool->conn;
|
|
|
conn->pool->conn = &conn->l;
|
|
conn->pool->conn = &conn->l;
|
|
|
}
|
|
}
|
|
|
|
|
+ else if (!conn->query_num && !removed)
|
|
|
|
|
+ // This call should not be problematic, assuming a relatively low number of connections.
|
|
|
|
|
+ remove_connection(conn);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
static void remove_connection(db_conn_t *conn)
|
|
static void remove_connection(db_conn_t *conn)
|
|
@@ -667,7 +664,7 @@ int execute_database_query(db_conn_pool_t *pool, db_query_param_t *param)
|
|
|
if (pool->conn) {
|
|
if (pool->conn) {
|
|
|
// Delay sending the database queries to the server, so that if there is a rapid
|
|
// Delay sending the database queries to the server, so that if there is a rapid
|
|
|
// succession of calls to this function, all resultant queries would be inserted
|
|
// succession of calls to this function, all resultant queries would be inserted
|
|
|
- // into a command pipeline with a single system call.
|
|
|
|
|
|
|
+ // into a command pipeline with a smaller number of system calls.
|
|
|
if (!pool->process_queries) {
|
|
if (!pool->process_queries) {
|
|
|
task_message_t * const msg = h2o_mem_alloc(sizeof(*msg));
|
|
task_message_t * const msg = h2o_mem_alloc(sizeof(*msg));
|
|
|
|
|
|