2
0
Эх сурвалжийг харах

Merge pull request #1047 from jchavanton/db_postgres_upsert

db_postgress: insert_update() with DO UPDATE
Daniel-Constantin Mierla 8 жил өмнө
parent
commit
55bc7309bd

+ 1 - 0
src/modules/db_postgres/db_postgres.c

@@ -90,6 +90,7 @@ int db_postgres_bind_api(db_func_t *dbb)
 	dbb->raw_query        = db_postgres_raw_query;
 	dbb->free_result      = db_postgres_free_result;
 	dbb->insert           = db_postgres_insert;
+	dbb->insert_update    = db_postgres_insert_update;
 	dbb->delete           = db_postgres_delete; 
 	dbb->update           = db_postgres_update;
 	dbb->replace          = db_postgres_replace;

+ 2 - 0
src/modules/db_postgres/db_postgres.h

@@ -38,4 +38,6 @@ int pg_init_lock_set(int sz);
 
 void pg_destroy_lock_set(void);
 
+int pg_alloc_buffer(void);
+
 #endif /* _KM_DB_POSTGRES_H */

+ 216 - 0
src/modules/db_postgres/km_dbase.c

@@ -2,6 +2,7 @@
  * Copyright (C) 2003 August.Net Services, LLC
  * Copyright (C) 2006 Norman Brandinger
  * Copyright (C) 2008 1&1 Internet AG
+ * Copyright (C) 2017 Julien Chavanton, Flowroute
  *
  * This file is part of Kamailio, a free SIP server.
  *
@@ -40,6 +41,7 @@
 #include "../../lib/srdb1/db_query.h"
 #include "../../core/locking.h"
 #include "../../core/hashes.h"
+#include "../../core/clist.h"
 #include "km_dbase.h"
 #include "km_pg_con.h"
 #include "km_val.h"
@@ -49,6 +51,9 @@
 static gen_lock_set_t *_pg_lock_set = NULL;
 static unsigned int _pg_lock_size = 0;
 
+extern unsigned int sql_buffer_size;
+static char *postgres_sql_buf = NULL;
+
 /*!
  * \brief init lock set used to implement SQL REPLACE via UPDATE/INSERT
  * \param sz power of two to compute the lock set size 
@@ -82,6 +87,21 @@ void pg_destroy_lock_set(void)
 	}
 }
 
+int pg_alloc_buffer(void)
+{
+	if (postgres_sql_buf != NULL) {
+		LM_DBG("postgres_sql_buf not NULL on init\n");
+		return 0;
+	}
+	LM_DBG("About to allocate postgres_sql_buf size = %d\n", sql_buffer_size);
+	postgres_sql_buf = pkg_malloc(sql_buffer_size);
+	if (postgres_sql_buf == NULL) {
+		LM_ERR("failed to allocate postgres_sql_buf\n");
+		return -1;
+	}
+	return 1;
+}
+
 static void db_postgres_free_query(const db1_con_t* _con);
 
 
@@ -626,6 +646,202 @@ int db_postgres_delete(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _
 	return ret;
 }
 
+static pg_constraint_t *pg_constraint = NULL;
+
+/*!
+ * \brief add/save a detected constraint to the list in memory
+ * \param pg_constraint_t constraint
+ */
+static void db_postgres_constraint_add(pg_constraint_t *c) {
+	if (!pg_constraint) {
+		pg_constraint = c;
+		LM_DBG("adding init constraint [%s][%s][%s]\n", c->database.s, c->table.s, c->unique.s);
+		clist_init(pg_constraint, next, prev);
+	} else {
+		LM_DBG("adding append constraint [%s][%s][%s]\n", c->database.s, c->table.s, c->unique.s);
+		clist_append(pg_constraint, c, next, prev);
+	}
+}
+
+static void db_postgres_constraint_destroy(pg_constraint_t *c) {
+	if (!c)
+		return;
+	if (c->database.s)
+		pkg_free(c->database.s);
+	if (c->table.s)
+		pkg_free(c->table.s);
+	if (c->unique.s)
+		pkg_free(c->unique.s);
+	pkg_free(c);
+	c = NULL;
+}
+
+static pg_constraint_t *db_postgres_constraint_new(const char *db, const str *table, const char *unique) {
+	pg_constraint_t *c = pkg_malloc(sizeof(pg_constraint_t));
+	if (!c)
+		return NULL;
+	memset(c, 0, sizeof(pg_constraint_t));
+
+	c->database.len = strlen(db);
+	c->database.s = pkg_malloc(c->database.len+1);
+	if (!c->database.s) goto error;
+	strcpy(c->database.s, db);
+
+	c->table.len = table->len;
+	c->table.s = pkg_malloc(c->table.len+1);
+	if (!c->table.s) goto error;
+	strcpy(c->table.s, table->s);
+
+	c->unique.len = strlen(unique);
+	c->unique.s = pkg_malloc(c->unique.len+1);
+	if (!c->unique.s) goto error;
+	strcpy(c->unique.s, unique);
+
+	db_postgres_constraint_add(c);
+	return c;
+error:
+	db_postgres_constraint_destroy(c);
+	return NULL;
+}
+
+static pg_constraint_t *db_postgres_constraint_search(char *db, char *table) {
+	pg_constraint_t *c;
+	if (!pg_constraint)
+		return NULL;
+	clist_foreach(pg_constraint, c, next){
+		LM_DBG("searching[%s][%s][%s]\n", c->database.s, c->table.s, c->unique.s);
+		if (strcmp(db, c->database.s) == 0 && strcmp(table, c->table.s) == 0) {
+			return c;
+		}
+	}
+	return NULL;
+}
+
+static str sql_str;
+
+/*!
+ * \brief search for saved contraint or query pg_constraint to get the unique constraint
+ * \param _h structure representing database connection
+ */
+static char * db_postgres_constraint_get(const db1_con_t* _h) {
+	pg_constraint_t *constraint = db_postgres_constraint_search(PQdb(CON_CONNECTION(_h)), CON_TABLE(_h)->s);
+	if (constraint) {
+		return constraint->unique.s;
+	}
+	db1_res_t *res = NULL;
+	int ret;
+	ret = snprintf(postgres_sql_buf, sql_buffer_size,
+		"select conname, contype from pg_constraint where conrelid = "
+		"(select oid from pg_class where relname like '%s%.*s%s')",
+		CON_TQUOTESZ(_h), CON_TABLE(_h)->len, CON_TABLE(_h)->s, CON_TQUOTESZ(_h));
+
+	if (ret < 0 || ret >= sql_buffer_size) {
+		LM_ERR("error creating pg_constraint query, invalid size[%d]\n", ret);
+		return NULL;
+	}
+
+	sql_str.len = ret;
+	sql_str.s = postgres_sql_buf;
+
+	if (db_postgres_raw_query(_h, &sql_str, &res) < 0) {
+		LM_ERR("error executing pg_constraint query !\n");
+		return NULL;
+	}
+
+	struct db_row* rows = RES_ROWS(res);
+	const char *val = NULL;
+	const char *type = NULL;
+	int x;
+	for (x=0;x<RES_ROW_N(res);x++) {
+		val = (ROW_VALUES(&rows[x])[0]).val.string_val;
+		type = (ROW_VALUES(&rows[x])[0]).val.string_val;
+		LM_DBG("name[%s]type[%s]\n", val, type);
+		if ( type[0] == 'u' )
+			break; // always favor unique constraint over primary key constraint
+	}
+	constraint = db_postgres_constraint_new( PQdb(CON_CONNECTION(_h)), CON_TABLE(_h), val);
+
+	db1_con_t* db_con = (db1_con_t*) _h;
+	if (res) {
+		db_postgres_free_result(db_con, res);
+	}
+	return constraint->unique.s;
+}
+
+/*!
+ * Insert a row into a specified table, update on duplicate key.
+ * \param _h structure representing database connection
+ * \param _k key names
+ * \param _v values of the keys
+ * \param _n number of key=value pairs
+ *
+ * As explained in the following article the design of "UPSERT" in PostgreSQL is requiring to be explicit about the constraint on which we accept to do update
+ * modification to Kamailio database framework/API would be required to expose which specific key constraint should be handled as an update
+ * http://pgeoghegan.blogspot.com/2015/10/avoid-naming-constraint-directly-when.html
+ */
+
+int db_postgres_insert_update(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
+	const int _n)
+{
+	int off, ret;
+
+	if ((!_h) || (!_k) || (!_v) || (!_n)) {
+		LM_ERR("invalid parameter value\n");
+		return -1;
+	}
+	char * constraint = db_postgres_constraint_get(_h);
+
+	ret = snprintf(postgres_sql_buf, sql_buffer_size, "insert into %s%.*s%s (",
+			CON_TQUOTESZ(_h), CON_TABLE(_h)->len, CON_TABLE(_h)->s, CON_TQUOTESZ(_h));
+	if (ret < 0 || ret >= sql_buffer_size) goto error;
+	off = ret;
+
+	ret = db_print_columns(postgres_sql_buf + off, sql_buffer_size - off, _k, _n, CON_TQUOTESZ(_h));
+	if (ret < 0) return -1;
+	off += ret;
+
+	ret = snprintf(postgres_sql_buf + off, sql_buffer_size - off, ") values (");
+	if (ret < 0 || ret >= (sql_buffer_size - off)) goto error;
+	off += ret;
+	ret = db_print_values(_h, postgres_sql_buf + off, sql_buffer_size - off, _v, _n, db_postgres_val2str);
+	if (ret < 0) return -1;
+	off += ret;
+
+	*(postgres_sql_buf + off++) = ')';
+
+	if (constraint) {
+		ret = snprintf(postgres_sql_buf + off, sql_buffer_size - off,
+			" on conflict on constraint %s do update set ", constraint);
+		if (ret < 0 || ret >= (sql_buffer_size - off)) goto error;
+		off += ret;
+
+		ret = db_print_set(_h, postgres_sql_buf + off, sql_buffer_size - off, _k, _v, _n, db_postgres_val2str);
+		if (ret < 0) {
+			LM_ERR("error building query\n");
+			return -1;
+		}
+		off += ret;
+		if (off + 1 > sql_buffer_size) goto error;
+		postgres_sql_buf[off] = '\0';
+	} else {
+		ret = snprintf(postgres_sql_buf + off, sql_buffer_size - off, " on conflict do nothing ");
+		if (ret < 0 || ret >= (sql_buffer_size - off)) goto error;
+		off += ret;
+	}
+
+	sql_str.s = postgres_sql_buf;
+	sql_str.len = off;
+	LM_DBG("query : %s\n", sql_str.s);
+	if (db_postgres_submit_query(_h, &sql_str) < 0) {
+		LM_ERR("error while submitting query\n");
+		return -2;
+	}
+	return 0;
+
+error:
+	LM_ERR("error while preparing insert_update operation\n");
+	return -1;
+}
 
 /*!
  * Update some rows in the specified table

+ 15 - 0
src/modules/db_postgres/km_dbase.h

@@ -35,6 +35,16 @@
 #include "../../lib/srdb1/db_op.h"
 #include "../../lib/srdb1/db_val.h"
 
+/*
+ * Storage for all the unique constraints found by insert_update
+ */
+typedef struct db_pg_constraint_list {
+	struct db_pg_constraint_list* next;
+	struct db_pg_constraint_list* prev;
+	str database;
+	str table;
+	str unique;
+} pg_constraint_t;
 
 /*
  * Initialize database connection
@@ -91,6 +101,11 @@ int db_postgres_raw_query(const db1_con_t* _h, const str* _s, db1_res_t** _r);
 int db_postgres_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
 		const int _n);
 
+/*
+ * Insert and update ON CONFLICT
+ */
+int db_postgres_insert_update(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
+		const int _n);
 
 /*
  * Delete a row from table

+ 4 - 0
src/modules/db_postgres/km_pg_con.c

@@ -116,6 +116,10 @@ struct pg_con* db_postgres_new_connection(struct db_id* id)
 		goto err;
 	}
 
+	if (PQserverVersion(ptr->con) <  90500) {
+		LM_WARN("server version < 9.5 does not support insert_update\n");
+	}
+
 	ptr->connected = 1;
 	ptr->timestamp = time(0);
 	ptr->id = id;

+ 1 - 0
src/modules/db_postgres/km_res.c

@@ -149,6 +149,7 @@ int db_postgres_get_columns(const db1_con_t* _h, db1_res_t* _r)
 			case BOOLOID:
 			case CHAROID:
 			case VARCHAROID:
+			case NAMEOID:
 			case BPCHAROID:
 				LM_DBG("use DB1_STRING result type\n");
 				RES_TYPES(_r)[col] = DB1_STRING;

+ 4 - 0
src/modules/db_postgres/pg_mod.c

@@ -534,6 +534,10 @@ int pg_test(void)
 
 int mod_register(char *path, int *dlflags, void *p1, void *p2)
 {
+	if(!pg_alloc_buffer()) {
+		LM_ERR("failed too allocate buffer");
+		return -1;
+	}
 	if(db_api_init()<0)
 		return -1;
 	return 0;