|
@@ -30,6 +30,7 @@
|
|
|
|
|
|
#include "database.h"
|
|
|
#include "error.h"
|
|
|
+#include "event_loop.h"
|
|
|
#include "global_data.h"
|
|
|
#include "list.h"
|
|
|
|
|
@@ -37,9 +38,8 @@
|
|
|
|
|
|
// Database connection state
|
|
|
#define EXPECT_SYNC 1
|
|
|
-#define IDLE 2
|
|
|
-#define IGNORE_RESULT 4
|
|
|
-#define RESET 8
|
|
|
+#define IGNORE_RESULT 2
|
|
|
+#define RESET 4
|
|
|
|
|
|
typedef struct {
|
|
|
list_t l;
|
|
@@ -61,8 +61,9 @@ typedef struct {
|
|
|
} prepared_statement_t;
|
|
|
|
|
|
static h2o_socket_t *create_socket(int sd, h2o_loop_t *loop);
|
|
|
-static int do_execute_query(db_conn_t *conn, db_query_param_t *param, bool flush);
|
|
|
+static int do_execute_query(db_conn_t *conn, db_query_param_t *param);
|
|
|
static void error_notification(db_conn_pool_t *pool, bool timeout, const char *error_string);
|
|
|
+static int flush_connection(h2o_socket_cb cb, db_conn_t *conn);
|
|
|
static void on_database_connect_error(db_conn_t *conn, bool timeout, const char *error_string);
|
|
|
static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err);
|
|
|
static void on_database_connect_timeout(h2o_timer_t *timer);
|
|
@@ -71,9 +72,11 @@ static void on_database_error(db_conn_t *conn, const char *error_string);
|
|
|
static void on_database_read_ready(h2o_socket_t *sock, const char *err);
|
|
|
static void on_database_timeout(h2o_timer_t *timer);
|
|
|
static void on_database_write_ready(h2o_socket_t *sock, const char *err);
|
|
|
+static void on_process_queries(void *arg);
|
|
|
static void poll_database_connection(h2o_socket_t *sock, const char *err);
|
|
|
static void prepare_statements(db_conn_t *conn);
|
|
|
-static void process_queries(db_conn_t *conn);
|
|
|
+static void process_queries(db_conn_t *conn, bool removed);
|
|
|
+static void remove_connection(db_conn_t *conn);
|
|
|
static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn);
|
|
|
|
|
|
static h2o_socket_t *create_socket(int sd, h2o_loop_t *loop)
|
|
@@ -104,7 +107,7 @@ static h2o_socket_t *create_socket(int sd, h2o_loop_t *loop)
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
-static int do_execute_query(db_conn_t *conn, db_query_param_t *param, bool flush)
|
|
|
+static int do_execute_query(db_conn_t *conn, db_query_param_t *param)
|
|
|
{
|
|
|
assert(conn->query_num);
|
|
|
assert((conn->queries.head && conn->query_num < conn->pool->config->max_pipeline_query_num) ||
|
|
@@ -137,17 +140,6 @@ static int do_execute_query(db_conn_t *conn, db_query_param_t *param, bool flush
|
|
|
return 1;
|
|
|
}
|
|
|
|
|
|
- if (flush) {
|
|
|
- const int send_status = PQflush(conn->conn);
|
|
|
-
|
|
|
- if (send_status < 0) {
|
|
|
- LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
|
|
|
- return 1;
|
|
|
- }
|
|
|
- else if (send_status)
|
|
|
- h2o_socket_notify_write(conn->sock, on_database_write_ready);
|
|
|
- }
|
|
|
-
|
|
|
if (!conn->queries.head && !(conn->flags & (EXPECT_SYNC | IGNORE_RESULT))) {
|
|
|
assert(!h2o_timer_is_linked(&conn->timer));
|
|
|
conn->timer.cb = on_database_timeout;
|
|
@@ -158,7 +150,6 @@ static int do_execute_query(db_conn_t *conn, db_query_param_t *param, bool flush
|
|
|
*conn->queries.tail = ¶m->l;
|
|
|
conn->queries.tail = ¶m->l.next;
|
|
|
conn->query_num--;
|
|
|
- conn->flags &= ~IDLE;
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -189,6 +180,18 @@ static void error_notification(db_conn_pool_t *pool, bool timeout, const char *e
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+static int flush_connection(h2o_socket_cb cb, db_conn_t *conn)
|
|
|
+{
|
|
|
+ const int send_status = PQflush(conn->conn);
|
|
|
+
|
|
|
+ if (send_status < 0)
|
|
|
+ LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
|
|
|
+ else if (send_status)
|
|
|
+ h2o_socket_notify_write(conn->sock, cb);
|
|
|
+
|
|
|
+ return send_status < 0;
|
|
|
+}
|
|
|
+
|
|
|
static void on_database_connect_error(db_conn_t *conn, bool timeout, const char *error_string)
|
|
|
{
|
|
|
db_conn_pool_t * const pool = conn->pool;
|
|
@@ -217,17 +220,10 @@ static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- const int send_status = PQflush(conn->conn);
|
|
|
-
|
|
|
- if (send_status < 0) {
|
|
|
- LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
|
|
|
+ if (flush_connection(on_database_connect_write_ready, conn)) {
|
|
|
on_database_connect_error(conn, false, DB_ERROR);
|
|
|
return;
|
|
|
}
|
|
|
- else if (send_status) {
|
|
|
- h2o_socket_notify_write(conn->sock, on_database_connect_write_ready);
|
|
|
- return;
|
|
|
- }
|
|
|
|
|
|
while (!PQisBusy(conn->conn)) {
|
|
|
PGresult * const result = PQgetResult(conn->conn);
|
|
@@ -241,7 +237,7 @@ static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
h2o_timer_unlink(&conn->timer);
|
|
|
h2o_socket_read_stop(conn->sock);
|
|
|
h2o_socket_read_start(conn->sock, on_database_read_ready);
|
|
|
- process_queries(conn);
|
|
|
+ process_queries(conn, true);
|
|
|
return;
|
|
|
default:
|
|
|
LIBRARY_ERROR("PQresultStatus", PQresultErrorMessage(result));
|
|
@@ -271,20 +267,14 @@ static void on_database_connect_write_ready(h2o_socket_t *sock, const char *err)
|
|
|
ERROR(err);
|
|
|
on_database_connect_error(conn, false, err);
|
|
|
}
|
|
|
- else {
|
|
|
- const int send_status = PQflush(conn->conn);
|
|
|
-
|
|
|
- if (send_status < 0) {
|
|
|
- LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
|
|
|
- on_database_connect_error(conn, false, DB_ERROR);
|
|
|
- }
|
|
|
- else if (send_status)
|
|
|
- h2o_socket_notify_write(conn->sock, on_database_connect_write_ready);
|
|
|
- }
|
|
|
+ else if (flush_connection(on_database_connect_write_ready, conn))
|
|
|
+ on_database_connect_error(conn, false, DB_ERROR);
|
|
|
}
|
|
|
|
|
|
static void on_database_error(db_conn_t *conn, const char *error_string)
|
|
|
{
|
|
|
+ remove_connection(conn);
|
|
|
+
|
|
|
if (conn->queries.head)
|
|
|
do {
|
|
|
db_query_param_t * const param = H2O_STRUCT_FROM_MEMBER(db_query_param_t,
|
|
@@ -315,17 +305,12 @@ static void on_database_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- const int send_status = PQflush(conn->conn);
|
|
|
-
|
|
|
- if (send_status < 0) {
|
|
|
- LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
|
|
|
+ if (flush_connection(on_database_write_ready, conn)) {
|
|
|
on_database_error(conn, DB_ERROR);
|
|
|
return;
|
|
|
}
|
|
|
- else if (send_status) {
|
|
|
- h2o_socket_notify_write(conn->sock, on_database_write_ready);
|
|
|
- return;
|
|
|
- }
|
|
|
+
|
|
|
+ const bool removed = !conn->query_num;
|
|
|
|
|
|
while (!PQisBusy(conn->conn)) {
|
|
|
PGresult * const result = PQgetResult(conn->conn);
|
|
@@ -385,7 +370,7 @@ static void on_database_read_ready(h2o_socket_t *sock, const char *err)
|
|
|
for (PGnotify *notify = PQnotifies(conn->conn); notify; notify = PQnotifies(conn->conn))
|
|
|
PQfreemem(notify);
|
|
|
|
|
|
- process_queries(conn);
|
|
|
+ process_queries(conn, removed);
|
|
|
}
|
|
|
|
|
|
static void on_database_timeout(h2o_timer_t *timer)
|
|
@@ -414,18 +399,26 @@ static void on_database_write_ready(h2o_socket_t *sock, const char *err)
|
|
|
ERROR(err);
|
|
|
on_database_error(conn, err);
|
|
|
}
|
|
|
- else {
|
|
|
- const int send_status = PQflush(conn->conn);
|
|
|
+ else if (flush_connection(on_database_write_ready, conn))
|
|
|
+ on_database_error(conn, DB_ERROR);
|
|
|
+}
|
|
|
|
|
|
- if (send_status < 0) {
|
|
|
- LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
|
|
|
- on_database_error(conn, DB_ERROR);
|
|
|
- }
|
|
|
- else if (send_status)
|
|
|
- h2o_socket_notify_write(conn->sock, on_database_write_ready);
|
|
|
- else
|
|
|
- process_queries(conn);
|
|
|
+static void on_process_queries(void *arg)
|
|
|
+{
|
|
|
+ db_conn_pool_t * const pool = arg;
|
|
|
+
|
|
|
+ while (pool->queries.head && pool->conn) {
|
|
|
+ db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, pool->conn);
|
|
|
+
|
|
|
+ pool->conn = conn->l.next;
|
|
|
+ assert(conn->query_num);
|
|
|
+ process_queries(conn, true);
|
|
|
}
|
|
|
+
|
|
|
+ if (pool->queries.head && pool->conn_num)
|
|
|
+ start_database_connect(pool, NULL);
|
|
|
+
|
|
|
+ pool->process_queries = false;
|
|
|
}
|
|
|
|
|
|
static void poll_database_connection(h2o_socket_t *sock, const char *err)
|
|
@@ -543,11 +536,11 @@ static void prepare_statements(db_conn_t *conn)
|
|
|
}
|
|
|
else {
|
|
|
h2o_socket_read_start(conn->sock, on_database_read_ready);
|
|
|
- process_queries(conn);
|
|
|
+ process_queries(conn, true);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-static void process_queries(db_conn_t *conn)
|
|
|
+static void process_queries(db_conn_t *conn, bool removed)
|
|
|
{
|
|
|
const bool flush = conn->query_num && conn->pool->queries.head;
|
|
|
|
|
@@ -563,33 +556,34 @@ static void process_queries(db_conn_t *conn)
|
|
|
|
|
|
conn->pool->queries.head = param->l.next;
|
|
|
|
|
|
- if (do_execute_query(conn, param, false)) {
|
|
|
+ if (do_execute_query(conn, param)) {
|
|
|
param->on_error(param, DB_ERROR);
|
|
|
-
|
|
|
- if (PQstatus(conn->conn) != CONNECTION_OK) {
|
|
|
- on_database_error(conn, DB_ERROR);
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (flush) {
|
|
|
- const int send_status = PQflush(conn->conn);
|
|
|
-
|
|
|
- if (send_status < 0) {
|
|
|
- LIBRARY_ERROR("PQflush", PQerrorMessage(conn->conn));
|
|
|
on_database_error(conn, DB_ERROR);
|
|
|
return;
|
|
|
}
|
|
|
- else if (send_status)
|
|
|
- h2o_socket_notify_write(conn->sock, on_database_write_ready);
|
|
|
}
|
|
|
|
|
|
- if (!conn->queries.head && !(conn->flags & (EXPECT_SYNC | IDLE | IGNORE_RESULT))) {
|
|
|
+ if (flush && flush_connection(on_database_write_ready, conn))
|
|
|
+ on_database_error(conn, DB_ERROR);
|
|
|
+ else if (conn->query_num && removed) {
|
|
|
conn->l.next = conn->pool->conn;
|
|
|
conn->pool->conn = &conn->l;
|
|
|
- conn->flags |= IDLE;
|
|
|
}
|
|
|
+ else if (!conn->query_num && !removed)
|
|
|
+ // This call should not be problematic, assuming a relatively low number of connections.
|
|
|
+ remove_connection(conn);
|
|
|
+}
|
|
|
+
|
|
|
+static void remove_connection(db_conn_t *conn)
|
|
|
+{
|
|
|
+ list_t *iter = conn->pool->conn;
|
|
|
+ list_t **prev = &conn->pool->conn;
|
|
|
+
|
|
|
+ for (; iter && iter != &conn->l; iter = iter->next)
|
|
|
+ prev = &iter->next;
|
|
|
+
|
|
|
+ if (iter)
|
|
|
+ *prev = iter->next;
|
|
|
}
|
|
|
|
|
|
static void start_database_connect(db_conn_pool_t *pool, db_conn_t *conn)
|
|
@@ -666,34 +660,37 @@ int execute_database_query(db_conn_pool_t *pool, db_query_param_t *param)
|
|
|
{
|
|
|
int ret = 1;
|
|
|
|
|
|
- if (pool->conn) {
|
|
|
- db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, pool->conn);
|
|
|
+ if (pool->query_num) {
|
|
|
+ if (pool->conn) {
|
|
|
+ // Delay sending the database queries to the server, so that if there is a rapid
|
|
|
+ // succession of calls to this function, all resultant queries would be inserted
|
|
|
+ // into a command pipeline with a smaller number of system calls.
|
|
|
+ if (!pool->process_queries) {
|
|
|
+ task_message_t * const msg = h2o_mem_alloc(sizeof(*msg));
|
|
|
+
|
|
|
+ memset(msg, 0, sizeof(*msg));
|
|
|
+ msg->arg = pool;
|
|
|
+ msg->super.type = TASK;
|
|
|
+ msg->task = on_process_queries;
|
|
|
+ send_local_message(&msg->super, pool->local_messages);
|
|
|
+ pool->process_queries = true;
|
|
|
+ }
|
|
|
|
|
|
- assert(!conn->queries.head);
|
|
|
- assert(conn->flags & IDLE);
|
|
|
- assert(!(conn->flags & (EXPECT_SYNC | IGNORE_RESULT)));
|
|
|
- pool->conn = conn->l.next;
|
|
|
- ret = do_execute_query(conn, param, true);
|
|
|
+ ret = 0;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ if (pool->conn_num)
|
|
|
+ start_database_connect(pool, NULL);
|
|
|
|
|
|
- if (ret) {
|
|
|
- if (PQstatus(conn->conn) == CONNECTION_OK) {
|
|
|
- conn->l.next = pool->conn;
|
|
|
- pool->conn = &conn->l;
|
|
|
- }
|
|
|
- else
|
|
|
- start_database_connect(conn->pool, conn);
|
|
|
+ if (pool->conn_num < pool->config->max_db_conn_num && pool->query_num)
|
|
|
+ ret = 0;
|
|
|
}
|
|
|
- }
|
|
|
- else if (pool->query_num) {
|
|
|
- if (pool->conn_num)
|
|
|
- start_database_connect(pool, NULL);
|
|
|
|
|
|
- if (pool->conn_num < pool->config->max_db_conn_num && pool->query_num) {
|
|
|
+ if (!ret) {
|
|
|
param->l.next = NULL;
|
|
|
*pool->queries.tail = ¶m->l;
|
|
|
pool->queries.tail = ¶m->l.next;
|
|
|
pool->query_num--;
|
|
|
- ret = 0;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -713,7 +710,6 @@ void free_database_connection_pool(db_conn_pool_t *pool)
|
|
|
|
|
|
assert(!conn->queries.head);
|
|
|
assert(conn->query_num == pool->config->max_pipeline_query_num);
|
|
|
- assert(conn->flags & IDLE);
|
|
|
assert(!(conn->flags & (EXPECT_SYNC | IGNORE_RESULT | RESET)));
|
|
|
assert(!h2o_timer_is_linked(&conn->timer));
|
|
|
h2o_socket_read_stop(conn->sock);
|
|
@@ -731,11 +727,13 @@ void initialize_database_connection_pool(const char *conninfo,
|
|
|
const struct config_t *config,
|
|
|
const list_t *prepared_statements,
|
|
|
h2o_loop_t *loop,
|
|
|
+ h2o_linklist_t *local_messages,
|
|
|
db_conn_pool_t *pool)
|
|
|
{
|
|
|
memset(pool, 0, sizeof(*pool));
|
|
|
pool->config = config;
|
|
|
pool->conninfo = conninfo ? conninfo : "";
|
|
|
+ pool->local_messages = local_messages;
|
|
|
pool->loop = loop;
|
|
|
pool->prepared_statements = prepared_statements;
|
|
|
pool->queries.tail = &pool->queries.head;
|