Ver código fonte

db_postgres: implemented custom REPLACE command for DB API v1

- so far it implements replace as update, if affected rows == 0, then
  insert
- it uses locks to ensure there is no race between update and insert
  commands
- the lock to be used is selected based on the values for the update
  key, this ensuring that operations over the same record are done under
  mutex
- if number of colums to be used for update key is 0, then a straight
  insert without locking is done
Daniel-Constantin Mierla 13 anos atrás
pai
commit
5db7f650ad

+ 1 - 0
modules/db_postgres/km_db_postgres.c

@@ -93,6 +93,7 @@ int db_postgres_bind_api(db_func_t *dbb)
 	dbb->insert           = db_postgres_insert;
 	dbb->delete           = db_postgres_delete; 
 	dbb->update           = db_postgres_update;
+	dbb->replace          = db_postgres_replace;
 	dbb->affected_rows    = db_postgres_affected_rows;
 
 	return 0;

+ 4 - 0
modules/db_postgres/km_db_postgres.h

@@ -36,4 +36,8 @@ int db_postgres_bind_api(db_func_t *dbb);
 
 int km_postgres_mod_init(void);
 
+int pg_init_lock_set(int sz);
+
+void pg_destroy_lock_set(void);
+
 #endif /* _KM_DB_POSTGRES_H */

+ 120 - 0
modules/db_postgres/km_dbase.c

@@ -75,12 +75,50 @@
 #include "../../lib/srdb1/db.h"
 #include "../../lib/srdb1/db_ut.h"
 #include "../../lib/srdb1/db_query.h"
+#include "../../locking.h"
+#include "../../hashes.h"
 #include "km_dbase.h"
 #include "km_pg_con.h"
 #include "km_val.h"
 #include "km_res.h"
 #include "pg_mod.h"
 
+static gen_lock_set_t *_pg_lock_set = NULL;
+static unsigned int _pg_lock_size = 0;
+
+/*!
+ * \brief init lock set used to implement SQL REPLACE via UPDATE/INSERT
+ * \param sz power of two to compute the lock set size 
+ * \return 0 on success, -1 on error
+ */
+int pg_init_lock_set(int sz)
+{
+	if(sz>0 && sz<=10)
+	{
+		_pg_lock_size = 1<<sz;
+	} else {
+		_pg_lock_size = 1<<4;
+	}
+	_pg_lock_set = lock_set_alloc(_pg_lock_size);
+	if(_pg_lock_set==NULL || lock_set_init(_pg_lock_set)==NULL)
+	{
+		LM_ERR("cannot initiate lock set\n");
+		return -1;
+	}
+	return 0;
+}
+
+void pg_destroy_lock_set(void)
+{
+	if(_pg_lock_set!=NULL)
+	{
+		lock_set_destroy(_pg_lock_set);
+		lock_set_dealloc(_pg_lock_set);
+		_pg_lock_set = NULL;
+		_pg_lock_size = 0;
+	}
+}
+
 static void db_postgres_free_query(const db1_con_t* _con);
 
 
@@ -587,3 +625,85 @@ int db_postgres_use_table(db1_con_t* _con, const str* _t)
 {
 	return db_use_table(_con, _t);
 }
+
+
+/*!
+ * \brief SQL REPLACE implementation
+ * \param _h structure representing database connection
+ * \param _k key names
+ * \param _v values of the keys
+ * \param _n number of key=value pairs
+ * \param _un number of keys to build the unique key, starting from first
+ * \param _m mode - first update, then insert, or first insert, then update
+ * \return 0 on success, negative on failure
+ */
+int db_postgres_replace(const db1_con_t* _h, const db_key_t* _k,
+		const db_val_t* _v, const int _n, const int _un, const int _m)
+{
+	unsigned int pos = 0;
+	int i;
+
+	if(_un > _n)
+	{
+		LM_ERR("number of columns for unique key is too high\n");
+		return -1;
+	}
+
+	if(_un > 0)
+	{
+		for(i=0; i<_un; i++)
+		{
+			if(!VAL_NULL(&_v[i]))
+			{
+				switch(VAL_TYPE(&_v[i]))
+				{
+					case DB1_INT:
+						pos += VAL_UINT(&_v[i]);
+						break;
+					case DB1_STR:
+						pos += get_hash1_raw((VAL_STR(&_v[i])).s,
+									(VAL_STR(&_v[i])).len);
+						break;
+					case DB1_STRING:
+						pos += get_hash1_raw(VAL_STRING(&_v[i]),
+									strlen(VAL_STRING(&_v[i])));
+						break;
+					default:
+						break;
+				}
+			}
+		}
+		pos &= (_pg_lock_size-1);
+		lock_set_get(_pg_lock_set, pos);
+		if(db_postgres_update(_h, _k, 0, _v, _k + _un,
+						_v + _un, _un, _n -_un)< 0)
+		{
+			LM_ERR("update failed\n");
+			lock_set_release(_pg_lock_set, pos);
+			return -1;
+		}
+
+		if (db_postgres_affected_rows(_h) <= 0)
+		{
+			if(db_postgres_insert(_h, _k, _v, _n)< 0)
+			{
+				LM_ERR("insert failed\n");
+				lock_set_release(_pg_lock_set, pos);
+				return -1;
+			}
+			LM_DBG("inserted new record in database table\n");
+		} else {
+			LM_DBG("updated record in database table\n");
+		}
+		lock_set_release(_pg_lock_set, pos);
+	} else {
+		if(db_postgres_insert(_h, _k, _v, _n)< 0)
+		{
+			LM_ERR("direct insert failed\n");
+			return -1;
+		}
+		LM_DBG("directly inserted new record in database table\n");
+	}
+	return 0;
+}
+

+ 5 - 0
modules/db_postgres/km_dbase.h

@@ -113,5 +113,10 @@ int db_postgres_affected_rows(const db1_con_t* _h);
  */
 int db_postgres_use_table(db1_con_t* _h, const str* _t);
 
+/*
+ * Replace a row in table (via update/insert)
+ */
+int db_postgres_replace(const db1_con_t* _h, const db_key_t* _k,
+		const db_val_t* _v, const int _n, const int _un, const int _m);
 
 #endif /* KM_DBASE_H */

+ 5 - 0
modules/db_postgres/pg_mod.c

@@ -59,6 +59,8 @@ int pg_connect_timeout = 0;  /* Default is unlimited */
 int pg_retries = 2;  /* How many times should the module try re-execute failed commands.
 					  * 0 disables reconnecting */
 
+int pg_lockset = 4;
+
 /*
  * Postgres module interface
  */
@@ -88,6 +90,7 @@ static cmd_export_t cmds[] = {
  */
 static param_export_t params[] = {
 	{"retries",         PARAM_INT, &pg_retries },
+	{"lockset",         PARAM_INT, &pg_lockset },
 	{0, 0, 0}
 };
 
@@ -547,6 +550,8 @@ static int pg_mod_init(void)
 	}
 	return -1;
 #endif /* PG_TEST */
+	if(pg_init_lock_set(pg_lockset)<0)
+		return -1;
 	return km_postgres_mod_init();
 }