|
@@ -427,36 +427,39 @@ dbcassa_column_p cassa_search_col(dbcassa_table_p tbc, db_key_t col_name)
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-typedef std::auto_ptr<std::vector<oac::ColumnOrSuperColumn> > ColumnVecPtr;
|
|
|
+typedef std::vector<oac::ColumnOrSuperColumn> ColumnVec;
|
|
|
+typedef std::auto_ptr<ColumnVec> ColumnVecPtr;
|
|
|
|
|
|
ColumnVecPtr cassa_translate_query(const db1_con_t* _h, const db_key_t* _k,
|
|
|
- const db_val_t* _v, const db_key_t* _c, int _n, int _nc)
|
|
|
+ const db_val_t* _v, const db_key_t* _c, int _n, int _nc, int* ret_rows_no)
|
|
|
{
|
|
|
char row_key[cassa_max_key_len];
|
|
|
char sec_key[cassa_max_key_len];
|
|
|
- int key_len, seckey_len = 0;
|
|
|
+ int key_len=0, seckey_len = 0;
|
|
|
int no_kc, no_sec_kc;
|
|
|
dbcassa_table_p tbc;
|
|
|
|
|
|
/** Lock table schema and construct primary and secondary key **/
|
|
|
- tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
|
|
|
- if(!tbc) {
|
|
|
- LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
|
|
|
- return ColumnVecPtr(NULL);
|
|
|
- }
|
|
|
- cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, &no_kc, row_key);
|
|
|
+ if(_k) {
|
|
|
+ tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
|
|
|
+ if(!tbc) {
|
|
|
+ LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
|
|
|
+ return ColumnVecPtr(NULL);
|
|
|
+ }
|
|
|
+ cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, &no_kc, row_key);
|
|
|
|
|
|
- if(no_kc != tbc->key_len) {/* was not able to construct the whole key */
|
|
|
- LM_ERR("Query not supported - key not provided\n");
|
|
|
- dbcassa_lock_release(tbc);
|
|
|
- return ColumnVecPtr(NULL);
|
|
|
- }
|
|
|
- key_len = tbc->key_len;
|
|
|
+ if(no_kc != tbc->key_len) {/* was not able to construct the whole key */
|
|
|
+ LM_ERR("Query not supported - key not provided\n");
|
|
|
+ dbcassa_lock_release(tbc);
|
|
|
+ return ColumnVecPtr(NULL);
|
|
|
+ }
|
|
|
+ key_len = tbc->key_len;
|
|
|
|
|
|
- cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, &no_sec_kc, sec_key);
|
|
|
- seckey_len = tbc->seckey_len;
|
|
|
+ cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, &no_sec_kc, sec_key);
|
|
|
+ seckey_len = tbc->seckey_len;
|
|
|
|
|
|
- dbcassa_lock_release(tbc);
|
|
|
+ dbcassa_lock_release(tbc);
|
|
|
+ }
|
|
|
|
|
|
try {
|
|
|
oac::SlicePredicate sp;
|
|
@@ -514,7 +517,42 @@ ColumnVecPtr cassa_translate_query(const db1_con_t* _h, const db_key_t* _k,
|
|
|
do {
|
|
|
if(CON_CASSA(_h)->con) {
|
|
|
try {
|
|
|
- CON_CASSA(_h)->con->get_slice(*cassa_result, row_key, cparent, sp, oac::ConsistencyLevel::ONE);
|
|
|
+
|
|
|
+ if(_k) {
|
|
|
+ CON_CASSA(_h)->con->get_slice(*cassa_result, row_key, cparent, sp, oac::ConsistencyLevel::ONE);
|
|
|
+ *ret_rows_no = 1;
|
|
|
+ } else {
|
|
|
+ oac::KeyRange keyRange;
|
|
|
+ keyRange.start_key = "";
|
|
|
+ keyRange.start_key = "";
|
|
|
+ std::vector<oac::KeySlice> key_slice_vect;
|
|
|
+ keyRange.__isset.start_key = 1;
|
|
|
+ keyRange.__isset.end_key = 1;
|
|
|
+ ColumnVec::iterator it = cassa_result->begin();
|
|
|
+
|
|
|
+ /* get in a loop 100 records at a time */
|
|
|
+ int rows_no =0;
|
|
|
+ while(1) {
|
|
|
+ CON_CASSA(_h)->con->get_range_slices(key_slice_vect, cparent, sp, keyRange, oac::ConsistencyLevel::ONE);
|
|
|
+ /* construct cassa_result */
|
|
|
+ LM_DBG("Retuned %d key slices\n", key_slice_vect.size());
|
|
|
+ for(unsigned int i = 0; i< key_slice_vect.size(); i++) {
|
|
|
+ if(key_slice_vect[i].columns.size()==0) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ cassa_result->insert(it, key_slice_vect[i].columns.begin(), key_slice_vect[i].columns.end());
|
|
|
+ it = cassa_result->begin();
|
|
|
+ row_slices[rows_no][0] = cassa_result->size();
|
|
|
+ row_slices[rows_no][1] = 0;
|
|
|
+ rows_no++;
|
|
|
+ }
|
|
|
+ if(key_slice_vect.size() < (unsigned int)keyRange.count)
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ *ret_rows_no = rows_no;
|
|
|
+ }
|
|
|
+
|
|
|
return cassa_result;
|
|
|
} catch (const att::TTransportException &tx) {
|
|
|
LM_ERR("Failed to query: %s\n", tx.what());
|
|
@@ -571,7 +609,7 @@ int db_cassa_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
|
|
|
|
|
|
/** Construct and send the query to Cassandra Cluster **/
|
|
|
|
|
|
- cassa_result = cassa_translate_query(_h, _k, _v, _c, _n, _nc);
|
|
|
+ cassa_result = cassa_translate_query(_h, _k, _v, _c, _n, _nc, &rows_no);
|
|
|
|
|
|
if(cassa_result.get() == NULL) {
|
|
|
LM_ERR("Failed to query Cassandra cluster\n");
|
|
@@ -626,6 +664,7 @@ int db_cassa_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
|
|
|
LM_DBG("RES_NAMES(%p)[%d]=[%.*s]\n", RES_NAMES(db_res)[col], col,
|
|
|
RES_NAMES(db_res)[col]->len, RES_NAMES(db_res)[col]->s);
|
|
|
}
|
|
|
+ /* TODO if all columns asked - take from table schema */
|
|
|
seckey_len = tbc->seckey_len;
|
|
|
dbcassa_lock_release(tbc);
|
|
|
|
|
@@ -636,16 +675,11 @@ int db_cassa_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
|
|
|
}
|
|
|
|
|
|
/* Initialize the row_slices vector for the case with one column and no secondary key */
|
|
|
- row_slices[0][0]= cassa_result->size();
|
|
|
- row_slices[0][1]= 0;
|
|
|
-
|
|
|
- if(seckey_len) { /* if the table has a secondary key defined */
|
|
|
-/* if the secondary key was computed */
|
|
|
-/* if (no_sec_kc == seckey_len) {
|
|
|
- rows_no = 1;
|
|
|
- row_slices[0][1] = strlen(sec_key) + 1;
|
|
|
- }
|
|
|
- else */ {
|
|
|
+ if(rows_no == 1) {
|
|
|
+ row_slices[0][0]= cassa_result->size();
|
|
|
+ row_slices[0][1]= 0;
|
|
|
+
|
|
|
+ if(seckey_len) { /* if the table has a secondary key defined */
|
|
|
/* pass through the result once to see how many rows there are */
|
|
|
rows_no = cassa_result_separate_rows(*cassa_result);
|
|
|
if(rows_no < 0) {
|
|
@@ -654,8 +688,6 @@ int db_cassa_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- else
|
|
|
- rows_no = 1;
|
|
|
|
|
|
RES_ROW_N(db_res) = rows_no;
|
|
|
|