Browse Source

db_mongodb: proper result conversion to DB APIv1

Daniel-Constantin Mierla 11 years ago
parent
commit
666ffd55f4
2 changed files with 405 additions and 14 deletions
  1. 3 3
      modules/db_mongodb/db_mongodb_mod.c
  2. 402 11
      modules/db_mongodb/mongodb_dbase.c

+ 3 - 3
modules/db_mongodb/db_mongodb_mod.c

@@ -75,13 +75,13 @@ static int db_mongodb_bind_api(db_func_t *dbb)
 	dbb->init             = db_mongodb_init;
 	dbb->close            = db_mongodb_close;
 	dbb->query            = db_mongodb_query;
-	dbb->fetch_result     = db_mongodb_fetch_result;
-	dbb->raw_query        = db_mongodb_raw_query;
+	dbb->fetch_result     = 0; //db_mongodb_fetch_result;
+	dbb->raw_query        = 0; //db_mongodb_raw_query;
 	dbb->free_result      = db_mongodb_free_result;
 	dbb->insert           = db_mongodb_insert;
 	dbb->delete           = db_mongodb_delete; 
 	dbb->update           = db_mongodb_update;
-	dbb->replace          = db_mongodb_replace;
+	dbb->replace          = 0; //db_mongodb_replace;
 
 	return 0;
 }

+ 402 - 11
modules/db_mongodb/mongodb_dbase.c

@@ -27,6 +27,8 @@
 #include "mongodb_connection.h"
 #include "mongodb_dbase.h"
 
+#define DB_MONGODB_ROWS_STEP	1000
+
 typedef struct db_mongodb_result {
 	mongoc_collection_t *collection;   /*!< Collection link */
 	mongoc_cursor_t *cursor;           /*!< Cursor link */
@@ -34,6 +36,7 @@ typedef struct db_mongodb_result {
 	int idx;
 	bson_t *colsdoc;
 	int nrcols;
+	int maxrows;
 } db_mongodb_result_t;
 
 /*
@@ -148,6 +151,387 @@ error:
 	return -1;
 }
 
+/*!
+ * \brief Get and convert columns from a result
+ *
+ * Get and convert columns from a result, fills the result structure
+ * with data from the database.
+ * \param _h database connection
+ * \param _r database result set
+ * \return 0 on success, negative on failure
+ */
+int db_mongodb_get_columns(const db1_con_t* _h, db1_res_t* _r)
+{
+	int col;
+	db_mongodb_result_t *mgres;
+	bson_iter_t riter;
+	bson_iter_t citer;
+	bson_t *cdoc;
+	const char *colname;
+	bson_type_t coltype;
+
+	if ((!_h) || (!_r)) {
+		LM_ERR("invalid parameter\n");
+		return -1;
+	}
+
+	mgres = (db_mongodb_result_t*)RES_PTR(_r);
+	if(!mgres->rdoc) {
+		mgres->nrcols = 0;
+		return 0;
+	}
+	if(mgres->nrcols==0 || mgres->colsdoc==NULL) {
+		mgres->nrcols = (int)bson_count_keys(mgres->rdoc);
+		if(mgres->nrcols==0) {
+			LM_ERR("no keys in bson document\n");
+			return -1;
+		}
+		cdoc = mgres->rdoc;
+	} else {
+		cdoc = mgres->colsdoc;
+	}
+	RES_COL_N(_r) = mgres->nrcols;
+	if (!RES_COL_N(_r)) {
+		LM_ERR("no columns returned from the query\n");
+		return -2;
+	} else {
+		LM_DBG("%d columns returned from the query\n", RES_COL_N(_r));
+	}
+
+	if (db_allocate_columns(_r, RES_COL_N(_r)) != 0) {
+		RES_COL_N(_r) = 0;
+		LM_ERR("could not allocate columns\n");
+		return -3;
+	}
+
+	if (!bson_iter_init (&citer, cdoc)) {
+		LM_ERR("failed to initialize columns iterator\n");
+		return -3;
+	}
+	if(mgres->colsdoc) {
+		if (!bson_iter_init (&riter, mgres->rdoc)) {
+			LM_ERR("failed to initialize result iterator\n");
+			return -3;
+		}
+	}
+
+	col = 0;
+	while (bson_iter_next (&citer)) {
+		if(col >= RES_COL_N(_r)) {
+			LM_ERR("invalid number of columns (%d/%d)\n", col, RES_COL_N(_r));
+			return -4;
+		}
+
+		colname = bson_iter_key (&citer);
+		LM_DBG("Found a field[%d] named: %s\n", col, colname);
+		if(mgres->colsdoc) {
+			if(!bson_iter_find(&riter, colname)) {
+				LM_ERR("field [%s] not found in result iterator\n",
+						colname);
+				return -4;
+			}
+			coltype = bson_iter_type(&riter);
+		} else {
+			coltype = bson_iter_type(&citer);
+		}
+
+		RES_NAMES(_r)[col] = (str*)pkg_malloc(sizeof(str));
+		if (! RES_NAMES(_r)[col]) {
+			LM_ERR("no private memory left\n");
+			db_free_columns(_r);
+			return -4;
+		}
+		LM_DBG("allocate %lu bytes for RES_NAMES[%d] at %p\n",
+				(unsigned long)sizeof(str), col, RES_NAMES(_r)[col]);
+
+		/* pointer linked here is part of the result structure */
+		RES_NAMES(_r)[col]->s = (char*)colname;
+		RES_NAMES(_r)[col]->len = strlen(colname);
+
+		switch(coltype) {
+			case BSON_TYPE_BOOL:
+			case BSON_TYPE_INT32:
+			case BSON_TYPE_TIMESTAMP:
+				LM_DBG("use DB1_INT result type\n");
+				RES_TYPES(_r)[col] = DB1_INT;
+				break;
+
+			case BSON_TYPE_INT64:
+				LM_DBG("use DB1_BIGINT result type\n");
+				RES_TYPES(_r)[col] = DB1_BIGINT;
+				break;
+
+			case BSON_TYPE_DOUBLE:
+				LM_DBG("use DB1_DOUBLE result type\n");
+				RES_TYPES(_r)[col] = DB1_DOUBLE;
+				break;
+
+			case BSON_TYPE_DATE_TIME:
+				LM_DBG("use DB1_DATETIME result type\n");
+				RES_TYPES(_r)[col] = DB1_DATETIME;
+				break;
+
+			case BSON_TYPE_BINARY:
+				LM_DBG("use DB1_BLOB result type\n");
+				RES_TYPES(_r)[col] = DB1_BLOB;
+				break;
+
+			case BSON_TYPE_UTF8:
+				LM_DBG("use DB1_STRING result type\n");
+				RES_TYPES(_r)[col] = DB1_STRING;
+				break;
+
+#if 0
+			case BSON_TYPE_EOD:
+			case BSON_TYPE_DOCUMENT:
+			case BSON_TYPE_ARRAY:
+			case BSON_TYPE_UNDEFINED:
+			case BSON_TYPE_OID:
+			case BSON_TYPE_NULL:
+			case BSON_TYPE_REGEX:
+			case BSON_TYPE_DBPOINTER:
+			case BSON_TYPE_CODE:
+			case BSON_TYPE_SYMBOL:
+			case BSON_TYPE_CODEWSCOPE:
+			case BSON_TYPE_MAXKEY:
+			case BSON_TYPE_MINKEY:
+#endif
+
+			default:
+				LM_WARN("unhandled data type column (%.*s) type id (%d), "
+						"use DB1_STRING as default\n", RES_NAMES(_r)[col]->len,
+						RES_NAMES(_r)[col]->s, coltype);
+				RES_TYPES(_r)[col] = DB1_STRING;
+				break;
+		}
+
+		LM_DBG("RES_NAMES(%p)[%d]=[%.*s] (%d)\n", RES_NAMES(_r)[col], col,
+				RES_NAMES(_r)[col]->len, RES_NAMES(_r)[col]->s, coltype);
+		col++;
+	}
+	return 0;
+}
+
+
+/*!
+ * \brief Convert rows from mongodb to db API representation
+ * \param _h database connection
+ * \param _r database result set
+ * \return 0 on success, negative on failure
+ */
+static int db_mongdob_convert_bson(const db1_con_t* _h, db1_res_t* _r,
+		int _row, const bson_t *_rdoc)
+{
+	static str dummy_string = {"", 0};
+	int col;
+	db_mongodb_result_t *mgres;
+	const char *colname;
+	bson_type_t coltype;
+	bson_iter_t riter;
+	bson_iter_t citer;
+	bson_iter_t *piter;
+	db_val_t* dval;
+	uint32_t i32tmp;
+    bson_subtype_t subtype;
+	bson_t *cdoc;
+
+	mgres = (db_mongodb_result_t*)RES_PTR(_r);
+	if(mgres->nrcols==0) {
+		LM_ERR("no fields to convert\n");
+		return -1;
+	}
+	if(mgres->colsdoc==NULL) {
+		cdoc = (bson_t*)_rdoc;
+	} else {
+		cdoc = (bson_t*)mgres->colsdoc;
+	}
+
+	if (!bson_iter_init (&citer, cdoc)) {
+		LM_ERR("failed to initialize columns iterator\n");
+		return -3;
+	}
+	if(mgres->colsdoc) {
+		if (!bson_iter_init (&riter, _rdoc)) {
+			LM_ERR("failed to initialize result iterator\n");
+			return -3;
+		}
+	}
+	if (db_allocate_row(_r, &(RES_ROWS(_r)[_row])) != 0) {
+		LM_ERR("could not allocate row: %d\n", _row);
+		return -2;
+	}
+	col = 0;
+	while (bson_iter_next (&citer)) {
+		if(col >= RES_COL_N(_r)) {
+			LM_ERR("invalid number of columns (%d/%d)\n", col, RES_COL_N(_r));
+			return -4;
+		}
+
+		colname = bson_iter_key (&citer);
+		LM_DBG("looking for field[%d] named: %s\n", col, colname);
+		if(mgres->colsdoc) {
+			if(!bson_iter_find(&riter, colname)) {
+				LM_ERR("field [%s] not found in result iterator\n",
+						colname);
+				return -4;
+			}
+			piter = &riter;
+		} else {
+			piter = &citer;
+		}
+		coltype = bson_iter_type(piter);
+
+		dval = &(ROW_VALUES(&(RES_ROWS(_r)[_row]))[col]);
+		VAL_TYPE(dval) = RES_TYPES(_r)[col];
+
+		switch(coltype) {
+			case BSON_TYPE_BOOL:
+				VAL_INT(dval) = (int)bson_iter_bool (piter);
+				break;
+			case BSON_TYPE_INT32:
+				VAL_INT(dval) = bson_iter_int32 (piter);
+				break;
+			case BSON_TYPE_TIMESTAMP:
+				bson_iter_timestamp (piter,
+						(uint32_t*)&VAL_INT(dval), &i32tmp);
+				break;
+
+			case BSON_TYPE_INT64:
+				VAL_BIGINT(dval) = bson_iter_int64 (piter);
+				break;
+
+			case BSON_TYPE_DOUBLE:
+				VAL_DOUBLE(dval) = bson_iter_double (piter);
+				break;
+
+			case BSON_TYPE_DATE_TIME:
+				VAL_TIME(dval) = (time_t)(bson_iter_date_time (piter)/1000);
+				break;
+
+			case BSON_TYPE_BINARY:
+				bson_iter_binary (piter, &subtype,
+                  (uint32_t*)&VAL_BLOB(dval).len, (const uint8_t**)&VAL_BLOB(dval).s);
+				break;
+
+			case BSON_TYPE_UTF8:
+				VAL_STRING(dval) = (char*)bson_iter_utf8 (piter, &i32tmp);
+				break;
+
+			case BSON_TYPE_OID:
+				break;
+
+			case BSON_TYPE_NULL:
+				memset(dval, 0, sizeof(db_val_t));
+				/* Initialize the string pointers to a dummy empty
+				 * string so that we do not crash when the NULL flag
+				 * is set but the module does not check it properly
+				 */
+				VAL_STRING(dval) = dummy_string.s;
+				VAL_STR(dval) = dummy_string;
+				VAL_BLOB(dval) = dummy_string;
+				VAL_TYPE(dval) = RES_TYPES(_r)[col];
+				VAL_NULL(dval) = 1;
+				break;
+
+#if 0
+			case BSON_TYPE_EOD:
+			case BSON_TYPE_DOCUMENT:
+			case BSON_TYPE_ARRAY:
+			case BSON_TYPE_UNDEFINED:
+			case BSON_TYPE_REGEX:
+			case BSON_TYPE_DBPOINTER:
+			case BSON_TYPE_CODE:
+			case BSON_TYPE_SYMBOL:
+			case BSON_TYPE_CODEWSCOPE:
+			case BSON_TYPE_MAXKEY:
+			case BSON_TYPE_MINKEY:
+#endif
+
+			default:
+				LM_WARN("unhandled data type column (%.*s) type id (%d), "
+						"use DB1_STRING as default\n", RES_NAMES(_r)[col]->len,
+						RES_NAMES(_r)[col]->s, coltype);
+				RES_TYPES(_r)[col] = DB1_STRING;
+				break;
+		}
+
+		LM_DBG("RES_NAMES(%p)[%d]=[%.*s] (%d)\n", RES_NAMES(_r)[col], col,
+				RES_NAMES(_r)[col]->len, RES_NAMES(_r)[col]->s, coltype);
+		col++;
+	}
+	return 0;
+}
+
+/*!
+ * \brief Convert rows from mongodb to db API representation
+ * \param _h database connection
+ * \param _r database result set
+ * \return 0 on success, negative on failure
+ */
+static int db_mongdob_convert_result(const db1_con_t* _h, db1_res_t* _r)
+{
+	int row;
+	db_mongodb_result_t *mgres;
+	const bson_t *itdoc;
+	char *jstr;
+
+	if ((!_h) || (!_r)) {
+		LM_ERR("invalid parameter\n");
+		return -1;
+	}
+
+	mgres = (db_mongodb_result_t*)RES_PTR(_r);
+	if(!mgres->rdoc) {
+		mgres->nrcols = 0;
+		return 0;
+	}
+	if(mgres->nrcols==0) {
+		LM_DBG("no fields to return\n");
+		return 0;
+	}
+
+	if(!mongoc_cursor_more (mgres->cursor)) {
+		RES_ROW_N(_r) = 1;
+		mgres->maxrows = 1;
+	} else {
+		RES_ROW_N(_r) = DB_MONGODB_ROWS_STEP;
+		mgres->maxrows = DB_MONGODB_ROWS_STEP;
+	}
+
+	if (db_allocate_rows(_r) < 0) {
+		LM_ERR("could not allocate rows\n");
+		RES_ROW_N(_r) = 0;
+		return -2;
+	}
+
+	itdoc = mgres->rdoc;
+	row = 0;
+	do {
+		if(row >= RES_ROW_N(_r)) {
+			if (db_reallocate_rows(_r,
+						RES_ROW_N(_r)+DB_MONGODB_ROWS_STEP) < 0) {
+				LM_ERR("could not reallocate rows\n");
+				return -2;
+			}
+			mgres->maxrows = RES_ROW_N(_r);
+		}
+		if(is_printable(L_DBG)) {
+			jstr = bson_as_json (itdoc, NULL);
+			LM_DBG("selected document: %s\n", jstr);
+			bson_free (jstr);
+		}
+		if(db_mongdob_convert_bson(_h, _r, row, itdoc)) {
+			LM_ERR("failed to convert bson at pos %d\n", row);
+			return -1;
+		}
+		row++;
+	} while (mongoc_cursor_more (mgres->cursor)
+			&& mongoc_cursor_next (mgres->cursor, &itdoc));
+	RES_ROW_N(_r) = row;
+	LM_DBG("retrieved number of rows: %d\n", row);
+	return 0;
+}
+
 db1_res_t* db_mongodb_new_result(void)
 {
 	db1_res_t* obj;
@@ -171,7 +555,6 @@ static int db_mongodb_store_result(const db1_con_t* _h, db1_res_t** _r)
 	km_mongodb_con_t *mgcon;
 	db_mongodb_result_t *mgres;
 	const bson_t *itdoc;
-	char *jstr;
 
 	mgcon = MONGODB_CON(_h);
 	if(!_r) {
@@ -193,13 +576,21 @@ static int db_mongodb_store_result(const db1_con_t* _h, db1_res_t** _r)
 	mgcon->colsdoc = NULL;
 	mgres->nrcols = mgcon->nrcols;
 	mgcon->nrcols = 0;
-	while (mongoc_cursor_more (mgres->cursor)
-			&& mongoc_cursor_next (mgres->cursor, &itdoc)) {
-		if(is_printable(L_DBG)) {
-			jstr = bson_as_json (itdoc, NULL);
-			LM_DBG("selected document: %s\n", jstr);
-			bson_free (jstr);
-		}
+	if(!mongoc_cursor_more (mgres->cursor)
+			|| !mongoc_cursor_next (mgres->cursor, &itdoc)
+			|| !itdoc) {
+		LM_DBG("no result from mongodb\n");
+		return 0;
+	}
+	/* first document linked internally in result to get columns */
+	mgres->rdoc = (bson_t*)itdoc;
+	if(db_mongodb_get_columns(_h, *_r)<0) {
+		LM_ERR("failed to set the columns\n");
+		goto error;
+	}
+	if(db_mongdob_convert_result(_h, *_r)<0) {
+		LM_ERR("failed to set the rows in result\n");
+		goto error;
 	}
 	return 0;
 
@@ -247,7 +638,7 @@ int db_mongodb_free_result(db1_con_t* _h, db1_res_t* _r)
 		}
 		pkg_free(RES_PTR(_r));
 	}
-		db_free_result(_r);
+	db_free_result(_r);
 	return 0;
 }
 
@@ -336,7 +727,7 @@ int db_mongodb_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op
 			LM_ERR("cannot initialize columns bson document\n");
 			goto error;
 		}
-		for(i = 0; i < _n; i++) {
+		for(i = 0; i < _nc; i++) {
 			if(!bson_append_int32(mgcon->colsdoc, _c[i]->s, _c[i]->len, 1))
 			{
 				LM_ERR("failed to append int to columns bson %.*s = %d [%d]\n",
@@ -346,7 +737,7 @@ int db_mongodb_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op
 		}
 		if(is_printable(L_DBG)) {
 			jstr = bson_as_json (mgcon->colsdoc, NULL);
-			LM_DBG("query filter: %s\n", jstr);
+			LM_DBG("columns filter: %s\n", jstr);
 			bson_free (jstr);
 		}
 		mgcon->nrcols = _nc;