|
@@ -37,7 +37,6 @@ typedef struct {
|
|
list_t l;
|
|
list_t l;
|
|
PGconn *conn;
|
|
PGconn *conn;
|
|
thread_context_t *ctx;
|
|
thread_context_t *ctx;
|
|
- void (*on_write_ready)(void *);
|
|
|
|
db_query_param_t *param;
|
|
db_query_param_t *param;
|
|
h2o_socket_t *sock;
|
|
h2o_socket_t *sock;
|
|
size_t prep_stmt_idx;
|
|
size_t prep_stmt_idx;
|
|
@@ -53,12 +52,10 @@ static void on_database_connect_timeout(h2o_timeout_entry_t *entry);
|
|
static void on_database_error(db_conn_t *db_conn, const char *error_string);
|
|
static void on_database_error(db_conn_t *db_conn, const char *error_string);
|
|
static void on_database_read_ready(h2o_socket_t *db_sock, const char *err);
|
|
static void on_database_read_ready(h2o_socket_t *db_sock, const char *err);
|
|
static void on_database_timeout(h2o_timeout_entry_t *entry);
|
|
static void on_database_timeout(h2o_timeout_entry_t *entry);
|
|
-static void on_database_write_ready(void *data);
|
|
|
|
|
|
+static void on_database_write_ready(h2o_socket_t *db_sock, const char *err);
|
|
static void poll_database_connection(h2o_socket_t *db_sock, const char *err);
|
|
static void poll_database_connection(h2o_socket_t *db_sock, const char *err);
|
|
static void process_query(db_conn_t *db_conn);
|
|
static void process_query(db_conn_t *db_conn);
|
|
static void start_database_connect(thread_context_t *ctx, db_conn_t *db_conn);
|
|
static void start_database_connect(thread_context_t *ctx, db_conn_t *db_conn);
|
|
-static int start_database_write_polling(db_conn_t *db_conn);
|
|
|
|
-static void stop_database_write_polling(db_conn_t *db_conn);
|
|
|
|
|
|
|
|
static const struct {
|
|
static const struct {
|
|
const char *name;
|
|
const char *name;
|
|
@@ -80,10 +77,10 @@ static int do_database_write(db_conn_t *db_conn)
|
|
ERROR(PQerrorMessage(db_conn->conn));
|
|
ERROR(PQerrorMessage(db_conn->conn));
|
|
on_database_error(db_conn, DB_ERROR);
|
|
on_database_error(db_conn, DB_ERROR);
|
|
}
|
|
}
|
|
- else if (start_database_write_polling(db_conn))
|
|
|
|
- on_database_error(db_conn, DB_ERROR);
|
|
|
|
- else
|
|
|
|
|
|
+ else {
|
|
|
|
+ h2o_socket_notify_write(db_conn->sock, on_database_write_ready);
|
|
ret = 0;
|
|
ret = 0;
|
|
|
|
+ }
|
|
|
|
|
|
return ret;
|
|
return ret;
|
|
}
|
|
}
|
|
@@ -109,7 +106,7 @@ static void do_execute_query(db_conn_t *db_conn)
|
|
&db_conn->ctx->db_state.h2o_timeout,
|
|
&db_conn->ctx->db_state.h2o_timeout,
|
|
&db_conn->h2o_timeout_entry);
|
|
&db_conn->h2o_timeout_entry);
|
|
h2o_socket_read_start(db_conn->sock, on_database_read_ready);
|
|
h2o_socket_read_start(db_conn->sock, on_database_read_ready);
|
|
- on_database_write_ready(&db_conn->on_write_ready);
|
|
|
|
|
|
+ on_database_write_ready(db_conn->sock, NULL);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
ERROR(PQerrorMessage(db_conn->conn));
|
|
ERROR(PQerrorMessage(db_conn->conn));
|
|
@@ -192,10 +189,8 @@ static void on_database_read_ready(h2o_socket_t *db_sock, const char *err)
|
|
if (PQconsumeInput(db_conn->conn)) {
|
|
if (PQconsumeInput(db_conn->conn)) {
|
|
const int send_status = PQflush(db_conn->conn);
|
|
const int send_status = PQflush(db_conn->conn);
|
|
|
|
|
|
- if (send_status > 0 && start_database_write_polling(db_conn)) {
|
|
|
|
- on_database_error(db_conn, DB_ERROR);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
|
|
+ if (send_status > 0)
|
|
|
|
+ h2o_socket_notify_write(db_conn->sock, on_database_write_ready);
|
|
|
|
|
|
if (send_status >= 0) {
|
|
if (send_status >= 0) {
|
|
while (!PQisBusy(db_conn->conn)) {
|
|
while (!PQisBusy(db_conn->conn)) {
|
|
@@ -257,26 +252,32 @@ static void on_database_timeout(h2o_timeout_entry_t *entry)
|
|
start_database_connect(db_conn->ctx, db_conn);
|
|
start_database_connect(db_conn->ctx, db_conn);
|
|
}
|
|
}
|
|
|
|
|
|
-static void on_database_write_ready(void *data)
|
|
|
|
|
|
+static void on_database_write_ready(h2o_socket_t *db_sock, const char *err)
|
|
{
|
|
{
|
|
- db_conn_t * const db_conn = H2O_STRUCT_FROM_MEMBER(db_conn_t, on_write_ready, data);
|
|
|
|
|
|
+ db_conn_t * const db_conn = db_sock->data;
|
|
|
|
|
|
- if (db_conn->prep_stmt_idx) {
|
|
|
|
- const int send_status = PQflush(db_conn->conn);
|
|
|
|
|
|
+ if (err) {
|
|
|
|
+ ERROR(err);
|
|
|
|
+ on_database_error(db_conn, DB_ERROR);
|
|
|
|
+ }
|
|
|
|
+ else {
|
|
|
|
+ if (db_conn->prep_stmt_idx) {
|
|
|
|
+ const int send_status = PQflush(db_conn->conn);
|
|
|
|
|
|
- if (!send_status) {
|
|
|
|
- if (db_conn->flags & IS_WRITING && db_conn->param)
|
|
|
|
- do_database_write(db_conn);
|
|
|
|
- }
|
|
|
|
- else if (send_status < 0) {
|
|
|
|
- LIBRARY_ERROR("PQflush", PQerrorMessage(db_conn->conn));
|
|
|
|
- on_database_error(db_conn, DB_ERROR);
|
|
|
|
|
|
+ if (!send_status) {
|
|
|
|
+ if (db_conn->flags & IS_WRITING && db_conn->param)
|
|
|
|
+ do_database_write(db_conn);
|
|
|
|
+ }
|
|
|
|
+ else if (send_status < 0) {
|
|
|
|
+ LIBRARY_ERROR("PQflush", PQerrorMessage(db_conn->conn));
|
|
|
|
+ on_database_error(db_conn, DB_ERROR);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ h2o_socket_notify_write(db_conn->sock, on_database_write_ready);
|
|
}
|
|
}
|
|
- else if (send_status > 0 && start_database_write_polling(db_conn))
|
|
|
|
- on_database_error(db_conn, DB_ERROR);
|
|
|
|
|
|
+ else
|
|
|
|
+ poll_database_connection(db_conn->sock, NULL);
|
|
}
|
|
}
|
|
- else
|
|
|
|
- poll_database_connection(db_conn->sock, NULL);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
static void poll_database_connection(h2o_socket_t *db_sock, const char *err)
|
|
static void poll_database_connection(h2o_socket_t *db_sock, const char *err)
|
|
@@ -292,9 +293,7 @@ static void poll_database_connection(h2o_socket_t *db_sock, const char *err)
|
|
|
|
|
|
switch (status) {
|
|
switch (status) {
|
|
case PGRES_POLLING_WRITING:
|
|
case PGRES_POLLING_WRITING:
|
|
- if (start_database_write_polling(db_conn))
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
|
|
+ h2o_socket_notify_write(db_conn->sock, on_database_write_ready);
|
|
return;
|
|
return;
|
|
case PGRES_POLLING_OK:
|
|
case PGRES_POLLING_OK:
|
|
if (PQsetnonblocking(db_conn->conn, 1)) {
|
|
if (PQsetnonblocking(db_conn->conn, 1)) {
|
|
@@ -330,7 +329,7 @@ static void process_query(db_conn_t *db_conn)
|
|
&db_conn->ctx->db_state.h2o_timeout,
|
|
&db_conn->ctx->db_state.h2o_timeout,
|
|
&db_conn->h2o_timeout_entry);
|
|
&db_conn->h2o_timeout_entry);
|
|
h2o_socket_read_start(db_conn->sock, on_database_read_ready);
|
|
h2o_socket_read_start(db_conn->sock, on_database_read_ready);
|
|
- on_database_write_ready(&db_conn->on_write_ready);
|
|
|
|
|
|
+ on_database_write_ready(db_conn->sock, NULL);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
LIBRARY_ERROR("PQsendPrepare", PQerrorMessage(db_conn->conn));
|
|
LIBRARY_ERROR("PQsendPrepare", PQerrorMessage(db_conn->conn));
|
|
@@ -364,7 +363,6 @@ static void start_database_connect(thread_context_t *ctx, db_conn_t *db_conn)
|
|
db_conn->prep_stmt_idx = 0;
|
|
db_conn->prep_stmt_idx = 0;
|
|
db_conn->flags = IS_RESETTING;
|
|
db_conn->flags = IS_RESETTING;
|
|
h2o_timeout_unlink(&db_conn->h2o_timeout_entry);
|
|
h2o_timeout_unlink(&db_conn->h2o_timeout_entry);
|
|
- stop_database_write_polling(db_conn);
|
|
|
|
h2o_socket_read_stop(db_conn->sock);
|
|
h2o_socket_read_stop(db_conn->sock);
|
|
h2o_socket_close(db_conn->sock);
|
|
h2o_socket_close(db_conn->sock);
|
|
|
|
|
|
@@ -424,13 +422,8 @@ static void start_database_connect(thread_context_t *ctx, db_conn_t *db_conn)
|
|
&ctx->db_state.h2o_timeout,
|
|
&ctx->db_state.h2o_timeout,
|
|
&db_conn->h2o_timeout_entry);
|
|
&db_conn->h2o_timeout_entry);
|
|
h2o_socket_read_start(db_conn->sock, poll_database_connection);
|
|
h2o_socket_read_start(db_conn->sock, poll_database_connection);
|
|
-
|
|
|
|
- if (!start_database_write_polling(db_conn))
|
|
|
|
- return;
|
|
|
|
-
|
|
|
|
- h2o_socket_read_stop(db_conn->sock);
|
|
|
|
- h2o_timeout_unlink(&db_conn->h2o_timeout_entry);
|
|
|
|
- h2o_socket_close(db_conn->sock);
|
|
|
|
|
|
+ h2o_socket_notify_write(db_conn->sock, on_database_write_ready);
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
errno = ENOMEM;
|
|
errno = ENOMEM;
|
|
@@ -446,23 +439,6 @@ error:
|
|
error_notification(ctx, false, DB_ERROR);
|
|
error_notification(ctx, false, DB_ERROR);
|
|
}
|
|
}
|
|
|
|
|
|
-static int start_database_write_polling(db_conn_t *db_conn)
|
|
|
|
-{
|
|
|
|
- const bool rearm = !!db_conn->on_write_ready;
|
|
|
|
-
|
|
|
|
- db_conn->on_write_ready = on_database_write_ready;
|
|
|
|
- return start_write_polling(PQsocket(db_conn->conn),
|
|
|
|
- &db_conn->on_write_ready,
|
|
|
|
- rearm,
|
|
|
|
- &db_conn->ctx->event_loop);
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-static void stop_database_write_polling(db_conn_t *db_conn)
|
|
|
|
-{
|
|
|
|
- db_conn->on_write_ready = NULL;
|
|
|
|
- stop_write_polling(PQsocket(db_conn->conn), &db_conn->ctx->event_loop);
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
void connect_to_database(thread_context_t *ctx)
|
|
void connect_to_database(thread_context_t *ctx)
|
|
{
|
|
{
|
|
for (size_t i = ctx->config->max_db_conn_num - ctx->db_state.db_conn_num; i > 0; i--)
|
|
for (size_t i = ctx->config->max_db_conn_num - ctx->db_state.db_conn_num; i > 0; i--)
|