|
@@ -45,7 +45,7 @@ typedef struct {
|
|
} db_conn_t;
|
|
} db_conn_t;
|
|
|
|
|
|
static int do_database_write(db_conn_t *db_conn);
|
|
static int do_database_write(db_conn_t *db_conn);
|
|
-static void do_execute_query(db_conn_t *db_conn);
|
|
|
|
|
|
+static int do_execute_query(db_conn_t *db_conn, bool direct_notification);
|
|
static void error_notification(thread_context_t *ctx, bool timeout, const char *error_string);
|
|
static void error_notification(thread_context_t *ctx, bool timeout, const char *error_string);
|
|
static void on_database_connect_error(db_conn_t *db_conn, bool timeout, const char *error_string);
|
|
static void on_database_connect_error(db_conn_t *db_conn, bool timeout, const char *error_string);
|
|
static void on_database_connect_timeout(h2o_timeout_entry_t *entry);
|
|
static void on_database_connect_timeout(h2o_timeout_entry_t *entry);
|
|
@@ -85,7 +85,7 @@ static int do_database_write(db_conn_t *db_conn)
|
|
return ret;
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
|
|
-static void do_execute_query(db_conn_t *db_conn)
|
|
|
|
|
|
+static int do_execute_query(db_conn_t *db_conn, bool direct_notification)
|
|
{
|
|
{
|
|
const int ec = db_conn->param->flags & IS_PREPARED ?
|
|
const int ec = db_conn->param->flags & IS_PREPARED ?
|
|
PQsendQueryPrepared(db_conn->conn,
|
|
PQsendQueryPrepared(db_conn->conn,
|
|
@@ -96,6 +96,7 @@ static void do_execute_query(db_conn_t *db_conn)
|
|
db_conn->param->paramFormats,
|
|
db_conn->param->paramFormats,
|
|
db_conn->param->resultFormat) :
|
|
db_conn->param->resultFormat) :
|
|
PQsendQuery(db_conn->conn, db_conn->param->command);
|
|
PQsendQuery(db_conn->conn, db_conn->param->command);
|
|
|
|
+ int ret = EXIT_FAILURE;
|
|
|
|
|
|
if (ec) {
|
|
if (ec) {
|
|
if (db_conn->param->flags & IS_SINGLE_ROW)
|
|
if (db_conn->param->flags & IS_SINGLE_ROW)
|
|
@@ -106,12 +107,32 @@ static void do_execute_query(db_conn_t *db_conn)
|
|
&db_conn->ctx->db_state.h2o_timeout,
|
|
&db_conn->ctx->db_state.h2o_timeout,
|
|
&db_conn->h2o_timeout_entry);
|
|
&db_conn->h2o_timeout_entry);
|
|
h2o_socket_read_start(db_conn->sock, on_database_read_ready);
|
|
h2o_socket_read_start(db_conn->sock, on_database_read_ready);
|
|
- on_database_write_ready(db_conn->sock, NULL);
|
|
|
|
|
|
+
|
|
|
|
+ const int send_status = PQflush(db_conn->conn);
|
|
|
|
+
|
|
|
|
+ if (send_status < 0) {
|
|
|
|
+ if (direct_notification)
|
|
|
|
+ db_conn->param = NULL;
|
|
|
|
+
|
|
|
|
+ LIBRARY_ERROR("PQflush", PQerrorMessage(db_conn->conn));
|
|
|
|
+ on_database_error(db_conn, DB_ERROR);
|
|
|
|
+ }
|
|
|
|
+ else {
|
|
|
|
+ ret = EXIT_SUCCESS;
|
|
|
|
+
|
|
|
|
+ if (send_status)
|
|
|
|
+ h2o_socket_notify_write(db_conn->sock, on_database_write_ready);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
|
|
+ if (direct_notification)
|
|
|
|
+ db_conn->param = NULL;
|
|
|
|
+
|
|
ERROR(PQerrorMessage(db_conn->conn));
|
|
ERROR(PQerrorMessage(db_conn->conn));
|
|
on_database_error(db_conn, DB_ERROR);
|
|
on_database_error(db_conn, DB_ERROR);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ return ret;
|
|
}
|
|
}
|
|
|
|
|
|
static void error_notification(thread_context_t *ctx, bool timeout, const char *error_string)
|
|
static void error_notification(thread_context_t *ctx, bool timeout, const char *error_string)
|
|
@@ -348,7 +369,7 @@ static void process_query(db_conn_t *db_conn)
|
|
l,
|
|
l,
|
|
db_conn->ctx->db_state.queries.head);
|
|
db_conn->ctx->db_state.queries.head);
|
|
db_conn->ctx->db_state.queries.head = db_conn->ctx->db_state.queries.head->next;
|
|
db_conn->ctx->db_state.queries.head = db_conn->ctx->db_state.queries.head->next;
|
|
- do_execute_query(db_conn);
|
|
|
|
|
|
+ do_execute_query(db_conn, false);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
db_conn->l.next = db_conn->ctx->db_state.db_conn;
|
|
db_conn->l.next = db_conn->ctx->db_state.db_conn;
|
|
@@ -446,7 +467,7 @@ void connect_to_database(thread_context_t *ctx)
|
|
|
|
|
|
int execute_query(thread_context_t *ctx, db_query_param_t *param)
|
|
int execute_query(thread_context_t *ctx, db_query_param_t *param)
|
|
{
|
|
{
|
|
- int ret = EXIT_SUCCESS;
|
|
|
|
|
|
+ int ret = EXIT_FAILURE;
|
|
|
|
|
|
if (ctx->db_state.free_db_conn_num) {
|
|
if (ctx->db_state.free_db_conn_num) {
|
|
db_conn_t * const db_conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, ctx->db_state.db_conn);
|
|
db_conn_t * const db_conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, l, ctx->db_state.db_conn);
|
|
@@ -454,19 +475,20 @@ int execute_query(thread_context_t *ctx, db_query_param_t *param)
|
|
ctx->db_state.db_conn = db_conn->l.next;
|
|
ctx->db_state.db_conn = db_conn->l.next;
|
|
ctx->db_state.free_db_conn_num--;
|
|
ctx->db_state.free_db_conn_num--;
|
|
db_conn->param = param;
|
|
db_conn->param = param;
|
|
- do_execute_query(db_conn);
|
|
|
|
|
|
+ ret = do_execute_query(db_conn, true);
|
|
}
|
|
}
|
|
else if (ctx->db_state.query_num < ctx->config->max_query_num) {
|
|
else if (ctx->db_state.query_num < ctx->config->max_query_num) {
|
|
- param->l.next = NULL;
|
|
|
|
- *ctx->db_state.queries.tail = ¶m->l;
|
|
|
|
- ctx->db_state.queries.tail = ¶m->l.next;
|
|
|
|
- ctx->db_state.query_num++;
|
|
|
|
-
|
|
|
|
if (ctx->db_state.db_conn_num < ctx->config->max_db_conn_num)
|
|
if (ctx->db_state.db_conn_num < ctx->config->max_db_conn_num)
|
|
start_database_connect(ctx, NULL);
|
|
start_database_connect(ctx, NULL);
|
|
|
|
+
|
|
|
|
+ if (ctx->db_state.db_conn_num) {
|
|
|
|
+ param->l.next = NULL;
|
|
|
|
+ *ctx->db_state.queries.tail = ¶m->l;
|
|
|
|
+ ctx->db_state.queries.tail = ¶m->l.next;
|
|
|
|
+ ctx->db_state.query_num++;
|
|
|
|
+ ret = EXIT_SUCCESS;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- else
|
|
|
|
- ret = EXIT_FAILURE;
|
|
|
|
|
|
|
|
return ret;
|
|
return ret;
|
|
}
|
|
}
|