Forráskód Böngészése

modules/db_cassandra: Added raw query support

The raw queries can be performed through avpops module and have to have
the CQL syntax.
Boudewyn Ligthart 13 éve
szülő
commit
18c5f3fec6

+ 18 - 0
modules/db_cassandra/README

@@ -4,6 +4,10 @@ Anca Vamanu
 
    <[email protected]>
 
+Boudewyn Ligthart
+
+   <[email protected]>
+
 Edited by
 
 Anca Vamanu
@@ -110,6 +114,20 @@ default_validation_class='UTF8Type' and key_validation_class='UTF8Type';
    let Cassandra take care of this with its own mechanism (by setting the
    TTL for columns).
 
+   The module supports raw queries. However these queries must follow the
+   CQL (Cassandra Query Language) syntax. The queries can be issued in the
+   script by means of the AVPOPS module. Keep in mind that when passing
+   back the results from the database only the first row is used to set
+   the AVP variables. (default AVPOPS behaviour) The script lines below
+   can be used as an example for issuing the query towards an cassandra
+   instance.(This example will work once the column family `location` is
+   configured correctly in the cassandra keyspace)
+   ...
+   $var(dballowed)="select * from location where key = 'userx' limit 1;";
+   avp_db_query("$var(dballowed)");
+   xlog("L_INFO","Got result here: [$avp(i:1)] [$avp(i:2)] [$avp(i:3)].\n");
+   ...
+
 2. Dependencies
 
    2.1. SIP Router Modules

+ 1 - 0
modules/db_cassandra/db_cassandra.c

@@ -140,6 +140,7 @@ int db_cassa_bind_api(db_func_t *dbb)
 	dbb->insert_update    = db_cassa_insert;
 	dbb->delete           = db_cassa_delete;
 	dbb->update           = db_cassa_update;
+	dbb->raw_query        = db_cassa_raw_query;
 
 	return 0;
 }

+ 199 - 0
modules/db_cassandra/dbcassa_base.cpp

@@ -24,6 +24,7 @@
  * History:
  * --------
  * 2012-01  first version (Anca Vamanu)
+ * 2012-09  Added support for CQL queries (Boudewyn Ligthart)
  */
 
 #include <stdio.h>
@@ -576,10 +577,208 @@ ColumnVecPtr cassa_translate_query(const db1_con_t* _h, const db_key_t* _k,
 }
 
 
+/** 
+ *  This function check the CQLresult of the CQL query and   
+ *  adds the columns to the returning result structure. 
+ *
+ * \param _cql_res  handle for the CQLResult
+ * \param _r result set for storage
+ * \return zero on success, negative value on failure
+ */
+int cql_get_columns(oac::CqlResult& _cql_res, db1_res_t* _r)
+{
+	std::vector<oac::CqlRow>  res_cql_rows = _cql_res.rows;
+	int rows_no = res_cql_rows.size();
+	int cols_no = 0;
+
+	LM_DBG("cqlrow Vector size =%d\n", rows_no);
+	
+	if (rows_no > 0) {
+		cols_no = res_cql_rows[0].columns.size();
+		LM_DBG("There are %d columns available, this should be the case for all %d rows (consider cql).\n", cols_no, rows_no);
+	} else {
+		LM_DBG("Got 0 rows. There is no result from the query.\n");
+		return 0;
+	}
+
+	RES_COL_N(_r) = cols_no;
+	if (!RES_COL_N(_r)) {
+		LM_ERR("no columns returned from the query\n");
+		return -2;
+	} else {
+		LM_DBG("%d columns returned from the query\n", RES_COL_N(_r));
+	}
+
+	if (db_allocate_columns(_r, RES_COL_N(_r)) != 0) {
+		LM_ERR("Could not allocate columns\n");
+		return -3;
+	}
+
+	/* For fields we will use the columns inside the first columns */
+
+	for(int col = 0; col < RES_COL_N(_r); col++) {
+		RES_NAMES(_r)[col] = (str*)pkg_malloc(sizeof(str));
+		if (! RES_NAMES(_r)[col]) {
+			LM_ERR("no private memory left\n");
+			RES_COL_N(_r) = col;
+			db_free_columns(_r);
+			return -4;
+		}
+		LM_DBG("Allocated %lu bytes for RES_NAMES[%d] at %p\n",
+			(unsigned long)sizeof(str), col, RES_NAMES(_r)[col]);
+
+		/* The pointer that is here returned is part of the result structure. */
+		RES_NAMES(_r)[col]->s = (char*) res_cql_rows[0].columns[col].name.c_str();
+		RES_NAMES(_r)[col]->len = strlen(RES_NAMES(_r)[col]->s);
+		RES_TYPES(_r)[col] = DB1_STR;
+
+		LM_DBG("RES_NAMES(%p)[%d]=[%.*s]\n", RES_NAMES(_r)[col], col,
+			RES_NAMES(_r)[col]->len, RES_NAMES(_r)[col]->s);
+	}
+	return 0;
+}
+
+
+
+/**
+ *  This function convert the rows returned in CQL query 
+ *  and adds the values to the returning result structure.
+ *
+ * Handle CQLresult
+ * \param _cql_res  handle for the CQLResult
+ * \param _r result set for storage
+ * \return zero on success, negative value on failure
+ */
+int cql_convert_row(oac::CqlResult& _cql_res, db1_res_t* _r)
+{
+	std::vector<oac::CqlRow>  res_cql_rows = _cql_res.rows;
+	int rows_no = res_cql_rows.size();
+	int cols_no = res_cql_rows[0].columns.size();
+	str col_val;
+
+	RES_ROW_N(_r) = rows_no;
+
+	if (db_allocate_rows(_r) < 0) {
+		LM_ERR("Could not allocate rows.\n");
+		return -1; 
+	}
+
+	for(int ri=0; ri < rows_no; ri++) {
+		if (db_allocate_row(_r, &(RES_ROWS(_r)[ri])) != 0) {
+			LM_ERR("Could not allocate row.\n");
+			return -2; 
+		}
+
+		/* complete the row with the columns */
+		for(int col = 0; col< cols_no; col++) {
+			RES_ROWS(_r)[ri].values[col].type = DB1_STR;
+
+			col_val.s = (char*)res_cql_rows[ri].columns[col].value.c_str();
+			col_val.len = strlen(col_val.s);
+			pkg_str_dup(&RES_ROWS(_r)[ri].values[col].val.str_val, &col_val);
+			RES_ROWS(_r)[ri].values[col].free  = 1;
+			RES_ROWS(_r)[ri].values[col].nul  = 0;
+
+			LM_DBG("Field index %d. %s = %s.\n", col,
+				res_cql_rows[ri].columns[col].name.c_str(),
+				res_cql_rows[ri].columns[col].value.c_str());
+		}
+	}
+	return 0;
+}
+
+
 /*
  *	The functions for the DB Operations: query, delete, update.
  * */
 
+/**
+ * Execute a raw SQL query.
+ * \param _h handle for the database
+ * \param _s raw query string
+ * \param _r result set for storage
+ * \return zero on success, negative value on failure
+ */
+int db_cassa_raw_query(const db1_con_t* _h, const str* _s, db1_res_t** _r)
+{
+	db1_res_t* db_res = 0;
+
+	if (!_h || !CON_TABLE(_h) || !_r) {
+		LM_ERR("Invalid parameter value\n");
+		return -1;
+	}
+	LM_DBG("query table=%s\n", _h->table->s);
+	LM_DBG("CQL=%s\n", _s->s);
+
+	std::string cql_query(_s->s);
+
+	oac::CqlResult cassa_cql_res;
+
+	try {
+		CON_CASSA(_h)->con->execute_cql_query(cassa_cql_res, cql_query , oac::Compression::NONE);
+	} catch (const oac::InvalidRequestException &irx) {
+		LM_ERR("Invalid Request caused error details: %s.\n", irx.why.c_str());
+	} catch (const at::TException &tx) {
+		LM_ERR("T Exception %s\n", tx.what());
+	} catch (const std::exception &ex) {
+		LM_ERR("Failed: %s\n", ex.what());
+	} catch (...) {
+		LM_ERR("Failed to open connection to Cassandra cluster\n");
+	}
+
+	if (!cassa_cql_res.__isset.rows) {
+		LM_ERR("The resultype rows was not set, no point trying to parse result.\n");
+		return -1;
+	}
+
+	/* TODO Handle the other types */
+	switch(cassa_cql_res.type) {
+		case 1:  LM_DBG("Result set is an ROW Type.\n");
+			break;
+		case 2: LM_DBG("Result set is an VOID Type.\n");
+			break;
+		case 3: LM_DBG("Result set is an INT Type.\n");
+			break;
+	}
+
+	std::vector<oac::CqlRow>  res_cql_rows = cassa_cql_res.rows;
+
+	db_res = db_new_result();
+	if (!db_res) {
+		LM_ERR("no memory left\n");
+		goto error;
+	}
+
+	if(res_cql_rows.size() == 0) {
+		LM_DBG("The query returned no result\n");
+		RES_ROW_N(db_res) = 0;
+		RES_COL_N(db_res)= 0;
+		*_r = db_res;
+		return 0;
+	}
+
+	if (cql_get_columns(cassa_cql_res, db_res) < 0) {
+		LM_ERR("Error getting column names.");
+		goto error;
+	}
+
+	if (cql_convert_row(cassa_cql_res, db_res) < 0) {
+		LM_ERR("Error converting rows");
+		goto error;
+	}
+
+	*_r = db_res;
+	LM_DBG("Exited with success\n");
+	return 0;
+
+error:
+	if(db_res)
+		db_free_result(db_res);
+	return -1;
+}
+
+
+
 /*
  * Query table for specified rows
  * _h: structure representing database connection

+ 2 - 0
modules/db_cassandra/dbcassa_base.h

@@ -93,6 +93,8 @@ int db_cassa_update(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o,
 
 int db_cassa_free_result(db1_con_t* _h, db1_res_t* _r);
 
+int db_cassa_raw_query(const db1_con_t* _h, const str* _s, db1_res_t** _r);
+
 #ifdef __cplusplus
 }
 #endif

+ 5 - 0
modules/db_cassandra/doc/db_cassandra.xml

@@ -18,6 +18,11 @@
 				<surname>Vamanu</surname>
 				<email>[email protected]</email>
 			</author>
+			<author>
+				<firstname>Boudewyn</firstname>
+				<surname>Ligthart</surname>
+				<email>[email protected]</email>
+			</author>
 			<editor>
 				<firstname>Anca</firstname>
 				<surname>Vamanu</surname>

+ 20 - 0
modules/db_cassandra/doc/db_cassandra_admin.xml

@@ -77,6 +77,26 @@ default_validation_class='UTF8Type' and key_validation_class='UTF8Type';
 		Also, for deleting expired records, we let Cassandra take care of this with
 		its own mechanism (by setting the TTL for columns).
 	</para>
+
+	<para>
+ 		The module supports raw queries. However these queries must follow the
+		CQL (Cassandra Query Language) syntax. The queries can be issued 
+		in the script by means of the AVPOPS module. Keep in mind that when passing back
+		the results from the database only the first row is used to set the AVP variables.
+		(default AVPOPS behaviour)
+
+		The script lines below can be used as an example for issuing the query towards 
+		an cassandra instance.(This example will work once the column family `location` 
+		is configured correctly in the cassandra keyspace)
+	</para>
+<programlisting format="linespecific">
+   ...
+   $var(dballowed)="select * from location where key = 'userx' limit 1;";
+   avp_db_query("$var(dballowed)");
+   xlog("L_INFO","Got result here: [$avp(i:1)] [$avp(i:2)] [$avp(i:3)].\n");
+   ...
+</programlisting>
+
 	</section>
 
 	<section>